mapreduce 过程详解/yarn技术架构详解/mapreduce工作机制
荐书:《hadoop技术内幕》
对于MapReduce的工作原理,看视频,看直播,都云里雾里的。没办法,找书吧,找的时候还想着,就算只有那本hadoop权威指南,我也只能硬着头皮去啃了。然后找到这本,终于解惑了,读的停不下来。真是一本好书,不晦涩,连涉及到的基础知识点都解释的非常详细,java基础一般的读起来也没有困难。
PS:书是基于hadoop1.x写的。
在MapReduce计算框架中,一个应用程序被划分成Map和Reduce两个计算阶段,它们分别由一个或者多个Map Task和Reduce Task组成。其中,每个Map Task处理输入集合中的一个数据分片(InputSplit),Reduce Task则从每个Map Task上远程拷贝相应的数据片段,经过分组聚集和规约后,将结果写到hdfs上作为最终结果。
一、自己简单总结一下MapTask过程
1、首先InputFormat处理每个分片的数据,默认是用TextInputFormat,按行将数据处理成key=行偏移量、value=当前行的一串字符,然后传给map()方法。用户可以自定义InputFormat去解析每行数据,自定义复杂的key的类型和value的类型
FileInputFormat(自定义InputFormat一般实现此接口),
TextInputFortmat,(key=行偏移量,value=行字符串)
KeyValueTextInputFormat(以\t为分隔符,key=\t前的数据,value=\t后的数据)
CombineFileInputFormat(合并大量小文件时使用)
MultipleInputs(多种格式输入处理,可以为每种格式的输入指定逻辑处理的Mapper)
CustomInputFormat extends FileInputFormat {
public InputSplit[] getSplits (JobConf job, int numSplits) throws IOException {//使用父类的实现即可,不用自己实现 }
public RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException {
return new MyRecorderReader();}
}
class MyRecorderReader extends RecorderReader<K,V> {
initialize()
getCurrentKey()
getProcess()
}
2、开启线程,调用用户自定义的map()方法,将传入的一个个key/value,经过数据进行过滤、清洗等,再由context.write()输出一个个新的key/value
3、调用用户自定义的partitioner方法,对数据进行分区。在context.write()方法内部,会首先调用用户自定义的partitioner方法,获取partitionno,形成一个三元组<key,value,partitionno>,然后将数据写入内存中的二级索引结构的环形缓冲区。
4、数据溢写。当环形缓冲区中数据达到设定的阈值后,会溢写到本地磁盘上去。
1)排序:溢写时首先要对数据进行排序,先快排,当需要快排的数据小于13个时,变成直接插入排序。排序的元祖是:partitionno,key,这样经过排序后,数据以分区为单位聚集在一起,且同一分区内的所有相同key的数据也聚集在一起。
2)调用用户自定义的combiner,对每个分区中的数据进行一次聚集操作。
3)将数据写入Map任务工作目录下的临时文件output/spillN.out中,其中N是当前是第几次溢写。并将分区数据的元信息(partitionno,spillN,offset)写到内存索引数据结构spillRecord中。
5、合并临时文件。
当Map将所有数据都处理完后,会将所有临时文件合并成一个大文件out/file.out,同时生成索引文件out/file.out.index。合并时,以分区为单位,采用小顶堆,首先对文件大小每进行排序,默认合并最小的100个文件,合并后的文件作为一个新文件,再次参与新的排序,重复这个过程,直到最后生成一个大文件。
如果用户指定Reduce Task数量为0,则直接将文件存储到HDFS上。
6、疑问:每台机器同时能运行多个应用的Map任务否,能?
每个Map任务都会启动一个JVM进程吗,是?
同一个应用的Map任务,有可能能在一台机器上开启多个吗,不能?
对于某台机器上一个正在运行的Map任务内部,InputFormat是多线程并发的吗,不是?
对于某台机器上一个正在运行的Map任务,map是多线程并发的吗,是?
需要将应用程序代码拷贝到每台机器上去吗?
二、Reduce Task过程
1、shuffle:getMapEventsThread线程周期性从Task Tracker上获取已完成的Map Task任务列表,将其保存到映射表中,
为防止后续拷贝时出现网络热点(多个Reduce同时拷贝同一个Map任务的输出数据,同一台主机),会将映射表的顺序打乱,即“混洗”。
2、merge:远程拷贝数据:多线程并发,HTTPGet拷贝多个MapTask的输出,直接存放到内存,如果分片比较大,就会存到磁盘上。
为防止文件过多,会不断对这些数据和文件进行合并,
3、sort reduce:这两个是并发进行的,
Map输出的数据本身已经sort过一次了,只需进行归并排序即可。通过排序聚集了相同的key,直接传给reduce()方法。
三、应用程序优化
根据MapReduce过程分析,可以通过以下方式优化。
1、如果可以,设置Combiner,这样就可以减少Map的输出,减少shuffle阶段拷贝数据的耗时,因为在大数据环境下,网络耗时是最瓶颈。
2、使用压缩,
3、合理合适map Task和ReduceTask 的数量
如果一个job的输入超过1TB,那么久增加blocksize到256或者512,这样能减少task的数量。动态调整已有数据的块大小的命令:
hadoop distcp -Ddfs.block.size=$[256*1024*1024]/path/to/inputdata /path/to/inputdata-with/largeblocks,命令执行完成后,将原来的/path/to/inputdata目录删掉,然后修改新的目录为原来的目录名自
只要Task的耗时超过30-40s,那么就增加maptask的数量,增加到整个cluster上mapslot总数的几倍。如果你的cluster中有100个mapslot,那就避免运行一个有101个map task的job。
不要调度太多的reduce task,推荐reduce task的数量应当等于或者略小于reduceslot的总数量。并且略小于或者等于map task的数量
4、选择合理的Writeable类型,尽管Text对象看起来很方便,但它在由数值转换到文本或者由UTF-8字符转换到文本时都是低效的,且消耗大量CPU时间。
5、重用writeable,不要使用context.write(new Text(key),new IntWritable(value)),这样会生成成千上万个对象,造成频繁GC?匿名变量也会如此?
而是应该在mapper类内,map方法外定义两个变量:Text key=new Text();IntWriteable value = new IntWriteable(1);
运行流程
当你在MapReduce程序中调用了Job实例的Submit()或者waitForCompletion()方法,该程序将会被提交到Yarn中运行
其中的过程大部分被Hadoop隐藏起来了,对开发者来说是透明的
程序运行的过程涉及到个概念:
1.Client:提交程序的客户端
2.ResourceManager:集群中的资源分配管理
3.NodeManager:启动和监管各自节点上的计算资源
4.ApplicationMaster:每个程序对应一个AM,负责程序的任务调度,本身也是运行在NM的Container中
5.HDFS:分布式文件系统,和以上各个实体进行作业数据的共享
MapReduce作业在Yarn中的运行流程如图所示:
这张图和Hadoop核心组件之Yarn中提到的流程图很相似
因为MapReduce作业也是属于Yarn管理的一部分,只是这张图针对MapReduce的运行更加细化了
作业提交
1.客户端调用Job实例的Submit()或者waitForCompletion()方法提交作业
2.客户端向RM请求分配一个Application ID
进行步骤2的时候,客户端会对程序的输出路径进行检查,如果没有问题,进行作业输入分片的计算
3.将作业运行所需要的资源拷贝到HDFS中,包括jar包、配置文件和计算出来的输入分片信息等
4.调用RM的submitApplication方法将作业提交到RM
作业初始化
5a.RM收到submitApplication方法的调用之后会命令一个NM启动一个Container
5b.在该NM的Container上启动管理该作业的ApplicationMaster进程
6.AM对作业进行初始化操作,并将会接收作业的处理和完成情况报告
7.AM从HDFS中获得输入数据的分片信息
在步骤7中,AM将会根据分片信息确定要启动的map任务数,reduce任务数则根据mapreduce.job.reduces属性或者Job实例的setNumReduceTasks方法来决定
任务分配
8.AM为其每个map和reduce任务向RM请求计算资源
步骤8中,map任务的资源分配是要优先于reduce任务的,因为在reduce的排序阶段开始之前,map任务必须全部完成
因此,reduce任务的资源请求只有当map任务完成了至少5%的时候才会进行
reduce任务是可以在集群上的任意一个节点运行的,所以进行计算资源分配的时候RM不需要为reduce任务考虑分配哪个节点的资源给它
但是map任务不一样,map任务有一个数据本地化的优化特性
数据本地优化是指map任务中处理的数据存储在各个运行map本身的节点上,这能够使得作业以最好的状态运行,因为不需要跨界点消耗网络带宽进行数据传输
移动计算而不移动数据
Yarn会优先给map任务分配本地数据,如果不存在,则在同一机架内的不同节点上搜寻数据,最差的情况是跨机架之间的数据传输
map每个split大小默认和hdfs的block块大小一致的原因就是:
太大,会导致map读取的数据可能跨越不同的节点,没有了数据本地化的优势
太小,会导致map数量过多,任务启动和切换开销太大,并行度过高
RM在map任务要处理的数据块的那个节点上为map分配计算资源,如此一来,map任务就不需要跨网络进行数据传输了
因为AM中有输入数据的分片信息和要启动的map任务的信息,所以在为map任务请求资源的时候,RM会根据这些信息为map分配计算资源
这里的计算资源指的是map/reduce任务运行所需要的内存和CPU等,默认每个任务分配1024M的内存和一个CPU虚拟核心
可以通过修改以下选项来修改这个配置:
- mapreduce.map.memory.mb
- mapreduce.reduce.memory.mb
- mapreduce.map.cpu.vcores
- mapreduce.reduce.cpu.vcores
任务执行
9a.AM在RM指定的NM上启动Container
9b.在Container上启动任务(通过YarnChild进行来运行)
10.在真正执行任务之前,从HDFS从将任务运行需要的资源拷贝到本地,包括jar包、配置文件信息和分布式缓存文件等
11.执行map/reduce任务
作业完成
作业执行过程中,我们可以通过Yarn Web UI界面的AM页面中查看作业的运行信息
当在客户端调用waitForCompletion方法,每隔5秒钟客户端会检查一次作业的运行情况
作业执行完毕之后将调用OutputCommitter方法对作业的信息进行最后的清理工作
失败处理
在实际场景中,用户的代码总是会有bug、程序异常和节点失效等问题,Hadoop提供了失败处理的机制尽可能的保证用户的作业可以被顺利完成
其中的过程需要考虑以下几个实体:
Task的失败
任务失败标记
当由于用户的代码导致map/reduce任务产生运行时异常时
在该任务退出之前,JVM会发送报告给AM,并且将错误信息写入用户日志中
AM将该任务标记为失败,释放Container资源
除了用户代码之外,还有很多其他原因会导致任务失败,如JVM的bug等
AM会间隔性的接收来自各个任务的汇报,当一段时间过后AM没有接收到某个任务的报告
AM将会判断该任务超时,将该任务标记为失败并让节点上的任务退出
任务超时的时间可以在作业中通过mapreduce.task.timeout选项来为每个作业单独配置
设置为0表示无任务超时时间,此时任务运行再久也不会被标记为失败,其资源也无法释放,会导致集群效率降低
失败任务的重启
当AM注意到一个任务失败了之后,将会尝试重新调度该任务
任务的重试不会在之前失败了的节点上运行,并且失败四次之后AM将不会继续重启该任务
这个值同样是可以配置的:
- mapreduce.map.maxattempts
- mapreduce.reduce.maxattempts
默认的,一旦作业中有任何一个任务失败超过4次,那么整个作业将会标记为失败
但是很多情况下,即使作业中的某些任务失败了,其他任务的执行结果还是有价值的
所以我们可以配置一个作业中允许任务失败的最大比例:
- mapreduce.map.failures.maxpercent
- mapreduce.reduce.failures.maxpercent
ApplicationMater的失败
和任务失败一样,AM也可能由于各种原因(如网络问题或者硬件故障)失效,Yarn同样会尝试重启AM
可以为每个作业单独配置AM的尝试重启次数:mapreduce.am.max-attempts,默认值为2
需要注意的是,Yarn中限制了每个AM重启的最大限制,默认也为2,如果为单个作业设置重启次数为3,超过了这个上限也不会起到作用
所以还需要注意将Yarn中的上限一起提高:yarn.resourcemanager.am.nax-attempts
由于AM会通过心跳机制向RM信息,当RM注意到AM失败了之后,会在另外一个节点的Container上重启,并将恢复已经执行的任务进度(心跳机制保留)
使得重启的AM不用重头执行任务,任务进度恢复默认是开启的,可以通过yarn.app.mapreduce.am.job.recovery.enable为false来禁用
客户端通过AM来获得作业的执行情况,当AM失效的时候,客户端会重新向RM请求新的AM地址来更新信息
NodeManager的失败
NM也通过心跳机制向RM汇报情况,当一个NM失效,或者运行缓慢的时候,RM将收不到该NM的心跳,或者心跳时间超时
此时RM会认为该NM失败并移出可用NM管理池,心跳超时的时间通过yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms来配置
默认为10分钟
在失败的NM上运行的AM或者任务都会按照之前讨论的机制进行恢复
Yarn对于NM的管理还有一个类似黑名单的功能,当该NM上的任务失败次数超过3次之后(默认),该NM会被拉入黑名单
此时,即使该NM没有失效,AM也不会在该NM上运行任务了
NM上的最大任务失败次数可以通过mapreduce.job.maxtaskfailures.per.tracker来配置
ResourceManager的失败
RM是Yarn中最终要的一个角色,没有了RM集群将无法使用
RM类似于HDFS中的Namenode,需要一种高可用的机制来保障
RM一开始就使用一种检查点的机制来将集群信息持久化到磁盘中
当RM失效了之后,管理员可以手动启动一个备用的RM读取持久化的信息
Shuffle和Sort
Shuffle是MapReduce中的核心概念,也是代码优化中很重要的一部分
理解Shuffle过程可以帮助你写出更高级的MapReduce程序
Map Side
如上图所示,map函数在产生数据的时候并不是直接写入磁盘的,而是先写入一个内存中的环形缓冲区
理由是在内存中对数据进行分区、分组排序等操作对比在磁盘上快很多
每个map都有一个缓冲区,默认大小为100M
当缓冲区的数据达到一个阈值的时候将会产生spill操作写入到磁盘中,阈值默认为80%
缓冲区中的数据spill的时候,map产生的数据会源源不断的写入到缓冲区中空出来的空间
当缓冲区占满的时候,map任务将会阻塞直到缓冲区可以写入数据
在map的数据写入磁盘之前,在内存的缓冲区中会根据程序中设置的分区数对数据进行分区
并在每个分区内对数据进行分组和排序
如果设置了combiner函数,其将会在排序之后的数据上运行,以减少写入到磁盘的数据量
缓冲区的每次spill操作都会在磁盘上产生一个spill文件,所以一个map可能产生多个spill文件
任务完成之前,这些spill文件会被合并为一个已分区且已排序的输出文件
如果至少存在三个spill文件(默认)且设置了combiner函数,那么在合并spill文件再次写入到磁盘的时候会再次调用combiner
此时Map端的任务完成
Reduce Side
Map端的任务完成之后,Reduce端将会启动会多个线程通过HTTP的方式到Map端获取数据
为了优化reduce的执行时间,hadoop中等第一个map结束后,所有的reduce就开始尝试从完成的map中下载该reduce对应的partition部分数据
在这个shuffle过程中,由于map的数量通常是很多个的,而每个map中又都有可能包含每个reduce所需要的数据
所以对于每个reduce来说,去各个map中拿数据也是并行的
Reduce端会启动一个线程周期性的向AM请求Map端产生数据所在的位置(map任务和AM之间有心跳机制)
Map端产生的数据并不会在Reduce端获取之后马上删除,因为reduce任务可能会因为失败而重启
Reduce端将Map端的数据拷贝过来之后也会放入一个内存缓冲区中,数据达到缓冲区的指定阈值之后h合并写入到磁盘
随着磁盘数据文件的增多,和Map端一样,Reduce端也会对溢出文件进行合并
mapreduce.task.io.sort.factor可以控制Map和Reduce端的文件数量达到多少个时进行合并
和Map端的合并不同,假设上述选项采用默认值10,共有40个溢出文件
Map端最终会合并形成4个文件
而Reduce端第一次只会合并4个文件,随后三次各合并10个文件,还剩余6个文件
此时Reduce端中的文件还有10个,最后一次合并为1个文件输入到reduce函数中
由此可以看出,Reduce端的合并目标是合并最小的文件数量以满足最后一次合并刚好达到设置的文件合并系数
其目的是为了reduce读取减少磁盘的开销
如果指定了combiner函数则会在合并期间运行
随后进入reduce函数的执行阶段,并产生数据输出到HDFS
由于大部分情况下,运行NM的节点往往还运行着Datanode,所以输出数据的第一个副本通常是保存在本地
YARN架构设计详解
一、YARN基本服务组件
YARN是Hadoop 2.0中的资源管理系统,它的基本设计思想是将MRv1中的JobTracker拆分成了两个独立的服务:一个全局的资源管理器ResourceManager和每个应用程序特有的ApplicationMaster。其中ResourceManager负责整个系统的资源管理和分配,而ApplicationMaster负责单个应用程序的管理。
YARN总体上仍然是master/slave结构,在整个资源管理框架中,resourcemanager为master,nodemanager是slave。Resourcemanager负责对各个nademanger上资源进行统一管理和调度。当用户提交一个应用程序时,需要提供一个用以跟踪和管理这个程序的ApplicationMaster,它负责向ResourceManager申请资源,并要求NodeManger启动可以占用一定资源的任务。由于不同的ApplicationMaster被分布到不同的节点上,因此它们之间不会相互影响。
YARN的基本组成结构,YARN主要由ResourceManager、NodeManager、ApplicationMaster和Container等几个组件构成。
ResourceManager是Master上一个独立运行的进程,负责集群统一的资源管理、调度、分配等等;NodeManager是Slave上一个独立运行的进程,负责上报节点的状态;App Master和Container是运行在Slave上的组件,Container是yarn中分配资源的一个单位,包涵内存、CPU等等资源,yarn以Container为单位分配资源。
Client向ResourceManager提交的每一个应用程序都必须有一个Application Master,它经过ResourceManager分配资源后,运行于某一个Slave节点的Container中,具体做事情的Task,同样也运行与某一个Slave节点的Container中。RM,NM,AM乃至普通的Container之间的通信,都是用RPC机制。
YARN的架构设计使其越来越像是一个云操作系统,数据处理操作系统。
1、Resourcemanager
RM是一个全局的资源管理器,集群只有一个,负责整个系统的资源管理和分配,包括处理客户端请求、启动/监控APP master、监控nodemanager、资源的分配与调度。它主要由两个组件构成:调度器(Scheduler)和应用程序管理器(Applications Manager,ASM)。
(1) 调度器
调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。需要注意的是,该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作,比如不负责监控或者跟踪应用的执行状态等,也不负责重新启动因应用执行失败或者硬件故障而产生的失败任务,这些均交由应用程序相关的ApplicationMaster完成。调度器仅根据各个应用程序的资源需求进行资源分配,而资源分配单位用一个抽象概念“资源容器”(Resource Container,简称Container)表示,Container是一个动态资源分配单位,它将内存、CPU、磁盘、网络等资源封装在一起,从而限定每个任务使用的资源量。此外,该调度器是一个可插拔的组件,用户可根据自己的需要设计新的调度器,YARN提供了多种直接可用的调度器,比如Fair Scheduler和Capacity Scheduler等。
(2) 应用程序管理器
应用程序管理器负责管理整个系统中所有应用程序,包括应用程序提交、与调度器协商资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它等。
2. ApplicationMaster(AM)
管理YARN内运行的应用程序的每个实例。
功能:
数据切分
为应用程序申请资源并进一步分配给内部任务。
任务监控与容错
负责协调来自resourcemanager的资源,并通过nodemanager监视容易的执行和资源使用情况。
3. NodeManager(NM)
Nodemanager整个集群有多个,负责每个节点上的资源和使用。
功能:
单个节点上的资源管理和任务。
处理来自于resourcemanager的命令。
处理来自域app master的命令。
Nodemanager管理着抽象容器,这些抽象容器代表着一些特定程序使用针对每个节点的资源。
Nodemanager定时地向RM汇报本节点上的资源使用情况和各个Container的运行状态(cpu和内存等资源)
4.Container
Container是YARN中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。需要注意的是,Container不同于MRv1中的slot,它是一个动态资源划分单位,是根据应用程序的需求动态生成的。目前为止,YARN仅支持CPU和内存两种资源,且使用了轻量级资源隔离机制Cgroups进行资源隔离。
功能:
对task环境的抽象
描述一系列信息
任务运行资源的集合(cpu、内存、io等)
任务运行环境
二、YARN的资源管理
1、资源调度和隔离是yarn作为一个资源管理系统,最重要且最基础的两个功能。资源调度由resourcemanager完成,而资源隔离由各个nodemanager实现。
2、Resourcemanager将某个nodemanager上资源分配给任务(这就是所谓的“资源调度”)后,nodemanager需按照要求为任务提供相应的资源,甚至保证这些资源应具有独占性,为任务运行提供基础和保证,这就是所谓的资源隔离。
3、当谈及到资源时,我们通常指内存、cpu、io三种资源。Hadoop yarn目前为止仅支持cpu和内存两种资源管理和调度。
4、内存资源多少决定任务的生死,如果内存不够,任务可能运行失败;相比之下,cpu资源则不同,它只会决定任务的快慢,不会对任务的生死产生影响。
Yarn的内存管理:
yarn允许用户配置每个节点上可用的物理内存资源,注意,这里是“可用的”,因为一个节点上内存会被若干个服务贡享,比如一部分给了yarn,一部分给了hdfs,一部分给了hbase等,yarn配置的只是自己可用的,配置参数如下:
yarn.nodemanager.resource.memory-mb
表示该节点上yarn可以使用的物理内存总量,默认是8192m,注意,如果你的节点内存资源不够8g,则需要调减这个值,yarn不会智能的探测节点物理内存总量。
yarn.nodemanager.vmem-pmem-ratio
任务使用1m物理内存最多可以使用虚拟内存量,默认是2.1
yarn.nodemanager.pmem-check-enabled
是否启用一个线程检查每个任务证使用的物理内存量,如果任务超出了分配值,则直接将其kill,默认是true。
yarn.nodemanager.vmem-check-enabled
是否启用一个线程检查每个任务证使用的虚拟内存量,如果任务超出了分配值,则直接将其kill,默认是true。
yarn.scheduler.minimum-allocation-mb
单个任务可以使用最小物理内存量,默认1024m,如果一个任务申请物理内存量少于该值,则该对应值改为这个数。
yarn.scheduler.maximum-allocation-mb
单个任务可以申请的最多的内存量,默认8192m
Yarn cpu管理:
目前cpu被划分为虚拟cpu,这里的虚拟cpu是yarn自己引入的概念,初衷是考虑到不同节点cpu性能可能不同,每个cpu具有计算能力也是不一样的,比如,某个物理cpu计算能力可能是另外一个物理cpu的2倍,这时候,你可以通过为第一个物理cpu多配置几个虚拟cpu弥补这种差异。用户提交作业时,可以指定每个任务需要的虚拟cpu个数。在yarn中,cpu相关配置参数如下:
yarn.nodemanager.resource.cpu-vcores
表示该节点上yarn可使用的虚拟cpu个数,默认是8个,注意,目前推荐将该值为与物理cpu核数相同。如果你的节点cpu合数不够8个,则需要调减小这个值,而yarn不会智能的探测节点物理cpu总数。
yarn.scheduler.minimum-allocation-vcores
单个任务可申请最小cpu个数,默认1,如果一个任务申请的cpu个数少于该数,则该对应值被修改为这个数
yarn.scheduler.maximum-allocation-vcores
单个任务可以申请最多虚拟cpu个数,默认是32.