spark源码分析:(二)1.RDD个数的判断
RDD创建个数的判断。
代码如下:
val rdd1 = sc.textFile(input,3)
val rdd2 = rdd1.map( x => x + s"(${x.length})" )
println(rdd2.collect().mkString("\n"))
通过debug,我们来看看rdd1,rdd2是什么类型的RDD。由于RDD之间并没有发生Shuffle,因此RDD之间的依赖都是OneToOneDependency。
要想知道RDD创建的细节,我们首先从源码入手。
来看下textFile API的实现方式:
def textFile( path: String,minPartitions: Int = defaultMinPartitions): RDD[String] =
withScope {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString)
}
在这个方法中,我们可以清楚的看见调用了hadoopFile API。接来下,我们看看这个hadoopFile API具体做了啥。
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
keyClass: Class[K],
valueClass: Class[V],
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
assertNotStopped()
// A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
new HadoopRDD(
this,
confBroadcast,
Some(setInputPathsFunc),
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
}
通过上面的代码,我们可以清楚的看见方法内部new了一个Hadoop RDD。这样我们得到了一个Hadoop RDD。
hadoopFile 返回了一个Hadoop RDD,然后,调用了map(pair => pair._2.toString)方法。我们跟进这个方法进行查看。
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
从上面我们可以清楚的看见,map方法内部实现了一个MapPartitionsRDD。
根据上面所述,上面两句代码(rdd1,rdd2),共创建了3个RDD。分别是:HadoopRDD,MapPartitionsRDD,MapPartitionsRDD。
-------------------------------------------------------------------------------------------------------------------------------------------------------------------
由于该博客内容属于另一博客的分支,故将总博客的链接附上: