MapReduce的工作流程,shuffle过程,yarn资源调度过程,job提交流程
MapReduce的工作流程,shuffle过程,yarn资源调度过程,job提交流程
一:shuffle过程
概述:
mapTask处理的结果存入环形缓冲区中(因为频繁的I/O操作会导致减低效率,所以会有环形缓冲区,默认的环形缓冲区的大小是100M),存入的过程中进行partition(partition默认的是hashCode%reduceTask的个数,也可以自定义,自定义类继承partition类),partiton之后每一个键值对会加上一个partiton的属性值,用于确定是哪个分区;环形缓冲区有两个区,分别是数据区和索引区,当环形缓冲区的值达到阈值(默认是80%,可以通过mapreduce.map.io.sort.split.persen设置)后,有数据溢出,数据区溢出的叫file.out,索引区的file.out.index(多个溢出文件会合并成一个大的溢出文件),会启动线程将数据溢写(spill)到磁盘的临时文件中,并且通过key对数据进行排序(sort)和Combiner(类似于map阶段的reduce,对数据进行局部的合并,Combiner是可选操作)。溢写过程是按照轮询的方式将缓冲区的数据写到通过mapreduce.cluster.local.dir属性指定的目录中 。当整个map任务完成溢写之后,会对所有的临时文件(spill文件)进行归并(merge)操作最终生成一个正式输出文件。此时的归并是将所有spill文件中的相同partition合并到一起,并对各个partition中的数据再进行一次排序(sort),生成key和对应的value-list,文件归并时,如果溢写文件数量超过参数min.num.spills.for.combine的值(默认为3)时,可以再次进行合并。至此,map端shuffle过程结束,接下来等待reduce task来拉取数据。对于reduce端的shuffle过程来说,reduce task在执行之前的工作就是不断地拉取当前job里每个map task的最终结果,然后对从不同地方拉取过来的数据不断地做merge最后合并成一个分区相同的大文件,然后对这个文件中的键值对按照key进行sort排序,排好序之后紧接着进行分组,分组完成后才将整个文件交给reduce task处理。
二:MapReduce的工作流程
###1.Map task
1.由程序内的InputFormat(默认实现类TextInputFormat)来读取外部数据,它会调用RecordReader(它的成员变量)的read()方法来读取,返回k,v键值对。
2.读取的k,v键值对传送给map()方法,作为其入参来执行用户定义的map逻辑。
3.context.write方法被调用时,outputCollector组件会将map()方法的输出结果写入到环形缓冲区内。
4.环形缓冲区其实就是一个数组,后端不断接受数据的同时,前端数据不断被溢出,长度用完后读取的新数据再从前端开始覆盖。这个缓冲区默认大小100M,可以通过MR.SORT.MB(应该是它)配置。
5.spiller组件会从环形缓冲区溢出文件,这过程会按照定义的partitioner分区(默认是hashpartition),并且按照key.compareTo进行排序(底层主要用快排和外部排序),若有combiner也会执行combiner。spiller的不断工作,会不断溢出许多小文件。这些文件仍在map task所处机器上。
6.小文件执行merge(合并),行程分区且区内有序的大文件(归并排序,会再一次调用combiner)。
7.Reduce会根据自己的分区,去所有map task中,从文件读取对应的数据。
2.Reduce Task
1.reduce task通过网络向map task获取某一分区的数据。
2.通过GroupingComparator()分辨同一组的数据,把他们发送给reduce(k,iterator)方法
- (这里多个数据合成一组时,只取其中一个key,取得是第一个)。
3.调用context.write()方法,会让OutPurFormat调用RecodeWriter的write()方法将处理结果写入到数据仓库中。写出的只有一个分区的文件数据,如图。
就此mapreduce的工作结束,其中map的context.write()调用后,开始聚合数据写入到reduce的过程叫做Shuffle,是mapreduce的核心所在。
三:yarn的资源调度过程
yarn主要分为resourmanager和nodemanager
1.resourceManager
ResourceManager 是基于应用程序对集群资源的需求进行调度的 YARN 集群主控节点,负责 协调和管理整个集群(所有 NodeManager)的资源,响应用户提交的不同类型应用程序的 解析,调度,监控等工作。ResourceManager 会为每一个 Application 启动一个 MRAppMaster, 并且 MRAppMaster 分散在各个 NodeManager 节点
resourceManager主要由两个组件构成:调度器(Scheduler)和应用程序管理器(ApplicationsManager, ASM)
YARN 集群的主节点 ResourceManager 的职责:
- 处理客户端请求
- 启动或监控 MRAppMaster
- 监控 NodeManager
- 资源的分配与调度
2.nodeManager
NodeManager 是 YARN 集群当中真正资源的提供者,是真正执行应用程序的容器的提供者, 监控应用程序的资源使用情况(CPU,内存,硬盘,网络),并通过心跳向集群资源调度器 ResourceManager 进行汇报以更新自己的健康状态。同时其也会监督 Container 的生命周期 管理,监控每个 Container 的资源使用(内存、CPU 等)情况,追踪节点健康状况,管理日 志和不同应用程序用到的附属服务(auxiliary service)
YARN 集群的从节点 NodeManager 的职责:
- 管理单个节点上的资源
- 处理来自 ResourceManager 的命令
- 处理来自 MRAppMaster 的命令
3.Container
Container 容器是一个抽象出来的逻辑资源单位。容器是由 ResourceManager Scheduler 服务动态分配的资源构成,它包括了该节点上的一定量 CPU,内存,磁盘,网络等信息,MapReduce 程序的所有 Task 都是在一个容器里执行完成的,容器的大小是可以动态调整的
4.ASM
应用程序管理器 ASM 负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协 商资源以启动 MRAppMaster、监控 MRAppMaster 运行状态并在失败时重新启动它等
5.Scheduler
调度器根据应用程序的资源需求进行资源分配,不参与应用程序具体的执行和监控等工作 资源分配的单位就是 Container,调度器是一个可插拔的组件,用户可以根据自己的需求实 现自己的调度器。YARN 本身为我们提供了多种直接可用的调度器,比如 FIFO,Fair Scheduler 和 Capacity Scheduler 等
6.Yarn的工作执行流程
1)用户向ResourceManager提交应用程序,
2)ResourceManager里面的scheduler给该应用程序分配第一个container,并且通知对应的nodemanager启动MRappMaster;
3)MRappMaster启动之后,通过RPC协议向Resourcemanager注册,注册之后用户就可以直接在resourcemanager中查看程序的状态;
3)MrAppMaster通过RPC协议,采用轮询的方式向Resoucemanager申领资源,一旦申请到资源,便于对应的nodemanager通信要求启动任务;
4)nodemanager准备好运行环境,将启动命令写到脚本文件里,通过加载脚本文件启动任务;
5)各个任务通过RPC协议向MRappMaster汇报自身的情况;
6)当任务结束之后,MRappMaster注销自己。
四:job的提交流程
(1)作业提交
第 0 步:client 调用 job.waitForCompletion 方法,向整个集群提交 MapReduce 作业。
第 1 步:client 向 RM 申请一个作业 id。
第 2 步:RM 给 client 返回该 job 资源的提交路径和作业 id。
第 3 步:client 提交 jar 包、切片信息和配置文件到指定的资源提交路径。
第 4 步:client 提交完资源后,向 RM 申请运行 MrAppMaster。
(2)作业初始化
第 5 步:当 RM 收到 client 的请求后,将该 job 添加到容量调度器中。
第 6 步:某一个空闲的 NM 领取到该 job。
第 7 步:该 NM 创建 Container,并产生 MRAppmaster。
第 8 步:下载 client 提交的资源到本地。
(3)任务分配
第 9 步:MrAppMaster 向 RM 申请运行多个 maptask 任务资源。
第 10 步 :RM 将运行 maptask 任务分配给另外两个 NodeManager,另两个 NodeManager
分别领取任务并创建容器。
(4)任务运行
第 11 步:MR 向两个接收到任务的 NodeManager 发送程序启动脚本,这两个NodeManager 分别启动 maptask,maptask 对数据分区排序。
第 12 步:MrAppMaster 等待所有 maptask 运行完毕后,向 RM 申请容器,运行 reduce task。
第 13 步:reduce task 向 maptask 获取相应分区的数据。
第 14 步:程序运行完毕后,MR 会向 RM 申请注销自己。
(5)进度和状态更新
YARN 中的任务将其进度和状态(包括 counter)返回给应用管理器, 客户端每秒(通mapreduce.client.progressmonitor.pollinterval 设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每 5 分钟都会通过调用 waitForCompletion()来检查作业是否完成。时间间隔可以通过 mapreduce.client.completion.pollinterval 来设置。作业完成之后, 应用管理器和 container 会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。