Spark Core源碼精讀計劃#28:磁盤存儲DiskStore

目錄

前言

在上一篇文章中,我們認識了Spark管理磁盤塊的組件DiskBlockManager,本文接著來看真正負責磁盤存儲的組件DiskStore,以及與它相關的BlockData。這部分內(nèi)容會涉及到一點與Java NIO相關的東西,看官需要稍微注意一下。

磁盤存儲DiskStore

構造方法與屬性成員

代碼#28.1 - o.a.s.storage.DiskStore類的構造方法與屬性成員

private[spark] class DiskStore(
    conf: SparkConf,
    diskManager: DiskBlockManager,
    securityManager: SecurityManager) extends Logging {

  private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
  private val maxMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapLimitForTests",
    Int.MaxValue.toString)
  private val blockSizes = new ConcurrentHashMap[BlockId, Long]()

  // ......
}

DiskStore接受3個構造方法參數(shù),分別是SparkConf、DiskBlockManager和SecurityManager的實例,其中SecurityManager用于提供對數(shù)據(jù)加密的支持。3個屬性字段的含義如下:

  • minMemoryMapBytes:使用內(nèi)存映射(memory map)讀取文件的最小閾值,由配置項spark.storage.memoryMapThreshold指定,默認值2M。當磁盤中的文件大小超過該值時,就不會直接讀取,而用內(nèi)存映射文件來讀取,提高效率。
  • maxMemoryMapBytes:使用內(nèi)存映射讀取文件的最大閾值,由配置項spark.storage.memoryMapLimitForTests指定。它是個測試參數(shù),默認值為不限制。
  • blockSizes:維護塊ID與其對應大小之間的映射關系的ConcurrentHashMap。

寫入塊

寫入塊的邏輯由put()方法來實現(xiàn)。

代碼#28.2 - o.a.s.storage.DiskStore.put()/contains()方法

  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
    if (contains(blockId)) {
      throw new IllegalStateException(s"Block $blockId is already present in the disk store")
    }
    logDebug(s"Attempting to put block $blockId")
    val startTime = System.currentTimeMillis
    val file = diskManager.getFile(blockId)
    val out = new CountingWritableChannel(openForWrite(file))
    var threwException: Boolean = true
    try {
      writeFunc(out)
      blockSizes.put(blockId, out.getCount)
      threwException = false
    } finally {
      try {
        out.close()
      } catch {
        case ioe: IOException =>
          if (!threwException) {
            threwException = true
            throw ioe
          }
      } finally {
         if (threwException) {
          remove(blockId)
        }
      }
    }
    val finishTime = System.currentTimeMillis
    logDebug("Block %s stored as %s file on disk in %d ms".format(
      file.getName,
      Utils.bytesToString(file.length()),
      finishTime - startTime))
  }

  def contains(blockId: BlockId): Boolean = {
    val file = diskManager.getFile(blockId.name)
    file.exists()
  }

put()方法首先調(diào)用contains()方法檢查塊是否已經(jīng)以文件的形式寫入了,只有沒有寫入才會繼續(xù)操作。然后,調(diào)用DiskBlockManager.getFile()方法打開塊ID對應的文件,然后獲取該文件的WritableByteChannel(NIO中的寫通道,表示可以通過調(diào)用write()方法向文件寫入數(shù)據(jù))。最后,調(diào)用參數(shù)中傳入的writeFunc函數(shù),操作WritableByteChannel將數(shù)據(jù)寫入,并將塊ID與其對應的字節(jié)數(shù)加入blockSizes映射。

接下來看一看代碼#28.2中調(diào)用的openForWrite()方法。

代碼#28.3 - o.a.s.storage.DiskStore.openForWrite()方法

  private def openForWrite(file: File): WritableByteChannel = {
    val out = new FileOutputStream(file).getChannel()
    try {
      securityManager.getIOEncryptionKey().map { key =>
        CryptoStreamUtils.createWritableChannel(out, conf, key)
      }.getOrElse(out)
    } catch {
      case e: Exception =>
        Closeables.close(out, true)
        file.delete()
        throw e
    }
  }

可見,該方法就是通過文件對象構造了文件輸出流FileOutputStream,然后獲取它對應的Channel對象用于寫數(shù)據(jù)。特別地,如果I/O需要加密,就需要另外調(diào)用CryptoStreamUtils.createWritableChannel()方法包裝,本文就不涉及了。至于CountingWritableChannel,也只是基于WritableByteChannel接口擴展出來的一個簡單類,增加了統(tǒng)計字節(jié)數(shù)的方法,代碼也就不再列出。

寫入字節(jié)

代碼#28.4 - o.a.s.storage.DiskStore.putBytes()方法

  def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    put(blockId) { channel =>
      bytes.writeFully(channel)
    }
  }

