主要對(duì) Executor 的內(nèi)存管理進(jìn)行分析,下文中的 Spark 內(nèi)存均特指 Executor 的內(nèi)存
堆內(nèi)內(nèi)存和堆外內(nèi)存
作為一個(gè) JVM 進(jìn)程,Executor 的內(nèi)存管理建立在 JVM 的內(nèi)存管理之上,此外spark還引入了堆外內(nèi)存(不在JVM中的內(nèi)存),在spark中是指不屬于該executor的內(nèi)存。
堆內(nèi)內(nèi)存:
由 JVM 控制,由GC(垃圾回收)進(jìn)行內(nèi)存回收堆外內(nèi)存:
不受 JVM 控制,可以自由分配
堆外內(nèi)存的優(yōu)點(diǎn): 減少了垃圾回收的工作。
堆外內(nèi)存的缺點(diǎn):
- 堆外內(nèi)存難以控制,如果內(nèi)存泄漏,那么很難排查
- 堆外內(nèi)存相對(duì)來(lái)說(shuō),不適合存儲(chǔ)很復(fù)雜的對(duì)象。一般簡(jiǎn)單的對(duì)象或者扁平化的比較適合。
堆內(nèi)內(nèi)存
堆內(nèi)內(nèi)存的大小,由 Spark 應(yīng)用程序啟動(dòng)時(shí)的 executor-memory 或 spark.executor.memory 參數(shù)配置,這些配置在 spark-env.sh 配置文件中。
Executor 內(nèi)運(yùn)行的并發(fā)任務(wù)共享 JVM 堆內(nèi)內(nèi)存,這些內(nèi)存被規(guī)劃為 存儲(chǔ)(Storage)內(nèi)存 和 執(zhí)行(Execution)內(nèi)存
Storage 內(nèi)存:
用于存儲(chǔ) RDD 的緩存數(shù)據(jù) 和 廣播(Broadcast)數(shù)據(jù),主要用于存儲(chǔ) spark 的 cache 數(shù)據(jù),例如RDD的緩存Execution 內(nèi)存:
執(zhí)行 Shuffle 時(shí)占用的內(nèi)存,主要用于存放 Shuffle、Join、Sort 等計(jì)算過(guò)程中的臨時(shí)數(shù)據(jù)用戶(hù)內(nèi)存(User Memory):
主要用于存儲(chǔ) RDD 轉(zhuǎn)換操作所需要的數(shù)據(jù),例如 RDD 依賴(lài)等信息預(yù)留內(nèi)存(Reserved Memory):
系統(tǒng)預(yù)留內(nèi)存,會(huì)用來(lái)存儲(chǔ)Spark內(nèi)部對(duì)象。
剩余的部分不做特殊規(guī)劃,那些 Spark 內(nèi)部的對(duì)象實(shí)例,或者用戶(hù)定義的 Spark 應(yīng)用程序中的對(duì)象實(shí)例,均占用剩余的空間。
Spark 對(duì)堆內(nèi)內(nèi)存的管理是一種邏輯上的”規(guī)劃式”的管理,因?yàn)閷?duì)象實(shí)例占用內(nèi)存的申請(qǐng)和釋放都由 JVM 完成,Spark 只能在申請(qǐng)后和釋放前記錄這些內(nèi)存。
對(duì)于 Spark 中序列化的對(duì)象,由于是字節(jié)流的形式,其占用的內(nèi)存大小可直接計(jì)算,而對(duì)于非序列化的對(duì)象,其占用的內(nèi)存是通過(guò)周期性地采樣近似估算而得,這種方法降低了時(shí)間開(kāi)銷(xiāo)但是有可能誤差較大,導(dǎo)致某一時(shí)刻的實(shí)際內(nèi)存有可能遠(yuǎn)遠(yuǎn)超出預(yù)期。此外,在被 Spark 標(biāo)記為釋放的對(duì)象實(shí)例,很有可能在實(shí)際上并沒(méi)有被 JVM 回收,導(dǎo)致實(shí)際可用的內(nèi)存小于 Spark 記錄的可用內(nèi)存。所以 Spark 并不能準(zhǔn)確記錄實(shí)際可用的堆內(nèi)內(nèi)存,從而也就無(wú)法完全避免內(nèi)存溢出(OOM, Out of Memory)的異常。
Spark 通過(guò)對(duì)存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存各自獨(dú)立的規(guī)劃管理,可以決定是否要在存儲(chǔ)內(nèi)存里緩存新的 RDD,以及是否為新的任務(wù)分配執(zhí)行內(nèi)存。
如果當(dāng)前 Exector 內(nèi)存不夠用,可以分配到其他內(nèi)存占用小的 Exector 上。
在一定程度上可以提升其他 Exector 的內(nèi)存利用率,減少當(dāng)前 Exector 異常的出現(xiàn)。
堆外內(nèi)存
為了進(jìn)一步優(yōu)化內(nèi)存的使用以及提高 Shuffle 時(shí)排序的效率,Spark 1.6 引入了堆外(Off-heap)內(nèi)存,使之可以直接在工作節(jié)點(diǎn)的系統(tǒng)內(nèi)存中開(kāi)辟空間,存儲(chǔ)經(jīng)過(guò)序列化的二進(jìn)制數(shù)據(jù)。
這種模式不在 JVM 內(nèi)申請(qǐng)內(nèi)存,而是調(diào)用 Java 的 unsafe 相關(guān) API 進(jìn)行諸如 C 語(yǔ)言里面的 malloc() 直接向操作系統(tǒng)申請(qǐng)內(nèi)存,由于這種方式不進(jìn)過(guò) JVM 內(nèi)存管理,所以可以避免頻繁的 GC,這種內(nèi)存申請(qǐng)的缺點(diǎn)是必須自己編寫(xiě)內(nèi)存申請(qǐng)和釋放的邏輯。
Spark 可以直接操作系統(tǒng)堆外內(nèi)存,減少了不必要的內(nèi)存開(kāi)銷(xiāo),以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內(nèi)存可以被精確地申請(qǐng)和釋放,而且序列化的數(shù)據(jù)占用的空間可以被精確計(jì)算,所以相比堆內(nèi)內(nèi)存來(lái)說(shuō)降低了管理的難度,也降低了誤差。
在默認(rèn)情況下堆外內(nèi)存并不啟用,可通過(guò)配置 spark.memory.offHeap.enabled 參數(shù)啟用,并由 spark.memory.offHeap.size 參數(shù)設(shè)定堆外空間的大小,單位為字節(jié)。堆外內(nèi)存與堆內(nèi)內(nèi)存的劃分方式相同,所有運(yùn)行中的并發(fā)任務(wù)共享存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存。
如果堆外內(nèi)存被啟用,那么 Executor 內(nèi)將同時(shí)存在堆內(nèi)和堆外內(nèi)存,兩者的使用互補(bǔ)影響,這個(gè)時(shí)候 Executor 中的 Execution 內(nèi)存是堆內(nèi)的 Execution 內(nèi)存和堆外的 Execution 內(nèi)存之和,同理,Storage 內(nèi)存也一樣。相比堆內(nèi)內(nèi)存,堆外內(nèi)存只區(qū)分 Execution 內(nèi)存和 Storage 內(nèi)存。
spark內(nèi)存分配
靜態(tài)內(nèi)存管理
在 Spark 最初采用的靜態(tài)內(nèi)存管理機(jī)制下,存儲(chǔ)內(nèi)存、執(zhí)行內(nèi)存和其他內(nèi)存的大小在 Spark 應(yīng)用程序運(yùn)行期間均為固定的,但用戶(hù)可以應(yīng)用程序啟動(dòng)前進(jìn)行配置,堆內(nèi)內(nèi)存的分配如圖 所示:

