Spark Core源碼精讀計(jì)劃#21:Spark存儲(chǔ)中塊(Block)的基本實(shí)現(xiàn)

目錄

前言

前面我們用3篇文章的時(shí)間講解了RDD的基礎(chǔ)知識(shí),包括其五要素、算子、依賴、分區(qū)以及檢查點(diǎn)。實(shí)際上,與RDD相關(guān)的細(xì)節(jié)還有很多,滲透在之后的研究過(guò)程中。在時(shí)機(jī)合適時(shí),會(huì)再撥出專門(mén)的時(shí)間更深入地講解RDD。從本篇開(kāi)始,進(jìn)入Spark Core存儲(chǔ)子系統(tǒng)。

提起“存儲(chǔ)”這個(gè)詞,自然就包括內(nèi)部存儲(chǔ)(內(nèi)存)與外部存儲(chǔ)(磁盤(pán)等)。Spark的存儲(chǔ)子系統(tǒng)會(huì)同時(shí)對(duì)內(nèi)存和外存進(jìn)行管理,這些管理組件的名稱本身就很容易理解,如MemoryManager、DiskBlockManager、MemoryStore、DiskStore等,我們會(huì)逐漸接觸到它們。

前文已經(jīng)多次提到過(guò),Spark存儲(chǔ)子系統(tǒng)的“司令官”是BlockManager,即塊管理器,用主從架構(gòu)實(shí)現(xiàn)。由此可見(jiàn),“塊”(Block)是Spark存儲(chǔ)的基本單位,看官如果學(xué)過(guò)操作系統(tǒng)理論,對(duì)這個(gè)詞應(yīng)該已經(jīng)非常熟悉了。不過(guò)這里的塊與操作系統(tǒng)和JVM都無(wú)關(guān),只是Spark體系內(nèi)的概念而已。

本文先來(lái)探索與塊相關(guān)的基本實(shí)現(xiàn),包括塊的ID、實(shí)際數(shù)據(jù)與元信息的封裝。

塊ID:BlockId

與RDD類似,塊也需要一個(gè)ID來(lái)表明它的身份。不過(guò)RDD的ID只是一個(gè)整形值而已,塊ID包含的東西稍微多點(diǎn)。BlockId抽象類的定義如下。

代碼#21.1 - o.a.s.storage.BlockId抽象類

sealed abstract class BlockId {
  def name: String

  def asRDDId: Option[RDDBlockId] = if (isRDD) Some(asInstanceOf[RDDBlockId]) else None
  def isRDD: Boolean = isInstanceOf[RDDBlockId]
  def isShuffle: Boolean = isInstanceOf[ShuffleBlockId]
  def isBroadcast: Boolean = isInstanceOf[BroadcastBlockId]

  override def toString: String = name
}

這個(gè)類本身已經(jīng)很清楚了,name方法返回該BlockId的唯一名稱。三個(gè)以is為前綴的布爾方法分別判斷當(dāng)前BlockId是否為RDDBlockId、ShuffleBlockId和BroadcastBlockId。這三個(gè)實(shí)現(xiàn)類(當(dāng)然還有一些其他的實(shí)現(xiàn))都是BlockId的子類,以下是類圖。

圖#21.1 - BlockId的子類

代碼示例如下。

代碼#21.2 - o.a.s.storage.RDDBlockId/ShuffleBlockId/BroadcastBlockId類

@DeveloperApi
case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
  override def name: String = "rdd_" + rddId + "_" + splitIndex
}

@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
}

@DeveloperApi
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  override def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

可見(jiàn),它們都是簡(jiǎn)單的樣例類,覆寫(xiě)了name方法,并且命名都符合一定的規(guī)則。比如RDD數(shù)據(jù)塊ID的命名為rdd_[RDD ID]_[分區(qū)號(hào)],Shuffle數(shù)據(jù)塊ID的命名為shuffle_[Shuffle過(guò)程ID]_[Map任務(wù)ID]_[Reduce任務(wù)ID]等。在BlockId類的伴生對(duì)象中,也有所有命名的正則表示。