可見,該方法除了塊ID外,還需要傳入封裝在ChunkedByteBuffer中的數(shù)據(jù)。調(diào)用上述put()方法時,傳入的writeFunc函數(shù)調(diào)用了ChunkedByteBuffer.writeFully()方法,負責將數(shù)據(jù)以一定的Chunk大小分塊寫入WritableByteChannel。

讀取字節(jié)

代碼#28.5 - o.a.s.storage.DiskStore.getBytes()方法

  def getBytes(blockId: BlockId): BlockData = {
    val file = diskManager.getFile(blockId.name)
    val blockSize = getSize(blockId)

    securityManager.getIOEncryptionKey() match {
      case Some(key) =>
        new EncryptedBlockData(file, blockSize, conf, key)
      case _ =>
        new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
    }
  }

這段代碼很簡單,但可以注意到,在加密環(huán)境下和非加密環(huán)境下返回的結果是不同的,前者是EncryptedBlockData對象,后者是DiskBlockData對象,而它們都是BlockData的子類。顧名思義,BlockData就是對磁盤塊數(shù)據(jù)的具體封裝,下面選擇最常見的DiskBlockData來看一看。

磁盤塊數(shù)據(jù)DiskBlockData

這個類是定義在DiskStore下方的私有類,比較短,因此直接全貼在下面。

代碼#28.6 - o.a.s.storage.DiskBlockData類

private class DiskBlockData(
    minMemoryMapBytes: Long,
    maxMemoryMapBytes: Long,
    file: File,
    blockSize: Long) extends BlockData {
  override def toInputStream(): InputStream = new FileInputStream(file)

  override def toNetty(): AnyRef = new DefaultFileRegion(file, 0, size)

  override def toChunkedByteBuffer(allocator: (Int) => ByteBuffer): ChunkedByteBuffer = {
    Utils.tryWithResource(open()) { channel =>
      var remaining = blockSize
      val chunks = new ListBuffer[ByteBuffer]()
      while (remaining > 0) {
        val chunkSize = math.min(remaining, maxMemoryMapBytes)
        val chunk = allocator(chunkSize.toInt)
        remaining -= chunkSize
        JavaUtils.readFully(channel, chunk)
        chunk.flip()
        chunks += chunk
      }
      new ChunkedByteBuffer(chunks.toArray)
    }
  }

  override def toByteBuffer(): ByteBuffer = {
    require(blockSize < maxMemoryMapBytes,
      s"can't create a byte buffer of size $blockSize" +
      s" since it exceeds ${Utils.bytesToString(maxMemoryMapBytes)}.")
    Utils.tryWithResource(open()) { channel =>
      if (blockSize < minMemoryMapBytes) {
        val buf = ByteBuffer.allocate(blockSize.toInt)
        JavaUtils.readFully(channel, buf)
        buf.flip()
        buf
      } else {
        channel.map(MapMode.READ_ONLY, 0, file.length)
      }
    }
  }

  override def size: Long = blockSize

  override def dispose(): Unit = {}

  private def open() = new FileInputStream(file).getChannel
}

很久之前也已經(jīng)大概說過,BlockData特征只是定義了塊數(shù)據(jù)的轉(zhuǎn)化方式,具體的細節(jié)則留給各個實現(xiàn)類。我們具體看看toChunkedByteBuffer()和toByteBuffer()這兩個方法。

轉(zhuǎn)化為ChunkedByteBuffer

Utils.tryWithResource()方法實際上就是Java中try-with-resources的Scala實現(xiàn),因為Scala中并沒有這個語法糖。

toChunkedByteBuffer()方法會將文件轉(zhuǎn)化為輸入流FileInputStream,并獲取其ReadableFileChannel,再調(diào)用JavaUtils.readFully()方法將從Channel中取得的數(shù)據(jù)填充到ByteBuffer中。每個ByteBuffer即為一個Chunk,所有Chunk的數(shù)組形成最終的ChunkedByteBuffer。關于ChunkedByteBuffer在文章#21簡要提到過,之后會很快寫一篇番外文章專門講解它,因為有點意思。

轉(zhuǎn)化為ByteBuffer

toByteBuffer()方法會檢查塊大小是否小于spark.storage.memoryMapThreshold(終于出現(xiàn)了)。如果小于的話,就會采用與toChunkedByteBuffer()相同的方式直接填充ByteBuffer。反之,就調(diào)用ReadableFileChannel.map()方法將數(shù)據(jù)映射到MappedByteBuffer中,即進程的虛擬內(nèi)存中。不過,考慮到內(nèi)存映射的應用場景的話,2MB的閾值可能有點?。ūJ兀┝耍稽c碎碎念,請勿在意。

總結

本文研究了Spark磁盤存儲類DiskStore的具體實現(xiàn),主要是寫入塊/字節(jié)以及讀取字節(jié)的方法。另外,DiskStore讀取的字節(jié)會用BlockData來封裝,因此也順便了解了一下DiskBlockData的一點細節(jié)。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容