可用堆內(nèi)內(nèi)存空間計(jì)算:
可用的存儲(chǔ)內(nèi)存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的執(zhí)行內(nèi)存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
靜態(tài)內(nèi)存管理圖示——堆外

統(tǒng)一內(nèi)存管理
Spark 1.6 之后引入的統(tǒng)一內(nèi)存管理機(jī)制,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲(chǔ)內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間,可以動(dòng)態(tài)占用對(duì)方的空閑區(qū)域,如圖 所示
統(tǒng)一內(nèi)存管理圖示——堆內(nèi)

reservedMemory 在 Spark 2.2.1 中是寫(xiě)死的
統(tǒng)一內(nèi)存管理圖示——堆外

其中最重要的優(yōu)化在于動(dòng)態(tài)占用機(jī)制,其規(guī)則如下:
程序提交的時(shí)候我們都會(huì)設(shè)定基本的 Execution 內(nèi)存和 Storage 內(nèi)存區(qū)域(通過(guò) spark.memory.storageFraction 參數(shù)設(shè)置);
在程序運(yùn)行時(shí),如果雙方的空間都不足時(shí),則存儲(chǔ)到硬盤(pán);將內(nèi)存中的塊存儲(chǔ)到磁盤(pán)的策略是按照 LRU 規(guī)則進(jìn)行的。若己方空間不足而對(duì)方空余時(shí),可借用對(duì)方的空間;(存儲(chǔ)空間不足是指不足以放下一個(gè)完整的 Block)
Execution 內(nèi)存的空間被對(duì)方占用后,可讓對(duì)方將占用的部分轉(zhuǎn)存到硬盤(pán),然后"歸還"借用的空間,Storage 占用 Execution 內(nèi)存的數(shù)據(jù)被回收后,重新計(jì)算即可恢復(fù)。
Storage 內(nèi)存的空間被對(duì)方占用后,目前的實(shí)現(xiàn)是無(wú)法讓對(duì)方"歸還",因?yàn)樾枰紤] Shuffle 過(guò)程中的很多因素,實(shí)現(xiàn)起來(lái)較為復(fù)雜;而且 Shuffle 過(guò)程產(chǎn)生的文件在后面一定會(huì)被使用到。
動(dòng)態(tài)占用機(jī)制圖示

