1. The Map/Reduce Task
Input:
File format (one tab-separated record per line):
id value
where id is a random integer between 1 and 100 and value is a random floating-point number between 1 and 100.
Output:
The maximum value for each id.
Files of this kind are easy to generate with Python; see Appendix 1 at the end of this article.
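For illustration only, an input file might contain lines like the ones below; for id 3 the job would then output 97.550123:
3	41.273645
17	8.912034
3	97.550123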
2. The Map/Reduce Program
This uses the new (0.20.2) API directly, i.e. the interfaces under org.apache.hadoop.mapreduce.*.
Note in particular that
job.setNumReduceTasks(5)
sets the number of reduce tasks for this job; the default is 1.
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxNumber {

    static class MaxNumberMapper extends
            Mapper<LongWritable, Text, LongWritable, DoubleWritable> {

        @Override
        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            String[] tmp = line.split("\t");
            // Parse id and value
            long id = -1;
            double val = -1;
            try {
                id = Long.parseLong(tmp[0]);
                val = Double.parseDouble(tmp[1]);
            } catch (Exception e) {
                // Skip malformed lines
            }
            // Collect
            if (id != -1 && val != -1) {
                context.write(new LongWritable(id), new DoubleWritable(val));
            }
        }
    }

    static class MaxNumberReducer extends
            Reducer<LongWritable, DoubleWritable, LongWritable, DoubleWritable> {

        @Override
        public void reduce(LongWritable key, Iterable<DoubleWritable> values,
                Context context) throws IOException, InterruptedException {
            // Traverse all of the key's values and keep the maximum.
            // Use NEGATIVE_INFINITY, not Double.MIN_VALUE (which is the smallest
            // positive double), as the identity element for max.
            double max = Double.NEGATIVE_INFINITY;
            for (DoubleWritable val : values) {
                max = Math.max(max, val.get());
            }
            // Collect the result
            context.write(key, new DoubleWritable(max));
        }
    }
    public static void main(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.println("Usage:");
            System.err.println("MaxNumber <input_path> <output_path>");
            System.exit(-1);
        }
        // Job basics
        Job job = new Job();
        job.setJobName("Max Number Map/Reduce");
        job.setJarByClass(MaxNumber.class);
        // Job input && output paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Mapper and Reducer classes
        job.setMapperClass(MaxNumberMapper.class);
        job.setReducerClass(MaxNumberReducer.class);
        // A combiner class is useful in our case
        // job.setCombinerClass(MaxNumberReducer.class);
        // Output key/value classes
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(DoubleWritable.class);
        // Number of reduce tasks
        job.setNumReduceTasks(5);
        // Return the exit code to the shell
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
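To actually run this on the cluster, the class is packaged into a jar and submitted with the standard hadoop jar command. Assuming the jar is named maxnumber.jar (the name is arbitrary) and the input data has already been uploaded to HDFS, the invocation looks roughly like:
hadoop jar maxnumber.jar MaxNumber /user/hadoop/input /user/hadoop/output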
3. Cluster Configuration
Besides the usual Hadoop cluster setup, pay special attention to the following points:
Node deployment
Five nodes, hadoop1 through hadoop5, mapped to internal addresses in /etc/hosts as follows.
Note in particular that the /etc/hosts mapping must agree with each machine's hostname!
For example, here I commented out the external (50.*) entry for hadoop1.
127.0.0.1       localhost localhost.localdomain
#50.57.48.244   hadoop1
#For Hadoop Nodes
10.182.169.24   hadoop1
10.182.169.29   hadoop2
10.182.169.30   hadoop3
10.182.169.31   hadoop4
10.182.169.32   hadoop5
Configuration files
(1) core-site.xml
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>fs.default.name</name>
        <value>hdfs://hadoop1:54310</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/home/hadoop/hadoop_home/var</value>
    </property>
</configuration>
(2) mapred-site.xml
Here mapred.tasktracker.map.tasks.maximum and mapred.tasktracker.reduce.tasks.maximum
control the maximum number of map and reduce tasks that can run concurrently on each node.
Map tasks are generally I/O-bound, so this can be set to the number of CPU cores.
Reduce tasks are generally CPU-bound, so a value of 1 or 2 is reasonable.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!-- Put site-specific property overrides in this file. -->
<configuration>
    <property>
        <name>mapred.job.tracker</name>
        <value>hadoop1:54311</value>
    </property>
    <property>
        <name>mapred.local.dir</name>
        <value>/home/hadoop/hadoop_home/var</value>
    </property>
    <property>
        <name>mapred.tasktracker.map.tasks.maximum</name>
        <value>4</value>
    </property>
    <property>
        <name>mapred.tasktracker.reduce.tasks.maximum</name>
        <value>1</value>
    </property>
</configuration>
(3) hdfs-site.xml
Note: the replication factor here is 3, since all 5 nodes run a datanode.
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <property>
        <name>dfs.replication</name>
        <value>3</value>
    </property>
</configuration>
4. Results
50 test files, each about 60 MB.
(1) max map = 4, max reduce = 2, job.setNumReduceTasks(1)
Up to 20 map tasks run concurrently (5 * 4).
Only 1 reduce task.
Time: 7 mins, 4 sec
(2) max map = 4, max reduce = 2, job.setNumReduceTasks(10)
Up to 20 map tasks run concurrently (5 * 4).
10 reduce tasks in parallel.
Time: 3 mins, 58 sec
(3) max map = 4, max reduce = 1, job.setNumReduceTasks(5)
Up to 20 map tasks run concurrently (5 * 4).
5 reduce tasks in parallel.
Time: 4 mins, 4 sec
(4) max map = 4, max reduce = 1, job.setNumReduceTasks(5), with the combiner enabled
Up to 20 map tasks run concurrently (5 * 4).
5 reduce tasks in parallel, and the copy (shuffle) phase is very fast.
Time: 1 min, 49 sec
Of course, not every Map/Reduce job can use a combiner; a sketch of how to enable it follows after the summary below.
(5) Single machine, plain Python script (see Appendix 2)
Time: 11 mins, 5 sec
Summary:
The 5-node Hadoop cluster is roughly 2.8 times faster than the single-machine Python script, and about 6 times faster once the combiner is added.
Hadoop's performance is not ideal here because I did no tuning at all, e.g. memory, compressed storage, or data transfer.
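As a rough sketch of the two cheapest tweaks (enabling the combiner and compressing intermediate map output), the job setup from section 2 could be extended as below. This is only a sketch: it assumes the 0.20-era property names mapred.compress.map.output and mapred.map.output.compression.codec, and that GzipCodec is usable on the cluster.
// "max" is associative and commutative, so the reducer can double as a combiner
// and shrink the map output before the shuffle/copy phase.
job.setCombinerClass(MaxNumberReducer.class);
// Compress intermediate map output to cut shuffle traffic.
job.getConfiguration().setBoolean("mapred.compress.map.output", true);
job.getConfiguration().setClass("mapred.map.output.compression.codec",
        org.apache.hadoop.io.compress.GzipCodec.class,
        org.apache.hadoop.io.compress.CompressionCodec.class);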
Appendix 1: Python script used to generate the input test data
import random
if __name__ == "__main__":
    path = "/home/hadoop/hadoop_home/input"
    files = 10
    rows = 10000000
    for i in xrange(files):
        fp = open(path + "/" + str(i), "w")
        for j in xrange(rows):
            print >>fp, "%d\t%f" % (random.randint(1, 100), random.random() * 100)
        fp.close()
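The script writes ordinary local files, so they still need to be uploaded to HDFS before the job can read them; with the default filesystem from core-site.xml, something along the lines of hadoop fs -put /home/hadoop/hadoop_home/input input does the trick (the HDFS target directory name is just an example).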
Appendix 2: The single-machine script
import os,time
if __name__ == "__main__":
    start = int(time.time())
    path = r"/home/hadoop/hadoop_home/input"
    max_by_id = {}
    for root, dirs, files in os.walk(path):
        for f in files:
            file = root + "/" + f
            for line in open(file, "r").readlines():
                id, val = line.split("\t")
                id = str(id)
                val = float(val)
                max_by_id.setdefault(id, -1)
                max_by_id[id] = max(val, max_by_id[id])
    for k, v in max_by_id.items():
        print k, v
    end = int(time.time())
    print str(end - start), "(s)"