Spark RDD深入理解


通过RDD.scala源码来解读什么是RDD

Spark Github:
https://github.com/apache/spark
RDD源码:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

让开发者大大降低了开发分布式应用程序的门槛及提高执行效率

Resilient Distributed Dataset(RDD)

  1. 弹性
    体现在计算之上;
    它代表的是Spark在分布式计算的时候可以进行容错,比如说:数据丢失了,可以通过RDD的容错机制,来进行修复;
    因此准确的说,弹性在计算层面体现的尤为明显
  2. 分布式
    数据归属在一个大的集群之上,可以拆开的存在每个机器上;
    数据分布在不同的节点上
    计算的时候也是一样,代码可以运行在多个节点之上
  3. 数据集
    可以通过不同的方式进行创建
    会被拆分成很多分区,进行计算

通过RDD.scala中的注释来解读RDD的概念

  1. the basic abstraction in Spark
    在Spark中最基本的抽象单元
    在Spark每个框架都应该有个抽象的编程模型

    • 在Spark Core里是基于RDD进行编程
    • 在Spark Streaming里是基于DStream进行编程
    • 在Spark SQL里是基于SQLContext/HiveContext类似的进行编程
  2. Represents an immutable,partitioned collection of elements that can be operated on in parallel
    RDD代表了不可变的(scala中不可变额用val进行修饰)
    这也就意味着RDD一旦产生,就没有办法改变
    举例:
    RDDA ==各种操作==> RDDB
    RDDA与RDDB都是独一无二的,因为它们是不可变的,可以通过一系列的操作转换得到

集合里的元素能够被拆成各个分区:
这句话也可以理解为HDFS中的Block或者MapReduce中的InputSplit;
集合中的元素被拆成各个分区之后,数据集能够以一种并行的方式进行操作
举例:
Spark RDD深入理解
对RDDA执行”+1”的操作,实质上是对RDDA内的所有分区(在不同机器上的)都执行了相同的操作

我们通过RDD.scala源码可以发现对RDD的定义如下:
Spark RDD深入理解

  1. 抽象类
    代表RDD不能直接使用,这就好比在Java中抽象类不能直接new出来使用一样
    因此RDD必然是有子类实现的,我们使用时直接使用其子类即可
  2. Serializable(序列化)
    不序列化,提交上个区会报task未被序列化的错误(Spark开发中常见的错误)
    因此分布式框架中序列化的好坏直接影响了性能的好坏
  3. Logging
    在Spark 1.6.x版本中是可以直接使用的
    在2.x版本中,是被移走了
  4. T(泛型)
    意味着可以支持各种数据类型
    也说明了RDD中存储的数据类型是不确定的
  5. SparkContext
  6. @transient

考察JdbcRDD.scala的定义:
Spark RDD深入理解
继承了RDD,直接使用RDD是不能使用的,需要去实现该抽象类

RDD五大特性

Spark RDD深入理解
每一个RDD都会有如下的五个特点:

  1. A list of partitions
  2. A function for computing each split(split理解为partition)
    对RDD做的操作,即对每个分区做一个相同的function
  3. A list of dependencies on other RDDs
    RDD之间有相互的依赖关系
    RDDA ==> RDDB ==> RDDC ==> RDDD
    RDD中又有多个partition构成,那么如何体现RDD的弹性呢?
    举例:
    RDDA中的一个partition挂掉了,把数据重新加载一下,就行了
    如果RDDC中的一个partition挂掉了,倘若RDDB与RDDC之间的关系是窄依赖,那么直接通过一个转换操作就过来了
    因此,当某个partition里面的数据丢失以后,Spark可以通过RDD之间的依赖关系,重新计算partition的分区数据;
    【注意】不是对RDD中所有的partition的数据进行重新计算,而是通过计算出了问题的partition就可以了(前提是一个窄依赖关系)
  4. Optionally, a Partitioner for key-value RDDs (e.g. to say that RDD is hash-partitioned)
    【可选】
    K-V类型RDD的partition(比如hash-partitioned)
  5. Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
    【可选】
    对每一个分片机芯计算的时候,会有preferred locations
    这个特性,体现了数据本地性

