Reposted from scalahome's blog.
1. RDD operations fall into two categories: Transformations and Actions.
The difference between them is that an Action triggers job submission, while a Transformation does not.
Take map() and collect() as examples:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
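As a quick illustration (a minimal sketch, assuming spark-shell's sc): the map() call only builds a new RDD and returns immediately, with no job appearing in the web UI, while collect() goes through sc.runJob and actually submits one.

val doubled = sc.parallelize(1 to 10).map(_ * 2)  // Transformation: only builds a new MapPartitionsRDD
val result  = doubled.collect()                   // Action: submits a job via sc.runJob
println(result.mkString(", "))                    // 2, 4, 6, ..., 20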
2. Dependencies between RDDs fall into two categories: NarrowDependency and ShuffleDependency.
These dependency relationships drive Stage division: every ShuffleDependency encountered introduces a new Stage.
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // We are manually maintaining a stack here to prevent StackOverflowError
  // caused by recursively visiting
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}
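Before walking through an example, the two dependency types are easy to observe from the shell (a quick sketch, assuming spark-shell's sc):

val narrow  = sc.parallelize(1 to 10).map(x => (x, x))  // NarrowDependency on its parent
val grouped = narrow.groupByKey()                       // ShuffleDependency on narrow
println(narrow.dependencies.head)   // a OneToOneDependency (a NarrowDependency)
println(grouped.dependencies.head)  // a ShuffleDependency -> marks a new Stage boundary
println(grouped.toDebugString)      // prints the lineage, with the shuffle boundary indented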
Take the following pipeline as an example to analyze how Stages are divided:
sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).collect()
Following the logic of the code, start the analysis from the last RDD:
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
  val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
  for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
    return r.partitioner.get
  }
  if (rdd.context.conf.contains("spark.default.parallelism")) {
    new HashPartitioner(rdd.context.defaultParallelism)
  } else {
    new HashPartitioner(bySize.head.partitions.size)
  }
}
defaultPartitioner first checks whether the current RDD already has a partitioner; if not, the default HashPartitioner is used:
override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
As the MapPartitionsRDD code shows, the partitioner here is None, so defaultPartitioner returns a HashPartitioner.
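This is easy to verify from the shell (a quick sketch, assuming spark-shell's sc): the output of map() carries no partitioner, so defaultPartitioner falls back to a new HashPartitioner.

val pairs   = sc.parallelize(1 to 100).map(_ % 3).map(i => (i, 1))
val reduced = pairs.reduceByKey(_ + _)
println(pairs.partitioner)    // None
println(reduced.partitioner)  // Some(org.apache.spark.HashPartitioner@...)

With that HashPartitioner chosen, reduceByKey delegates to combineByKeyWithClassTag: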
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("Default partitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
As analyzed above, self.partitioner is None while partitioner is a HashPartitioner, so a ShuffledRDD is returned here. As the getMissingParentStages code above shows, Stage division is driven by each RDD's Dependency, and a ShuffledRDD's dependency is a ShuffleDependency:
override def getDependencies: Seq[Dependency[_]] = {
  List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
}
whereas the dependency of a MapPartitionsRDD is a OneToOneDependency:
def this(@transient oneParent: RDD[_]) =
  this(oneParent.context, List(new OneToOneDependency(oneParent)))
and the dependencies of a ParallelCollectionRDD are Nil:
private[spark] class ParallelCollectionRDD[T: ClassTag](
    sc: SparkContext,
    @transient private val data: Seq[T],
    numSlices: Int,
    locationPrefs: Map[Int, Seq[String]])
  extends RDD[T](sc, Nil) {
Therefore the pipeline above is divided into two Stages.
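One way to confirm this is to count completed stages with a SparkListener (a rough sketch, assuming spark-shell's sc; listener events are delivered asynchronously, so the counter settles shortly after the job finishes):

import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

var completedStages = 0
sc.addSparkListener(new SparkListener {
  override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
    completedStages += 1  // one ShuffleMapStage + one ResultStage expected
  }
})
sc.parallelize(1 to 100).map(_ % 3).map(i => (i, 1)).reduceByKey(_ + _).collect()
// completedStages should read 2; the web UI shows the same two stages.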

Now modify the pipeline above slightly and see how the changes affect Stage division:
sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).reduceByKey(_+_).collect()

sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).map(i=>(i._1,i._2*2)).reduceByKey(_+_).collect()

sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).filter(i=>i._2>10).reduceByKey(_+_).collect()
There is an interesting phenomenon here: reduce + reduce does not produce a new stage, reduce + map + reduce does, yet reduce + filter + reduce again does not. This comes down to how the partitioner is propagated through the lineage. First, a reduce operation does not necessarily produce a ShuffledRDD; it may also produce a MapPartitionsRDD, depending on whether the requested partitioner equals self.partitioner:
if (self.partitioner == Some(partitioner)) {
  self.mapPartitions(iter => {
    val context = TaskContext.get()
    new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
  }, preservesPartitioning = true)
} else {
  new ShuffledRDD[K, V, C](self, partitioner)
    .setSerializer(serializer)
    .setAggregator(aggregator)
    .setMapSideCombine(mapSideCombine)
}
For reduce + reduce the two partitioners are equal, so no new ShuffleDependency is needed. For a MapPartitionsRDD, whether it adopts its parent's partitioner depends on the preservesPartitioning parameter:
private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
map() and filter() differ precisely in the value they pass for preservesPartitioning:
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
3. RDD execution
The sections above analyzed the dependencies between RDDs and how they drive Stage division. Once the DAG has been split into Stages, each Stage is sent to the Executors as a TaskSet for execution; the rest of this post walks through that logic.
After the Driver finishes resource scheduling and assigns Executors to the Tasks, it sends a LaunchTask message to each Executor:
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
On receiving the LaunchTask message from the Driver, the Executor creates a TaskRunner object and hands it to threadPool.execute():
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
Note that threadPool here is a daemon cached thread pool:
private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
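ThreadUtils is internal to Spark, but conceptually this builds a cached pool whose workers are daemon threads, so idle task-launch workers never keep the JVM alive on their own. A simplified sketch of the idea (not Spark's actual code, which also numbers the worker threads):

import java.util.concurrent.{Executors, ExecutorService, ThreadFactory}

val daemonFactory = new ThreadFactory {
  override def newThread(r: Runnable): Thread = {
    val t = new Thread(r, "Executor task launch worker")  // illustrative thread name only
    t.setDaemon(true)  // daemon: does not block JVM shutdown
    t
  }
}
val threadPool: ExecutorService = Executors.newCachedThreadPool(daemonFactory)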
Eventually the Task's runTask() method is invoked:
(runTask(context), context.collectAccumulators())
Task has two main subclasses: ShuffleMapTask and ResultTask.
A ShuffleMapTask computes its partition mainly by calling rdd.iterator():
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
which eventually reaches RDD.compute(). Below is the compute() implementation of MapPartitionsRDD:
override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))
By calling firstParent.iterator(), each RDD invokes its parent's iterator, so the calls cascade up the lineage layer by layer.
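Conceptually the chain behaves like nested Scala iterators (a plain-Scala sketch, not Spark code): each layer lazily wraps its parent's iterator, and nothing is evaluated until the task pulls elements at the end of the pipeline.

val parentIter: Iterator[Int]        = Iterator.range(1, 101)    // the ParallelCollectionRDD partition
val firstMap:   Iterator[Int]        = parentIter.map(_ % 3)     // map(_%3)
val secondMap:  Iterator[(Int, Int)] = firstMap.map(i => (i, 1)) // map(i=>(i,1))
secondMap.foreach(println)  // elements are pulled through every layer only here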
The ShuffledRDD case is more involved and will be discussed in detail in a later post.
For a ResultTask:
func(context, rdd.iterator(partition, context))
Here func is the func argument passed when sc.runJob() is called:
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.length)
}
For collect(), this func is:
(iter: Iterator[T]) => iter.toArray

(ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)
For saveAsTextFile(), this func is:
val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
  // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
  // around by taking a mod. We expect that no task will be attempted 2 billion times.
  val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

  val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)

  writer.setup(context.stageId, context.partitionId, taskAttemptId)
  writer.open()
  var recordsWritten = 0L

  Utils.tryWithSafeFinallyAndFailureCallbacks {
    while (iter.hasNext) {
      val record = iter.next()
      writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

      // Update bytes written metric every few records
      maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)
      recordsWritten += 1
    }
  }(finallyBlock = writer.close())
  writer.commit()
  bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }
  outputMetrics.setRecordsWritten(recordsWritten)
}
This, in outline, is how a Task is launched and executed.
