Hadoop Small Cluster (5 Nodes) Test

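1. The Map/Reduce task
Input:
File format:
id<TAB>value
where id is a random integer from 1 to 100 and value is a random floating-point number in [0, 100), separated by a tab.
Output:
the maximum value for each id

Files like this are easy to generate with Python; see Appendix 1 at the end of this post.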

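2. The Map/Reduce program
I use the new (0.20.2) API directly here, i.e. the interfaces under org.apache.hadoop.mapreduce.*.
Note in particular:
job.setNumReduceTasks(5)
sets the number of reduce tasks for this job; the default is 1.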
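To see what the number of reduce tasks changes, it helps to know how map output is routed to reducers. By default Hadoop uses HashPartitioner, which sends a key to partition (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks. The Python sketch below imitates that formula to count how many of the 100 ids each reducer receives. It assumes that, for small positive ids such as 1~100, the LongWritable hash code is the id itself; the function names are illustrative, not Hadoop APIs.

```python
from collections import Counter


def hash_partition(key_hash: int, num_reduce_tasks: int) -> int:
    """Imitate HashPartitioner: (hash & Integer.MAX_VALUE) % numReduceTasks."""
    return (key_hash & 0x7FFFFFFF) % num_reduce_tasks


def keys_per_reducer(ids, num_reduce_tasks):
    """Count how many distinct ids each reduce task would receive."""
    # Assumption: for small positive ids the key's hash code equals the id.
    return Counter(hash_partition(i, num_reduce_tasks) for i in ids)


if __name__ == "__main__":
    ids = range(1, 101)
    for n in (1, 5, 10):
        print(n, "reducers:", dict(sorted(keys_per_reducer(ids, n).items())))
```

With 5 reduce tasks every reducer gets 20 ids and with 10 it gets 10, while a single reduce task has to process all 100 ids on its own.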

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MaxNumber {

	static class MaxNumberMapper extends
			Mapper<LongWritable, Text, LongWritable, DoubleWritable> {

		@Override
		public void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			String line = value.toString().trim();
			String[] tmp = line.split("\t");
			// Skip malformed lines that lack either the id or the value
			if (tmp.length < 2) {
				return;
			}
			// Parse id and value; skip the line if either field is not a number
			long id;
			double val;
			try {
				id = Long.parseLong(tmp[0].trim());
				val = Double.parseDouble(tmp[1].trim());
			} catch (NumberFormatException e) {
				return;
			}
			// Emit (id, value) for the reducers
			context.write(new LongWritable(id), new DoubleWritable(val));
		}

	}

	static class MaxNumberReducer extends
			Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {

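		@Override
		public void reduce(LongWritable key, Iterable<DoubleWritable> values,
				Context context) throws IOException, InterruptedException {
			// Iterate over all values of this key and keep the maximum.
			// Start from negative infinity: Double.MIN_VALUE is the smallest
			// positive double, not the most negative one.
			double max = Double.NEGATIVE_INFINITY;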
			for (DoubleWritable val : values) {
				max = Math.max(max, val.get());
			}
			// Collect result
			context.write(key, new DoubleWritable(max));
		}

	}
	public static void main(String[] args) throws Exception {
		if (args.length != 2) {
			System.err.println("Usage:");
			System.err.println("MaxNumber <input_path> <output_path>");
			System.exit(-1);
		}
		// Basic job setup
		Job job = new Job();
		job.setJobName("Max Number Map/Reduce");
		job.setJarByClass(MaxNumber.class);
		// Input and output paths
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		// Mapper and reducer classes
		job.setMapperClass(MaxNumberMapper.class);
		job.setReducerClass(MaxNumberReducer.class);
		// max is commutative and associative, so the reducer can also be
		// used as a combiner (enabled in test (4) below)
		// job.setCombinerClass(MaxNumberReducer.class);
		// Output key/value types (the map output uses the same types)
		job.setOutputKeyClass(LongWritable.class);
		job.setOutputValueClass(DoubleWritable.class);

		// Number of reduce tasks (the default is 1)
		job.setNumReduceTasks(5);

		// Return the exit code to the shell
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
}
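For readers without a cluster at hand, the data flow of MaxNumber can be simulated in a few lines of Python. This is a hypothetical single-process sketch of map, shuffle (group by key) and reduce that mirrors the mapper's skipping of malformed lines; it is not how Hadoop actually executes the job, and it is not the Appendix 2 program that was timed.

```python
from collections import defaultdict


def map_line(line):
    """Mirror MaxNumberMapper: parse 'id<TAB>value', skip malformed lines."""
    parts = line.strip().split("\t")
    if len(parts) < 2:
        return []
    try:
        return [(int(parts[0]), float(parts[1]))]
    except ValueError:
        return []


def reduce_max(key, values):
    """Mirror MaxNumberReducer: emit the largest value seen for the key."""
    return key, max(values)


def run_job(lines):
    # Map phase: every input line yields zero or one (id, value) pair.
    groups = defaultdict(list)
    for line in lines:
        for key, val in map_line(line):
            # Shuffle phase: collect all values belonging to the same key.
            groups[key].append(val)
    # Reduce phase: one call per key; a reducer sees its keys in sorted order.
    return [reduce_max(k, groups[k]) for k in sorted(groups)]


if __name__ == "__main__":
    sample = ["3\t10.5", "1\t2.0", "3\t99.1", "bad line", "1\t7.25", "2\tx"]
    print(run_job(sample))
```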

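3. Cluster configuration

Beyond the standard Hadoop cluster setup, pay extra attention to the following points:

Node deployment
5 nodes, hadoop1 to hadoop5, mapped to their internal addresses in /etc/hosts as follows.
Pay special attention: the hosts mapping must be consistent with the hostname!
For example, here I commented out the line that mapped the hostname to the public 50.* address.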

127.0.0.1     localhost localhost.localdomain
#50.57.48.244     hadoop1

#For Hadoop Nodes
10.182.169.24   hadoop1
10.182.169.29   hadoop2
10.182.169.30   hadoop3
10.182.169.31   hadoop4
10.182.169.32   hadoop5
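The rule that the hosts mapping must agree with the hostname can be checked mechanically. The sketch below is a hypothetical helper (not part of Hadoop) that parses /etc/hosts-style text, ignores comments, and returns every address a given hostname maps to; on a correctly configured node it should return exactly one internal address and no public one.

```python
def addresses_for(hosts_text: str, hostname: str) -> list:
    """Return all IP addresses mapped to `hostname` in /etc/hosts-style text."""
    found = []
    for raw in hosts_text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop comments and blank lines
        if not line:
            continue
        fields = line.split()
        ip, names = fields[0], fields[1:]
        if hostname in names:
            found.append(ip)
    return found


# Excerpt of the hosts file shown above.
SAMPLE_HOSTS = """\
127.0.0.1     localhost localhost.localdomain
#50.57.48.244     hadoop1

#For Hadoop Nodes
10.182.169.24   hadoop1
10.182.169.29   hadoop2
"""

if __name__ == "__main__":
    # The commented-out public 50.* entry is ignored.
    print(addresses_for(SAMPLE_HOSTS, "hadoop1"))
```

On a live node one would pass the contents of /etc/hosts together with socket.gethostname() instead of the sample text.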

Configuration files
(1) core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>fs.default.name</name>
  <value>hdfs://hadoop1:54310</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/hadoop/hadoop_home/var</value>
</property>
</configuration>
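Since every node should carry the same *-site.xml files, it can help to read them programmatically, for example to compare them across hadoop1 to hadoop5. A minimal sketch using Python's standard xml.etree.ElementTree, assuming the simple property/name/value layout shown above; the helper name is hypothetical and other child elements (such as final or description) are ignored.

```python
import xml.etree.ElementTree as ET


def read_site_xml(xml_text: str) -> dict:
    """Parse a Hadoop *-site.xml document into a {name: value} dict."""
    root = ET.fromstring(xml_text)
    props = {}
    for prop in root.findall("property"):
        name = prop.findtext("name")
        value = prop.findtext("value")
        if name is not None:
            props[name.strip()] = (value or "").strip()
    return props


CORE_SITE = """<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>fs.default.name</name>
  <value>hdfs://hadoop1:54310</value>
</property>
<property>
  <name>hadoop.tmp.dir</name>
  <value>/home/hadoop/hadoop_home/var</value>
</property>
</configuration>
"""

if __name__ == "__main__":
    print(read_site_xml(CORE_SITE)["fs.default.name"])
```

Two nodes' copies of the same file agree on every property exactly when their parsed dicts compare equal.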

(2) mapred-site.xml
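Here mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum
set the maximum number of map and reduce tasks that can run concurrently on each node.
Map tasks are generally I/O-bound, so the map limit can be set to the number of CPU cores;
reduce tasks are generally CPU-bound, so 1 to 2 is a reasonable setting.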

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<!-- Put site-specific property overrides in this file. -->
<configuration>
<property>
  <name>mapred.job.tracker</name>
  <value>hadoop1:54311</value>
</property>
<property>
  <name>mapred.local.dir</name>
  <value>/home/hadoop/hadoop_home/var</value>
</property>
<property>
  <name>mapred.tasktracker.map.tasks.maximum</name>
  <value>4</value>
</property>
<property>
  <name>mapred.tasktracker.reduce.tasks.maximum</name>
  <value>1</value>
</property>
</configuration>
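These two settings cap per-node concurrency, so the cluster-wide capacity is simply nodes × slots per node, and a job with more tasks than slots runs in several "waves". The sketch below does that arithmetic. It assumes one map task per input split and, as an example, the 50 files of about 60 MB from the results section with the 64 MB default HDFS block size of Hadoop 0.20 (one split per file); real scheduling effects such as stragglers, speculative execution, or data locality are ignored, and the helper names are hypothetical.

```python
import math


def cluster_slots(nodes: int, per_node_max: int) -> int:
    """Total concurrent tasks of one type the cluster can run."""
    return nodes * per_node_max


def waves(num_tasks: int, slots: int) -> int:
    """Rounds needed when there are more tasks than available slots."""
    return math.ceil(num_tasks / slots)


def num_splits(file_sizes_mb, block_mb=64):
    """Rough split count: each file is cut into ceil(size / block) splits."""
    return sum(math.ceil(size / block_mb) for size in file_sizes_mb)


if __name__ == "__main__":
    nodes = 5
    map_slots = cluster_slots(nodes, 4)  # mapred.tasktracker.map.tasks.maximum = 4
    maps = num_splits([60] * 50)         # assumption: 50 files of ~60 MB each
    print("map slots:", map_slots, "map tasks:", maps,
          "waves:", waves(maps, map_slots))
    # (reduce slots per node, reduce tasks) for tests (1) to (3) below
    for per_node_reduce, reduces in ((2, 1), (2, 10), (1, 5)):
        slots = cluster_slots(nodes, per_node_reduce)
        print("reduce slots:", slots, "reduce tasks:", reduces,
              "waves:", waves(reduces, slots))
```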


(3) hdfs-site.xml
Note: replication is set to 3 here, because all 5 nodes run a datanode.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
  <name>dfs.replication</name>
  <value>3</value>
</property>
</configuration>
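With dfs.replication = 3, each HDFS block is stored on 3 of the 5 datanodes, so raw disk usage is roughly three times the logical data size. A back-of-the-envelope sketch, using the 50 × ~60 MB input from the results section as the example and assuming blocks are spread perfectly evenly across datanodes (the real placement policy is not modeled; the helper names are hypothetical):

```python
def raw_usage_mb(logical_mb: float, replication: int) -> float:
    """Raw HDFS disk usage: every block is stored `replication` times."""
    return logical_mb * replication


def per_datanode_mb(logical_mb: float, replication: int, datanodes: int) -> float:
    """Average usage per datanode, assuming perfectly even block placement."""
    if replication > datanodes:
        # More replicas than datanodes cannot all be placed.
        raise ValueError("replication factor exceeds the number of datanodes")
    return raw_usage_mb(logical_mb, replication) / datanodes


if __name__ == "__main__":
    logical = 50 * 60  # assumption: 50 input files of about 60 MB each
    print(raw_usage_mb(logical, 3), per_datanode_mb(logical, 3, 5))
```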

4. Results
50 test files, about 60 MB each.

(1) max map = 4, max reduce = 2, job.setNumReduceTasks(1)
up to 20 map tasks run concurrently (5 × 4)
only 1 reduce task
Time: 7 min 4 s

(2) max map = 4, max reduce = 2, job.setNumReduceTasks(10)
up to 20 map tasks run concurrently (5 × 4)
10 reduce tasks run in parallel (5 × 2)
Time: 3 min 58 s

(3) max map = 4, max reduce = 1, job.setNumReduceTasks(5)
up to 20 map tasks run concurrently (5 × 4)
5 reduce tasks run in parallel (5 × 1)
Time: 4 min 4 s

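(4) max map = 4, max reduce = 1, job.setNumReduceTasks(5), with the combiner enabled
up to 20 map tasks run concurrently (5 × 4)
5 reduce tasks run in parallel, and the copy (shuffle) phase is very fast
Time: 1 min 49 s
Of course, not every Map/Reduce job can use a combiner; it works here because taking the maximum is commutative and associative.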

(5) Single machine, Python script (see Appendix 2)
Time: 11 min 5 s
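Why the combiner in test (4) helps: max is commutative and associative, so taking the max per map task first and then again in the reducer gives the same answer, while each map task ships at most one record per id instead of one per input line. The reducer can serve as the combiner because its input and output types are the same (LongWritable, DoubleWritable). The Python sketch below illustrates both points on made-up random data; the split count, split size, and seed are arbitrary assumptions, and it simulates the idea rather than Hadoop's combiner machinery.

```python
import random


def local_max(pairs):
    """Combiner/reducer logic: max value per id over a collection of pairs."""
    best = {}
    for key, val in pairs:
        if key not in best or val > best[key]:
            best[key] = val
    return best


def simulate(num_splits=4, rows_per_split=10_000, seed=42):
    rng = random.Random(seed)
    splits = [[(rng.randint(1, 100), rng.random() * 100)
               for _ in range(rows_per_split)]
              for _ in range(num_splits)]
    # Without a combiner every mapped record is shuffled to the reducers.
    plain = local_max(p for split in splits for p in split)
    shuffled_plain = num_splits * rows_per_split
    # With a combiner each split is pre-reduced before the shuffle.
    combined_parts = [local_max(split) for split in splits]
    combined = local_max(p for part in combined_parts for p in part.items())
    shuffled_combined = sum(len(part) for part in combined_parts)
    return plain, combined, shuffled_plain, shuffled_combined


if __name__ == "__main__":
    plain, combined, n_plain, n_combined = simulate()
    print("same result:", plain == combined)
    print("records shuffled without / with combiner:", n_plain, n_combined)
```

The shuffled record count drops from one per input line to at most (number of splits × 100 ids), which is consistent with the very fast copy phase observed in test (4).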

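Summary:
The 5-node Hadoop cluster ran about 2.8 times as fast as the single-machine Python script (3 min 58 s in test (2) vs. 11 min 5 s), and about 6 times as fast with a combiner (1 min 49 s).
Hadoop's performance is not ideal because I did no performance tuning at all, e.g. of memory, compressed storage, or data transfer.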
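The ratios in the summary follow directly from the measured wall-clock times. A small sketch reproducing the arithmetic; the times are the ones reported above, and treating test (2), the fastest run without a combiner, as the "2.8×" baseline is an assumption about how the ratio was taken.

```python
def to_seconds(minutes: int, seconds: int) -> int:
    """Convert a 'X min Y s' measurement to seconds."""
    return minutes * 60 + seconds


# Wall-clock times of the Hadoop runs reported above.
RUNS = {
    "(1) 1 reducer": to_seconds(7, 4),
    "(2) 10 reducers": to_seconds(3, 58),
    "(3) 5 reducers": to_seconds(4, 4),
    "(4) 5 reducers + combiner": to_seconds(1, 49),
}
# Test (5): the single-machine Python script.
PYTHON_SINGLE = to_seconds(11, 5)

if __name__ == "__main__":
    for name, secs in RUNS.items():
        print(f"{name}: {secs} s, speedup {PYTHON_SINGLE / secs:.2f}x")
```

By the same arithmetic, test (3) comes out at roughly 2.7× and the single-reducer test (1) at only about 1.6×.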

Appendix 1: Python (2.x) program for generating the test input data

import random

if __name__ == "__main__":

    path = "/home/hadoop/hadoop_home/input"

    files = 10
    rows = 10000000

    for i in xrange(files):
        fp = open(path+"/"+str(i),"w")
        for j in xrange(rows):
            print >>fp,"%d\t%f" % (random.randint(1,100),random.random()*100)
        fp.close()

Appendix 2: Python (2.x) program for the single-machine run

import os,time

if __name__ == "__main__":
    start = int(time.time())
    path = r"/home/hadoop/hadoop_home/input"
    dict  = {}
    for root,dirs,files in os.walk(path):
        for f in files:
            file = root + "/" + f
            for line in open(file,"r").readlines():
                id,val = line.split("\t")
                id = str(id)
                val = float(val)
                dict.setdefault(id,-1)
                dict[id] = max(val,dict[id])

    for k,v in dict.items():
        print k,v
    end = int(time.time())
    print str(end-start),"(s)"
