Hadoop——MapReduce过程详解

Hadoop——MapReduce过程详解
1、MapReduce程序读取文件的输入目录上存放的相应文件

2、客户端在submit()方法执行之前获取要处理的数据信息,根据集群中的配置形成一个任务分配规划

3、客户端提交切片信息给Yarn,Yarn中的resourcemanager启动MRAppmaster

----------------------maptask开始

4、MRAPPmaster启动后根据本次job的描述信息计算出需要maptask的实例对象。首先,读取数据组件InputFormat(默认TextInputFormat)getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。

5、将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

6、将刚才获取的<k,v>键值对当作输入参数传入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行,这里调用一次。

---------------------shuffle开始

7、map逻辑完之后,生成新的<k,v>键值对,把它们通过context.write进行collect数据收集。首先对数据进行分区处理,默认使用HashPartitioner。
(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks
默认分区是根据key的hashCode对ReduceTasks个数取模得到的

8、然后把数据发送到环形缓冲区,缓冲区默认是100MB,环形缓冲区达到80%时,触发spill溢写,在spill之前会数据进行排序,排序按照key的索引进行字典排序,排序的手段是快速排序

9、如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。
那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。注意求平均值不能使用Combiner,会影响最终结果

9、合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

---------------------maptask结束

---------------------reducetask开始

10、Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
11、Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

12、合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

-----------------------shuffle结束

13、对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

----------------------reducetask结束