目錄
前言
前面我們用3篇文章的時(shí)間講解了RDD的基礎(chǔ)知識(shí),包括其五要素、算子、依賴、分區(qū)以及檢查點(diǎn)。實(shí)際上,與RDD相關(guān)的細(xì)節(jié)還有很多,滲透在之后的研究過(guò)程中。在時(shí)機(jī)合適時(shí),會(huì)再撥出專門(mén)的時(shí)間更深入地講解RDD。從本篇開(kāi)始,進(jìn)入Spark Core存儲(chǔ)子系統(tǒng)。
提起“存儲(chǔ)”這個(gè)詞,自然就包括內(nèi)部存儲(chǔ)(內(nèi)存)與外部存儲(chǔ)(磁盤(pán)等)。Spark的存儲(chǔ)子系統(tǒng)會(huì)同時(shí)對(duì)內(nèi)存和外存進(jìn)行管理,這些管理組件的名稱本身就很容易理解,如MemoryManager、DiskBlockManager、MemoryStore、DiskStore等,我們會(huì)逐漸接觸到它們。
前文已經(jīng)多次提到過(guò),Spark存儲(chǔ)子系統(tǒng)的“司令官”是BlockManager,即塊管理器,用主從架構(gòu)實(shí)現(xiàn)。由此可見(jiàn),“塊”(Block)是Spark存儲(chǔ)的基本單位,看官如果學(xué)過(guò)操作系統(tǒng)理論,對(duì)這個(gè)詞應(yīng)該已經(jīng)非常熟悉了。不過(guò)這里的塊與操作系統(tǒng)和JVM都無(wú)關(guān),只是Spark體系內(nèi)的概念而已。
本文先來(lái)探索與塊相關(guān)的基本實(shí)現(xiàn),包括塊的ID、實(shí)際數(shù)據(jù)與元信息的封裝。
塊ID:BlockId
與RDD類似,塊也需要一個(gè)ID來(lái)表明它的身份。不過(guò)RDD的ID只是一個(gè)整形值而已,塊ID包含的東西稍微多點(diǎn)。BlockId抽象類的定義如下。
代碼#21.1 - o.a.s.storage.BlockId抽象類
sealed abstract class BlockId {
def name: String
def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
def isRDD: Boolean = isInstanceOf[RDDBlockId]
def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]
override def toString: String = name
}
這個(gè)類本身已經(jīng)很清楚了,name方法返回該BlockId的唯一名稱。三個(gè)以is為前綴的布爾方法分別判斷當(dāng)前BlockId是否為RDDBlockId、ShuffleBlockId和BroadcastBlockId。這三個(gè)實(shí)現(xiàn)類(當(dāng)然還有一些其他的實(shí)現(xiàn))都是BlockId的子類,以下是類圖。

