Spark 2.3.2 Source Code Analysis: Operators [Part 2: Value-Type Transformation Operators]

 

This article covers the operators in the [Value-type Transformation operators] category.

 

I. One-to-one input-to-output partition mapping

 

    1. The map operator

 

map transforms every data item of the source RDD into a new element through the user-defined function f passed to map. In the source, the map operator simply creates a new RDD: in very old Spark versions this was a dedicated MappedRDD(this, sc.clean(f)), while in Spark 2.3.2 it is a MapPartitionsRDD wrapping the cleaned function, as the code below shows.

  In Figure 1 each box represents an RDD partition; the partitions on the left are mapped by the user-defined function f: T => U into the new RDD partitions on the right. Note, however, that f is only applied to the data, together with the other functions of the same stage, once an action operator is triggered. In the first partition of the figure, record V1 is fed into f and comes out as record V'1 of the transformed partition.

 

[Figure 1: the map operator]

 

Source:

/**
 * Return a new RDD by applying a function to all elements of this RDD.
 */
def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}
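A minimal usage sketch for reference (not part of the quoted source; it assumes an existing SparkContext named sc). map stays lazy and only runs once an action such as collect() is triggered:

val nums = sc.parallelize(Seq(1, 2, 3))
val squared = nums.map(n => n * n)   // builds a MapPartitionsRDD; nothing is computed yet
squared.collect()                    // Array(1, 4, 9): the action triggers the computation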

 

 

 

    2. The flatMap operator

 

flatMap transforms each element of the source RDD into new elements through the function f and merges the elements of every generated collection into a single collection. Internally this used to create a FlatMappedRDD(this, sc.clean(f)); in Spark 2.3.2 it is a MapPartitionsRDD built on iter.flatMap, as the code below shows.
  Figure 2 shows one RDD partition being processed by flatMap. The function passed to flatMap is f: T => U, where T and U can be any data types; it turns the data in the partition into new data. The outer box can be viewed as an RDD partition and each small box as a collection. V1, V2 and V3 form one collection that is a single data item of the RDD, stored for example as an array or another container; after they are transformed into V'1, V'2 and V'3, the original array or container is broken apart and the unpacked values become individual data items of the RDD.

[Figure 2: the flatMap operator]

/**
 *  Return a new RDD by first applying a function to all elements of this
 *  RDD, and then flattening the results.
 */
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
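A minimal usage sketch (again assuming an existing SparkContext sc): each input element may expand into zero or more output elements, and the results are flattened into a single RDD:

val lines = sc.parallelize(Seq("a b", "c d e"))
val words = lines.flatMap(line => line.split(" "))
words.collect()   // Array("a", "b", "c", "d", "e")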

 

 

 

    3. The mapPartitions operator

 

  mapPartitions obtains an iterator over each partition and operates on the whole partition through that iterator; internally it also builds a MapPartitionsRDD. Each box in the figure represents an RDD partition. In Figure 3 the user filters all data in a partition with f: iter => iter.filter(_ >= 3), keeping only the values greater than or equal to 3; the partition containing 1, 2 and 3 keeps only the element 3.

 

[Figure 3: the mapPartitions operator]

 

 

/**
 * Return a new RDD by applying a function to each partition of this RDD.
 *
 * `preservesPartitioning` indicates whether the input function preserves the partitioner, which
 * should be `false` unless this is a pair RDD and the input function doesn't modify the keys.
 */
def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] = withScope {
  val cleanedF = sc.clean(f)
  new MapPartitionsRDD(
    this,
    (context: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(iter),
    preservesPartitioning)
}
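A small sketch of the per-partition style described above (assuming sc is a SparkContext). Because f receives the whole partition iterator, per-partition setup such as opening a database connection can be done once per partition instead of once per element:

val data = sc.parallelize(Seq(1, 2, 3, 4, 5, 6), numSlices = 2)
val kept = data.mapPartitions(iter => iter.filter(_ >= 3))   // the function used in the figure
kept.collect()   // Array(3, 4, 5, 6)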

 

 

    4. The glom operator

 

 glom turns each partition into a single array. Internally this used to return a GlommedRDD; in Spark 2.3.2 it is a MapPartitionsRDD that collects the partition iterator into one array, as the code below shows. Each box in Figure 4 represents an RDD partition: a partition holding V1, V2 and V3 becomes the array Array(V1, V2, V3).

 

[Figure 4: the glom operator]

 

 

/**
 * Return an RDD created by coalescing all elements within each partition into an array.
 */
def glom(): RDD[Array[T]] = withScope {
  new MapPartitionsRDD[Array[T], T](this, (context, pid, iter) => Iterator(iter.toArray))
}
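A quick illustration (assuming sc is a SparkContext): glom exposes the partition boundaries as one array per partition:

val rdd = sc.parallelize(1 to 6, numSlices = 2)
rdd.glom().collect()   // Array(Array(1, 2, 3), Array(4, 5, 6)): one array per partition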

 

 

 

II. Many-to-one input-to-output partition mapping

 

    5. The union operator

 

 union requires the two RDDs to have the same element type, and the returned RDD has that same element type. No deduplication is performed; all elements are kept. Use distinct() if duplicates should be removed. Spark also provides the more concise ++ operator, which is equivalent to calling union.
     In Figure 5 the large boxes on the left represent two RDDs and the small boxes inside them represent their partitions; the large box on the right is the merged RDD and its small boxes are its partitions.

  An RDD containing V1, V2, U1, U2, U3, U4 and an RDD containing V1, V8, U5, U6, U7, U8 are merged while keeping every element: V1, V1, V2, V8 form one partition and U1, U2, U3, U4, U5, U6, U7, U8 form another.

 

 

[Figure 5: the union operator]

 

/**
 * Return the union of this RDD and another one. Any identical elements will appear multiple
 * times (use `.distinct()` to eliminate them).
 */
def union(other: RDD[T]): RDD[T] = withScope {
  sc.union(this, other)
}

 

 

/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
  union(Seq(first) ++ rest)
}

 

/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
  val partitioners = rdds.flatMap(_.partitioner).toSet
  if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
    new PartitionerAwareUnionRDD(this, rdds)
  } else {
    new UnionRDD(this, rdds)
  }
}
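A minimal usage sketch (assuming sc is a SparkContext): union keeps duplicates and simply concatenates the partitions of both parents, and ++ is an alias for it:

val a = sc.parallelize(Seq(1, 2, 3))
val b = sc.parallelize(Seq(3, 4, 5))
val u = a.union(b)              // same as a ++ b; duplicates are kept
u.collect().sorted              // Array(1, 2, 3, 3, 4, 5)
u.distinct().collect().sorted   // Array(1, 2, 3, 4, 5) if deduplication is needed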

The union implementation (UnionRDD vs. PartitionerAwareUnionRDD) is not examined in detail here; see the following link for more:

https://blog.****.net/wl044090432/article/details/59484525

 

 

    6. The cartesian operator

  cartesian computes the Cartesian product of all elements of the two RDDs; internally it returns a CartesianRDD. In Figure 6 the large boxes represent RDDs and the small boxes inside them represent RDD partitions.
      For example, V1 combined with W1, W2 and Q5 from the other RDD yields (V1, W1), (V1, W2) and (V1, Q5).

[Figure 6: the cartesian operator]

/**
 * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
 * elements (a, b) where a is in `this` and b is in `other`.
 */
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
  new CartesianRDD(sc, this, other)
}
private[spark]
class CartesianRDD[T: ClassTag, U: ClassTag](
    sc: SparkContext,
    var rdd1 : RDD[T],
    var rdd2 : RDD[U])
  extends RDD[(T, U)](sc, Nil)
  with Serializable {

  val numPartitionsInRdd2 = rdd2.partitions.length

  override def getPartitions: Array[Partition] = {
    // create the cross product split
    val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
    for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
      val idx = s1.index * numPartitionsInRdd2 + s2.index
      array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
    }
    array
  }

  override def getPreferredLocations(split: Partition): Seq[String] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    (rdd1.preferredLocations(currSplit.s1) ++ rdd2.preferredLocations(currSplit.s2)).distinct
  }

  override def compute(split: Partition, context: TaskContext): Iterator[(T, U)] = {
    val currSplit = split.asInstanceOf[CartesianPartition]
    for (x <- rdd1.iterator(currSplit.s1, context);
         y <- rdd2.iterator(currSplit.s2, context)) yield (x, y)
  }

  override def getDependencies: Seq[Dependency[_]] = List(
    new NarrowDependency(rdd1) {
      def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
    },
    new NarrowDependency(rdd2) {
      def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
    }
  )

  override def clearDependencies() {
    super.clearDependencies()
    rdd1 = null
    rdd2 = null
  }
}
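A quick usage sketch (assuming sc is a SparkContext). As getPartitions above shows, the result has rdd1.partitions.length * rdd2.partitions.length partitions and is built from narrow dependencies, so no shuffle is needed, but the output size is the product of the input sizes:

val letters = sc.parallelize(Seq("a", "b"))
val numbers = sc.parallelize(Seq(1, 2))
letters.cartesian(numbers).collect()
// Array((a,1), (a,2), (b,1), (b,2)), order may vary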

 

 

 

III. Many-to-many input-to-output partition mapping

 

    7. The groupBy operator

 

 The implementation works as follows:
  1) Pre-process (clean) the user function:
  val cleanF = sc.clean(f)
  2) Apply the function to each element with map to build (key, element) pairs, then group them with groupByKey:

     this.map(t => (cleanF(t), t)).groupByKey(p)
  Here p determines the number of partitions and the partitioning function, and therefore the degree of parallelism.

  In Figure 7 each box represents an RDD partition; elements with the same key are merged into one group. For example, V1 and V2 are merged under the key V with the values V1, V2, producing (V, Seq(V1, V2)).

 

[Figure 7: the groupBy operator]

When the goal is a per-key aggregation, it is recommended to use one of the following instead:

PairRDDFunctions.aggregateByKey

PairRDDFunctions.reduceByKey

 

 

/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 */
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {
  groupBy[K](f, defaultPartitioner(this))
}

 

/**
 * Return an RDD of grouped items. Each group consists of a key and a sequence of elements
 * mapping to that key. The ordering of elements within each group is not guaranteed, and
 * may even differ each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 */
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null)
    : RDD[(K, Iterable[T])] = withScope {
  val cleanF = sc.clean(f)
  this.map(t => (cleanF(t), t)).groupByKey(p)
}
/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
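A minimal sketch contrasting the two approaches mentioned in the notes above (assuming sc is a SparkContext). groupBy ships every value to its group with no map-side combine; when the end goal is a per-key aggregate, reduceByKey is usually much cheaper:

val nums = sc.parallelize(1 to 6)
val byParity = nums.groupBy(n => n % 2)                        // RDD[(Int, Iterable[Int])]
val sumByParity = nums.map(n => (n % 2, n)).reduceByKey(_ + _) // combines on the map side
sumByParity.collect()                                          // Array((0,12), (1,9)), order may vary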

 

 

 

 

 

 

IV. Output partitions as subsets of input partitions

 

    8. The filter operator

   filter filters the elements: the function f is applied to every element, elements for which f returns true are kept in the RDD, and elements for which it returns false are dropped.
  In Figure 8 each box represents an RDD partition and T can be any type. The user-defined predicate f is applied to every data item and only the items for which it returns true are kept. For example, V2 and V3 are filtered out and V1 is kept; it is renamed V'1 only to distinguish the output.

 

[Figure 8: the filter operator]

 

/**
 * Return a new RDD containing only the elements that satisfy a predicate.
 */
def filter(f: T => Boolean): RDD[T] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[T, T](
    this,
    (context, pid, iter) => iter.filter(cleanF),
    preservesPartitioning = true)
}
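A minimal usage sketch (assuming sc is a SparkContext); note that filter preserves the partitioning of the parent RDD, as the preservesPartitioning flag above indicates:

val data = sc.parallelize(Seq(1, 2, 3, 4))
data.filter(_ % 2 == 0).collect()   // Array(2, 4)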

 

    9. The distinct operator

distinct removes duplicate elements from the RDD; internally the deduplication is done with reduceByKey.

 

Each box in Figure 9 represents an RDD partition; distinct deduplicates the data. For example, the duplicated values V1, V1 are reduced to a single V1.

[Figure 9: the distinct operator]

 

 

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(): RDD[T] = withScope {
  distinct(partitions.length)
}

 

/**
 * Return a new RDD containing the distinct elements in this RDD.
 */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}
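A quick sketch showing that distinct is just the map / reduceByKey / map pattern above (assuming sc is a SparkContext):

val dup = sc.parallelize(Seq(1, 1, 2, 2, 3))
dup.distinct().collect().sorted   // Array(1, 2, 3)
val viaReduce = dup.map(x => (x, null)).reduceByKey((x, y) => x).map(_._1)
viaReduce.collect().sorted        // same result: Array(1, 2, 3)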

 

 

 

    10. The subtract operator

 

subtract performs a set-difference operation: it removes from RDD1 all elements that lie in the intersection of RDD1 and RDD2. In Figure 10 the large boxes on the left represent the two RDDs and the one on the right the resulting RDD; the small boxes inside them represent partitions. V1 appears in both RDDs, so by the rules of set difference it is not kept in the new RDD; V2 exists only in the first RDD and not in the second, so it is included in the result.

 

[Figure 10: the subtract operator]

 

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 *
 * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
 * RDD will be <= us.
 */
def subtract(other: RDD[T]): RDD[T] = withScope {
  subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
}

 

/**
 * Return an RDD with the elements from `this` that are not in `other`.
 */
def subtract(
    other: RDD[T],
    p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  if (partitioner == Some(p)) {
    // Our partitioner knows how to handle T (which, since we have a partitioner, is
    // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
    val p2 = new Partitioner() {
      override def numPartitions: Int = p.numPartitions
      override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
    }
    // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
    // anyway, and when calling .keys, will not have a partitioner set, even though
    // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
    // partitioned by the right/real keys (e.g. p).
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
  } else {
    this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
  }
}

 

/**
 * Return an RDD with the pairs from `this` whose keys are not in `other`.
 */
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] = self.withScope {
  new SubtractedRDD[K, V, W](self, other, p)
}
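A minimal usage sketch (assuming sc is a SparkContext):

val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(3, 4, 5))
a.subtract(b).collect().sorted   // Array(1, 2): elements of a that do not appear in b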

 

 

 

    11. The sample operator

  sample draws a sample of the elements in the RDD, returning a subset of all elements. The user chooses whether to sample with replacement, the fraction, and the random seed, which together determine the sampling behaviour.
  Parameter settings:
  withReplacement = true: sampling with replacement.
  withReplacement = false: sampling without replacement.
  Each box in Figure 11 is an RDD partition. sample draws 50% of the data: from V1, V2, U1, U2, U3, U4 the elements V1, U1 and U2 are sampled to form the new RDD.

[Figure 11: the sample operator]

 

/**
 * Return a sampled subset of this RDD.
 *
 * @param withReplacement can elements be sampled multiple times (replaced when sampled out)
 * @param fraction expected size of the sample as a fraction of this RDD's size
 *  without replacement: probability that each element is chosen; fraction must be [0, 1]
 *  with replacement: expected number of times each element is chosen; fraction must be greater
 *  than or equal to 0
 * @param seed seed for the random number generator
 *
 * @note This is NOT guaranteed to provide exactly the fraction of the count
 * of the given [[RDD]].
 */
def sample(
    withReplacement: Boolean,
    fraction: Double,
    seed: Long = Utils.random.nextLong): RDD[T] = {
  require(fraction >= 0,
    s"Fraction must be nonnegative, but got ${fraction}")

  withScope {
    require(fraction >= 0.0, "Negative fraction value: " + fraction)
    if (withReplacement) {
      new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
    } else {
      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
    }
  }
}
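A small sketch of the parameters described above (assuming sc is a SparkContext). As the scaladoc notes, the returned size is only approximately fraction * count, not exact:

val data = sc.parallelize(1 to 100)
val s1 = data.sample(withReplacement = false, fraction = 0.1, seed = 42L)
val s2 = data.sample(withReplacement = true, fraction = 2.0)   // each element picked ~2 times on average
s1.count()   // around 10, varies with the seed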

 

/**
 * An RDD sampled from its parent RDD partition-wise. For each partition of the parent RDD,
 * a user-specified [[org.apache.spark.util.random.RandomSampler]] instance is used to obtain
 * a random sample of the records in the partition. The random seeds assigned to the samplers
 * are guaranteed to have different values.
 *
 * @param prev RDD to be sampled
 * @param sampler a random sampler
 * @param preservesPartitioning whether the sampler preserves the partitioner of the parent RDD
 * @param seed random seed
 * @tparam T input RDD item type
 * @tparam U sampled RDD item type
 */
private[spark] class PartitionwiseSampledRDD[T: ClassTag, U: ClassTag](
    prev: RDD[T],
    sampler: RandomSampler[T, U],
    preservesPartitioning: Boolean,
    @transient private val seed: Long = Utils.random.nextLong)
  extends RDD[U](prev) {

  @transient override val partitioner = if (preservesPartitioning) prev.partitioner else None

  override def getPartitions: Array[Partition] = {
    val random = new Random(seed)
    firstParent[T].partitions.map(x => new PartitionwiseSampledRDDPartition(x, random.nextLong()))
  }

  override def getPreferredLocations(split: Partition): Seq[String] =
    firstParent[T].preferredLocations(split.asInstanceOf[PartitionwiseSampledRDDPartition].prev)

  override def compute(splitIn: Partition, context: TaskContext): Iterator[U] = {
    val split = splitIn.asInstanceOf[PartitionwiseSampledRDDPartition]
    val thisSampler = sampler.clone
    thisSampler.setSeed(split.seed)
    thisSampler.sample(firstParent[T].iterator(split.prev, context))
  }
}

 

 

    12. The takeSample operator

 

takeSample() works on the same principle as sample above, but instead of sampling by a relative fraction it draws a fixed number of elements. The result is also no longer an RDD: it is effectively a collect() of the sampled data, returned as a local array on a single machine.
  In Figure 12 the boxes on the left represent the partitions on the distributed nodes and the box on the right represents the result array returned on a single machine. Here takeSample is configured to draw one element and returns V1.

[Figure 12: the takeSample operator]

 

/**
 * Return a fixed-size sampled subset of this RDD in an array
 *
 * @param withReplacement whether sampling is done with replacement
 * @param num size of the returned sample
 * @param seed seed for the random number generator
 * @return sample of specified size in an array
 *
 * @note this method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def takeSample(
    withReplacement: Boolean,
    num: Int,
    seed: Long = Utils.random.nextLong): Array[T] = withScope {
  val numStDev = 10.0

  require(num >= 0, "Negative number of elements requested")
  require(num <= (Int.MaxValue - (numStDev * math.sqrt(Int.MaxValue)).toInt),
    "Cannot support a sample size > Int.MaxValue - " +
    s"$numStDev * math.sqrt(Int.MaxValue)")

  if (num == 0) {
    new Array[T](0)
  } else {
    val initialCount = this.count()
    if (initialCount == 0) {
      new Array[T](0)
    } else {
      val rand = new Random(seed)
      if (!withReplacement && num >= initialCount) {
        Utils.randomizeInPlace(this.collect(), rand)
      } else {
        val fraction = SamplingUtils.computeFractionForSampleSize(num, initialCount,
          withReplacement)
        var samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()

        // If the first sample didn't turn out large enough, keep trying to take samples;
        // this shouldn't happen often because we use a big multiplier for the initial size
        var numIters = 0
        while (samples.length < num) {
          logWarning(s"Needed to re-sample due to insufficient sample size. Repeat #$numIters")
          samples = this.sample(withReplacement, fraction, rand.nextInt()).collect()
          numIters += 1
        }
        Utils.randomizeInPlace(samples, rand).take(num)
      }
    }
  }
}
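A minimal usage sketch (assuming sc is a SparkContext); unlike sample, the result is a local array of exactly num elements, materialized on the driver:

val data = sc.parallelize(1 to 1000)
val picked = data.takeSample(withReplacement = false, num = 5, seed = 7L)   // Array[Int] of length 5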

 

 

 

 

V. Cache-type operators

 

 

    13. The cache operator

cache caches the RDD's elements from disk into memory; it is equivalent to calling persist(MEMORY_ONLY).
     In Figure 13 each box represents an RDD partition: on the left the data partitions are all stored on disk, and the cache operator loads them into memory.

[Figure 13: the cache operator]

 

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def cache(): this.type = persist()
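A quick sketch (assuming sc is a SparkContext; the input path is only a hypothetical placeholder). cache only marks the RDD; the data is actually stored in memory the first time an action computes it:

val logs = sc.textFile("hdfs:///path/to/logs")          // hypothetical input path
val errors = logs.filter(_.contains("ERROR")).cache()   // same as persist(StorageLevel.MEMORY_ONLY)
errors.count()   // first action: computes the RDD and fills the cache
errors.count()   // second action: served from the cached partitions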

 

 

 

    14. The persist operator
 

persist caches the RDD. Where the data is cached is determined by the StorageLevel enumeration. The available combinations are listed below: DISK stands for disk, MEMORY for memory, and SER indicates whether the data is stored in serialized form.

  The function signature is shown below; StorageLevel is an enumeration of storage modes from which the user can choose as needed (Figure 14-1):
  persist(newLevel: StorageLevel)

object StorageLevel {
  val NONE = new StorageLevel(false, false, false, false)
  val DISK_ONLY = new StorageLevel(true, false, false, false)
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)

      ...........................................

}
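A small usage sketch (assuming sc is a SparkContext): the level is chosen based on how much memory is available and how expensive recomputation would be:

import org.apache.spark.storage.StorageLevel

val rdd = sc.parallelize(1 to 1000000)
rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)   // serialized in memory, spilling to disk if needed
rdd.count()       // the first action materializes and stores the partitions
rdd.unpersist()   // drop the cached blocks when they are no longer needed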

 

 

 

In Figure 14-2 each box represents an RDD partition; disk means stored on disk and mem means stored in memory. Initially all the data is on disk. persist(MEMORY_AND_DISK) caches it in memory, but some partitions cannot fit: the partitions holding V1, V2 and V3 are kept on disk, while the partitions holding U1 and U2 stay in memory.

 

[Figure 14-2: persist(MEMORY_AND_DISK)]

 

 

/**
 * Persist this RDD with the default storage level (`MEMORY_ONLY`).
 */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

 

/**
 * Set this RDD's storage level to persist its values across operations after the first time
 * it is computed. This can only be used to assign a new storage level if the RDD does not
 * have a storage level set yet. Local checkpointing is an exception.
 */
def persist(newLevel: StorageLevel): this.type = {
  if (isLocallyCheckpointed) {
    // This means the user previously called localCheckpoint(), which should have already
    // marked this RDD for persisting. Here we should override the old storage level with
    // one that is explicitly requested by the user (after adapting it to use disk).
    persist(LocalRDDCheckpointData.transformStorageLevel(newLevel), allowOverride = true)
  } else {
    persist(newLevel, allowOverride = false)
  }
}

 

/**
 * Mark this RDD for persisting using the specified level.
 * (Author's note: persistence is performed according to the storage level, where
 *   def useDisk: Boolean = _useDisk       // disk
 *   def useMemory: Boolean = _useMemory   // memory
 *   def useOffHeap: Boolean = _useOffHeap // off-heap storage)
 *
 * @param newLevel the target storage level
 * @param allowOverride whether to override any existing level with the new one
 */
private def persist(newLevel: StorageLevel, allowOverride: Boolean): this.type = {
  // TODO: Handle changes of StorageLevel
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel && !allowOverride) {
    throw new UnsupportedOperationException(
      "Cannot change storage level of an RDD after it was already assigned a level")
  }
  // If this is the first time this RDD is marked for persisting, register it
  // with the SparkContext for cleanups and accounting. Do this only once.
  if (storageLevel == StorageLevel.NONE) {
    sc.cleaner.foreach(_.registerRDDForCleanup(this))
    sc.persistRDD(this)
  }
  storageLevel = newLevel
  this
}

 

/**
 * Register an RDD to be persisted in memory and/or disk storage
 */
private[spark] def persistRDD(rdd: RDD[_]) {
  persistentRdds(rdd.id) = rdd
}

 

private[spark] val persistentRdds = {
  // Keeps track of all persisted RDDs
  val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
  map.asScala
}

 

This merely registers the RDD as persisted; the subsequent handling is left to background processing, and the data itself is only cached when the RDD is later computed.