Spark Shuffle 分析

1.Shuffle 原理

1.1 概述

1.1.1 Map task端操作

1.1.2 Reduce task 端操作

1.1.3 Spark Shuffle

2.Spark Shuffle 的实现

2.1 Shuffle 的写操作

2.1.1 基于 Hash 的 Shuffle 写操作

2.1.2 基于 sort 的 Shuffle 写操作

2.1.3  Shuffle 读操作


1.Shuffle 原理

1.1 概述

Shuffle 描述着数据从map task输出到reduce task输入的这段过程。在分布式情况下,reduce task需要跨节点去拉取
其它节点上的map task结果。这一过程将会产生网络资源消耗和内存,磁盘IO的消耗。

Shuffle 是连接 Map 和 Reduce 之间的桥梁,Map 的输出要用到 Reduce 中必须经过 Shuffle 这个环节。由于Shuffle 阶段涉及磁盘的读写和网络传输,因此 Shuffle 的性能高低直接影响整个程序的性能和吞吐量。下图Hadoop的MapReduce 整个流程,其中Shuffle 阶段是介干Map 阶段和 Reduce 阶段之间。

Spark Shuffle 分析

1.1.1 Map task端操作

每个 map task 都有一个内存缓冲区(默认是100MB),存储着 map 的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task 结束后再对磁盘中这个map task 产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

Spill过程:这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。整个缓冲区有个溢写的比例spill.percent(默认是0.8),当达到阀值时map task 可以继续往剩余的memory写,同时溢写线程锁定已用memory,先对key(序列化的字节)做排序,如果client程序设置了Combiner,那么在溢写的过程中就会进行局部聚合。

Merge过程:每次溢写都会生成一个临时文件,在map task真正完成时会将这些文件归并成一个文件,这个过程叫做Merge。

1.1.2 Reduce task 端操作

当某台 TaskTracker上 的所有 map task 执行完成,对应节点的 reduce task 开始启动,简单地说,此阶段就是不断地拉取
(Fetcher)每个 map task 所在节点的最终结果,然后不断地做merge形成reduce task的输入文件。

Copy过程:Reduce进程启动一些数据copy线程(Fetcher)通过HTTP协议拉取TaskTracker的map阶段输出文件

Merge过程:Copy过来的数据会先放入内存缓冲区(基于JVM的heap size设置),如果内存缓冲区不足也会发生map task的
spill(sort 默认,combine 可选),多个溢写文件时会发生map task的merge

1.1.3 Spark Shuffle

Spark 作为 MapReduce 框架的一种实现,自然也实现了Shuffle 的逻辑。

Spark Shuffle:Shuffle 在中文的意思是“洗牌、混洗”的意思,在 MapReduce 过程中需要各个节点上的同一个类型数据汇集到某一节点中进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为 Shuffle

  • 数据量非常大,达到TB甚至PB级别。这些数据分散到数百台甚至数千台的集群中运行,如果管理为后续的任务创建数据众多的文件,以及处理大小超过内存的数据量呢?
  • 如果对结果进行序列化和反序列化,以及传输之前如何进行压缩呢?

2.Spark Shuffle 的实现

Spark中有好几个Shuffle的实现。那具体使用哪个Shuffle的实现,取决于 spark.shuffle.manager 这个参数。可能的选项是  hash, sort, tungsten-sort。从Spark 1.2.0开始,sort 是默认选项。

2.1 Shuffle 的写操作

2.1.1 基于 Hash 的 Shuffle 写操作

在 Hadoop 中 Reduce 所处理的数据都是经过排序的,但在实际处理中很多场景并不需要排序,因此在 Spark1.0之间实现的是基于哈希的Shuffle写操作机制。在该机制中每一个 Mapper 会根据 Reduce 的数量创建出相应的 bucket,bucket 的数据是 M*R,其中M是Map的个数,R是Reduce的个数;Mapper 生成的结果会根据设置的Partition算法填充到每个bucket中。这里的bucket是一个抽象的概念,在该机制中每个bucket对应一个文件;当Reduce启动时,会根据任务的编号和所依赖的Mapper的编号从远端或者是本地取得相应的bucket作为Reduce任务的输入进行处理

