目錄
前言
在上一篇文章的最后,我們閱讀了內(nèi)存管理器MemoryManager抽象類的源碼,并且提到它有兩種實現(xiàn):靜態(tài)內(nèi)存管理器StaticMemoryManager、統(tǒng)一內(nèi)存管理器UnifiedMemoryManager。其中,StaticMemoryManager是隨著Spark誕生就存在的,UnifiedMemoryManager則是從Spark 1.6版本開始服役,并且后者是目前Spark Core中的默認內(nèi)存管理器,前者已經(jīng)標記為過時。雖然StaticMemoryManager已經(jīng)不怎么用了,但它的邏輯相對簡單,適合用來開胃,本文先來研究它??垂僖部梢韵葟土曇幌律掀恼玛P(guān)于MemoryManager的部分。
前面也已說過,了解內(nèi)存管理機制是Spark內(nèi)存調(diào)優(yōu)的基礎(chǔ),因此本篇及下一篇內(nèi)容都非常重要,同樣也可以當做面試知識點手冊來讀。
MemoryManager的初始化
在之前講SparkEnv時,略去了MemoryManager相關(guān)的初始化代碼,它位于SparkEnv.create()方法中,如下。
代碼#24.1 - SparkEnv.create()方法中初始化MemoryManager
val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
val memoryManager: MemoryManager =
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
UnifiedMemoryManager(conf, numUsableCores)
}
可見,如果SparkConf中spark.memory.useLegacyMode配置項為true,則使用較老的StaticMemoryManager,反之默認使用較新的UnifiedMemoryManager。
靜態(tài)內(nèi)存管理器StaticMemoryManager
構(gòu)造方法
代碼#24.2 - o.a.s.memory.StaticMemoryManager類的構(gòu)造方法與屬性成員
private[spark] class StaticMemoryManager(
conf: SparkConf,
maxOnHeapExecutionMemory: Long,
override val maxOnHeapStorageMemory: Long,
numCores: Int)
extends MemoryManager(
conf,
numCores,
maxOnHeapStorageMemory,
maxOnHeapExecutionMemory) {
def this(conf: SparkConf, numCores: Int) {
this(
conf,
StaticMemoryManager.getMaxExecutionMemory(conf),
StaticMemoryManager.getMaxStorageMemory(conf),
numCores)
}
offHeapExecutionMemoryPool.incrementPoolSize(offHeapStorageMemoryPool.poolSize)
offHeapStorageMemoryPool.decrementPoolSize(offHeapStorageMemoryPool.poolSize)
private val maxUnrollMemory: Long = {
(maxOnHeapStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
// ......
}
SparkEnv初始化MemoryManager時,都不是直接調(diào)用主構(gòu)造方法,而是調(diào)用輔助構(gòu)造方法。也就是說,堆內(nèi)存儲內(nèi)存總量maxOnHeapStorageMemory通過調(diào)用StaticMemoryManager伴生對象中的getMaxStorageMemory()方法來計算,堆內(nèi)執(zhí)行內(nèi)存總量maxOnHeapExecutionMemory則通過調(diào)用getMaxExecutionMemory()方法來計算。
在構(gòu)造方法中還有兩句代碼,它負責把原本屬于堆外存儲池的空間轉(zhuǎn)移到堆外執(zhí)行池。也就是說StaticMemoryManager只有堆外執(zhí)行內(nèi)存,沒有堆外存儲內(nèi)存。至于maxUnrollMemory,稍后再說。
計算堆內(nèi)存儲/執(zhí)行內(nèi)存總量
這兩個方法位于伴生對象中,代碼如下。
代碼#24.3 - o.a.s.memory.StaticMemoryManager.getMaxStorageMemory()/getMaxExecutionMemory()方法
private def getMaxStorageMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
private def getMaxExecutionMemory(conf: SparkConf): Long = {
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
if (systemMaxMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"System memory $systemMaxMemory must " +
s"be at least $MIN_MEMORY_BYTES. Please increase heap size using the --driver-memory " +
s"option or spark.driver.memory in Spark configuration.")
}
if (conf.contains("spark.executor.memory")) {
val executorMemory = conf.getSizeAsBytes("spark.executor.memory")
if (executorMemory < MIN_MEMORY_BYTES) {
throw new IllegalArgumentException(s"Executor memory $executorMemory must be at least " +
s"$MIN_MEMORY_BYTES. Please increase executor memory using the " +
s"--executor-memory option or spark.executor.memory in Spark configuration.")
}
}
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
getMaxStorageMemory()方法的執(zhí)行流程為:
- 通過Runtime.maxMemory()這個native方法取得當前JVM可用的最大內(nèi)存(堆內(nèi)存)。spark.testing.memory參數(shù)是測試參數(shù),幾乎不用。
- 根據(jù)spark.storage.memoryFraction配置項,取得存儲內(nèi)存占堆內(nèi)內(nèi)存的比例,默認0.6(60%)。
- 根據(jù)spark.storage.safetyFraction配置項,取得存儲內(nèi)存的安全比例,默認0.9(90%)。
- 將1~3步中的三個量相乘并取整,即得到堆內(nèi)存儲內(nèi)存的總量。
前面也已經(jīng)提過,存儲內(nèi)存中有一塊特殊用途的區(qū)域,叫展開內(nèi)存。它占存儲內(nèi)存的比例由spark.storage.unrollFraction配置項指定,默認值0.2(20%)。
getMaxExecutionMemory()方法的執(zhí)行流程類似,不過它會預(yù)先校驗Driver和Executor的內(nèi)存量,確保有32MB以上。另外,執(zhí)行內(nèi)存占堆內(nèi)內(nèi)存的比例由配置項spark.shuffle.memoryFraction指定,默認0.2(20%);執(zhí)行內(nèi)存的安全比例則由spark.shuffle.safetyFraction指定,默認0.8(80%)。
除了上邊的兩塊內(nèi)存之外,堆內(nèi)內(nèi)存還會剩余(1 - spark.storage.memoryFraction - spark.shuffle.memoryFraction)比例的空間,默認占20%,它就用來存儲用戶代碼中自定義的數(shù)據(jù)結(jié)構(gòu),以及Spark內(nèi)部的一些元數(shù)據(jù)。
看官可能會問,為什么除了實際占比之外,還會有一個安全比例呢?我們已經(jīng)知道,Spark中的對象可以序列化存儲,也可以非序列化存儲。對于序列化對象,可以通過其字節(jié)流的長度獲知其大小。而對于非序列化對象,其占用的內(nèi)存就只能通過估算得到,與實際情況可能出入較大。另外,MemoryManager申請的內(nèi)存空間可能還未實際分配,而標記為釋放的內(nèi)存空間也可能并未被JVM實際GC掉,存在滯后性??傊?,Spark并不能準確地跟蹤堆內(nèi)內(nèi)存的占用量,為了防止偏差過大出現(xiàn)OOM,就必須預(yù)留一些緩沖空間了。默認會預(yù)留10%的存儲內(nèi)存、20%的執(zhí)行內(nèi)存作為緩沖。
內(nèi)存申請方法
StaticMemoryManager覆寫了MemoryManager中定義的申請內(nèi)存的3個方法,其源碼如下。
代碼#24.4 - o.a.s.memory.StaticMemoryManager中的內(nèi)存申請方法
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap storage memory")
if (numBytes > maxOnHeapStorageMemory) {
logInfo(s"Will not store $blockId as the required space ($numBytes bytes) exceeds our " +
s"memory limit ($maxOnHeapStorageMemory bytes)")
false
} else {
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes)
}
}
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
memoryMode: MemoryMode): Boolean = synchronized {
require(memoryMode != MemoryMode.OFF_HEAP,
"StaticMemoryManager does not support off-heap unroll memory")
val currentUnrollMemory = onHeapStorageMemoryPool.memoryStore.currentUnrollMemory
val freeMemory = onHeapStorageMemoryPool.memoryFree
val maxNumBytesToFree = math.max(0, maxUnrollMemory - currentUnrollMemory - freeMemory)
val numBytesToFree = math.max(0, math.min(maxNumBytesToFree, numBytes - freeMemory))
onHeapStorageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree)
}
private[memory]
override def acquireExecutionMemory(
numBytes: Long,
taskAttemptId: Long,
memoryMode: MemoryMode): Long = synchronized {
memoryMode match {
case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
可見,它們基本上是代理了各個MemoryPool的acquireMemory()方法,并且存儲內(nèi)存只在堆內(nèi)申請,執(zhí)行內(nèi)存可以根據(jù)MemoryMode的不同在堆內(nèi)或堆外申請。
對于展開內(nèi)存還有一些特殊處理:由于將RDD展開為塊需要占用連續(xù)的存儲空間,在必要的情況下需要釋放其他緩存的空間,以放下這個塊。釋放空間的上限為“最大展開內(nèi)存 - 現(xiàn)占用的展開內(nèi)存 - 空閑存儲內(nèi)存”,之所以要規(guī)定這個上限,是為了防止展開一個超大的塊導致所有緩存都陣亡(blow away the entire cache)。
靜態(tài)內(nèi)存管理布局圖解
只用文字描述過于抽象,所以用下圖來形象地說明Spark的靜態(tài)內(nèi)存管理布局。

總結(jié)
經(jīng)過上面的分析,我們可以看出,StaticMemoryManager之所以名為“靜態(tài)”,是因為它的內(nèi)存區(qū)域都是由各個比例參數(shù)(fraction)規(guī)定好的。這樣實現(xiàn)起來簡單,但是在復雜業(yè)務(wù)場景或者參數(shù)設(shè)定不當時,它容易造成“冰火兩重天”的情況,即一方內(nèi)存過剩而另一方內(nèi)存緊張。在Spark 1.6版本提出的UnifiedMemoryManager致力于解決這個問題,并且是現(xiàn)在的事實標準,當然它也更加復雜。下一篇文章再來仔細地探索它,晚安。