Pitfalls of Spark's cache
Reposted from: http://bit1129.iteye.com/blog/2182146
Caching the ShuffledRDD produced by reduceByKey
Here the cache does not help
- package spark.examples
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object SparkWordCountCache {
-   def main(args: Array[String]) {
-     System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
-     val conf = new SparkConf()
-     conf.setAppName("SparkWordCount")
-     conf.setMaster("local[3]")
-     conf.set("spark.shuffle.manager", "hash"); /// does the hash-based shuffle manager make any difference here?
-     val sc = new SparkContext(conf)
-     val rdd1 = sc.textFile("file:///D:/word.in.3");
-     val rdd2 = rdd1.flatMap(_.split(" "))
-     val rdd3 = rdd2.map((_, 1))
-     val rdd4 = rdd3.reduceByKey(_ + _, 3);
-     rdd4.cache();
-     rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
-     val result = rdd4.collect; /// does not trigger the ShuffleMapTasks again, yet still has to fetch the data they produced
-     result.foreach(println(_));
-     sc.stop
-   }
- }
The code above calls rdd4.cache(), and rdd4 is a ShuffledRDD; in other words, what gets cached is the RDD result belonging to Stage 2 (the result stage). When rdd4.collect is then called, all the tasks produced are ResultTasks. Does that mean that, thanks to the cache, the last job does not recompute everything from the beginning?
That does not feel right: even without cache, it should not recompute from scratch, should it?
This was verified and appears to be the case. Modify the code above as follows (the same pipeline, just without the rdd4.cache() call) and the result is identical: the second job still does not run any ShuffleMapTask, but when the ResultTasks execute they still pull data from the map tasks' output, so the shuffle-read path is not simplified at all.
-     rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
-     val result = rdd4.collect;
-     result.foreach(println(_));
Right off the bat we stepped into a cache pitfall! Caching the ShuffledRDD here does not do what you would hope: even though the ShuffleMapTasks above are not needed again, the ResultTasks still have to pull data from the map tasks' output when they run.
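One way to check whether the cache actually took effect is to look at the RDD's lineage dump after the first action. Below is a minimal sketch (not from the original post) reusing the same input file; in the Spark 1.x releases this article targets, toDebugString also reports cached partitions and their sizes once blocks have really been stored:
- package spark.examples
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object CacheInspect {
-   def main(args: Array[String]) {
-     val sc = new SparkContext(new SparkConf().setAppName("CacheInspect").setMaster("local[3]"))
-     val counts = sc.textFile("file:///D:/word.in.3")
-       .flatMap(_.split(" "))
-       .map((_, 1))
-       .reduceByKey(_ + _, 3)
-     counts.cache()
-     counts.count() // first action: computes the data and should populate the cache
-     // the lineage dump lists cached partitions (e.g. "CachedPartitions: 3") once blocks are stored
-     println(counts.toDebugString)
-     println(counts.getStorageLevel) // the level requested by cache(), i.e. MEMORY_ONLY
-     sc.stop()
-   }
- }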
Caching the ShuffledRDD produced by groupByKey
This time the rdd.cache does take effect
- package spark.examples
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- object SparkGroupByExample {
-   def main(args: Array[String]) {
-     val conf = new SparkConf().setAppName("GroupByKey").setMaster("local")
-     val sc = new SparkContext(conf)
-     sc.setCheckpointDir("/tmp/checkpoint/" + System.currentTimeMillis())
-     val data = Array[(Int, Char)]((1, 'a'), (2, 'b'),
-       (3, 'c'), (4, 'd'),
-       (5, 'e'), (3, 'f'),
-       (2, 'g'), (1, 'h')
-     )
-     val pairs = sc.parallelize(data)
-     val rdd = pairs.groupByKey(2)
-     rdd.cache
-     rdd.count; /// first action: computes the groupByKey result and populates the cache
-     rdd.collect.foreach(println(_)); /// second action: reads the cached partitions
-   }
- }
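As a small usage note (not in the original post): once the cached result is no longer needed, it can be released explicitly, continuing the example above:
-     rdd.unpersist() /// drops the cached blocks from the BlockManager
-     // any later action on rdd would then recompute it, including the shuffle, from pairs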
Caching the MappedRDD produced by textFile
Basic flow: suppose a program contains two jobs. When the first job runs, the data of any RDD on which cache was called is computed first and then written to the cache. When the second job runs, it reads that data directly from the cache.
This is a very good fit for iterative jobs: the result of one pass is cached and reused by the next pass, and so on (a minimal sketch of that pattern follows the example below).
- package spark.examples
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object SparkWordCountCache {
-   def main(args: Array[String]) {
-     System.setProperty("hadoop.home.dir", "E:\\devsoftware\\hadoop-2.5.2\\hadoop-2.5.2");
-     val conf = new SparkConf()
-     conf.setAppName("SparkWordCount")
-     conf.setMaster("local")
-     // hash-based shuffle
-     conf.set("spark.shuffle.manager", "hash");
-     val sc = new SparkContext(conf)
-     val rdd1 = sc.textFile("file:///D:/word.in.3");
-     rdd1.cache() /// cache right after reading; once the first job has run, the data is cached
-     val rdd2 = rdd1.flatMap(_.split(" "))
-     val rdd3 = rdd2.map((_, 1))
-     val result = rdd3.collect; /// first job: print the contents of rdd3
-     result.foreach(println(_));
-     val rdd4 = rdd3.reduceByKey(_ + _); /// second job: reduceByKey on rdd3, with rdd1 read from the cache
-     rdd4.saveAsTextFile("file:///D:/wordout" + System.currentTimeMillis());
-     sc.stop
-   }
- }
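Here is the minimal sketch of the iterative caching pattern mentioned above; the data set and the update step are made up purely for illustration:
- package spark.examples
- import org.apache.spark.SparkConf
- import org.apache.spark.SparkContext
- import org.apache.spark.SparkContext._
- object IterativeCacheSketch {
-   def main(args: Array[String]) {
-     val sc = new SparkContext(new SparkConf().setAppName("IterativeCacheSketch").setMaster("local"))
-     var current = sc.parallelize(1 to 1000000).map(_.toDouble).cache()
-     for (i <- 1 to 5) {
-       // derive the next iteration from the previous one and cache it; the action below
-       // materializes it, so the following pass reads it from the cache instead of recomputing
-       val next = current.map(x => x * 0.5 + 1.0).cache()
-       println(s"iteration $i: sum = ${next.sum()}")
-       current.unpersist() // the previous iteration's cached blocks are no longer needed
-       current = next
-     }
-     sc.stop()
-   }
- }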
Basic flow in the source code:
- Call the RDD's iterator method to compute the RDD's data set (what comes back is an iterable collection).
- Inside RDD.iterator, check the RDD's storage level; if a storage level has been set, call SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel) (a simplified paraphrase of this dispatch is sketched after this list).
- Inside CacheManager's getOrCompute method:
a. First check whether the partition already exists in the cache; if it does, return it directly.
b. If it does not, compute it via val computedValues = rdd.computeOrReadCheckpoint(partition, context).
c. Once the computation finishes, call CacheManager's own putInBlockManager to cache the computed data.
d. After the data has been put into the BlockManager, the mapping between this RDD and the BlockManager also has to be updated, so that the next time this RDD is computed we can check whether its data is already cached.
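For reference, the storage-level check in the second step above looks roughly like this in the Spark 1.x RDD class (a simplified paraphrase, not a verbatim copy of the source):
- /** Simplified paraphrase of RDD.iterator in Spark 1.x */
- final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
-   if (storageLevel != StorageLevel.NONE) {
-     // a storage level was set via cache()/persist(): go through the CacheManager
-     SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
-   } else {
-     // no caching requested: compute the partition directly (or read it from a checkpoint)
-     computeOrReadCheckpoint(split, context)
-   }
- }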
Key source code
1. The getOrCompute method
- /** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
- def getOrCompute[T](
-     rdd: RDD[T],
-     partition: Partition,
-     context: TaskContext,
-     storageLevel: StorageLevel): Iterator[T] = {
-   val key = RDDBlockId(rdd.id, partition.index) /// the RDD id plus the partition index form the RDDBlockId; one RDD can have many partitions
-   logDebug(s"Looking for partition $key")
-   blockManager.get(key) match { /// look the key up in the BlockManager, which manages Spark's block information; will this key end up stored in the BlockManager?
-     case Some(blockResult) =>
-       // Partition is already materialized, so just return its values
-       context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
-       new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
-     case None =>
-       // Acquire a lock for loading this partition
-       // If another thread already holds the lock, wait for it to finish and return its results
-       val storedValues = acquireLockForPartition[T](key) /// fetches any data another thread already cached for this key; acquireLockForPartition is an unfortunate name
-       if (storedValues.isDefined) { /// the data was found
-         return new InterruptibleIterator[T](context, storedValues.get)
-       }
-       // Otherwise, we have to load the partition ourselves
-       /// no cached data was found, which means this is the first time the job runs this partition
-       try {
-         logInfo(s"Partition $key not found, computing it")
-         val computedValues = rdd.computeOrReadCheckpoint(partition, context) /// compute the RDD data
-         // If the task is running locally, do not persist the result
-         if (context.isRunningLocally) { /// if the task runs locally, no caching is needed?
-           return computedValues
-         }
-         // Otherwise, cache the values and keep track of any updates in block statuses
-         /// cache the data
-         val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
-         /// store the data in the BlockManager; note the four arguments
-         val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
-         /// what is this doing? It records in the task's metrics which blocks this task updated
-         val metrics = context.taskMetrics
-         val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
-         metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)
-         new InterruptibleIterator(context, cachedValues)
-       } finally {
-         loading.synchronized {
-           loading.remove(key)
-           loading.notifyAll()
-         }
-       }
-   }
- }
2. The putInBlockManager method
- /**
-  * Cache the values of a partition, keeping track of any updates in the storage statuses of
-  * other blocks along the way.
-  *
-  * The effective storage level refers to the level that actually specifies BlockManager put
-  * behavior, not the level originally specified by the user. This is mainly for forcing a
-  * MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
-  * while preserving the original semantics of the RDD as specified by the application.
-  */
- private def putInBlockManager[T](
-     key: BlockId,
-     values: Iterator[T],
-     level: StorageLevel,
-     updatedBlocks: ArrayBuffer[(BlockId, BlockStatus)],
-     effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
-   val putLevel = effectiveStorageLevel.getOrElse(level)
-   if (!putLevel.useMemory) {
-     /*
-      * This RDD is not to be cached in memory, so we can just pass the computed values as an
-      * iterator directly to the BlockManager rather than first fully unrolling it in memory.
-      */
-     updatedBlocks ++=
-       blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
-     blockManager.get(key) match {
-       case Some(v) => v.data.asInstanceOf[Iterator[T]]
-       case None =>
-         logInfo(s"Failure to store $key")
-         throw new BlockException(key, s"Block manager failed to return cached value for $key!")
-     }
-   } else {
-     /*
-      * This RDD is to be cached in memory. In this case we cannot pass the computed values
-      * to the BlockManager as an iterator and expect to read it back later. This is because
-      * we may end up dropping a partition from memory store before getting it back.
-      *
-      * In addition, we must be careful to not unroll the entire partition in memory at once.
-      * Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
-      * single partition. Instead, we unroll the values cautiously, potentially aborting and
-      * dropping the partition to disk if applicable.
-      */
-     blockManager.memoryStore.unrollSafely(key, values, updatedBlocks) match {
-       case Left(arr) =>
-         // We have successfully unrolled the entire partition, so cache it in memory
-         updatedBlocks ++=
-           blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
-         arr.iterator.asInstanceOf[Iterator[T]]
-       case Right(it) =>
-         // There is not enough space to cache this partition in memory
-         val returnValues = it.asInstanceOf[Iterator[T]]
-         if (putLevel.useDisk) {
-           logWarning(s"Persisting partition $key to disk instead.")
-           val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
-             useOffHeap = false, deserialized = false, putLevel.replication)
-           putInBlockManager[T](key, returnValues, level, updatedBlocks, Some(diskOnlyLevel))
-         } else {
-           returnValues
-         }
-     }
-   }
- }
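The effectiveStorageLevel fallback above is what lets MEMORY_AND_DISK degrade gracefully: when a partition cannot be fully unrolled in memory and putLevel.useDisk is true, it is re-put with a disk-only level instead of being left uncached. A minimal usage sketch (not from the original post), reusing rdd4 from the first example:
- import org.apache.spark.storage.StorageLevel
- // MEMORY_AND_DISK, unlike the plain cache()/MEMORY_ONLY default, makes the Right(it)
- // branch above spill the partition to disk rather than drop it.
- rdd4.persist(StorageLevel.MEMORY_AND_DISK)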