spark內(nèi)存使用大小管理
MemoryManager 的具體實現(xiàn)上,Spark 1.6 之后默認為統(tǒng)一管理(Unified Memory Manager)方式,1.6 之前采用的靜態(tài)管理(Static Memory Manager)方式仍被保留
- 堆內(nèi)內(nèi)存管理
對于 Spark 中序列化的對象,由于是字節(jié)流的形式,其占用的內(nèi)存大小可直接計算,而對于非序列化的對象,其占用的內(nèi)存是通過周期性地采樣近似估算而得,即并不是每次新增的數(shù)據(jù)項都會計算一次占用的內(nèi)存大小,這種方法降低了時間開銷但是有可能誤差較大,導(dǎo)致某一時刻的實際內(nèi)存有可能遠遠超出預(yù)期[2]。此外,在被 Spark 標記為釋放的對象實例,很有可能在實際上并沒有被 JVM 回收,導(dǎo)致實際可用的內(nèi)存小于 Spark 記錄的可用內(nèi)存。所以 Spark 并不能準確記錄實際可用的堆內(nèi)內(nèi)存,從而也就無法完全避免內(nèi)存溢出(OOM, Out of Memory)的異常。
- 堆外內(nèi)存
堆外內(nèi)存不再基于Tachyon,改為基于 JDK Unsafe API 實現(xiàn)(version > 2.0)
Spark 可以直接操作系統(tǒng)堆外內(nèi)存,減少了不必要的內(nèi)存開銷,以及頻繁的 GC 掃描和回收,提升了處理性能。堆外內(nèi)存可以被精確地申請和釋放,而且序列化的數(shù)據(jù)占用的空間可以被精確計算,所以相比堆內(nèi)內(nèi)存來說降低了管理的難度,也降低了誤差。
在默認情況下堆外內(nèi)存并不啟用,可通過配置 spark.memory.offHeap.enabled 參數(shù)啟用,并由 spark.memory.offHeap.size 參數(shù)設(shè)定堆外空間的大小。除了沒有 other 空間,堆外內(nèi)存與堆內(nèi)內(nèi)存的劃分方式相同,所有運行中的并發(fā)任務(wù)共享存儲內(nèi)存和執(zhí)行內(nèi)存。
spark內(nèi)存分布
- 堆內(nèi)內(nèi)存分布
spark內(nèi)存分布
safetyFraction 參數(shù),其意義在于在邏輯上預(yù)留出 1-safetyFraction 這么一塊保險區(qū)域,降低因?qū)嶋H內(nèi)存超出當前預(yù)設(shè)范圍而導(dǎo)致 OOM 的風險。值得注意的是,這個預(yù)留的保險區(qū)域僅僅是一種邏輯上的規(guī)劃,在具體使用時 Spark 并沒有區(qū)別對待,和"其它內(nèi)存"一樣交給了 JVM 去管理。
- 堆外內(nèi)存管理
可用的執(zhí)行內(nèi)存和存儲內(nèi)存占用的空間大小直接由參數(shù) spark.memory.storageFraction 決定,由于堆外內(nèi)存占用的空間可以被精確計算,所以無需再設(shè)定保險區(qū)域。
堆外內(nèi)存
統(tǒng)一內(nèi)存管理
Spark 1.6 之后引入的統(tǒng)一內(nèi)存管理機制,與靜態(tài)內(nèi)存管理的區(qū)別在于存儲內(nèi)存和執(zhí)行內(nèi)存共享同一塊空間,可以動態(tài)占用對方的空閑區(qū)域
內(nèi)存統(tǒng)一管理-堆內(nèi)內(nèi)存
內(nèi)存統(tǒng)一管理-堆外內(nèi)存
最重要的優(yōu)化在于動態(tài)占用機制,其規(guī)則如下:
- 設(shè)定基本的存儲內(nèi)存和執(zhí)行內(nèi)存區(qū)域(spark.storage.storageFraction 參數(shù)),該設(shè)定確定了雙方各自擁有的空間的范圍
- 雙方的空間都不足時,則存儲到硬盤;若己方空間不足而對方空余時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)
- 執(zhí)行內(nèi)存的空間被對方占用后,可讓對方將占用的部分轉(zhuǎn)存到硬盤,然后"歸還"借用的空間
-
存儲內(nèi)存的空間被對方占用后,無法讓對方"歸還",因為需要考慮 Shuffle 過程中的很多因素,實現(xiàn)起來較為復(fù)雜
Paste_Image.png
存儲內(nèi)存管理
RDD 的持久化由 Spark 的 Storage 模塊 [7] 負責,實現(xiàn)了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程中產(chǎn)生的數(shù)據(jù),將那些在內(nèi)存或磁盤、在本地或遠程存取數(shù)據(jù)的功能封裝了起來。在具體實現(xiàn)時 Driver 端和 Executor 端的 Storage 模塊構(gòu)成了主從式的架構(gòu),即 Driver 端的 BlockManager 為 Master,Executor 端的 BlockManager 為 Slave。Storage 模塊在邏輯上以 Block 為基本存儲單位,RDD 的每個 Partition 經(jīng)過處理后唯一對應(yīng)一個 Block(BlockId 的格式為 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應(yīng)用程序的 Block 的元數(shù)據(jù)信息的管理和維護,而 Slave 需要將 Block 的更新等狀態(tài)上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。

存儲級別
class StorageLevel private(
private var _useDisk: Boolean, //磁盤
private var _useMemory: Boolean, //這里其實是指堆內(nèi)內(nèi)存
private var _useOffHeap: Boolean, //堆外內(nèi)存
private var _deserialized: Boolean, //是否為非序列化
private var _replication: Int = 1 //副本個數(shù)
)
通過對數(shù)據(jù)結(jié)構(gòu)的分析,可以看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:
- 存儲位置:磁盤/堆內(nèi)內(nèi)存/堆外內(nèi)存。如 MEMORY_AND_DISK 是同時在磁盤和堆內(nèi)內(nèi)存上存儲,實現(xiàn)了冗余備份。OFF_HEAP 則是只在堆外內(nèi)存存儲,目前選擇堆外內(nèi)存時不能同時存儲到其他位置。
- 存儲形式:Block 緩存到存儲內(nèi)存后,是否為非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
- 副本數(shù)量:大于 1 時需要遠程冗余備份到其他節(jié)點。如 DISK_ONLY_2 需要遠程備份 1 個副本。
spark RDD緩存的過程
RDD 在緩存到存儲內(nèi)存之前,Partition 中的數(shù)據(jù)一般以迭代器(Iterator)的數(shù)據(jù)結(jié)構(gòu)來訪問。通過 Iterator 可以獲取分區(qū)中每一條序列化或者非序列化的數(shù)據(jù)項(Record),這些 Record 的對象實例在邏輯上占用了 JVM 堆內(nèi)內(nèi)存的 other 部分的空間,同一 Partition 的不同 Record 的空間并不連續(xù)。
RDD 在緩存到存儲內(nèi)存之后,Partition 被轉(zhuǎn)換成 Block,Record 在堆內(nèi)或堆外存儲內(nèi)存中占用一塊連續(xù)的空間。將Partition由不連續(xù)的存儲空間轉(zhuǎn)換為連續(xù)存儲空間的過程,Spark稱之為"展開"(Unroll)。
Block 有序列化和非序列化兩種存儲格式,具體以哪種方式取決于該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數(shù)據(jù)結(jié)構(gòu)定義,用一個數(shù)組存儲所有的對象實例,序列化的 Block 則以 SerializedMemoryEntry的數(shù)據(jù)結(jié)構(gòu)定義,用字節(jié)緩沖區(qū)(ByteBuffer)來存儲二進制數(shù)據(jù)。每個 Executor 的 Storage 模塊用一個鏈式 Map 結(jié)構(gòu)(LinkedHashMap)來管理堆內(nèi)和堆外存儲內(nèi)存中所有的 Block 對象的實例,對這個 LinkedHashMap 新增和刪除間接記錄了內(nèi)存的申請和釋放。
因為不能保證存儲空間可以一次容納 Iterator 中的所有數(shù)據(jù),當前的計算任務(wù)在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時占位,空間不足則 Unroll 失敗,空間足夠時可以繼續(xù)進行。對于序列化的 Partition,其所需的 Unroll 空間可以直接累加計算,一次申請。而非序列化的 Partition 則要在遍歷 Record 的過程中依次申請,即每讀取一條 Record,采樣估算其所需的 Unroll 空間并進行申請,空間不足時可以中斷,釋放已占用的 Unroll 空間。如果最終 Unroll 成功,當前 Partition 所占用的 Unroll 空間被轉(zhuǎn)換為正常的緩存 RDD 的存儲空間。
Spark Unroll
淘汰和落盤
由于同一個 Executor 的所有的計算任務(wù)共享有限的存儲內(nèi)存空間,當有新的 Block 需要緩存但是剩余空間不足且無法動態(tài)占用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 如果其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),否則直接刪除該 Block。
存儲內(nèi)存的淘汰規(guī)則為:
- 被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬于堆外或堆內(nèi)內(nèi)存
- 新舊 Block 不能屬于同一個 RDD,避免循環(huán)淘汰
- 舊 Block 所屬 RDD 不能處于被讀狀態(tài),避免引發(fā)一致性問題
- 遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到滿足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。
落盤的流程則比較簡單,如果其存儲級別符合_useDisk 為 true 的條件,再根據(jù)其_deserialized 判斷是否是非序列化的形式,若是則對其進行序列化,最后將數(shù)據(jù)存儲到磁盤,在 Storage 模塊中更新其信息。
執(zhí)行內(nèi)存管理
多任務(wù)間內(nèi)存分配
Executor 內(nèi)運行的任務(wù)同樣共享執(zhí)行內(nèi)存,Spark 用一個 HashMap 結(jié)構(gòu)保存了任務(wù)到內(nèi)存耗費的映射。每個任務(wù)可占用的執(zhí)行內(nèi)存大小的范圍為 1/(2N) ~ 1/N of total memory,其中 N 為當前 Executor 內(nèi)正在運行的任務(wù)的個數(shù)。每個任務(wù)在啟動之時,要向 MemoryManager 請求申請最少為 1/(2N) of total memory 的執(zhí)行內(nèi)存,如果不能被滿足要求則該任務(wù)被阻塞,直到有其他任務(wù)釋放了足夠的執(zhí)行內(nèi)存,該任務(wù)才可以被喚醒。
Shuffle 的內(nèi)存占用
執(zhí)行內(nèi)存主要用來存儲任務(wù)在執(zhí)行 Shuffle 時占用的內(nèi)存,Shuffle 是按照一定規(guī)則對 RDD 數(shù)據(jù)重新分區(qū)的過程,我們來看 Shuffle 的 Write 和 Read 兩階段對執(zhí)行內(nèi)存的使用:
- Shuffle Write
若在 map 端選擇普通的排序方式,會采用 ExternalSorter 進行外排,在內(nèi)存中存儲數(shù)據(jù)時主要占用堆內(nèi)執(zhí)行空間。
若在 map 端選擇 Tungsten 的排序方式,則采用 ShuffleExternalSorter 直接對以序列化形式存儲的數(shù)據(jù)排序,在內(nèi)存中存儲數(shù)據(jù)時可以占用堆外或堆內(nèi)執(zhí)行空間,取決于用戶是否開啟了堆外內(nèi)存以及堆外執(zhí)行內(nèi)存是否足夠。
- Shuffle Read
在對 reduce 端的數(shù)據(jù)進行聚合時,要將數(shù)據(jù)交給 Aggregator 處理,在內(nèi)存中存儲數(shù)據(jù)時占用堆內(nèi)執(zhí)行空間。
如果需要進行最終結(jié)果排序,則要將再次將數(shù)據(jù)交給 ExternalSorter 處理,占用堆內(nèi)執(zhí)行空間。
在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內(nèi)執(zhí)行內(nèi)存中存儲數(shù)據(jù),但在 Shuffle 過程中所有數(shù)據(jù)并不能都保存到該哈希表中,當這個哈希表占用的內(nèi)存會進行周期性地采樣估算,當其大到一定程度,無法再從 MemoryManager 申請到新的執(zhí)行內(nèi)存時,Spark 就會將其全部內(nèi)容存儲到磁盤文件中,這個過程被稱為溢存(Spill),溢存到磁盤的文件最后會被歸并(Merge)。
Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優(yōu)化內(nèi)存和 CPU 使用的計劃
,解決了一些 JVM 在性能上的限制和弊端。Spark 會根據(jù) Shuffle 的情況來自動選擇是否采用 Tungsten 排序。Tungsten 采用的頁式內(nèi)存管理機制建立在 MemoryManager 之上,即 Tungsten 對執(zhí)行內(nèi)存的使用進行了一步的抽象,這樣在 Shuffle 過程中無需關(guān)心數(shù)據(jù)具體存儲在堆內(nèi)還是堆外。每個內(nèi)存頁用一個 MemoryBlock 來定義,并用 Object obj 和 long offset 這兩個變量統(tǒng)一標識一個內(nèi)存頁在系統(tǒng)內(nèi)存中的地址。堆內(nèi)的 MemoryBlock 是以 long 型數(shù)組的形式分配的內(nèi)存,其 obj 的值為是這個數(shù)組的對象引用,offset 是 long 型數(shù)組的在 JVM 中的初始偏移地址,兩者配合使用可以定位這個數(shù)組在堆內(nèi)的絕對地址;堆外的 MemoryBlock 是直接申請到的內(nèi)存塊,其 obj 為 null,offset 是這個內(nèi)存塊在系統(tǒng)內(nèi)存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內(nèi)和堆外內(nèi)存頁統(tǒng)一抽象封裝,并用頁表(pageTable)管理每個 Task 申請到的內(nèi)存頁。
Tungsten 頁式管理下的所有內(nèi)存用 64 位的邏輯地址表示,由頁號和頁內(nèi)偏移量組成:
頁號:占 13 位,唯一標識一個內(nèi)存頁,Spark 在申請內(nèi)存頁之前要先申請空閑頁號。
頁內(nèi)偏移量:占 51 位,是在使用內(nèi)存頁存儲數(shù)據(jù)時,數(shù)據(jù)在頁內(nèi)的偏移地址。
有了統(tǒng)一的尋址方式,Spark 可以用 64 位邏輯地址的指針定位到堆內(nèi)或堆外的內(nèi)存,整個 Shuffle Write 排序的過程只需要對指針進行排序,并且無需反序列化,整個過程非常高效,對于內(nèi)存訪問效率和 CPU 使用效率帶來了明顯的提升。
Spark 的存儲內(nèi)存和執(zhí)行內(nèi)存有著截然不同的管理方式:對于存儲內(nèi)存來說,Spark 用一個 LinkedHashMap 來集中管理所有的 Block,Block 由需要緩存的 RDD 的 Partition 轉(zhuǎn)化而成;而對于執(zhí)行內(nèi)存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程中的數(shù)據(jù),在 Tungsten 排序中甚至抽象成為頁式內(nèi)存管理,開辟了全新的 JVM 內(nèi)存管理機制。
參考資源
Spark Memory Management
Spark Cluster Mode Overview
Spark Sort Based Shuffle 內(nèi)存分析
Spark OFF_HEAP
Unified Memory Management in Spark 1.6
Tuning Spark: Garbage Collection Tuning
Spark Architecture
《Spark 技術(shù)內(nèi)幕:深入解析 Spark 內(nèi)核架構(gòu)于實現(xiàn)原理》第 8 章 Storage 模塊詳解
Spark Sort Based Shuffle 內(nèi)存分析
Project Tungsten: Bringing Apache Spark Closer to Bare Metal
Spark Tungsten-sort Based Shuffle 分析
探索 Spark Tungsten 的秘密
Spark Task 內(nèi)存管理(on-heap&off-heap)





