Spark 2.3.2 Source Code Analysis: Operators [Part 4: Action Operators]

 

This article covers the operators in the Action category.

  Essentially, every Action operator submits a job through SparkContext's runJob, which triggers execution of the RDD DAG.

I. No Output

    22. The foreach operator

 foreach applies the function f to every element of the RDD. It returns neither an RDD nor an Array; its return type is Unit. Figure 22 shows the foreach operator applying a user-defined function to each data item. In this example the user-defined function is println(), which prints every item to the console.

[Figure 22]

 

/**
 * Applies a function f to all elements of this RDD.
 */
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
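
A minimal usage sketch (my own addition, not part of the source above), written for spark-shell where a SparkContext named sc already exists; note that the println output appears on the executors rather than the driver console, unless you run in local mode:

// Print each element; output goes to executor stdout (the driver console in local mode).
sc.parallelize(Seq(1, 2, 3, 4, 5)).foreach(x => println(x))

// A more typical pattern: a side effect per element, here counted with an accumulator.
val seen = sc.longAccumulator("seen")
sc.parallelize(Seq(1, 2, 3, 4, 5)).foreach(_ => seen.add(1))
// seen.value == 5 on the driver once the action completes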

 

 

II. HDFS

    23. The saveAsTextFile operator

 

saveAsTextFile writes the data out to the specified directory on HDFS.

Below is the internal implementation of saveAsTextFile; conceptually it delegates to saveAsHadoopFile:

this.map(x => (NullWritable.get(), new Text(x.toString))).saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)

Each element of the RDD is mapped to the pair (NullWritable, x.toString) and then written to HDFS.
In Figure 23, the boxes on the left represent RDD partitions and the boxes on the right represent HDFS blocks: each partition of the RDD is stored as one block on HDFS.

[Figure 23]

 

 

/**
 * Save this RDD as a text file, using string representations of elements.
 */
def saveAsTextFile(path: String): Unit = withScope {
  // https://issues.apache.org/jira/browse/SPARK-2075
  //
  // NullWritable is a `Comparable` in Hadoop 1.+, so the compiler cannot find an implicit
  // Ordering for it and will use the default `null`. However, it's a `Comparable[NullWritable]`
  // in Hadoop 2.+, so the compiler will call the implicit `Ordering.ordered` method to create an
  // Ordering for `NullWritable`. That's why the compiler will generate different anonymous
  // classes for `saveAsTextFile` in Hadoop 1.+ and Hadoop 2.+.
  //
  // Therefore, here we provide an explicit Ordering `null` to make sure the compiler generate
  // same bytecodes for `saveAsTextFile`.
  val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
  val textClassTag = implicitly[ClassTag[Text]]
  val r = this.mapPartitions { iter =>
    val text = new Text()
    iter.map { x =>
      text.set(x.toString)
      (NullWritable.get(), text)
    }
  }
  RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
    .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
}
/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 */
def saveAsHadoopFile[F <: OutputFormat[K, V]](
    path: String)(implicit fm: ClassTag[F]): Unit = self.withScope {
  saveAsHadoopFile(path, keyClass, valueClass, fm.runtimeClass.asInstanceOf[Class[F]])
}

 

/**
 * Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
 * supporting the key and value types K and V in this RDD.
 *
 * @note We should make sure our tasks are idempotent when speculation is enabled, i.e. do
 * not use output committer that writes data directly.
 * There is an example in https://issues.apache.org/jira/browse/SPARK-10063 to show the bad
 * result of using direct output committer with speculation enabled.
 */
def saveAsHadoopFile(
    path: String,
    keyClass: Class[_],
    valueClass: Class[_],
    outputFormatClass: Class[_ <: OutputFormat[_, _]],
    conf: JobConf = new JobConf(self.context.hadoopConfiguration),
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  hadoopConf.setOutputKeyClass(keyClass)
  hadoopConf.setOutputValueClass(valueClass)
  conf.setOutputFormat(outputFormatClass)
  for (c <- codec) {
    hadoopConf.setCompressMapOutput(true)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress", "true")
    hadoopConf.setMapOutputCompressorClass(c)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.codec", c.getCanonicalName)
    hadoopConf.set("mapreduce.output.fileoutputformat.compress.type",
      CompressionType.BLOCK.toString)
  }

  // Use configured output committer if already set
  if (conf.getOutputCommitter == null) {
    hadoopConf.setOutputCommitter(classOf[FileOutputCommitter])
  }

  // When speculation is on and output committer class name contains "Direct", we should warn
  // users that they may loss data if they are using a direct output committer.
  val speculationEnabled = self.conf.getBoolean("spark.speculation", false)
  val outputCommitterClass = hadoopConf.get("mapred.output.committer.class", "")
  if (speculationEnabled && outputCommitterClass.contains("Direct")) {
    val warningMessage =
      s"$outputCommitterClass may be an output committer that writes data directly to " +
        "the final location. Because speculation is enabled, this output committer may " +
        "cause data loss (see the case in SPARK-10063). If possible, please use an output " +
        "committer that does not have this behavior (e.g. FileOutputCommitter)."
    logWarning(warningMessage)
  }

  FileOutputFormat.setOutputPath(hadoopConf,
    SparkHadoopWriterUtils.createPathFromString(path, hadoopConf))
  saveAsHadoopDataset(hadoopConf)
}
/**
 * Output the RDD to any Hadoop-supported storage system, using a Hadoop JobConf object for
 * that storage system. The JobConf should set an OutputFormat and any output paths required
 * (e.g. a table name to write to) in the same way as it would be configured for a Hadoop
 * MapReduce job.
 */
def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  val config = new HadoopMapRedWriteConfigUtil[K, V](new SerializableJobConf(conf))
  SparkHadoopWriter.write(
    rdd = self,
    config = config)
}
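
A minimal usage sketch, assuming sc is an existing SparkContext and that the output path (illustrative only) does not already exist, since Hadoop's FileOutputFormat refuses to overwrite an existing directory:

val lines = sc.parallelize(Seq("spark", "action", "operators"), 2)
lines.saveAsTextFile("hdfs:///tmp/save-as-text-demo")   // hypothetical path
// One part-NNNNN file is written per partition (two here), plus a _SUCCESS marker.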

 

 

 

 

 

    24. The saveAsObjectFile operator

saveAsObjectFile groups every 10 elements of each partition into an Array, serializes each Array, maps it to a (NullWritable, BytesWritable(Y)) pair, and writes the result to HDFS in SequenceFile format.
The core of the internal implementation is:
map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
In Figure 24, the boxes on the left represent RDD partitions and the boxes on the right represent HDFS blocks: each partition of the RDD is stored as one block on HDFS.

 

[Figure 24]

 

/**
 * Save this RDD as a SequenceFile of serialized objects.
 */
def saveAsObjectFile(path: String): Unit = withScope {
  this.mapPartitions(iter => iter.grouped(10).map(_.toArray))
    .map(x => (NullWritable.get(), new BytesWritable(Utils.serialize(x))))
    .saveAsSequenceFile(path)
}
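
A minimal usage sketch, assuming sc is an existing SparkContext; the path is illustrative. sc.objectFile is the matching read side that deserializes the SequenceFile back into an RDD:

val nums = sc.parallelize(1 to 100, 4)
nums.saveAsObjectFile("hdfs:///tmp/object-file-demo")   // hypothetical path

val restored = sc.objectFile[Int]("hdfs:///tmp/object-file-demo")
// restored.count() == 100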

 

 

/**
 * Output the RDD as a Hadoop SequenceFile using the Writable types we infer from the RDD's key
 * and value types. If the key or value are Writable, then we use their classes directly;
 * otherwise we map primitive types such as Int and Double to IntWritable, DoubleWritable, etc,
 * byte arrays to BytesWritable, and Strings to Text. The `path` can be on any Hadoop-supported
 * file system.
 */
def saveAsSequenceFile(
    path: String,
    codec: Option[Class[_ <: CompressionCodec]] = None): Unit = self.withScope {
  def anyToWritable[U <% Writable](u: U): Writable = u

  // TODO We cannot force the return type of `anyToWritable` be same as keyWritableClass and
  // valueWritableClass at the compile time. To implement that, we need to add type parameters to
  // SequenceFileRDDFunctions. however, SequenceFileRDDFunctions is a public class so it will be a
  // breaking change.
  val convertKey = self.keyClass != _keyWritableClass
  val convertValue = self.valueClass != _valueWritableClass

  logInfo("Saving as sequence file of type " +
    s"(${_keyWritableClass.getSimpleName},${_valueWritableClass.getSimpleName})" )
  val format = classOf[SequenceFileOutputFormat[Writable, Writable]]
  val jobConf = new JobConf(self.context.hadoopConfiguration)
  if (!convertKey && !convertValue) {
    self.saveAsHadoopFile(path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (!convertKey && convertValue) {
    self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile(
      path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (convertKey && !convertValue) {
    self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile(
      path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  } else if (convertKey && convertValue) {
    self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile(
      path, _keyWritableClass, _valueWritableClass, format, jobConf, codec)
  }
}

 

saveAsSequenceFile then delegates to the same saveAsHadoopFile overload already shown for saveAsTextFile above.

 

 

 

III. Scala Collections and Data Types

    25. The collect operator

collect is the equivalent of toArray (toArray itself is deprecated and no longer recommended): it returns the distributed RDD as a single-machine Scala Array, on which ordinary Scala collection operations can then be applied.
In Figure 25, the boxes on the left represent RDD partitions and the box on the right represents an in-memory array on a single machine: the results are returned to the node running the driver program and stored there as an array.

[Figure 25]

 

/**
 * Return an array that contains all of the elements in this RDD.
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collect(): Array[T] = withScope {
  val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
  Array.concat(results: _*)
}
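
A minimal usage sketch, assuming sc is an existing SparkContext; collect should only be used when the result fits in driver memory:

val small = sc.parallelize(Seq(3, 1, 2), 2)
val arr: Array[Int] = small.collect()   // Array(3, 1, 2), gathered on the driver
val sorted = arr.sorted                 // ordinary Scala Array operations now apply: Array(1, 2, 3)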

 

 

    26. The collectAsMap operator

 

collectAsMap returns a single-machine HashMap for an RDD of (K, V) pairs. When keys are duplicated, later elements overwrite earlier ones.
In Figure 26, the boxes on the left represent RDD partitions and the box on the right represents a single-machine collection: collectAsMap returns the computed result to the driver program, stored as a HashMap.

[Figure 26]

/**
 * Return the key-value pairs in this RDD to the master as a Map.
 *
 * Warning: this doesn't return a multimap (so if you have multiple values to the same key, only
 *          one value per key is preserved in the map returned)
 *
 * @note this method should only be used if the resulting data is expected to be small, as
 * all the data is loaded into the driver's memory.
 */
def collectAsMap(): Map[K, V] = self.withScope {
  val data = self.collect()
  val map = new mutable.HashMap[K, V]
  map.sizeHint(data.length)
  data.foreach { pair => map.put(pair._1, pair._2) }
  map
}
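
A minimal usage sketch, assuming sc is an existing SparkContext; note how a duplicate key keeps only one value:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val m: scala.collection.Map[String, Int] = pairs.collectAsMap()
// e.g. Map("a" -> 3, "b" -> 2); which value survives for a duplicate key
// depends on the order in which the collected elements are inserted.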

 

 

    27. The reduceByKeyLocally operator

 

reduceByKeyLocally implements a reduce followed by collectAsMap: it first reduces the whole RDD by key and then collects all the results, returning them as a HashMap.

 

 

 

/**
 * Merge the values for each key using an associative and commutative reduce function, but return
 * the results immediately to the master as a Map. This will also perform the merging locally on
 * each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce.
 */
def reduceByKeyLocally(func: (V, V) => V): Map[K, V] = self.withScope {
  val cleanedF = self.sparkContext.clean(func)

  if (keyClass.isArray) {
    throw new SparkException("reduceByKeyLocally() does not support array keys")
  }

  val reducePartition = (iter: Iterator[(K, V)]) => {
    val map = new JHashMap[K, V]
    iter.foreach { pair =>
      val old = map.get(pair._1)
      map.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
    }
    Iterator(map)
  } : Iterator[JHashMap[K, V]]

  val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => {
    m2.asScala.foreach { pair =>
      val old = m1.get(pair._1)
      m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2))
    }
    m1
  } : JHashMap[K, V]

  self.mapPartitions(reducePartition).reduce(mergeMaps).asScala
}
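
A minimal usage sketch, assuming sc is an existing SparkContext:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 2)
val local: scala.collection.Map[String, Int] = pairs.reduceByKeyLocally(_ + _)
// Map("a" -> 4, "b" -> 2), built directly on the driver: no shuffled RDD is produced,
// unlike reduceByKey(...) followed by collectAsMap().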


 

    28. The lookup operator

 

The declaration of lookup is:
lookup(key: K): Seq[V]
lookup operates on an RDD of (Key, Value) pairs and returns the values associated with the given key as a Seq. The optimization is that when the RDD has a partitioner, only the partition that the key maps to is processed and the matching values are returned as a Seq; when the RDD has no partitioner, the whole RDD has to be scanned to find the elements whose key matches.
In Figure 28, the boxes on the left represent RDD partitions and the box on the right represents the resulting Seq; the final result is returned to the application on the driver node.

 

[Figure 28]

/**
 * Return the list of values in the RDD for key `key`. This operation is done efficiently if the
 * RDD has a known partitioner by only searching the partition that the key maps to.
 */
def lookup(key: K): Seq[V] = self.withScope {
  self.partitioner match {
    case Some(p) =>
      val index = p.getPartition(key)
      val process = (it: Iterator[(K, V)]) => {
        val buf = new ArrayBuffer[V]
        for (pair <- it if pair._1 == key) {
          buf += pair._2
        }
        buf
      } : Seq[V]
      val res = self.context.runJob(self, process, Array(index))
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}
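
A minimal usage sketch, assuming sc is an existing SparkContext; partitionBy gives the RDD a known partitioner so that lookup only scans the one partition the key hashes to:

import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  .partitionBy(new HashPartitioner(4))   // known partitioner => single-partition job
val values: Seq[Int] = pairs.lookup("a") // Seq(1, 3)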

 

    29. The count operator

 

 count returns the number of elements in the entire RDD.
Its internal implementation is:
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
In Figure 29, the number of elements returned is 5; each box represents an RDD partition.

 

[Figure 29]

/**
 * Return the number of elements in the RDD.
 */
def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum
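
A minimal usage sketch, assuming sc is an existing SparkContext:

val n: Long = sc.parallelize(Seq("a", "b", "c", "d", "e"), 2).count()
// n == 5: each task returns the size of its partition's iterator and the driver sums them.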

 

 

 

    30. The top operator

top returns the k largest elements. It is defined as follows.
top(num: Int)(implicit ord: Ordering[T]): Array[T]

Related functions:
·top returns the k largest elements.
·takeOrdered returns the k smallest elements, keeping them in sorted order in the returned array.
·take returns the first k elements of the RDD.
·first returns the first element, equivalent to take(1).
top accepts an implicit Ordering[T] that defines how elements are compared, and the result is an array containing the k largest elements.

 

/**
 * Returns the top k (largest) elements from this RDD as defined by the specified
 * implicit Ordering[T] and maintains the ordering. This does the opposite of
 * [[takeOrdered]]. For example:
 * {{{
 *   sc.parallelize(Seq(10, 4, 2, 12, 3)).top(1)
 *   // returns Array(12)
 *
 *   sc.parallelize(Seq(2, 3, 4, 5, 6)).top(2)
 *   // returns Array(6, 5)
 * }}}
 *
 * @note This method should only be used if the resulting array is expected to be small, as
 * all the data is loaded into the driver's memory.
 *
 * @param num k, the number of top elements to return
 * @param ord the implicit ordering for T
 * @return an array of top elements
 */
def top(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  takeOrdered(num)(ord.reverse)
}
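
A minimal usage sketch, assuming sc is an existing SparkContext, showing how top, takeOrdered and a custom Ordering relate:

val rdd = sc.parallelize(Seq(10, 4, 2, 12, 3))
rdd.top(2)                          // Array(12, 10): the two largest, descending
rdd.takeOrdered(2)                  // Array(2, 3): the two smallest, ascending
rdd.top(2)(Ordering[Int].reverse)   // Array(2, 3): a reversed Ordering flips what "top" means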

 

 

 

 

    31. The reduce operator

 

 reduce applies a reduceLeft-style operation to the elements of the RDD. The per-partition part of the implementation is:
Some(iter.reduceLeft(cleanF))
reduceLeft first applies the reduce function to two elements <K, V>, then applies it to that intermediate result and the next element drawn from the iterator, and so on until the iterator is exhausted. In an RDD, reduceLeft is first applied to the set of elements <K, V> in each partition; each partition's result then acts as a single element <K, V>, and reduceLeft is applied again over this set of partition results.
For example, with the user-defined function
f: (A, B) => (A._1 + "@" + B._1, A._2 + B._2)
each box in Figure 31 represents an RDD partition reduced with f. The final result is a single pair whose key is all the keys joined with "@" and whose value, 12, is the sum of all the values.

 

[Figure 31]

 

/**
 * Reduces the elements of this RDD using the specified commutative and
 * associative binary operator.
 */
def reduce(f: (T, T) => T): T = withScope {
  val cleanF = sc.clean(f)
  val reducePartition: Iterator[T] => Option[T] = iter => {
    if (iter.hasNext) {
      Some(iter.reduceLeft(cleanF))
    } else {
      None
    }
  }
  var jobResult: Option[T] = None
  val mergeResult = (index: Int, taskResult: Option[T]) => {
    if (taskResult.isDefined) {
      jobResult = jobResult match {
        case Some(value) => Some(f(value, taskResult.get))
        case None => taskResult
      }
    }
  }
  sc.runJob(this, reducePartition, mergeResult)
  // Get the final result out of our Option, or throw an exception if the RDD was empty
  jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

 

/** Applies a binary operator to all elements of this $coll,
 *  going left to right.
 *  $willNotTerminateInf
 *  $orderDependentFold
 *
 *  @param  op    the binary operator.
 *  @tparam  B    the result type of the binary operator.
 *  @return  the result of inserting `op` between consecutive elements of this $coll,
 *           going left to right:
 *           {{{
 *             op( op( ... op(x_1, x_2) ..., x_{n-1}), x_n)
 *           }}}
 *           where `x,,1,,, ..., x,,n,,` are the elements of this $coll.
 *  @throws UnsupportedOperationException if this $coll is empty.   */
def reduceLeft[B >: A](op: (B, A) => B): B = {
  if (isEmpty)
    throw new UnsupportedOperationException("empty.reduceLeft")

  var first = true
  var acc: B = 0.asInstanceOf[B]

  for (x <- self) {
    if (first) {
      acc = x
      first = false
    }
    else acc = op(acc, x)
  }
  acc
}
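
A minimal sketch of the pair example from the text, assuming sc is an existing SparkContext; the element values are illustrative, not taken from the original figure:

val pairs = sc.parallelize(Seq(("V1", 1), ("V2", 2), ("V3", 4), ("V4", 5)), 2)
val f = (a: (String, Int), b: (String, Int)) => (a._1 + "@" + b._1, a._2 + b._2)
pairs.reduce(f)
// Keys joined with "@" and values summed, e.g. ("V1@V2@V3@V4", 12); the exact key order
// can vary because per-partition results are merged as tasks finish.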

 

 

 

 

 

    32. The fold operator

 fold works on the same principle as reduce, except that in each reduce step the first element drawn by the iterator is zeroValue.
Figure 32 shows a fold computation using the user-defined function below; each box in the figure represents an RDD partition. It can be read by analogy with the reduce operator above.
fold is called with a (String, Int) zero value and the same combining function as in the reduce example: (A, B) => (A._1 + "@" + B._1, A._2 + B._2)

 


The signature of fold is def fold(zeroValue: T)(op: (T, T) => T): T. fold first applies op within each partition, with zeroValue taking part as the initial accumulator of that partition, and then applies op across the per-partition results, where zeroValue takes part once more.
 

 

 

[Figure 32]

 

 

/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using a
 * given associative function and a neutral "zero value". The function
 * op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object
 * allocation; however, it should not modify t2.
 *
 * This behaves somewhat differently from fold operations implemented for non-distributed
 * collections in functional languages like Scala. This fold operation may be applied to
 * partitions individually, and then fold those results into the final result, rather than
 * apply the fold to each element sequentially in some defined ordering. For functions
 * that are not commutative, the result may differ from that of a fold applied to a
 * non-distributed collection.
 *
 * @param zeroValue the initial value for the accumulated result of each partition for the `op`
 *                  operator, and also the initial value for the combine results from different
 *                  partitions for the `op` operator - this will typically be the neutral
 *                  element (e.g. `Nil` for list concatenation or `0` for summation)
 * @param op an operator used to both accumulate results within a partition and combine results
 *                  from different partitions
 */
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
  val cleanOp = sc.clean(op)
  val foldPartition = (iter: Iterator[T]) => iter.fold(zeroValue)(cleanOp)
  val mergeResult = (index: Int, taskResult: T) => jobResult = op(jobResult, taskResult)
  sc.runJob(this, foldPartition, mergeResult)
  jobResult
}
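
A minimal usage sketch, assuming sc is an existing SparkContext, showing that zeroValue participates once per partition and once more in the final merge:

val nums = sc.parallelize(Seq(1, 2, 3, 4), 2)
nums.fold(0)(_ + _)   // 10
nums.fold(1)(_ + _)   // 13 = 10 + 1 for each of the 2 partitions + 1 for the driver-side merge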

 

 

 

 

    33. The aggregate operator
 

aggregate first aggregates the elements within each partition, then folds the per-partition results together.
aggregate differs from fold and reduce in that it combines data in a merging fashion, so this final combination can be parallelized, whereas with fold and reduce each partition is processed serially, the per-partition results are computed one after another, and those results are then combined in the way described earlier to produce the final result.
The function is defined as follows.
aggregate[B](z: B)(seqop: (B, A) => B, combop: (B, B) => B): B
Figure 33 shows an aggregate over an RDD using user-defined functions; each box in the figure represents an RDD partition.
rdd.aggregate(zeroValue)((A, B) => (A._1 + "@" + B._1, A._2 + B._2), (A, B) => (A._1 + "@" + B._1, A._2 + B._2))
Finally, a brief note on broadcast variables, one of the special shared variables in the computation model.
Broadcast variables are widely used to broadcast the small table in a map-side join, to broadcast large variables, and so on. Such data sets fit in the memory of a single node and do not need to be scattered across nodes the way RDDs are.
At runtime Spark sends the broadcast data to every node and keeps it there, so later computations can reuse it. Compared with Hadoop's distributed cache, broadcast content can be shared across jobs. The underlying implementation of Broadcast uses a BitTorrent-like mechanism.

 

[Figure 33]

 

 

/**
 * Aggregate the elements of each partition, and then the results for all the partitions, using
 * given combine functions and a neutral "zero value". This function can return a different result
 * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
 * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
 * allowed to modify and return their first argument instead of creating a new U to avoid memory
 * allocation.
 *
 * @param zeroValue the initial value for the accumulated result of each partition for the
 *                  `seqOp` operator, and also the initial value for the combine results from
 *                  different partitions for the `combOp` operator - this will typically be the
 *                  neutral element (e.g. `Nil` for list concatenation or `0` for summation)
 * @param seqOp an operator used to accumulate results within a partition
 * @param combOp an associative operator used to combine results from different partitions
 */
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
  // Clone the zero value since we will also be serializing it as part of tasks
  var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance())
  val cleanSeqOp = sc.clean(seqOp)
  val cleanCombOp = sc.clean(combOp)
  val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
  val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
  sc.runJob(this, aggregatePartition, mergeResult)
  jobResult
}
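
A minimal usage sketch, assuming sc is an existing SparkContext; it computes (sum, count) in one pass, which shows the point of having a result type U different from the element type T:

val nums = sc.parallelize(1 to 6, 2)
val (sum, count) = nums.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),     // seqOp: fold one element into a partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))     // combOp: merge two partition accumulators
// sum == 21, count == 6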