Spark RDD深入理解
通过RDD.scala源码来解读什么是RDD
Spark Github:
https://github.com/apache/spark
RDD源码:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
让开发者大大降低了开发分布式应用程序的门槛及提高执行效率
Resilient Distributed Dataset(RDD)
- 弹性
体现在计算之上;
它代表的是Spark在分布式计算的时候可以进行容错,比如说:数据丢失了,可以通过RDD的容错机制,来进行修复;
因此准确的说,弹性在计算层面体现的尤为明显 - 分布式
数据归属在一个大的集群之上,可以拆开的存在每个机器上;
数据分布在不同的节点上
计算的时候也是一样,代码可以运行在多个节点之上 - 数据集
可以通过不同的方式进行创建
会被拆分成很多分区,进行计算
通过RDD.scala中的注释来解读RDD的概念
-
the basic abstraction in Spark
在Spark中最基本的抽象单元
在Spark每个框架都应该有个抽象的编程模型- 在Spark Core里是基于RDD进行编程
- 在Spark Streaming里是基于DStream进行编程
- 在Spark SQL里是基于SQLContext/HiveContext类似的进行编程
Represents an immutable,partitioned collection of elements that can be operated on in parallel
RDD代表了不可变的(scala中不可变额用val进行修饰)
这也就意味着RDD一旦产生,就没有办法改变
举例:
RDDA ==各种操作==> RDDB
RDDA与RDDB都是独一无二的,因为它们是不可变的,可以通过一系列的操作转换得到
集合里的元素能够被拆成各个分区:
这句话也可以理解为HDFS中的Block或者MapReduce中的InputSplit;
集合中的元素被拆成各个分区之后,数据集能够以一种并行的方式进行操作
举例:
对RDDA执行”+1”的操作,实质上是对RDDA内的所有分区(在不同机器上的)都执行了相同的操作
我们通过RDD.scala源码可以发现对RDD的定义如下:
- 抽象类
代表RDD不能直接使用,这就好比在Java中抽象类不能直接new出来使用一样
因此RDD必然是有子类实现的,我们使用时直接使用其子类即可 - Serializable(序列化)
不序列化,提交上个区会报task未被序列化的错误(Spark开发中常见的错误)
因此分布式框架中序列化的好坏直接影响了性能的好坏 - Logging
在Spark 1.6.x版本中是可以直接使用的
在2.x版本中,是被移走了 - T(泛型)
意味着可以支持各种数据类型
也说明了RDD中存储的数据类型是不确定的 - SparkContext
- @transient
考察JdbcRDD.scala的定义:
继承了RDD,直接使用RDD是不能使用的,需要去实现该抽象类
RDD五大特性
每一个RDD都会有如下的五个特点:
- A list of partitions
- A function for computing each split(split理解为partition)
对RDD做的操作,即对每个分区做一个相同的function - A list of dependencies on other RDDs
RDD之间有相互的依赖关系
RDDA ==> RDDB ==> RDDC ==> RDDD
RDD中又有多个partition构成,那么如何体现RDD的弹性呢?
举例:
RDDA中的一个partition挂掉了,把数据重新加载一下,就行了
如果RDDC中的一个partition挂掉了,倘若RDDB与RDDC之间的关系是窄依赖,那么直接通过一个转换操作就过来了
因此,当某个partition里面的数据丢失以后,Spark可以通过RDD之间的依赖关系,重新计算partition的分区数据;
【注意】不是对RDD中所有的partition的数据进行重新计算,而是通过计算出了问题的partition就可以了(前提是一个窄依赖关系) - Optionally, a Partitioner for key-value RDDs (e.g. to say that RDD is hash-partitioned)
【可选】
K-V类型RDD的partition(比如hash-partitioned) - Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
【可选】
对每一个分片机芯计算的时候,会有preferred locations
这个特性,体现了数据本地性
preferred locations:
数据在哪,就把task分配到数据所在地去运行(这样就可以大程度的避免数据的移动);
最后的s是副本的概念;
根据e.g. block locations for an HDFS file锁具的案例:HDFS在存放的时候会有多个Block,每个Block又会有它的副本,因此多出来的s就是副本的意思;
那么我们在计算的时候,对于一个split来说,每个Block又会有多个preferred location
(因为对于1个Block来说副本会有多个,因此split的最佳位置也可能有多个,因为副本的大小是一样的,只是所在的机器位置不同而已)
移动计算而不是移动数据,数据在哪里,就优先把作业调度到哪里去执行,这样就能减少数据的读取、网络的传输,提升作业的执行时间。
数据存储的时候一定要切割,以多副本的形式进行存储:
- 一方面可以便于存储,可以进行容错
- 另一方面在计算的时候,也是要进行切分的(默认按照blocksize的大小进行切分,当然也可以自己指定split的大小)
关于切片split的理解,可以参考Blog:
[考究Hadoop中split的计算方法]
http://blog.****.net/lemonzhaotao/article/details/77538289
注意:切片是指对文件进行切分,而Block也是指对文件进行切分
因此对移动计算的理解为:
分散在多台机器的某个大小为300的文件的Block大小分别为:128、128、44 副本数为3个;
切片大小为100,因此split作为输入分片的大小,但是在某个机器的时候,会发现数据不够,因此必然会涉及到数据的移动;
切片是指对要计算的文件进行切分,因而不管怎么设置该大小,肯定会在某个机器上遇到该文件的Block的数据不够作为分片输入task的情况,需要去其它机器上移动些数据过来,才能完成该台计算机计算任务的情况。
每个split作为task的输入,倘若需要组成该split的block在不同机器上,就必然会涉及到数据的传输(spark中的split可以理解为partition,MapReduce中也有split)关于Spark数据本地性的理解:
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总会存在移动数据的情况,除非是在集群的所有节点上都保存了数据的副本。(举个例子:如果要运算的数据在该节点没有或者是少了点量,就需要去其它节点拉数据了);
移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率;
为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是设置合理的数据副本;
设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值
总结
Spark的五大特性,完美诠释了Resilient Distributed Dataset
RDD五大特性在源码中的体现
分析RDD.scala
分析源码的地址:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
对应第二点:
对RDD做计算,实质就是对分区做计算;传入了split,类型为Partition
对应第一点:
一个RDD由一系列的partition组成;因此传入了一个Array[]数组,泛型的类型为Partition
对应第三点:
RDD之间有相互的依赖关系;因此需要得到Dependencies
对应第五点:
要计算一个Partition类型的split的时候,必然是需要得到它的
PreferredLocations,要知道它到底在哪个位置;因此最终返回的是一个Seq序列(类似于数组或者集合的东西)
Seq[String]可以理解为 所对应的一个路径
对应第四点:
key-value形式的partition
分析JdbcRDD.scala
分析源码的地址:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala
分析getPartitions方法
获取到分区信息/有起始和结束的位置/每一个分区处理了多少数据
假设有10个分区和1W条数据,1个粪污1000条数据,从第0个分区开始
第0个必然是前面1000条,
第1个必然是1000到1999条
…以此类推
对每个分区做一个map操作,通过开始和结束的位置结合起来 ==> 来构建出JdbcPartition
返回的是一个Partition数组,得到了一系列的Partition信息
分析compute方法
图解RDD
5个分区,3台机器
partition
有可能存在DISK上
有可能存在MEMORY上
有可能既存在DISK上又存在MEMORY上
有可能存多份
在Spark中,计算时有多少个partition就会对应有多少个task来执行
这里有5个partition,就会有5个task来执行
【注意】
- 如果core够的话,就是并行执行
- 如果core不够的话,肯定就是先跑一轮,再跑一轮…以此类推,跑完为止
数据本地性
如果在NODE2上运行partition1肯定就涉及到了数据的传输,会影响作业的执行速度
partition1在NODE1上运行肯定最好(体现了数据的本地性)