spark Executor的內(nèi)存大小設(shè)置一直是長期困擾開發(fā)人員和分析師的一個大問題。不同應(yīng)用使用的算法和數(shù)據(jù)不同,一次內(nèi)存設(shè)置也是難以評估,設(shè)置過大會造成資源浪費,其余任務(wù)得不到資源而等待。設(shè)置過小,計算過程中會帶來頻繁的GC和磁盤讀寫,緩存數(shù)據(jù)比較少,還有可能會帶來OOM,也會影響到性能。
針對上述的問題,spark 1.6 帶來了新的內(nèi)存管理機(jī)制,Unified Memory Manager。
以前的內(nèi)存管理
日常使用中,我們通過spark.executor.memory來控制一個executor最多可以使用的內(nèi)存大小,實際上是通過設(shè)置Executor的JVM的Heap大小實現(xiàn)的。
在spark1.6之前,Executor的內(nèi)存界限分明,分別由3部分組成:execution,storage和system。
execution
execution空間通過設(shè)置spark.shuffle.memoryFraction參數(shù)來控制大小,默認(rèn)為0.2。為了避免shuffle,join,排序和聚合這些操作直接將數(shù)據(jù)寫入磁盤,所設(shè)置的buffer大小,減少了磁盤讀寫的次數(shù)。storage
storage空間通過設(shè)置spark.storage.memoryFraction參數(shù)來控制大小,默認(rèn)為0.6。用于存儲用戶顯示調(diào)用的persist,cache,broadcast等命令存儲的數(shù)據(jù)空間。system
程序運行需要的空間,存儲一些spark內(nèi)部的元數(shù)據(jù)信息,用戶的數(shù)據(jù)結(jié)構(gòu),避免一些不尋常的大記錄帶來的OOM。
之前的管理方式,最明顯的就是對execution和storage空間進(jìn)行了明顯的劃分。舉個例子,一些任務(wù)可能對數(shù)據(jù)緩存的需求并不是很高,就會造成storage空間的浪費。
因此,spark1.6帶來了新的內(nèi)存管理機(jī)制。
Unified MemoryManager簡述
為了解決上述出現(xiàn)的問題,新提出的內(nèi)存管理機(jī)制淡化了execution空間和storage空間的邊界,讓它們之間可以相互“借”內(nèi)存。
通過spark.memory.useLegacyMode參數(shù)來決定是否開啟新的內(nèi)存管理機(jī)制,默認(rèn)為開啟。
它們總共可用的內(nèi)存由spark.memory.fraction參數(shù)控制(默認(rèn)為0.75)。在該空間內(nèi)部,對execution和storage進(jìn)行了進(jìn)一步的劃分。由spark.memory.storageFraction參數(shù)控制(默認(rèn)為0.5),這意味著strorage空間真實占有的executor空間為:
0.75*0.5 = 0.375
exetution向storage”借“內(nèi)存
在程序執(zhí)行過程中,如果execution空間不足,則會向storage空間提出申請,storage會將空閑內(nèi)存借給execution使用,如果不夠,則會釋放之前向execution借的內(nèi)存。
execution申請在acquireStorageMemory方法中實現(xiàn)。
每次申請空間前會進(jìn)行相應(yīng)判斷,在maybeGrowExecutionPool方法中:
//extraMemoryNeeded為需要申請的內(nèi)存
def maybeGrowExecutionPool(extraMemoryNeeded: Long): Unit = {
if (extraMemoryNeeded > 0) {
//判斷storage空閑空間和向execution借去的空間,取較大的那個。
//如果需要,storage會將先前向execution借的內(nèi)存取消緩存進(jìn)行返還。
val memoryReclaimableFromStorage =
math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
//判斷是否還有內(nèi)存可以借出
if (memoryReclaimableFromStorage > 0) {
//如果剩余內(nèi)存不夠申請的,則將可用內(nèi)存全部借出。
val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
}
}
}
storage向execution“借”內(nèi)存
同樣的,在storage需要內(nèi)存的時候,execution也會將它的空閑內(nèi)存借出去。因為實現(xiàn)復(fù)雜,所以execution空間不會被storage驅(qū)逐。這種機(jī)制帶來的問題是,如果execution占據(jù)了storage大部分空間。這時候?qū)σ恍?shù)據(jù)的緩存可能會失敗。
在下面的方法中可以清楚的看出,storage可使用的最大內(nèi)存是maxMemory-execution已使用的內(nèi)存數(shù),意思就是說,即使execution占據(jù)了storage的空間也不會被收回,直到execution自己釋放。
override def maxStorageMemory: Long = synchronized {
maxMemory - onHeapExecutionMemoryPool.memoryUsed
}
storage申請在acquireExecutionMemory方法中進(jìn)行了實現(xiàn)。
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
//如果需要空間大于Storage現(xiàn)有的最大內(nèi)存空間,直接返回失敗
if (numBytes > maxStorageMemory) {
return false
}
//如果需要空間比當(dāng)前storage剩余空間多,則去借execution的空閑空間。
if (numBytes > storageMemoryPool.memoryFree) {
val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
更多實現(xiàn)細(xì)節(jié)可以去org.apache.spark.memory.UnifiedMemoryManager代碼中查看。
帶來的好處
除開極端內(nèi)存使用環(huán)境(execution和storage空間都被占滿)的情況下,新的內(nèi)存管理機(jī)制能夠減少shuffle過程中將數(shù)據(jù)spill到磁盤的次數(shù)或者提高數(shù)據(jù)的緩存比例,都能對程序運行速度帶來一定的提升,同時也減少了調(diào)試參數(shù)的次數(shù)。