Mapreduce工作机制深度解析
Mapreduce工作机制深度解析
2020年11月4日
19:55
Mapreduce整体的编程框架看起来非常直观,就是Map和Reduce两个阶段,但事实上有很多问题,细细想来其实并不容易。例如,我一开始学习wordcount的一些疑惑:
- 文件如何切分?数据源虽然只有一个,但是hadoop可是分布式的系统,这个文件可能存于多个节点上。其二,文本按行切分可以理解,但若输入数据不是一行一行的文本呢,由谁控制?
- Reduce接收wordcount的k,v组的时候,其实是一组一组发过来的,多个map任务并行执行之后如何分组?(其实这个例子中太过简单,map之后需要一系列排序来实现shuffle,其中的排序比想象的要复杂许多)
- map多个并行,但是reduce全都交给一个节点?这个的效率不是特别差吗?可以由多个reduce吗,如何划分任务?如何控制。
- ……
通过学习框架的详细流程,这些问题都可以解答。而且经过了几个案例的实现,可以清晰地体会到框架这种插件式框架编程,才能真正理解Driver的功能。下图是全流程的示意图,有点乱,以下将按序号开始解释:
1. 提交任务(1-2)
- 从集群中读取到数据源(存储分布式,逻辑上仍然是一个文件)之后,最重要的事情就是向集群提交任务,其中要完成的最重要的事情就是:将Jar包等各种需要共享的东西提交到临时的文件夹,然后将数据源切分
- 共享临时目录,由于多个节点分布式运算,所以必须在集群上有一个共享的目录,用于共享环境和数据。
- 数据切分,这相当于任务切分,源代码中,由InputFormat负责,它负责实现两个功能,数据的划分,以及管理RecordReader用于如何切分数据(指的是将一部分文件切分成更细粒度的单元,例如按行切分,就是这个Reader负责的)。值得一提的是,文件实际是分布式存储的,如果随意切分,那么势必要在存储节点之间传输大量数据(切分边角料),所以正常而言,最好的方式是按hadoop的block size进行切分。若不然,还得实现切分方式,因为不一定所有资料都是以行为单位的。
2. Map阶段(4-6),本阶段主要负责将得到的单元数据,进行处理,例如一行数据(切分,获取值,统计等等)
- 在Map之前,实际上一个重点就是,InputFormat给了我们切分部分的数据源,我们如何从中获取单元数据呢(例如,每次获取一行)。前面也说了,就是RecordReader负责的,我们可以实现这个接口,指定数据读取的方式
- Map任务的并行数量,这个其实就是由前面InputFormat的split决定的,然后就可以向hadoop中的Resouce Manager事情,RM则启动相应的MapAppMaster进行处理。
- Map,于是就到了我们编写的map逻辑,处理RecordReader每次返回的单位数据(例如行),然后写回context
3. Shuffle阶段(7-10),map生成了大量k,v对,在进入reduce之前,有一件非常重要的事情,就是对这些数据进行分类整理分组。整理是必要的,因为大多数时候,并行任务之间是比较独立,但是仍然会有逻辑联系。例如wordcount中,我们会喜欢key,即word相同的单词自动归为一组送给reducer,在多节点的情况下,这件事并不是很简单。此阶段有点复杂
- 写入collector缓冲区,由于产生了大量的k,v,所以我们要进行处理,很自然地,就需要一个缓冲区。由于collector是比较小的,所以需要不断地溢出到磁盘,我们不希望中断。所以它的策略是这样的:两个方向缓存,一个方向缓存数据,一个方向缓存index,到达百分之八十的时候,这部分溢出到磁盘,而从反方向选一个点开始写入。——这是非常简洁,优雅的解决方案。
- 排序,我们先考虑对key排序,由于大量文件,内存放不进所有东西,实现这种需求最好的自然是归并排序(同操作系统)。上文提到,数据会溢出到磁盘中,此时,进行局部的快速排序,然后多个局部归并成有序的分区。(我们这里忽略分区,才不影响理解)
由于多次溢出会产生多份有序的分区数据,所以一个map任务最终输出之前还要进行一次归并。
最后,由于我们事实上是多个map任务,所以多个任务之间最终还要进行最后一次归并。(上图中没有体现,不过好理解)
上述就是shuffle阶段的三次排序了。
- 分区。这其实就是一开始我们提出的问题,reduce交给一个节点不合适,那是不是应该交给多个节点呢?如何划分?这也就是分区的意义了。
- 这不太好说明,以wordcount为例,这里分区的策略是,hash(key)% num,num为分区数量,用这种方式将大量的单词划分为num个分区。
- 事实上,wordcount比较简单,实际中,我们可以按key中的省份、年份、月份进行分区,这些由实现Partitioner来解决,可以编程指定分区方案
- 当然了,分区数量就理所当然地等于reducer的数量,由用户手动配置,以及实现分区方案。
- 注意,每次溢出都会产生多个分区,合并的时候,全程都是各个相同分区内部进行归并的!
- 归约,这个又是什么?为了理解这个东西,我们先想象wordcount中一个很奇怪的地方,每次只写出<word, 1>这样一对数据,如果有多个“hello”的单词,那不得写出多个<hello,1>,换成<hello,n>不是更好吗?我们考虑到一个节点上运行map task产生的k,v会通过网络传输到collector中被收集,那么<hello, n >的方案不是能大大减少传输量?是的,归约就是做这件事的
- 返回上图的9和10处我们可以看到,9是一个map task中产生的多个溢出有序数据,那么这里我们就可以进行归约,而10则是map task的最终输出,这也可以归约。
- 归约通过指定combiner来实现。
4. 再说排序。事实上,此处的排序不一定是我们传统理解上的“排序”。大量的需求需要对数据进行特定的整理,这也是排序,例如说按多个关键词进行比对。通过控制排序的方法,我们可以控制shuffle的整理和分组。具体而言,和java中一样,这部分由实现Comparable接口或者指定Comparator决定
5. 分组,Reducer之前,我们可以指定分组的方法,得到我们想要的分组,这部分同样是指定分组的Comparator
6. Reduce阶段,每个分区按照我们指定的方式进行reduce,然后写出
7. 写出,由OutputFormat控制,同样的也有RecordWriter都可以进行编写指定。