Spark Storage ④ - 存儲(chǔ)執(zhí)行類介紹(DiskBlockManager、DiskStore、MemoryStore)

本文為 Spark 2.0 源碼分析筆記,某些實(shí)現(xiàn)可能與其他版本有所出入

這篇文章前半部分我們對(duì)直接在 Block 存取發(fā)揮重要作用的類進(jìn)行介紹,主要是 DiskBlockManager、MemoryStore、DiskStore。后半部分以存取 Broadcast 來(lái)進(jìn)一步加深對(duì) Block 存取的理解。

DiskBlockManager

DiskBlockManager 主要用來(lái)創(chuàng)建并持有邏輯 blocks 與磁盤上的 blocks之間的映射,一個(gè)邏輯 block 通過(guò) BlockId 映射到一個(gè)磁盤上的文件。

主要成員

  • localDirs: Array[File]:創(chuàng)建根據(jù) spark.local.dir (備注①)指定的目錄列表,這些目錄下會(huì)創(chuàng)建子目錄,這些子目錄用來(lái)存放 Application 運(yùn)行過(guò)程中產(chǎn)生的存放在磁盤上的中間數(shù)據(jù),比如 cached RDD partition 對(duì)應(yīng)的 block、Shuffle Write 產(chǎn)生的數(shù)據(jù)等,會(huì)根據(jù)文件名將 block 文件 hash 到不同的目錄下
  • subDirs: Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir)):localDirs 代表的各個(gè)目錄下的子目錄,子目錄個(gè)數(shù)由 spark.diskStore.subDirectories 指定,子目錄用來(lái)存儲(chǔ)具體的 block 對(duì)應(yīng)的文件,會(huì)根據(jù) block file 文件名先 hash 確定放在哪個(gè) localDir,在 hash 決定放在該 localDir 的哪個(gè)子目錄下(尋找該 block 文件也是通過(guò)這種方式)
  • shutdownHook = addShutdownHook():即關(guān)閉鉤子,在進(jìn)程結(jié)束時(shí)會(huì)遞歸刪除 localDirs 下所有屬于該 Application 的文件

主要方法

看了上面幾個(gè)主要成員的介紹相信已經(jīng)對(duì)邏輯 block 如何與磁盤文件映射已經(jīng)有了大致了解。接下來(lái)看看幾個(gè)主要的方法:

  • getFile(filename: String): File:通過(guò)文件名來(lái)查找 block 文件并獲取文件句柄,先通過(guò)文件名 hash 到指定目錄再查找
  • getFile(blockId: BlockId): File:通過(guò) blockId 來(lái)查找 block 文件并獲取文件句柄,事實(shí)上是通過(guò)調(diào)用 getFile(filename: String): File 來(lái)查找的
  • containsBlock(blockId: BlockId): Boolean:是否包含某個(gè) blockId 對(duì)應(yīng)的文件
  • getAllFiles(): Seq[File]:獲取存儲(chǔ)在磁盤上所有 block 文件的句柄,以列表的形式返回
  • getAllBlocks(): Seq[BlockId]:獲取存儲(chǔ)在磁盤上的所有 blockId
  • stop(): Unit:清理存儲(chǔ)在磁盤上所有的 block 文件
  • createTempLocalBlock(): (TempLocalBlockId, File):產(chǎn)生一個(gè)唯一的 Block Id 和文件句柄用于存儲(chǔ)本地中間結(jié)果
  • createTempShuffleBlock(): (TempShuffleBlockId, File):產(chǎn)生一個(gè)唯一的 Block Id 和文件句柄用于存儲(chǔ) shuffle 中間結(jié)果

如上述,DiskBlockManager 提供的方法主要是為了提供映射的方法,而并不會(huì)將現(xiàn)成的映射關(guān)系保存在某個(gè)成員中,這是需要明了的一點(diǎn)。DiskBlockManager 方法主要在需要?jiǎng)?chuàng)建或獲取某個(gè) block 對(duì)應(yīng)的磁盤文件以及在 BlockManager 退出時(shí)要清理磁盤文件時(shí)被調(diào)用。


DiskStore

DiskStore 用來(lái)將 block 數(shù)據(jù)存儲(chǔ)至磁盤,是直接的磁盤文件操作者。其封裝了:

兩個(gè)寫方法

  • put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit:用文件輸出流的方式寫 block 數(shù)據(jù)至磁盤
  • putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit:以字節(jié) buffer 的方式寫 block 數(shù)據(jù)至磁盤

一個(gè)讀方法

  • getBytes(blockId: BlockId): ChunkedByteBuffer:通過(guò) block id 讀取存儲(chǔ)在磁盤上的 block 數(shù)據(jù),以字節(jié) buffer 的形式返回

兩個(gè)查方法

  • getSize(blockId: BlockId): Long:通過(guò) block id 獲取存儲(chǔ)在磁盤上的 block 數(shù)據(jù)的大小
  • contains(blockId: BlockId): Boolean:查詢磁盤上是否包含某個(gè) block id 的數(shù)據(jù)

一個(gè)刪方法

  • remove(blockId: BlockId): Boolean:刪除磁盤上某個(gè) block id 的數(shù)據(jù)

需要說(shuō)明的是,DiskStore 的各個(gè)方法中,通過(guò) block id 或文件名來(lái)找到對(duì)應(yīng)的 block 文件句柄是通過(guò)調(diào)用 DiskBlockManager 的方法來(lái)達(dá)成的


MemoryStore

MemoryStore 用來(lái)將沒(méi)有序列化的 Java 對(duì)象數(shù)組和序列化的字節(jié) buffer 存儲(chǔ)至內(nèi)存中。它的實(shí)現(xiàn)比 DiskStore 稍復(fù)雜,我們先來(lái)看看主要成員

