MapReduce的工作机制(一)
本文从《Hadoop权威指南》总结而来。
MapReduce过程包含以下四个独立实体:
- 客户端,用于提交MapReduce作业。
- jobtracker,协调作业的运行。jobtracker是Java应用程序,主类是JobTracker。
- tasktracker,运行作业划分后的任务。tasktracker是Java应用程序,主类是TaskTracker。
- 分布式文件系统,一般为HDFS,用来在其他实体间共享作业文件。
run job过程(步骤1)
- JobClient的runJob()方法是用于新建JobClient实例并调用其submitJob()方法的便捷方式。
- 提交作业后,runJob()每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告到控制台。
- 作业完成后, 如果成功,就显示作业计数器,如果失败,导致作业失败的错误被记录到控制台。
submit job过程(步骤2,3,4),主体为JobClient
- 通过getNewJobId()(步骤2)向jobtracker请求一个新的job ID
- 检查job的输出说明(如输出目录是否存在)是否正确
- 计算job输入的分片(如输入路径是否存在)是否正确
- 通过copy job resources(步骤3)将运行job所需要的资源复制到一个以job ID命名的目录下jobtracker的文件系统中。
job JAR的副本受mapred.submit.replication=10
控制,在运行job的任务时,集群有很多个副本可供tasktracker访问。
- 通过submit job(步骤4)告知jobtracker执行job
initialize job过程(步骤5),主体为JobTracker
- 将到来的submitJob()方法放置队列中,由作业调度器(job scheduler)进行调度以及初始化。
- 初始化包括创建一个正在运行作业的对象,用于跟踪任务的状态和进展情况。
作业调度器,retrieve input splits(步骤6)- 首先从共享文件系统(HDFS)中获取JobClient已经计算好的输入分片信息,然后为每个分片创建一个map任务
- reduce任务数量取决于
mapred.reduce.task
的值。
hearbeat(步骤7),主体为TaskTracker
- 通过简单的循环定期发送心跳包给jobtracker,jobtracker通过心跳包来确认tasktracker存活。
- tasktracker能够通过心跳告知jobtracker是否已经准备好运行新任务。
- tasktracker运行的任务数量受到tasktracker核的数量和内存大小限制,可以同时运行多个map以及多个reduce任务。
- 默认调度器在处理reduce任务槽(slot)之前,会填满空闲的map任务槽,因此只有map全被填满才会执行reduce,否则继续执行map任务。
retrieve job resources(步骤8),主体为Tasktracker
数据本地化:任务和输入分片在同一个节点上。
机架本地化:任务和输入分片在同一个机架,不同节点上。
对于下一个map任务,jobtracker会考虑tasktracker的网络位置,尽量做到数据本地化或是机架本地化。
对于下一个reduce任务,jobtracker从reduce任务等待队列中取下一个来执行,不需要考虑数据的本地化?
jobtracker需要先选定任务所在的作业(job)?,jobtracker才能为tasktracker选择任务(map任务或是reduce任务)
launch(),主体为tasktracker
- 通过retrieveJobResource()将HDFS(共享文件系统)中的job的JAR文件复制到tasktracker所在节点的文件系统中(本地化),将应用程序所需要的全部文件从分布式缓存复制到本地磁盘。
- 本地解压JAR文件,新建TaskRunner实例,TaskRunner启动launch()一个新的JVM(防止map或reduce过程中导致tasktracker崩溃),执行每个任务run()
进度和状态的更新
一个job和它的每个任务都有一个状态:
- job或任务的状态(运行中,成功,失败)
- map和reduce的进度
- job计数器的值
- 状态消息或描述
任务在运行中,会对自身进度保持跟踪:
- 对map任务,进度表示已处理的输入占总体的比例
- 对reduce任务,表示的是系统估计的,已经处理的reduce的输入比例。
- 独立线程每隔三秒检查一次状态标志位,状态汇报至tasktracker(线程->tasktracker)
tasktracker每隔五秒(由集群大小决定)将“心跳”发送到jobtracker(tasktracker->jobtracker)
tasktracker将所有任务状态发送到jobtracker(tasktracker->jobtracker)
- jobtracker将所有更新合并,产生一个代表所有作业及其所含任务状态的状态
- JobClient通过getJobStatus()每秒查询jobtracker来接受最新状态
作业调度
- FIFO,先进先出
-
调度用户作业的简单方法(不支持抢占,无法满足即时性高的需求)
- 优先级属性设置:mapred.job.priority,JobClient.setJobPriority(),可选优先级有VERY_HIGH,HIGH,NORMAL,LOW,VERY_LOW
- Fair Scheduler,公平调度器
-
作业池,每个用户默认拥有自己的作业池
- 可以用map和reduce的任务槽数来制定作业池的最小容量,也可以设置每个池的权重
- 支持抢占
- 如果一个池在特定的一段时间内未得到公平的资源共享,调度器会终止运行池中得到过多资源的任务,以便把任务槽让给之前那个缺少资源的池
- Capacity Scheduler,能力调度器
-
针对多用户调度
集群有很多队列组成,每个队列都有一个分配能力 - 队列间可能有父子关系,队列间使用该调度,队列内使用FIFO调度