Spark[三]——Spark对内存的管理[On-Heap Memory、Off-Heap Memory、Storage、Execution、Other]

Spark[三]——Spark对内存的管理[On-Heap Memory、Off-Heap Memory、Storage、Execution、Other]

​ 由于Driver的内存管理较为简单,内存管理主要对Executor的内存管理进行探讨。

一、堆内(On-Heap Memory)和堆外(Off-Heap Memory)内存规划

​ Executor作为一个JVM进程,Executor的内存管理建立在JVM的内存管理之上。Spark对堆内内存进行JVM内存管理,引入了堆外内存,使之可以直接在工作节点的系统内存中开辟空间,进一步优化了内存的使用;其中,堆外内存直接向操作系统申请,对内存的申请和释放操作。

二、堆内内存(On-Heap Memory)

​ 堆内内存概述:

Spark[三]——Spark对内存的管理[On-Heap Memory、Off-Heap Memory、Storage、Execution、Other]

在Spark程序启动时,堆内内存的大小由spark-submit中的–executor-memory或spark.executor.memory参数配置。Spark对于堆内内存的管理是一种逻辑上的“规划式”管理,因为对象实例占用内存的申请和释放都由JVM完成,Spark只能在申请后和释放前记录这些内存。

​ 对于Spark的序列化对象,由于是字节流的形式,其占用的内存大小可以直接计算,而对于非序列化对象,其占用的内存是通过周期性的采样估算而得,即并不是新增数据项都会计算一次占用内存的大小,这种方法降低了时间开销,但是有可能误差较大,导致某一时刻的实际内存远远超出预期。所以Spark并不能准确记录实际可用的堆内内存,从而也无法避免内存溢出—OOM。

1.内存空间的动态分配

​ 在Spark1.6之前使用静态内存管理,即存储内存、执行内存和其他内存的大小在Spark应用程序运行期间是固定的。缺点很多,因此在Spark1.6以后,堆内内存和堆外内存的管理采用动态分配的形式,即:存储内存(Storage)和执行内存(Execution)共享同一块存储空间,双方可以动态的占用对方的空闲区域。

动态占用机制的规则如下:

​ 1.首先设定基本的Storage和Execution的区域(spark.storage.storageFraction参数),该设定确定了双方各自拥有的空间范围;

​ 2.双方的空间都不足时,则都存储到磁盘上面;

​ 3.若己方空间不足,而对方空间空余时,可借用对方的空间(存储空间不足指不足以放下一个完整的Block)。

Spark[三]——Spark对内存的管理[On-Heap Memory、Off-Heap Memory、Storage、Execution、Other]

​ 在上图1中的情况中,Storage占用了Execution的空余内存后,Execution要开始用了,想把虚线标示的内存要回来,于是Storage便开始在自己缓存的RDD中,找到 StorageLevel.MEMORY_AND_DISK / .MEMORY_AND_DISK_2 / .MEMORY_AND_DISK_SER / .MEMORY_AND_DISK_SER_2 的数据,溢写到磁盘上。如果写到磁盘上以后,还是达不到虚线的标准,在这一部分存储的数据,会直接被删除。鉴于此种状况,cache / persist一般都与checkpoint结合使用。Spark-core的checkpoint将一个RDD存储到分布式文件系统中;Spark-streaming的checkpoint:1.保存整个运行环境;2.保存未处理的RDD,便于恢复故障;

​ 在上图2中的情况中,Execution占用了Storage的空余内存,但是Storage要开始用了,想把虚线标示的内存要回来,这时候!Execution对Storage理直气壮地说:我的Shuffle调度时对内存的应用太复杂了,给你腾不出来地方,这些内存就不还给你了!Storage当然义愤填膺的说:好!(哈哈哈哈哈哈哈哈哈。。。。。。感觉这时候Storage好怂)。

2.淘汰与落盘

