spark三类算子小总结

spark算子概述

RDD:弹性分布式数据集,是一种特殊集合、支持多种来源、有容错机制、可以被缓存、支持并行操作,一个RDD代表多个分区里的数据集。

RDD有三种操作算子:

1. Transformation(转换)

Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作。当有Action算子出现时,他才会真正的执行

2. Action(执行)

触发Spark作业的运行,真正触发转换算子的计算。

3. 控制

Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。。当有Action算子出现时,他才会真正的执行

其中延迟计算就是懒执行的意思,就像是创建了一个视图,他并不是把查询好的数据放入视图了,而是当你需要这些数据时,查看视图时,他才执行定义视图时候的SQL语句。

需要说明的是,下面写的scala代码,其实都是可以简写的,但是为了方便理解,我都没有简写,因为要简写的话对于scala来说真的就是一句话的事情了。

Transformation算子

概述

需要操作的Transformation算子说明如下:

  • map(f:T→U): RDD[T]→RDD[U]
    返回一个新的分布式数据集,由每个原元素经过func函数转换后组成
  • filter(f:T→Boolean) : RDD[T]→RDD[T]
    返回一个新的数据集,由经过func函数后返回值为true的原元素组成
  • flatMap(f:→ Seq[U]) : RDD[T]→RDD[U]
    类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素)
  • groupByKey() : RDD[(K,V)] → RDD[(K,Seq[V])]
    在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task
  • reduceByKey(f: (V,V)→V) :RDD[(K,V)] → RDD[(K,V)]
    在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。
  • sample(withReplacement,fraction,seed)
    Boolean withReplacement : 抽样的方式 true放回式抽样 false 不放回式抽样
    Double fraction:抽样比例
    Long seed:随机算法的初始值
    根据给定的随机种子seed,随机抽样出数量为fraction的数据
  • union(otherDataset)
    返回一个新的数据集,由原数据集和参数联合而成
  • join(otherDataset, [numTasks])内连接
    在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集

测试

获取RDD

    /**
      * 1.把本地集合转化为RDD
      *    val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
      * 2.把本地集合转化为RDD
      *    val rdd1 = sc.makeRDD(List((1,1),(1,2),(1,3),(2,2),(3,1)))
      * 3. 调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD
      *    val lineRDD = sc.textFile("d:/test.txt")
      */

Map

object Map {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("MapTest")
    conf.setMaster("local")
     // 设置Spark上下文,很重要
    val sc = new SparkContext(conf)
	 /**
      * 1.把本地集合转化为RDD
      *    val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
      * 2.把本地集合转化为RDD
      *    val rdd1 = sc.makeRDD(List((1,1),(1,2),(1,3),(2,2),(3,1)))
      * 3. 调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD
      *    val lineRDD = sc.textFile("d:/test.txt")
      */
    val lineRdd = sc.textFile("d:/test.txt")
    lineRdd.map(x => x + "_经过Map处理").foreach(println)
    sc.stop()
  }
}

执行结果如下

helli a_经过Map处理
helli b_经过Map处理
helli c_经过Map处理
helli d_经过Map处理
helli e_经过Map处理

filter

测试代码如下:

object filtet {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("filter")
    val sc = new SparkContext(conf)
	 /**
      * 从本地集合1-10个数字导入RDD
      * 在SC的filter方法得到偶数
      * 然后打印
      */
    var rdd = sc.makeRDD(1 to 10)
    rdd.filter(x => x % 2 == 0).foreach(println)

    sc.stop()
  }
}

输出结果如下

2
4
6
8
10

flatMap和reduceByKey

我们用flatMap写一个简单的Word Count

object WCSpark {
  def main(args: Array[String]): Unit = {
    //创建配置对象
    val conf = new SparkConf()
    //设置App的名称   有啥用? 方便在监控页面找到  MR-》Yarn 8088
    conf.setAppName("WCSpark")
    //设置Spark的运行模式  local本地运行  用于测试环境
    conf.setMaster("local")

    //创建Spark上下文 他是通往集群的唯一通道
    val sc = new SparkContext(conf)

    /**
      * 处理数据   在SparkCore中一切得计算都是基于RDD
      * R(Resilient)D(Distributed )D(Dataset)
      * RDD 弹性分布式数据集
      */
    val lineRDD = sc.textFile("d:/test.txt")
    //基于lineRDD中的数据 进行分词
    val wordRDD = lineRDD.flatMap { _.split(" ") }
    //每一个单词计数为1  pairRDD  K:word V:1
    val pairRDD = wordRDD.map { (_,1) }
    //相同的单词进行分组,对组内的数据进行累加
    //restRDD K:word V:count
    val restRDD = pairRDD.reduceByKey((v1,v2)=>v1+v2)
    /**
      * 根据单词出现的次数来排序
      * sortByKey 根据key来排序
      * sortBy
      */
    //    restRDD
    //      .map(_.swap)
    //      .sortByKey(false)
    //      .map(_.swap)
    //      .foreach(println)

    //释放资源
    sc.stop()
  }
}

