Spark Structured Streaming Source Code Analysis (Part 5): Streaming Shuffle in Continuous Processing Mode
I. Overview
Unlike static RDD shuffle in Spark, which has to write segmented data files and their index files, streaming shuffle sends data in real time over RPC, so data is processed with much lower latency, on the order of ~100 ms.
This article covers the streaming Repartition and its corresponding shuffle operation added to the Continuous processing mode in Spark Structured Streaming 2.4.0, as groundwork for the streaming shuffle source-code extensions that follow.
In the next article we will start from the requirements of a real project and discuss how to extend the streaming shuffle source code with three features:
- Multi-partition streaming shuffle (the open-source version only supports a single shuffle output partition)
- Side outputs in Continuous mode (one input source feeding multiple outputs, executed on multiple threads)
- Partition-preference settings for downstream side outputs (if several target side-output partitions sit on the same remote host and share the same Partitioner, duplicate rows are merged into one before sending, improving performance)
II. Execution of the Native ContinuousCoalesceExec
How ContinuousCoalesceExec and its RDD are created:
1. Creating ContinuousCoalesceExec
- In Spark 2.3.0 streaming, Repartition is not supported by default, and IncrementalExecution#planner has no handling for a streaming Repartition
- In Spark 2.4.0 streaming, the Continuous processing mode supports the Repartition operation
- A static (batch) Repartition is planned as ShuffleExchangeExec or CoalesceExec
- ContinuousCoalesceExec is the physical SparkPlan that Repartition is bound to in Continuous mode
- As the planner code below shows, Spark 2.4.0 continuous processing currently only supports repartitioning to a single partition (this also applies to streaming Aggregation, or anything else that needs a repartition):
// In Spark 2.4.0 streaming, the Repartition rule for Continuous mode is defined in DataSourceV2Strategy
object DataSourceV2Strategy extends Strategy {
...
override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
...
case Repartition(1, false, child) =>
val isContinuous = child.collectFirst {
case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
}.isDefined
if (isContinuous) {
ContinuousCoalesceExec(1, planLater(child)) :: Nil
} else {
Nil
}
}
}
// The planning rule for static Repartition is defined in BasicOperators
object BasicOperators extends Strategy {
case logical.Repartition(numPartitions, shuffle, child) =>
if (shuffle) {
// Dataset#repartition() goes through ShuffleExchangeExec by default
ShuffleExchangeExec(RoundRobinPartitioning(numPartitions), planLater(child)) :: Nil
} else {
// partition coalescing without a shuffle
execution.CoalesceExec(numPartitions, planLater(child)) :: Nil
}
}
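To make the trigger condition concrete, here is a minimal sketch (not from the original source; the rate source, console sink and interval are just illustrative choices) of a query that would take the ContinuousCoalesceExec path. Dataset#coalesce(1) produces Repartition(1, shuffle = false, child), which the strategy above matches when the child reads from a ContinuousReader:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousCoalesceDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("continuous-coalesce-demo").getOrCreate()

    val query = spark.readStream
      .format("rate")      // the rate source has a ContinuousReader in Spark 2.4.0
      .load()
      .coalesce(1)         // Dataset#coalesce(1) => Repartition(1, shuffle = false, child)
      .writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()

    query.awaitTermination()
  }
}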
2. What ContinuousCoalesceExec Does
- The ContinuousCoalesceExec class is fairly simple: it mainly overrides #doExecute to create a ContinuousCoalesceRDD, which implements the streaming shuffle that Repartition needs
- ContinuousCoalesceExec also overrides #outputPartitioning, fixing it to SinglePartition
case class ContinuousCoalesceExec(numPartitions: Int, child: SparkPlan) extends SparkPlan {
...
// the output partitioning is a single partition
override def outputPartitioning: Partitioning = SinglePartition
// Create a ContinuousCoalesceRDD, passing the number of partitions (1) and the ContinuousShuffleReader queue size (the reader buffers 1024 rows by default; writers block once it is full)
override def doExecute(): RDD[InternalRow] = {
assert(numPartitions == 1)
new ContinuousCoalesceRDD(
sparkContext,
numPartitions,
conf.continuousStreamingExecutorQueueSize,
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_INTERVAL_KEY).toLong,
child.execute())
}
}
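Two of the values passed in above are worth noting: conf.continuousStreamingExecutorQueueSize is backed by the SQL config spark.sql.streaming.continuous.executorQueueSize (default 1024), and ContinuousExecution.EPOCH_INTERVAL_KEY is a task-local property that, as far as I can tell, is populated from the continuous trigger interval. A minimal sketch of tuning them (the value 2048 is just an example):
// enlarge the reader-side buffer used by ContinuousCoalesceRDD
spark.conf.set("spark.sql.streaming.continuous.executorQueueSize", "2048")
// the epoch interval comes from the continuous trigger, e.g.
// .trigger(Trigger.Continuous("1 second"))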
III. The ContinuousCoalesceRDD Streaming Shuffle Implementation (Writer and Reader)
- Following on from the creation of ContinuousCoalesceRDD in the previous section, this section focuses on how it implements streaming shuffle, together with the ContinuousShuffleWriter (sends data) and ContinuousShuffleReader (receives data) it creates internally
- The data flow of ContinuousCoalesceRDD#compute() is illustrated in the diagram below (note that there is only one shuffle reader in the diagram, holding multiple internal message queues)
1. ContinuousCoalesceRDD Overview
ContinuousCoalesceRDD is the node that actually performs the coalesce. The overall logic:
- Create a threadPool sized to the number of partitions of the prev RDD, used to run the writer-side shuffle work
- In ContinuousCoalesceRDD#compute(), go over all partitions of the upstream RDD and call prev.compute() to obtain the iterators of data to coalesce
- For each upstream partition, create an RPCContinuousShuffleWriter and use its #write() method to send the data over RPC to the RPCContinuousShuffleReader (from the creation logic, the number of writers equals the number of upstream partitions, so multiple writers are supported, but there is only one reader)
- In ContinuousCoalesceRDD#compute(), call reader.read() once to obtain all data of the prev RDD as a single iterator
Key parts of ContinuousCoalesceRDD, with comments:
class ContinuousCoalesceRDD(
context: SparkContext,
numPartitions: Int, // always 1 here
readerQueueSize: Int, // from the config, default 1024
epochIntervalMs: Long, // interval between epochs; only used in the reader as the poll timeout
prev: RDD[InternalRow]) // the upstream RDD
extends RDD[InternalRow](context, Nil) {
// When we support more than 1 target partition, we'll need to figure out how to pass in the
// required partitioner.
// Partitioner; the writer calls outputPartitioner#getPartition(row) when distributing data
private val outputPartitioner = new HashPartitioner(1)
// Endpoint names of the downstream readers; there is only one
private val readerEndpointNames = (0 until numPartitions).map { i =>
s"ContinuousCoalesceRDD-part$i-${UUID.randomUUID()}"
}
// Build the Array[Partition]; ContinuousCoalesceRDDPartition is simple: it creates the reader and its endpoint and registers it with rpcEnv
override def getPartitions: Array[Partition] = {
(0 until numPartitions).map { partIndex =>
ContinuousCoalesceRDDPartition(
partIndex,
readerEndpointNames(partIndex),
readerQueueSize,
prev.getNumPartitions,
epochIntervalMs)
}.toArray
}
// thread pool sized to the number of partitions of the prev RDD
private lazy val threadPool = ThreadUtils.newDaemonFixedThreadPool(
prev.getNumPartitions,
this.name)
// The compute function actually executed for the partition; since there is a single partition, it runs once per epoch
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val part = split.asInstanceOf[ContinuousCoalesceRDDPartition]
if (!part.writersInitialized) {
val rpcEnv = SparkEnv.get.rpcEnv
// trigger lazy initialization
part.endpoint
val endpointRefs = readerEndpointNames.map { endpointName =>
rpcEnv.setupEndpointRef(rpcEnv.address, endpointName)
}
val runnables = prev.partitions.map { prevSplit =>
new Runnable() {
override def run(): Unit = {
TaskContext.setTaskContext(context)
// create the writer for the current upstream partition
val writer: ContinuousShuffleWriter = new RPCContinuousShuffleWriter(
prevSplit.index, outputPartitioner, endpointRefs.toArray)
// initialize the current epoch
EpochTracker.initializeCurrentEpoch(
context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong)
while (!context.isInterrupted() && !context.isCompleted()) {
// Run prev.compute(prevSplit, context) for this upstream partition and send its result iterator to the reader via the writer's RPC
writer.write(prev.compute(prevSplit, context).asInstanceOf[Iterator[UnsafeRow]])
// advance to the next epoch
EpochTracker.incrementCurrentEpoch()
}
}
}
}
...
runnables.foreach(threadPool.execute)
}
// Use the reader to collect the results sent by all writers; internally #read() also creates a thread pool sized to the number of upstream partitions and reads in parallel
part.reader.read()
}
}
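A quick aside on outputPartitioner: with a single target partition, HashPartitioner(1) maps every row to partition 0, which is why the writer below always ends up sending to endpoints(0). A tiny standalone check of that claim:
import org.apache.spark.HashPartitioner

val partitioner = new HashPartitioner(1)
// every key hashes to partition 0, since the hash is taken modulo numPartitions == 1
assert(partitioner.getPartition("a") == 0)
assert(partitioner.getPartition(12345) == 0)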
2. Streaming Shuffle Writer (Sending Data over RPC)
Since the writer and the reader live on the same host, the RPC takes the local path: the dispatcher hands the message straight to the target endpoint, and nothing actually goes over the network
The concrete streaming shuffle writer is RPCContinuousShuffleWriter. Its implementation is simple; it overrides the #write() method, which ContinuousCoalesceRDD calls to send data:
class RPCContinuousShuffleWriter(
writerId: Int,
outputPartitioner: Partitioner, // i.e. HashPartitioner(1); before sending, the writer asks outputPartitioner for each row's target partition, which is always 0 here
endpoints: Array[RpcEndpointRef] // references to the reader endpoints, length 1
) extends ContinuousShuffleWriter {
if (outputPartitioner.numPartitions != 1) {
throw new IllegalArgumentException("multiple readers not yet supported")
}
if (outputPartitioner.numPartitions != endpoints.length) {
throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " +
s"not match endpoint count ${endpoints.length}")
}
def write(epoch: Iterator[UnsafeRow]): Unit = {
// Iterate over one upstream partition's data, wrap each row in a ReceiverRow and send it to the reader
while (epoch.hasNext) {
val row = epoch.next()
endpoints(outputPartitioner.getPartition(row)).askSync[Unit](ReceiverRow(writerId, row))
}
// When the iterator is exhausted, send a ReceiverEpochMarker (end-of-epoch message, handled separately by the reader)
val futures = endpoints.map(_.ask[Unit](ReceiverEpochMarker(writerId))).toSeq
implicit val ec = ThreadUtils.sameThread
ThreadUtils.awaitResult(Future.sequence(futures), Duration.Inf)
}
}
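For reference, the messages exchanged between writer and reader form a very small protocol: data rows plus end-of-epoch markers, each tagged with the sending writer's id so the reader can route them to the right queue. In Spark 2.4.0 these types are declared next to the reader, roughly as follows (quoted from memory, so take the exact visibility modifiers as approximate):
// Messages a writer can send to the reader endpoint; both carry the writer id
// so RPCContinuousShuffleReader can place them into the matching per-writer queue.
sealed trait RPCContinuousShuffleMessage extends Serializable {
  def writerId: Int
}
// one data row from writer `writerId`
case class ReceiverRow(writerId: Int, row: UnsafeRow) extends RPCContinuousShuffleMessage
// writer `writerId` has finished the current epoch
case class ReceiverEpochMarker(writerId: Int) extends RPCContinuousShuffleMessage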
3. Streaming Shuffle Reader (Receiving RPC Data)
- RPCContinuousShuffleReader extends ThreadSafeRpcEndpoint, so it can receive messages, and it also implements the ContinuousShuffleReader interface's #read() method, which returns an iterator over the data of the current epoch.
- Before the iterator is consumed, #read() wraps the executor thread pool in a completion: ExecutorCompletionService, which lets completed results be fetched asynchronously as soon as they finish (instead of blocking on each task's future.get()). completion.submit() keeps being called, and whenever one buffered message is taken from a queue, its future completes and can be retrieved with completion.poll()
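If ExecutorCompletionService is unfamiliar, here is a small self-contained sketch (unrelated to Spark; the task names and delays are made up) of the pattern the reader relies on: tasks are submitted independently, and poll() hands back futures in completion order, returning null if nothing finishes within the timeout:
import java.util.concurrent.{Callable, ExecutorCompletionService, Executors, TimeUnit}

object CompletionServiceDemo {
  def main(args: Array[String]): Unit = {
    val executor = Executors.newFixedThreadPool(3)
    val completion = new ExecutorCompletionService[String](executor)

    // submit three tasks that finish at different times
    Seq(300, 100, 200).zipWithIndex.foreach { case (delayMs, id) =>
      completion.submit(new Callable[String] {
        override def call(): String = { Thread.sleep(delayMs); s"task-$id" }
      })
    }

    // poll() returns futures in completion order, not submission order;
    // a null return means nothing finished within the timeout
    (0 until 3).foreach { _ =>
      val future = completion.poll(1000, TimeUnit.MILLISECONDS)
      if (future != null) println(future.get()) // typically prints task-1, task-2, task-0
    }
    executor.shutdown()
  }
}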
private[continuous] class RPCContinuousShuffleReader(
queueSize: Int,
numShuffleWriters: Int,
epochIntervalMs: Long,
override val rpcEnv: RpcEnv)
extends ThreadSafeRpcEndpoint with ContinuousShuffleReader with Logging {
// One ArrayBlockingQueue buffer per upstream writer; each writer's queue is internally ordered
private val queues = Array.fill(numShuffleWriters) {
new ArrayBlockingQueue[RPCContinuousShuffleMessage](queueSize)
}
...
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case r: RPCContinuousShuffleMessage =>
// Put the received ReceiverRow or ReceiverEpochMarker message into that writer's buffer queue
queues(r.writerId).put(r)
context.reply(())
}
// Return an iterator over the current epoch's data
override def read(): Iterator[UnsafeRow] = {
new NextIterator[UnsafeRow] {
// Tracks whether each upstream writer's data has been fully consumed; once all have, the iterator and #read() finish
private val writerEpochMarkersReceived = Array.fill(numShuffleWriters)(false)
private val executor = Executors.newFixedThreadPool(numShuffleWriters)
private val completion = new ExecutorCompletionService[RPCContinuousShuffleMessage](executor)
// One fetch task per upstream writer, taking messages from its queue
private def completionTask(writerId: Int) = new Callable[RPCContinuousShuffleMessage] {
override def call(): RPCContinuousShuffleMessage = queues(writerId).take()
}
// bootstrap: submit one fetch task per writer queue
(0 until numShuffleWriters).foreach(writerId => completion.submit(completionTask(writerId)))
override def getNext(): UnsafeRow = {
var nextRow: UnsafeRow = null
while (!finished && nextRow == null) {
// take one message from whichever writer queue produces one first
completion.poll(epochIntervalMs, TimeUnit.MILLISECONDS) match {
case null =>
// Nothing arrived within the timeout; log which writers have not yet sent their epoch markers
val writerIdsUncommitted = writerEpochMarkersReceived.zipWithIndex.collect {
case (flag, idx) if !flag => idx
}
logWarning(
s"Completion service failed to make progress after $epochIntervalMs ms. Waiting " +
s"for writers ${writerIdsUncommitted.mkString(",")} to send epoch markers.")
...
case future => future.get() match {
case ReceiverRow(writerId, r) =>
// Got a data row; submit another task to fetch that writer's next message
completion.submit(completionTask(writerId))
// return the row just received
nextRow = r
case ReceiverEpochMarker(writerId) =>
// mark that the writer writerId has finished this epoch
writerEpochMarkersReceived(writerId) = true
// once all writers have finished, end the iterator
if (writerEpochMarkersReceived.forall(_ == true)) {
finished = true
}
}
}
}
nextRow
}
...
}
}
}