Spark核心RDD及相关算子

目录

1. RDD是什么

2. RDD的五大属性

3.RDD图及相关理解

 4.RDD的创建方式(共三种)

5.RDD的算子分类

6.cache ,persist ,checkpoint 区别


1. RDD是什么

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变可分区、里面的元素可并行计算的集合。

  • Dataset: 就是一个集合,存储很多数据。
  • Distributed:它内部的元素进行了分布式存储,方便于后期进行分布式计算。
  • Resilient: 表示弹性,rdd的数据是可以保存在内存或者是磁盘中。

2. RDD的五大属性

Spark核心RDD及相关算子

(1)RDD是由一系列的partition组成的。

数据集的基本组成单位;spark中任务是以task线程的方式运行, 一个分区就对应一个task线程。

用户可以在创建RDD时指定RDD的分区个数,如果没有指定,那么就会采用默认值(默认值2)。

Spark核心RDD及相关算子

Spark核心RDD及相关算子

(2)函数是作用在每一个partition(split)上的。

(3)RDD之间有一系列的依赖关系。

spark任务的容错机制就是根据这个特性(血统)而来。

(4)分区器是作用在K,V格式的RDD上。

当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,(key.hashcode % 分区数= 分区号);
另外一个是基于范围的RangePartitioner。
只有对于key-value的RDD(RDD[(String, Int)]),并且产生shuffle,才会有Partitioner;
非key-value的RDD(RDD[String])的Parititioner的值是None。

(5)RDD提供一系列最佳的计算位置。

涉及到数据的本地性,数据块位置最优。
spark任务在调度的时候会优先考虑对存储有数据的节点开启计算任务减少数据的网络传输,提升计算效率。

3.RDD图及相关理解

Spark核心RDD及相关算子

(1)textFile方法底层封装的是MR读取文件的方式,读取文件之前先split,默认split大小是一个block大小

(2)RDD实际上不存储数据;partition(存的是计算逻辑)也是不存数据的;

(3)什么是K,V格式的RDD?

如果RDD里面存储的数据都是二元组对象,那么这个RDD我们就叫做K,V格式的RDD。

(4)哪里体现RDD的弹性(容错)?

partition数量,大小没有限制,体现了RDD的弹性。

RDD之间依赖关系,可以基于上一个RDD重新计算出RDD。

(5)哪里体现RDD的分布式?

RDD是由Partition组成,partition是分布在不同节点上的。

RDD提供计算最佳位置,体现了数据本地化。体现了大数据中“计算移动数据不移动”的理念。

(6)RDD中最小的单元是partition;

 4.RDD的创建方式(共三种)

(1)通过已经存在的scala集合去构建(少用)

 rdd3=sc.makeRDD(List(1,2,3,4))

(2)加载外部的数据源去构建

 rdd1=sc.textFile("/words.txt")

(3)从已经存在的rdd进行转换生成一个新的rdd

 rdd2=rdd1.flatMap(_.split(" "))
val rdd3=rdd2.map((_,1))

5.RDD的算子分类

(1)transformation转换算子

根据已经存在的rdd转换生成一个新的rdd, 它是延迟加载它不会立即执行

<1> map

将一个RDD中的每个数据项,通过map中的函数映射变为一个新的元素;

特点:输入一条,输出一条数据

<2> flatMap

扁平化输出:先map后flat;与map类似,每个输入项可以映射为0到多个输出项

<3> filter

过滤符合条件的记录数,true保留,false过滤掉

<4> reduceByKey

将相同的Key根据相应的逻辑进行处理;

与groupByKey的区别:当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey;

<5> sample

随机抽样算子,根据传进去的小数按比例进行又放回或者无放回的抽样;

<6> sortByKey/sortBy

作用在K,V格式的RDD上,对key进行升序或者降序排序

<7> join,leftOuterJoin,rightOuterJoin,fullOuterJoin

作用在K,V格式的RDD上。根据K进行连接,对(K,V)join(K,W)返回(K,(V,W));

            join后的分区数与父RDD分区数多的那一个相同。

<8> union

合并两个数据集。两个数据集的类型要一致。返回新的RDD的分区数是合并RDD分区数的总和。

<9> intersection

取两个数据集的交集,返回新的RDD与父RDD分区多的一致

<10> subtract

取两个数据集的差集,结果RDD的分区数与subtract前面的RDD的分区数一致。

