Spark 任务性能优化浅谈
1 spark on yarn(cluster模式)框架
图1- 1
1.1 yarn组件概念
ResourceManager:负责集群的资源管理和分配。
NodeManager:每个节点的资源和任务管理器。
Application Master:YARN中每个Application对应一个AM进程,负责与RM协商获取资源,获取资源后告诉NodeManager为其分配并启动Container。
Container:YARN中的抽象资源。
1.2 spark组件概念
Driver:进行资源申请、任务分配并监督其运行状况等。
DAGScheduler:将spark job转换成DAG图。
TaskScheduler:负责任务(task)调度
2 spark shuffle
2.1 窄依赖与宽依赖
理解shuffle之前,需要先理解窄依赖和宽依赖。
窄依赖:父RDD的每个分区都只被子RDD的一个分区依赖 例如map、filter、union等操作会产生窄依赖。
宽依赖:父RDD的分区被子RDD的多个分区依赖 例如 groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle过程,也是划分stage依据。
图2- 1
2.2 Shuffle过程
图2- 2
Shuffle过程包括:shuffle write与shuffle read。
Shuffle write:在stage n-1内,每个task经过一系列算子处理后,将数据按key进行分类写入相应磁盘文件中,相同key会被写到同一个磁盘文件中。
Shuffle read:stage n中每个task进行运算前,需要从上游stage n-1拉取属于自己的磁盘文件。
疑问:
a、在shuffle write中,如何决定Block File文件数?
对于产生shuffle过程的算子,有一个参数指定分区数,即shuffle write时Block File文件数。如以reduceByKey为例,下图是reduceByKey的python api说明,numPartitions指定shuffle write时Block File文件数,若numPartitions值为None,则参数spark.default.parallelism决定shuffle write时Block File文件数。
图2- 3
b、Block File文件数下游stage有何影响?
在Stage n-1中Shuffle write总数据量是一定的,若Block File文件数较小,则Stage n中每个task处理的数据量较大,在计算资源不变的情况下,容易出现oom;若Block File文件数较大,则Stage n中每个task处理的数据量较小,在计算资源不变的情况下,会出现资源浪费。因此在计算资源不变情况下,合适Block File文件数能充分利用计算资源
c、如何知道每个stage中shuffle write的数据量?
通过spark web ui查看任务运行过程图,具体详看2.3 spark web ui介绍
d、如何设置合适的分区数,即Block File文件数?
需要根据stage运算过程的计算复杂度而定,若stage n中使用了较多的算子并且运算过程复杂,则在计算资源不变的情况,需要设置较大的分区数;反之设置较小的分区数。
Spark性能优化原则之一:根据计算资源,合适设置每个stage中task的处理数据量
2.3 spark web ui介绍
2.3.1 启动HistoryServer
对于运行完成的spark任务,可以通过HistoryServer查看运行过程图。
- 如何启动spark HistoryServer?
在${spark_home}/sbin下,运行start-history-server.sh
- 如何查看任务的运行过程图?
在浏览器中输入启动HistoryServer的ip以及端口(默认18080),搜索任务的application id即可
2.3.2 spark job
图2- 4
Spark任务中,每个Action算子都会生成一个Job,如collect()、count()、first()、saveAsTextFile()、collectAsMap()。
在图2-4中可以了解到任务有几个job,以及每个Job的提交时间,运行时长,以及Tasks数等等。
2.3.3 spark stage
图2- 5
由图2-5可以知道每个stage的Shuffle Read数据量、Shuffle Write数据量、Tasks数据量(分区数)。
图2- 6
由图2-6可以知道某个stage中更详细信息,如每个task的Shuffle Red Size,从而了解每个task处理的数据量是否均匀,是否出现数据倾斜
3 spark参数设置与算子优化
3.1 spark参数设置
spark.executor.memory:每个executor拥有的堆内存
spark.yarn.executor.memoryOverhead:每个executor拥有的非堆内存
spark.executor.cores:每个executor拥有的core数量
spark.default.parallelism:设置全局默认的分区数
spark.dynamicAllocation.enabled : 是否允许动态分配Executor数
spark.dynamicAllocation.initialExecutors : 设置初始Executor数
spark.dynamicAllocation.minExecutors :设置最小Executor数
spark.dynamicAllocation.maxExecutors :设置最大Executor数
参数设置原则:根据集群资源情况,合理设置参数。如若集群资源充足,设置较大内存并且申请更多Executor数,会使任务更快完成;若集群资源不足,增大分区数(设置spark.default.parallelism或者算子的分区参数),减少每个task处理的数据量,以保证任务完成。
3.2 spark算子优化
算子优化原则:减少shuffle过程,减少算子使用
- groupByKey:避免使用groupByKey,采用reduceByKey代替,因为reduceByKey会进行本地聚合,类似于MapReduce中的combiner
- collect:减少使用collect算子
- collectAsMap:少使用collectAsMap算子,如果数据量大,采用join算子代替
- broadcast:合理使用broadcast,其作用是在每个executor保存一份变量副本,从而避免在每个task中保存一份变量副本
- coalesce 与repartition: repartition(numPartitions)等效于
coalesce(numPartitions, shuffle=True)。若需要分区数由多变少,使用coalesce;若每个分区数据量相差较大,可以考虑repartition均匀每个分区数据量,但不一定有效。
6、persist:对重复使用的RDD进行序列化,建议使用方式为persist(StorageLevel.MEMORY_AND_DISK),避免使用cache()或者persisit()
7、减少算子使用:合并算子,比如rdd.filter().map().filter(),考虑是否可以修改成rdd.map().filter()或者rdd.filter().map()