spark并行计算

 

目标:

  1. 解释RDD在SPARK的集群是如何分布的。
  2. 分析SPARK如何对基于文件的RDD进行分区。
  3. 阐述SPARK如何并行执行RDD操作
  4. 说明如何通过分区来实现并行控制
  5. 分析如何查看和监控任务task和阶段stage.

首先来看一看spark是如何在云模式(cluster mode)工作的。

spark cluster

一个云模式下的spark程序运行流程如下所示。spark并行计算

用户可以通过Spark-submit提交spark的job.一旦spark job提交,Sparkcontext 驱动程序将打开,并且稍后会将该程序传递给云管理节点(cluster Manager:可以是yarn,standalone,mesos,local,k8s等,在我的理解中,这里仅仅提交了任务触发的驱动程序,至于真正的spark程序(数据如何处理的部分,通常是以打包的形式自己上传到指定的位置,可以是dfs或者nfs等)这里并没有提交)到集群的master节点。

集群Master节点将根据该程序(driver program)在工作节点中创建容器。接下来,在工作节点中的Executors将根据容器创建,然后通过driver Program中的sparkContext进行交互。

在SPARK CLUSTER中的RDDs

在RDD(Resilient Distributed DataSets弹性分布式数据集)中,spark将数据跨工作节点进行分区。你也可以控制创建分区的数量。

spark并行计算

spark并行计算

下面尝试理解单个文件的分区概念

文件分区:单文件

分区将基于文件大小。你也可以指定所需文本文件的最小分区数(file,minPartitions)。默认情况下,在spark cluster运行的文件被分为2个部分。分区越多,并行度越高。

文件分区:多文件

使用命令:sc.textFile('mydir/*'),每个文件至少是一个分区。可以对每个分区执行基于文件的操作,例如解析XML。

接下来的这个命令:sc.wholeTextFiles("mydir").该命令用于对很多的小文件进行分区,而且也能用于创建一个键值对RRD(键表示文件名,值表示文件内容)

大多数RDD操作将作用于RDD中的每一个元素,少数一部分作用于每一个分区。作用于分区的一些命令是:

foreachPartition ----用于为每个分区调用一个函数

mapPartitions  --- 用于通过对当前RDD的每个分区执行函数来创建新的RDD

mapPartitionsWithIndex ---该命令类似mapPartitions,只是该命令包含了分区的index

注意:分区操作的函数采用了迭代器。为了更好的理解我们看一个RDD的例子。

首先通过例子来理解foreachPartition

foreachPartition

在下面的例子中,我们创建了一个函数printFirstLine用来计算各分区的第一行。

spark并行计算

假设我们已经创建了一个名为myrdd的RDD。我们将创建的函数printFirstLine传递给需要计算个分区第一行的foreachPartition.

现在你已经明白了分区的命令,下一章将通过一个例子来尝试理解HDFS和本地数据概念

HDFS 和 本地数据(Data locality)

在图中,你能看到多个数据节点.

spark并行计算

现在可以通过hdfs dfs -put mydata可以将mydata文件推送到hdfs上。假设该文件以三个块的形式保存在hdfs磁盘中。

当数据保存到HDFS磁盘中之后你就可以在spark中编程。开始你的程序后,Spark context将会获得并且在datanodes上启动executor。(这里应该假设spark程序运行的cluster manager为yarn,yarn将调度程序的副本至相应的datanodes中来执行)

使用sc.textfile命令,可以将mydata文件推入executor中。由于这仅仅是一个转换步骤,所以RDD将仍然是空的。

使用动作触发器执行,执行器上的任务将数据从块加载到分区中。 然后,数据将在执行程序之间分配使用,直到操作将值返回给驱动程序为止。

分区的并行操作(Parallel Operations on Partitions)

RDD操作能够在每一个分区上以并行的方式执行。任务将在数据存储的工作节点上被执行。

一些操作会保留分区,例如map,flatMap,filter,distinct等等。一些操作进行了重新分区,例如reduceByKey,sortByKey,join,groupByKey等等操作。

接着了解些stages的操作

Stages的操作(operations in stages)

在stages上操作能够运行在相同的分区上(这句不是很理解Operations that can run on the same partition are executed in stages)。处于同一个stage上的任务被流水线约束到一起。开发者应该意识到stages 操作来提升性能。

下面是列举了一些Spark术语:

Job: 是一行为的执行任务集合(个人理解是为了某一意图的所有任务集合,可以理解为一个dag)

stage: 是job中能够并行执行的一个任务集合(个人理解是从单个分区中执行的任务集合,可以理解为优化后的DAG)

Task:发送给一个executor的单个执行工作单元(个人理解为是一个函数)。

Application:是单个driver管理的包含若干job的集合(可以理解为单个程序脚本)

接下来看看spark是如何计算stages 的。

spark如何计算stages

Spark构造了RDD依赖关系的有向无环图或DAG。这些依赖关系有两种类型

窄依赖

窄依赖是指child RDD中的每个分区仅仅依赖父RDD的一个分区。在不同的executors没有shuffle的需求。创建RDD的节点可以分割成一个stage.例如:map,filter

宽依赖或者混洗(shuffle)依赖

宽依赖或shuffle依赖,很多child RDD 分区依赖parent RDD的各个分区。宽k依赖定义了一个新的stage.例如:reduceByKey,join,groupByKey.接下来查看并行度控制的过程。

宽依赖操作例如ReduceByKey 分区RDD中的结果。分区数量越多,并行任务越多。如果分区太少,spark 集群将不能充分利用。

可以通过函数调用过程中的numPartitionsparameter选项来控制分区数量。可以通过localhost:4040来查看spark应用UI,在UI中可以查看所有的spark job。

总结:

RDDS存储在SPARK executor的内存中

虚拟机和JVMS数据被分成区

在分离的Executor RDD的每一个分区都是以并行的方式执行操作

基于相同分区的操作在stage中以流水线的方式约束在一起

依赖多分区的操作在stage中分离的方式执行