spark三类算子小总结
文章目录
spark算子概述
RDD:弹性分布式数据集,是一种特殊集合、支持多种来源、有容错机制、可以被缓存、支持并行操作,一个RDD代表多个分区里的数据集。
RDD有三种操作算子:
1. Transformation(转换)
Transformation属于延迟计算,当一个RDD转换成另一个RDD时并没有立即进行转换,仅仅是记住了数据集的逻辑操作。当有Action算子出现时,他才会真正的执行
2. Action(执行)
触发Spark作业的运行,真正触发转换算子的计算。
3. 控制
Spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。。当有Action算子出现时,他才会真正的执行
其中延迟计算就是懒执行的意思,就像是创建了一个视图,他并不是把查询好的数据放入视图了,而是当你需要这些数据时,查看视图时,他才执行定义视图时候的SQL语句。
需要说明的是,下面写的scala代码,其实都是可以简写的,但是为了方便理解,我都没有简写,因为要简写的话对于scala来说真的就是一句话的事情了。
Transformation算子
概述
需要操作的Transformation算子说明如下:
- map(f:T→U): RDD[T]→RDD[U]
返回一个新的分布式数据集,由每个原元素经过func函数转换后组成 - filter(f:T→Boolean) : RDD[T]→RDD[T]
返回一个新的数据集,由经过func函数后返回值为true的原元素组成 - flatMap(f:→ Seq[U]) : RDD[T]→RDD[U]
类似于map,但是每一个输入元素,会被映射为0到多个输出元素(因此,func函数的返回值是一个Seq,而不是单一元素) - groupByKey() : RDD[(K,V)] → RDD[(K,Seq[V])]
在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。注意:默认情况下,使用8个并行任务进行分组,你可以传入numTask可选参数,根据数据量设置不同数目的Task - reduceByKey(f: (V,V)→V) :RDD[(K,V)] → RDD[(K,V)]
在一个(K,V)对的数据集上使用,返回一个(K,V)对的数据集,key相同的值,都被使用指定的reduce函数聚合到一起。和groupbykey类似,任务的个数是可以通过第二个可选参数来配置的。 - sample(withReplacement,fraction,seed)
Boolean withReplacement : 抽样的方式 true放回式抽样 false 不放回式抽样
Double fraction:抽样比例
Long seed:随机算法的初始值
根据给定的随机种子seed,随机抽样出数量为fraction的数据 - union(otherDataset)
返回一个新的数据集,由原数据集和参数联合而成 - join(otherDataset, [numTasks])内连接
在类型为(K,V)和(K,W)类型的数据集上调用,返回一个(K,(V,W))对,每个key中的所有元素都在一起的数据集
测试
获取RDD
/**
* 1.把本地集合转化为RDD
* val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
* 2.把本地集合转化为RDD
* val rdd1 = sc.makeRDD(List((1,1),(1,2),(1,3),(2,2),(3,1)))
* 3. 调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD
* val lineRDD = sc.textFile("d:/test.txt")
*/
Map
object Map {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("MapTest")
conf.setMaster("local")
// 设置Spark上下文,很重要
val sc = new SparkContext(conf)
/**
* 1.把本地集合转化为RDD
* val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
* 2.把本地集合转化为RDD
* val rdd1 = sc.makeRDD(List((1,1),(1,2),(1,3),(2,2),(3,1)))
* 3. 调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD
* val lineRDD = sc.textFile("d:/test.txt")
*/
val lineRdd = sc.textFile("d:/test.txt")
lineRdd.map(x => x + "_经过Map处理").foreach(println)
sc.stop()
}
}
执行结果如下
helli a_经过Map处理
helli b_经过Map处理
helli c_经过Map处理
helli d_经过Map处理
helli e_经过Map处理
filter
测试代码如下:
object filtet {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("filter")
val sc = new SparkContext(conf)
/**
* 从本地集合1-10个数字导入RDD
* 在SC的filter方法得到偶数
* 然后打印
*/
var rdd = sc.makeRDD(1 to 10)
rdd.filter(x => x % 2 == 0).foreach(println)
sc.stop()
}
}
输出结果如下
2
4
6
8
10
flatMap和reduceByKey
我们用flatMap写一个简单的Word Count
object WCSpark {
def main(args: Array[String]): Unit = {
//创建配置对象
val conf = new SparkConf()
//设置App的名称 有啥用? 方便在监控页面找到 MR-》Yarn 8088
conf.setAppName("WCSpark")
//设置Spark的运行模式 local本地运行 用于测试环境
conf.setMaster("local")
//创建Spark上下文 他是通往集群的唯一通道
val sc = new SparkContext(conf)
/**
* 处理数据 在SparkCore中一切得计算都是基于RDD
* R(Resilient)D(Distributed )D(Dataset)
* RDD 弹性分布式数据集
*/
val lineRDD = sc.textFile("d:/test.txt")
//基于lineRDD中的数据 进行分词
val wordRDD = lineRDD.flatMap { _.split(" ") }
//每一个单词计数为1 pairRDD K:word V:1
val pairRDD = wordRDD.map { (_,1) }
//相同的单词进行分组,对组内的数据进行累加
//restRDD K:word V:count
val restRDD = pairRDD.reduceByKey((v1,v2)=>v1+v2)
/**
* 根据单词出现的次数来排序
* sortByKey 根据key来排序
* sortBy
*/
// restRDD
// .map(_.swap)
// .sortByKey(false)
// .map(_.swap)
// .foreach(println)
//释放资源
sc.stop()
}
}
执行结果:
(helli,5)
(d,1)
(e,1)
(a,1)
(b,1)
(c,1)
sample
测试代码如下
object sample {
def main(args: Array[String]): Unit = {
var conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("sampleTest")
var sc = new SparkContext(conf)
/**
* Boolean withReplacement : 抽样的方式 true放回式抽样 false 不放回式抽样
* Double fraction:抽样比例
* Long seed:随机算法的初始值
*/
var testRdd = sc.parallelize(1 to 10000)
val l = testRdd.sample(true,0.2).count()
println(l)
sc.stop()
}
}
执行结果:
因为是随机算法,所以并不一定严格按照比例
2011
union
测试代码
object union {
def main(args: Array[String]): Unit = {
var conf = new SparkConf()
conf.setAppName("union")
conf.setMaster("local")
var sc = new SparkContext(conf)
var rdd1 = sc.parallelize(1 to 3,2)
var rdd2 = sc.makeRDD(100 to 103,2)
var resultRdd = rdd1.union(rdd2)
resultRdd.foreach(println)
println("rdd1有2个分区")
println("rdd2有2个分区")
println("那么他们union之后新的rdd有几个分区?")
println(resultRdd.getNumPartitions)
sc.stop()
}
}
}
}
执行结果:
1
2
3
100
101
102
103
rdd1有2个分区
rdd2有2个分区
那么他们union之后新的rdd有几个分区?
4
GroupByKey
object GroupByKey {
def main(args: Array[String]): Unit = {
// 获取配置对象
val conf = new SparkConf()
// 设置工作的名字
conf.setAppName("WCSpark")
// 设置运行模式
conf.setMaster("local")
// 设置Spark上下文,很重要
val sc = new SparkContext(conf)
/**
* 1.把本地集合转化为RDD
* val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
* 2.把本地集合转化为RDD
* val rdd1 = sc.makeRDD(List((1,1),(1,2),(1,3),(2,2),(3,1)))
* 3. 调用SparkContext.textFile()方法,从外部存储中读取数据来创建 RDD
* val lineRDD = sc.textFile("d:/test.txt")
*/
// 把本地集合转化为RDD
val rdd = sc.parallelize(List((1,1),(1,2),(1,3),(2,2),(3,1)))
// 根据key值分组
val groupRDD = rdd.groupByKey()
// 对相同key值的Value进行操作
groupRDD.foreach(x=>{
val key = x._1
val values = x._2
val iterator = values.iterator
var sum = 0
while(iterator.hasNext){
sum += iterator.next()
}
println("key:" + key + "\tsum: " + sum)
})
rdd.reduceByKey(_+_).foreach(println)
sc.stop()
}
}
执行结果
(1,6)
(3,1)
(2,2)
join
他就是内连接,其实还有外连接
- leftOuterJoin;左外连接
- rightOuterJoin;右外连接
- fullOuterJoin;全外连接
测试代码:
object join {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setMaster("local")
conf.setAppName("join")
val sc = new SparkContext(conf)
val nameRdd = sc.makeRDD(Array((1,"Angelababy"),(2,"Dilireba"),(3,"gulinazha")))
val facePowerRDD = sc.parallelize(List((1,99),(2,98),(3,97),(3,97),(4,59)))
println("内连接,进行连接的两个rdd的顺序关系到结果的Value值中K-V的顺序。例如:")
nameRdd.join(facePowerRDD).foreach(println)
facePowerRDD.join(nameRdd).foreach(println)
println("如下,是三个外连接")
val leftOuterJoinRDD = nameRdd.leftOuterJoin(facePowerRDD)
leftOuterJoinRDD.foreach(println)
val rightOuterJoinRDD = nameRdd.rightOuterJoin(facePowerRDD)
rightOuterJoinRDD.foreach(println)
val fullOuterJoinRDD = nameRdd.fullOuterJoin(facePowerRDD)
fullOuterJoinRDD.foreach(println)
sc.stop()
}
}
执行结果
内连接,进行连接的两个rdd的顺序关系到结果的Value值中K-V的顺序。例如:
(1,(Angelababy,99))
(3,(gulinazha,97))
(3,(gulinazha,97))
(2,(Dilireba,98))
(1,(99,Angelababy))
(3,(97,gulinazha))
(3,(97,gulinazha))
(2,(98,Dilireba))
如下,是三个外连接
(1,(Angelababy,Some(99)))
(3,(gulinazha,Some(97)))
(3,(gulinazha,Some(97)))
(2,(Dilireba,Some(98)))
(4,(None,59))
(1,(Some(Angelababy),99))
(3,(Some(gulinazha),97))
(3,(Some(gulinazha),97))
(2,(Some(Dilireba),98))
(4,(None,Some(59)))
(1,(Some(Angelababy),Some(99)))
(3,(Some(gulinazha),Some(97)))
(3,(Some(gulinazha),Some(97)))
(2,(Some(Dilireba),Some(98)))
Action(执行)算子
- collect()
在Driver的程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作后,返回一个足够小的数据子集再使用,直接将整个RDD集Collect返回,很可能会让Driver程序OOM - count()
返回数据集的元素个数 - foreach(func)
在数据集的每一个元素上,运行函数func。这通常用于更新一个累加器变量,或者和外部存储系统做交互 - reduce(func)
通过函数func聚集数据集中的所有元素。Func函数接受2个参数,返回一个值。这个函数必须是关联性的,确保可以被正确的并发执行
如何鉴别Action算子和Transformation算子
看返回值,若返回RDD则是Transformation算子
控制算子
控制算法的提出
请看伪代码
RDD是一个抽象的概念、并不是真实的存储数据
Applicationlines = sc.textFile( "htd://.. )
errors = lines.filter( .startsWith( "ERROR" ))
Mysql_ errors = errors.fiter( .contain( "MySQL" ))
Mysql_ errors.count //job0 -个action类算子 就会触发一个job执行 action类算子个数=job个数
http. errors = rrs.filter( .contain( "Http"))
http_ errors .count //job1
每当遇到一个count(即Action算子)就会执行一次job任务,对Mysql_ errors执行job任务,但是Mysql_ errors无数据,所以去找他的依赖 errors,但是 errors也没有数据所以去找他的依赖,知道找到最前面的hdfs中的源数据,从此开始计算,得到最后进行job的数据。
当第二个job工作时还是什么数据也没有,要和第一个job的流程一样,所以这样很慢,所以我们可以使用控制算子,将第一个job执行后的数据放入内存,第二个以及以后的任务可以直接从内存中拿到数据,不必再去HDFS中重新计算。
这里有一个注意点,只有当第一个count出发执行时,才会向内存中写数据,但是数据是一条一条执行往内存中写的,所以第一个job的文件的数据来源是来自于HDFS中的(不是先写入内存中,才从内存中拿出来,这样不如直接从HDFS中拿取快),直到第一个count执行完成之后,内存中才会写完完成的数据.
注意点
- 控制类算子都是懒执行的,需要一个action类算子触发执行
- 控制类算子后面不能立即紧跟action类算子(rdd.cache().count()?
- cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是使用持久化的数据了。持久化的单位是partition。
问题
- 如果你写的Application中只有一个action类算子,那么有没有必要使用控制类算子?
没有,因为必须等一个action算子执行完后才会往内存中(获其他)写好数据,第一个job是用不到这些数据的 - 如果持久化的级别选择的是MEMORY_ONLY|cache,如果内存不足,会不会OOM?
不会,内存不足,能存多少就存多少,不会报错。剩下存不下的就不存了,等到时候用的时候会根据依赖关系重新计算。
在存储的时候,存储单元是partition - 如果持久化的级别是MEMOPRY_AND_DISK,会不会将一个partition的数据一部分存储到内存一部分在磁盘?
存储单元是partition
测试代码
object CacheTest {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
conf.setAppName("WCSpark")
conf.setMaster("local")
val sc = new SparkContext(conf)
var rdd = sc.textFile("d:/wc.txt")
//textFile方法去wc.txt中读来的数据 一边会存储到内存中,一边进行count计算
val rdd1 = rdd.cache() //cache = persist(StorageLevel.MEMORY_ONLY)
// rdd = rdd.persist(StorageLevel.MEMORY_ONLY)
/**
* 控制类算子注意点:
* 1、控制类算子都是懒执行的,需要一个action类算子触发执行
* 2、控制类算子后面不能立即紧跟action类算子
*
* 问题:
* 1、如果你写的Application中只有一个action类算子,那么有没有必要使用控制类算子?
* 没有
* 2、如果持久化的级别选择的是MEMORY_ONLY|cache,如果内存不足,会不会OOM?
* 不会,内存不足,能存多少就存多少,不会报错。剩下存不下的就不存了
* 在存储的时候,存储单元是partition
* 3、 如果持久化的级别是MEMOPRY_AND_DISK,会不会将一个partition的数据一部分存储到内存一部分在磁盘?
* 存储单元是partition
*/
val startTime = System.currentTimeMillis()
var count = rdd.count() //job0
val endTime = System.currentTimeMillis()
println("count:" + count + "\tduration:" + (endTime-startTime) + "ms")
val startTime1 = System.currentTimeMillis()
count = rdd1.count() //job1 第二个job在执行时候,直接从内存中拿rdd的数据,就不需要从磁盘中去读了
val endTime1 = System.currentTimeMillis()
println("count:" + count + "\tduration:" + (endTime1-startTime1) + "ms")
rdd.unpersist()
sc.stop()
}
}
常用的控制算子
- cache()
- persist()
- checkpoint()
- unpersist()
以上1,2,3算子都可以将RDD持久化,持久化的最小单位是partition。其中【cache () = persist()=persist(StorageLevel.Memory_Only)】
4算子是解除持久化。
Spark的任务监控是端口4040
如图,各个字段的意义
persist()持久化的级别
我们来看Spark的源代码:
可以得到cache=persist(StorageLevel.MEMORY_ONLY)