MapTask工作机制

MapTask工作机制流程图:

MapTask工作机制

基本概念:

  • 切片数决定MapTask的数量
  • 切片大小=blockSize(默认128MB)
  • 针对每一个文件单独切片(除了处理小文件的CombineTextInputFormat和部分自定义InputFormat)

流程图解析:

客户端job提交--->文件读取--->map操作--->写入环形缓冲区--->分区,排序--->溢出到磁盘

  1. 首先由客户端提交job相关信息(比如jar包,配置文件,切片信息InputSplit)。
  2. MapTask通过InputFormat和RecorderReader来读取待处理的文本(默认的InputFormat采用的是TextInputFormat,默认是读取一行,以偏移量为key,以一行数据作为value)到map方法中。
  3. 在map方法中进行相关的逻辑处理。
  4. 然后通过outputCollerctor将<key,value>写出到环形缓冲区中。注意:环形缓冲区分为两个部分,分别是左边部分和右边部分,左边部分通常存放的是索引和元数据等信息,右边部分存放的是数据相关信息。
  5. 当环形缓冲区写满以后(默认是到达80%),则开始对环形缓冲区里面的数据进行分区(默认使用HashPartitioner分区)和排序(默认使用Key.compareTo方法进行排序),然后溢出到文件。
  6. 最后将所有溢出的文件进行Merge归并排序,准备为reduce阶段使用。

(1)Read阶段:

Map Task通过用户编写的RecordReader从输入InputSplit解析出一个个key/value

(2)Map阶段:

该节点主要是将解析出的key/value交给用户编写map()函数处理,并产生一系列新的key/value。默认使用的是TextInputFormat输入流(读取的key是偏移量,value是文件中的一行)。

(3)Collect收集阶段:

在用户编写map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在函数内部,它会生成的key/value分区调用Partitioner)并写入一个环形内存缓冲区中。

(4)Spill阶段:

即“写”当环形缓冲区满后(默认是到达环形缓冲区的80%。环形缓冲区左边部分一般记录的是元数据信息,右边部分记录的是数据信息),MapReduce会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要对数据进行合并压缩等操作

写阶段详情:

步骤1:

利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号partition进行排序,然后按照key进行排序。这样经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照key有序。

步骤2:

按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件output/spillN.out(N表示当前溢写次数)中。如果用户设置了Combiner,则写入文件之前,对每个分区中数据进行一次聚集操作。

步骤3:

将分区数据的元信息写到内存索引数据结构SpillRecord,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件output/spillN.out.index

(5)Combine阶段

当所有数据处理完成后,MapTask所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

所有数据处理完后,MapTask将所有临时文件合并成一个大文件保存到文件output/file.out,同时生成相应的索引文件output/file.out.index

进行文件合并过程中,MapTask分区为单位进行合并。对于某个分区,将采用多轮递归合并的方式每轮合并io.sort.factor(默认100)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。

每个MapTask最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。