Task 之間內(nèi)存分布
為了更好地使用使用內(nèi)存,Executor 內(nèi)運(yùn)行的 Task 之間共享著 Execution 內(nèi)存。具體的,Spark 內(nèi)部維護(hù)了一個(gè) HashMap 用于記錄每個(gè) Task 占用的內(nèi)存。當(dāng) Task 需要在 Execution 內(nèi)存區(qū)域申請(qǐng) numBytes 內(nèi)存,其先判斷 HashMap 里面是否維護(hù)著這個(gè) Task 的內(nèi)存使用情況,如果沒(méi)有,則將這個(gè) Task 內(nèi)存使用置為0,并且以 TaskId 為 key,內(nèi)存使用為 value 加入到 HashMap 里面。之后為這個(gè) Task 申請(qǐng) numBytes 內(nèi)存,如果 Execution 內(nèi)存區(qū)域正好有大于 numBytes 的空閑內(nèi)存,則在 HashMap 里面將當(dāng)前 Task 使用的內(nèi)存加上 numBytes,然后返回;如果當(dāng)前 Execution 內(nèi)存區(qū)域無(wú)法申請(qǐng)到每個(gè) Task 最小可申請(qǐng)的內(nèi)存,則當(dāng)前 Task 被阻塞,直到有其他任務(wù)釋放了足夠的執(zhí)行內(nèi)存,該任務(wù)才可以被喚醒。每個(gè) Task 可以使用 Execution 內(nèi)存大小范圍為 1/2N ~ 1/N,其中 N 為當(dāng)前 Executor 內(nèi)正在運(yùn)行的 Task 個(gè)數(shù)。一個(gè) Task 能夠運(yùn)行必須申請(qǐng)到最小內(nèi)存為 (1/2N * Execution 內(nèi)存);當(dāng) N = 1 的時(shí)候,Task 可以使用全部的 Execution 內(nèi)存。
比如如果 Execution 內(nèi)存大小為 10GB,當(dāng)前 Executor 內(nèi)正在運(yùn)行的 Task 個(gè)數(shù)為5,則該 Task 可以申請(qǐng)的內(nèi)存范圍為 10 / (2 * 5) ~ 10 / 5,也就是 1GB ~ 2GB的范圍。
示例
1. 只用了堆內(nèi)內(nèi)存
現(xiàn)在我們提交的 Spark 作業(yè)關(guān)于內(nèi)存的配置如下:
--executor-memory 18g
由于沒(méi)有設(shè)置 spark.memory.fraction(Storage 和 Execution 共用內(nèi)存 占可用內(nèi)存的比例,默認(rèn)為0.6) 和 spark.memory.storageFraction(Storage 內(nèi)存占 Storage 和 Execution 共用內(nèi)存 比例,默認(rèn)0.5) 參數(shù),我們可以看到 Spark UI 關(guān)于 Storage Memory 的顯示如下:

