A Rookie's Journey Through the Spark Source Code - 8: Shuffle

Last time we looked at the source code behind BlockManager. This time we move on to shuffle management.

Shuffle is an important process abstraction in Spark. It involves several concerns:

1. repartitioning the data

2. transferring the data

3. compressing the data

4. disk I/O

1. Structure overview

Let's first look at the classes and objects that make up the shuffle package; it has both a Java part and a Scala part:

[Figure: classes and objects in the org.apache.spark.shuffle package (Java and Scala parts)]

Since Spark 2.x, the old hash-based shuffle manager (HashShuffleManager) has been removed; SortShuffleManager is now the only implementation.
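Which implementation is used is controlled by the spark.shuffle.manager setting; in Spark 2.x both the default value "sort" and the alias "tungsten-sort" resolve to SortShuffleManager. A minimal sketch of setting it explicitly (redundant here, since "sort" is already the default):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("shuffle-walkthrough")
  .setMaster("local[2]")
  .set("spark.shuffle.manager", "sort") // "sort" is the default in Spark 2.x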

ShuffleManager is an interface (a trait):

/**
 * Pluggable interface for shuffle systems. A ShuffleManager is created in SparkEnv on the driver
 * and on each executor, based on the spark.shuffle.manager setting. The driver registers shuffles
 * with it, and executors (or tasks running locally in the driver) can ask to read and write data.
 * NOTE: this will be instantiated by SparkEnv so its constructor can take a SparkConf and
 * boolean isDriver as parameters.
 */
private[spark] trait ShuffleManager {
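
  // The trait body is trimmed here. Per the Spark 2.x sources it declares the operations this
  // article walks through below (scaladoc omitted): registering/unregistering a shuffle,
  // obtaining writers and readers, the block resolver, and stop().

  def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  def getReader[K, C](
      handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C]

  def unregisterShuffle(shuffleId: Int): Boolean

  def shuffleBlockResolver: ShuffleBlockResolver

  def stop(): Unit
}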

2. SortShuffleManager

The sole implementation of ShuffleManager is SortShuffleManager:

/**
 * In sort-based shuffle, incoming records are sorted according to their target partition ids, then
 * written to a single map output file. Reducers fetch contiguous regions of this file in order to
 * read their portion of the map output. In cases where the map output data is too large to fit in
 * memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
 * to produce the final output file.
 *
 * Sort-based shuffle has two different write paths for producing its map output files:
 *
 *  - Serialized sorting: used when all three of the following conditions hold:
 *    1. The shuffle dependency specifies no aggregation or output ordering.
 *    2. The shuffle serializer supports relocation of serialized values (this is currently
 *       supported by KryoSerializer and Spark SQL's custom serializers).
 *    3. The shuffle produces fewer than 16777216 output partitions.
 *  - Deserialized sorting: used to handle all other cases.
 *
 * -----------------------
 * Serialized sorting mode
 * -----------------------
 *
 * In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
 * shuffle writer and are buffered in a serialized form during sorting. This write path implements
 * several optimizations:
 *
 *  - Its sort operates on serialized binary data rather than Java objects, which reduces memory
 *    consumption and GC overheads. This optimization requires the record serializer to have certain
 *    properties to allow serialized records to be re-ordered without requiring deserialization.
 *    See SPARK-4550, where this optimization was first proposed and implemented, for more details.
 *  - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
 *    arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
 *    record in the sorting array, this fits more of the array into cache.
 *  - The spill merging procedure operates on blocks of serialized records that belong to the same
 *    partition and does not need to deserialize records during the merge.
 *  - When the spill compression codec supports concatenation of compressed data, the spill merge
 *    simply concatenates the serialized and compressed spill partitions to produce the final output
 *    partition.  This allows efficient data copying methods, like NIO's `transferTo`, to be used
 *    and avoids the need to allocate decompression or copying buffers during the merge.
 *
 * For more details on these optimizations, see SPARK-7081.
 */
// Takes a SparkConf as its only constructor parameter
private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
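The three "serialized sorting" conditions listed in the comment are checked by SortShuffleManager.canUseSerializedShuffle in the companion object. A trimmed sketch from the Spark 2.x sources, with the logging removed (MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is 16777216, i.e. 2^24):

def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  val numPartitions = dependency.partitioner.numPartitions
  if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
    false // condition 2 fails: the serializer cannot relocate serialized values
  } else if (dependency.aggregator.isDefined) {
    false // condition 1 fails: the dependency needs aggregation
  } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
    false // condition 3 fails: too many output partitions
  } else {
    true  // all three conditions hold: use the serialized (unsafe) shuffle path
  }
}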

It has two important components:

/**
 * A mapping from shuffle ids to the number of mappers producing output for those shuffles.
 */
private[this] val numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

// Creates and maintains the mapping between logical shuffle blocks and physical file locations
override val shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

IndexShuffleBlockResolver is implemented as follows:

/**
 * Create and maintain the shuffle blocks' mapping between logic block and physical file location.
 * Data of shuffle blocks from the same map task are stored in a single consolidated data file.
 * The offsets of the data blocks in the data file are stored in a separate index file.
 *
 * We use the name of the shuffle data's shuffleBlockId with reduce ID set to 0 and add ".data"
 * as the filename postfix for data file, and ".index" as the filename postfix for index file.
 *
 */
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
private[spark] class IndexShuffleBlockResolver(
    conf: SparkConf,
    _blockManager: BlockManager = null)
  extends ShuffleBlockResolver
  with Logging {
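Concretely, those names come from the block id classes in org.apache.spark.storage: ShuffleDataBlockId renders as "shuffle_<shuffleId>_<mapId>_<reduceId>.data", ShuffleIndexBlockId uses the ".index" suffix, and the resolver fixes the reduce id to 0 (IndexShuffleBlockResolver.NOOP_REDUCE_ID). A small illustration:

import org.apache.spark.storage.{ShuffleDataBlockId, ShuffleIndexBlockId}

// For shuffle 0, map task 5, the consolidated per-map outputs on disk are named:
ShuffleDataBlockId(0, 5, 0).name   // "shuffle_0_5_0.data"  (all partitions, sorted by partition id)
ShuffleIndexBlockId(0, 5, 0).name  // "shuffle_0_5_0.index" (offset of each partition in the .data file)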

2.1 Registering and unregistering shuffles

2.1.1 Registering a shuffle

Shuffle registration is implemented as follows:

/**
 * Obtains a [[ShuffleHandle]] to pass to tasks.
 */
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
    // need map-side aggregation, then write numPartitions files directly and just concatenate
    // them at the end. This avoids doing serialization and deserialization twice to merge
    // together the spilled files, which would happen with the normal code path. The downside is
    // having multiple files open at a time and thus more memory allocated to buffers.
    // Return a BypassMergeSortShuffleHandle
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  // Check whether the optimized serialized (unsafe) shuffle path can be used
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // Otherwise, buffer map outputs in a deserialized form:
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}
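The bypass decision in the first branch is made by SortShuffleWriter.shouldBypassMergeSort. A trimmed sketch from the Spark 2.x sources (spark.shuffle.sort.bypassMergeThreshold defaults to 200):

def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}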

registerShuffle returns a ShuffleHandle that is passed to the shuffle tasks.

The method is invoked from ShuffleDependency:

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
  shuffleId, _rdd.partitions.length, this)
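So any wide transformation that introduces a ShuffleDependency ends up registering its shuffle through this line. A minimal illustration (the variable names are mine; sc is an existing SparkContext):

val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)), 4)

// reduceByKey builds a ShuffledRDD; constructing its ShuffleDependency runs the
// registerShuffle call quoted above and keeps the returned ShuffleHandle.
val counts = pairs.reduceByKey(_ + _)

counts.collect()  // map tasks then obtain writers and reduce tasks obtain readers (section 2.2)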

2.1.2 Unregistering a shuffle

Unregistering mainly removes the corresponding map record from numMapsForShuffle and deletes the data and index files the shuffle produced:

/** Remove a shuffle's metadata from the ShuffleManager. */
override def unregisterShuffle(shuffleId: Int): Boolean = {
  Option(numMapsForShuffle.remove(shuffleId)).foreach { numMaps =>
    (0 until numMaps).foreach { mapId =>
      shuffleBlockResolver.removeDataByMap(shuffleId, mapId)
    }
  }
  true
}

Here is the process of deleting the data and index files:

/**
 * Remove data file and index file that contain the output data from one map.
 */
def removeDataByMap(shuffleId: Int, mapId: Int): Unit = {
  var file = getDataFile(shuffleId, mapId)
  if (file.exists()) {
    if (!file.delete()) {
      logWarning(s"Error deleting data ${file.getPath()}")
    }
  }

  file = getIndexFile(shuffleId, mapId)
  if (file.exists()) {
    if (!file.delete()) {
      logWarning(s"Error deleting index ${file.getPath()}")
    }
  }
}

And here is how the data and index files are obtained, a task that is delegated to the BlockManager:

def getDataFile(shuffleId: Int, mapId: Int): File = {
  blockManager.diskBlockManager.getFile(ShuffleDataBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

private def getIndexFile(shuffleId: Int, mapId: Int): File = {
  blockManager.diskBlockManager.getFile(ShuffleIndexBlockId(shuffleId, mapId, NOOP_REDUCE_ID))
}

While we are at it, let's take a quick look at DiskBlockManager, which manages the mapping between logical blocks and physical files:

/**
 * Creates and maintains the logical mapping between logical blocks and physical on-disk
 * locations. One block is mapped to one file with a name given by its BlockId.
 *
 * Block files are hashed among the directories listed in spark.local.dir (or in
 * SPARK_LOCAL_DIRS, if it's set).
 */
private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
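The "hashed among the directories" part is done by DiskBlockManager.getFile: the block's file name is hashed to pick one of the spark.local.dir directories and a numbered sub-directory inside it, and the sub-directory is created lazily. A trimmed sketch from the Spark 2.x sources (localDirs, subDirs and subDirsPerLocalDir are fields of the class; Utils is org.apache.spark.util.Utils):

def getFile(filename: String): File = {
  // Figure out which local directory the file name hashes to, and which sub-directory in that
  val hash = Utils.nonNegativeHash(filename)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the sub-directory (named by its hex id) if it doesn't already exist
  val subDir = subDirs(dirId).synchronized {
    Option(subDirs(dirId)(subDirId)).getOrElse {
      val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
      if (!newDir.exists() && !newDir.mkdir()) {
        throw new IOException(s"Failed to create local dir in $newDir.")
      }
      subDirs(dirId)(subDirId) = newDir
      newDir
    }
  }

  new File(subDir, filename)
}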

2.2 Obtaining the reader and writer

The reader is mainly used by reduce tasks to read the shuffle output:

/**
 * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
 * Called on executors by reduce tasks.
 */
override def getReader[K, C](
    handle: ShuffleHandle,
    startPartition: Int,
    endPartition: Int,
    context: TaskContext): ShuffleReader[K, C] = {
  // Returns a BlockStoreShuffleReader that reads blocks from other nodes, covering partition ids
  // from startPartition to endPartition-1, inclusive
  new BlockStoreShuffleReader(
    handle.asInstanceOf[BaseShuffleHandle[K, _, C]], startPartition, endPartition, context)
}

The writer is obtained by map tasks on executors to write the output for a given map partition:

/** Get a writer for a given partition. Called on executors by map tasks. */
override def getWriter[K, V](
    handle: ShuffleHandle,
    mapId: Int,
    context: TaskContext): ShuffleWriter[K, V] = {
  // Record the number of map tasks for this shuffle id
  numMapsForShuffle.putIfAbsent(
    handle.shuffleId, handle.asInstanceOf[BaseShuffleHandle[_, _, _]].numMaps)
  val env = SparkEnv.get
  // Pattern match on the handle type and instantiate the corresponding writer
  handle match {
    case unsafeShuffleHandle: SerializedShuffleHandle[K @unchecked, V @unchecked] =>
      new UnsafeShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        context.taskMemoryManager(),
        unsafeShuffleHandle,
        mapId,
        context,
        env.conf)
    case bypassMergeSortHandle: BypassMergeSortShuffleHandle[K @unchecked, V @unchecked] =>
      new BypassMergeSortShuffleWriter(
        env.blockManager,
        shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver],
        bypassMergeSortHandle,
        mapId,
        context,
        env.conf)
    case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
      new SortShuffleWriter(shuffleBlockResolver, other, mapId, context)
  }
}

That covers the main source code of ShuffleManager.

To recap:

Since Spark 2.x, ShuffleManager has only one implementation, SortShuffleManager, which contains two components. One is a structure that records each shuffle id and the corresponding number of map tasks:

numMapsForShuffle = new ConcurrentHashMap[Int, Int]()

The other creates and maintains the mapping between logical blocks and physical file locations:

 shuffleBlockResolver = new IndexShuffleBlockResolver(conf)

The main functionality ShuffleManager provides is registering and unregistering shuffles and handing out readers and writers.