Spark Core源碼精讀計劃#11:Spark廣播機制的實現(xiàn)

目錄

前言

在RPC的領域里摸爬滾打了很長時間,是時候抽身出來看一看其他東西了。順著SparkEnv初始化的思路繼續(xù)看,下一個主要組件是廣播管理器BroadcastManager。本文就主要講解Spark中廣播機制的實現(xiàn)。

廣播變量是Spark兩種共享變量中的一種(另一種是累加器)。它適合處理多節(jié)點跨Stage的共享數(shù)據(jù),特別是輸入數(shù)據(jù)量較大的集合,可以提高效率。

廣播管理器BroadcastManager

BroadcastManager在SparkEnv中是直接初始化的,其代碼邏輯也很短,如下。

代碼#11.1 - o.a.s.broadcast.BroadcastManager類

private[spark] class BroadcastManager(
    val isDriver: Boolean,
    conf: SparkConf,
    securityManager: SecurityManager)
  extends Logging {
  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null

  initialize()

  private def initialize() {
    synchronized {
      if (!initialized) {
        broadcastFactory = new TorrentBroadcastFactory
        broadcastFactory.initialize(isDriver, conf, securityManager)
        initialized = true
      }
    }
  }

  def stop() {
    broadcastFactory.stop()
  }

  private val nextBroadcastId = new AtomicLong(0)

  private[broadcast] val cachedValues = {
    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
  }

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }
}

構造方法參數(shù)

BroadcastManager在構造時有三個參數(shù),分別是isDriver(是否為Driver節(jié)點)、conf(對應的SparkConf配置)、securityManager(對應的SecurityManager)。非常簡單,不再贅述。

屬性成員

BroadcastManager內有四個屬性成員:

  • initialized表示BroadcastManager是否已經初始化完成。
  • broadcastFactory持有廣播工廠的實例(類型是BroadcastFactory特征的實現(xiàn)類)。
  • nextBroadcastId表示下一個廣播變量的唯一標識(AtomicLong類型的)。
  • cachedValues用來緩存已廣播出去的變量。它屬于ReferenceMap類型,是apache-commons提供的一個弱引用映射數(shù)據(jù)結構。與我們常見的各種Map不同,它的鍵值對有可能會在GC過程中被回收。

初始化邏輯

initialize()方法做的事情也非常簡單,它首先判斷BroadcastManager是否已初始化。如果未初始化,就新建廣播工廠TorrentBroadcastFactory,將其初始化,然后將初始化標記設為true。

對外提供的方法

BroadcastManager提供的方法有兩個:newBroadcast()方法,用于創(chuàng)建一個新的廣播變量;以及unbroadcast()方法,將已存在的廣播變量取消廣播。它們都是簡單地調用了TorrentBroadcastFactory中的同名方法,因此我們必須通過閱讀TorrentBroadcastFactory的相關源碼,才能了解Spark廣播機制的細節(jié)。

廣播變量TorrentBroadcast

來看TorrentBroadcastFactory.newBroadcast()方法。

代碼#11.2 - o.a.s.broadcast.TorrentBroadcastFactory.newBroadcast()方法

  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }

可見只是簡單地(真的很簡單嗎?)創(chuàng)建了一個TorrentBroadcast對象實例,它就是前面一直在說的“廣播變量”的廬山真面目。下面我們來仔細研究它。

屬性成員及參數(shù)初始化

這個類中的屬性不算少哦。