代碼#21.3 - BlockId命名的正則表示

  val RDD = "rdd_([0-9]+)_([0-9]+)".r
  val SHUFFLE = "shuffle_([0-9]+)_([0-9]+)_([0-9]+)".r
  val SHUFFLE_DATA = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).data".r
  val SHUFFLE_INDEX = "shuffle_([0-9]+)_([0-9]+)_([0-9]+).index".r
  val BROADCAST = "broadcast_([0-9]+)([_A-Za-z0-9]*)".r
  val TASKRESULT = "taskresult_([0-9]+)".r
  val STREAM = "input-([0-9]+)-([0-9]+)".r
  val TEMP_LOCAL = "temp_local_([-A-Fa-f0-9]+)".r
  val TEMP_SHUFFLE = "temp_shuffle_([-A-Fa-f0-9]+)".r
  val TEST = "test_(.*)".r

塊的ID已經(jīng)有了,接下來(lái)看具體的數(shù)據(jù)如何封裝。

塊數(shù)據(jù):BlockData

BlockData特征

BlockData是一個(gè)松散的Scala特征,其源碼如下。

代碼#21.4 - o.a.s.storage.BlockData特征

private[spark] trait BlockData {
  def toInputStream(): InputStream
  def toNetty(): Object
  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
  def toByteBuffer(): ByteBuffer

  def size: Long
  def dispose(): Unit
}

其中定義的都是虛方法,它們的含義分別是:

  • toInputStream():將塊數(shù)據(jù)轉(zhuǎn)化為java.io.InputStream。
  • toNetty():將塊數(shù)據(jù)轉(zhuǎn)化為適合Netty上傳輸?shù)膶?duì)象格式。
  • toChunkedByteBuffer():將塊數(shù)據(jù)轉(zhuǎn)化為o.a.s.util.io.ChunkedByteBuffer。ChunkedByteBuffer在文章#11講解廣播變量時(shí)已出現(xiàn)過(guò),是對(duì)多個(gè)java.nio.ByteBuffer的封裝,表示多個(gè)不連續(xù)的內(nèi)存緩沖區(qū)中的數(shù)據(jù)。雖然Chunk這個(gè)詞在中文中一般也翻譯作“塊”,但它與上面的Block相比,更是一個(gè)邏輯概念而非物理概念。
  • toByteBuffer():將塊數(shù)據(jù)轉(zhuǎn)化為單個(gè)java.nio.ByteBuffer。
  • size():返回這個(gè)BlockData的長(zhǎng)度。
  • dispose():銷毀BlockData。

可見(jiàn),BlockData只是定義了數(shù)據(jù)轉(zhuǎn)化的規(guī)范,并沒(méi)有涉及具體的存儲(chǔ)格式和讀寫(xiě)流程,實(shí)現(xiàn)起來(lái)比較自由,所以前面說(shuō)它是個(gè)松散的特征。BlockData目前有3個(gè)實(shí)現(xiàn)類:基于內(nèi)存和ChunkedByteBuffer的ByteBufferBlockData、基于磁盤(pán)和File的DiskBlockData,以及加密的EncryptedBlockData。下面來(lái)看看最簡(jiǎn)單的ByteBufferBlockData實(shí)現(xiàn)。

ByteBufferBlockData

以下是ByteBufferBlockData類的源碼,可見(jiàn)它是直接代理了ChunkedByteBuffer的各種方法。

代碼#21.5 - o.a.s.storage.ByteBufferBlockData類

private[spark] class ByteBufferBlockData(
    val buffer: ChunkedByteBuffer,
    val shouldDispose: Boolean) extends BlockData {
  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)

  override def toNetty(): Object = buffer.toNetty

  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    buffer.copy(allocator)
  }

  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer

  override def size: Long = buffer.size

  override def dispose(): Unit = {
    if (shouldDispose) {
      buffer.dispose()
    }
  }
}

ChunkedByteBuffer實(shí)際上就是定義了對(duì)Array[ByteBuffer]類型的各種操作,它在Spark存儲(chǔ)中是個(gè)很常用的類,下面來(lái)看一下。

ChunkedByteBuffer簡(jiǎn)介

ChunkedByteBuffer的構(gòu)造方法參數(shù)是一個(gè)名為chunks的Array[ByteBuffer]類型對(duì)象,也就是說(shuō)一個(gè)ByteBuffer就是一個(gè)Chunk。該類的成員屬性如下。