执行结果:

(helli,5)
(d,1)
(e,1)
(a,1)
(b,1)
(c,1)

sample

测试代码如下

object sample {
  def main(args: Array[String]): Unit = {

    var conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("sampleTest")
    var sc = new SparkContext(conf)

    /**
      * Boolean withReplacement : 抽样的方式   true放回式抽样   false 不放回式抽样
      * Double fraction:抽样比例
      * Long seed:随机算法的初始值
      */
    var testRdd = sc.parallelize(1 to 10000)
     val l = testRdd.sample(true,0.2).count()
    println(l)
    sc.stop()
  }
}		

执行结果:
因为是随机算法,所以并不一定严格按照比例

2011

union

测试代码


object union {
  def main(args: Array[String]): Unit = {
    var conf = new SparkConf()
    conf.setAppName("union")
    conf.setMaster("local")
    var sc = new SparkContext(conf)

    var rdd1 = sc.parallelize(1 to 3,2)
    var rdd2 = sc.makeRDD(100 to 103,2)
    var resultRdd = rdd1.union(rdd2)
    resultRdd.foreach(println)
    println("rdd1有2个分区")
    println("rdd2有2个分区")
    println("那么他们union之后新的rdd有几个分区?")
    println(resultRdd.getNumPartitions)
    sc.stop()
  }
}
  }
}

执行结果:

1
2
3
100
101
102
103
rdd1有2个分区
rdd2有2个分区
那么他们union之后新的rdd有几个分区?
4

GroupByKey

object GroupByKey {
  def main(args: Array[String]): Unit = {
    // 获取配置对象
    val conf = new SparkConf()
    // 设置工作的名字
    conf.setAppName("WCSpark")
    // 设置运行模式
    conf.setMaster("local")
    // 设置Spark上下文,很重要
    val sc = new SparkContext(conf)
    /**
      * 1.把本地集合转化为RDD
      *    val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
      * 2.把本地集合转化为RDD
      *    val rdd1 = sc.makeRDD(List((1,1),(1,2),(1,3),(2,2),(3,1)))
      * 3. 调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD
      *    val lineRDD = sc.textFile("d:/test.txt")
      */
    // 把本地集合转化为RDD
    val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
    // 根据key值分组
    val groupRDD = rdd.groupByKey()
    // 对相同key值的Value进行操作
    groupRDD.foreach(x=>{
      val key = x._1
      val values = x._2
      val iterator = values.iterator
      var sum = 0
      while(iterator.hasNext){
        sum += iterator.next()
      }
      println("key:" + key + "\tsum: " + sum)
    })

    rdd.reduceByKey(_+_).foreach(println)

    sc.stop()
  }
}

执行结果

(1,6)
(3,1)
(2,2)

join

他就是内连接,其实还有外连接

  1. leftOuterJoin;左外连接
  2. rightOuterJoin;右外连接
  3. fullOuterJoin;全外连接

测试代码:

object join {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local")
    conf.setAppName("join")

    val sc = new SparkContext(conf)

    val nameRdd = sc.makeRDD(Array((1,"Angelababy"),(2,"Dilireba"),(3,"gulinazha")))
    val facePowerRDD = sc.parallelize(List((1,99),(2,98),(3,97),(3,97),(4,59)))

    println("内连接,进行连接的两个rdd的顺序关系到结果的Value值中K-V的顺序。例如:")
    nameRdd.join(facePowerRDD).foreach(println)
    facePowerRDD.join(nameRdd).foreach(println)

    println("如下,是三个外连接")
    val leftOuterJoinRDD = nameRdd.leftOuterJoin(facePowerRDD)
    leftOuterJoinRDD.foreach(println)

    val rightOuterJoinRDD = nameRdd.rightOuterJoin(facePowerRDD)
    rightOuterJoinRDD.foreach(println)

    val fullOuterJoinRDD = nameRdd.fullOuterJoin(facePowerRDD)
    fullOuterJoinRDD.foreach(println)
    sc.stop()
  }
}

执行结果


内连接,进行连接的两个rdd的顺序关系到结果的Value值中K-V的顺序。例如:


(1,(Angelababy,99))
(3,(gulinazha,97))
(3,(gulinazha,97))
(2,(Dilireba,98))

(1,(99,Angelababy))
(3,(97,gulinazha))
(3,(97,gulinazha))
(2,(98,Dilireba))

如下,是三个外连接


(1,(Angelababy,Some(99)))
(3,(gulinazha,Some(97)))
(3,(gulinazha,Some(97)))
(2,(Dilireba,Some(98)))


(4,(None,59))
(1,(Some(Angelababy),99))
(3,(Some(gulinazha),97))
(3,(Some(gulinazha),97))
(2,(Some(Dilireba),98))

(4,(None,Some(59)))
(1,(Some(Angelababy),Some(99)))
(3,(Some(gulinazha),Some(97)))
(3,(Some(gulinazha),Some(97)))
(2,(Some(Dilireba),Some(98)))