代碼#11.3 - o.a.s.broadcast.TorrentBroadcast類的屬性成員

  @transient private lazy val _value: T = readBroadcastBlock()
  @transient private var compressionCodec: Option[CompressionCodec] = _
  @transient private var blockSize: Int = _

  private val broadcastId = BroadcastBlockId(id)
  private val numBlocks: Int = writeBlocks(obj)
  private var checksumEnabled: Boolean = false
  private var checksums: Array[Int] = _

  private def setConf(conf: SparkConf) {
    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
    checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
  }
  setConf(SparkEnv.get.conf)
  • _value:廣播塊的具體數(shù)據(jù)。注意它由lazy關鍵字定義,因此是懶加載的,也就是在TorrentBroadcast構造時不會調用readBroadcastBlock()方法獲取數(shù)據(jù),而會推遲到第一次訪問_value時。
  • compressionCodec:廣播塊的壓縮編解碼邏輯。當配置項spark.broadcast.compress為true時,會啟用壓縮。
  • blockSize:廣播塊的大小。由spark.broadcast.blockSize配置項來控制,默認值4MB。
  • broadcastId:廣播變量的ID。BroadcastBlockId是個結構非常簡單的case class,每產生一個新的廣播變量就會自增。
  • numBlocks:該廣播變量包含的塊數(shù)量。它與_value不同,并沒有l(wèi)azy關鍵字定義,因此在TorrentBroadcast構造時就會直接調用writeBlocks()方法。
  • checksumEnabled:是否允許對廣播塊計算校驗值,由spark.broadcast.checksum配置項控制,默認值true。
  • checksums:廣播塊的校驗值。

廣播變量的寫入

上面已經提到在TorrentBroadcast構造時會直接調用writeBlocks()方法,來看一看它的代碼。

代碼#11.4 - o.a.s.broadcast.TorrentBroadcast.writeBlocks()方法

  private def writeBlocks(value: T): Int = {
    import StorageLevel._

    val blockManager = SparkEnv.get.blockManager
    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    }
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    if (checksumEnabled) {
      checksums = new Array[Int](blocks.length)
    }
    blocks.zipWithIndex.foreach { case (block, i) =>
      if (checksumEnabled) {
        checksums(i) = calcChecksum(block)
      }
      val pieceId = BroadcastBlockId(id, "piece" + i)
      val bytes = new ChunkedByteBuffer(block.duplicate())
      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
      }
    }
    blocks.length
  }

這個方法中涉及到了塊管理器BlockManager,它是Spark存儲子系統(tǒng)中的基礎組件,我們現(xiàn)在暫時不考慮它,后面還會對它進行十分詳盡的分析。writeBlocks()方法的執(zhí)行邏輯如下:

  1. 獲取BlockManager實例,調用其putSingle()方法將廣播數(shù)據(jù)作為單個對象寫入本地存儲。注意StorageLevel為MEMORY_AND_DISK,亦即在內存不足時會溢寫到磁盤,且副本數(shù)為1,不會進行復制。
  2. 調用blockifyObject()方法將廣播數(shù)據(jù)轉化為塊,即Spark存儲的基本單元。使用的序列化器為SparkEnv中初始化的JavaSerializer。
  3. 如果校驗值開關有效,就用calcChecksum()方法為每個塊計算校驗值。
  4. 為廣播數(shù)據(jù)切分成的每個塊(稱為piece)都生成一個帶"piece"的廣播ID,調用BlockManager.putBytes()方法將各個塊寫入MemoryStore(內存)或DiskStore(磁盤)。StorageLevel為MEMORY_AND_DISK_SER,寫入的數(shù)據(jù)會序列化。
  5. 最終返回塊的計數(shù)值。

上面提到的blockifyObject()、calcChecksum()方法的實現(xiàn)都比較簡單,就不再贅述。

廣播變量的讀取

先來看readBroadcastBlock()方法。

代碼#11.5 - o.a.s.broadcast.TorrentBroadcast.readBroadcastBlock()方法

  private def readBroadcastBlock(): T = Utils.tryOrIOException {
    TorrentBroadcast.synchronized {
      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
        setConf(SparkEnv.get.conf)
        val blockManager = SparkEnv.get.blockManager
        blockManager.getLocalValues(broadcastId) match {
          case Some(blockResult) =>
            if (blockResult.data.hasNext) {
              val x = blockResult.data.next().asInstanceOf[T]
              releaseLock(broadcastId)
              if (x != null) {
                broadcastCache.put(broadcastId, x)
              }
              x
            } else {
              throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
            }
          case None =>
            logInfo("Started reading broadcast variable " + id)
            val startTimeMs = System.currentTimeMillis()
            val blocks = readBlocks()
            logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

            try {
              val obj = TorrentBroadcast.unBlockifyObject[T](
                blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
              val storageLevel = StorageLevel.MEMORY_AND_DISK
              if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
                throw new SparkException(s"Failed to store $broadcastId in BlockManager")
              }
              if (obj != null) {
                broadcastCache.put(broadcastId, obj)
              }
              obj
            } finally {
              blocks.foreach(_.dispose())
            }
        }
      }
    }
  }

