[Spark Advanced] -- Customizing an Accumulator's AccumulatorParam

Spark version: Spark-1.6.0

Environment: Windows 10, JDK 1.7, Scala 2.10.5, IntelliJ IDEA 2016

First, take a look at the official documentation on Accumulators (linked in the references at the end).


The official guide points out that, beyond the built-in numeric types, programmers can add accumulator support for new types by subclassing AccumulatorParam. The relevant interface is reproduced below.


The AccumulableParam interface

/**
 * Helper object defining how to accumulate values of a particular type. An implicit
 * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
 *
 * @tparam R the full accumulated data (result type)
 * @tparam T partial data that can be added in
 */
trait AccumulableParam[R, T] extends Serializable {
  /**
   * Add additional data to the accumulator value. Is allowed to modify and return `r`
   * for efficiency (to avoid allocating objects).
   *
   * @param r the current value of the accumulator
   * @param t the data to be added to the accumulator
   * @return the new value of the accumulator
   */
  def addAccumulator(r: R, t: T): R

  /**
   * Merge two accumulated values together. Is allowed to modify and return the first value
   * for efficiency (to avoid allocating objects).
   *
   * @param r1 one set of accumulated data
   * @param r2 another set of accumulated data
   * @return both data sets merged together
   */
  def addInPlace(r1: R, r2: R): R

  /**
   * Return the "zero" (identity) value for an accumulator type, given its initial value. For
   * example, if R was a vector of N dimensions, this would return a vector of N zeroes.
   */
  def zero(initialValue: R): R
}
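The trait above is the general form. AccumulatorParam itself is the special case where the result type and the element type coincide; in the Spark 1.6 source it essentially just delegates addAccumulator to addInPlace, so a custom implementation only has to supply zero and addInPlace:

trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}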

Implementing a custom AccumulatorParam

Goal: a custom Set[String] accumulator, used here mainly to collect error log information.

Note: immutable collections in Scala are thread-safe, which is why an immutable Set is chosen here.

The classes:

StringSetAccumulatorParam: the custom AccumulatorParam implementation

import org.apache.spark.AccumulatorParam

/**
  * Created by yangjf on 2018-06-18.
  * Defines how Set[String] values are accumulated: `zero` is the empty set
  * and `addInPlace` merges two partial sets by union.
  * Result of test: OK.
  */
object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] {

  // The "zero" (identity) value is the empty set, regardless of the initial value
  def zero(initialValue: Set[String]): Set[String] = Set()
  // Merge two partial results by set union
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2

}
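Because this object has no runtime Spark dependencies, the merge semantics can be sanity-checked in plain Scala before wiring it into a job. A minimal sketch (the object name is just illustrative); note that set union also deduplicates, so the same error value is only recorded once:

object StringSetAccumulatorParamCheck {
  def main(args: Array[String]): Unit = {
    // Union of two partial results, with the duplicate "host-b" collapsed
    val merged = StringSetAccumulatorParam.addInPlace(Set("host-a", "host-b"), Set("host-b", "host-c"))
    println(merged)                                          // Set(host-a, host-b, host-c)
    // zero ignores the initial value and returns the empty set
    println(StringSetAccumulatorParam.zero(Set("anything"))) // Set()
  }
}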

ErrorLogHostSet: creates the shared variable, of type Accumulator[Set[String]]

import org.apache.spark.{Accumulator, SparkContext}

/**
  * Created by yangjf on 2017-06-18.
  * Shared variable: lazily creates a single Accumulator[Set[String]] per driver,
  * using double-checked locking so concurrent callers get the same instance.
  * Result of test: OK.
  */
object ErrorLogHostSet extends Serializable{
  @volatile private var instanceError: Accumulator[Set[String]] = null

  def getInstance(sc: SparkContext): Accumulator[Set[String]] = {
    if (instanceError == null) {
      synchronized {
        if (instanceError == null) {
          instanceError = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
        }
      }
    }
    instanceError
  }

}
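As an aside, SparkContext.accumulator takes its AccumulatorParam as an implicit parameter, so instead of passing StringSetAccumulatorParam explicitly (as getInstance does above) it could be brought into implicit scope. A minimal sketch with illustrative names, given a SparkContext sc:

import org.apache.spark.AccumulatorParam

// With the param in implicit scope, sc.accumulator resolves it automatically
implicit val stringSetParam: AccumulatorParam[Set[String]] = StringSetAccumulatorParam
val errorHosts = sc.accumulator(Set[String]())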

Using the custom Accumulator

import org.apache.spark.{SparkConf, SparkContext}

object TestCustomerAccumulator {

  def main(args: Array[String]) {
    // Local test configuration
    val conf = new SparkConf().setAppName("TestCustomerAccumulator").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val array = Array("a2", "c4", "6v", "67s", "3d", "45s", "2c6", "35d", "7c8d9", "34dc5")
    // Create the RDD
    val dataRDD = sc.parallelize(array)
    // Get the shared accumulator
    val errorHostSet = ErrorLogHostSet.getInstance(dataRDD.sparkContext)
    val filterRDD = dataRDD.filter { element =>
      val result = element.contains("d")
      if (result) {
        // The element contains the character 'd'; record it in the set
        errorHostSet += Set(element)
      }
      result
    }
    // Print the matching elements (this action also triggers the accumulator updates)
    filterRDD.foreach(println)
    println("--------------------------------------")
    // Read the accumulated value on the driver
    errorHostSet.value.foreach(println)
  }
}
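One caveat: the update here happens inside filter, a transformation, and for accumulator updates performed inside transformations Spark only guarantees each task's update is applied once for successful jobs; if a task or stage is re-executed (for example after a failure, or due to speculative execution), the update may be applied again. For updates performed inside actions, Spark does guarantee exactly-once application. In this particular example double-application is harmless because the values are merged into a Set, but for numeric accumulators the distinction matters. A more defensive sketch moves the bookkeeping into an action:

// Update the accumulator from an action (foreach), where each task's
// update is guaranteed to be applied exactly once
dataRDD.foreach { element =>
  if (element.contains("d")) {
    errorHostSet += Set(element)
  }
}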

Output (ordering may vary between runs, since Set iteration order is not guaranteed):

    3d
    35d
    7c8d9
    34dc5
    --------------------------------------
    3d
    35d
    7c8d9
    34dc5


References

  • Latest version: https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
  • Spark 1.6.0: https://spark.apache.org/docs/1.6.0/programming-guide.html#accumulators-a-nameaccumlinka