Action(执行)算子

  • collect()
    在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM
  • count()
    返回数据集的元素个数
  • foreach(func)
    在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互
  • reduce(func)
    通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行

如何鉴别Action算子和Transformation算子

看返回值,若返回RDD则是Transformation算子

控制算子

控制算法的提出

请看伪代码

RDD是一个抽象的概念、并不是真实的存储数据

      Applicationlines = sc.textFile( "htd://.. )

	  errors = lines.filter( .startsWith( "ERROR" ))

     Mysql_ errors = errors.fiter( .contain( "MySQL" ))

	Mysql_ errors.count //job0   -个action类算子 就会触发一个job执行 action类算子个数=job个数

	http. errors = rrs.filter( .contain( "Http"))
	http_ errors .count //job1

        每当遇到一个count(即Action算子)就会执行一次job任务,对Mysql_ errors执行job任务,但是Mysql_ errors无数据,所以去找他的依赖 errors,但是 errors也没有数据所以去找他的依赖,知道找到最前面的hdfs中的源数据,从此开始计算,得到最后进行job的数据。

        当第二个job工作时还是什么数据也没有,要和第一个job的流程一样,所以这样很慢,所以我们可以使用控制算子,将第一个job执行后的数据放入内存,第二个以及以后的任务可以直接从内存中拿到数据,不必再去HDFS中重新计算。
      这里有一个注意点,只有当第一个count出发执行时,才会向内存中写数据,但是数据是一条一条执行往内存中写的,所以第一个job的文件的数据来源是来自于HDFS中的(不是先写入内存中,才从内存中拿出来,这样不如直接从HDFS中拿取快),直到第一个count执行完成之后,内存中才会写完完成的数据.


注意点

  1. 控制类算子都是懒执行的,需要一个action类算子触发执行
  2. 控制类算子后面不能立即紧跟action类算子(rdd.cache().count()?
  3. cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。

问题

  1. 如果你写的Application中只有一个action类算子,那么有没有必要使用控制类算子?
    没有,因为必须等一个action算子执行完后才会往内存中(获其他)写好数据,第一个job是用不到这些数据的
  2. 如果持久化的级别选择的是MEMORY_ONLY|cache,如果内存不足,会不会OOM?
    不会,内存不足,能存多少就存多少,不会报错。剩下存不下的就不存了,等到时候用的时候会根据依赖关系重新计算。
    在存储的时候,存储单元是partition
  3. 如果持久化的级别是MEMOPRY_AND_DISK,会不会将一个partition的数据一部分存储到内存一部分在磁盘?
    存储单元是partition

测试代码

object CacheTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setAppName("WCSpark")
    conf.setMaster("local")
    val sc = new SparkContext(conf)
    var rdd = sc.textFile("d:/wc.txt")
    //textFile方法去wc.txt中读来的数据 一边会存储到内存中,一边进行count计算
    val rdd1  = rdd.cache()  //cache = persist(StorageLevel.MEMORY_ONLY)
    //    rdd = rdd.persist(StorageLevel.MEMORY_ONLY)
    /**
      * 控制类算子注意点:
      * 		1、控制类算子都是懒执行的,需要一个action类算子触发执行
      * 		2、控制类算子后面不能立即紧跟action类算子
      *
      * 问题:
      * 		1、如果你写的Application中只有一个action类算子,那么有没有必要使用控制类算子?
      * 			没有
      * 		2、如果持久化的级别选择的是MEMORY_ONLY|cache,如果内存不足,会不会OOM?
      * 			不会,内存不足,能存多少就存多少,不会报错。剩下存不下的就不存了
      * 			在存储的时候,存储单元是partition
      * 		3、 如果持久化的级别是MEMOPRY_AND_DISK,会不会将一个partition的数据一部分存储到内存一部分在磁盘?
      * 			存储单元是partition
      */


    val startTime = System.currentTimeMillis()
    var count = rdd.count() //job0
    val endTime = System.currentTimeMillis()
    println("count:" + count + "\tduration:" + (endTime-startTime) + "ms")

    val startTime1 = System.currentTimeMillis()
    count = rdd1.count() //job1  第二个job在执行时候,直接从内存中拿rdd的数据,就不需要从磁盘中去读了
    val endTime1 = System.currentTimeMillis()
    println("count:" + count + "\tduration:" + (endTime1-startTime1) + "ms")

        rdd.unpersist()
        sc.stop()


  }
}

常用的控制算子

  1. cache()
  2. persist()
  3. checkpoint()
  4. unpersist()

以上1,2,3算子都可以将RDD持久化,持久化的最小单位是partition。其中【cache () = persist()=persist(StorageLevel.Memory_Only)】


4算子是解除持久化。

Spark的任务监控是端口4040
spark三类算子小总结
如图,各个字段的意义

persist()持久化的级别

我们来看Spark的源代码:
spark三类算子小总结
可以得到cache=persist(StorageLevel.MEMORY_ONLY)
spark三类算子小总结