【Spark的那些事儿】论RDD算子的重要性
Spark的那些事中前后文章关联比较紧密,没有阅读第一篇的可以关注公众号查找Spark的那些事RDD篇。
上一篇已经介绍完RDD结构。虽然RDD结构是spark设计思想最重要的组成,但是没有辅助的功能只有结构并不能独立使用。真正使RDD完成计算优化的,就是今天我们要讲到的spark RDD的另一个重要组成部分RDD算子。
-
RDD算子的定义
-
RDD算子在spark中的实现
-
RDD算子的分类
-
RDD算子的使用
RDD算子的定义
用来生成或处理RDD的方法叫做RDD算子。RDD算子就是一些方法,用来构建RDD及数据之间的关系。算子可以把数据转换成RDD,也可以由RDD产生新RDD,或者将RDD持久化到磁盘或内存。
从技术角度讲RDD算子可能比较枯燥,我们举个生活学习中的例子来类比RDD算子的作用。
完成计算需要什么呢?
需要数据载体和运算方式。数据载体可以是数字,数组,集合,分区,矩阵等。一个普通的计算器,它的运算单位是数字,而运算符号是加减乘除,这样就可以得到结果并输出了。一个矩阵通过加减乘除也可以得到结果,但是结果跟计算器的加减乘除一样吗?非也!
矩阵相乘的运算法则:
AB的行数 = A的行数;AB的列数 = B的列数。
A与B的先后次序不能改变。
假设A是m行,B是n列。
AB的结果是一个m*n 大小的矩阵。
所以说加减乘除在不同的计算框架作用是不同的,而加减乘除这样的符号就是运算方式。在spark计算框架有自己的运算单位(RDD)和自己的运算符(RDD算子)。
可以理解为:RDD(或数据)算子 RDD 算子 = RDD(或数据)
是不是很抽象?下面来点具体的。
RDD算子在spark中的实现
Action算子的作用很单一,主要是整合数据,比如collect()返回RDD所有元素,count()返回RDD中元素的个数,当然其主要的作用是执行了runjob方法提交了作业。
下面我们主要分析一下Transformation RDD算子的源码结构:
上图划分为4个大块,我们从上到下顺序说起:
-
上一篇中RDD结构中有一个属性是Dependency,生成一个新的RDD必须为Dependency赋值。它被两个类继承,NarrowDependency(窄依赖)和ShuffleDependency(宽依赖)。窄依赖又分onetoonedependency和rangedependency,这是窄依赖提供的2种抽样方式1对1数据抽样和平衡数据抽样,返回值是一个partitonid的list集合。在第一层中只描述了RDD之间的依赖关系并没有计算逻辑,也就是在算子生成新RDD时,与父RDD的依赖关系在这一层定义。
-
第二层,是提供RDD底层计算的基本算法,继承了RDD,并实现了dependency的一种或多种依赖关系的计算逻辑。
-
最下层,利用第二层的基本算法,实现RDD所有的Transfromation算子。提供给用户spark API。
-
右边的泛型,是scala的一种类型,可以理解为类的泛型,泛指编译时被抽象的类型。Spark利用scala的这一特性把依赖关系抽象成一种泛型结构,并不需要真实的数据类型参与编译过程。编译的结构类由序列化和反序列化到集群的计算节点取数并计算。
我们用RDD.map()算子的实现来举例:
当我们使用RDD.map算子创建新的RDD,map首先创建一个MapPartitionsRDD,而MapPartitionsRDD类继承RDD类,并为dependency赋值为OneToOneDependency的依赖关系。这样2个RDD的关系就建立完成了。
以上4点是不是很难理?spark为什么不提供第二层给用户开放API?
假设RDD是基本变量x,y,上图的第二部分是加减乘除,第三部分是由加减乘除构成的公式。如:
x2+y2+2xy=(x+y)2
图第一部分相当于定义了加减乘除。
图第二部分实现了x+y,x-y,x*y,x/y的运算
图第三部分实现了x2+y2+2xy
结果被第四部抽象成公式,在计算中优化为(x+y)2
这样本来需要运算5次的公式x2+y2+2xy,被优化成2次计算(x+y)2
所以spark API只开放了第三部分,也就是公式,作为一个使用者,我们不需要关心x2+y2+2xy=(x+y)2是什么原理,我们只需要调用x2+y2+2xy,这个优秀的计算框架会帮助你。
注:这个举例只是为了方便大家理解spark框架的设计思想,实际spark的算子抽象并不是加减乘除和公式的抽象。
RDD算子的分类
算子从否触发job的角度划分,可以分为Transformation算子和Action算子,Transformation算子不会产生job,是惰性算子,只记录该算子产生的RDD及父RDD partiton之间的关系,而Action算子将触发job,完成依赖关系的所有计算操作。
那么如果一个程序里有多个action算子怎么办?为什么有的算子返回的不是RDD类型?
当有多个action时,顺序完成action操作,每个action算子产生一个job,上一job的结果转换成RDD,继续给后续的action使用。
多数action返回结果都不是RDD,而transformation算子的返回结果都是RDD,但可能是多个RDD(如:randomSplit,将一个RDD切分成多个RDD)。注:下文讲到哪些属于action算子,它们的返回结果不一定是RDD,需要转换成RDD才能继续使用。
下面给出一些算子的使用供大家理解:
操作类型 |
函数名 |
作用 |
转化操作 |
map() |
参数是函数,函数应用于RDD每一个元素,返回值是新的RDD |
flatMap() |
参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD |
|
filter() |
参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD |
|
distinct() |
没有参数,将RDD里的元素进行去重操作 |
|
union() |
参数是RDD,生成包含两个RDD所有元素的新RDD |
|
intersection() |
参数是RDD,求出两个RDD的共同元素 |
|
subtract() |
参数是RDD,将原RDD里和参数RDD里相同的元素去掉 |
|
cartesian() |
参数是RDD,求两个RDD的笛卡儿积 |
|
行动操作 |
collect() |
返回RDD所有元素 |
count() |
RDD里元素个数 |
|
countByValue() |
各元素在RDD中出现次数 |
|
reduce() |
并行整合所有RDD数据,例如求和操作 |
|
fold(0)(func) |
和reduce功能一样,不过fold带有初始值 |
|
aggregate(0)(seqOp,combop) |
和reduce功能一样,但是返回的RDD数据类型和原RDD不一样 |
|
foreach(func) |
对RDD每个元素都是使用特定函数 |
转化算子:这类转换并不触发提交作业,完成作业中间过程处理。Transformation按照数据类型又分为两种,value数据类型算子和key-value数据类型算子。
1) Value数据类型的Transformation算子
map,flatMap,mapPartitions,glom,union,cartesian,groupBy,filter,distinct,subtract,sample,takeSample
2)Key-Value数据类型的Transfromation算子
mapValues,combineByKey,reduceByKey,partitionBy,cogroup,join,leftOuterJoin和rightOuterJoin
行动算子: 这类算子会触发SparkContext提交Job作业。Action算子是用来整合和输出数据的,主要包括以下几种:
foreach,HDFS,saveAsTextFile,saveAsObjectFile, collect,collectAsMap,reduceByKeyLocally,lookup,count,top,reduce,fold,aggregate
存储算子:改变当前RDD的存储策略(下文有详解)。如:persist,cache 和checkpoint 。
注:上述举例算子只是一部分,随着spark的更新也会不断有新的算子加入其中。
RDD算子的使用
Spark算子非常丰富,有几十个,开发者把算子组合使用,从一个基础的RDD计算出想要的结果。并且算子是优化Spark计算框架的主要依据。
我们以top算子举例,rdd.top(n)获取RDD的前n个排序后的结果。
例如计算:(文件a)的2倍与(文件b)的TOP 3的结果。
-
窄依赖优化:如图中的RDD1,RDD2,RDD3在Stage3中被优化为RDD1到RDD3直接计算。是否可以直接计算是由算子的宽窄依赖决定,推荐使用数据流向区分宽窄依赖: partiton流向子RDD的多个partiton属于宽依赖,父RDD的partiton流向子RDD一个partiton或多个partiton流向一个子RDD的partiton属于窄依赖。上图中的RDD3和RDD4做top(3)操作,top是先排序后取出前3个值,排序过程属于宽依赖,spark计算过程是逆向的DAG(DAG和拓扑排序下一篇介绍),RDD5不能直接计算,必须等待依赖的RDD完成计算,我把这种算子叫做不可优化算子(计算流程不可优化,必须等待父RDD的完成),Action算子(后文讲解)都是不可优化算子,Transformation算子也有很多不可优化的算子(宽依赖算子),如:groupbykey,reducebykey,cogroup,join等。
-
数据量优化:上图中的a文件数据乘2,为什么前面有一个filter,假设filter过滤后的数据减少到三分之一,那么对后续RDD和shuffle的操作优化可想而知。而这只是提供一个思路,并不是所有的过滤都是高效的。
-
利用存储算子优化Lineage:RDD算子中除了save(输出结果)算子之外,还有几个比较特别的算子,用来保存中间结果的,如:persist,cache 和checkpoint ,当RDD的数据保持不变并被复用多次的时候可以用它们临时保存计算结果。
1). cache和persist
修改当前RDD的存储方案StorageLevel,默认状态下与persist级别是一样的MEMORY_ONLY级别,保存到内存,内存不足选择磁盘。
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
def cache(): this.type = persist()
这2个方法都不会触发任务,只是修改了RDD的存储方案,当RDD被执行的时候按照方案存储到相应位置。而checkpoint会单独执行一个job,并把数据写入磁盘。
注:不要把RDD cache和Dataframe cache混淆。Dataframe cache将在spark sql中介绍。
2).checkpoint
检查RDD是否被物化或计算,一般在程序运行比较长或者计算量大的情况下,需要进行Checkpoint。这样可以避免在运行中出现异常导致RDD回溯代价过大的问题。Checkpoint会把数据写在本地磁盘上。Checkpoint的数据可以被同一session的多个job共用。
本期RDD算子讲解结束,与第一篇RDD有很多互补的地方,除了源码解读也包括了从个人的角度理解spark,当然每个人的角度不同,对spark计算框架的理解也不一样,若有表达歧义或错漏还请谅解,此理论仅为引导,您的实践才是检验真理的唯一标准。后续我们会继续聊一聊spark的其他事儿,还请关注,后会有期!