目錄
前言
在前文的末尾,我們分析了靜態(tài)內(nèi)存管理器StaticMemoryManager的優(yōu)缺點(diǎn),并指出統(tǒng)一內(nèi)存管理器UnifiedMemoryManager能夠彌補(bǔ)它的缺點(diǎn),同時(shí)也是目前Spark內(nèi)存管理的事實(shí)標(biāo)準(zhǔn)。本文盡可能深入地剖析UnifiedMemoryManager的具體實(shí)現(xiàn)。
統(tǒng)一內(nèi)存管理器UnifiedMemoryManager
UnifiedMemoryManager與StaticMemoryManager相比,主要有兩點(diǎn)改進(jìn):
- 存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存不再是靠比例定死的,而是在一定條件下可以相互借用,更加靈活;
- 存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存都可以在堆外分配了。
按照慣例,我們?nèi)匀粡钠錁?gòu)造開(kāi)始看起。
構(gòu)造方法
代碼#25.1 - o.a.s.memory.UnifiedMemoryManager類(lèi)的構(gòu)造
private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
val maxHeapMemory: Long,
onHeapStorageRegionSize: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
onHeapStorageRegionSize,
maxHeapMemory - onHeapStorageRegionSize) {
private def assertInvariants(): Unit = {
assert(onHeapExecutionMemoryPool.poolSize + onHeapStorageMemoryPool.poolSize == maxHeapMemory)
assert(
offHeapExecutionMemoryPool.poolSize + offHeapStorageMemoryPool.poolSize == maxOffHeapMemory)
}
assertInvariants()
// ......
}
其構(gòu)造方法參數(shù)與StaticMemoryManager相比有微小的變化,需要傳入堆內(nèi)內(nèi)存總量maxHeapMemory,以及堆內(nèi)存儲(chǔ)內(nèi)存空間的量onHeapStorageRegionSize,堆內(nèi)執(zhí)行內(nèi)存空間的量就是兩者之差。另外,還會(huì)校驗(yàn)堆內(nèi)、堆外內(nèi)存池的大小,保證它們與規(guī)定的內(nèi)存總量對(duì)的上。
但是,在代碼#24.1初始化MemoryManager實(shí)現(xiàn)時(shí),調(diào)用的UnifiedMemoryManager構(gòu)造方法只有兩個(gè)參數(shù),這是因?yàn)槠浒樯鷮?duì)象實(shí)現(xiàn)了apply()方法。如果看官對(duì)Scala不太熟的話(huà),可以去翻翻Scala官方文檔,其中講述了apply()方法的具體作用。下面還是來(lái)看代碼。
計(jì)算內(nèi)存量
代碼#25.2 - o.a.s.memory.UnifiedMemoryManager.apply()方法
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
val maxMemory = getMaxMemory(conf)
new UnifiedMemoryManager(
conf,
maxHeapMemory = maxMemory,
onHeapStorageRegionSize =
(maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
numCores = numCores)
}
可見(jiàn),Spark可利用的內(nèi)存總量(為避免混淆,下面叫“統(tǒng)一內(nèi)存”)是調(diào)用getMaxMemory()方法計(jì)算出來(lái)的,存儲(chǔ)內(nèi)存占統(tǒng)一內(nèi)存的初始比例(因?yàn)榭梢越栌?,所以是初始比例)由配置?xiàng)spark.memory.storageFraction決定,默認(rèn)值0.5。剩下的(1 - spark.memory.storageFraction)比例的內(nèi)存就是執(zhí)行內(nèi)存了。
getMaxMemory()方法的代碼如下。
代碼#25.3 - o.a.s.memory.UnifiedMemoryManager.getMaxMemory()方法
private def getMaxMemory(conf: SparkConf): Long = {
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
val minSystemMemory = (reservedMemory * 1.5).ceil.toLong
if (systemMemory < minSystemMemory) {
throw new IllegalArgumentException(s"System memory $systemMemory must " +
s"be at least $minSystemMemory. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (executorMemory < minSystemMemory) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$minSystemMemory. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
val usableMemory = systemMemory - reservedMemory
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.6)
(usableMemory * memoryFraction).toLong
}
該方法的執(zhí)行流程是:
- 通過(guò)Runtime.maxMemory()這個(gè)native方法取得當(dāng)前JVM可用的最大內(nèi)存(堆內(nèi)存)。spark.testing.memory參數(shù)是測(cè)試參數(shù),幾乎不用。
- 取得保留內(nèi)存的大小,由常量RESERVED_SYSTEM_MEMORY_BYTES定義,大小是300MB。這個(gè)量可以通過(guò)spark.testing.reservedMemory參數(shù)來(lái)改,但同樣幾乎不用。
- 以保留內(nèi)存的1.5倍(也就是450MB)作為Driver和Executor的最小內(nèi)存并校驗(yàn)之。
- 用堆內(nèi)存量減去保留內(nèi)存量,得到可用內(nèi)存。spark.memory.fraction配置項(xiàng)指定了統(tǒng)一內(nèi)存能實(shí)際利用的可用內(nèi)存比例,默認(rèn)值為0.6(60%),最終返回可用內(nèi)存與spark.memory.fraction的乘積。
可見(jiàn),UnifiedMemoryManager的堆內(nèi)內(nèi)存布局其實(shí)比StaticMemoryManager要簡(jiǎn)單。因?yàn)榇鎯?chǔ)內(nèi)存和執(zhí)行內(nèi)存之間的邊界是浮動(dòng)的,所以展開(kāi)內(nèi)存的比例以及安全比例都不再需要了。下面仍然先用一幅框圖來(lái)形象地表示出來(lái)。
統(tǒng)一內(nèi)存管理布局圖示
圖中Storage和Execution區(qū)域的界限是虛線(xiàn),并且有上下箭頭,表示它們之間的邊界是浮動(dòng)的。

