Hadoop学习之路(七):理解Hadoop三大核心组件之MapReduce

一、MapReduce简介

MapReduce是一种编程模型,用于大规模数据集的并行运算,它是Hadoop的三大核心组件之一,承担着Hadoop的计算工作。概念"Map(映射)“和"Reduce(聚合)”,是这个编程模型的核心思想,它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。

二、MapReduce核心阶段

手绘了一张MapReduce执行简图:
Hadoop学习之路(七):理解Hadoop三大核心组件之MapReduce
MapReduce计算模型主要由三个核心阶段构成:Map、Shuffle、Reduce。Map是映射,Reduce是聚合,Shuffle是对Map的输出进行一定的排序与分割,然后再交给对应的Reduce的过程。

1. Map

  • 从HDFS读入切片数据,切片的数量在作业提交时由JobSummiter类决定,切片的数量即为Map的个数,切片法则是这样的:在1和long的最大值还有用户输入的切片大小之间选择中间那个值作为切片的大小,由切片大小结合数据量划分切片的数量。事实上我们并没有设置切片大小,因而默认为HDFS块大小,在Hadoop1.x之前是64MB,2.x后是128MB,128MB在1和long最大值之间所以默认的切片大小就是HDFS的块大小。
  • 在map会调用我们写的业务代码,即map方法的代码,对数据进行映射,比如词频统计的时候,单词word经过map后变为(word,1),切片数据经过这一步将会被映射为大量的Key-Value元组。
  • 线程根据Reduce的个数设置分区(partition),即Reduce的个数就是分区的个数。Reduce个数由我们在作业配置类中调用作业的setNumReduceTasks方法设置。这一步,由map出来的元组会根据Key进入到不同的分区,这样做是为了避免有些Reduce任务分配到大量数据,而有些Reduce任务却分到很少数据,甚至没有分到数据。默认情况下,是根据Key的哈希值进行分区。
  • 经过分区之后,数据需要写入缓冲区,默认是100MB,而map的处理结果量往往很大,当数据量超过缓冲区的80%即80MB的时候会益处(spill)出到磁盘上,这个操作由分线程执行,与map线程并行执行,剩余的20%继续被map线程写入。值得注意的是,在这80MB空间内的key会被排序(Sort)。排序是MapReduce模型默认的行为
  • 在每个分区中存在大量的元组,在同一个分区中有许多元组的Key是一样的,比如说都是(word,1),这时候为了减少网络间数据的分发量,会有一个Combine操作,这是Map端的reduce操作,它会在每个分区中对Key进行聚合,变成类似于这种(word,156),将156个元组聚合为一个,即减轻网络传输量,又减轻对应Reducer的负担。但是值得注意的是,Combine的输出是Reduce的输入,Combine操作绝不能改变最终的计算结果,这个操作并不是一定可以使用的,比如说如果我要求海量数字的平均数就不可以使用这个操作,否则会出错。

2. Shuffle

Hadoop学习之路(七):理解Hadoop三大核心组件之MapReduce
关于Shuffle这样解释的:Shuffle的正常意思是洗牌或弄乱,可能大家更熟悉的是Java API里的Collections.shuffle(List)方法,它会随机地打乱参数list里的元素顺序。但是这其实并不能够清楚说明这个阶段,具体分为:

  • Map端Shuffle 其实,广义上Map端的Shuffle 就是map进程以后的那些流程:对于从map出来的数据进行分区处理(图中2过程),然后将其写入内存,超出内存限额部分将溢出到磁盘,除此Map端的Shuffle还有一个重要的作用就是合并溢出到磁盘的文件(图中3过程),随着map进程的执行,不断溢出文件,直到完成map进程可能会产生大量的溢出文件,这时候需要对这些文化进行合并,最后合并成了一个分区且排好序的大文件,此时还可以配置压缩处理,以减少网络间传输的数量。
  • Redcue端Shuffle Reduce会使用HTTP协议从不同的Map中拖取其对应要处理分区的数据,如果Reduce收到的数据比较小,则直接写入内存缓冲区,如果超过内存限额或内存不足时则会溢出到磁盘中,当溢出的文件超出一定的数量或者已经从所有的Map端获取完所有的数据就需要合并为一个大的有序的文件,值得注意的是,如果Map端的输出数据经过压缩处理则需要经过解压处理。