相对于传统的MapReduce,Spark假定大多数情况下Shufle的数据排序是不必要的,比如Word Count,强制进行排序只能使性能变差,因此Spark并不在Reduce 端做merge sort,而是使用聚合(aggregator)。聚合实际上是一个HashMap,它以当前任务输出结果的Key为键,以任意要combine类型为值,当在Word Count的Reduce进行单词计数时,它会将Shuffle读到的每一个键值对更新或者插入到HashMap中。这样就不需要预先把所有的键值对进行merge sort,而是来一个处理一个,省下了外部排序这个步骤。

Hash Shuffle的优点:

  • 非常快速。不需要排序,也不需要维护哈希表。没有给数据排序时的内存的开销。
  • 没有额外的IO开销。数据只被读写一次。

Hash Shuffle的缺点:

  • 当Reducer的数量很多时,很容易有性能问题
  • 当大量文件被写到文件系统时,会产生大量的Random IO。而Random IO是最慢的,比Sequential IO要慢100倍。

2.1.2 基于 sort 的 Shuffle 写操作

为了缓解 Shuffe 过程中产生文件过多的文件和 Writer Handlerde 缓存开销过大的问题,在 Spark1.1 版本中借鉴 Hadoop Shuffle中的处理方式,引入基于排序的Shufle写操作机制。在该机制中,每个 Shuffle Map Task 不会为后续的每个任务创建单独的件,而是会将所有的结果写到同一个文件中,对应的生成一个 Index 文件进行索引。通过这种机制避免了大量文件的产生,一方面可以减轻文件系统管理众多文件的压力,另一方面可以减少Writer Handlerde缓存所占用的内存大小,节省了内存同时避免了GC的风险和频率。

基于排序的 Shuffle 写操作解决了文件创建数目过多的问题,也化解了在需要排序时内存占用过大的问题。

2.1.3  Shuffle 读操作


Spark Shuffle 分析

(1)在SparkEnv启动时,会对ShuffleManager、BlockManager 和MapOutputTracker 等实例化。ShufleManager配置项有HashShuffleManager、SortShuffleManager和自定义的ShuffleManager 等3种选项,前两种在Shuffle 读中均实例化的BlockStoreShuffleReader,但是在HashShuffleManager中所持有的是FileShuffleBlockResolver实例,SortShuffleManager中所持有的IndexShufleBlockResolver实例。对于第一个问题在该步骤就能够知道答案:将根据不同的写入方式将采取相同的读取方式,读取的数据放在哈希列表中便于后续处理。
(2)在BlockStoreShufleReader 的read方法中,调用MapOutputTracker的getMapSizesByExecutorld方法,由Executor的MapOutputTracker发送获取结果状态的GetMapOutputStatuses消息给Driver 端的MapOutputTrackerMaster,请求获取上Shuffle输出结果对应的MapStatus,在该MapStatus存放了结果数据的位置信息,ShufmleMapTask 执行结果元信息。通过请求Driver 端的MapOutputTrackerMaster,可以得到上游Shuffle结果的位置信息。
(3)知道Shuffle结果的位置信息后,对这些位置进行筛选,判断当前运行的数据是从本地还是从远程节点获取。如果是本地获取,直接调用BlockManager的getBlockData方法,在读取数据的时候会根据写入方式采取不同ShuffmeBlockResolver 读取:如果是在远程节点上,需要通过Netty网络方式读取数据。在远程读取的过程中使用多线程的方式进行读取,一般来说,会启动5个线程到5个节点进行读取所有,每次请求的数据大小不会超过系统设置的1/5,该大小由spark.reducer.maxSizelnFlight配置项进行设置,默认情况该配置为48MB。
(4)读取数据后,判断ShuffleDependency是否定义聚合(Aggregation),如果需要,则根据键值进行聚合。需要注意的是,如果在上游ShufftleMapTask已经做了合并,则在合并数据的基础上做键值聚合。待数据处理完毕后,使用外部排序(ExternalSorter)对数据进行排序并放入存储中,至此完成Shufle读数据操作。

 

参考 《图解spark 核心技术与实践》《Spark内核设计的艺术》 https://www.jianshu.com/p/a3bb3001abae