Spark的两种核心Shuffle(HashShuffle与SortShuffle)的工作流程与源码分析(手把手看源码)
写在前面的话:本篇博客为原创,认真阅读需要比对spark 2.1.1的源码,预计阅读耗时30分钟,如果大家发现有问题或者是不懂的,欢迎讨论
欢迎关注公众号:后来X
spark 2.1.1的源码包(有需要自取):关注公众号【后来X】,回复spark源码
在spark中说到shuffle,大家应该不陌生,因为有shuffle所以才把stage分为
- ShuffleMapStage:前面的所有stage
- finalStage:最后一个stage
今天主要说的是Spark的两种核心Shuffle
1、HashShuffle(2.0版本已淘汰)
通过上述的两张图,大家就能够看出来,优化后的hashShuffle明显比优化前少了很多中间的小文件,而原因是:Task复用Buffer缓冲区(也称为合并机制)
优化的HashShuffle开启合并机制的配置是spark.shuffle.consolidateFiles,设置为true即可。
2、SortShuffle
过程解析:
- 先把数据直接写入内存
- 判断是否达到阈值,如果达到就会将内存数据结构的数据写入到磁盘
- 在溢写磁盘前,先根据key进行排序,排序过后的数据,会分批写入到磁盘文件中。默认批次为10000条,也就是10000条写一次
- 写入磁盘文件通过缓冲区溢写的方式,每次溢写都会产生一个磁盘文件,也就是说一个Task过程会产生多个临时文件。
- 最后在每个Task中,将所有的临时文件合并为一个文件,这就是merge过程,同时单独写一份索引文件,标识下游各个Task的数据在文件中的索引,start offset和end offset。
这其中的有一个关键点:达到阈值,这里的这个阈值不像之前在hadoop中的shuffle缓冲区是固定的100M,这里的缓冲区是可变的(具体看源码分析第15步)
bypass SortShuffle
特点:
- 不排序(优于SortShuffle)
- 最后合并文件(优于HashShuffle)
bypass运行机制的触发条件如下:
- shuffle reduce task数量小于spark.shuffle.sort.bypassMergeThreshold参数的值,默认为200。(减少中间小文件产生)
- 不是聚合类的shuffle算子(比如reduceByKey)
那我们接下来一起看看sortShuffle的源码,来加深我们对于spark SortShuffle的理解
首先这个shuffle过程肯定是发生在ShuffleMapStage最后一步执行任务的阶段,所以我们呢从Executor端执行任务的入口开始看:
- 双击Shift,找到Executor,并且通过ctrl + F找到执行任务的run()方法
- 前面都是变量赋值,我们找到真正的执行task的run方法
- 从run()进入,发现还是在判断线程是否死掉,继续找到runTask()方法
- 进入后发现需要找实现类,ctrl + h ,找到ShuffleMapTask实现类
- 找到实现类后,还是继续找真正的执行,runTask()方法,发现直接就是开始Write写数据,而读数据是执行括号内的iterator()方法,那么我们先了解一下是怎么读数据的,要了解怎么写数据直接跳到12步
- 进入iterator()方法,最终找到compute方法,这个方法是特别关键的,是整个运算的核心,我们找到实现类的compute()方法
我们注意到在最终这个compute方法张,有真正的read()方法,来读取数据 - 到现在为止,读数据的流程我们已经清楚了,接下来我们返回来第5步开始继续看写数据的流程,我们先了解一下这个依赖的关系
- 通过上面的实现类,找到注册shuffle的实现方法,这一步非常关键,在这一步判断是否能使用bypass SortShuffle
- 进入if条件,根据2个条件是否都满足来决定返回值为true还是false,来决定能否使用bypass SortShuffle
- 到这里我们已经看到了bypass运行机制的2个触发条件,我们现在返回第5步从getWrite()方法进入,找到实现类中的实现方法
到这一步的时候已经能够前后对应起来了,上面这些步骤最终知道了最后才采用哪种shuffle方式,那么接下来我们继续接着看具体的shuffle过程有什么区别,我么主要关心SortShuffle和bypassSortShuffle - 还是从第5步中的write方法进入,先看实现类SortShuffle
- 判断是否需要排序,并把数据全部插入内存
- 我们先从insetAll()方法进入,看一下数据是如何写出的
- 判断是否需要溢写
- 当上面的步骤最后需要溢写的时候就开始找真正溢写的方法,发现最溢写出的文件组成了一个集合
- 现在溢写出道一个集合中后,我们看一下merge方法,参数就是一个集合,对应上一步的溢写。
上面就是SortShuffle的全过程,那么接下来继续看bypassSortShuffle - 返回第12步中,我们选择bypassSortShuffle,找到对应的write方法,按照分区进行溢写(因为也不需要排序),最后把溢写出的文件合并为一个文件
以上就是bypassSortShuffle的溢写过程。
最后总结一下:
hashShuffle:
- 优化前:每个task将数据按照分区器(hash/numreduce取模)计算,写入buffer
最后的小文件个数为: Task数量 * 文件类型(分区器的计算结果的种类个数) - 优化后:只是多个task共用buffer缓冲区,
最后的小文件个数为: CPU核数 * 文件类型(分区器的计算结果的种类个数)
普通的SortShuffle:
合并为1个文件+1个索引文件,中间先排序后溢写
bypassSortShuffle:
- 不排序,不能使用预聚合算子(比如reduceByKey)
- reduce task数量小于200(为了减少中间小文件的产生)
好啦,今天就说到这里啦,下一篇再见。
最后再来一波,喜欢我的欢迎点赞,欢迎关注我的公众号:后来X,回复:spark源码,获取spark2.1.1源码包以及大数据资料
持续更新,未完待续!