然后,我們來(lái)看看申請(qǐng)內(nèi)存的方法的實(shí)現(xiàn),其中也包含了借用內(nèi)存的邏輯。
申請(qǐng)/借用存儲(chǔ)內(nèi)存
以下是重寫(xiě)的acquireStorageMemory()方法代碼。
代碼#25.4 - o.a.s.memory.UnifiedMemoryManager.acquireStorageMemory()方法
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
maxOnHeapStorageMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
maxOffHeapStorageMemory)
}
if (numBytes > maxMemory) {
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxMemory bytes)")
return false
}
if (numBytes > storagePool.memoryFree) {
val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree,
numBytes - storagePool.memoryFree)
executionPool.decrementPoolSize(memoryBorrowedFromExecution)
storagePool.incrementPoolSize(memoryBorrowedFromExecution)
}
storagePool.acquireMemory(blockId, numBytes)
}
該方法首先根據(jù)MemoryMode決定是在堆內(nèi)還是在堆外申請(qǐng)存儲(chǔ)內(nèi)存。如果申請(qǐng)量沒(méi)有超過(guò)存儲(chǔ)內(nèi)存池的空閑量,就可以直接調(diào)用StorageMemoryPool.acquireMemory()方法申請(qǐng)內(nèi)存。但若存儲(chǔ)池中剩余的內(nèi)存不夠分配,就會(huì)試圖向執(zhí)行池借用內(nèi)存,借用的量為當(dāng)前執(zhí)行池空閑量與(塊大小 - 當(dāng)前存儲(chǔ)池空閑量)兩個(gè)量之間的較小者。然后會(huì)調(diào)用decrementPoolSize()方法縮小執(zhí)行池,調(diào)用incrementPoolSize()方法擴(kuò)大存儲(chǔ)池。
由上面的描述可以看出,借用內(nèi)存的過(guò)程是比較保守的,也就是一次只會(huì)借用當(dāng)時(shí)不足的內(nèi)存量,不會(huì)多借。并且借到的內(nèi)存有可能仍然無(wú)法滿(mǎn)足需求,這時(shí)就只能把原先存儲(chǔ)的一部分塊淘汰掉了,這部分邏輯之前提到過(guò),參見(jiàn)代碼#23.4。
在統(tǒng)一內(nèi)存管理機(jī)制下,展開(kāi)內(nèi)存雖然仍屬于存儲(chǔ)內(nèi)存的一部分,但不再有邊界,所以申請(qǐng)展開(kāi)內(nèi)存的方法與申請(qǐng)存儲(chǔ)內(nèi)存完全相同。
代碼#25.5 - o.a.s.memory.UnifiedMemoryManager.acquireUnrollMemory()方法
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, memoryMode)
}
申請(qǐng)/借用執(zhí)行內(nèi)存
這個(gè)流程就比較復(fù)雜一些了。
代碼#25.6 - o.a.s.memory.UnifiedMemoryManager.acquireExecutionMemory()方法
override private[memory] def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
assertInvariants()
assert(numBytes >= 0)
val (executionPool, storagePool, storageRegionSize, maxMemory) = memoryMode match {
case MemoryMode.ON_HEAP => (
onHeapExecutionMemoryPool,
onHeapStorageMemoryPool,
onHeapStorageRegionSize,
maxHeapMemory)
case MemoryMode.OFF_HEAP => (
offHeapExecutionMemoryPool,
offHeapStorageMemoryPool,
offHeapStorageMemory,
maxOffHeapMemory)
}
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
val memoryReclaimableFromStorage = math.max(
storagePool.memoryFree,
storagePool.poolSize - storageRegionSize)
if (memoryReclaimableFromStorage > 0) {
val spaceToReclaim = storagePool.freeSpaceToShrinkPool(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
storagePool.decrementPoolSize(spaceToReclaim)
executionPool.incrementPoolSize(spaceToReclaim)
}
}
}
def computeMaxExecutionPoolSize(): Long = {
maxMemory - math.min(storagePool.memoryUsed, storageRegionSize)
}
executionPool.acquireMemory(
numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize)
}
來(lái)看其內(nèi)部嵌套定義的方法maybeGrowExecutionPool(),它負(fù)責(zé)在執(zhí)行內(nèi)存不夠用時(shí),向存儲(chǔ)內(nèi)存池借用內(nèi)存,邏輯是:
- 計(jì)算有多少內(nèi)存能夠從存儲(chǔ)內(nèi)存池回收回來(lái)。該大小為存儲(chǔ)池的空閑空間與之前存儲(chǔ)池向執(zhí)行池借用過(guò)的內(nèi)存量(注意這個(gè)描述)的較大值。
- 如果有內(nèi)存可以回收,就調(diào)用存儲(chǔ)池的freeSpaceToShrinkPool()方法,淘汰掉一部分存儲(chǔ)塊。淘汰掉的塊所占內(nèi)存是理論可回收量與實(shí)際需要的執(zhí)行內(nèi)存之間的較小值。
- 調(diào)用decrementPoolSize()方法縮小存儲(chǔ)池,調(diào)用incrementPoolSize()方法擴(kuò)大執(zhí)行池。
從這個(gè)邏輯可以看出,執(zhí)行內(nèi)存不夠用時(shí),并不太像是“借用”(borrow)存儲(chǔ)內(nèi)存,而是“占用”或者“回收”(reclaim)。也就是說(shuō),執(zhí)行池可以“要求”存儲(chǔ)池淘汰掉自身持有的塊來(lái)歸還曾經(jīng)借用的空間,而存儲(chǔ)池并不會(huì)反過(guò)來(lái)要求執(zhí)行池也同樣歸還。其原因在于,存儲(chǔ)內(nèi)存中的塊可以方便地持久化到磁盤(pán),而執(zhí)行內(nèi)存中的塊大多為中間數(shù)據(jù)(比如Shuffle數(shù)據(jù)),比較難持久化,并且一旦淘汰掉這些中間數(shù)據(jù),整個(gè)Task很可能就會(huì)直接失敗,重算成本太高。
下圖示出默認(rèn)情況下(spark.memory.storageFraction=0.5),內(nèi)存借用的流程。

