RDD内部转换详解

转自scalahome的博客


1,RDD的转换共分为Transformation和Action两类

Transformation和Action的区别在于:Action会触发作业的提交,而Transformation不会触发作业的提交

如map()和collect()

[java] view plain copy
  1. def map[U: ClassTag](f: T => U): RDD[U] = withScope {  
  2.   val cleanF = sc.clean(f)  
  3.   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))  
  4. }  
[java] view plain copy
  1. def collect(): Array[T] = withScope {  
  2.   val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)  
  3.   Array.concat(results: _*)  
  4. }  
2,RDD之间的依赖分为两类NarrowDependency和ShuffleDependency

RDD之间的依赖关系影响着Stage的划分,每遇到一个ShuffleDependency就会产生一个新的Stage

[java] view plain copy
  1. private def getMissingParentStages(stage: Stage): List[Stage] = {  
  2.   val missing = new HashSet[Stage]  
  3.   val visited = new HashSet[RDD[_]]  
  4.   // We are manually maintaining a stack here to prevent StackOverflowError  
  5.   // caused by recursively visiting  
  6.   val waitingForVisit = new Stack[RDD[_]]  
  7.   def visit(rdd: RDD[_]) {  
  8.     if (!visited(rdd)) {  
  9.       visited += rdd  
  10.       val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)  
  11.       if (rddHasUncachedPartitions) {  
  12.         for (dep <- rdd.dependencies) {  
  13.           dep match {  
  14.             case shufDep: ShuffleDependency[_, _, _] =>  
  15.               val mapStage = getShuffleMapStage(shufDep, stage.firstJobId)  
  16.               if (!mapStage.isAvailable) {  
  17.                 missing += mapStage  
  18.               }  
  19.             case narrowDep: NarrowDependency[_] =>  
  20.               waitingForVisit.push(narrowDep.rdd)  
  21.           }  
  22.         }  
  23.       }  
  24.     }  
  25.   }  
  26.   waitingForVisit.push(stage.rdd)  
  27.   while (waitingForVisit.nonEmpty) {  
  28.     visit(waitingForVisit.pop())  
  29.   }  
  30.   missing.toList  
  31. }  
以下面的转换为例分析Stage的划分

[java] view plain copy
  1. sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).collect()  
依照代码的逻辑,从最后一个RDD分析起:

[java] view plain copy
  1. def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {  
  2.   reduceByKey(defaultPartitioner(self), func)  
  3. }  
[java] view plain copy
  1. def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {  
  2.   val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse  
  3.   for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {  
  4.     return r.partitioner.get  
  5.   }  
  6.   if (rdd.context.conf.contains("spark.default.parallelism")) {  
  7.     new HashPartitioner(rdd.context.defaultParallelism)  
  8.   } else {  
  9.     new HashPartitioner(bySize.head.partitions.size)  
  10.   }  
  11. }  
首先判断当前RDD是否有partitioner,如果没有则用默认的HashPartitioner

[java] view plain copy
  1. override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None  
从MapPartitionsRdd代码可知,此处partitioner为None,因此defaultPartitioner返回的是HashPartitioner

[java] view plain copy
  1. def combineByKeyWithClassTag[C](  
  2.     createCombiner: V => C,  
  3.     mergeValue: (C, V) => C,  
  4.     mergeCombiners: (C, C) => C,  
  5.     partitioner: Partitioner,  
  6.     mapSideCombine: Boolean = true,  
  7.     serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {  
  8.   require(mergeCombiners != null"mergeCombiners must be defined"// required as of Spark 0.9.0  
  9.   if (keyClass.isArray) {  
  10.     if (mapSideCombine) {  
  11.       throw new SparkException("Cannot use map-side combining with array keys.")  
  12.     }  
  13.     if (partitioner.isInstanceOf[HashPartitioner]) {  
  14.       throw new SparkException("Default partitioner cannot partition array keys.")  
  15.     }  
  16.   }  
  17.   val aggregator = new Aggregator[K, V, C](  
  18.     self.context.clean(createCombiner),  
  19.     self.context.clean(mergeValue),  
  20.     self.context.clean(mergeCombiners))  
  21.   if (self.partitioner == Some(partitioner)) {  
  22.     self.mapPartitions(iter => {  
  23.       val context = TaskContext.get()  
  24.       new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))  
  25.     }, preservesPartitioning = true)  
  26.   } else {  
  27.     new ShuffledRDD[K, V, C](self, partitioner)  
  28.       .setSerializer(serializer)  
  29.       .setAggregator(aggregator)  
  30.       .setMapSideCombine(mapSideCombine)  
  31.   }  
  32. }  
