本文為 Spark 2.0 源碼分析筆記,某些實(shí)現(xiàn)可能與其他版本有所出入
這篇文章前半部分我們對(duì)直接在 Block 存取發(fā)揮重要作用的類進(jìn)行介紹,主要是 DiskBlockManager、MemoryStore、DiskStore。后半部分以存取 Broadcast 來(lái)進(jìn)一步加深對(duì) Block 存取的理解。
DiskBlockManager
DiskBlockManager 主要用來(lái)創(chuàng)建并持有邏輯 blocks 與磁盤上的 blocks之間的映射,一個(gè)邏輯 block 通過(guò) BlockId 映射到一個(gè)磁盤上的文件。
主要成員
-
localDirs: Array[File]:創(chuàng)建根據(jù)spark.local.dir(備注①)指定的目錄列表,這些目錄下會(huì)創(chuàng)建子目錄,這些子目錄用來(lái)存放 Application 運(yùn)行過(guò)程中產(chǎn)生的存放在磁盤上的中間數(shù)據(jù),比如 cached RDD partition 對(duì)應(yīng)的 block、Shuffle Write 產(chǎn)生的數(shù)據(jù)等,會(huì)根據(jù)文件名將 block 文件 hash 到不同的目錄下 -
subDirs: Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)):localDirs 代表的各個(gè)目錄下的子目錄,子目錄個(gè)數(shù)由spark.diskStore.subDirectories指定,子目錄用來(lái)存儲(chǔ)具體的 block 對(duì)應(yīng)的文件,會(huì)根據(jù) block file 文件名先 hash 確定放在哪個(gè) localDir,在 hash 決定放在該 localDir 的哪個(gè)子目錄下(尋找該 block 文件也是通過(guò)這種方式) -
shutdownHook = addShutdownHook():即關(guān)閉鉤子,在進(jìn)程結(jié)束時(shí)會(huì)遞歸刪除 localDirs 下所有屬于該 Application 的文件
主要方法
看了上面幾個(gè)主要成員的介紹相信已經(jīng)對(duì)邏輯 block 如何與磁盤文件映射已經(jīng)有了大致了解。接下來(lái)看看幾個(gè)主要的方法:
-
getFile(filename: String): File:通過(guò)文件名來(lái)查找 block 文件并獲取文件句柄,先通過(guò)文件名 hash 到指定目錄再查找 -
getFile(blockId: BlockId): File:通過(guò) blockId 來(lái)查找 block 文件并獲取文件句柄,事實(shí)上是通過(guò)調(diào)用getFile(filename: String): File來(lái)查找的 -
containsBlock(blockId: BlockId): Boolean:是否包含某個(gè) blockId 對(duì)應(yīng)的文件 -
getAllFiles(): Seq[File]:獲取存儲(chǔ)在磁盤上所有 block 文件的句柄,以列表的形式返回 -
getAllBlocks(): Seq[BlockId]:獲取存儲(chǔ)在磁盤上的所有 blockId -
stop(): Unit:清理存儲(chǔ)在磁盤上所有的 block 文件 -
createTempLocalBlock(): (TempLocalBlockId, File):產(chǎn)生一個(gè)唯一的 Block Id 和文件句柄用于存儲(chǔ)本地中間結(jié)果 -
createTempShuffleBlock(): (TempShuffleBlockId, File):產(chǎn)生一個(gè)唯一的 Block Id 和文件句柄用于存儲(chǔ) shuffle 中間結(jié)果
如上述,DiskBlockManager 提供的方法主要是為了提供映射的方法,而并不會(huì)將現(xiàn)成的映射關(guān)系保存在某個(gè)成員中,這是需要明了的一點(diǎn)。DiskBlockManager 方法主要在需要?jiǎng)?chuàng)建或獲取某個(gè) block 對(duì)應(yīng)的磁盤文件以及在 BlockManager 退出時(shí)要清理磁盤文件時(shí)被調(diào)用。
DiskStore
DiskStore 用來(lái)將 block 數(shù)據(jù)存儲(chǔ)至磁盤,是直接的磁盤文件操作者。其封裝了:
兩個(gè)寫方法
-
put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit:用文件輸出流的方式寫 block 數(shù)據(jù)至磁盤 -
putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit:以字節(jié) buffer 的方式寫 block 數(shù)據(jù)至磁盤
一個(gè)讀方法
-
getBytes(blockId: BlockId): ChunkedByteBuffer:通過(guò) block id 讀取存儲(chǔ)在磁盤上的 block 數(shù)據(jù),以字節(jié) buffer 的形式返回
兩個(gè)查方法
-
getSize(blockId: BlockId): Long:通過(guò) block id 獲取存儲(chǔ)在磁盤上的 block 數(shù)據(jù)的大小 -
contains(blockId: BlockId): Boolean:查詢磁盤上是否包含某個(gè) block id 的數(shù)據(jù)
一個(gè)刪方法
-
remove(blockId: BlockId): Boolean:刪除磁盤上某個(gè) block id 的數(shù)據(jù)
需要說(shuō)明的是,DiskStore 的各個(gè)方法中,通過(guò) block id 或文件名來(lái)找到對(duì)應(yīng)的 block 文件句柄是通過(guò)調(diào)用 DiskBlockManager 的方法來(lái)達(dá)成的
MemoryStore
MemoryStore 用來(lái)將沒(méi)有序列化的 Java 對(duì)象數(shù)組和序列化的字節(jié) buffer 存儲(chǔ)至內(nèi)存中。它的實(shí)現(xiàn)比 DiskStore 稍復(fù)雜,我們先來(lái)看看主要成員
先說(shuō)明 MemoryEntry:
private sealed trait MemoryEntry[T] {
def size: Long
def memoryMode: MemoryMode
def classTag: ClassTag[T]
}
public enum MemoryMode {
ON_HEAP,
OFF_HEAP
}
代表 JVM 或?qū)ν鈨?nèi)存的內(nèi)存大小
主要成員
-
entries: LinkedHashMap[BlockId, MemoryEntry[_]]:保存每個(gè) block id 及其存儲(chǔ)在內(nèi)存中的數(shù)據(jù)的大小及是保存在 JVM 內(nèi)存中還是堆外內(nèi)存中 -
unrollMemoryMap: mutable.HashMap[Long, Long]:保存每個(gè) task 占用的用來(lái)存儲(chǔ) block 而占用的 JVM 內(nèi)存 -
offHeapUnrollMemoryMap: mutable.HashMap[Long, Long]:保存每個(gè) task 占用的用來(lái)存儲(chǔ) block 而占用的對(duì)外內(nèi)存
以上幾個(gè)成員主要描述了每個(gè) block 占用了多少內(nèi)存空間,每個(gè) task 占用了多少內(nèi)存空間以及它們占用的是 JVM 內(nèi)存還是堆外內(nèi)存。接下來(lái)看看幾個(gè)重要的方法:
三個(gè)寫方法
-
putBytes[T: ClassTag](blockId: BlockId, size: Long, memoryMode: MemoryMode, _bytes: () => ChunkedByteBuffer): Boolean:先檢查是否還有空余內(nèi)存來(lái)存儲(chǔ)參數(shù) size 這么大的 block,若有則將 block 以字節(jié) buffer 形式存入;否則不存入,返回失敗 -
putIteratorAsValues[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]:嘗試將參數(shù) blockId 對(duì)應(yīng)的數(shù)據(jù)通過(guò)迭代器的方式寫入內(nèi)存。為避免由于空余內(nèi)存不足以存放 block 數(shù)據(jù)而導(dǎo)致的 OOM。該方法會(huì)逐步展開(kāi)迭代器來(lái)檢查是否還有空余內(nèi)存。如果迭代器順利展開(kāi)了,那么用來(lái)展開(kāi)迭代器的內(nèi)存直接轉(zhuǎn)換為存儲(chǔ)內(nèi)存,而不用再去分配內(nèi)存來(lái)存儲(chǔ)該 block 數(shù)據(jù)。如果未能完全開(kāi)展迭代器,則返回一個(gè)包含 block 數(shù)據(jù)的迭代器,其對(duì)應(yīng)的數(shù)據(jù)是由多個(gè)局部塊組合而成的 block 數(shù)據(jù) -
putIteratorAsBytes[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]:嘗試將參數(shù) blockId 對(duì)應(yīng)的數(shù)據(jù)通過(guò)字節(jié) buffer 的方式寫入內(nèi)存。為避免由于空余內(nèi)存不足以存放 block 數(shù)據(jù)而導(dǎo)致的 OOM。該方法會(huì)逐步展開(kāi)迭代器來(lái)檢查是否還有空余內(nèi)存。如果迭代器順利展開(kāi)了,那么用來(lái)展開(kāi)迭代器的內(nèi)存直接轉(zhuǎn)換為存儲(chǔ)內(nèi)存,而不用再去分配內(nèi)存來(lái)存儲(chǔ)該 block 數(shù)據(jù)。如果未能完全開(kāi)展迭代器,則返回一個(gè)包含 block 數(shù)據(jù)的迭代器,其對(duì)應(yīng)的數(shù)據(jù)是由多個(gè)局部塊組合而成的 block 數(shù)據(jù)
兩個(gè)讀方法
-
getBytes(blockId: BlockId): Option[ChunkedByteBuffer]:以字節(jié) buffer 的形式獲取參數(shù) blockId 指定的 block 數(shù)據(jù) -
getValues(blockId: BlockId): Option[Iterator[_]]:以迭代器的形式獲取參數(shù) blockId 指定的 block 數(shù)據(jù)
若干個(gè)查方法
-
getSize(blockId: BlockId): Long:獲取 blockId 對(duì)應(yīng) block 占用的內(nèi)存大小 -
contains(blockId: BlockId): Boolean:內(nèi)存中是否包含某個(gè) blockId 對(duì)應(yīng)的 block 數(shù)據(jù) -
currentUnrollMemory(): Long:當(dāng)前所有 tasks 用于存儲(chǔ) blocks 占用的總內(nèi)存 - ...
兩個(gè)刪方法
-
remove(blockId: BlockId): Boolean:刪除內(nèi)存中 blockId 指定的 block 數(shù)據(jù) -
clear(): Unit:清除 MemoryStore 中存儲(chǔ)的所有 blocks 數(shù)據(jù)
從上面描述的 MemoryStore 的主要方法來(lái)看,其功能和 DiskStore 類似,但由于要考慮到 JVM 內(nèi)存和堆外內(nèi)存以及有可能內(nèi)存不足以存儲(chǔ) block 數(shù)據(jù)等問(wèn)題會(huì)變得更加復(fù)雜
備注說(shuō)明
- 備注①:設(shè)置
spark.local.dir時(shí)可以設(shè)置多個(gè)目錄,目錄分別在不同磁盤上,可以增加整體 IO 帶寬;也盡量讓目錄位于更快的磁盤上以獲得更快的 IO 速度