[Spark Advanced] -- Customizing an Accumulator's AccumulatorParam
Spark version: Spark-1.6.0
Environment: Windows 10, JDK 1.7, Scala 2.10.5, IntelliJ IDEA 2016
First, let's look at what the official documentation says about Accumulators.
The official docs point out that if you want a custom AccumulatorParam for an Accumulator, you can use the following as a reference.
The AccumulableParam interface (the general form behind AccumulatorParam):
```scala
/**
 * Helper object defining how to accumulate values of a particular type. An implicit
 * AccumulableParam needs to be available when you create [[Accumulable]]s of a specific type.
 *
 * @tparam R the full accumulated data (result type)
 * @tparam T partial data that can be added in
 */
trait AccumulableParam[R, T] extends Serializable {
  /**
   * Add additional data to the accumulator value. Is allowed to modify and return `r`
   * for efficiency (to avoid allocating objects).
   *
   * @param r the current value of the accumulator
   * @param t the data to be added to the accumulator
   * @return the new value of the accumulator
   */
  def addAccumulator(r: R, t: T): R

  /**
   * Merge two accumulated values together. Is allowed to modify and return the first value
   * for efficiency (to avoid allocating objects).
   *
   * @param r1 one set of accumulated data
   * @param r2 another set of accumulated data
   * @return both data sets merged together
   */
  def addInPlace(r1: R, r2: R): R

  /**
   * Return the "zero" (identity) value for an accumulator type, given its initial value. For
   * example, if R was a vector of N dimensions, this would return a vector of N zeroes.
   */
  def zero(initialValue: R): R
}
```
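For the common case where the accumulated type and the added type are the same, Spark 1.6 provides the narrower AccumulatorParam trait, which is what the example below implements. It derives addAccumulator from addInPlace, so a custom implementation only has to supply zero and addInPlace:

```scala
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}
```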
Implementing a custom AccumulatorParam
Goal: a custom Set[String] accumulator, used here to collect error-log information (for example, the hosts that produced error logs).
Note: immutable collections in Scala are thread-safe, which is why an immutable Set is chosen as the accumulated type.
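Before touching Spark, here is a small sketch (plain Scala, runnable in the REPL) checking the properties Spark expects from zero and addInPlace, which immutable sets satisfy naturally:

```scala
// The empty set is the identity for ++, and set union is associative,
// commutative, and idempotent -- exactly what merging partial
// accumulator results across tasks requires.
val zero = Set.empty[String]
val a = Set("host1")
val b = Set("host2")
assert((zero ++ a) == a)            // zero is an identity element
assert((a ++ b) == (b ++ a))        // merge order does not matter
assert(((a ++ b) ++ b) == (a ++ b)) // duplicates collapse
```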
The implementation consists of two objects:
StringSetAccumulatorParam: the custom AccumulatorParam implementation
```scala
import org.apache.spark.AccumulatorParam

/**
 * Custom AccumulatorParam that accumulates values into an immutable Set[String].
 * addAccumulator is inherited from AccumulatorParam, so only zero and
 * addInPlace need to be implemented.
 */
object StringSetAccumulatorParam extends AccumulatorParam[Set[String]] {

  // The "zero" (identity) value: an empty set, regardless of the initial value.
  def zero(initialValue: Set[String]): Set[String] = {
    Set()
  }

  // Merge two partial results by set union.
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = {
    s1 ++ s2
  }
}
```
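A quick way to try this out is in spark-shell for Spark 1.6, where sc already exists (a sketch; the sample values are made up):

```scala
val acc = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
sc.parallelize(Seq("err-host-1", "err-host-2")).foreach(h => acc += Set(h))
println(acc.value)   // Set(err-host-1, err-host-2) -- element order may vary
```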
ErrorLogHostSet: a shared variable of type Accumulator[Set[String]], created lazily as a singleton
```scala
import org.apache.spark.{Accumulator, SparkContext}

/**
 * Shared variable: a singleton Accumulator[Set[String]], created lazily with
 * double-checked locking so that only one instance exists per driver JVM.
 */
object ErrorLogHostSet extends Serializable {

  @volatile private var instanceError: Accumulator[Set[String]] = null

  def getInstance(sc: SparkContext): Accumulator[Set[String]] = {
    if (instanceError == null) {
      synchronized {
        if (instanceError == null) {
          instanceError = sc.accumulator(Set[String]())(StringSetAccumulatorParam)
        }
      }
    }
    instanceError
  }
}
```
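As a side note, sc.accumulator takes the AccumulatorParam as an implicit parameter, so the explicit second argument list above could also be dropped by bringing the param into implicit scope (a sketch, assuming sc is in scope):

```scala
import org.apache.spark.AccumulatorParam

// Declaring the param implicit lets the compiler supply it automatically.
implicit val stringSetParam: AccumulatorParam[Set[String]] = StringSetAccumulatorParam
val acc = sc.accumulator(Set[String]())   // the param is resolved implicitly
```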
Using the custom Accumulator
```scala
import org.apache.spark.{SparkConf, SparkContext}

object TestCustomerAccumulator {

  def main(args: Array[String]) {
    // Local test setup
    val conf = new SparkConf().setAppName("TestCustomerAccumulator").setMaster("local[2]")
    val sc = new SparkContext(conf)

    val array = Array("a2", "c4", "6v", "67s", "3d", "45s", "2c6", "35d", "7c8d9", "34dc5")
    // Create the RDD
    val dataRDD = sc.parallelize(array)
    // Get the shared accumulator
    val errorHostSet = ErrorLogHostSet.getInstance(dataRDD.sparkContext)

    val filterRDD = dataRDD.filter(element => {
      val result = element.contains("d")
      if (result) {
        // If the element contains the character 'd', add it to the set
        errorHostSet += Set(element)
      }
      result
    })

    // Print the matching elements; this action also triggers the accumulator updates
    filterRDD.foreach(println)
    println("--------------------------------------")
    // Read the accumulated value on the driver
    errorHostSet.value.foreach(println)
  }
}
```
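Note that each matching element is wrapped in a one-element Set before being added: with an AccumulatorParam, += delegates (via addAccumulator) to addInPlace, which merges two Set[String] values. If adding raw Strings directly is preferred, a hypothetical AccumulableParam variant (not part of the original example) could look like this, created with sc.accumulable instead of sc.accumulator:

```scala
import org.apache.spark.AccumulableParam

// Hypothetical variant: accumulates single Strings into a Set[String],
// so callers can write `errorHosts += element` without the Set wrapper.
object StringSetAccumulableParam extends AccumulableParam[Set[String], String] {
  def zero(initialValue: Set[String]): Set[String] = Set()
  def addAccumulator(s: Set[String], t: String): Set[String] = s + t
  def addInPlace(s1: Set[String], s2: Set[String]): Set[String] = s1 ++ s2
}

// Usage:
// val errorHosts = sc.accumulable(Set[String]())(StringSetAccumulableParam)
// errorHosts += "7c8d9"
```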
Output:
```
3d
35d
7c8d9
34dc5
--------------------------------------
3d
35d
7c8d9
34dc5
```
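One caveat from the official programming guide referenced below: here the accumulator is updated inside a transformation (filter), so the updates only run when an action (the foreach) executes, and they may be applied more than once if a task or stage is re-executed. When an update must be counted exactly once, perform the accumulation inside an action instead, e.g. (a sketch reusing the names above):

```scala
// For accumulator updates performed inside actions, Spark guarantees
// each task's update is applied only once, even if the task is restarted.
dataRDD.foreach { element =>
  if (element.contains("d")) errorHostSet += Set(element)
}
```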
References
- Latest version: https://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators
- Spark 1.6.0: https://spark.apache.org/docs/1.6.0/programming-guide.html#accumulators-a-nameaccumlinka