A Beginner's Journey Through the Spark Source Code - 8: RDD Dependencies
The dependency relationships between RDDs are the key basis on which Spark divides stages. As is well known, an RDD relates to the parent RDD(s) it depends on in one of two ways: a narrow dependency or a wide dependency. Stages are split exactly at the boundary between wide and narrow dependencies: whenever a wide dependency exists between RDDs, a shuffle takes place.
In this post we look at how Spark models these dependency relationships between RDDs in the source code.
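As a quick illustration before diving in (a minimal sketch, assuming a local SparkContext named sc; the variable names are mine, not from the Spark source): map() keeps a narrow dependency on its parent, while groupByKey() introduces a ShuffleDependency, which is exactly where the DAGScheduler cuts a new stage.

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)

// map() only transforms each record in place, so the child RDD depends on its
// parent partition-by-partition -- a narrow dependency.
val mapped = pairs.map { case (k, v) => (k, v * 2) }
println(mapped.dependencies.head.getClass.getSimpleName)   // OneToOneDependency

// groupByKey() needs records from every parent partition, so a ShuffleDependency
// (wide dependency) appears and a stage boundary is introduced here.
val grouped = mapped.groupByKey()
println(grouped.dependencies.head.getClass.getSimpleName)  // ShuffleDependency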
1. Narrow dependencies
NarrowDependency is the abstract parent class for narrow dependencies.
/**
 * :: DeveloperApi ::
 * Base class for dependencies where each partition of the child RDD depends on a small number
 * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
 */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
  /**
   * Get the parent partitions for a child partition.
   * @param partitionId a partition of the child RDD
   * @return the partitions of the parent RDD that the child partition depends upon
   */
  def getParents(partitionId: Int): Seq[Int]

  override def rdd: RDD[T] = _rdd
}
It defines two basic methods, getParents and rdd: the former returns, for a given child partition, the parent partitions it depends on; the latter returns the parent RDD the dependency holds.
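To get a feel for this contract, here is a hypothetical NarrowDependency subclass (PairwiseDependency is my own illustration, not part of Spark): each child partition reads two adjacent parent partitions, which is still narrow because the mapping is known without any shuffle.

import org.apache.spark.NarrowDependency
import org.apache.spark.rdd.RDD

// Hypothetical example: child partition i depends on parent partitions 2*i and 2*i + 1,
// i.e. the child coalesces pairs of parent partitions without shuffling any data.
class PairwiseDependency[T](parent: RDD[T]) extends NarrowDependency[T](parent) {
  override def getParents(partitionId: Int): Seq[Int] =
    Seq(2 * partitionId, 2 * partitionId + 1).filter(_ < parent.partitions.length)
}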
It has three implementations: OneToOneDependency, RangeDependency, and PruneDependency. Each one abstracts a different correspondence between the partitions of the parent RDD and those of the child RDD, as the names suggest:
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 * (partitions correspond one to one)
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
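A small sketch of what this means in practice (assuming a live SparkContext sc): for a filter()ed RDD, child partition i depends on exactly parent partition i.

val parent = sc.parallelize(1 to 100, 4)
val child  = parent.filter(_ % 2 == 0)

// filter() produces a MapPartitionsRDD whose only dependency is a OneToOneDependency.
val dep = child.dependencies.head.asInstanceOf[OneToOneDependency[Int]]
println(dep.getParents(3))   // List(3): child partition 3 reads only parent partition 3
println(dep.rdd eq parent)   // true: the dependency holds a reference back to the parent RDD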
/**
 * Represents a dependency between the PartitionPruningRDD and its parent. In this
 * case, the child RDD contains a subset of partitions of the parents'.
 * (the child RDD's partitions are a subset of the parent RDD's partitions)
 */
private[spark] class PruneDependency[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean)
  extends NarrowDependency[T](rdd) {

  @transient
  val partitions: Array[Partition] = rdd.partitions
    .filter(s => partitionFilterFunc(s.index)).zipWithIndex
    .map { case (split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

  override def getParents(partitionId: Int): List[Int] = {
    List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
  }
}
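PruneDependency is produced by PartitionPruningRDD, which lets a job run on only a subset of the parent's partitions. A quick sketch (again assuming a live SparkContext sc):

import org.apache.spark.rdd.PartitionPruningRDD

val parent = sc.parallelize(1 to 100, 8)
// Keep only the even-numbered partitions of the parent (indices 0, 2, 4, 6).
val pruned = PartitionPruningRDD.create(parent, idx => idx % 2 == 0)

println(pruned.partitions.length)                          // 4
println(pruned.dependencies.head.getClass.getSimpleName)   // PruneDependency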
/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs.
 * (partitions within a given range correspond one to one)
 * @param rdd the parent RDD
 * @param inStart the start of the range in the parent RDD
 * @param outStart the start of the range in the child RDD
 * @param length the length of the range
 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
  extends NarrowDependency[T](rdd) {

  override def getParents(partitionId: Int): List[Int] = {
    if (partitionId >= outStart && partitionId < outStart + length) {
      // return the single corresponding partition in the parent RDD
      List(partitionId - outStart + inStart)
    } else {
      Nil
    }
  }
}
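The classic producer of RangeDependency is union(): the child RDD simply concatenates its parents' partitions, offsetting the indices of each subsequent parent. A small sketch (assuming a live SparkContext sc):

val left  = sc.parallelize(1 to 30, 3)    // parent partitions 0..2
val right = sc.parallelize(31 to 50, 2)   // parent partitions 0..1
val both  = left.union(right)             // child partitions 0..4

println(both.partitions.length)           // 5
// One RangeDependency per parent: (inStart = 0, outStart = 0, length = 3) for `left`
// and (inStart = 0, outStart = 3, length = 2) for `right`.
both.dependencies.foreach(d => println(d.getClass.getSimpleName))   // RangeDependency (twice)
// Child partition 3 falls in the second range, so it maps to 3 - 3 + 0 = partition 0 of `right`.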
2. Wide dependencies
In the source code, a wide dependency appears as a shuffle dependency, ShuffleDependency:
/**
 * :: DeveloperApi ::
 * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle,
 * the RDD is transient since we don't need it on the executor side.
 * @param _rdd the parent RDD
 * @param partitioner partitioner used to partition the shuffle output
 * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set
 *                   explicitly then the default serializer, as specified by `spark.serializer`
 *                   config option, will be used.
 * @param keyOrdering key ordering for RDD's shuffles
 * @param aggregator map/reduce-side aggregator for RDD's shuffle
 * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
 */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient private val _rdd: RDD[_ <: Product2[K, V]],
    val partitioner: Partitioner,
    val serializer: Serializer = SparkEnv.get.serializer,
    val keyOrdering: Option[Ordering[K]] = None,
    val aggregator: Option[Aggregator[K, V, C]] = None,
    val mapSideCombine: Boolean = false)
  extends Dependency[Product2[K, V]]
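To see these constructor arguments filled in, here is a minimal sketch (assuming a live SparkContext sc): reduceByKey() installs an aggregator and turns on map-side combine, and the partitioner we pass in is carried along in the dependency.

import org.apache.spark.{HashPartitioner, ShuffleDependency}

val pairs   = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val reduced = pairs.reduceByKey(new HashPartitioner(4), _ + _)

val dep = reduced.dependencies.head.asInstanceOf[ShuffleDependency[String, Int, Int]]
println(dep.partitioner.numPartitions)   // 4: the partitioner we supplied above
println(dep.mapSideCombine)              // true: reduceByKey pre-aggregates on the map side
println(dep.aggregator.isDefined)        // true: the merge function wrapped in an Aggregator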
The shuffleHandle lives in the class body (omitted above), where the dependency registers itself with the ShuffleManager; its details were already covered in the earlier post on Shuffle, so please refer to that article.
That wraps up the source-code implementation of RDD wide and narrow dependencies.