Spark Shuffle之Tungsten-Sort
概述
这篇文章是Spark Shuffle之Sort Shuffle的一部分,介绍UnsafeShuffleWriter的实现,也就是tungsten-sort。
实现
存储
UnsafeShuffleWriter内部使用了和BytesToBytesMap基本相同的数据结构处理map端的输出,不过将其细化为ShuffleExternalSorter和ShuffleInMemorySorter两部分,功能如下
ShuffleExternalSorter | 使用MemoryBlock存储数据,每条记录包括长度信息和K-V Pair |
ShuffleInMemorySorter | 使用long数组存储每条记录对应的位置信息(page number + offset),以及其对应的PartitionId,共8 bytes |
排序
写文件或溢写前根据数据的PartitionId信息,使用TimSort对ShuffleInMemorySorter的long数组排序,排序的结果为,PartitionId相同的聚集在一起,且PartitionId较小的排在前面,ShuffleExternalSorter中的数据不需要处理,如下
写文件
依次读取ShuffleInMemorySorter中long数组的元素,再根据page number和offset信息去ShuffleExternalSorter中读取K-V Pair写入文件,如下
溢写 & 合并
内存不足时,溢写数据到磁盘,每次溢写会生成上图中的一个dataFile,如果多次溢写产生多个dataFile,会在map端数据处理结束后进行merge合并为一个dataFile,如下
至此,UnsafeShuffleWriter的实现就介绍完了。
优势
SPARK-7081中简述了UnsafeShuffleManager的优势,如下
- ShuffleExternalSorter使用UnSafe API操作序列化数据,而不是Java对象,减少了内存占用及因此导致的GC耗时(参考Spark 内存管理之Tungsten),这个优化需要Serializer支持relocation。
- ShuffleExternalSorter存原始数据,ShuffleInMemorySorter使用压缩指针存储元数据,每条记录仅占8 bytes,并且排序时不需要处理原始数据,效率高。
- 溢写 & 合并这一步操作的是同一Partition的数据,因为使用UnSafe API直接操作序列化数据,合并时不需要反序列化数据。
-
溢写 & 合并可以使用fastMerge提升效率(调用NIO的transferTo方法),设置spark.shuffle.unsafe.fastMergeEnabled为true,并且如果使用了压缩,需要压缩算法支持SerializedStreams的连接,各默认值如下
默认 含义 spark.shuffle.compress true 是否压缩map端输出 spark.shuffle.unsafe.fastMergeEnabled true 是否开启fastMerge spark.io.compression.codec snappy 默认的压缩算法,支持fastMerge的压缩算法为snappy、lzf
使用
Spark Shuffle之Sort Shuffle中讨论了使用UnsafeShuffleWriter需满足的前提条件,如下
map-side aggregation | Partition数(RDD) | Serializer支持relocation |
否 | 小于16777216 | 是 |
接下来分析下为什么要满足这三个要求
- map-side aggregation:从上面的实现也可以看出,UnsafeShuffleWriter不是类似HashMap的数据结构,无法聚合key对应的value,所以无法支持map端的aggregation。
- Partition数小于16777216:参考第一幅图,存储PartitionId信息使用24bit,能表示的最大值为 (1 << 24) = 16777215,因此Partition数要小于16777216。
- Serializer支持relocation:原始数据首先被序列化处理,并且再也不需要反序列,在其对应的元数据被排序后,需要Serializer支持relocation,在指定位置读取对应数据。
总结
介绍tungsten-sort(UnsafeShuffleWriter)的实现、优势及何种情况下被Spark使用。