上圖很清楚地看到 Storage Memory 的可用內(nèi)存是 10.1GB,這個(gè)數(shù)是咋來(lái)的呢?根據(jù)前面的規(guī)則,我們可以得出以下的計(jì)算:
systemMemory = spark.executor.memory
reservedMemory = 300MB
usableMemory = systemMemory - reservedMemory
StorageMemory= usableMemory * spark.memory.fraction * spark.memory.storageFraction
把數(shù)據(jù)代進(jìn)去,得出結(jié)果為:5.312109375 GB。
和上面的 10.1GB 對(duì)不上。為什么呢?這是因?yàn)?Spark UI 上面顯示的 Storage Memory 可用內(nèi)存其實(shí)等于 Execution 內(nèi)存和 Storage 內(nèi)存之和,也就是 usableMemory * spark.memory.fraction
我們?cè)O(shè)置了 --executor-memory 18g,但是 Spark 的 Executor 端通過(guò) Runtime.getRuntime.maxMemory 拿到的內(nèi)存其實(shí)沒(méi)這么大,只有 17179869184 字節(jié),這個(gè)數(shù)據(jù)是怎么計(jì)算的?
Runtime.getRuntime.maxMemory 是程序能夠使用的最大內(nèi)存,其值會(huì)比實(shí)際配置的執(zhí)行器內(nèi)存的值小。這是因?yàn)閮?nèi)存分配池的堆部分分為 Eden,Survivor 和 Tenured 三部分空間,而這里面一共包含了兩個(gè) Survivor 區(qū)域,而這兩個(gè) Survivor 區(qū)域在任何時(shí)候我們只能用到其中一個(gè),所以我們可以使用下面的公式進(jìn)行描述:
ExecutorMemory = Eden + 2 * Survivor + Tenured
Runtime.getRuntime.maxMemory = Eden + Survivor + Tenured
2. 用了堆內(nèi)和堆外內(nèi)存
現(xiàn)在如果我們啟用了堆外內(nèi)存,情況咋樣呢?我們的內(nèi)存相關(guān)配置如下:
spark.executor.memory 18g
spark.memory.offHeap.enabled true
spark.memory.offHeap.size 10737418240
從上面可以看出,堆外內(nèi)存為 10GB,現(xiàn)在 Spark UI 上面顯示的 Storage Memory 可用內(nèi)存為 20.9GB,如下:
總結(jié)
憑借統(tǒng)一內(nèi)存管理機(jī)制,Spark 在一定程度上提高了堆內(nèi)和堆外內(nèi)存資源的利用率,降低了開(kāi)發(fā)者維護(hù) Spark 內(nèi)存的難度,但并不意味著開(kāi)發(fā)者可以高枕無(wú)憂(yōu)。譬如,所以如果存儲(chǔ)內(nèi)存的空間太大或者說(shuō)緩存的數(shù)據(jù)過(guò)多,反而會(huì)導(dǎo)致頻繁的 GC 垃圾回收,降低任務(wù)執(zhí)行時(shí)的性能。
使用建議
首先,建議使用新模式,所以接下來(lái)的配置建議都是基于新模式的。
spark.memory.fraction:如果 application spill 或踢除 block 發(fā)生的頻率過(guò)高(可通過(guò)日志觀察),可以適當(dāng)調(diào)大該值,這樣 execution 和 storage 的總可用內(nèi)存變大,能有效減少發(fā)生 spill 和踢除 block 的頻率
spark.memory.storageFraction:為 storage 占 storage、execution 內(nèi)存總和的比例。雖然新方案中 storage 和 execution 之間可以發(fā)生內(nèi)存借用,但總的來(lái)說(shuō),spark.memory.storageFraction 越大,運(yùn)行過(guò)程中,storage 能用的內(nèi)存就會(huì)越多。所以,如果你的 app 是更吃 storage 內(nèi)存的,把這個(gè)值調(diào)大一點(diǎn);如果是更吃 execution 內(nèi)存的,把這個(gè)值調(diào)小一點(diǎn)
spark.memory.offHeap.enabled:堆外內(nèi)存最大的好處就是可以避免 GC,如果你希望使用堆外內(nèi)存,將該值置為 true 并設(shè)置堆外內(nèi)存的大小,即設(shè)置
spark.memory.offHeap.size,這是必須的
另外,需要特別注意的是,堆外內(nèi)存的大小不會(huì)算在 executor memory 中,也就是說(shuō)加入你設(shè)置了 --executor memory 10G 和 -spark.memory.offHeap.size=10G,那總共可以使用 20G 內(nèi)存,堆內(nèi)和堆外分別 10G。
