MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理
MapOutputBuffer
在上一篇博客中说过,Mapper的输出中有两个重要部分:一是collector,负责收集Mapper输出并将其交付给Reducer;二是partitioner,决定了应该将具体的输出交付给哪一个Reducer。
Mapper的输出是通过其RecordWriter写出去的,此RecordWriter就是一个负责收集Mapper输出的Collector,而MapOutputBuffer就是一种对流经键值对实施排序的Collector。
MapOutputBuffer是MapTask的一个内部类,重要属性如下:
class MapOutputBuffer<K extends Object, V extends Object>
implements MapOutputCollector<K, V>, IndexedSortable {
private int partitions; //输出数据的分区数,即Reducer数
byte[] kvbuffer; //键值对缓冲区
private IndexedSorter sorter; //用于排序的Sorter对象
final BlockingBuffer bb = new BlockingBuffer(); //实际上是一个输出流,提供将缓冲区对键值对的序列化和写入的操作方法,核心是Buffer类
final SpillThread spillThread = new SpillThread(); //专门讲缓冲区内容写入Spill文件的线程
private FileSystem rfs; //存放spill文件的文件系统
MapOutputBuffer的核心就在于其内部定义的Buffer类(new BlockingBuffer()返回的是Buffer类)和一个充当缓冲区存储介质的字节数组kvbuffer。前者提供了对缓冲区进行的操作,而后者是物理意义上的缓冲区。
Spill线程SpillThread专门负责在缓冲区被写满后将其内容spill(溅出)到文件系统的spill文件中,一个Mapper的输出可能会形成多个spill文件。
在之前新建Collector时,有对collector.init()的调用,其实底层就是对MapOutputBuffer.init()的调用,在这个方法中可了解到输出缓冲区的大致构成
public void init(MapOutputCollector.Context context
) throws IOException, ClassNotFoundException {
partitions = job.getNumReduceTasks(); //分区数=Reducer数
rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw(); //spill文件所在目录
final float spillper =
job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8); //Spill"门槛值",默认80%
final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100); //用于排序的缓冲区大小,默认100M
sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
QuickSort.class, IndexedSorter.class), job); //用于排序的算法,默认为QuickSort
int maxMemUsage = sortmb << 20;
maxMemUsage -= maxMemUsage % METASIZE; //METASIZE为元数据长度,为16。maxMemUsage是16的倍数
kvbuffer = new byte[maxMemUsage]; //创建物理缓冲区,默认大小为100M
bufvoid = kvbuffer.length;
kvmeta = ByteBuffer.wrap(kvbuffer)
.order(ByteOrder.nativeOrder())
.asIntBuffer(); //先将kvbuffer包装成一个字节缓冲区ByteBuffer,并设置此缓冲区中存储证书时的字节顺序
//并建立了一个用于整数缓冲区IntBuffer的视图
setEquator(0); //将分割点设置在0位上
bufstart = bufend = bufindex = equator; //ByteBuffer视图为空
kvstart = kvend = kvindex; //IntBuffer视图为空
maxRec = kvmeta.capacity() / NMETA; //最大记录数是IntBuffer容量/16
//Mapper输出的kv都是内存中的对象,所以需要序列化
serializationFactory = new SerializationFactory(job); //用于对键值对的序列化
keySerializer = serializationFactory.getSerializer(keyClass); //对Key序列化
keySerializer.open(bb); //通过bb,即BlockingBuffer用于key序列化后的输出
valSerializer = serializationFactory.getSerializer(valClass);
valSerializer.open(bb);
spillThread.setDaemon(true); //Spill线程是一个守护线程
spillThread.start()
Mapper每次调用map()方法锁输出的键值对首先就存放在kvbuffer中,不过除了kv对,还要存放它们的元数据:key在缓冲区中的起点,value的起点,value的长度以及kv对所属的partition。其中kv的值是按字节存放的,而元数据是作为整数存放的,所以会有一个字节缓冲区ByteBuffer还有整数缓冲区IntBuffer。为了方便整数操作,这里为kvbuffer创建了一个作为IntBuffer的视图。
Spill线程spillThread一经创建并启动就在run()方法中进入循环,一有通知就从spillReady.await()中被唤醒,进行一次sortAndSpill(),把缓冲区的内容排序并溅到Spill文件中,以腾出缓冲区空间。每个Spill文件中都是经过排序后的kv对,最后把这些Spill文件合并(Merge)成一个排好序的大文件,这就是一个MapTask在整个Map阶段的输出
protected class SpillThread extends Thread {
public void run() {
while (true) {
while (!spillInProgress) {
spillReady.await();
}
try {
sortAndSpill();
}
//...
环形缓冲区kvbuffer工作原理
缓冲区的工作机制很简单,逐次将输出数据写入缓冲区,当缓冲区中的剩余空间已不足以容纳本次输出时,就将整个缓冲区的内容Spill到文件中,腾出缓冲区空间,再继续往里面写。但是,在将缓冲区内容Spill到文件中的过程中,对于缓冲区的写入就被阻塞了,会导致Mapper只能工作一段时间,停顿一段时间。
为了避免这样的全同步方式,另外有一个Spill线程。Mapper线程源源不断地往缓冲区中写,Spill线程把缓冲区的内容写入文件。这样Mapper的输出就编程了不阻塞的异步方式。但是为了不让Mapper对缓冲区的写入与Spill线程从缓冲区的读出互相干扰。所以,一般缓冲区都是环形缓冲区,让写入者在前面跑,读出者在后面追,如果写入者跑得太快,跑了一圈追上了读者者,就让写入者停一下;如果读出者追得太快,追上了写入者就让读出者停一下。可以想象成读和写是两名在跑道上跑步的运动员,但是读不能超过写,写不能超读一圈。
环形缓冲区只适合用于写入和读出的内容保持顺序的条件下,而Mapper输出的数据写入Spill文件之前是需要排序的,这个排序并不是对缓冲区的这些kv对挪来挪去,而是改变它们写入Spill文件时的顺序。
kv对经过序列化写入缓冲区后,需要确定它们的边界和位置,这就需要另外存储元数据。每个kv对的元数据中有4个32位的整数,共16字节,元数据包括了V值起点字节的下标,K值起点字节的下标,kv对所属Partition,V值的长度。
数据和元数据都存放在环形缓冲区中,它们之间通过分割点(quator)来区分
如上图就是字节数组的缓冲区kvbuffer,起点下边为0,终点下标为kvbuffer.length。分割点可以在缓冲区的任何位置上,分割点的位置确定后,数据都位于其右侧,元数据都位于其左侧。所有元数据合在一起就是一个元数据块,相当于一个“反向数组”,通过下标即可找到某个kv对的元数据,再按照元数据信息就可找到kv对。
初始化的时候,分割点设置在缓冲区的起点,即下标为0的地方
此时数据位于缓冲区底部,自底向上伸展;元数据位于顶部,自顶向下伸展
对于数据:bufstart指向数据块的起点,bufindex指向现有数据块的末尾,有新的数据到来时就从这个位置开始写;
对于元数据:kvstart指向元数据块中的第一份元数据,kvend指向元数据中最后一份元数据,kvindex指向准备写入下一份元数据的位置,因为环形缓冲区其实是下标沿顺时针增大的数组,所以每写入一份元数据,kvindex会减4,所以kvend和kvindex始终保持一份元数据的距离。
在写入时我们把缓冲区视为字节数组,bufstart,bufindex是字节数组内的下标;而在写入元数据时则把缓冲区视为整数数组,kvstart,kvend,kvindex都是整数数组内的下标。
随着数据的到来,缓冲区的空闲部分会越来越小,实际上Spill操作应该在缓冲区空间耗尽之前就开始,这样Mapper还能继续往缓冲区中写入。在MapOutputBuffer.init()中有个启动Spill的门槛值spiller,默认为80%,对应配置文件mapred-default.xml中的属性mapreduce.map.sort.spill.percent。当缓冲区达到80%满时就开始排序和Spill。排序只需要调整元数据项的位置即可,数据块的内容无需变动。
由于Spill线程和Mapper线程是并发进行的,所以当Spill完成后,原先的kvend到bufindex之间的区间就被释放了,而原先空闲的区间则可能已经有了一些内容;那么新写入的数据和元数据并不是背靠背相连以equator分隔,两者中间隔开了因Spill而腾出来的空闲区间
此时就需要搬动元数据使两者可以背靠背地相连,两者中间就是新的分割点。但是此时新的数据块的起点,也就是之前的bufindex所指的位置未必是与整数边界对齐的,而新的元数据块的起点必须与整数边界对齐。所以经过Spill后,数据块和元数据块未必是无缝地连接在一起了,中间可能会有一个小于4字节大小的空隙
新的数据块起点作为分割点,其右侧为数据块,左侧为元数据块