大数据面试(六)_hadoop中MapReduce工作流程和MapTask、Shuffle、ReduceTask工作机制

MapReduce整个工作流程:

大数据面试(六)_hadoop中MapReduce工作流程和MapTask、Shuffle、ReduceTask工作机制

大数据面试(六)_hadoop中MapReduce工作流程和MapTask、Shuffle、ReduceTask工作机制

一、MapTask阶段大数据面试(六)_hadoop中MapReduce工作流程和MapTask、Shuffle、ReduceTask工作机制

大数据面试(六)_hadoop中MapReduce工作流程和MapTask、Shuffle、ReduceTask工作机制

 

(1)Read 阶段:Map Task 通过用户编写的 RecordReader,从输入 InputSplit 中解析出

一个个 key/value。

(2)Map 阶段:该节点主要是将解析出的 key/value 交给用户编写 map()函数处理,并

产生一系列新的 key/value。

(3)Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用

OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用

Partitioner),并写入一个环形内存缓冲区中。

(4)Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,

生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排

序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:

步骤 1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号

partition 进行排序,然后按照 key 进行排序。这样,经过排序后,数据以分区为单位聚集在

一起,且同一分区内所有数据按照 key 有序。

步骤 2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文

件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之

前,对每个分区中的数据进行一次聚集操作。

 

步骤 3:将分区数据的元信息写到内存索引数据结构 SpillRecord 中,其中每个分区的元

 

信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大

小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。

(5)Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,

以确保最终只会生成一个数据文件。

当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件

output/file.out 中,同时生成相应的索引文件 output/file.out.index。

在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多

轮递归合并的方式。每轮合并 io.sort.factor(默认 100)个文件,并将产生的文件重新加入

待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量

小文件产生的随机读取带来的开销。

信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大

小超过 1MB,则将内存索引写到文件 output/spillN.out.index 中。

(5)Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,

以确保最终只会生成一个数据文件。

当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件

output/file.out 中,同时生成相应的索引文件 output/file.out.index。

在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多

轮递归合并的方式。每轮合并 io.sort.factor(默认 100)个文件,并将产生的文件重新加入

待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量

小文件产生的随机读取带来的开销。

二、Shuffle阶段

大数据面试(六)_hadoop中MapReduce工作流程和MapTask、Shuffle、ReduceTask工作机制

 

1)maptask 收集我们的 map()方法输出的 kv 对,放到内存缓冲区中

2)从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件

3)多个溢出文件会被合并成大的溢出文件

4)在溢出过程中,及合并的过程中,都要调用 partitioner 进行分区和针对 key 进行排序

5)reducetask 根据自己的分区号,去各个 maptask 机器上取相应的结果分区数据

6)reducetask 会取到同一个分区的来自不同 maptask 的结果文件,reducetask 会将这些

文件再进行合并(归并排序)

7)合并成大文件后,shuffle 的过程也就结束了,后面进入 reducetask 的逻辑运算过程

(从文件中取出一个一个的键值对 group,调用用户自定义的 reduce()方法)

3)注意

Shuffle 中的缓冲区大小会影响到 mapreduce 程序的执行效率,原则上说,缓冲区越大,磁盘 io 的次数越少,执行速度就越快。

缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认 100M。

三、ReduceTask阶段

大数据面试(六)_hadoop中MapReduce工作流程和MapTask、Shuffle、ReduceTask工作机制

 

(1)Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。

(2)Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。

(3)Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。

(4)Reduce 阶段:reduce()函数将计算结果写到 HDFS 上。

原文地址:https://blog.csdn.net/weixin_40102671/article/details/80628736