MapReduce运行机制(一) 剖析MR作业运行机制

剖析MR作业运行机制


MR作业:

1) 客户端:提交MapReduce作业

2) YARN Resource Manager,协调集群资源分配

3) YARN Node Manager,启动和监控集群计算容器

4) MapReduce application master,协调运行MR作任务。和MR任务在容器中运行,容器由RM分配,NM启动,监控和管理

5) HDFS 与其他实体间共享作业文件


提交作业

Job的submit()方法创建JobCommitter的实例,并调用submitJobInternal()。提交作业后,waitForCompletion()每秒轮询作业进度,如果与上次报告有改变,则将进度报告到控制台。完成后如果成功,显示Counter,如果失败,显示错误详细信息。

MapReduce运行机制(一) 剖析MR作业运行机制

Jobcommitter作业提交过程:

1) 向RM申请新的application ID, 用于MR Job ID

2) 检查作业的输出说明,有错误则抛出给MR程序,无错误则提交

3) 计算作业的输入分片,有错误则抛出给MR程序

4) 将运行作业所需要的资源(作业JAR文件,配置文件,计算所得输入分片)复制到Job ID命名的目录下的共享文件系统中。

mapreduce.client.submit.file.replication可以控制副本数量,默认是10,如果副本多的话,集群很多副本都可以供NM访问。

5) 调用RM的submitApplication()提交作业


作业的初始化

1) 资源管理器收到调用submitApplication()的请求后,将请求传递给YARN scheduler,scheduler分配一个容器,然后在NM的管理下由RM启动applicatoin master进程,注意:容器由NM启动和管理,application master由RM启动

2) 对于MR程序,application master是一个Java应用程序MRAppMaster

    MRAppMaster初始化过程:

    > 通过创建多个bookkeeping对象保持对作业进度的跟踪,接收来自任务的进度和完成报告

    > 从共享文件系统接受输入分片,该分片由客户端计算,在作业提交时(JAR/配置文件/输入分片)提交给共享文件系统如HDFS

    > 为每一个分片创建一个map任务对象,由mapreduce.job.reduces或者Job的setNumReduceTasks确定多个reduce任务对象,并分配任务ID

3) application master决定如何运行任务

    > 小作业可以和application master在同一个JVM上运行,如果在新的容器中分配运行任务的开销大于在一个节点上运行,则可以在一个JVM上运行作业,这种作业叫做uber作业

    > 小作业:默认小于10个mapper且只有1个reducer且输入大小小于一个HDFS块的作业

    mapreduce.job.ubertask.maxmaps/mapreduce.job.ubertask.maxreduces/mapreduce.job.ubertask.maxbytes

    启用uber任务 mapreduce.job.ubertask.enabled设置为true

    > 在任务运行之前,application master调用setupJob()设置outputCommitter,FileOutputCommitter为默认值,建立作业左中输出目录和任务输出的临时工作空间


任务的分配

1) Map任务请求RM分配容器,优先级高于reduce任务,因为map要在reduce排序阶段能够启动前完成,默认时5%的map任务完成时,reduce任务请求才会发出

2) reduce任务能够在集群任意位置运行,map任务请求有数据本地化局限

3) 任务是数据本地化/机架本地化/非机架本地化,特定的作业,可以查看计数器确定不同本地化层次的任务数量

4) 请求为任务制定了内存和CPU,任务默认分配1024M内存和一个虚拟内核

    mapreduce.map.memory.mb/mapreduce.reduce.memory.mb

    mapreduce.map.cpu.vcores/mapreduce.reduce.vcoresp.memory.mb


任务的执行

1) scheduler分配容器,application master与NM通信建立启动容器。任务由YarnChild的Java应用程序执行

    > YarnChild在指定JVM中运行,用户定义的map或reduce函数任何缺陷不会影响到NM

2) 运行任务之前,将任务需要的资源本地化,JAR/配置文件/来自分布式缓存的文件

3) 运行map任务或者reduce任务

4) 每个任务执行setup和commit动作,在JVM中运行,由作业的outputCommitter确定

    > 文件作业提交动作将任务输出由临时位置移到最终位置。提交协议确保当执行开始时,只有一个副本被提交

5) Streaming:Streaming运行特殊的map任务和reduce任务,运行用户提供的可执行程序,并与之通信

    > 使用标准输入和输出流与进程通信

    > 任务执行过程中,Java进程把输入键值对传给外部进程,外部进程通过用户自定义map和reduce执行并把输出键值对传回给Java进程。


进度和状态更新

1) 作业和任务的状态/map和reduce的进度/作业Counter的值/状态消息后者描述

2) 进度状态与客户端通信

    > 跟踪进度 

        >> map进度:map已处理输入占输入的比例

        >>reduce进度: shuffle对应

    > 任务

        >> 任务的计数器: 对任务运行过程中的事件进行计数,内置于框架或用户定义

      >> 任务运行时,子进程和application master使用umbilical接口通信,每3秒向application master报告进度和状态,application master形成作业汇聚视图

    > 客户端

        >> 客户端每秒轮询application master接收最新状态,使用mapreduce.client.progressmonitor.pollinterval设置或者Job的getStatus()

MapReduce运行机制(一) 剖析MR作业运行机制


作业的完成

1) application master收到作业最后一个任务完成的通知,将作业状态设置为成功。Job轮询时,通知任务已成功完成,Job打印成功消息(成功+统计信息+Counter)通知客户端,从waitForCompletion()方法返回

2) application master设置mapreduce.job.end-notification.url属性发送HTTP作业通知

3) application master和任务容器清理工作状态,中间输出会被删除,OutputCommitter的commitJob()方法会被调用。作业信息在作业历史服务器存档,可以查询