在代碼#25.6中還有一個(gè)嵌套定義的方法computeMaxExecutionPoolSize(),它用于獲得執(zhí)行內(nèi)存池的最大可能大小,比較簡(jiǎn)單,不再多說(shuō)。
這兩個(gè)嵌套方法都會(huì)當(dāng)做執(zhí)行內(nèi)存池的acquireMemory()方法的參數(shù),作為函數(shù)傳進(jìn)去。我們?cè)谇懊媛匀チ薊xecutionMemoryPool類(lèi)的解釋過(guò)程,所以現(xiàn)在只是大致瞅一眼與申請(qǐng)內(nèi)存有關(guān)的代碼。
代碼#25.7 - o.a.s.memory.ExecutionMemoryPool.acquireMemory()方法
private[memory] def acquireMemory(
numBytes: Long,
taskAttemptId: Long,
maybeGrowPool: Long => Unit = (additionalSpaceNeeded: Long) => Unit,
computeMaxPoolSize: () => Long = () => poolSize): Long = lock.synchronized {
assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
if (!memoryForTask.contains(taskAttemptId)) {
memoryForTask(taskAttemptId) = 0L
lock.notifyAll()
}
while (true) {
val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)
maybeGrowPool(numBytes - memoryFree)
val maxPoolSize = computeMaxPoolSize()
val maxMemoryPerTask = maxPoolSize / numActiveTasks
val minMemoryPerTask = poolSize / (2 * numActiveTasks)
val maxToGrant = math.min(numBytes, math.max(0, maxMemoryPerTask - curMem))
val toGrant = math.min(maxToGrant, memoryFree)
if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
lock.wait()
} else {
memoryForTask(taskAttemptId) += toGrant
return toGrant
}
}
0L
}
可見(jiàn),ExecutionMemoryPool申請(qǐng)內(nèi)存時(shí)是循環(huán)申請(qǐng)的,每次都調(diào)用參數(shù)中的maybeGrowPool()函數(shù)(實(shí)際上就是上面講的maybeGrowExecutionPool()方法)來(lái)檢查是否需要從StorageMemoryPool回收空間。如果分配到的內(nèi)存比實(shí)際申請(qǐng)的少,或者該Task分配完畢之后的內(nèi)存仍然小于每個(gè)Task應(yīng)獲得內(nèi)存的最小值(即池子的大小除以當(dāng)前活動(dòng)Task數(shù)的兩倍),就調(diào)用MemoryManager對(duì)象的wait()方法阻塞,直到有其他Task釋放內(nèi)存為止,再進(jìn)入下一波循環(huán),直到申請(qǐng)到足夠的內(nèi)存。
關(guān)于ExecutionMemoryPool,之后還會(huì)詳細(xì)地解釋?zhuān)@里只要有個(gè)印象就行。
總結(jié)
本文詳細(xì)閱讀了UnifiedMemoryManager的相關(guān)源碼,對(duì)Spark的統(tǒng)一內(nèi)存管理機(jī)制有了深入的了解。當(dāng)然,統(tǒng)一內(nèi)存管理雖然先進(jìn),但也不代表萬(wàn)事無(wú)憂(yōu)。比如當(dāng)程序中cache了大量RDD并且不及時(shí)釋放時(shí),很多存儲(chǔ)內(nèi)存中的塊都無(wú)法被淘汰,會(huì)造成Shuffle階段頻繁Full GC,作業(yè)執(zhí)行變慢。關(guān)于Spark作業(yè)故障定位和內(nèi)存調(diào)優(yōu)的事情,不屬于這個(gè)系列的范疇,筆者會(huì)專(zhuān)門(mén)挑個(gè)時(shí)間寫(xiě)一篇全面的總結(jié)(就像之前寫(xiě)的兩篇Hive調(diào)優(yōu)總結(jié)一樣),最近實(shí)在是太忙了。
看官也可能會(huì)問(wèn),講內(nèi)存管理講了這么長(zhǎng)時(shí)間,結(jié)果都是在做bookkeeping的工作,內(nèi)存的實(shí)際分配和釋放邏輯到底在哪里呢?這個(gè)由內(nèi)存存儲(chǔ)類(lèi)MemoryStore及其附屬類(lèi)來(lái)實(shí)現(xiàn),下一篇文章就會(huì)講到了。