代碼示例如下。
代碼#21.2 - o.a.s.storage.RDDBlockId/ShuffleBlockId/BroadcastBlockId類
@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
override def name: String = "rdd_" + rddId + "_" + splitIndex
}
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}
@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}
可見(jiàn),它們都是簡(jiǎn)單的樣例類,覆寫(xiě)了name方法,并且命名都符合一定的規(guī)則。比如RDD數(shù)據(jù)塊ID的命名為rdd_[RDD ID]_[分區(qū)號(hào)],Shuffle數(shù)據(jù)塊ID的命名為shuffle_[Shuffle過(guò)程ID]_[Map任務(wù)ID]_[Reduce任務(wù)ID]等。在BlockId類的伴生對(duì)象中,也有所有命名的正則表示。
代碼#21.3 - BlockId命名的正則表示
val RDD = "rdd_([0-9]+)_([0-9]+)".r
val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
val TASKRESULT = "taskresult_([0-9]+)".r
val STREAM = "input-([0-9]+)-([0-9]+)".r
val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
val TEST = "test_(.*)".r
塊的ID已經(jīng)有了,接下來(lái)看具體的數(shù)據(jù)如何封裝。
塊數(shù)據(jù):BlockData
BlockData特征
BlockData是一個(gè)松散的Scala特征,其源碼如下。
代碼#21.4 - o.a.s.storage.BlockData特征
private[spark] trait BlockData {
def toInputStream(): InputStream
def toNetty(): Object
def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
def toByteBuffer(): ByteBuffer
def size: Long
def dispose(): Unit
}
其中定義的都是虛方法,它們的含義分別是:
- toInputStream():將塊數(shù)據(jù)轉(zhuǎn)化為java.io.InputStream。
- toNetty():將塊數(shù)據(jù)轉(zhuǎn)化為適合Netty上傳輸?shù)膶?duì)象格式。
- toChunkedByteBuffer():將塊數(shù)據(jù)轉(zhuǎn)化為o.a.s.util.io.ChunkedByteBuffer。ChunkedByteBuffer在文章#11講解廣播變量時(shí)已出現(xiàn)過(guò),是對(duì)多個(gè)java.nio.ByteBuffer的封裝,表示多個(gè)不連續(xù)的內(nèi)存緩沖區(qū)中的數(shù)據(jù)。雖然Chunk這個(gè)詞在中文中一般也翻譯作“塊”,但它與上面的Block相比,更是一個(gè)邏輯概念而非物理概念。
- toByteBuffer():將塊數(shù)據(jù)轉(zhuǎn)化為單個(gè)java.nio.ByteBuffer。
- size():返回這個(gè)BlockData的長(zhǎng)度。
- dispose():銷毀BlockData。
可見(jiàn),BlockData只是定義了數(shù)據(jù)轉(zhuǎn)化的規(guī)范,并沒(méi)有涉及具體的存儲(chǔ)格式和讀寫(xiě)流程,實(shí)現(xiàn)起來(lái)比較自由,所以前面說(shuō)它是個(gè)松散的特征。BlockData目前有3個(gè)實(shí)現(xiàn)類:基于內(nèi)存和ChunkedByteBuffer的ByteBufferBlockData、基于磁盤(pán)和File的DiskBlockData,以及加密的EncryptedBlockData。下面來(lái)看看最簡(jiǎn)單的ByteBufferBlockData實(shí)現(xiàn)。
ByteBufferBlockData
以下是ByteBufferBlockData類的源碼,可見(jiàn)它是直接代理了ChunkedByteBuffer的各種方法。
代碼#21.5 - o.a.s.storage.ByteBufferBlockData類
private[spark] class ByteBufferBlockData(
val buffer: ChunkedByteBuffer,
val shouldDispose: Boolean) extends BlockData {
override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
override def toNetty(): Object = buffer.toNetty
override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
buffer.copy(allocator)
}
override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
override def size: Long = buffer.size
override def dispose(): Unit = {
if (shouldDispose) {
buffer.dispose()
}
}
}
ChunkedByteBuffer實(shí)際上就是定義了對(duì)Array[ByteBuffer]類型的各種操作,它在Spark存儲(chǔ)中是個(gè)很常用的類,下面來(lái)看一下。
ChunkedByteBuffer簡(jiǎn)介
ChunkedByteBuffer的構(gòu)造方法參數(shù)是一個(gè)名為chunks的Array[ByteBuffer]類型對(duì)象,也就是說(shuō)一個(gè)ByteBuffer就是一個(gè)Chunk。該類的成員屬性如下。
代碼#21.6 - o.a.s.util.io.ChunkedByteBuffer類的屬性成員
private val bufferWriteChunkSize =
Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
.getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt
private[this] var disposed: Boolean = false
val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
- bufferWriteChunkSize:將緩存數(shù)據(jù)寫(xiě)出時(shí)的Chunk大小,由spark.buffer.write.chunkSize配置項(xiàng)來(lái)確定,默認(rèn)值64MB。
- disposed:該ChunkedByteBuffer是否已銷毀。
- size:該ChunkedByteBuffer的大小,通過(guò)調(diào)用ByteBuffer.limit()方法獲取每個(gè)Chunk的大小并累加而來(lái)。
它提供了一個(gè)writeFully()方法,用來(lái)將緩存塊數(shù)據(jù)以bufferWriteChunkSize的大小寫(xiě)入NIO Channel。
代碼#21.7 - o.a.s.util.io.ChunkedByteBuffer.writeFully()方法
def writeFully(channel: WritableByteChannel): Unit = {
for (bytes <- getChunks()) {
val curChunkLimit = bytes.limit()
while (bytes.hasRemaining) {
try {
val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
bytes.limit(bytes.position() + ioSize)
channel.write(bytes)
} finally {
bytes.limit(curChunkLimit)
}
}
}
}
關(guān)于它的其他方法,我們會(huì)在今后的講解過(guò)程中逐漸接觸到,不難。
塊元信息:BlockInfo
為了方便跟蹤塊的一些基本數(shù)據(jù),需要用一個(gè)專門(mén)的數(shù)據(jù)結(jié)構(gòu)BlockInfo來(lái)維護(hù)。其完整代碼如下所示。
代碼#21.8 - o.a.s.storage.BlockInfo類
private[storage] class BlockInfo(
val level: StorageLevel,
val classTag: ClassTag[_],
val tellMaster: Boolean) {
def size: Long = _size
def size_=(s: Long): Unit = {
_size = s
checkInvariants()
}
private[this] var _size: Long = 0
def readerCount: Int = _readerCount
def readerCount_=(c: Int): Unit = {
_readerCount = c
checkInvariants()
}
private[this] var _readerCount: Int = 0
def writerTask: Long = _writerTask
def writerTask_=(t: Long): Unit = {
_writerTask = t
checkInvariants()
}
private[this] var _writerTask: Long = BlockInfo.NO_WRITER
private def checkInvariants(): Unit = {
assert(_readerCount >= 0)
assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
}
checkInvariants()
}
該類有三個(gè)構(gòu)造方法參數(shù):
- level:塊的期望存儲(chǔ)等級(jí),不代表實(shí)際的存儲(chǔ)情況。例如,如果設(shè)定為StorageLevel.MEMORY_AND_DISK,那么這個(gè)塊有可能只在內(nèi)存而不在磁盤(pán)中,反之同理。
- classTag:塊的類標(biāo)簽。
- tellMaster:是否要將該塊的元信息告知Master。
BlockInfo內(nèi)定義了3對(duì)Getter/Setter:
- size:塊的大小,以字節(jié)為單位。
- readerCount:該塊被讀取的次數(shù)。因?yàn)樽x取塊時(shí)需要上鎖,因此也就相當(dāng)于加讀鎖的次數(shù)。
- writerTask:當(dāng)前持有該塊寫(xiě)鎖的Task ID。
雖然上面提到了讀鎖和寫(xiě)鎖,但BlockInfo本身并沒(méi)有提供任何鎖機(jī)制,而是藉由BlockInfo的管理器BlockInfoManager來(lái)實(shí)現(xiàn)。關(guān)于BlockInfoManager的細(xì)節(jié)將在下一篇文章討論。
總結(jié)
本文研究了與塊相關(guān)的三大基本組件:BlockId、BlockData與BlockInfo,它們?nèi)吆掀饋?lái)就可以基本完整地描述Spark中的一個(gè)塊了。理解了它們,我們就可以繼續(xù)研究塊在內(nèi)存與外存中是分別如何管理的。