preferred locations:
数据在哪,就把task分配到数据所在地去运行(这样就可以大程度的避免数据的移动);
最后的s是副本的概念;
根据e.g. block locations for an HDFS file锁具的案例:HDFS在存放的时候会有多个Block,每个Block又会有它的副本,因此多出来的s就是副本的意思;
那么我们在计算的时候,对于一个split来说,每个Block又会有多个preferred location
(因为对于1个Block来说副本会有多个,因此split的最佳位置也可能有多个,因为副本的大小是一样的,只是所在的机器位置不同而已)

移动计算而不是移动数据,数据在哪里,就优先把作业调度到哪里去执行,这样就能减少数据的读取、网络的传输,提升作业的执行时间。
数据存储的时候一定要切割,以多副本的形式进行存储:

  • 一方面可以便于存储,可以进行容错
  • 另一方面在计算的时候,也是要进行切分的(默认按照blocksize的大小进行切分,当然也可以自己指定split的大小)

关于切片split的理解,可以参考Blog:
[考究Hadoop中split的计算方法]
http://blog.****.net/lemonzhaotao/article/details/77538289
注意:切片是指对文件进行切分,而Block也是指对文件进行切分
因此对移动计算的理解为:
分散在多台机器的某个大小为300的文件的Block大小分别为:128、128、44 副本数为3个;
切片大小为100,因此split作为输入分片的大小,但是在某个机器的时候,会发现数据不够,因此必然会涉及到数据的移动;
切片是指对要计算的文件进行切分,因而不管怎么设置该大小,肯定会在某个机器上遇到该文件的Block的数据不够作为分片输入task的情况,需要去其它机器上移动些数据过来,才能完成该台计算机计算任务的情况。
每个split作为task的输入,倘若需要组成该split的block在不同机器上,就必然会涉及到数据的传输(spark中的split可以理解为partition,MapReduce中也有split)

关于Spark数据本地性的理解:
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总会存在移动数据的情况,除非是在集群的所有节点上都保存了数据的副本。(举个例子:如果要运算的数据在该节点没有或者是少了点量,就需要去其它节点拉数据了);
移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率;
为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是设置合理的数据副本;
设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值

总结
Spark的五大特性,完美诠释了Resilient Distributed Dataset

RDD五大特性在源码中的体现

分析RDD.scala

分析源码的地址:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala
对应第二点:
Spark RDD深入理解
对RDD做计算,实质就是对分区做计算;传入了split,类型为Partition

对应第一点:
Spark RDD深入理解
一个RDD由一系列的partition组成;因此传入了一个Array[]数组,泛型的类型为Partition

对应第三点:
Spark RDD深入理解
RDD之间有相互的依赖关系;因此需要得到Dependencies

对应第五点:
Spark RDD深入理解
要计算一个Partition类型的split的时候,必然是需要得到它的
PreferredLocations,要知道它到底在哪个位置;因此最终返回的是一个Seq序列(类似于数组或者集合的东西)
Seq[String]可以理解为 所对应的一个路径

对应第四点:
Spark RDD深入理解
key-value形式的partition

分析JdbcRDD.scala

分析源码的地址:
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/JdbcRDD.scala

分析getPartitions方法

Spark RDD深入理解
获取到分区信息/有起始和结束的位置/每一个分区处理了多少数据
假设有10个分区和1W条数据,1个粪污1000条数据,从第0个分区开始
第0个必然是前面1000条,
第1个必然是1000到1999条
…以此类推
对每个分区做一个map操作,通过开始和结束的位置结合起来 ==> 来构建出JdbcPartition
返回的是一个Partition数组,得到了一系列的Partition信息

分析compute方法

Spark RDD深入理解
Spark RDD深入理解

图解RDD

Spark RDD深入理解
5个分区,3台机器

partition
有可能存在DISK上
有可能存在MEMORY上
有可能既存在DISK上又存在MEMORY上
有可能存多份
在Spark中,计算时有多少个partition就会对应有多少个task来执行
这里有5个partition,就会有5个task来执行

【注意】

  1. 如果core够的话,就是并行执行
  2. 如果core不够的话,肯定就是先跑一轮,再跑一轮…以此类推,跑完为止

数据本地性
如果在NODE2上运行partition1肯定就涉及到了数据的传输,会影响作业的执行速度
partition1在NODE1上运行肯定最好(体现了数据的本地性)