MapReduce工作机制(三、Reduce端流程)


文章从《Hadoop权威指南》以及《Hadoop技术内幕:深入解析MapReduce架构设计与实现原理》中总结而来。


四种Reduce Task:
  1. Job-setup Task:作业运行时启动的第一个任务
  2. Job-cleanup Task:作业运行时启动的最后一个任务
  3. Task-cleanup Task:任务失败或是被杀死后用于清理已写入临时目录中数据的任务
  4. Reduce Task: 从各个Map Task上读取一片数据,经排序后以组为单位交给用用户编写的reduce()函数处理,并将结果写到HDFS上。

MapReduce工作机制(三、Reduce端流程)

shuffle阶段

Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写在磁盘上,否则放在内存中。

Merge阶段

在远程拷贝数据的同时,Reduce Task启动了两个后台线程对内存磁盘上的文件进行合并,防止内存使用过多或磁盘上文件过多

Sort阶段

将key相同的数据聚集在一起,由于各个Map Task已经实现了对自己处理结果的局部排序,Reduce Task只需要对所有数据进行一次归并排序即可

Reduce阶段

将每组数据一次交给用户编写的reduce()函数处理

Write阶段

将reduce()结果写到HDFS上

Shuffle & Merge

  • 并行执行
  • 远程拷贝数据量达到阈值,触发相应合并线程进行合并
  • 均由ReduceCopier实现

MapReduce工作机制(三、Reduce端流程)

1. 准备运行完成的Map Task列表

GetMapEventsThread线程周期性通过RPC从TaskTracker获取已完成Map Task列表mapLocation(保存了TaskTrackerHost与已完成任务列表的映射关系)中。

网络热点(大量进程集中读取某个TaskTracker上的数据)

Reduce Task将所有的TaskTracker Host进行混洗操作,然后保存到scheduledCopies列表中
MapOutputCopier将从scheduledCopies列表中获取待拷贝的Map Task输出数据位置。
曾经拷贝失败的Map Task将优先被拷贝。
2. 远程拷贝数据

Reduce Task同时启动mapred.reduce.parallel.copies = 5个数据拷贝线程MapOutputCopier

MapOutputCopier从scheduledCopies列表中获取Map Task的输出数据
利用HTTP Get从对应的TaskTracker拷贝数据

数据量过大则暂时写到磁盘上(工作目录)mapOutputsFilesOnDisk,否则可以直接放在内存中mapOutputsFilesInMemory,均带有一个MapOutput元数据对象

过大的数据量
可用于缓存数据分片的内存大小占JVM Max Heap Sizemapred.job.shuffle.input.buffer.percent = 0.70
内存管理类ShuffleRamManager
一个数据分片不能超过内存0.25倍
ShuffleRamManager
数据分片先申请内存,经过ShuffleRAMManager批准后获得内存,才能拷贝数据
拷贝时,如果错误次数超过abortFailureLimit则杀死该Reduce Task
如果错误次数超过maxFetchFailureBeforeReporting值为mapreduce.reduce.shuffle.maxfetchfailures = 10则对分片进行检查
若均不超过,则使用延迟时间delayTime对数据进行拷贝
3. 合并内存文件和磁盘文件
磁盘文件合并
LocalFSMerger负责进行磁盘文件合并
文件数据超过2 * (io.sort.factor = 10) - 1时,线程将会从mapOutputsFilesOnDisk取出最小的ioSortFactor个文件进行合并后重新写入
内存文件合并,转移至磁盘
InMemFSMergerThread负责进行内存文件合并转移
若全部数据拷贝完毕,则关闭ShuffleRamManager,将内存文件转移至磁盘
若已使用的内存超过可用内存的mapred.job.shuffle.merger.percent = 66%,且文件数超过2个
若内存中的文件数目超过mapred.inmem.merge.threshold = 1000
若阻塞在ShuffleRamManager的请求数目超过拷贝线程数目mapred.reduce.parallel = 75%
  • 由于各个Map Task已经事先对自己的输出分片进行了局部排序,因此Reduce Task秩序进行一次归并排序即可保证数据整体有序。
  • 为了提高效率,Hadoop将Sort阶段和Reduce阶段并行化
  • Sort阶段时,Reduce Task为内存和磁盘中的文件建立了小顶堆,保存了指向小顶堆根节点的迭代器
  • Reduce阶段时,Reduce Task不断移动迭代器,以将key相同的数据顺次交给reduce()处理,期间移动迭代器的过程实际上就是不断调整小顶堆的过程。