A Rookie's Spark Source Code Learning Journey - 6: Memory Management Source Code - Part 2: MemoryManager
In the previous post, A Rookie's Spark Source Code Learning Journey - 6: Memory Management Source Code - Part 1: Feature Overview, we got an overall picture of how Spark memory management is implemented. This time we start from MemoryManager and dig deeper into Spark memory management.
First, let's look at the structure of MemoryManager:
There are several important data structures here:
// -- Methods related to memory allocation policies and bookkeeping ------------------------------

@GuardedBy("this")
protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP)
@GuardedBy("this")
protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP)
@GuardedBy("this")
protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP)

onHeapStorageMemoryPool.incrementPoolSize(onHeapStorageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)

protected[this] val maxOffHeapMemory = conf.get(MEMORY_OFFHEAP_SIZE)
protected[this] val offHeapStorageMemory =
  (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

offHeapExecutionMemoryPool.incrementPoolSize(maxOffHeapMemory - offHeapStorageMemory)
offHeapStorageMemoryPool.incrementPoolSize(offHeapStorageMemory)
These lines mainly divide the on-heap and off-heap memory between storage and execution.
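As a quick illustration of the off-heap split above, here is a minimal sketch in plain Scala (not Spark source; the 1 GB spark.memory.offHeap.size is an assumed value) that reproduces the same arithmetic:

// Minimal sketch of the off-heap split in the constructor above; all values assumed.
object OffHeapSplitExample {
  def main(args: Array[String]): Unit = {
    val maxOffHeapMemory = 1024L * 1024 * 1024  // assume spark.memory.offHeap.size = 1g
    val storageFraction  = 0.5                  // spark.memory.storageFraction default
    val offHeapStorage   = (maxOffHeapMemory * storageFraction).toLong
    val offHeapExecution = maxOffHeapMemory - offHeapStorage
    println(s"off-heap storage pool:   $offHeapStorage bytes")   // 536870912
    println(s"off-heap execution pool: $offHeapExecution bytes") // 536870912
  }
}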
The previous post introduced the two main implementations of MemoryManager, UnifiedMemoryManager and StaticMemoryManager, as shown in the figure below:
Since StaticMemoryManager is kept mainly for backward compatibility, we will focus on UnifiedMemoryManager.
/**
 * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
 * either side can borrow memory from the other.
 *
 * The region shared between execution and storage is a fraction of (the total heap space - 300MB)
 * configurable through `spark.memory.fraction` (default 0.6). The position of the boundary
 * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
 * This means the size of the storage region is 0.6 * 0.5 = 0.3 of the heap space by default.
 * // The shared region defaults to 0.6 of the usable heap; spark.memory.storageFraction places
 * // the boundary inside it, so the storage region defaults to 0.3 of the heap.
 *
 * Storage can borrow as much execution memory as is free until execution reclaims its space.
 * When this happens, cached blocks will be evicted from memory until sufficient borrowed
 * memory is released to satisfy the execution memory request.
 * // Storage may borrow any free execution memory; when execution reclaims it, cached blocks are
 * // evicted until enough of the borrowed memory has been released.
 *
 * Similarly, execution can borrow as much storage memory as is free. However, execution
 * memory is *never* evicted by storage due to the complexities involved in implementing this.
 * The implication is that attempts to cache blocks may fail if execution has already eaten
 * up most of the storage space, in which case the new blocks will be evicted immediately
 * according to their respective storage levels.
 * // Execution may likewise borrow free storage memory, but once taken it is never evicted back.
 * // So if execution has eaten up most of the storage region, caching may fail and the new blocks
 * // are evicted immediately according to their storage levels.
 *
 * @param onHeapStorageRegionSize Size of the storage region, in bytes.
 *                                This region is not statically reserved; execution can borrow from
 *                                it if necessary. Cached blocks can be evicted only if actual
 *                                storage memory usage exceeds this region.
 */
private[spark] class UnifiedMemoryManager private[memory] (
    conf: SparkConf,
    val maxHeapMemory: Long,
    onHeapStorageRegionSize: Long,
    numCores: Int)
  extends MemoryManager(
    conf,
    numCores,
    onHeapStorageRegionSize,
    maxHeapMemory - onHeapStorageRegionSize)
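Plugging in the defaults makes the 0.6 * 0.5 = 0.3 split concrete. The sketch below is plain Scala, not Spark source, and the 4 GB executor heap is an assumed value; it only reproduces the sizing arithmetic described in the comment above:

// Sketch of the default unified-memory sizing; the heap size is assumed.
object UnifiedSizingExample {
  def main(args: Array[String]): Unit = {
    val systemMemory   = 4L * 1024 * 1024 * 1024      // assumed executor heap: 4g
    val reservedMemory = 300L * 1024 * 1024           // fixed 300MB reservation
    val usableMemory   = systemMemory - reservedMemory
    val maxMemory      = (usableMemory * 0.6).toLong  // spark.memory.fraction default
    val storageRegion  = (maxMemory * 0.5).toLong     // spark.memory.storageFraction default
    println(s"usable heap:    $usableMemory bytes")
    println(s"unified region: $maxMemory bytes")      // ~2.22 GB
    println(s"storage region: $storageRegion bytes")  // ~1.11 GB, i.e. ~0.3 of the usable heap
  }
}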
There are two methods for computing the maximum storage memory currently available, on-heap and off-heap respectively:
override def maxOnHeapStorageMemory: Long = synchronized {
  maxHeapMemory - onHeapExecutionMemoryPool.memoryUsed
}

override def maxOffHeapStorageMemory: Long = synchronized {
  maxOffHeapMemory - offHeapExecutionMemoryPool.memoryUsed
}
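In other words, storage may use everything in the unified region that execution does not currently hold. A tiny REPL-style sketch with made-up numbers:

// Hypothetical numbers, only to illustrate the maxOnHeapStorageMemory formula above.
val maxHeapMemory       = 2048L * 1024 * 1024  // unified on-heap region (assumed)
val onHeapExecutionUsed =  512L * 1024 * 1024  // memory currently held by tasks (assumed)
val maxOnHeapStorage    = maxHeapMemory - onHeapExecutionUsed  // 1536 MB left for storage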
1. Acquiring execution memory
This method tries to acquire execution memory of the requested size (numBytes) for the current task and returns the number of bytes obtained, or 0 if none could be allocated.
/**
 * Try to acquire up to `numBytes` of execution memory for the current task and return the
 * number of bytes obtained, or 0 if none can be allocated.
 *
 * This call may block until there is enough free memory in some situations, to make sure each
 * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
 * active tasks) before it is forced to spill. This can happen if the number of tasks increase
 * but an older task had a lot of memory already.
 */
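The 1 / 2N ramp-up mentioned in the comment pairs with a 1 / N per-task cap enforced by ExecutionMemoryPool (covered in the next post). A plain Scala sketch of that bound, with assumed numbers:

// Illustration of the per-task fairness bound: with N active tasks, each task may use
// between 1/(2N) and 1/N of the pool before it is forced to spill. Values are assumed.
object TaskShareExample {
  def main(args: Array[String]): Unit = {
    val poolSize   = 1200L // total execution pool size (assumed)
    val numTasks   = 4     // N active tasks (assumed)
    val minPerTask = poolSize / (2 * numTasks) // 150: guaranteed before spilling
    val maxPerTask = poolSize / numTasks       // 300: upper cap per task
    println(s"each task gets at least $minPerTask and at most $maxPerTask bytes")
  }
}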
override private[memory] def acquireExecutionMemory(
    numBytes: Long,
    taskAttemptId: Long,
    memoryMode: MemoryMode): Long = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  // Pick the pools and region sizes for the requested memory mode
  val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      onHeapStorageRegionSize,
      maxHeapMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      offHeapStorageMemory,
      maxOffHeapMemory)
  }

  /**
   * Grow the execution pool by evicting cached blocks, thereby shrinking the storage pool.
   *
   * When acquiring memory for a task, the execution pool may need to make multiple
   * attempts. Each attempt must be able to evict storage in case another task jumps in
   * and caches a large block between the attempts. This is called once per attempt.
   */
  def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
    if (extraMemoryNeeded > 0) {
      // There is not enough free memory in the execution pool, so try to reclaim memory from
      // storage. We can reclaim any free memory from the storage pool. If the storage pool
      // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
      // the memory that storage has borrowed from execution.
      // When execution memory is short, space is taken back from storage in two ways:
      // 1. use whatever free space the storage pool still has;
      // 2. evict cached blocks to reclaim the space storage borrowed beyond its region.
      val memoryReclaimableFromStorage = math.max(
        storagePool.memoryFree,
        storagePool.poolSize - storageRegionSize)
      if (memoryReclaimableFromStorage > 0) {
        // Only reclaim as much space as is necessary and available:
        val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
          math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
        // shrink the storage pool
        storagePool.decrementPoolSize(spaceToReclaim)
        // grow the execution pool
        executionPool.incrementPoolSize(spaceToReclaim)
      }
    }
  }

  /**
   * The size the execution pool would have after evicting storage memory.
   * // i.e. the size of the execution pool once storage has been evicted
   *
   * The execution memory pool divides this quantity among the active tasks evenly to cap
   * the execution memory allocation for each task. It is important to keep this greater
   * than the execution pool size, which doesn't take into account potential memory that
   * could be freed by evicting storage. Otherwise we may hit SPARK-12155.
   * // This amount is split evenly across active tasks to cap each task's allocation; it must
   * // stay above the current execution pool size (which ignores memory reclaimable by evicting
   * // storage), otherwise we may hit SPARK-12155.
   *
   * Additionally, this quantity should be kept below `maxMemory` to arbitrate fairness
   * in execution memory allocation across tasks, Otherwise, a task may occupy more than
   * its fair share of execution memory, mistakenly thinking that other tasks can acquire
   * the portion of storage memory that cannot be evicted.
   * // It must also stay below maxMemory so the allocation remains fair across tasks; otherwise
   * // a task could take more than its fair share, mistakenly assuming other tasks can use the
   * // storage memory that cannot be evicted.
   */
  def computeMaxExecutionPoolSize(): Long = {
    maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
  }

  // Delegate the actual allocation to the execution pool
  executionPool.acquireMemory(
    numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}
There are two nested methods here: maybeGrowExecutionPool tries to reclaim storage memory in order to grow the execution pool, and computeMaxExecutionPoolSize computes the maximum size the execution pool can reach.
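To see how the two reclaim paths in maybeGrowExecutionPool combine, here is a small standalone sketch of the memoryReclaimableFromStorage formula (plain Scala, not Spark source); all numbers are made up:

// Illustration of maybeGrowExecutionPool's reclaim calculation; values are assumed.
object ReclaimExample {
  def reclaimable(storageMemoryFree: Long,
                  storagePoolSize: Long,
                  storageRegionSize: Long): Long =
    // Either take the storage pool's free space, or shrink the pool back down to its
    // original region size by evicting blocks, whichever yields more.
    math.max(storageMemoryFree, storagePoolSize - storageRegionSize)

  def main(args: Array[String]): Unit = {
    // Case 1: storage has free space but has not grown beyond its region.
    println(reclaimable(200, 1000, 1000)) // 200 -> only free space is reclaimable
    // Case 2: storage grew to 1300 by borrowing from execution; 300 of the
    // borrowed space can be reclaimed by evicting cached blocks.
    println(reclaimable(100, 1300, 1000)) // 300
  }
}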
2. Acquiring storage memory
override def acquireStorageMemory(
    blockId: BlockId,
    numBytes: Long,
    memoryMode: MemoryMode): Boolean = synchronized {
  assertInvariants()
  assert(numBytes >= 0)
  val (executionPool, storagePool, maxMemory) = memoryMode match {
    case MemoryMode.ON_HEAP => (
      onHeapExecutionMemoryPool,
      onHeapStorageMemoryPool,
      maxOnHeapStorageMemory)
    case MemoryMode.OFF_HEAP => (
      offHeapExecutionMemoryPool,
      offHeapStorageMemoryPool,
      maxOffHeapStorageMemory)
  }
  // The request exceeds the total storage memory available, so it can never be satisfied
  if (numBytes > maxMemory) {
    // Fail fast if the block simply won't fit
    logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
      s"memory limit ($maxMemory bytes)")
    return false
  }
  // The storage pool's free space is not enough, so try to "borrow" some execution memory
  if (numBytes > storagePool.memoryFree) {
    // There is not enough free memory in the storage pool, so try to borrow free memory from
    // the execution pool.
    // Borrow only the shortfall, capped by the execution pool's free space
    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
      numBytes - storagePool.memoryFree)
    executionPool.decrementPoolSize(memoryBorrowedFromExecution)
    storagePool.incrementPoolSize(memoryBorrowedFromExecution)
  }
  storagePool.acquireMemory(blockId, numBytes)
}
The flow above is broadly similar to acquiring execution memory: free execution memory is borrowed to grow the storage pool. The difference is that storage never evicts execution memory; it can only take what execution is not currently using.
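A small standalone sketch of the borrowing calculation (plain Scala, assumed numbers):

// Illustration of the borrow step in acquireStorageMemory; values are assumed.
object StorageBorrowExample {
  def main(args: Array[String]): Unit = {
    val numBytes            = 700L  // size of the block we want to cache
    val storageMemoryFree   = 400L  // free space left in the storage pool
    val executionMemoryFree = 1000L // idle space in the execution pool
    // Borrow only the shortfall, capped by what execution has free.
    val borrowed = math.min(executionMemoryFree, numBytes - storageMemoryFree)
    println(s"borrowed from execution: $borrowed bytes") // 300
  }
}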
To recap: MemoryManager maintains Spark's memory, divided into execution memory and storage memory, each with on-heap and off-heap regions. It exposes two main entry points for acquiring execution memory and storage memory. The actual allocation is carried out by the MemoryPool associated with each region. Next time we will dive into how MemoryPool manages a single region of memory.