Hadoop中MapReduce基本案例及代码(五)
前四节提供了几个小案例
下面详细介绍MapReduce中Map任务Reduce任务以及MapReduce的执行流程。
Map任务:
- 读取输入文件内容,解析成key,value对。对输入文件的每一行,解析成key,value对。每一个键值对调用一次map函数。
- 写自己的逻辑,对输入的key,value处理,转换成新的key,value输出。
- 对输出的key,value进行分区。
- 对相同分区的数据,按照key进行排序(默认按照字典排序)、分组。相同key的value放在一个集合中。
- (可选)分组后对数据进行归约。
注意:MapReduce中,Mapper可以单独存在,但是Reducer不能存在。
Reduce任务:
- 对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点。这个过程并不是map将数据发送给reduce,而是reduce主动去获取数据。Reduce的个数>=分区的数量。
- 对多个map任务的输出进行合并、排序。写reduce函数自己的逻辑,对输入的key、value处理,转换成新的key、value输出。
- 把reduce的输出保存到文件中。
MapReduce执行流程:
此图为hadoop1.0图 ,2.0多了个yarn。《hadoop权威指南》上有。
- run job:客户端提交一个mr的jar包给JobClient(提交方式:hadoop jar …。
a. 做job环境信息的收集,比如各个组件类,输入输出的kv类型等,检测是否合法。
b. 检测输入输出的路径是否合法. - JobClient通过RPC和ResourceManager进行通信,返回一个存放jar包的地址(HDFS)和jobId。jobID是全局唯一的,用于标识该job。
- client将jar包写入到HDFS当中(path = hdfs上的地址 + jobId)
- 开始提交任务(任务的描述信息,不是jar, 包括jobid,jar存放的位置,配置信息等等)
- JobTracker进行初始化任务
- 读取HDFS上的要处理的文件,开始计算输入切片,每一个切片对应一个 MapperTask。注意,切片是一个对象,存储的是这个切片的数据描述信息;切块是文件块(数据块),里面存储的是真正的文件数据。
- TaskTracker通过心跳机制领取任务(任务的描述信息)。切片一般和切块是一样的,即在实际开发中,切块和切片认为是相同的。在领取到任务之后,要满足数据本地化策略。
- 下载所需的jar,配置文件等。体现的思想:移动的是运算/逻辑,而不是数据。
- TaskTracker启动一个java child子进程,用来执行具体的任务(MapperTask 或ReducerTask)。
- 将结果写入到HDFS当中。
注意几点
Shuffle
Shuffle就是数据重新打乱然后汇聚到不同节点的过程。
Spark中也有shuffle,所以shuffle在数据处理过程中相当重要。
MapReduce中的Shuffle过程分为MapTask 和 ReduceTask。
MapTask
- 获取到切片(FileSplit)信息。
- 每一个切片对应一个MapTask。
- 读取具体的数据块(Block)
- 按行读取数据
- 每一行数据会调用一次map方法,进行处理。
- map方法在执行完成之后会产生k-v结构,这个数据会存在缓冲区中。
- 在缓冲区中会进行partition/sort/combine
- 缓冲区默认大小是100M,缓冲区中还会有一个阈值—80% —就意味着如果缓冲区使用达到了80%的时候,认为缓冲区满了。
- 如果缓冲区满了,就会将缓冲区中的数据写到磁盘的文件中,过程称为Spill(溢写),写出的文件称之为溢写文件。
- 每一个溢写文件中的数据是分好区且排好序的。
- 每一次Spill过程都会产生一个新的溢写文件,所以所有的溢写文件从整体上不是分区且排序的。
- 在交给ReduceTask之前,会对所有的溢写文件进行一次合并— merge.
- 合并之后的文件是对所有的数据进行了整体的分区并且排序。
注意点:
- Spill过程不一定发生
- 如果产生了Spill过程,且最后一次的数据不足阈值,将最后一次的缓冲区中的数据flush到最后一个溢写文件中。
- 切片的大小和溢写文件的个数不是对等的。(经过map各种combine Parition操作后,内部增加数据,比如之前案例中单词个数统计,增加的单词数的数据,数据量一大,原来的切片数量肯定小于溢写文件数量;反之亦然)
- 达到缓冲区的80%的时候会Spill到溢写文件中,理论上Spill文件应该是 80M,实际上溢写文件一定是80M么?—不一定①要考虑最后一次的 flush;②要考虑序列化的因素。
- 如果溢写文件的个数>=3个,在merge的时候会再进行一次combine过程
- 每一个MapTask对应一个缓冲区
- 缓冲区本质上是一个字节数组
- 缓冲区是一个环形缓冲区,为了重复利用缓冲区
- 阈值的作用:①防止数据覆盖②防止写入过程的阻塞
ReduceTask
- ReduceTask通过http请求来访问对应的MapTask获取到分区的数据 — fetch- 线程数量默认是5
- 获取到不同的MapTask的数据之后,会对数据进行merge,将数据合并(将相同的键所对应的值放入一个迭代器中)且排序(根据键进行排序)
- 每一个键调用一次reduce方法来进行处理
- 将处理之后的数据写到HDFS中
注意
- merge因子:确定每次将几个文件合并一次。默认是10。如果文件个数 <merge因子,直接合并。
- ReduceTask的阈值:ReduceTask不是等所有的MapTask都结束之后才启动执行,而是在一定数量的MapTask结束之后就开始启动抓取数据。
5% —当5%的数量的MapTask结束之后,ReduceTask就开始启动抓取数据