从上文分析知,self.partitioner为None,partitioner为HashPartitioner,因此这里返回的是一个ShuffledRDD,从上面的代码可知Stage的划分主要是依据RDD的Dependency,ShuffleRDD的dependency是一个ShuffleDependency

[java] view plain copy
  1. override def getDependencies: Seq[Dependency[_]] = {  
  2.   List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))  
  3. }  
而MapPartitionsRDD的dependency为OneToOneDependency

[java] view plain copy
  1. def this(@transient oneParent: RDD[_]) =  
  2.   this(oneParent.context , List(new OneToOneDependency(oneParent)))  
ParallelCollectionRDD的dependency则为Nil

[java] view plain copy
  1. private[spark] class ParallelCollectionRDD[T: ClassTag](  
  2.     sc: SparkContext,  
  3.     @transient private val data: Seq[T],  
  4.     numSlices: Int,  
  5.     locationPrefs: Map[Int, Seq[String]])  
  6.     extends RDD[T](sc, Nil) {  
因此这里将会被划分成2个Stage

RDD内部转换详解
下面对上面的转换过程进行简单的修改,看看对Stage的划分有什么影响:

[java] view plain copy
  1. sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).reduceByKey(_+_)collect()  
RDD内部转换详解

[java] view plain copy
  1. sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).map(i=>(i._1,i._2*2)).reduceByKey(_+_).collect()  
RDD内部转换详解

[java] view plain copy
  1. sc.parallelize(1 to 100).map(_%3).map(i=>(i,1)).reduceByKey(_+_).filter(i=>i._2>10).reduceByKey(_+_).collect()  

RDD内部转换详解

这里有一个非常有意思的现象,reduce + reduce 没有产生新的stage,但是reduce + map + reduce 产生了新的stage,而reduce + filter + reduce又没有产生新的stage,这里主要涉及到Dependency的传递问题。首先,对于reduce操作并不是必然产生一个ShuffleRDD,也有可能是一个MapPartitionsRDD,主要看当前的partitioner和self.partitioner是否相等

[java] view plain copy
  1. if (self.partitioner == Some(partitioner)) {  
  2.       self.mapPartitions(iter => {  
  3.         val context = TaskContext.get()  
  4.         new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))  
  5.       }, preservesPartitioning = true)  
  6.     } else {  
  7.       new ShuffledRDD[K, V, C](self, partitioner)  
  8.         .setSerializer(serializer)  
  9.         .setAggregator(aggregator)  
  10.         .setMapSideCombine(mapSideCombine)  
  11.     }  
对于reduce + reduce,因为前后两个partitioner相等,因此不需要产生新的ShuffleDependency,而对于MapPartitionsRDD是否采用parent的partitioner取决于preserversPartitioning参数的值

[java] view plain copy
  1. private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](  
  2.     var prev: RDD[T],  
  3.     f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)  
  4.     preservesPartitioning: Boolean = false)  
  5.   extends RDD[U](prev) {  
  6.   
  7.   override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None  
而map()操作和filter()在preserversPartitioning参数的取值上是不同的:

[java] view plain copy
  1. def map[U: ClassTag](f: T => U): RDD[U] = withScope {  
  2.   val cleanF = sc.clean(f)  
  3.   new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))  
  4. }  
