Hadoop——YARN组件
1 YARN
Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
1.1 旧的MapReduce架构
JobTracker: 负责资源管理,跟踪资源消耗和可用性,作业生命周期管理(调度作业任务,跟踪进度,为任务提供容错)
TaskTracker: 加载或关闭任务,定时报告认为状态
此架构会有以下问题:
- JobTracker是MapReduce的集中处理点,存在单点故障
- JobTracker完成了太多的任务,造成了过多的资源消耗,当MapReduce job 非常多的时候,会造成很大的内存开销。这也是业界普遍总结出老Hadoop的MapReduce只能支持4000 节点主机的上限
- 在TaskTracker端,以map/reduce task的数目作为资源的表示过于简单,没有考虑到cpu内存的占用情况,如果两个大内存消耗的task被调度到了一块,很容易出现OOM
- 在TaskTracker端,把资源强制划分为map task slot和reduce task slot, 如果当系统中只有map task或者只有reduce task的时候,会造成资源的浪费,也就集群资源利用的问题
总的来说就是单点问题和资源利用率问题
1.2 HA架构
YARN就是将JobTracker的职责进行拆分,将资源管理和任务调度监控拆分成独立的进程:一个全局的资源管理和一个每个作业的管理(ApplicationMaster); ResourceManager和NodeManager提供了计算资源的分配和管理,而ApplicationMaster则完成应用程序的运行
- ResourceManager: 全局资源管理和任务调度
- NodeManager: 单个节点的资源管理和监控
- ApplicationMaster: 单个作业的资源管理和任务监控
- Container: 资源申请的单位和任务运行的容器
1.3 Hadoop ResourceManager
YARN - ResourceManager
负责全局的资源管理和任务调度,把整个集群当作计算资源池,只关注分配,不管应用,且不负责容错
资源管理
- 以前资源是每个节点分成一个个的Map slot和Reduce slot,现在是一个个Container,每个Container可以根据需要运行ApplicationMaster、Map、Reduce或者任意的程序
- 以前的资源分配是静态的,目前是动态的,资源利用率更高
- Container是资源申请的单位,一个资源申请格式:
<resource-name, priority, resource-requirement, number-of-containers>
, resource-name:主机名、机架名或*(代表任意机器), resource-requirement:目前只支持CPU和内存 - 用户提交作业到ResourceManager,然后在某个NodeManager上分配一个Container来运行ApplicationMaster,ApplicationMaster再根据自身程序需要向ResourceManager申请资源
- YARN有一套Container的生命周期管理机制,而ApplicationMaster和其Container之间的管理是应用程序自己定义的
任务调度
- 只关注资源的使用情况,根据需求合理分配资源
- Scheluer可以根据申请的需要,在特定的机器上申请特定的资源(ApplicationMaster负责申请资源时的数据本地化的考虑,ResourceManager将尽量满足其申请需求,在指定的机器上分配Container,从而减少数据移动)
1.4 YARN - NodeManager
Node节点下的Container管理
- 启动时向ResourceManager注册并定时发送心跳消息,等待ResourceManager的指令
- 监控Container的运行,维护Container的生命周期,监控Container的资源使用情况
- 启动或停止Container,管理任务运行时的依赖包(根据ApplicationMaster的需要,启动Container之前将需要的程序及其依赖包、配置文件等拷贝到本地)
1.5 YARN - ApplicationMaster
单个作业的资源管理和任务监控
具体功能描述:
- 计算应用的资源需求,资源可以是静态或动态计算的,静态的一般是Client申请时就指定了,动态则需要ApplicationMaster根据应用的运行状态来决定
- 根据数据来申请对应位置的资源(Data Locality)
- 向ResourceManager申请资源,与NodeManager交互进行程序的运行和监控,监控申请的资源的使用情况,监控作业进度
- 跟踪任务状态和进度,定时向ResourceManager发送心跳消息,报告资源的使用情况和应用的进度信息
- 负责本作业内的任务的容错
ApplicationMaster可以是用任何语言编写的程序,它和ResourceManager和NodeManager之间是通过ProtocolBuf交互,以前是一个全局的JobTracker负责的,现在每个作业都一个,可伸缩性更强,至少不会因为作业太多,造成JobTracker瓶颈。同时将作业的逻辑放到一个独立的ApplicationMaster中,使得灵活性更加高,每个作业都可以有自己的处理方式,不用绑定到MapReduce的处理模式上
如何计算资源需求
一般的MapReduce是根据block数量来定Map和Reduce的计算数量,然后一般的Map或Reduce就占用一个Container
如何发现数据的本地化
数据本地化是通过HDFS的block分片信息获取的
1.6 YARN - Container
- 基本的资源单位(CPU、内存等)
- Container可以加载任意程序,而且不限于Java
- 一个Node可以包含多个Container,也可以是一个大的Container
- ApplicationMaster可以根据需要,动态申请和释放Container
1.7 YARN - Failover
失败类型
程序问题
进程崩溃
硬件问题
任务失败
- 运行时异常或者JVM退出都会报告给ApplicationMaster
- 通过心跳来检查挂住的任务(timeout),会检查多次(可配置)才判断该任务是否失效
- 一个作业的任务失败率超过配置,则认为该作业失败
- 失败的任务或作业都会有ApplicationMaster重新运行
ApplicationMaster失败
- ApplicationMaster定时发送心跳信号到ResourceManager,通常一旦ApplicationMaster失败,则认为失败,但也可以通过配置多次后才失败
- 一旦ApplicationMaster失败,ResourceManager会启动一个新的ApplicationMaster
- 新的ApplicationMaster负责恢复之前错误的ApplicationMaster的状态(
yarn.app.mapreduce.am.job.recovery.enable=true
),这一步是通过将应用运行状态保存到共享的存储上来实现的,ResourceManager不会负责任务状态的保存和恢复 - Client也会定时向ApplicationMaster查询进度和状态,一旦发现其失败,则向ResouceManager询问新的ApplicationMaster
NodeManager失败
- NodeManager定时发送心跳到ResourceManager,如果超过一段时间没有收到心跳消息,ResourceManager就会将其移除
- 任何运行在该NodeManager上的任务和ApplicationMaster都会在其他NodeManager上进行恢复
- 如果某个NodeManager失败的次数太多,ApplicationMaster会将其加入黑名单(ResourceManager没有),任务调度时不在其上运行任务
ResourceManager失败
- 通过checkpoint机制,定时将其状态保存到磁盘,然后失败的时候,重新运行
- 通过zookeeper同步状态和实现透明的HA
- 可以看出,一般的错误处理都是由当前模块的父模块进行监控(心跳)和恢复。而最顶端的模块则通过定时保存、同步状态和zookeeper来ֹ实现HA
2 Yarn工作机制
工作机制详解
(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。
作业提交全过程详解
(1)作业提交
第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
作业提交过程之MapReduce
3 YARN-HA配置
配置YARN-HA集群
- 环境准备
(1)修改IP
(2)修改主机名及主机名和IP地址的映射
(3)关闭防火墙
(4)ssh免密登录
(5)安装JDK,配置环境变量等
(6)配置Zookeeper集群 - 规划集群
- 具体配置
(1)yarn-site.xml
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<!--启用resourcemanager ha-->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!--声明两台resourcemanager的地址-->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>cluster-yarn1</value>
</property>
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>hadoop102</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>hadoop103</value>
</property>
<!--指定zookeeper集群的地址-->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>hadoop102:2181,hadoop103:2181,hadoop104:2181</value>
</property>
<!--启用自动恢复-->
<property>
<name>yarn.resourcemanager.recovery.enabled</name>
<value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
<property>
<name>yarn.resourcemanager.store.class</name> <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
</configuration>
(2)同步更新其他节点的配置信息
启动hdfs
(1)在各个JournalNode节点上,输入以下命令启动journalnode服务:
sbin/hadoop-daemon.sh start journalnode
(2)在[nn1]上,对其进行格式化,并启动:
bin/hdfs namenode -format
sbin/hadoop-daemon.sh start namenode
(3)在[nn2]上,同步nn1的元数据信息:
bin/hdfs namenode -bootstrapStandby
(4)启动[nn2]:
sbin/hadoop-daemon.sh start namenode
(5)启动所有DataNode
sbin/hadoop-daemons.sh start datanode
(6)将[nn1]切换为Active
bin/hdfs haadmin -transitionToActive nn1
启动YARN
(1)在hadoop102中执行:
sbin/start-yarn.sh
(2)在hadoop103中执行:
sbin/yarn-daemon.sh start resourcemanager
(3)查看服务状态,如图3-24所示
bin/yarn rmadmin -getServiceState rm1