MapReduce的工作流程,shuffle过程,yarn资源调度过程,job提交流程

MapReduce的工作流程,shuffle过程,yarn资源调度过程,job提交流程

一:shuffle过程

概述:

mapTask处理的结果存入环形缓冲区中(因为频繁的I/O操作会导致减低效率,所以会有环形缓冲区,默认的环形缓冲区的大小是100M),存入的过程中进行partition(partition默认的是hashCode%reduceTask的个数,也可以自定义,自定义类继承partition类),partiton之后每一个键值对会加上一个partiton的属性值,用于确定是哪个分区;环形缓冲区有两个区,分别是数据区和索引区,当环形缓冲区的值达到阈值(默认是80%,可以通过mapreduce.map.io.sort.split.persen设置)后,有数据溢出,数据区溢出的叫file.out,索引区的file.out.index(多个溢出文件会合并成一个大的溢出文件),会启动线程将数据溢写(spill)到磁盘的临时文件中,并且通过key对数据进行排序(sort)和Combiner(类似于map阶段的reduce,对数据进行局部的合并,Combiner是可选操作)。溢写过程是按照轮询的方式将缓冲区的数据写到通过mapreduce.cluster.local.dir属性指定的目录中 。当整个map任务完成溢写之后,会对所有的临时文件(spill文件)进行归并(merge)操作最终生成一个正式输出文件。此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,可以再次进行合并。至此,map端shuffle过程结束,接下来等待reduce task来拉取数据。对于reduce端的shuffle过程来说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge最后合并成一个分区相同的大文件,然后对这个文件中的键值对按照key进行sort排序,排好序之后紧接着进行分组,分组完成后才将整个文件交给reduce task处理。

二:MapReduce的工作流程

###1.Map task

MapReduce的工作流程,shuffle过程,yarn资源调度过程,job提交流程

1.由程序内的InputFormat(默认实现类TextInputFormat)来读取外部数据,它会调用RecordReader(它的成员变量)的read()方法来读取,返回k,v键值对。

2.读取的k,v键值对传送给map()方法,作为其入参来执行用户定义的map逻辑。

3.context.write方法被调用时,outputCollector组件会将map()方法的输出结果写入到环形缓冲区内。

4.环形缓冲区其实就是一个数组,后端不断接受数据的同时,前端数据不断被溢出,长度用完后读取的新数据再从前端开始覆盖。这个缓冲区默认大小100M,可以通过MR.SORT.MB(应该是它)配置。

5.spiller组件会从环形缓冲区溢出文件,这过程会按照定义的partitioner分区(默认是hashpartition),并且按照key.compareTo进行排序(底层主要用快排和外部排序),若有combiner也会执行combiner。spiller的不断工作,会不断溢出许多小文件。这些文件仍在map task所处机器上。

6.小文件执行merge(合并),行程分区且区内有序的大文件(归并排序,会再一次调用combiner)。

7.Reduce会根据自己的分区,去所有map task中,从文件读取对应的数据。

2.Reduce Task

MapReduce的工作流程,shuffle过程,yarn资源调度过程,job提交流程

1.reduce task通过网络向map task获取某一分区的数据。

2.通过GroupingComparator()分辨同一组的数据,把他们发送给reduce(k,iterator)方法

  • (这里多个数据合成一组时,只取其中一个key,取得是第一个)。

3.调用context.write()方法,会让OutPurFormat调用RecodeWriter的write()方法将处理结果写入到数据仓库中。写出的只有一个分区的文件数据,如图。

就此mapreduce的工作结束,其中map的context.write()调用后,开始聚合数据写入到reduce的过程叫做Shuffle,是mapreduce的核心所在。

三:yarn的资源调度过程

yarn主要分为resourmanager和nodemanager

1.resourceManager

ResourceManager 是基于应用程序对集群资源的需求进行调度的 YARN 集群主控节点,负责 协调和管理整个集群(所有 NodeManager)的资源,响应用户提交的不同类型应用程序的 解析,调度,监控等工作。ResourceManager 会为每一个 Application 启动一个 MRAppMaster, 并且 MRAppMaster 分散在各个 NodeManager 节点

resourceManager主要由两个组件构成:调度器(Scheduler)和应用程序管理器(ApplicationsManager, ASM)