<11> mapPartitions

与map类似,遍历的单位是每个partition上的数据。

<12> distinct(map+reduceByKey+map)

<13> cogroup

当调用类型(K,V)和(K,W)的数据上时,返回一个数据集(K,(Iterable<V>,Iterable<W>)),子RDD的分区与父RDD多的一致。

(2)action 行动算子

它会真正触发任务的运行;将rdd的计算的结果数据返回给Driver端,或者是保存结果数据到外部存储介质中。

在Spark中有一个action算子,就有一个job。

<1> count

返回数据集中的元素数。会在结果计算完成后回收到Driver端;

<2> take(num)

返回一个包含数据集前n个元素的集合;

<3> first

first=take(1),返回数据集中的第一个元素;

<4> foreach

循环遍历数据集中的每个元素,运行相应的逻辑;

<5> collect

将计算结果回收到Driver端

<6> mapPartitions

一次获取的是一个分区的数据(hdfs);

正常情况下,mapPartitions 是一个高性能的算子;因为每次处理的是一个分区的数据,减少了去获取数据的次数;但是如果我们的分区如果设置得不合理,有可能导致每个分区里面的数据量过大。

<8> mapPartitionsWithIndex

每次获取和处理的就是一个分区的数据,并且知道处理的分区的分区号是啥

<9> union

当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的;

返回新的RDD的分区数是合并RDD分区数的总和

<10> groupByKey

groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组

<11> join

join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合;

join后的分区数与父RDD分区数多的那一个相同

<12> cogroup

对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并

<13> foreachPartition

遍历的数据是每个partition的数据。

(3)持久化算子

持久化算子有三种,cache,persist,checkpoint,以上算子都可以将RDD持久化,持久化的单位是partitioncache和persist都是懒执行的。必须有一个action类算子触发执行。checkpoint算子不仅能将RDD持久化到磁盘,还能切断RDD之间的依赖关系。

<1> cache()

默认将数据存储在内存中,懒执行;底层还是调用的persist;

cache()=persist()=persisit(StroageLevel.MEMORY_ONLY)

<2> persist()

1)可以手动指定持久化的级别(共12种);

2)常用级别:

MEMORY_ONLY

MEMORY_ONLY_SER:序列化占空间小,但使用时需要反序列化

MEMORY_AND_DISK:先往内存放,内存不够再往磁盘放

MEMORY_AND_DISK_SER

3)尽量避免使用"_2"和DISK_ONLY级别:因为浪费空间

<3>checkpoint()

当RDD计算逻辑复杂,lineage非常长时,可以对RDD进行checkpoint,默认将数据持久化到指定目录中(HDFS中分布式的持久化)。

checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。checkpoint目录数据当application执行完之后不会被清除。

  • checkpoint 的执行原理:
  1. 当RDD的job执行完毕后,会从finalRDD从后往前回溯
  2. 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
  3. Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
  • 优化:对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。

6.cache ,persist ,checkpoint 区别

cache 与persist:

1)cache()和persist()都是懒执行需要action算子触发最小持久化的单位是partition

2)cache和persist算子的返回值可以赋值给一个变量,在其他job中直接使用这个变量就是持久化的数据了;

3)若使用第二种方式,后面不能紧跟action算子

4)cache和persist算子持久化的数据当application执行完成之后会被自动清除

5)cache默认将RDD数据持久化到内存中;而persist可以指定持久化的级别(12种),最常用的是MEMORY_ONLY和MEMORY_AND_DISK

checkpoint:

1)checkpoint将RDD持久化到磁盘

2)checkpoint目录数据当application执行完成之后不会被清除

3)checkpoint执行过程中会切断RDD之间的依赖关系,可以基于checkpoint的目录进行数据的恢复

4)checkpoint不常用于持久化RDD,常用于保存元数据,而不是持久RDD。

persist(StorageLevel.DISK_ONLY) 与 checkpoint:

1)前者虽然可以将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理一旦 driver program 执行结束,也就是 executor 所在进程 CoarseGrainedExecutorBackend stopblockManager 也会 stop被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。

2)而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹如果不被手动 remove 掉( 话说怎么 remove checkpoint 过的 RDD? ),是一直存在的,也就是说可以被下一个 driver program 使用,而 cached RDD 不能被其他 dirver program 使用。