Spark核心RDD及相关算子
目录
6.cache ,persist ,checkpoint 区别
1. RDD是什么
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
- Dataset: 就是一个集合,存储很多数据。
- Distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算。
- Resilient: 表示弹性,rdd的数据是可以保存在内存或者是磁盘中。
2. RDD的五大属性
(1)RDD是由一系列的partition组成的。
数据集的基本组成单位;spark中任务是以task线程的方式运行, 一个分区就对应一个task线程。
用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值(默认值2)。
(2)函数是作用在每一个partition(split)上的。
(3)RDD之间有一系列的依赖关系。
spark任务的容错机制就是根据这个特性(血统)而来。
(4)分区器是作用在K,V格式的RDD上。
当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号);
另外一个是基于范围的RangePartitioner。
只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner;
非key-value的RDD(RDD[String])的Parititioner的值是None。
(5)RDD提供一系列最佳的计算位置。
涉及到数据的本地性,数据块位置最优。
spark任务在调度的时候会优先考虑对存储有数据的节点开启计算任务,减少数据的网络传输,提升计算效率。
3.RDD图及相关理解
(1)textFile方法底层封装的是MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小。
(2)RDD实际上不存储数据;partition(存的是计算逻辑)也是不存数据的;
(3)什么是K,V格式的RDD?
如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。
(4)哪里体现RDD的弹性(容错)?
partition数量,大小没有限制,体现了RDD的弹性。
RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。
(5)哪里体现RDD的分布式?
RDD是由Partition组成,partition是分布在不同节点上的。
RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。
(6)RDD中最小的单元是partition;
4.RDD的创建方式(共三种)
(1)通过已经存在的scala集合去构建(少用)
rdd3=sc.makeRDD(List(1,2,3,4))
(2)加载外部的数据源去构建
rdd1=sc.textFile("/words.txt")
(3)从已经存在的rdd进行转换生成一个新的rdd
rdd2=rdd1.flatMap(_.split(" "))
val rdd3=rdd2.map((_,1))
5.RDD的算子分类
(1)transformation转换算子
根据已经存在的rdd转换生成一个新的rdd, 它是延迟加载,它不会立即执行;
<1> map
将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素;
特点:输入一条,输出一条数据
<2> flatMap
扁平化输出:先map后flat;与map类似,每个输入项可以映射为0到多个输出项
<3> filter
过滤符合条件的记录数,true保留,false过滤掉
<4> reduceByKey
将相同的Key根据相应的逻辑进行处理;
与groupByKey的区别:当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey;
<5> sample
随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样;
<6> sortByKey/sortBy
作用在K,V格式的RDD上,对key进行升序或者降序排序
<7> join,leftOuterJoin,rightOuterJoin,fullOuterJoin
作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W));
join后的分区数与父RDD分区数多的那一个相同。
<8> union
合并两个数据集。两个数据集的类型要一致。返回新的RDD的分区数是合并RDD分区数的总和。
<9> intersection
取两个数据集的交集,返回新的RDD与父RDD分区多的一致
<10> subtract
取两个数据集的差集,结果RDD的分区数与subtract前面的RDD的分区数一致。
<11> mapPartitions
与map类似,遍历的单位是每个partition上的数据。
<12> distinct(map+reduceByKey+map)
<13> cogroup
当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>)),子RDD的分区与父RDD多的一致。
(2)action 行动算子
它会真正触发任务的运行;将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中。
在Spark中有一个action算子,就有一个job。
<1> count
返回数据集中的元素数。会在结果计算完成后回收到Driver端;
<2> take(num)
返回一个包含数据集前n个元素的集合;
<3> first
first=take(1),返回数据集中的第一个元素;
<4> foreach
循环遍历数据集中的每个元素,运行相应的逻辑;
<5> collect
将计算结果回收到Driver端
<6> mapPartitions
一次获取的是一个分区的数据(hdfs);
正常情况下,mapPartitions 是一个高性能的算子;因为每次处理的是一个分区的数据,减少了去获取数据的次数;但是如果我们的分区如果设置得不合理,有可能导致每个分区里面的数据量过大。
<8> mapPartitionsWithIndex
每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是啥
<9> union
当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的;
返回新的RDD的分区数是合并RDD分区数的总和
<10> groupByKey
groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组
<11> join
join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合;
join后的分区数与父RDD分区数多的那一个相同
<12> cogroup
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并
<13> foreachPartition
遍历的数据是每个partition的数据。
(3)持久化算子
持久化算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partition。cache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。
<1> cache()
默认将数据存储在内存中,懒执行;底层还是调用的persist;
cache()=persist()=persisit(StroageLevel.MEMORY_ONLY)
<2> persist()
1)可以手动指定持久化的级别(共12种);
2)常用级别:
MEMORY_ONLY
MEMORY_ONLY_SER:序列化占空间小,但使用时需要反序列化
MEMORY_AND_DISK:先往内存放,内存不够再往磁盘放
MEMORY_AND_DISK_SER
3)尽量避免使用"_2"和DISK_ONLY级别:因为浪费空间
<3>checkpoint()
当RDD计算逻辑复杂,lineage非常长时,可以对RDD进行checkpoint,默认将数据持久化到指定目录中(HDFS中分布式的持久化)。
checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。
- checkpoint 的执行原理:
- 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
- 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
- Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
- 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。
6.cache ,persist ,checkpoint 区别
cache 与persist:
1)cache()和persist()都是懒执行,需要action算子触发,最小持久化的单位是partition;
2)cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是持久化的数据了;
3)若使用第二种方式,后面不能紧跟action算子。
4)cache和persist算子持久化的数据当application执行完成之后会被自动清除;
5)cache默认将RDD数据持久化到内存中;而persist可以指定持久化的级别(12种),最常用的是MEMORY_ONLY和MEMORY_AND_DISK
checkpoint:
1)checkpoint将RDD持久化到磁盘;
2)checkpoint目录数据当application执行完成之后不会被清除;
3)checkpoint执行过程中会切断RDD之间的依赖关系,可以基于checkpoint的目录进行数据的恢复;
4)checkpoint不常用于持久化RDD,常用于保存元数据,而不是持久RDD。
persist(StorageLevel.DISK_ONLY) 与 checkpoint:
1)前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。
2)而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉( 话说怎么 remove checkpoint 过的 RDD? ),是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。