其執(zhí)行邏輯如下:

  1. 獲取BlockManager實例,調用其getLocalValues()方法將之前寫入的廣播數(shù)據(jù)對象取出。
  2. 如果能夠直接取得廣播數(shù)據(jù),就調用releaseLock()方法【實際上對應BlockManager.releaseLock(),又對應Object.notifyAll()】解開當前塊的鎖。這個鎖用來保證塊讀寫的互斥性。
  3. 如果不能直接取得廣播數(shù)據(jù),說明數(shù)據(jù)都已經序列化,并且有可能不在本地存儲。此時調用readBlocks()方法從本地和遠端同時獲取塊,然后調用unBlockifyObject()方法將塊轉換回廣播數(shù)據(jù)的對象。
  4. 再次調用BlockManager.putSingle()方法將廣播數(shù)據(jù)作為單個對象寫入本地存儲,再將其加入廣播緩存Map中,下次讀取時就不用大費周章了。

readBlocks()方法的具體實現(xiàn)如下所示。

代碼#11.6 - o.a.s.broadcast.TorrentBroadcast.readBlocks()方法

  private def readBlocks(): Array[BlockData] = {
    val blocks = new Array[BlockData](numBlocks)
    val bm = SparkEnv.get.blockManager

    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      val pieceId = BroadcastBlockId(id, "piece" + pid)
      logDebug(s"Reading piece $pieceId of $broadcastId")
      bm.getLocalBytes(pieceId) match {
        case Some(block) =>
          blocks(pid) = block
          releaseLock(pieceId)
        case None =>
          bm.getRemoteBytes(pieceId) match {
            case Some(b) =>
              if (checksumEnabled) {
                val sum = calcChecksum(b.chunks(0))
                if (sum != checksums(pid)) {
                  throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                    s" $sum != ${checksums(pid)}")
                }
              }
              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
                throw new SparkException(
                  s"Failed to store $pieceId of $broadcastId in local BlockManager")
              }
              blocks(pid) = new ByteBufferBlockData(b, true)
            case None =>
              throw new SparkException(s"Failed to get $pieceId of $broadcastId")
          }
      }
    }
    blocks
  }

該方法會首先對所有廣播數(shù)據(jù)的piece進行打散,然后對打散之后的每個piece執(zhí)行以下步驟:

  1. 調用BlockManager.getLocalBytes()方法,從本地獲取序列化的廣播數(shù)據(jù)塊。將獲取到的塊放入對應下標的位置,并釋放該塊的鎖。
  2. 如果本地沒有廣播數(shù)據(jù),就調用BlockManager.getRemoteBytes()方法從遠端(其他Executor或者Driver)獲取廣播數(shù)據(jù)塊。
  3. 對遠程獲取的塊計算校驗值,并與之前寫入時計算的校驗值比對。如果不同,說明傳輸發(fā)生錯誤,拋異常出去。
  4. 若一切正常,調用BlockManager.putBytes()方法,將各個塊寫入MemoryStore(內存)或DiskStore(磁盤),并將其放入對應下標的位置。最終返回所有讀取的塊。

廣播變量讀取的流程圖描述

上面單單通過文字敘述可能會令人費解,因此下面畫一個標準的Flow chart來描述它的過程。


圖#11.1 - 廣播數(shù)據(jù)的讀取流程

總結

本文從廣播管理器BroadcastManager的初始化入手,揭示了廣播變量的本質——TorrentBroadcast,并通過引入塊管理器BlockManager的相關知識,詳細分析了廣播數(shù)據(jù)的寫入和讀取流程。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容