Hadoop中MapReduce基本案例及代码(五)

前四节提供了几个小案例
下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。

Map任务:

  1. 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。
  2. 写自己的逻辑,对输入的key,value处理,转换成新的key,value输出。
  3. 对输出的key,value进行分区。
  4. 对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。
  5. (可选)分组后对数据进行归约。
    注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。

Reduce任务:

  1. 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。这个过程并不是map将数据发送给reduce,而是reduce主动去获取数据。Reduce的个数>=分区的数量。
  2. 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
  3. 把reduce的输出保存到文件中。

MapReduce执行流程:

Hadoop中MapReduce基本案例及代码(五)
此图为hadoop1.0图 ,2.0多了个yarn。《hadoop权威指南》上有。

  1. run job:客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar …。
    a. 做job环境信息的收集,比如各个组件类,输入输出的kv类型等,检测是否合法。
    b. 检测输入输出的路径是否合法.
  2. JobClient通过RPC和ResourceManager进行通信,返回一个存放jar包的地址(HDFS)和jobId。jobID是全局唯一的,用于标识该job。
  3. client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
  4. 开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
  5. JobTracker进行初始化任务
  6. 读取HDFS上的要处理的文件,开始计算输入切片,每一个切片对应一个 MapperTask。注意,切片是一个对象,存储的是这个切片的数据描述信息;切块是文件块(数据块),里面存储的是真正的文件数据。
  7. TaskTracker通过心跳机制领取任务(任务的描述信息)。切片一般和切块是一样的,即在实际开发中,切块和切片认为是相同的。在领取到任务之后,要满足数据本地化策略。
  8. 下载所需的jar,配置文件等。体现的思想:移动的是运算/逻辑,而不是数据。
  9. TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask 或ReducerTask)。
  10. 将结果写入到HDFS当中。

注意几点
Hadoop中MapReduce基本案例及代码(五)
Hadoop中MapReduce基本案例及代码(五)

Shuffle

Shuffle就是数据重新打乱然后汇聚到不同节点的过程。
Spark中也有shuffle,所以shuffle在数据处理过程中相当重要。
MapReduce中的Shuffle过程分为MapTask 和 ReduceTask。

MapTask
Hadoop中MapReduce基本案例及代码(五)

  1. 获取到切片(FileSplit)信息。
  2. 每一个切片对应一个MapTask。
  3. 读取具体的数据块(Block)
  4. 按行读取数据
  5. 每一行数据会调用一次map方法,进行处理。
  6. map方法在执行完成之后会产生k-v结构,这个数据会存在缓冲区中。
  7. 在缓冲区中会进行partition/sort/combine
  8. 缓冲区默认大小是100M,缓冲区中还会有一个阈值—80% —就意味着如果缓冲区使用达到了80%的时候,认为缓冲区满了。
  9. 如果缓冲区满了,就会将缓冲区中的数据写到磁盘的文件中,过程称为Spill(溢写),写出的文件称之为溢写文件。
  10. 每一个溢写文件中的数据是分好区且排好序的。
  11. 每一次Spill过程都会产生一个新的溢写文件,所以所有的溢写文件从整体上不是分区且排序的。
  12. 在交给ReduceTask之前,会对所有的溢写文件进行一次合并— merge.
  13. 合并之后的文件是对所有的数据进行了整体的分区并且排序。

注意点:

  1. Spill过程不一定发生
  2. 如果产生了Spill过程,且最后一次的数据不足阈值,将最后一次的缓冲区中的数据flush到最后一个溢写文件中。
  3. 切片的大小和溢写文件的个数不是对等的。(经过map各种combine Parition操作后,内部增加数据,比如之前案例中单词个数统计,增加的单词数的数据,数据量一大,原来的切片数量肯定小于溢写文件数量;反之亦然)
  4. 达到缓冲区的80%的时候会Spill到溢写文件中,理论上Spill文件应该是 80M,实际上溢写文件一定是80M么?—不一定①要考虑最后一次的 flush;②要考虑序列化的因素。
  5. 如果溢写文件的个数>=3个,在merge的时候会再进行一次combine过程
  6. 每一个MapTask对应一个缓冲区
  7. 缓冲区本质上是一个字节数组
  8. 缓冲区是一个环形缓冲区,为了重复利用缓冲区
  9. 阈值的作用:①防止数据覆盖②防止写入过程的阻塞

ReduceTask
Hadoop中MapReduce基本案例及代码(五)

  1. ReduceTask通过http请求来访问对应的MapTask获取到分区的数据 — fetch- 线程数量默认是5
  2. 获取到不同的MapTask的数据之后,会对数据进行merge,将数据合并(将相同的键所对应的值放入一个迭代器中)且排序(根据键进行排序)
  3. 每一个键调用一次reduce方法来进行处理
  4. 将处理之后的数据写到HDFS中

注意

  1. merge因子:确定每次将几个文件合并一次。默认是10。如果文件个数 <merge因子,直接合并。
  2. ReduceTask的阈值:ReduceTask不是等所有的MapTask都结束之后才启动执行,而是在一定数量的MapTask结束之后就开始启动抓取数据。
    5% —当5%的数量的MapTask结束之后,ReduceTask就开始启动抓取数据