​ 由于同一个Executor的所有计算任务共享有限的存储内存空间,当有新的Block需要缓存但是剩余的内存空间不足且无法动态占用时,就要对LinkHashMap中的旧Block进行淘汰(Eviction),而被淘汰的Block如果存储级别中包含存储到磁盘的要求,则要对其进行落盘(Drop),否则直接删除该Block。遍历LinkedHashMap中Block,按照LRU进行淘汰,被淘汰的旧Block与新Block的MemoryMode相同,即同属堆外或堆内内存;新旧Block不能同属一个 RDD,避免循环淘汰。

​ 在RDD的缓存时,将原先存在Others中的含有多个Partition,且每个Partition中的record不连续的RDD,转存至Storage中,并且每个RDD中含有多个Block,且每个Block中的record连续,这个过程称为Unroll。

​ Storage模块在逻辑上以Block为基本存储单位,RDD的每个Partition经过处理后唯一对应一个Block(BlockId格式为rdd_RDD-ID_PARTITION-ID)。Driver端的Master负责整个Spark应用程序的Block的元数据信息管理和维护,而Executor端的Slave负责将Block的更新状态上报到Master,同时接收Master的命令,例如新增或者删除一个RDD。

3.执行内存(Execution)

​ Execution主要用来存储任务在执行Shuffle时占用的内存,Shuffle是按照一定规则对RDD数据重新分区的过程,由Shuffle Write和Shuffle Read两个阶段组成。

3-1 Shuffle Write

​ (1) 若在map端选用普通的排序方式,会采用ExternalSorter进行外派,在内存中存储数据时主要占用堆内执行空间;

​ (2)若在map端选择Tungsten方式,则采用ShuffleExternalSorter直接对序列化形式存储的数据排序,在内存中执行存储数据时可以使用堆内或堆外执行空间,取决了用户是否开启了堆外内存及堆外内存是否足够。

3-2 Shuffle Write

​ (1)在对reduce端数据进行聚合时,要将数据交给Aggregator处理,在内存中存储数据时占用堆内执行空间;

​ (2) 若对最终结果进行排序,则再次将数据交给ExternalSorter处理,此时占用堆内执行空间;

​ 在ExternalSorter和Aggregator中,Spark会使用一种叫AppendOnlyMap的哈希表在堆内执行内存中存储数据,但是在Shuffle过程中并不是所有的数据都能保存到哈希表中,这个哈希表占用的内存会周期性的进行估算,当达到一定程度,无法再从MemoryManager中申请到执行内存时,Spark就会全部存储到磁盘文件中,这个过程被称为溢存(Spill),溢存到磁盘的文件最后会被归并(Merge)。

4.其他内存(other)

​ 这里用于存储没有被缓存的RDD、元数据及Spark对象实例。其中,RDD以Partition为基本存储单位。

总结

​ Spark的存储内存和执行内存有着截然不同的管理方式:对于Storage来说,Spark用一个LinkedHashMap来集中管理所有的Block,Block由需要缓存的RDD的Partition转化而成,而对于Execution来说,Spark用AppendOnlyMap来存储Shuffle过程中的数据,在Tungsten排序中甚至抽象成为页式管理,开辟了全新的JVM内存管理机制。

三、堆外内存(Off-Heap Memory)

​ Spark引入堆外内存(Off-Heap),使之可以直接在工作节点的系统内存中开辟空间,存储经过序列化的二进制数据;

​ 堆外内存意味着把内存对象分配到Java虚拟以外的内存,这些内存直接受操作系统(而不是虚拟机)管理。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。Spark可以直接操作系统堆外内存,减少了不必要的系统开销,以及频繁的GC扫描和回收,提高了处理性能。堆外内存可以被精确地申请和释放(JVM对于内存的清理是无法精确指定时间点的,因此无法实现精确的释放),而且序列化的数据占用的空间可以被精确地计算,所以相比堆内内存来说降低了难度,也降低了误差;

在默认情况下堆外内存并不启用,可以通过配置spark.memory.offheap.enabled参数启用,并由spark.memory.offheap.size设定堆外空间的大小。除了没有Other空间外,堆内内存与堆外内存的划分方式相同,所有运行中的并发任务共享存储内存和执行内存。

Spark[三]——Spark对内存的管理[On-Heap Memory、Off-Heap Memory、Storage、Execution、Other]

注:图为原创,如有转载请注明出处