先說(shuō)明 MemoryEntry

private sealed trait MemoryEntry[T] {
  def size: Long
  def memoryMode: MemoryMode
  def classTag: ClassTag[T]
}

public enum MemoryMode {
  ON_HEAP,
  OFF_HEAP
}

代表 JVM 或?qū)ν鈨?nèi)存的內(nèi)存大小

主要成員

  • entries: LinkedHashMap[BlockId, MemoryEntry[_]]:保存每個(gè) block id 及其存儲(chǔ)在內(nèi)存中的數(shù)據(jù)的大小及是保存在 JVM 內(nèi)存中還是堆外內(nèi)存中
  • unrollMemoryMap: mutable.HashMap[Long, Long]:保存每個(gè) task 占用的用來(lái)存儲(chǔ) block 而占用的 JVM 內(nèi)存
  • offHeapUnrollMemoryMap: mutable.HashMap[Long, Long]:保存每個(gè) task 占用的用來(lái)存儲(chǔ) block 而占用的對(duì)外內(nèi)存

以上幾個(gè)成員主要描述了每個(gè) block 占用了多少內(nèi)存空間,每個(gè) task 占用了多少內(nèi)存空間以及它們占用的是 JVM 內(nèi)存還是堆外內(nèi)存。接下來(lái)看看幾個(gè)重要的方法:

三個(gè)寫方法

  • putBytes[T: ClassTag](blockId: BlockId, size: Long, memoryMode: MemoryMode, _bytes: () => ChunkedByteBuffer): Boolean:先檢查是否還有空余內(nèi)存來(lái)存儲(chǔ)參數(shù) size 這么大的 block,若有則將 block 以字節(jié) buffer 形式存入;否則不存入,返回失敗
  • putIteratorAsValues[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long]:嘗試將參數(shù) blockId 對(duì)應(yīng)的數(shù)據(jù)通過(guò)迭代器的方式寫入內(nèi)存。為避免由于空余內(nèi)存不足以存放 block 數(shù)據(jù)而導(dǎo)致的 OOM。該方法會(huì)逐步展開(kāi)迭代器來(lái)檢查是否還有空余內(nèi)存。如果迭代器順利展開(kāi)了,那么用來(lái)展開(kāi)迭代器的內(nèi)存直接轉(zhuǎn)換為存儲(chǔ)內(nèi)存,而不用再去分配內(nèi)存來(lái)存儲(chǔ)該 block 數(shù)據(jù)。如果未能完全開(kāi)展迭代器,則返回一個(gè)包含 block 數(shù)據(jù)的迭代器,其對(duì)應(yīng)的數(shù)據(jù)是由多個(gè)局部塊組合而成的 block 數(shù)據(jù)
  • putIteratorAsBytes[T](blockId: BlockId, values: Iterator[T], classTag: ClassTag[T], memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long]:嘗試將參數(shù) blockId 對(duì)應(yīng)的數(shù)據(jù)通過(guò)字節(jié) buffer 的方式寫入內(nèi)存。為避免由于空余內(nèi)存不足以存放 block 數(shù)據(jù)而導(dǎo)致的 OOM。該方法會(huì)逐步展開(kāi)迭代器來(lái)檢查是否還有空余內(nèi)存。如果迭代器順利展開(kāi)了,那么用來(lái)展開(kāi)迭代器的內(nèi)存直接轉(zhuǎn)換為存儲(chǔ)內(nèi)存,而不用再去分配內(nèi)存來(lái)存儲(chǔ)該 block 數(shù)據(jù)。如果未能完全開(kāi)展迭代器,則返回一個(gè)包含 block 數(shù)據(jù)的迭代器,其對(duì)應(yīng)的數(shù)據(jù)是由多個(gè)局部塊組合而成的 block 數(shù)據(jù)

兩個(gè)讀方法

  • getBytes(blockId: BlockId): Option[ChunkedByteBuffer]:以字節(jié) buffer 的形式獲取參數(shù) blockId 指定的 block 數(shù)據(jù)
  • getValues(blockId: BlockId): Option[Iterator[_]]:以迭代器的形式獲取參數(shù) blockId 指定的 block 數(shù)據(jù)

若干個(gè)查方法

  • getSize(blockId: BlockId): Long:獲取 blockId 對(duì)應(yīng) block 占用的內(nèi)存大小
  • contains(blockId: BlockId): Boolean:內(nèi)存中是否包含某個(gè) blockId 對(duì)應(yīng)的 block 數(shù)據(jù)
  • currentUnrollMemory(): Long:當(dāng)前所有 tasks 用于存儲(chǔ) blocks 占用的總內(nèi)存
  • ...

兩個(gè)刪方法

  • remove(blockId: BlockId): Boolean:刪除內(nèi)存中 blockId 指定的 block 數(shù)據(jù)
  • clear(): Unit:清除 MemoryStore 中存儲(chǔ)的所有 blocks 數(shù)據(jù)

從上面描述的 MemoryStore 的主要方法來(lái)看,其功能和 DiskStore 類似,但由于要考慮到 JVM 內(nèi)存和堆外內(nèi)存以及有可能內(nèi)存不足以存儲(chǔ) block 數(shù)據(jù)等問(wèn)題會(huì)變得更加復(fù)雜


備注說(shuō)明

  • 備注①:設(shè)置 spark.local.dir 時(shí)可以設(shè)置多個(gè)目錄,目錄分別在不同磁盤上,可以增加整體 IO 帶寬;也盡量讓目錄位于更快的磁盤上以獲得更快的 IO 速度
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容