3. Reduce

  • Reduce在接收到合并后的有序数据后对数据按Key进行分组,比如收到了三份来自于三个Map端产生的数据(word,158),(word,517),(word,12),同一个Key会进入同一个reduce方法,该方法就是我们编写的reduce方法,在其中进行聚合,最后聚合为(word,687),这就是单词word的最终词频统计结果。
  • 因为在Map端接收过来的数据已经有序(默认按Key升序排序),所以当Reduce接收完所有Map的指定分区数据后,会在内存中存储了大量的聚合结果(可能有部分已经溢出到磁盘),会依次将数据写入至输出文件中。

三、MapReduce作业运行过程

Hadoop学习之路(七):理解Hadoop三大核心组件之MapReduce

1. 作业提交

  • Job对象会调用submit()方法,在内部创建一个JonSubmitter实例,然后该实例的submitJobInternal()(步骤1)方法,值得一提的是我们平常会使用Job对象的waitForCompletion()方法来提交作业,该方法每隔一秒轮询作业的进度情况,如果有变化则报告进度给控制台。当作业成功完成,则会把作业计数器打印出来,否则打印错误信息。
  • JonSubmitter回向资源管理器请求一个 application ID,该ID会被用作MapReduce作业的ID(步骤2)。检查作业的输出路径是否存在,若存在则作业不会被提交且会抛出一个异常。检查作业会否可以切片,如果不能切片这作业不会被提交且会抛出一个异常。
  • 拷贝作业运行的必备资源,包括作业的配置信息,切片信息,依赖的jar包等等到HDFS(步骤3),因为需要被其他节点共享使用。
  • 调用submitApplication来提交作业给资源管理器(步骤4)

2. 作业初始化

  • 资源调度框架会为该application分配一个节点管理器(步骤5),在该节点管理器会运行application的master进程,在这里这个进程就是MRAppMater,MRAppMater负责追踪作业进度和初始化作业(步骤6)。
  • MRAppMater会从HDFS中获取作业的信息(步骤7),然后他为每个分片分配一个map任务,然后通过查阅mapreduce.job.reduces的属性或者用户调用的setNumReduceTasks()方法设置。任务ID此时也会被设置。

3. 任务分配

  • MRAppMater为作业中的map任务和reduce任务向资源管理器请求容器(步骤8),首先为map任务请求,map任务的优先级高于reduce任务,因为reduce任务的输入依赖map任务的输出,默认情况下,至少map任务完成5%的情况下MRAppMater才会为reduce任务申请容器。
  • reduce任务可以在集群的任何节点,而map任务一般遵守数据本地约束,即在map任务的计算分片所在的主机处申请容器以减少网络数据传输。

4. 任务执行

  • 单资源管理器为某个节点管理器分配容器后,MRAppMater会连接该节点管理器来启动这个容器(步骤9)。任务通过YarnChild进程来执行,执行前会向HDFS申请得到作业的信息和所依赖的jar包(步骤10)。最后它运行MapTask或ReduceTask(步骤11)。

5. 作业完成

  • 当MRAppMater接收到最后一个任务完成后,它将作业状态置为“successful”。Job对象会打印一条信息告诉用户以及从waitForCompletion方法返回。
  • 当作业完成后,MRAppMater和作业容器清理它们的中间数据,然后OutPutCommiter的commitJob()方法被调用。

四、总结

该篇文章较为全面地介绍了MapReduce编程模型内部的核心执行阶段,介绍了MapReduce作业的详细提交流程。由于关于MapReduce的信息比较少,以上是结合找到的资料和阅读一点源代码写出来的,可能有错误或不准确的地方,有的话请指出,感谢你的阅读!