MapReduce工作机制(三、Reduce端流程)
文章从《Hadoop权威指南》以及《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》中总结而来。
四种Reduce Task:
- Job-setup Task:作业运行时启动的第一个任务
- Job-cleanup Task:作业运行时启动的最后一个任务
- Task-cleanup Task:任务失败或是被杀死后用于清理已写入临时目录中数据的任务
- Reduce Task: 从各个Map Task上读取一片数据,经排序后以组为单位交给用用户编写的reduce()函数处理,并将结果写到HDFS上。
- shuffle阶段
-
Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写在磁盘上,否则放在内存中。
- Merge阶段
-
在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存和磁盘上的文件进行合并,防止内存使用过多或磁盘上文件过多
- Sort阶段
-
将key相同的数据聚集在一起,由于各个Map Task已经实现了对自己处理结果的局部排序,Reduce Task只需要对所有数据进行一次归并排序即可
- Reduce阶段
-
将每组数据一次交给用户编写的reduce()函数处理
- Write阶段
-
将reduce()结果写到HDFS上
Shuffle & Merge
- 并行执行
- 远程拷贝数据量达到阈值,触发相应合并线程进行合并
- 均由ReduceCopier实现
- 1. 准备运行完成的Map Task列表
-
GetMapEventsThread线程周期性通过RPC从TaskTracker获取已完成Map Task列表mapLocation(保存了TaskTrackerHost与已完成任务列表的映射关系)中。
-
网络热点(大量进程集中读取某个TaskTracker上的数据)
- Reduce Task将所有的TaskTracker Host进行混洗操作,然后保存到scheduledCopies列表中
- MapOutputCopier将从scheduledCopies列表中获取待拷贝的Map Task输出数据位置。
- 曾经拷贝失败的Map Task将优先被拷贝。
- 2. 远程拷贝数据
-
Reduce Task同时启动
mapred.reduce.parallel.copies = 5
个数据拷贝线程MapOutputCopier - MapOutputCopier从scheduledCopies列表中获取Map Task的输出数据
- 利用HTTP Get从对应的TaskTracker拷贝数据
-
数据量过大则暂时写到磁盘上(工作目录)
mapOutputsFilesOnDisk
,否则可以直接放在内存中mapOutputsFilesInMemory
,均带有一个MapOutput元数据对象- 过大的数据量
- 可用于缓存数据分片的内存大小占JVM Max Heap Size的
mapred.job.shuffle.input.buffer.percent = 0.70
- 内存管理类
ShuffleRamManager
- 一个数据分片不能超过内存0.25倍
- ShuffleRamManager
- 数据分片先申请内存,经过ShuffleRAMManager批准后获得内存,才能拷贝数据
- 拷贝时,如果错误次数超过
abortFailureLimit
则杀死该Reduce Task - 如果错误次数超过
maxFetchFailureBeforeReporting
值为mapreduce.reduce.shuffle.maxfetchfailures = 10
则对分片进行检查 - 若均不超过,则使用延迟时间delayTime对数据进行拷贝
3. 合并内存文件和磁盘文件
- 磁盘文件合并
- LocalFSMerger负责进行磁盘文件合并
- 文件数据超过
2 * (io.sort.factor = 10) - 1
时,线程将会从mapOutputsFilesOnDisk
取出最小的ioSortFactor
个文件进行合并后重新写入 - 内存文件合并,转移至磁盘
- InMemFSMergerThread负责进行内存文件合并转移
- 若全部数据拷贝完毕,则关闭ShuffleRamManager,将内存文件转移至磁盘
- 若已使用的内存超过可用内存的
mapred.job.shuffle.merger.percent = 66%
,且文件数超过2个 - 若内存中的文件数目超过
mapred.inmem.merge.threshold = 1000
- 若阻塞在ShuffleRamManager的请求数目超过拷贝线程数目
mapred.reduce.parallel = 75%
- 由于各个Map Task已经事先对自己的输出分片进行了局部排序,因此Reduce Task秩序进行一次归并排序即可保证数据整体有序。
- 为了提高效率,Hadoop将Sort阶段和Reduce阶段并行化
- Sort阶段时,Reduce Task为内存和磁盘中的文件建立了小顶堆,保存了指向小顶堆根节点的迭代器
- Reduce阶段时,Reduce Task不断移动迭代器,以将key相同的数据顺次交给reduce()处理,期间移动迭代器的过程实际上就是不断调整小顶堆的过程。