Spark 2.3.2 Source Code Analysis: Operators [ Part 3: Transformation Operators for Key-Value Data ]

This article covers the operators in the [ Transformation Operators for Key-Value Data ] category.

 

I. One-to-One Input and Output Partitions

    15. The mapValues operator

 

mapValues: applies a map function to the Value of each (Key, Value) pair, leaving the Key untouched.

   The boxes in Figure 15 represent RDD partitions.

For a Key-Value pair such as (V1, 1), the function a => a + 2 adds 2 only to the value 1, producing 3.

[Figure 15: mapValues]

 

/**
 * Pass each value in the key-value pair RDD through a map function without changing the keys;
 * this also retains the original RDD's partitioning.
 */
def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
  implicit val ctag: ClassTag[U] = fakeClassTag
  fromRDD(rdd.mapValues(f))
}
def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
  new JavaPairRDD[K, V](rdd)
}
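
To make the a => a + 2 example above concrete, here is a minimal usage sketch, assumed to be run in spark-shell (where sc is already defined); the sample data is invented for illustration:

// Small (Key, Value) RDD; mapValues never touches the keys.
val pairs = sc.parallelize(Seq(("V1", 1), ("V2", 1), ("V3", 1)), 2)

// Add 2 to each value only; the parent RDD's partitioning is preserved.
val bumped = pairs.mapValues(a => a + 2)

bumped.collect().foreach(println)   // (V1,3), (V2,3), (V3,3)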

II. Aggregation over a Single RDD or Two RDDs

               Aggregation over a single RDD

    16. The combineByKey operator

 

combineByKey is one of Spark's core higher-level functions; other higher-order key-value functions, such as groupByKey and reduceByKey, are implemented on top of it.

The signature of combineByKey is shown below:
  combineByKey[C](createCombiner: (V) => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)]

Parameters:
  createCombiner: V => C, invoked when no combiner C exists yet for a key, e.g. to create a Seq C from a single V.
  mergeValue: (C, V) => C, invoked when a C already exists and a new V must be merged into it, e.g. appending item V to Seq C, or accumulating a sum.
  mergeCombiners: (C, C) => C, merges two C's into one.
  partitioner: Partitioner, the Partitioner required for the shuffle.
  mapSideCombine: Boolean = true, to reduce the amount of data transferred, much of the combining can be done on the map side first; for a sum, for example, all values with the same key can be aggregated within a partition before the shuffle.
  serializer: Serializer = null, data must be serialized for transfer, and users may supply a custom serializer.

 

Example: this is equivalent to turning an RDD with (Int, Int) elements into an RDD with (Int, Seq[Int]) elements. The boxes in Figure 16 represent RDD partitions. As shown, combineByKey merges (V1, 2) and (V1, 1) into (V1, Seq(2, 1)).
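
A minimal sketch of that (Int, Int) to (Int, Seq[Int]) conversion, assumed to run in spark-shell with invented sample data (this uses the overload that picks a default partitioner):

// (Key, Value) pairs with duplicate keys
val data = sc.parallelize(Seq((1, 2), (1, 1), (2, 5)), 2)

// createCombiner: start a Seq from the first value seen for a key
// mergeValue:     fold a further value into the partial Seq within a partition
// mergeCombiners: concatenate partial Seqs coming from different partitions
val combined = data.combineByKey(
  (v: Int) => Seq(v),
  (c: Seq[Int], v: Int) => c :+ v,
  (c1: Seq[Int], c2: Seq[Int]) => c1 ++ c2)

combined.collect().foreach(println)   // e.g. (1,List(2, 1)), (2,List(5))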

 

[Figure 16: combineByKey]

 

/**
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. This method is here for backward compatibility. It does not provide combiner
 * classtag information to the shuffle.
 *
 * @see `combineByKeyWithClassTag`
 */
def combineByKey[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null): RDD[(K, C)] = self.withScope {
  combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
    partitioner, mapSideCombine, serializer)(null)
}

 

/**
 * :: Experimental ::
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 *
 * Users provide three functions:
 *
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 *
 * @note V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]).
 */
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

    17. The reduceByKey operator

   reduceByKey is a simpler special case of combineByKey: it merely merges two values into one of the same type, from (Int, V: Int) to (Int, C: Int), for example by summation. Its createCombiner is therefore trivial, simply returning v, and mergeValue and mergeCombiners share exactly the same logic.
    The implementation:
    def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
      combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
    }
  The boxes in Figure 17 represent RDD partitions. With the user-defined function (A, B) => (A + B), the values of (V1, 2) and (V1, 1), which share the same key, are added together, producing (V1, 3).
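
A minimal usage sketch matching that example, assumed to run in spark-shell:

val data = sc.parallelize(Seq(("V1", 2), ("V1", 1), ("V2", 1)), 2)

// The same function serves as both mergeValue and mergeCombiners internally.
val summed = data.reduceByKey((a, b) => a + b)

summed.collect().foreach(println)   // (V1,3), (V2,1)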

 

[Figure 17: reduceByKey]

 

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

/**
 * A [[org.apache.spark.Partitioner]] that implements hash-based partitioning using
 * Java's `Object.hashCode`.
 *
 * Java arrays have hashCodes that are based on the arrays' identities rather than their contents,
 * so attempting to partition an RDD[Array[_]] or RDD[(Array[_], _)] using a HashPartitioner will
 * produce an unexpected or incorrect result.
 */
class HashPartitioner(partitions: Int) extends Partitioner {
  require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")

  def numPartitions: Int = partitions

  def getPartition(key: Any): Int = key match {
    case null => 0
    case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
  }

  override def equals(other: Any): Boolean = other match {
    case h: HashPartitioner =>
      h.numPartitions == numPartitions
    case _ =>
      false
  }

  override def hashCode: Int = numPartitions
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

    18. The partitionBy operator

partitionBy repartitions an RDD.
  Its definition is:
  partitionBy(partitioner: Partitioner)
  If the RDD's existing partitioner equals the given partitioner, the RDD is returned unchanged; otherwise a new ShuffledRDD is built according to the partitioner.
  The boxes in Figure 18 represent RDD partitions. Under the new partitioning strategy, the V1 and V2 records that previously lived in different partitions are brought together into a single partition.
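
A minimal sketch, assumed to run in spark-shell, that repartitions by key with a HashPartitioner (sample data invented):

import org.apache.spark.HashPartitioner

val data = sc.parallelize(Seq(("V1", 1), ("V2", 2), ("V1", 3), ("V3", 4)), 4)

// Hash-partition by key into 2 partitions; pairs with the same key
// (both V1 records here) land in the same partition.
val repartitioned = data.partitionBy(new HashPartitioner(2))

println(repartitioned.getNumPartitions)                               // 2
println(repartitioned.partitioner.contains(new HashPartitioner(2)))   // true

As the source below shows, calling partitionBy again with an equal partitioner hits the self.partitioner == Some(partitioner) branch and returns the RDD without another shuffle.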

 

[Figure 18: partitionBy]

 

/**
 * Return a copy of the RDD partitioned using the specified partitioner.
 */
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  if (keyClass.isArray && partitioner.isInstanceOf[HashPartitioner]) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}

               Aggregation over two RDDs

    19. The cogroup operator

cogroup co-partitions two RDDs. One of its signatures is:
  cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))]
  For the Key-Value elements of the two RDDs, the elements sharing a key within each RDD are gathered into a collection, and the result pairs each key with the two collections of values for that key, one from each RDD:
  (K, (Iterable[V], Iterable[W]))
  In other words, the key stays the key, and the value becomes a tuple of the two iterables of values that share that key across the two RDDs.
  In Figure 19 the large boxes represent RDDs and the small boxes inside them represent partitions. The records (U1, 1) and (U1, 2) from RDD1 and (U1, 2) from RDD2 are merged into (U1, ((1, 2), (2))).
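
A minimal sketch of that example, assumed to run in spark-shell (sample data invented; output order may vary):

val rdd1 = sc.parallelize(Seq(("U1", 1), ("U1", 2), ("U2", 3)), 2)
val rdd2 = sc.parallelize(Seq(("U1", 2), ("U3", 4)), 2)

// For every key present in either RDD, pair the iterable of its values
// in rdd1 with the iterable of its values in rdd2.
val grouped = rdd1.cogroup(rdd2)

grouped.collect().foreach(println)
// (U1,(CompactBuffer(1, 2),CompactBuffer(2)))
// (U2,(CompactBuffer(3),CompactBuffer()))
// (U3,(CompactBuffer(),CompactBuffer(4)))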

[Figure 19: cogroup]

/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  cogroup(other, defaultPartitioner(self, other))
}
/**
 * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
 * list of values for that key in `this` as well as `other`.
 */
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
  if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
    throw new SparkException("HashPartitioner cannot partition array keys.")
  }
  val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
  cg.mapValues { case Array(vs, w1s) =>
    (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
  }
}

III. Joins

    20. The join operator

   join first applies cogroup to the two RDDs being joined, so that records with the same key end up in the same partition. On the RDD produced by cogroup it then takes, for each key, the Cartesian product of the two value collections and flattens the result, so that all tuples under a key form one collection. The final result is an RDD[(K, (V, W))].
  The code below is the essence of the join implementation: co-partition via cogroup, then scatter the merged data with flatMapValues.
       this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => for (v <- vs; w <- ws) yield (v, w) }
Figure 20 illustrates a join of two RDDs. The large boxes represent RDDs and the small boxes represent their partitions. For elements with the same key, e.g. V1, the join produces (V1, (1, 1)) and (V1, (1, 2)).
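
A minimal sketch reproducing that result, assumed to run in spark-shell (sample data invented):

val left  = sc.parallelize(Seq(("V1", 1), ("V2", 2)), 2)
val right = sc.parallelize(Seq(("V1", 1), ("V1", 2), ("V3", 3)), 2)

// cogroup under the hood, then a per-key Cartesian product via flatMapValues.
val joined = left.join(right)

joined.collect().foreach(println)   // (V1,(1,1)), (V1,(1,2))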
 

[Figure 20: join]

 

/**
 * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each
 * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and
 * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD.
 */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues( pair =>
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
  )
}

    21. The leftOuterJoin and rightOuterJoin operators

 

  leftOuterJoin (left outer join) and rightOuterJoin (right outer join) build on join: they first check whether the other RDD has any elements for a given key. If it has none, the missing side is filled with None; otherwise the records are joined as usual and the result is returned.

 

/**
 * Perform a left outer join of `this` and `other`. For each element (k, v) in `this`, the
 * resulting RDD will either contain all pairs (k, (v, Some(w))) for w in `other`, or the
 * pair (k, (v, None)) if no elements in `other` have key k. Uses the given Partitioner to
 * partition the output RDD.
 */
def leftOuterJoin[W](
    other: RDD[(K, W)],
    partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._2.isEmpty) {
      pair._1.iterator.map(v => (v, None))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
    }
  }
}

 

/**
 * Perform a right outer join of `this` and `other`. For each element (k, w) in `other`, the
 * resulting RDD will either contain all pairs (k, (Some(v), w)) for v in `this`, or the
 * pair (k, (None, w)) if no elements in `this` have key k. Uses the given Partitioner to
 * partition the output RDD.
 */
def rightOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner)
    : RDD[(K, (Option[V], W))] = self.withScope {
  this.cogroup(other, partitioner).flatMapValues { pair =>
    if (pair._1.isEmpty) {
      pair._2.iterator.map(w => (None, w))
    } else {
      for (v <- pair._1.iterator; w <- pair._2.iterator) yield (Some(v), w)
    }
  }
}
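
A minimal usage sketch of both operators, assumed to run in spark-shell (sample data invented; output order may vary):

val left  = sc.parallelize(Seq(("V1", 1), ("V2", 2)), 2)
val right = sc.parallelize(Seq(("V1", 10), ("V3", 30)), 2)

// Keys with no match in `right` are padded with None on the right side.
left.leftOuterJoin(right).collect().foreach(println)
// (V1,(1,Some(10))), (V2,(2,None))

// Keys with no match in `left` are padded with None on the left side.
left.rightOuterJoin(right).collect().foreach(println)
// (V1,(Some(1),10)), (V3,(None,30))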