MapReduce原理(执行机制)

MapReduce简介

MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:

1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。

2)MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。

3MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用MapReduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理 

MapReduce原理(执行机制)

图一

MapReduce原理(执行机制)

MapReduce原理(执行机制)

主要执行步骤

1、向client端提交MapReduce job.

2、随后yarnResourceManager进行资源的分配.

3、由NodeManager进行加载与监控containers.

4、通过applicationMasterResourceManager进行资源的申请及状态的交互,由NodeManagers进行MapReduce运行时job的管理.

5、通过hdfs进行job配置文件、jar包的各节点分发。

Job 提交过程

job的提交通过调用submit()方法创建一个JobSubmitter实例,并调用submitJobInternal()方法。整个job的运行过程如下:

1、向ResourceManager申请application ID,此ID为该MapReducejobId

2、检查output的路径是否正确,是否已经被创建。

3、计算inputsplits

4、拷贝运行job 需要的jar包、配置文件以及计算inputsplit 到各个节点。

5、在ResourceManager中调用submitAppliction()方法,执行job

Job 初始化过程

1、当resourceManager收到了submitApplication()方法的调用通知后,scheduler开始分配container,随之ResouceManager发送applicationMaster进程,告知每个nodeManager管理器。

2、由applicationMaster决定如何运行tasks,如果job数据量比较小,applicationMaster便选择将tasks运行在一个JVM中。那么如何判别这个job是大是小呢?当一个jobmappers数量小于10个,只有一个reducer或者读取的文件大小要小于一个HDFS block时,

(可通过修改配置项

mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及

mapreduce.job.ubertask.maxbytes 进行调整)

3、在运行tasks之前,applicationMaster将会调用setupJob()方法,随之创建output的输出路径(这就能够解释,不管你的mapreduce一开始是否报错,输出路径都会创建)

Task 任务分配

1、接下来applicationMasterResourceManager请求containers用于执行mapreducetasksstep 8),这里map task的优先级要高于reduce task,当所有的map tasks结束后,随之进行sort(这里是shuffle过程后面再说),最后进行reduce task的开始。(这里有一点,当map tasks执行了百分之5%的时候,将会请求reduce,具体下面再总结)

2、运行tasks的是需要消耗内存与CPU资源的,默认情况下,mapreducetask资源分配为1024MB与一个核,(可修改运行的最小与最大参数配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)

Task 任务执行

1、这时一个task已经被ResourceManager分配到一个container中,由applicationMaster告知nodemanager启动container,这个task将会被一个主函数为YarnChildjava application运行,但在运行task之前,首先定位task需要的jar包、配置文件以及加载在缓存中的文件。

2YarnChild运行于一个专属的JVM中,所以任何一个mapreduce任务出现问题,都不会影响整个nodemanagercrash或者hang

3、每个task都可以在相同的JVM task中完成,随之将完成的处理数据写入临时文件中。

Mapreduce数据流

运行进度与状态更新

1MapReduce是一个较长运行时间的批处理过程,可以是一小时、几小时甚至几天,那么Job的运行状态监控就非常重要。每个job以及每个task都有一个包含jobrunning,successfully completed,failed)的状态,以及value的计数器,状态信息及描述信息(描述信息一般都是在代码中加的打印信息),那么,这些信息是如何与客户端进行通信的呢?

2、当一个task开始执行,它将会保持运行记录,记录task完成的比例,对于map的任务,将会记录其运行的百分比,对于reduce来说可能复杂点,但系统依旧会估计reduce的完成比例。当一个mapreduce任务执行时,子进程会持续每三秒钟与applicationMaster进行交互。

Job 完成

   最终,applicationMaster会收到一个job完成的通知,随后改变job的状态为successful。最终,applicationMastertask containers被清空。

 

图二

 MapReduce原理(执行机制)

Combiner 是对mapper输出结果进行局部的汇总,reducer是对mapper输出结果进行全局的汇总。

Partitionermap阶段

数据分片处理好之后,要经过map阶段的partitioner进行分区,来选择不同的reduce进行处理数据。

图三

 MapReduce原理(执行机制)

第一步:假设一个文件有三行英文单词作为 MapReduce Input(输入),这里经过 Splitting 过程把文件分割为3块。分割后的3块数据就可以并行处理,每一块交给一个 map 线程处理。

第二步:每个 map 线程中,以每个单词为key,以1作为词频数value,然后输出。

第三步:每个 map 的输出要经过 shuffling(混洗),将相同的单词key放在一个桶里面,然后交给 reduce 处理。

第四步:reduce 接受到 shuffling 后的数据,会将相同的单词进行合并Merge,得到每个单词的词频数,最后将统计好的每个单词的词频数作为输出结果

上述就是 MapReduce 的大致流程,前两步可以看做 map 阶段,后两步可以看做 reduce 阶段。

图四

 MapReduce原理(执行机制)

1)首先文档的数据记录(如文本中的行,或数据表格中的行)是以键值对的形式传入map 函数,然后map函数对这些键值对进行处理(如统计词频),然后输出到中间结果。

2)在键值对进入reduce进行处理之前,必须等到所有的map函数都做完,所以既为了达到这种同步又提高运行效率,在mapreduce中间的过程引入了barrier(同步障)

在负责同步的同时完成对map的中间结果的统计,包括 

a. 对同一个map节点的相同keyvalue值进行合并

b. 之后将来自不同map的具有相同的key的键值对送到同一个reduce进行处理。

3)在reduce阶段,每个reduce节点得到的是从所有map节点传过来的具有相同的key的键值对。reduce节点对这些键值进行合并

图五

MapReduce原理(执行机制)

图五图四的进一步细化,主要体现在:

1Combiner 节点负责完成上面提到的将同一个map中相同的key进行合并,避免重复传输,从而减少传输中的通信开销。

2Partitioner节点负责将map产生的中间结果进行划分,确保相同的key到达同一个reduce节点.

图六

 MapReduce原理(执行机制)

MapReduce的执行步骤

1Map任务处理

1.1 读取HDFS中的文件。每一行解析成一个<k,v>

每一个键值对调用一次map函数。            

    <0,hello you>   <10,hello me>                    

1.2 覆盖map(),接收1.1产生的<k,v>,进行处理,转换为新的<k,v>输出。    

      <hello,1> <you,1> <hello,1> <me,1>

1.3 1.2输出的<k,v>进行分区。默认分为一个区。

详见partitioner

http://www.cnblogs.com/ahu-lichang/p/6657895.html

1.4 对不同分区中的数据进行排序(按照k)、分组。分组指的是相同keyvalue放到一个集合中。

 排序后:<hello,1> <hello,1> <me,1> <you,1>  

分组后:<hello,{1,1}><me,{1}><you,{1}>

1.5 (可选)对分组后的数据进行归约

详见:Combiner

http://www.cnblogs.com/ahu-lichang/p/6657572.html

2Reduce任务处理

2.1 多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点上。(shuffle

详见:shuffle的过程分析

http://www.cnblogs.com/ahu-lichang/p/6665242.html

2.2 对多个map的输出进行合并、排序。

覆盖reduce函数,接收的是分组后的数据,实现自己的业务逻辑, <hello,2> <me,1> <you,1>

    处理后,产生新的<k,v>输出。

2.3 reduce输出的<k,v>写到HDFS中。