Spark Structured Streaming Source Code Analysis (5): Streaming Shuffle in Continuous Processing Mode

I. Overview

Unlike Spark's static RDD shuffle, which has to write per-partition data files and index files, the streaming shuffle sends rows over RPC in real time, so data can be processed with much lower latency, on the order of ~100 ms.
This article covers the streaming Repartition added to the Continuous processing mode in Spark Structured Streaming 2.4.0 and the shuffle operation behind it, as background for extending the streaming shuffle source code later.
In the next article, starting from the needs of a real project, we will discuss how to extend the streaming shuffle source code with the following three features:

  • Multi-partition streaming shuffle (the open-source version only supports a single shuffle output partition)
  • Side outputs in Continuous mode (one input source feeding multiple outputs, executed on multiple threads)
  • Partition preferences for downstream side outputs (if several target side-output partitions live on the same remote host and share the same Partitioner, duplicate rows are merged and sent once, improving performance)

II. The built-in ContinuousCoalesceExec: creation and execution

How ContinuousCoalesceExec and its corresponding RDD are created:

(Figure: how ContinuousCoalesceExec and its ContinuousCoalesceRDD are created)

1. Creating ContinuousCoalesceExec

  • In Spark 2.3.0 streaming, Repartition was not supported by default, and IncrementalExecution#planner had no handling for a streaming Repartition
  • In Spark 2.4.0 streaming, the Continuous processing mode supports the Repartition operation
  • A static (batch) Repartition is planned as ShuffleExchangeExec or CoalesceExec
  • ContinuousCoalesceExec is the physical SparkPlan that a Repartition is bound to in Continuous mode
  • As the planning code below shows, Spark 2.4.0 continuous processing currently only supports repartitioning to a single partition, i.e. Repartition(1, shuffle = false) as produced by Dataset#coalesce(1) (this also applies to streaming Aggregation whenever a repartition is involved):
// In Spark 2.4.0 streaming, the Continuous-mode Repartition is planned in DataSourceV2Strategy
object DataSourceV2Strategy extends Strategy {
  ...
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    ...
    case Repartition(1, false, child) =>
      val isContinuous = child.collectFirst {
        case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
      }.isDefined

      if (isContinuous) {
        ContinuousCoalesceExec(1, planLater(child)) :: Nil
      } else {
        Nil
      }
  }
}


// The static (batch) Repartition planning is defined in BasicOperators
object BasicOperators extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    ...
    case logical.Repartition(numPartitions, shuffle, child) =>
      if (shuffle) {
        // Dataset#repartition() goes through ShuffleExchangeExec by default
        ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
      } else {
        // Dataset#coalesce(): merge partitions without a shuffle
        execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
      }
    ...
  }
}
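
For context, here is a minimal sketch (not from the Spark code base) of a query that should hit the Repartition(1, false, _) branch above: the built-in rate source and console sink both support continuous mode, and the checkpoint path below is just a placeholder:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("continuous-coalesce-demo").getOrCreate()

// Dataset#coalesce(1) produces Repartition(1, shuffle = false); because the rate source
// provides a ContinuousReader, DataSourceV2Strategy plans it as ContinuousCoalesceExec.
val query = spark.readStream
  .format("rate")
  .load()
  .coalesce(1)
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/continuous-coalesce-demo") // placeholder path
  .trigger(Trigger.Continuous("1 second"))
  .start()

query.awaitTermination()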

2. What ContinuousCoalesceExec does

  • The ContinuousCoalesceExec class is fairly simple: it mainly overrides #doExecute to create a ContinuousCoalesceRDD, which performs the streaming shuffle that Repartition requires
  • ContinuousCoalesceExec also overrides #outputPartitioning, declaring its output as SinglePartition
case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan {
  ...
  // Declare the output partitioning as a single partition
  override def outputPartitioning: Partitioning = SinglePartition

  // Create a ContinuousCoalesceRDD, passing the partition count (1) and the ContinuousShuffleReader queue size (the reader buffers up to 1024 rows per writer by default and blocks the sender once a queue is full)
  override def doExecute(): RDD[InternalRow] = {
    assert(numPartitions == 1)
    new ContinuousCoalesceRDD(
      sparkContext,
      numPartitions,
      conf.continuousStreamingExecutorQueueSize,
      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_INTERVAL_KEY).toLong,
      child.execute())
  }
}
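
A side note on the two values passed to ContinuousCoalesceRDD above: the epoch interval is read from a local property set by ContinuousExecution, and the queue size comes from conf.continuousStreamingExecutorQueueSize, which, as far as I recall, is backed by the SQL config spark.sql.streaming.continuous.executorQueueSize (default 1024). A hedged sketch of tuning it before starting a continuous query:

// Assumed config key for conf.continuousStreamingExecutorQueueSize (default 1024).
// Larger values let the reader buffer more rows per writer before the sender blocks.
spark.conf.set("spark.sql.streaming.continuous.executorQueueSize", "4096")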

III. ContinuousCoalesceRDD: the streaming shuffle implementation (writer and reader)

  • Following on from the ContinuousCoalesceRDD created in the previous section, this section focuses on how it implements streaming shuffle, and on the ContinuousShuffleWriter (sends data) and ContinuousShuffleReader (receives data) it creates internally
  • The data flow of ContinuousCoalesceRDD#compute() is shown below (note that there is only one shuffle reader in the figure; internally it holds one message queue per writer)

(Figure: ContinuousCoalesceRDD#compute() data flow; a single shuffle reader with one internal message queue per writer)

1. ContinuousCoalesceRDD

ContinuousCoalesceRDD is the node that actually performs the coalesce. The overall logic:

  • Based on the number of partitions of the prev RDD, create a threadPool of the same size to run the writer-side shuffle work
  • In ContinuousCoalesceRDD#compute(), iterate over all upstream partitions and run prev.compute() on each to obtain the data iterators that need to be coalesced
  • For each upstream partition, create an RPCContinuousShuffleWriter and call its #write() method to send the rows over RPC to the RPCContinuousShuffleReader (from the way writers and readers are created, there is one writer per upstream partition but only a single reader)
  • Still inside ContinuousCoalesceRDD#compute(), call reader.read() once to collect all rows from the prev RDD and return them as an iterator

Below are the important parts of ContinuousCoalesceRDD, with comments:

class ContinuousCoalesceRDD(
    context: SparkContext,
    numPartitions: Int, // always 1 here
    readerQueueSize: Int, // from config, default 1024
    epochIntervalMs: Long, // epoch interval; the reader only uses it as the poll timeout
    prev: RDD[InternalRow]) // upstream RDD
  extends RDD[InternalRow](context, Nil) {
  
  // When we support more than 1 target partition, we'll need to figure out how to pass in the
  // required partitioner.
  // Partitioner; the writer calls outputPartitioner#getPartition(row) when dispatching rows
  private val outputPartitioner = new HashPartitioner(1)

  // Endpoint names of the downstream reader(s); there is only one
  private val readerEndpointNames = (0 until numPartitions).map { i =>
    s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}"
  }
  
  // Build the Array[Partition]; ContinuousCoalesceRDDPartition is fairly simple: it lazily creates the reader and its endpoint and registers it with the rpcEnv
  override def getPartitions: Array[Partition] = {
    (0 until numPartitions).map { partIndex =>
      ContinuousCoalesceRDDPartition(
        partIndex,
        readerEndpointNames(partIndex),
        readerQueueSize,
        prev.getNumPartitions,
        epochIntervalMs)
    }.toArray
  }

  // Thread pool sized to the number of prev RDD partitions
  private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
    prev.getNumPartitions,
    this.name)
  
  // The actual per-partition compute function; since there is a single partition, it runs once per epoch
  override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
    val part = split.asInstanceOf[ContinuousCoalesceRDDPartition]

    if (!part.writersInitialized) {
      val rpcEnv = SparkEnv.get.rpcEnv

      // trigger lazy initialization
      part.endpoint
      val endpointRefs = readerEndpointNames.map { endpointName =>
        rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
      }

      val runnables = prev.partitions.map { prevSplit =>
        new Runnable() {
          override def run(): Unit = {
            TaskContext.setTaskContext(context)

            // Create the writer for this upstream partition
            val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
              prevSplit.index, outputPartitioner, endpointRefs.toArray)

            // Initialize the current epoch
            EpochTracker.initializeCurrentEpoch(
              context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
            while (!context.isInterrupted() && !context.isCompleted()) {
              // Run prev.compute(prevSplit, context) for this upstream partition and send the resulting iterator to the reader over RPC via the writer
              writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
              
              // Advance to the next epoch
              EpochTracker.incrementCurrentEpoch()
            }
          }
        }
      }

      ...

      runnables.foreach(threadPool.execute)
    }

    // Read the rows sent by all writers; internally #read() also creates a thread pool sized to the number of upstream writers and reads in parallel
    part.reader.read()
  }
  
}
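
The getPartitions comment above notes that ContinuousCoalesceRDDPartition lazily creates the reader and registers its endpoint with the rpcEnv. A rough sketch of that partition class, reconstructed from the constructor call and the part.writersInitialized / part.endpoint / part.reader usages above (simplified, not verbatim 2.4.0 source):

import org.apache.spark.{Partition, SparkEnv}
import org.apache.spark.rpc.RpcEndpointRef

case class ContinuousCoalesceRDDPartition(
    index: Int,
    endpointName: String,
    queueSize: Int,
    numShuffleWriters: Int,
    epochIntervalMs: Long)
  extends Partition {

  // Set to true on the executor once compute() has started the writer threads,
  // so they are launched only once per task even though compute() runs every epoch.
  private[continuous] var writersInitialized: Boolean = false

  // Created lazily on the executor: the RPC-based reader and its endpoint,
  // registered with the local RpcEnv under endpointName.
  lazy val (reader: ContinuousShuffleReader, endpoint: RpcEndpointRef) = {
    val env = SparkEnv.get.rpcEnv
    val receiver = new RPCContinuousShuffleReader(queueSize, numShuffleWriters, epochIntervalMs, env)
    (receiver, env.setupEndpoint(endpointName, receiver))
  }
}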

2. Streaming shuffle writer (sending data over RPC)

Because the writer threads run in the same task (and therefore the same executor JVM) as the reader, the RPC takes the local path: the dispatcher delivers messages directly to the target endpoint, without any real network transfer.
The concrete streaming shuffle writer is RPCContinuousShuffleWriter. Its implementation is simple: it implements the #write() method of ContinuousShuffleWriter, which ContinuousCoalesceRDD calls to send data:

class RPCContinuousShuffleWriter(
    writerId: Int,
    outputPartitioner: Partitioner, // HashPartitioner(1) here; used to pick the target partition for each row before sending, which is always 0
    endpoints: Array[RpcEndpointRef] // references to the reader endpoints, length 1
 ) extends ContinuousShuffleWriter {

  if (outputPartitioner.numPartitions != 1) {
    throw new IllegalArgumentException("multiple readers not yet supported")
  }

  if (outputPartitioner.numPartitions != endpoints.length) {
    throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " +
      s"not match endpoint count ${endpoints.length}")
  }

  def write(epoch: Iterator[UnsafeRow]): Unit = {
    // Iterate over one upstream partition's rows, wrap each in a ReceiverRow and send it to the reader
    while (epoch.hasNext) {
      val row = epoch.next()
      endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
    }

    // After the iterator is drained, send a ReceiverEpochMarker (end-of-epoch message, handled separately by the reader)
    val futures = endpoints.map(_.ask[Unit](ReceiverEpochMarker(writerId))).toSeq
    implicit val ec = ThreadUtils.sameThread
    ThreadUtils.awaitResult(Future.sequence(futures), Duration.Inf)
  }
}
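
For reference, ReceiverRow and ReceiverEpochMarker are the two RPC message types used here. A sketch of their shape, inferred from how they are constructed and pattern-matched in the writer and reader code (not copied verbatim from the Spark source):

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Every message carries the writerId so the reader can route it to that writer's queue.
sealed trait RPCContinuousShuffleMessage extends Serializable {
  def writerId: Int
}

// A single data row sent by writer `writerId`.
case class ReceiverRow(writerId: Int, row: UnsafeRow) extends RPCContinuousShuffleMessage

// Signals that writer `writerId` has finished the current epoch.
case class ReceiverEpochMarker(writerId: Int) extends RPCContinuousShuffleMessage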

3. Streaming shuffle reader (receiving RPC data)

  • RPCContinuousShuffleReader extends ThreadSafeRpcEndpoint, so it can receive RPC messages, and it also implements the ContinuousShuffleReader interface: #read() returns an iterator over the data of the current epoch.
  • Before #read() starts handing out rows, it wraps an executor thread pool in an ExecutorCompletionService (completion). Unlike calling future.get() on each task, the completion service lets the caller pick up whichever task finishes first: completion.submit() keeps being called, and as soon as any buffered message is taken from a queue the corresponding future completes and can be retrieved with completion.poll(). A toy example of this pattern is included after the reader code below.
private[continuous] class RPCContinuousShuffleReader(
      queueSize: Int,
      numShuffleWriters: Int,
      epochIntervalMs: Long,
      override val rpcEnv: RpcEnv)
    extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging {
  // One ArrayBlockingQueue per upstream writer; within each writer's queue the messages stay in order
  private val queues = Array.fill(numShuffleWriters) {
    new ArrayBlockingQueue[RPCContinuousShuffleMessage](queueSize)
  }

  ...

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case r: RPCContinuousShuffleMessage =>
      // Put the received ReceiverRow or ReceiverEpochMarker into the corresponding writer's queue
      queues(r.writerId).put(r)
      context.reply(())
  }
  // Return an iterator over the current epoch's data
  override def read(): Iterator[UnsafeRow] = {
    new NextIterator[UnsafeRow] {
      // Flags whether each upstream writer has finished this epoch; once all are set, the iterator and #read() end
      private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)

      private val executor = Executors.newFixedThreadPool(numShuffleWriters)
      private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor)
      // One message-fetching task per upstream writer, each taking from that writer's queue
      private def completionTask(writerId: Int) = new Callable[RPCContinuousShuffleMessage] {
        override def call(): RPCContinuousShuffleMessage = queues(writerId).take()
      }

      // Prime the pump: submit one take() task per writer queue
      (0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))

      override def getNext(): UnsafeRow = {
        var nextRow: UnsafeRow = null
        while (!finished && nextRow == null) {
          // Poll for a message from any writer's queue
          completion.poll(epochIntervalMs, TimeUnit.MILLISECONDS) match {
            case null =>
              // No message arrived within the timeout and some writers have not sent their
              // epoch markers yet, so log which writers we are still waiting for
              val writerIdsUncommitted = writerEpochMarkersReceived.zipWithIndex.collect {
                case (flag, idx) if !flag => idx
              }
              logWarning(
                s"Completion service failed to make progress after $epochIntervalMs ms. Waiting " +
                  s"for writers ${writerIdsUncommitted.mkString(",")} to send epoch markers.")
              ...
            case future => future.get() match {
              case ReceiverRow(writerId, r) =>
                // Got a data row; submit another task to fetch the next message from the same writer
                completion.submit(completionTask(writerId))
                // Return this row from the iterator
                nextRow = r
              case ReceiverEpochMarker(writerId) =>
                // Mark that writer writerId has finished the current epoch
                writerEpochMarkersReceived(writerId) = true
                // Once every writer has sent its epoch marker, the epoch is complete and the iterator ends
                if (writerEpochMarkersReceived.forall(_ == true)) {
                  finished = true
                }
            }
          }
        }

        nextRow
      }
      ...
    }
  }
}
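
To make the completion-service pattern above concrete, here is a small self-contained toy (plain Scala, not Spark code): several simulated writers push messages into per-writer blocking queues, a take() task per queue is submitted to an ExecutorCompletionService, and the consumer polls whichever task finishes first, resubmitting a task for that queue after each data message, just as RPCContinuousShuffleReader does with ReceiverRow and ReceiverEpochMarker:

import java.util.concurrent.{ArrayBlockingQueue, Callable, ExecutorCompletionService, Executors, TimeUnit}

object CompletionServiceDemo {
  def main(args: Array[String]): Unit = {
    val numWriters = 3
    // One bounded queue per simulated writer, like the reader's `queues` array.
    val queues = Array.fill(numWriters)(new ArrayBlockingQueue[String](16))

    // Simulated writers: each pushes a few rows and then an end marker ("DONE").
    (0 until numWriters).foreach { id =>
      new Thread(new Runnable {
        override def run(): Unit = {
          (1 to 3).foreach(i => queues(id).put(s"writer-$id row-$i"))
          queues(id).put(s"writer-$id DONE")
        }
      }).start()
    }

    val executor = Executors.newFixedThreadPool(numWriters)
    val completion = new ExecutorCompletionService[String](executor)
    // One take() task per queue; whichever queue has data completes first.
    def task(id: Int) = new Callable[String] { override def call(): String = queues(id).take() }
    (0 until numWriters).foreach(id => completion.submit(task(id)))

    var remaining = numWriters
    while (remaining > 0) {
      val future = completion.poll(1000, TimeUnit.MILLISECONDS)
      if (future != null) {
        val msg = future.get()
        val id = msg.split(" ")(0).stripPrefix("writer-").toInt
        if (msg.endsWith("DONE")) remaining -= 1            // like ReceiverEpochMarker
        else { println(msg); completion.submit(task(id)) }  // like ReceiverRow: fetch the next one
      }
    }
    executor.shutdown()
  }
}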