代碼#21.6 - o.a.s.util.io.ChunkedByteBuffer類的屬性成員

  private val bufferWriteChunkSize =
    Option(SparkEnv.get).map(_.conf.get(config.BUFFER_WRITE_CHUNK_SIZE))
      .getOrElse(config.BUFFER_WRITE_CHUNK_SIZE.defaultValue.get).toInt

  private[this] var disposed: Boolean = false

  val size: Long = chunks.map(_.limit().asInstanceOf[Long]).sum
  • bufferWriteChunkSize:將緩存數(shù)據(jù)寫(xiě)出時(shí)的Chunk大小,由spark.buffer.write.chunkSize配置項(xiàng)來(lái)確定,默認(rèn)值64MB。
  • disposed:該ChunkedByteBuffer是否已銷毀。
  • size:該ChunkedByteBuffer的大小,通過(guò)調(diào)用ByteBuffer.limit()方法獲取每個(gè)Chunk的大小并累加而來(lái)。

它提供了一個(gè)writeFully()方法,用來(lái)將緩存塊數(shù)據(jù)以bufferWriteChunkSize的大小寫(xiě)入NIO Channel。

代碼#21.7 - o.a.s.util.io.ChunkedByteBuffer.writeFully()方法

  def writeFully(channel: WritableByteChannel): Unit = {
    for (bytes <- getChunks()) {
      val curChunkLimit = bytes.limit()
      while (bytes.hasRemaining) {
        try {
          val ioSize = Math.min(bytes.remaining(), bufferWriteChunkSize)
          bytes.limit(bytes.position() + ioSize)
          channel.write(bytes)
        } finally {
          bytes.limit(curChunkLimit)
        }
      }
    }
  }

關(guān)于它的其他方法,我們會(huì)在今后的講解過(guò)程中逐漸接觸到,不難。

塊元信息:BlockInfo

為了方便跟蹤塊的一些基本數(shù)據(jù),需要用一個(gè)專門(mén)的數(shù)據(jù)結(jié)構(gòu)BlockInfo來(lái)維護(hù)。其完整代碼如下所示。

代碼#21.8 - o.a.s.storage.BlockInfo類

private[storage] class BlockInfo(
    val level: StorageLevel,
    val classTag: ClassTag[_],
    val tellMaster: Boolean) {
  def size: Long = _size
  def size_=(s: Long): Unit = {
    _size = s
    checkInvariants()
  }
  private[this] var _size: Long = 0

  def readerCount: Int = _readerCount
  def readerCount_=(c: Int): Unit = {
    _readerCount = c
    checkInvariants()
  }
  private[this] var _readerCount: Int = 0

  def writerTask: Long = _writerTask
  def writerTask_=(t: Long): Unit = {
    _writerTask = t
    checkInvariants()
  }
  private[this] var _writerTask: Long = BlockInfo.NO_WRITER

  private def checkInvariants(): Unit = {
    assert(_readerCount >= 0)
    assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
  }

  checkInvariants()
}

該類有三個(gè)構(gòu)造方法參數(shù):

  • level:塊的期望存儲(chǔ)等級(jí),不代表實(shí)際的存儲(chǔ)情況。例如,如果設(shè)定為StorageLevel.MEMORY_AND_DISK,那么這個(gè)塊有可能只在內(nèi)存而不在磁盤(pán)中,反之同理。
  • classTag:塊的類標(biāo)簽。
  • tellMaster:是否要將該塊的元信息告知Master。

BlockInfo內(nèi)定義了3對(duì)Getter/Setter:

  • size:塊的大小,以字節(jié)為單位。
  • readerCount:該塊被讀取的次數(shù)。因?yàn)樽x取塊時(shí)需要上鎖,因此也就相當(dāng)于加讀鎖的次數(shù)。
  • writerTask:當(dāng)前持有該塊寫(xiě)鎖的Task ID。

雖然上面提到了讀鎖和寫(xiě)鎖,但BlockInfo本身并沒(méi)有提供任何鎖機(jī)制,而是藉由BlockInfo的管理器BlockInfoManager來(lái)實(shí)現(xiàn)。關(guān)于BlockInfoManager的細(xì)節(jié)將在下一篇文章討論。

總結(jié)

本文研究了與塊相關(guān)的三大基本組件:BlockId、BlockData與BlockInfo,它們?nèi)吆掀饋?lái)就可以基本完整地描述Spark中的一個(gè)塊了。理解了它們,我們就可以繼續(xù)研究塊在內(nèi)存與外存中是分別如何管理的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容