MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理

MapOutputBuffer

在上一篇博客中说过,Mapper的输出中有两个重要部分:一是collector,负责收集Mapper输出并将其交付给Reducer;二是partitioner,决定了应该将具体的输出交付给哪一个Reducer。

Mapper的输出是通过其RecordWriter写出去的,此RecordWriter就是一个负责收集Mapper输出的Collector,而MapOutputBuffer就是一种对流经键值对实施排序的Collector。

MapOutputBuffer是MapTask的一个内部类,重要属性如下:

class MapOutputBuffer<K extends Object, V extends Object>
              implements MapOutputCollector<K, V>, IndexedSortable {
    private int partitions;        //输出数据的分区数,即Reducer数
    byte[] kvbuffer;               //键值对缓冲区
    private IndexedSorter sorter;  //用于排序的Sorter对象
    final BlockingBuffer bb = new BlockingBuffer();    //实际上是一个输出流,提供将缓冲区对键值对的序列化和写入的操作方法,核心是Buffer类
    final SpillThread spillThread = new SpillThread();    //专门讲缓冲区内容写入Spill文件的线程
    private FileSystem rfs;        //存放spill文件的文件系统
    

MapOutputBuffer的核心就在于其内部定义的Buffer类(new BlockingBuffer()返回的是Buffer类)和一个充当缓冲区存储介质的字节数组kvbuffer。前者提供了对缓冲区进行的操作,而后者是物理意义上的缓冲区。

Spill线程SpillThread专门负责在缓冲区被写满后将其内容spill(溅出)到文件系统的spill文件中,一个Mapper的输出可能会形成多个spill文件。

在之前新建Collector时,有对collector.init()的调用,其实底层就是对MapOutputBuffer.init()的调用,在这个方法中可了解到输出缓冲区的大致构成

public void init(MapOutputCollector.Context context
            ) throws IOException, ClassNotFoundException {

    partitions = job.getNumReduceTasks();        //分区数=Reducer数
    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();    //spill文件所在目录
    final float spillper =
        job.getFloat(JobContext.MAP_SORT_SPILL_PERCENT, (float)0.8);    //Spill"门槛值",默认80%
    final int sortmb = job.getInt(JobContext.IO_SORT_MB, 100);    //用于排序的缓冲区大小,默认100M
    sorter = ReflectionUtils.newInstance(job.getClass("map.sort.class",
            QuickSort.class, IndexedSorter.class), job);    //用于排序的算法,默认为QuickSort
    
    int maxMemUsage = sortmb << 20;
    maxMemUsage -= maxMemUsage % METASIZE;    //METASIZE为元数据长度,为16。maxMemUsage是16的倍数
    kvbuffer = new byte[maxMemUsage];    //创建物理缓冲区,默认大小为100M
    bufvoid = kvbuffer.length;
    kvmeta = ByteBuffer.wrap(kvbuffer)
         .order(ByteOrder.nativeOrder())
         .asIntBuffer();     //先将kvbuffer包装成一个字节缓冲区ByteBuffer,并设置此缓冲区中存储证书时的字节顺序
                            //并建立了一个用于整数缓冲区IntBuffer的视图

    setEquator(0);    //将分割点设置在0位上
    bufstart = bufend = bufindex = equator;    //ByteBuffer视图为空
    kvstart = kvend = kvindex;    //IntBuffer视图为空
    maxRec = kvmeta.capacity() / NMETA;    //最大记录数是IntBuffer容量/16

    //Mapper输出的kv都是内存中的对象,所以需要序列化
    serializationFactory = new SerializationFactory(job);    //用于对键值对的序列化
    keySerializer = serializationFactory.getSerializer(keyClass);    //对Key序列化
    keySerializer.open(bb);    //通过bb,即BlockingBuffer用于key序列化后的输出
    valSerializer = serializationFactory.getSerializer(valClass);
    valSerializer.open(bb);

    spillThread.setDaemon(true);    //Spill线程是一个守护线程
    spillThread.start()
    

Mapper每次调用map()方法锁输出的键值对首先就存放在kvbuffer中,不过除了kv对,还要存放它们的元数据:key在缓冲区中的起点,value的起点,value的长度以及kv对所属的partition。其中kv的值是按字节存放的,而元数据是作为整数存放的,所以会有一个字节缓冲区ByteBuffer还有整数缓冲区IntBuffer。为了方便整数操作,这里为kvbuffer创建了一个作为IntBuffer的视图。

Spill线程spillThread一经创建并启动就在run()方法中进入循环,一有通知就从spillReady.await()中被唤醒,进行一次sortAndSpill(),把缓冲区的内容排序并溅到Spill文件中,以腾出缓冲区空间。每个Spill文件中都是经过排序后的kv对,最后把这些Spill文件合并(Merge)成一个排好序的大文件,这就是一个MapTask在整个Map阶段的输出

protected class SpillThread extends Thread {
    public void run() {
        while (true) {
            while (!spillInProgress) {
                spillReady.await();
            }
            try {
                sortAndSpill();
            }
            //...

 

环形缓冲区kvbuffer工作原理

缓冲区的工作机制很简单,逐次将输出数据写入缓冲区,当缓冲区中的剩余空间已不足以容纳本次输出时,就将整个缓冲区的内容Spill到文件中,腾出缓冲区空间,再继续往里面写。但是,在将缓冲区内容Spill到文件中的过程中,对于缓冲区的写入就被阻塞了,会导致Mapper只能工作一段时间,停顿一段时间。

为了避免这样的全同步方式,另外有一个Spill线程。Mapper线程源源不断地往缓冲区中写,Spill线程把缓冲区的内容写入文件。这样Mapper的输出就编程了不阻塞的异步方式。但是为了不让Mapper对缓冲区的写入与Spill线程从缓冲区的读出互相干扰。所以,一般缓冲区都是环形缓冲区,让写入者在前面跑,读出者在后面追,如果写入者跑得太快,跑了一圈追上了读者者,就让写入者停一下;如果读出者追得太快,追上了写入者就让读出者停一下。可以想象成读和写是两名在跑道上跑步的运动员,但是读不能超过写,写不能超读一圈。

环形缓冲区只适合用于写入和读出的内容保持顺序的条件下,而Mapper输出的数据写入Spill文件之前是需要排序的,这个排序并不是对缓冲区的这些kv对挪来挪去,而是改变它们写入Spill文件时的顺序。

kv对经过序列化写入缓冲区后,需要确定它们的边界和位置,这就需要另外存储元数据。每个kv对的元数据中有4个32位的整数,共16字节,元数据包括了V值起点字节的下标,K值起点字节的下标,kv对所属Partition,V值的长度。

数据和元数据都存放在环形缓冲区中,它们之间通过分割点(quator)来区分

MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理

如上图就是字节数组的缓冲区kvbuffer,起点下边为0,终点下标为kvbuffer.length。分割点可以在缓冲区的任何位置上,分割点的位置确定后,数据都位于其右侧,元数据都位于其左侧。所有元数据合在一起就是一个元数据块,相当于一个“反向数组”,通过下标即可找到某个kv对的元数据,再按照元数据信息就可找到kv对。

初始化的时候,分割点设置在缓冲区的起点,即下标为0的地方

MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理

此时数据位于缓冲区底部,自底向上伸展;元数据位于顶部,自顶向下伸展

对于数据:bufstart指向数据块的起点,bufindex指向现有数据块的末尾,有新的数据到来时就从这个位置开始写;

对于元数据:kvstart指向元数据块中的第一份元数据,kvend指向元数据中最后一份元数据,kvindex指向准备写入下一份元数据的位置,因为环形缓冲区其实是下标沿顺时针增大的数组,所以每写入一份元数据,kvindex会减4,所以kvend和kvindex始终保持一份元数据的距离。

在写入时我们把缓冲区视为字节数组,bufstart,bufindex是字节数组内的下标;而在写入元数据时则把缓冲区视为整数数组,kvstart,kvend,kvindex都是整数数组内的下标。

随着数据的到来,缓冲区的空闲部分会越来越小,实际上Spill操作应该在缓冲区空间耗尽之前就开始,这样Mapper还能继续往缓冲区中写入。在MapOutputBuffer.init()中有个启动Spill的门槛值spiller,默认为80%,对应配置文件mapred-default.xml中的属性mapreduce.map.sort.spill.percent。当缓冲区达到80%满时就开始排序和Spill。排序只需要调整元数据项的位置即可,数据块的内容无需变动。

由于Spill线程和Mapper线程是并发进行的,所以当Spill完成后,原先的kvend到bufindex之间的区间就被释放了,而原先空闲的区间则可能已经有了一些内容;那么新写入的数据和元数据并不是背靠背相连以equator分隔,两者中间隔开了因Spill而腾出来的空闲区间

MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理

此时就需要搬动元数据使两者可以背靠背地相连,两者中间就是新的分割点。但是此时新的数据块的起点,也就是之前的bufindex所指的位置未必是与整数边界对齐的,而新的元数据块的起点必须与整数边界对齐。所以经过Spill后,数据块和元数据块未必是无缝地连接在一起了,中间可能会有一个小于4字节大小的空隙

MapReduce:Mapper阶段的输出之MapOutputBuffer、环形缓冲区工作原理

新的数据块起点作为分割点,其右侧为数据块,左侧为元数据块