Spark性能优化指南学习(四)——spark内存模型及shuffle调优

一、内存模型

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

spark运行使用内存主要包含driver和executor,通过driver-memory和executor-memory进行设置,通过运行机制得知,driver负责提交注册,接受executor反向注册,stage划分和task任务分发等工作,默认内存大小为1G,在使用collect算子时,需要注意oom,因为collect算子将数据拉取到driver,spark的主要作业发生在executor中,当服务器允许的情况下,为保证spark的运行效率,可以尽量提高。

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

spark在1.6以前(不包含1.6)使用的是静态内存模型(executor部分),

分为两部分:包括execution:用来进行shuffle操作,storage:用来存储广播变量和缓存

分为四部分:如上图所示,图中占比为默认值。

缺点:分配比例固定,如果spark作业缓存多shuffle操作少,或者缓存少shuffle操作多,或造成内存浪费,有限的资源不能得到充分的利用。

对应task的内存分配模式为:

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

缺点:内存分配为平均分配,缺少灵活性

为了解决内存分配的固化,从spark1.6版本后采用统一内存模型

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

图中比例分配为2.x版本的分配比例(其中1.6版本比例分配为,spark.memory.fraction=0.75,spark.memory.storageFraction =0.5),其中不再严格界定storage和execution内存,在缓存较多,shuffle较少时,storage占比会增大,满足需求;当缓存较少,shuffle较多时,execution内存占比会增大。

当内存不足时:

Spark性能优化指南学习(四)——spark内存模型及shuffle调优


<1>在缓存较多,shuffle较少 -->shuffle需求增加--->storage返还借用的内存,自身缓存内容写入磁盘

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

<2>当缓存较少,shuffle较多时-->缓存需求增加-->execution不会返还借用的内存,storage将缓存文件写入磁盘

对应task的内存分配模式:

Spark性能优化指南学习(四)——spark内存模型及shuffle调优


每个task内存分配站决定于task的个数N,N越少,分配的内存越多,反之越少。内存使用更加灵活。有利于高效完成任务

对应task内部不同操作的内存分配(1.6+):

Spark性能优化指南学习(四)——spark内存模型及shuffle调优


二、shuffle调优

影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager。该ShuffleManager而HashShuffleManager有着一个非常严重的弊端,就是会产生大量的中间磁盘文件,进而由大量的磁盘IO操作影响了性能。

因此在Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。

1、未经优化的HashShuffleManager

产生中间磁盘文件=num_map_task * num_reduce_executor

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

2、优化后的HashShuffleManager

产生中间磁盘文件=num_cpucore * num_reduce_executor

设置一个参数,spark.shuffle.consolidateFiles。该参数默认值为false,将其设置为true即可开启优化机制

一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

3、SortShuffleManager运行原理

普通运行机制

在该模式下,数据会先写入一个内存数据结构中,此时根据不同的shuffle算子,可能选用不同的数据结构。如果是reduceByKey这种聚合类的shuffle算子,那么会选用Map数据结构,一边通过Map进行聚合,一边写入内存;如果是join这种普通的shuffle算子,那么会选用Array数据结构,直接写入内存。接着,每写一条数据进入内存数据结构之后,就会判断一下,是否达到了某个临界阈值。如果达到临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,然后清空内存数据结构。

在溢写到磁盘文件之前,会先根据key对内存数据结构中已有的数据进行排序。排序过后,会分批将数据写入磁盘文件。默认的batch数量是10000条,也就是说,排序好的数据,会以每批1万条数据的形式分批写入磁盘文件。写入磁盘文件是通过Java的BufferedOutputStream实现的。BufferedOutputStream是Java的缓冲输出流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次写入磁盘文件中,这样可以减少磁盘IO次数,提升性能。

一个task将所有数据写入内存数据结构的过程中,会发生多次磁盘溢写操作,也就会产生多个临时文件。最后会将之前所有的临时磁盘文件都进行合并,这就是merge过程,此时会将之前所有临时磁盘文件中的数据读取出来,然后依次写入最终的磁盘文件之中。此外,由于一个task就只对应一个磁盘文件,也就意味着该task为下游stage的task准备的数据都在这一个文件中,因此还会单独写一份索引文件,其中标识了下游各个task的数据在文件中的start offset与end offset。

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

bypass运行机制

bypass运行机制的触发条件如下:

  • shuffle map task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值。
  • 不是聚合类的shuffle算子(比如reduceByKey)。

此时task会为每个下游task都创建一个临时磁盘文件,并将数据按key进行hash然后根据key的hash值,将key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,同样会将所有临时磁盘文件都合并成一个磁盘文件,并创建一个单独的索引文件。

该过程的磁盘写机制其实跟未经优化的HashShuffleManager是一模一样的,因为都要创建数量惊人的磁盘文件,只是在最后会做一个磁盘文件的合并而已。因此少量的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager来说,shuffle read的性能会更好。

而该机制与普通SortShuffleManager运行机制的不同在于:第一,磁盘写机制不同;第二,不会进行排序。也就是说,启用该机制的最大好处在于,shuffle write过程中,不需要进行数据的排序操作,也就节省掉了这部分的性能开销。

Spark性能优化指南学习(四)——spark内存模型及shuffle调优

shuffle相关参数调优


spark.shuffle.file.buffer 默认值是32k,建议可以调成64K

spark.reducer.maxSizeInFlight 默认值是48M,如果资源运行,可以调成96M

spark.shuffle.io.maxRetries 默认值是3   建议30次左右

spark.shuffle.io.retryWait 默认值是5s 建议可以到30s左右