[转载]PageRank in MapReduce

转载自一个台湾朋友写的PageRank in MapReduce

其实我一直没想到PR怎么算是因为...不知道怎么多次迭代,原来是用反复RunJob来实现的。

前言

有一陣子沒有寫MapReduce程式了,所以找個代表性的實例來練習一下...

PageRank in MapReduce

PageRank演算法最早是由Google兩位創辦人Sergey Brin & Larry Page在1998年的時候發表在World-Wide Web Conference的一篇論文:The Anatomy of a Large-Scale Hypertextual Web Search Engine所提出來的,該演算法主要用來計算網頁的重要性,以決定搜尋引擎該呈現搜尋結果時的一個排名依據,然而根據Google在1998年當時所索引的網頁數量來看,他們共索引了26 million pages(We knew the web was big...),所以可能三、四台機器就足以運算完成,但是到2000年時Google就索引了超過one billion pages,而這樣的規模就適合用MapReduce來分散式處理了,而本文主要介紹該如何用MapReduce的方式來完成這樣的演算法,然而重點在於 PageRank是一種反覆式演算法(Iterative Algorithm),所以該如何應用在MapReduce並決定何時該跳離這個反覆式迴圈以結束運算就需要一些方式來處理。

P.S. 本範例純粹使用「純文字型態」來處理,如果你有效率的考量請試著改寫特定的OutputFormat和Writable實作。

Google PageRank 範例

這裡的範例假設全世界只有四個網頁,它們分別為:Adobe, Google, MSN and Yahoo,每個網頁的PageRank值(簡稱PR值)預設為10。

1. Adobe有三個對外連結,分別連到Google, MSN and Yahoo。

2. Google只有一個對外連結為Adobe。

3. MSN有一個對外連結為Google。

4. Yahoo則有兩個對外連結為MSN and Google。

Adobe 10.00 Google,MSN,Yahoo
Google 10.00 Adobe
MSN 10.00 Google
Yahoo 10.00 MSN,Google

所以從這個範例來看,由於有三個網頁都連結到Google,所以相對來說它的PR值應該是最高的,其次應則為Adobe,因為Google的分數最高且又只連結到Adobe,所以Adobe的PR值也會比較高。

PageRank - MapReduce for Hadoop 0.21.x

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class PageRank
{
	static enum PageCount{
		Count,TotalPR
	}
	public static class PageRankMapper extends Mapper<Object, Text, Text, Text>
	{
		public void map(Object key, Text value, Context context) throws IOException, InterruptedException
		{
			context.getCounter(PageCount.Count).increment(1);
			String[] kv = value.toString().split("\t");
			String _key = kv[0];
			String _value = kv[1];
			String _PRnLink[] = _value.split(" ");
			String pr = _PRnLink[0];
			String link = _PRnLink[1];
			context.write(new Text(_key), new Text(link));

			String site[] = link.split(",");
			float score = Float.valueOf(pr)/(site.length)*1.0f;
			for(int i = 0 ; i < site.length ; i++)
			{
				context.write(new Text(site[i]), new Text(String.valueOf(score)));
			}	

		}
	}

	public static class PageRankReducer extends Reducer<Text, Text, Text, Text>
	{
		public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException
		{
			StringBuilder sb = new StringBuilder();
			float factor = 0.85f;
			float pr = 0f;
			for(Text f : values)
			{
				String value = f.toString();
				int s = value.indexOf(".");
				if(s != -1)
				{
					pr += Float.valueOf(value);
				}else{
					String site[] = value.split(",");
					int _len = site.length;
					for(int k = 0 ; k < _len ;k++)
					{
						sb.append(site[k]);
						sb.append(",");
					}
				}
			}

			pr = ((1-factor)+(factor*(pr)));
			context.getCounter(PageCount.TotalPR).increment((int)(pr*1000));
			String output = pr+" "+sb.toString();
			context.write(key, new Text(output));
		}
	}

	public static void main(String[] args) throws Exception
	{
		String input;
		String output;
		int threshold = 1000;
		int iteration = 0;
		int iterationLimit = 100;
		boolean status = false;

		while(iteration < iterationLimit)
		{
			if((iteration % 2) == 0)
			{
				input = "/pagerank_output/p*";
				output = "/pagerank_output2/";
			}else{
				input = "/pagerank_output2/p*";
				output = "/pagerank_output/";
			}

			Configuration conf = new Configuration();	

			FileSystem fs = FileSystem.get(conf);
			fs.delete(new Path(output), true);

			Job job = Job.getInstance(new Cluster(conf));
			job.setJobName("PageRank");
			job.setJarByClass(PageRank.class);
			job.setMapperClass(PageRankMapper.class);
			job.setReducerClass(PageRankReducer.class);
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(Text.class);
			TextInputFormat.addInputPath(job, new Path(input));
			TextOutputFormat.setOutputPath(job, new Path(output));
			status = job.waitForCompletion(true);
			iteration++;

			long count = job.getCounters().findCounter(PageCount.Count).getValue();
			long total_pr = job.getCounters().findCounter(PageCount.TotalPR).getValue();
			System.out.println("PageCount:"+count);
			System.out.println("TotalPR:"+total_pr);
			double per_pr = total_pr/(count*1.0d);
			System.out.println("Per PR:"+per_pr);
			if((int)per_pr == threshold)
			{
				System.out.println("Iteration:"+iteration);
				break;
			}
		}
		System.exit(status?0:1);
	}
}

關於上述程式所執行Map和Reduce所處理的過程及輸出結果就不詳加敘述了,留待有興趣的朋友們自行研究~

而關於如何決定跳離反覆式迴圈以結束運算的處理方式,筆者採用下述兩種方式:

1. 最多執行100次的反覆式運算,讓程式有一定的執行次數限制。

2. 分別累加頁面數量和每個網頁的PR值,並觀察其變化量呈現穩定狀態時就離開迴圈,上述範例求到小數第三位。

透過上述的處理方式,可以觀察到在執行第54次MapReduce運算時所呈現出來的結果:

Adobe 1.3334262 Google,MSN,Yahoo,
Google 1.39192 Adobe,
MSN 0.7523096 Google,
Yahoo 0.5279022 MSN,Google,

結果如預期的,Google的PR值最高,其次為Adobe,最後才是MSN和Yahoo。

P.S. 筆者沒有討厭Yahoo也沒有特別喜歡Google,純粹實驗性質... Orz (我比較愛Adobe)

相關資源

Jimmy Lin and Michael Schatz. Design Patterns for Efficient Graph Algorithms in MapReduce. Proceedings of the 2010 Workshop on Mining and Learning with Graphs Workshop (MLG-2010), July 2010, Washington, D.C.

Leave a Reply

Your email address will not be published. Required fields are marked *