Hadoop The Definitive Guide 2nd读书笔记 – 第六章

这张围绕着Map/Reduce的流程和一些优化。

1、Map/Reduce任务执行涉及到的四方:
client:运行JobClient的进程。
JobTracker:分配、调度任务。
TaskTracker:具体运行任务的虚拟机。
HDFS:分布式文件系统。

2、JobClient执行runJob()后,实际是submitJob(),然后进入执行流程

(1)run job
(2)向JobTracker申请JobID
(3)拷贝所需的JAR包、配置文件等到HDFS上
(4)sumbitJob()
(5)JobTracker上初始化
(6)读取Input分片,调用TaskTracker
(7)TaskTracker定时心跳神马的
(8)TaskTracker决定执行具体的map/reduce任务。

3、Map任务个数由Input分片决定,Reduce任务个数由mapred.reduce.tasks或者setNumReduceTasks()决定。

4、如何选择Map/Reduce任务?
默认调度策略:Map优先于Reduce。
Map有DataLocal问题,Reduce无(因为要从所有Map-Result中收集结果)

5、通过心跳、SetStatus等识别进度(Progress),当进度10分钟无动静时,认为已死。

6、任务成功的同时,可设置出发HTTP页面,配置:job.end.notification.url

7、任务出错:
(1)Task任务执行出错,异常导致JVM退出等。10分钟无进度的被当作Haning Task,也视为出错。
有时,即使部分成功也有用,可通过mapred.max.map.failures.percent和mapred.max.reduce.failures.percent进行设置。

(2)TaskTracker出错,当超过mapred.tasktracker.expiry.interval没有心跳,视为死亡。

(3)JobTracker出错,目前错了就挂了,单点故障。

8、任务调度
默认调度:FIFO,先来的Job先干,也支持setJobPriority()设置优先,但是非抢占的。
公平调度:目标使每个用户占用同样多的map/reduce slot资源,可能抢占
Capacity Scheduler:公平调度+多个队列

9、Shuffle:map的结果(给reduce的输入)是经过sort后传输给reduce,这就是shuffle。一般来说,Shuffle是最可能进行优化的地方!

10、Shuffle过程

(1)Map端(输出、分发accept)
每输出io.sort.mb(默认100MB)的超过io.sort.spill.percent(默认80%),就会以一个spill写入到磁盘。
由于分片了,所以最终需要合并,每轮最多合并io.sort.factor个spill。
如果指定了Combine,则最少min.num.spills.for.combine(默认3)会保证执行一次Combine函数。
Map结果最好进行压缩,mapred.compress.map.output->True并设置mapred.map.output.compression.codec。
输出是放在Map的Local磁盘上!因此需要分发到Reduce上。
Map的输出需要分发到reduce上,HTTP协议,线程个数为tasktracker.http.threads(默认40)

(2)Reduce端的Shuffle
首要任务就是把每个map的output拷贝到reduce端,由于map完成时间不一,因此一有map完成,就拷贝一个。并发拷贝共mapred.reduce.parallel.copies(默认5)个线程。
关于谁先完成,并通知Reduce端这事,是由JobTracker完成的。
收集所有的map阶段结果后,reduce也要sort和merge,每轮io.sort.factor(默认10)个片段,比如有50个map结果,那么要进行5轮的sort/map。

(3)Reduce
最后的Reduce阶段,对每个key及其values,调用一次reduce函数。并把output直接写在HDFS上。

11、优化原则

(1)尽可能多的给Shuffle阶段内存(同时注意减少map/reduce函数的内存消耗)。在map端,最好只有一个spill,即io.sort.mb * io.sort.spill.percent > 你map的输出
(2)调整启动map/reduce的Child JVM的内存,mapred.child.java.opts。
(3)Reduce端,当数据可全部放入内存时,性能最好。当然一般是不可能的。书上说mapred.inmem.merge.threshold到0mapred.job.reduce.input.buffer.percent到1.0,但我觉得这是找OutofMemory呢,除非你Map、Combine后数据小的可怜。

12、优化选项列表

io.sort.mb: map输出的缓存,默认100(mb),可适当调大。
io.sort.spill.percent : 上面io.sort.mb到大多少时,写一个spill到磁盘。
io.sort.factor : map输出排序时同时合并多少个spill,默认10,可以调高甚至100。
min.num.spills.for.combine : 最少多少spill会执行combine(默认3),如果很希望,可以调小。
mapred.compress.map.output(默认false) 和 mapred.map.output.compression.codec(默认org.apache.hadoop.io.compress.DefaultCodec)。
tasktracker.http.threads : map端开多少个线程供用于传送output结果。
mapred.reduce.parallel.copies : 并发多少个线程从map拷贝output(默认5)
mapred.reduce.copy.backoff : 拷贝一个map结果的超时(默认300秒)
io.sort.factor : 多少个spill一起合并,默认10,与map共享一个变量参数。
还有一些参数看不太懂。。。

13、根据木桶效应,显然任务分解后,速度将受限于执行最慢的那个map/reduce,因此Hadoop使用speculative execution(推测执行)来判断一个任务是不是执行“太”慢了,然后会启动一个额外副本,看看能不能加速执行。
开关1:mapred.map.tasks.speculative.execution 默认true
开关2:mapred.reduce.tasks.speculative.execution 默认true
如果你很信任你得集群和程序,其实这是可以关闭的,毕竟一旦重复执行,就会影响性能。

14、JVM是可重用的,避免了反复启、停JVM带来的时间消耗。同一个Job的不同Task可以在同一JVM中执行,通过mapred.job.reuse.jvm.num.tasks或者JobConf.setNumTasksToExecutePerJvm()设置(默认为1,如果设置为-1,则所有的task都在一个JVM中执行)。即最多多少个task可以在同一个JVM中执行。不同Job的Task肯定会在不同的JVM中执行。

15、Hadoop提供了skipping mode用于跳过潜在的“有Bug的数据”。

16、Hadoop将实验相关的参数做为环境变量,可以通过os.environ读取。
例如mapred.job.id job_200811201130_0004
os.environ["mapred.job.id"]

17、除了OutputCollector外,也可以直接将Output写入到文件中。但需要注意的是,一定注意写锁问题,不要多个task同时写一个文件,Hadoop的解决方案是不同的目录!${mapred.output.dir}/_temporary/${mapred.task.id},肯定是岔开的。成功后再拷贝到${mapred.output.dir}下。
通过getWorkOutputPath()调用来获得对应的文件夹。

Leave a Reply

Your email address will not be published. Required fields are marked *