Spark TorrentBroadcast

Broadcast distributes data from one node to every other node in the cluster. Spark has two implementations: HttpBroadcast and TorrentBroadcast.

Driver side:
The driver first serializes the data into a byte array and splits it into data blocks of BLOCK_SIZE (set by spark.broadcast.blockSize, 4 MB by default). After the split, the block metadata (the meta info) is stored in the driver's own blockManager at StorageLevel memory + disk, and the driver's blockManagerMaster is notified that the meta info is in place. This notification step matters: the blockManagerMaster is reachable by the driver and every executor, so information registered there becomes cluster-wide. The driver then stores each data block in its blockManager, again at StorageLevel memory + disk, and again notifies the blockManagerMaster that the blocks are stored. At this point the driver's work is done.
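As an illustration, the driver-side split can be sketched in plain Python (a toy model, not Spark code; `blockify` is a hypothetical stand-in for `TorrentBroadcast.blockifyObject`):

```python
BLOCK_SIZE = 4 * 1024 * 1024  # spark.broadcast.blockSize default: 4 MB

def blockify(data: bytes, block_size: int = BLOCK_SIZE) -> list:
    """Split a serialized byte array into fixed-size data blocks,
    as the driver does before storing the pieces."""
    return [data[i:i + block_size] for i in range(0, len(data), block_size)]

# A 9 MB payload splits into three blocks: 4 MB, 4 MB, and 1 MB.
payload = bytes(9 * 1024 * 1024)
blocks = blockify(payload)
```

Each resulting piece is then stored and registered individually, which is what lets executors later fetch them from multiple sources.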

Executor side:
When an executor receives a serialized task, it first deserializes it. The data embedded in the task is of type TorrentBroadcast, so deserialization ends up calling TorrentBroadcast.readBroadcastBlock(). That method first asks the executor's local blockManager whether it already holds the data; if so, the data is read directly from the local blockManager. Otherwise, the local blockManager contacts the driver's blockManagerMaster to obtain the meta info for the data blocks, and the BT process begins.

BT process: the task first allocates an array of ByteBuffers locally to hold the data blocks it is about to fetch. It then shuffles the fetch order of the data blocks; with 5 blocks, for example, the shuffled order might be 3-1-2-4-5. The blocks are fetched one by one in that order, each fetch going through "local blockManager → driver's/executor's blockManager → data". Every time a block is fetched, it is stored in the executor's blockManager, and the blockManagerMaster on the driver is notified that the block has gained one more storage location. This notification is crucial: the blockManagerMaster now knows the block exists in multiple copies across the cluster, so the next task on a different node that needs this block has two candidate sources and will pick one at random. As this continues, more downloaders means more servers for the blocks, and the download turns into p2p; this is the BitTorrent idea that gives TorrentBroadcast its name.
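The shuffled fetch plus the notify-the-master step can be simulated with a toy in-memory model (hypothetical Python, not Spark code; `locations` plays the role of the blockManagerMaster's registry):

```python
import random

# blockManagerMaster registry: block id -> set of nodes holding a copy.
# Initially only the driver holds every block.
num_blocks = 5
locations = {pid: {"driver"} for pid in range(num_blocks)}

def fetch_all(node):
    """Fetch every block in shuffled order; after each fetch, register
    this node as an extra location (the notification step above)."""
    order = list(range(num_blocks))
    random.shuffle(order)  # e.g. 3-1-2-4-5 rather than 1-2-3-4-5
    for pid in order:
        source = random.choice(sorted(locations[pid]))  # any current holder
        locations[pid].add(node)  # this node can now serve the block too

fetch_all("executor-1")  # every fetch comes from the driver
fetch_all("executor-2")  # fetches may come from the driver or executor-1
```

After both calls, every block has three registered holders, which is exactly why later fetchers see more and more candidate sources.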

Finally, the reassembled data is stored in the blockManager of the task's executor at StorageLevel memory + disk.

  1. The BroadcastManager is created in SparkEnv
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
  2. SparkContext's broadcast() method creates an HttpBroadcast or a TorrentBroadcast
  def broadcast[T: ClassTag](value: T): Broadcast[T] = {
    assertNotStopped()
    if (classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass)) {
      // This is a warning instead of an exception in order to avoid breaking user programs that
      // might have created RDD broadcast variables but not used them:
      logWarning("Can not directly broadcast RDDs; instead, call collect() and "
        + "broadcast the result (see SPARK-5063)")
    }
    val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
    val callSite = getCallSite
    logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
  3. TorrentBroadcast's writeBlocks() method
  private def writeBlocks(value: T): Int = {
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    SparkEnv.get.blockManager.putSingle(broadcastId, value, StorageLevel.MEMORY_AND_DISK,
      tellMaster = false)
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
    blocks.zipWithIndex.foreach { case (block, i) =>
      SparkEnv.get.blockManager.putBytes(
        BroadcastBlockId(id, "piece" + i),
        block,
        StorageLevel.MEMORY_AND_DISK_SER,
        tellMaster = true)
    }
    blocks.length
  }

writeBlocks() first calls putSingle to write the whole value into the blockManager (without telling the master), then calls blockifyObject to split the value into blocks, and finally writes each block into the blockManager with tellMaster = true.
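The tellMaster asymmetry can be sketched with a toy block manager (hypothetical Python; `put` stands in for both putSingle and putBytes):

```python
local_store = {}  # this node's blockManager
master = set()    # block ids the blockManagerMaster knows about

def put(block_id, value, tell_master):
    """Store a block locally and optionally register it with the master."""
    local_store[block_id] = value
    if tell_master:
        master.add(block_id)

# writeBlocks(): the whole value stays private to the driver (tellMaster = false),
# while the pieces are announced so executors can locate them (tellMaster = true).
put("broadcast_0", "whole value", tell_master=False)
for i, piece in enumerate([b"p0", b"p1", b"p2"]):
    put("broadcast_0_piece" + str(i), piece, tell_master=True)
```

Only the pieces become globally discoverable; the full local copy just avoids re-assembly for tasks running on the driver itself.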

  4. TorrentBroadcast's readBlocks() method
  private def readBlocks(): Array[ByteBuffer] = {
    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
    // to the driver, so other executors can pull these chunks from this executor as well.
    val blocks = new Array[ByteBuffer](numBlocks)
    val bm = SparkEnv.get.blockManager
    
    // shuffle the fetch order of the blocks
    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      val pieceId = BroadcastBlockId(id, "piece" + pid)
      logDebug(s"Reading piece $pieceId of $broadcastId")
      // First try getLocalBytes because there is a chance that previous attempts to fetch the
      // broadcast blocks have already fetched some of the blocks. In that case, some blocks
      // would be available locally (on this executor).
      def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
      def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
        // If we found the block from remote executors/driver's BlockManager, put the block
        // in this executor's BlockManager.
        SparkEnv.get.blockManager.putBytes(
          pieceId,
          block,
          StorageLevel.MEMORY_AND_DISK_SER,
          tellMaster = true)
        block
      }
      val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
        throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
      blocks(pid) = block
    }
    blocks
  }

It first checks whether the block is available locally; if not, the block is fetched from the driver or from another executor. Below is the method that fetches a block from a remote node.

private def doGetRemote(blockId: BlockId, asBlockResult: Boolean): Option[Any] = {
    require(blockId != null, "BlockId is null")
    val locations = Random.shuffle(master.getLocations(blockId))
    for (loc <- locations) {
      logDebug(s"Getting remote block $blockId from $loc")
      val data = blockTransferService.fetchBlockSync(
        loc.host, loc.port, loc.executorId, blockId.toString).nioByteBuffer()

      if (data != null) {
        if (asBlockResult) {
          return Some(new BlockResult(
            dataDeserialize(blockId, data),
            DataReadMethod.Network,
            data.limit()))
        } else {
          return Some(data)
        }
      }
      logDebug(s"The value of block $blockId is null")
    }
    logDebug(s"Block $blockId not found")
    None
  }
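The retry loop over shuffled locations can be sketched as follows (hypothetical Python; `fetch_sync` stands in for `blockTransferService.fetchBlockSync`, with failures modeled as `None` rather than exceptions):

```python
import random

def do_get_remote(block_id, locations, fetch_sync):
    """Try the known locations of a block in random order; return the
    first successful fetch, or None if every location fails."""
    for loc in random.sample(locations, len(locations)):  # shuffled copy
        data = fetch_sync(loc, block_id)
        if data is not None:
            return data
    return None

# Toy cluster: exec-1 does not hold the piece, exec-2 does.
stores = {"exec-1": {}, "exec-2": {"piece0": b"data"}}
fetch = lambda loc, bid: stores[loc].get(bid)
result = do_get_remote("piece0", ["exec-1", "exec-2"], fetch)
```

Shuffling the locations is what spreads fetch load across all nodes that already hold a copy, instead of hammering the driver.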