YARN 集群的主节点 ResourceManager 的职责:
  1. 处理客户端请求
  2. 启动或监控 MRAppMaster
  3. 监控 NodeManager
  4. 资源的分配与调度

2.nodeManager

NodeManager 是 YARN 集群当中真正资源的提供者,是真正执行应用程序的容器的提供者, 监控应用程序的资源使用情况(CPU,内存,硬盘,网络),并通过心跳向集群资源调度器 ResourceManager 进行汇报以更新自己的健康状态。同时其也会监督 Container 的生命周期 管理,监控每个 Container 的资源使用(内存、CPU 等)情况,追踪节点健康状况,管理日 志和不同应用程序用到的附属服务(auxiliary service)

YARN 集群的从节点 NodeManager 的职责:
  1. 管理单个节点上的资源
  2. 处理来自 ResourceManager 的命令
  3. 处理来自 MRAppMaster 的命令

3.Container

Container 容器是一个抽象出来的逻辑资源单位。容器是由 ResourceManager Scheduler 服务动态分配的资源构成,它包括了该节点上的一定量 CPU,内存,磁盘,网络等信息,MapReduce 程序的所有 Task 都是在一个容器里执行完成的,容器的大小是可以动态调整的

4.ASM

应用程序管理器 ASM 负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协 商资源以启动 MRAppMaster、监控 MRAppMaster 运行状态并在失败时重新启动它等

5.Scheduler

调度器根据应用程序的资源需求进行资源分配,不参与应用程序具体的执行和监控等工作 资源分配的单位就是 Container,调度器是一个可插拔的组件,用户可以根据自己的需求实 现自己的调度器。YARN 本身为我们提供了多种直接可用的调度器,比如 FIFO,Fair Scheduler 和 Capacity Scheduler 等

6.Yarn的工作执行流程

1)用户向ResourceManager提交应用程序,

2)ResourceManager里面的scheduler给该应用程序分配第一个container,并且通知对应的nodemanager启动MRappMaster;

3)MRappMaster启动之后,通过RPC协议向Resourcemanager注册,注册之后用户就可以直接在resourcemanager中查看程序的状态;

3)MrAppMaster通过RPC协议,采用轮询的方式向Resoucemanager申领资源,一旦申请到资源,便于对应的nodemanager通信要求启动任务;

4)nodemanager准备好运行环境,将启动命令写到脚本文件里,通过加载脚本文件启动任务;

5)各个任务通过RPC协议向MRappMaster汇报自身的情况;

6)当任务结束之后,MRappMaster注销自己。

四:job的提交流程

(1)作业提交

第 0 步:client 调用 job.waitForCompletion 方法,向整个集群提交 MapReduce 作业。

第 1 步:client 向 RM 申请一个作业 id。

第 2 步:RM 给 client 返回该 job 资源的提交路径和作业 id。

第 3 步:client 提交 jar 包、切片信息和配置文件到指定的资源提交路径。

第 4 步:client 提交完资源后,向 RM 申请运行 MrAppMaster。

(2)作业初始化

第 5 步:当 RM 收到 client 的请求后,将该 job 添加到容量调度器中。

第 6 步:某一个空闲的 NM 领取到该 job。

第 7 步:该 NM 创建 Container,并产生 MRAppmaster。

第 8 步:下载 client 提交的资源到本地。

(3)任务分配

第 9 步:MrAppMaster 向 RM 申请运行多个 maptask 任务资源。

第 10 步 :RM 将运行 maptask 任务分配给另外两个 NodeManager,另两个 NodeManager

分别领取任务并创建容器。

(4)任务运行

第 11 步:MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个NodeManager 分别启动 maptask,maptask 对数据分区排序。

第 12 步:MrAppMaster 等待所有 maptask 运行完毕后,向 RM 申请容器,运行 reduce task。

第 13 步:reduce task 向 maptask 获取相应分区的数据。

第 14 步:程序运行完毕后,MR 会向 RM 申请注销自己。

(5)进度和状态更新

YARN 中的任务将其进度和状态(包括 counter)返回给应用管理器, 客户端每秒(通mapreduce.client.progressmonitor.pollinterval 设置)向应用管理器请求进度更新, 展示给用户。

(6)作业完成

除了向应用管理器请求作业进度外, 客户端每 5 分钟都会通过调用 waitForCompletion()来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后, 应用管理器和 container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。