[java] view plain copy
  1. def filter(f: T => Boolean): RDD[T] = withScope {  
  2.   val cleanF = sc.clean(f)  
  3.   new MapPartitionsRDD[T, T](  
  4.     this,  
  5.     (context, pid, iter) => iter.filter(cleanF),  
  6.     preservesPartitioning = true)  
  7. }  


3,RDD的执行

上文分析了RDD之间的依赖,以及依赖对Stage划分的影响,当完成DAG的Stage划分之后,就需要将各Stage以TaskSet的形式发送到各个Executor执行,下面主要分析这部分逻辑

Driver在完成资源调度为Task分配完Executor之后,向Executor发送LaunchTask信息

[java] view plain copy
  1. executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))  
Executor在收到Driver的LaunchTask信息之后,创建了一个TaskRunner对象,并调用threadPool.execute()方法执行Task

[java] view plain copy
  1. def launchTask(  
  2.     context: ExecutorBackend,  
  3.     taskId: Long,  
  4.     attemptNumber: Int,  
  5.     taskName: String,  
  6.     serializedTask: ByteBuffer): Unit = {  
  7.   val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,  
  8.     serializedTask)  
  9.   runningTasks.put(taskId, tr)  
  10.   threadPool.execute(tr)  
  11. }  
需要注意的是这里的threadPool是一个守护线程池

[java] view plain copy
  1. private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")  

最终调用Task的runTask()方法

[java] view plain copy
  1. (runTask(context), context.collectAccumulators())  
Task主要有两个子类ShuffleMapTask和ResultTask

对于ShuffleMapTask主要是通过调用rdd.iterator()来进行计算

[java] view plain copy
  1. writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])  
最终会调用RDD.compute()方法,下面是MapPartitionsRDD的compute方法的实现:

[java] view plain copy
  1. override def compute(split: Partition, context: TaskContext): Iterator[U] =  
  2.   f(context, split.index, firstParent[T].iterator(split, context))  
通过firstParent.iterator()的方式,调用父RDD的interator,实现逐层向上调用

ShuffledRDD过程相对复杂,后续博文会做详细的讨论。

对于ResultTask:

[java] view plain copy
  1. func(context, rdd.iterator(partition, context))  
这里的func是在调用sc.runJob()时传递的func参数:

[java] view plain copy
  1. def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {  
  2.   runJob(rdd, func, 0 until rdd.partitions.length)  
  3. }  

对于collect()操作这里的func为:

[java] view plain copy
  1. (iter: Iterator[T]) => iter.toArray  

[java] view plain copy
  1. (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it)  

对于saveAsTextFile()操作这里的func为:

[java] view plain copy
  1. val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {  
  2.       // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it  
  3.       // around by taking a mod. We expect that no task will be attempted 2 billion times.  
  4.       val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt  
  5.   
  6.       val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context)  
  7.   
  8.       writer.setup(context.stageId, context.partitionId, taskAttemptId)  
  9.       writer.open()  
  10.       var recordsWritten = 0L  
  11.   
  12.       Utils.tryWithSafeFinallyAndFailureCallbacks {  
  13.         while (iter.hasNext) {  
  14.           val record = iter.next()  
  15.           writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])  
  16.   
  17.           // Update bytes written metric every few records  
  18.           maybeUpdateOutputMetrics(bytesWrittenCallback, outputMetrics, recordsWritten)  
  19.           recordsWritten += 1  
  20.         }  
  21.       }(finallyBlock = writer.close())  
  22.       writer.commit()  
  23.       bytesWrittenCallback.foreach { fn => outputMetrics.setBytesWritten(fn()) }  
  24.       outputMetrics.setRecordsWritten(recordsWritten)  
  25.     }  
Task启动的流程大致如下:

RDD内部转换详解