BroadcastManager--SparkEnv

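Broadcast provides distributed data sharing, and BroadcastManager is responsible for creating and destroying broadcast variables. Broadcast is typically used for shared configuration files, commonly used Datasets, and frequently used data structures.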

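Broadcast variables are read-only, which keeps every node's copy consistent. The data is stored with StorageLevel.MEMORY_AND_DISK, so blocks that do not fit in memory can spill to disk instead of failing in the storage layer. Broadcasting a large object still causes heavy network IO and puts pressure on a single point (initially the driver), and each executor must still deserialize the whole value into memory when it is read.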

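The Broadcast architecture follows the factory pattern with a single product, Broadcast: BroadcastManager uses the factory; BroadcastFactory and TorrentBroadcastFactory are the factory interface and its implementation; Broadcast and TorrentBroadcast are the product and its implementation.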

How to use Broadcast

scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1, 2, 3)

通過SparkContext創(chuàng)建廣播對(duì)象,然后value獲取廣播對(duì)象值。下面分析SparkContext的創(chuàng)建廣播代碼

def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  // An RDD cannot be broadcast directly; broadcast a concrete value instead,
  // e.g. call rdd.collect() first and broadcast the result
  require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
    "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
  // Create the broadcast object through SparkEnv's BroadcastManager
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  // Capture call-site (stack) information for logging
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  // Register bc with a CleanupTaskWeakReference (a weak reference); a cleaning thread loops over
  // referenceQueue. Once the Broadcast object has no strong references left it can be garbage-collected,
  // and its CleanupTaskWeakReference is then put on referenceQueue.
  // CleanupTaskWeakReference extends WeakReference with a CleanupTask field, so when a reference is
  // removed from referenceQueue the task can be read back and checked for CleanBroadcast, which carries
  // the broadcastId; broadcastManager.unbroadcast() is then called to remove the broadcast from the cluster.
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
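The cleanup mechanism described in the comments above can be sketched with plain JDK classes. This is a simplified, single-step sketch and not Spark's ContextCleaner: CleanupTask, CleanBroadcast, CleanupTaskWeakRef, SimpleCleaner and cleanOnce are hypothetical names loosely modeled on the classes mentioned above, and the unbroadcast callback stands in for broadcastManager.unbroadcast().

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical cleanup tasks; only the broadcast case is modeled here.
sealed trait CleanupTask
final case class CleanBroadcast(broadcastId: Long) extends CleanupTask

// A weak reference that also carries the cleanup task to run once its referent is collected.
final class CleanupTaskWeakRef(
    val task: CleanupTask,
    referent: AnyRef,
    queue: ReferenceQueue[AnyRef]) extends WeakReference[AnyRef](referent, queue)

// Simplified stand-in for a context cleaner; unbroadcast removes the broadcast's data.
final class SimpleCleaner(unbroadcast: Long => Unit) {
  private val queue = new ReferenceQueue[AnyRef]
  // The weak references themselves must stay strongly reachable until they are processed.
  private val pending = mutable.Set.empty[CleanupTaskWeakRef]

  def registerBroadcastForCleanup(broadcast: AnyRef, broadcastId: Long): CleanupTaskWeakRef = {
    val ref = new CleanupTaskWeakRef(CleanBroadcast(broadcastId), broadcast, queue)
    pending.synchronized { pending += ref }
    ref
  }

  // One iteration of the loop a cleaning thread would run; remove(timeout) waits up to timeoutMs.
  def cleanOnce(timeoutMs: Long): Boolean = queue.remove(timeoutMs) match {
    case ref: CleanupTaskWeakRef =>
      pending.synchronized { pending -= ref }
      ref.task match {
        case CleanBroadcast(id) => unbroadcast(id)
      }
      true
    case _ => false // null: nothing was enqueued within the timeout
  }
}
```

In a real cleaner, cleanOnce would run in a loop on a daemon thread. When the GC actually clears a weak reference is up to the JVM, so a test can call ref.enqueue() to simulate it deterministically.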

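Alternatively, Guava's FinalizableWeakReference can be used: after the referent has been garbage-collected, Guava calls its finalizeReferent() method. You still create a FinalizableReferenceQueue, but Guava does the polling for you, so there is no need to start your own thread, maintain a ReferenceQueue, loop on referenceQueue.remove(100) to fetch the weak references, and run the extra cleanup yourself.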

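Guava's FinalizableReference interface has a single method, void finalizeReferent(). It takes no parameters, so there is no broadcastId-style context argument; the context is passed in through a closure instead, for example by capturing it in an anonymous subclass.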
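The closure-based style can be imitated without Guava. In this JDK-only sketch, ClosureWeakRef, ClosureCleaner, track and drain are hypothetical names; finalizeReferent() takes no arguments, just like Guava's method, and the broadcast id reaches the cleanup code only because the closure captured it.

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentHashMap

// The reference carries a no-argument callback, like Guava's finalizeReferent().
final class ClosureWeakRef[T <: AnyRef](referent: T, queue: ReferenceQueue[T], onCollected: () => Unit)
    extends WeakReference[T](referent, queue) {
  def finalizeReferent(): Unit = onCollected()
}

object ClosureCleaner {
  private val queue = new ReferenceQueue[AnyRef]
  // Keep the references reachable until their callbacks have run.
  private val live = ConcurrentHashMap.newKeySet[ClosureWeakRef[AnyRef]]()

  def track(broadcast: AnyRef, broadcastId: Long, unbroadcast: Long => Unit): ClosureWeakRef[AnyRef] = {
    // broadcastId is captured by the closure; finalizeReferent() itself has no parameters.
    val ref = new ClosureWeakRef[AnyRef](broadcast, queue, () => unbroadcast(broadcastId))
    live.add(ref)
    ref
  }

  // Runs the callback of every reference currently enqueued and returns how many ran.
  def drain(): Int = {
    var count = 0
    var next = queue.poll()
    while (next != null) {
      next match {
        case ref: ClosureWeakRef[_] =>
          live.remove(ref)
          ref.finalizeReferent()
          count += 1
        case _ =>
      }
      next = queue.poll()
    }
    count
  }
}
```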

WeakReference

Analysis of Guava's FinalizableReferenceQueue

Creating BroadcastManager

BroadcastManager manages broadcast variables and is created in SparkEnv:

val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)

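In BroadcastManager, initialize() creates and initializes a TorrentBroadcastFactory; stop(), newBroadcast() and unbroadcast() delegate to the corresponding BroadcastFactory methods. nextBroadcastId is an AtomicLong starting at 0 that is incremented on every newBroadcast() call.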

private[spark] class BroadcastManager(
    val isDriver: Boolean,
    conf: SparkConf,
    securityManager: SecurityManager)
  extends Logging {

  private var initialized = false
  private var broadcastFactory: BroadcastFactory = null

  initialize()

  // Called by SparkContext or Executor before using Broadcast
  private def initialize() {
    synchronized {
      if (!initialized) {
        broadcastFactory = new TorrentBroadcastFactory
        broadcastFactory.initialize(isDriver, conf, securityManager)
        initialized = true
      }
    }
  }

  def stop() {
    broadcastFactory.stop()
  }

  private val nextBroadcastId = new AtomicLong(0)

  private[broadcast] val cachedValues = {
    // HARD key, WEAK value: keys are strongly referenced and never collected; values are weakly referenced and can be reclaimed by GC
    new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
  }

  def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
    broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
  }

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
  }
}
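The delegation chain above (manager, then factory, then broadcast, with ids taken from an AtomicLong starting at 0) can be illustrated without Spark. All names in this sketch (SimpleBroadcast, SimpleBroadcastFactory, InMemoryBroadcastFactory, SimpleBroadcastManager) are hypothetical, and the in-memory product simply wraps the value instead of splitting it into blocks.

```scala
import java.util.concurrent.atomic.AtomicLong

// The single "product" of the factory.
abstract class SimpleBroadcast[T](val id: Long) {
  def value: T
  override def toString: String = s"Broadcast($id)"
}

// Factory interface, analogous in shape to BroadcastFactory.newBroadcast.
trait SimpleBroadcastFactory {
  def newBroadcast[T](v: T, id: Long): SimpleBroadcast[T]
}

// Toy factory whose broadcasts just hold the value in memory.
final class InMemoryBroadcastFactory extends SimpleBroadcastFactory {
  override def newBroadcast[T](v: T, id: Long): SimpleBroadcast[T] =
    new SimpleBroadcast[T](id) { def value: T = v }
}

final class SimpleBroadcastManager(factory: SimpleBroadcastFactory) {
  // Starts at 0 and is bumped atomically, so concurrent callers never get the same id.
  private val nextBroadcastId = new AtomicLong(0)

  def newBroadcast[T](value: T): SimpleBroadcast[T] =
    factory.newBroadcast(value, nextBroadcastId.getAndIncrement())
}
```

Swapping InMemoryBroadcastFactory for another factory changes how values are distributed without touching the manager, which is the point of routing creation through a factory interface.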

Strong reference (an ordinary reference) > SoftReference > WeakReference > PhantomReference, from strongest to weakest

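When an object is wrapped in a SoftReference or WeakReference, e.g. new WeakReference(obj, new ReferenceQueue()), the following applies at GC time if obj is reachable only through soft or weak references, i.e. no other object holds a strong reference to it: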

Soft reference: while memory is sufficient, GC does not reclaim the object; before an OutOfMemoryError would be thrown, GC reclaims it. Commonly used for caches.
Weak reference: the object is reclaimed at the next GC regardless of how much memory is available.

The four reference types in Java

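cachedValues is an Apache Commons Collections ReferenceMap whose keys are HARD (strong) references and whose values are WEAK references. java.util.WeakHashMap is the opposite: its keys are weak and its values strong, and after a key has been collected, expungeStaleEntries() sets e.value = null to drop the map's strong reference to the value.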

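ReferenceMap is not thread-safe (in TorrentBroadcast, readBroadcastBlock() accesses it inside TorrentBroadcast.synchronized). Thread-safe options: wrap it with Collections.synchronizedMap; use Spring's ConcurrentReferenceHashMap, which is implemented like ConcurrentHashMap but wraps each Entry<K,V> in a WeakReference or SoftReference instead of giving K and V different reference types; or build the map with Guava's MapMaker: ConcurrentMap<String, String> concurrentMap = new MapMaker().weakValues().makeMap();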
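As a JDK-only alternative, a strong-key/weak-value cache can be built from a HashMap of WeakReference values plus a ReferenceQueue, guarded by one lock. WeakValueCache is a hypothetical class, not ReferenceMap or MapMaker; it purges stale entries on every access, in the same spirit as WeakHashMap.expungeStaleEntries().

```scala
import java.lang.ref.{ReferenceQueue, WeakReference}
import scala.collection.mutable

// Hypothetical sketch: strong keys, weak values, all access guarded by this object's lock.
final class WeakValueCache[K, V <: AnyRef] {
  // Each value reference remembers its key so the stale entry can be found and removed.
  private final class ValueRef(val key: K, value: V, q: ReferenceQueue[V])
      extends WeakReference[V](value, q)

  private val queue = new ReferenceQueue[V]
  private val map = mutable.HashMap.empty[K, ValueRef]

  // Drop entries whose values have been collected.
  private def expungeStale(): Unit = {
    var r = queue.poll()
    while (r != null) {
      val ref = r.asInstanceOf[ValueRef]
      // Only remove the entry if it still points at this exact reference (it may have been replaced).
      if (map.get(ref.key).exists(_ eq ref)) map.remove(ref.key)
      r = queue.poll()
    }
  }

  def put(key: K, value: V): Unit = synchronized {
    expungeStale()
    map.update(key, new ValueRef(key, value, queue))
  }

  // None if the key is unknown or its value has already been garbage-collected.
  def get(key: K): Option[V] = synchronized {
    expungeStale()
    map.get(key).flatMap(ref => Option(ref.get()))
  }

  def size: Int = synchronized { expungeStale(); map.size }
}
```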

The TorrentBroadcastFactory factory

BroadcastFactory is the factory interface. It defines newBroadcast() to create a Broadcast, initialize() for initialization, unbroadcast() to remove a Broadcast, and stop() to shut the factory down.

/**
 * An interface for all the broadcast implementations in Spark (to allow
 * multiple broadcast implementations). SparkContext uses a BroadcastFactory
 * implementation to instantiate a particular broadcast for the entire Spark job.
 */
private[spark] trait BroadcastFactory {

  def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager): Unit

  /**
   * Creates a new broadcast variable.
   *
   * @param value value to broadcast
   * @param isLocal whether we are in local mode (single JVM process)
   * @param id unique id representing this broadcast variable
   */
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]

  def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit

  def stop(): Unit
}

TorrentBroadcastFactory is the concrete implementation and delegates to TorrentBroadcast. The call chain is therefore BroadcastManager → BroadcastFactory → Broadcast.

/**
 * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like
 * protocol to do a distributed transfer of the broadcasted data to the executors. Refer to
 * [[org.apache.spark.broadcast.TorrentBroadcast]] for more details.
 */
private[spark] class TorrentBroadcastFactory extends BroadcastFactory {

  override def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) { }

  override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
    new TorrentBroadcast[T](value_, id)
  }

  override def stop() { }

  /**
   * Remove all persisted state associated with the torrent broadcast with the given ID.
   * @param removeFromDriver Whether to remove state from the driver.
   * @param blocking Whether to block until unbroadcasted
   */
  override def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
    TorrentBroadcast.unpersist(id, removeFromDriver, blocking)
  }
}

The TorrentBroadcast object

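Broadcast is an abstract class rather than a trait because it has instance fields (_isValid, _destroySite), concrete methods (value, unpersist(), destroy()) and abstract methods (getValue(), doUnpersist(blocking), doDestroy(blocking)), and it takes a constructor parameter id, which a Scala 2 trait cannot. BroadcastFactory, which only declares abstract methods, is defined as a trait (an interface).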

/**
 * A broadcast variable. Broadcast variables allow the programmer to keep a read-only variable
 * cached on each machine rather than shipping a copy of it with tasks. They can be used, for
 * example, to give every node a copy of a large input dataset in an efficient manner. Spark also
 * attempts to distribute broadcast variables using efficient broadcast algorithms to reduce
 * communication cost.
 *
 * Broadcast variables are created from a variable `v` by calling
 * [[org.apache.spark.SparkContext#broadcast]].
 * The broadcast variable is a wrapper around `v`, and its value can be accessed by calling the
 * `value` method. The interpreter session below shows this:
 *
 * {{{
 * scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
 * broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
 *
 * scala> broadcastVar.value
 * res0: Array[Int] = Array(1, 2, 3)
 * }}}
 *
 * After the broadcast variable is created, it should be used instead of the value `v` in any
 * functions run on the cluster so that `v` is not shipped to the nodes more than once.
 * In addition, the object `v` should not be modified after it is broadcast in order to ensure
 * that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped
 * to a new node later).
 *
 * @param id A unique identifier for the broadcast variable.
 * @tparam T Type of the data contained in the broadcast variable.
 */
abstract class Broadcast[T: ClassTag](val id: Long) extends Serializable with Logging {

  /**
   * Flag signifying whether the broadcast variable is valid
   * (that is, not already destroyed) or not.
   */
  @volatile private var _isValid = true

  private var _destroySite = ""

  /** Get the broadcasted value. */
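  // value, unpersist and destroy below are template methods: they check validity, then delegate
  // to the abstract hooks getValue(), doUnpersist() and doDestroy() implemented by subclasses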
  def value: T = {
    assertValid()
    getValue()
  }

  /**
   * Asynchronously delete cached copies of this broadcast on the executors.
   * If the broadcast is used after this is called, it will need to be re-sent to each executor.
   */
  def unpersist() {
    unpersist(blocking = false)
  }

  /**
   * Delete cached copies of this broadcast on the executors. If the broadcast is used after
   * this is called, it will need to be re-sent to each executor.
   * @param blocking Whether to block until unpersisting has completed
   */
  def unpersist(blocking: Boolean) {
    assertValid()
    doUnpersist(blocking)
  }

  /**
   * Destroy all data and metadata related to this broadcast variable. Use this with caution;
   * once a broadcast variable has been destroyed, it cannot be used again.
   * This method blocks until destroy has completed
   */
  def destroy() {
    destroy(blocking = true)
  }

  /**
   * Destroy all data and metadata related to this broadcast variable. Use this with caution;
   * once a broadcast variable has been destroyed, it cannot be used again.
   * @param blocking Whether to block until destroy has completed
   */
  private[spark] def destroy(blocking: Boolean) {
    assertValid()
    _isValid = false
    
    // Capture the call site from the current thread's stack; "last" and "first" refer to positions in the LIFO call stack
    // shortForm: s"$lastSparkMethod at $firstUserFile:$firstUserLine"
    // longForm = callStack.take(callStackDepth).mkString("\n")
    _destroySite = Utils.getCallSite().shortForm
    logInfo("Destroying %s (from %s)".format(toString, _destroySite))
    doDestroy(blocking)
  }

  /**
   * Whether this Broadcast is actually usable. This should be false once persisted state is
   * removed from the driver.
   */
  private[spark] def isValid: Boolean = {
    _isValid
  }

  /**
   * Actually get the broadcasted value. Concrete implementations of Broadcast class must
   * define their own way to get the value.
   */
  protected def getValue(): T

  /**
   * Actually unpersist the broadcasted value on the executors. Concrete implementations of
   * Broadcast class must define their own logic to unpersist their own data.
   */
  protected def doUnpersist(blocking: Boolean)

  /**
   * Actually destroy all data and metadata related to this broadcast variable.
   * Implementation of Broadcast class must define their own logic to destroy their own
   * state.
   */
  protected def doDestroy(blocking: Boolean)

  /** Check if this broadcast is valid. If not valid, exception is thrown. */
  protected def assertValid() {
    if (!_isValid) {
      throw new SparkException(
        "Attempted to use %s after it was destroyed (%s) ".format(toString, _destroySite))
    }
  }

  override def toString: String = "Broadcast(" + id + ")"
}
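The template-method layout of Broadcast (public methods that check validity, then call abstract hooks) can be reproduced in a few lines. MiniBroadcast and LoggingBroadcast are hypothetical; unlike Spark, the caller passes the destroy site as a string instead of computing it with Utils.getCallSite(), and IllegalStateException stands in for SparkException.

```scala
abstract class MiniBroadcast[T](val id: Long) {
  @volatile private var _isValid = true
  private var _destroySite = ""

  // Template methods: check validity first, then delegate to the subclass hook.
  def value: T = { assertValid(); getValue() }

  def unpersist(blocking: Boolean = false): Unit = { assertValid(); doUnpersist(blocking) }

  def destroy(site: String): Unit = {
    assertValid()
    _isValid = false
    _destroySite = site
    doDestroy(blocking = true)
  }

  def isValid: Boolean = _isValid

  // Hooks that concrete broadcasts must implement.
  protected def getValue(): T
  protected def doUnpersist(blocking: Boolean): Unit
  protected def doDestroy(blocking: Boolean): Unit

  protected def assertValid(): Unit =
    if (!_isValid) {
      throw new IllegalStateException(s"Attempted to use Broadcast($id) after it was destroyed (${_destroySite})")
    }

  override def toString: String = s"Broadcast($id)"
}

// Toy implementation that records which hooks were called.
final class LoggingBroadcast[T](v: T, broadcastId: Long) extends MiniBroadcast[T](broadcastId) {
  val calls = scala.collection.mutable.ArrayBuffer.empty[String]
  override protected def getValue(): T = v
  override protected def doUnpersist(blocking: Boolean): Unit = calls += s"doUnpersist($blocking)"
  override protected def doDestroy(blocking: Boolean): Unit = calls += s"doDestroy($blocking)"
}
```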

Now the key class, TorrentBroadcast. Its constructor takes the object to broadcast (obj) and the id produced by nextBroadcastId. The BlockManager methods it calls will be analyzed in a later article.

/**
 * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
 *
 * The mechanism is as follows:
 *
 * The driver divides the serialized object into small chunks and
 * stores those chunks in the BlockManager of the driver.
 *
 * On each executor, the executor first attempts to fetch the object from its BlockManager. If
 * it does not exist, it then uses remote fetches to fetch the small chunks from the driver and/or
 * other executors if available. Once it gets the chunks, it puts the chunks in its own
 * BlockManager, ready for other executors to fetch from.
 *
 * This prevents the driver from being the bottleneck in sending out multiple copies of the
 * broadcast data (one per executor).
 *
 * When initialized, TorrentBroadcast objects read SparkEnv.get.conf.
 *
 * @param obj object to broadcast
 * @param id A unique identifier for the broadcast variable.
 */
private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
  extends Broadcast[T](id) with Logging with Serializable {

  /**
   * Value of the broadcast object on executors. This is reconstructed by [[readBroadcastBlock]],
   * which builds this value by reading blocks from the driver and/or other executors.
   *
   * On the driver, if the value is required, it is read lazily from the block manager.
   */
  // Lazy: the value is read from the BlockManager only when it is first needed
  @transient private lazy val _value: T = readBroadcastBlock()

  /** The compression codec to use, or None if compression is disabled */
  @transient private var compressionCodec: Option[CompressionCodec] = _
  /** Size of each block. Default value is 4MB.  This value is only read by the broadcaster. */
  @transient private var blockSize: Int = _

  // Initializes three fields: compressionCodec, blockSize, checksumEnabled
  private def setConf(conf: SparkConf) {
    compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
      Some(CompressionCodec.createCodec(conf))
    } else {
      None
    }
    // Note: use getSizeAsKb (not bytes) to maintain compatibility if no units are provided
    // Default block size is 4m: getSizeAsKb returns 4096 (KB), times 1024 = 4 * 1024 * 1024 bytes
    blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
    checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
  }
  setConf(SparkEnv.get.conf)

  // e.g. broadcast_0 (id comes from BroadcastManager.nextBroadcastId, an auto-incrementing AtomicLong)
  private val broadcastId = BroadcastBlockId(id)

  /** Total number of blocks this broadcast variable contains. */
  // The broadcast object is written out while the TorrentBroadcast is being constructed; writeBlocks returns the number of blocks
  private val numBlocks: Int = writeBlocks(obj)

  /** Whether to generate checksum for blocks or not. */
  private var checksumEnabled: Boolean = false
  /** The checksum for all the blocks. */
  // Checksum of each block, filled in by writeBlocks()
  private var checksums: Array[Int] = _

  override protected def getValue() = {
    _value
  }

  // Compute a block's checksum with Adler32, which is faster to compute than CRC32 (with weaker error detection)
  // Java equivalent: Checksum checksumEngine = new Adler32(); checksumEngine.update(bytes); long checksum = checksumEngine.getValue();
  private def calcChecksum(block: ByteBuffer): Int = {
    val adler = new Adler32()
    if (block.hasArray) {
      adler.update(block.array, block.arrayOffset + block.position(), block.limit()
        - block.position())
    } else {
      val bytes = new Array[Byte](block.remaining())
      block.duplicate.get(bytes)
      adler.update(bytes)
    }
    adler.getValue.toInt
  }

  /**
   * Divide the object into multiple blocks and put those blocks in the block manager.
   *
   * @param value the object to divide
   * @return number of blocks this broadcast variable is divided into
   */
  // Steps: 1. putSingle the whole value into the driver's local store  2. split it into an Array[ByteBuffer] with blockifyObject  3. compute each block's checksum  4. putBytes each block
  private def writeBlocks(value: T): Int = {
    import StorageLevel._
    // Store a copy of the broadcast variable in the driver so that tasks run on the driver
    // do not create a duplicate copy of the broadcast variable's value.
    // The BlockManager of the current process (the driver, where broadcasts are created)
    val blockManager = SparkEnv.get.blockManager
    
    // putSingle writes the whole broadcast object into the local storage system. In local mode this lets
    // tasks that run on the driver use it directly. Because the _replication of MEMORY_AND_DISK is fixed
    // at 1 (and tellMaster = false), the object is stored only in this local BlockManager
    if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
      throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    }
    
    // Serialize and compress the object into an Array[ByteBuffer], each chunk at most blockSize (4 * 1024 * 1024 by default)
    val blocks =
      TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
      
    // If checksums are enabled, allocate one checksum slot per block
    if (checksumEnabled) {
      checksums = new Array[Int](blocks.length)
    }
    blocks.zipWithIndex.foreach { case (block, i) =>
      if (checksumEnabled) {
        // Compute this block's checksum and store it in the array
        checksums(i) = calcChecksum(block)
      }
      // broadcast_0_piece0, broadcast_0_piece1, broadcast_0_piece2, ...
      val pieceId = BroadcastBlockId(id, "piece" + i)
      val bytes = new ChunkedByteBuffer(block.duplicate())
      // Store each piece in serialized form (MEMORY_AND_DISK_SER) in the driver's BlockManager; tellMaster = true registers it as a download source
      if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
        throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
      }
    }
    blocks.length
  }

  /** Fetch torrent blocks from the driver and/or other executors. */
  // Fetch every piece of the broadcast from the driver and/or executors
  // The element type is the BlockData trait rather than a concrete class, which keeps the code more abstract
  private def readBlocks(): Array[BlockData] = {
    // Fetch chunks of data. Note that all these chunks are stored in the BlockManager and reported
    // to the driver, so other executors can pull these chunks from this executor as well.
    val blocks = new Array[BlockData](numBlocks)
    val bm = SparkEnv.get.blockManager

    // Visit the pieces in random order to avoid hot spots: if every executor started at piece 0, the first pieces would become hot spots
    for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
      val pieceId = BroadcastBlockId(id, "piece" + pid)
      logDebug(s"Reading piece $pieceId of $broadcastId")
      // First try getLocalBytes because there is a chance that previous attempts to fetch the
      // broadcast blocks have already fetched some of the blocks. In that case, some blocks
      // would be available locally (on this executor).
      // Look up pieceId in the local store first and wrap it as BlockData
      // A previous fetch attempt on this executor may already have stored the piece locally, hence getLocalBytes first
      bm.getLocalBytes(pieceId) match {
        case Some(block) =>
          blocks(pid) = block
          releaseLock(pieceId)
        case None =>
          // Fetch the block's serialized bytes from a remote BlockManager; returns Option[ChunkedByteBuffer]
          bm.getRemoteBytes(pieceId) match {
            case Some(b) =>
              // Verify the checksum
              if (checksumEnabled) {
                // Checksum chunks(0), the first ByteBuffer of the ChunkedByteBuffer, which holds the piece's data
                val sum = calcChecksum(b.chunks(0))
                // Compare with the checksum the driver recorded for this piece
                if (sum != checksums(pid)) {
                  throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
                    s" $sum != ${checksums(pid)}")
                }
              }
              // We found the block from remote executors/driver's BlockManager, so put the block
              // in this executor's BlockManager.
              // The block came from the driver's or another executor's BlockManager, so also store it in this
              // executor's BlockManager and tellMaster, making this executor another download source
              if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
                throw new SparkException(
                  s"Failed to store $pieceId of $broadcastId in local BlockManager")
              }
              // Wrap it as a ByteBufferBlockData
              blocks(pid) = new ByteBufferBlockData(b, true)
            case None =>
              throw new SparkException(s"Failed to get $pieceId of $broadcastId")
          }
      }
    }
    blocks
  }

  /**
   * Remove all persisted state associated with this Torrent broadcast on the executors.
   */
  // Remove the broadcast from all executors. The only difference from doDestroy below is whether the driver's copy is removed too
  override protected def doUnpersist(blocking: Boolean) {
    TorrentBroadcast.unpersist(id, removeFromDriver = false, blocking)
  }

  /**
   * Remove all persisted state associated with this Torrent broadcast on the executors
   * and driver.
   */
  override protected def doDestroy(blocking: Boolean) {
    TorrentBroadcast.unpersist(id, removeFromDriver = true, blocking)
  }

  /** Used by the JVM when serializing this object. */
  // Custom serialization hook: for a Serializable class, ObjectOutputStream.writeSerialData() reflectively invokes a private writeObject() method if the class defines one
  // Ways to control serialization: 1. implement Serializable  2. Serializable + transient fields  3. implement Externalizable (writeExternal/readExternal)  4. Serializable + private writeObject/readObject methods, invoked reflectively by the object streams
  private def writeObject(out: ObjectOutputStream): Unit = Utils.tryOrIOException {
    // This is why writeObject() is defined here: refuse to serialize a broadcast that has already been destroyed
    assertValid()
    // Default Serializable behavior: write this object's non-static, non-transient fields
    // Serialization captures object state; static fields belong to the class, not to the instance
    out.defaultWriteObject()
  }

  // Read the broadcast value; the whole read is synchronized on the TorrentBroadcast object
  private def readBroadcastBlock(): T = Utils.tryOrIOException {
    TorrentBroadcast.synchronized {
      // broadcastCache: the key is the BroadcastBlockId (HARD reference), the value is the broadcast object T (WEAK reference)
      val broadcastCache = SparkEnv.get.broadcastManager.cachedValues

      // Try the cache first. Values are weak, so GC may have reclaimed the object; then fall back to blockManager.getLocalValues and cache the result again
      Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
        setConf(SparkEnv.get.conf)
        val blockManager = SparkEnv.get.blockManager
        // Look for the whole object that putSingle stored locally; the BlockResult's data is an Iterator wrapping the object
        blockManager.getLocalValues(broadcastId) match {
          case Some(blockResult) =>
            if (blockResult.data.hasNext) {
              // Cast to the broadcast type and release the block lock
              val x = blockResult.data.next().asInstanceOf[T]
              releaseLock(broadcastId)

              if (x != null) {
                // Cache it
                broadcastCache.put(broadcastId, x)
              }

              x
            } else {
              throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
            }
          case None =>
            logInfo("Started reading broadcast variable " + id)
            val startTimeMs = System.currentTimeMillis()
            
            // Read all pieces of the broadcast as Array[BlockData]
            val blocks = readBlocks()
            logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))

            try {
              // Concatenate the blocks into one InputStream, then decompress and deserialize it into T
              val obj = TorrentBroadcast.unBlockifyObject[T](
                blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
              // Store the merged copy in BlockManager so other tasks on this executor don't
              // need to re-fetch it.
              val storageLevel = StorageLevel.MEMORY_AND_DISK
              // Store obj in this executor's BlockManager (tellMaster = false) so other tasks on this executor do not re-fetch it
              if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
                throw new SparkException(s"Failed to store $broadcastId in BlockManager")
              }

              if (obj != null) {
                broadcastCache.put(broadcastId, obj)
              }

              obj
            } finally {
              // The blocks are no longer needed; dispose of them to free their buffers (possibly off-heap memory)
              blocks.foreach(_.dispose())
            }
        }
      }
    }
  }

  /**
   * If running in a task, register the given block's locks for release upon task completion.
   * Otherwise, if not running in a task then immediately release the lock.
   */
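  // Releases the lock on blockId via BlockInfoManager.unlock(), waking threads blocked on that lock
  // The read lock held while a running task uses the block keeps it from being written or removed until
  // the task finishes and releases the lock; other readers can still share it
  // BlockId is an abstract class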
  private def releaseLock(blockId: BlockId): Unit = {
    val blockManager = SparkEnv.get.blockManager
    // TaskContext.get() reads a ThreadLocal; TaskContext is an abstract class
    Option(TaskContext.get()) match {
      case Some(taskContext) =>
        // Inside a running task: release the lock when the task completes, whether it succeeds or fails
        taskContext.addTaskCompletionListener(_ => blockManager.releaseLock(blockId))
      case None =>
        // This should only happen on the driver, where broadcast variables may be accessed
        // outside of running tasks (e.g. when computing rdd.partitions()). In order to allow
        // broadcast variables to be garbage collected we need to free the reference here
        // which is slightly unsafe but is technically okay because broadcast variables aren't
        // stored off-heap.
        blockManager.releaseLock(blockId)
    }
  }
}

// Writing: the object is serialized first, then compressed: serializeStream(compressedOutputStream(out)).writeObject(obj)
// Reading: the bytes are decompressed first, then deserialized: deserializeStream(compressedInputStream(in)).readObject()
private object TorrentBroadcast extends Logging {

  // Turn obj into an Array[ByteBuffer], each chunk at most blockSize bytes (4 * 1024 * 1024 by default)
  // Serialize, then compress, then chunk: the data flows from the outermost stream inward
  def blockifyObject[T: ClassTag](
      obj: T,
      blockSize: Int,
      serializer: Serializer,
      compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
    // Innermost stream: collects the bytes into blockSize-sized ByteBuffers
    val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
    // Optional compression layer from the CompressionCodec
    val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
    val ser = serializer.newInstance()
    // Outermost layer: the serialization stream
    val serOut = ser.serializeStream(out)
    Utils.tryWithSafeFinally {
      // obj is serialized into bytes, those bytes are compressed, and the compressed bytes are written into ChunkedByteBufferOutputStream, which cuts them into chunks
      serOut.writeObject[T](obj)
    } {
      serOut.close()
    }
    cbbos.toChunkedByteBuffer.getChunks()
  }

  // Turn an Array[InputStream] back into an object
  // Decompress first, then deserialize: the data flows from the innermost stream outward
  def unBlockifyObject[T: ClassTag](
      blocks: Array[InputStream],
      serializer: Serializer,
      compressionCodec: Option[CompressionCodec]): T = {
    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
    // Concatenate the block streams in order with SequenceInputStream
    val is = new SequenceInputStream(blocks.iterator.asJavaEnumeration)
    // Decompress
    val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is)
    val ser = serializer.newInstance()
    val serIn = ser.deserializeStream(in)
    // Wrapped in Utils.tryWithSafeFinally so the stream is always closed
    val obj = Utils.tryWithSafeFinally {
      // Deserialize
      serIn.readObject[T]()
    } {
      serIn.close()
    }
    obj
  }

  /**
   * Remove all persisted blocks associated with this torrent broadcast on the executors.
   * If removeFromDriver is true, also remove these persisted blocks on the driver.
   */
  // Remove the blocks of the broadcast with this id from all executors
  // removeFromDriver: whether to remove the driver's copy as well
  def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean): Unit = {
    logDebug(s"Unpersisting TorrentBroadcast $id")
    SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
  }
}
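The layering in blockifyObject/unBlockifyObject can be mirrored with JDK streams: serialize, compress, then cut into pieces; for reading, concatenate the pieces with SequenceInputStream, decompress, then deserialize. ToyBlockify is hypothetical and swaps in Java serialization and GZIP for Spark's Serializer and CompressionCodec; it also collects all bytes before slicing them, whereas ChunkedByteBufferOutputStream chunks while writing. With the default spark.broadcast.blockSize of 4m, blockSize would be 4096 * 1024 = 4194304 bytes; the test uses tiny sizes only to force several pieces.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, ObjectInputStream, ObjectOutputStream, OutputStream, SequenceInputStream}
import java.nio.ByteBuffer
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object ToyBlockify {
  // Write path, outermost to innermost: ObjectOutputStream -> (GZIP) -> byte buffer, then slice.
  def blockifyObject(obj: AnyRef, blockSize: Int, compress: Boolean): Array[ByteBuffer] = {
    val bytesOut = new ByteArrayOutputStream()
    val out: OutputStream = if (compress) new GZIPOutputStream(bytesOut) else bytesOut
    val serOut = new ObjectOutputStream(out)
    try serOut.writeObject(obj) finally serOut.close() // closing also writes the GZIP trailer
    // Cut the (possibly compressed) bytes into pieces of at most blockSize bytes.
    bytesOut.toByteArray.grouped(blockSize).map(chunk => ByteBuffer.wrap(chunk)).toArray
  }

  // View a piece as an InputStream without copying its bytes.
  def toInputStream(piece: ByteBuffer): InputStream =
    new ByteArrayInputStream(piece.array(), piece.arrayOffset() + piece.position(), piece.remaining())

  // Read path, innermost to outermost: concatenate pieces -> (GUNZIP) -> ObjectInputStream.
  def unBlockifyObject[T](blocks: Array[InputStream], compress: Boolean): T = {
    require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks")
    val streams = new java.util.ArrayList[InputStream]()
    blocks.foreach(b => streams.add(b))
    val is: InputStream = new SequenceInputStream(java.util.Collections.enumeration(streams))
    val in: InputStream = if (compress) new GZIPInputStream(is) else is
    val serIn = new ObjectInputStream(in)
    try serIn.readObject().asInstanceOf[T] finally serIn.close()
  }
}
```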

Summary of the Broadcast implementation

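  1. The driver creates the broadcast object; writeBlocks(obj) splits it into pieces and stores each piece as a block in the driver's BlockManager.
  2. The Broadcast handle is serialized on the driver and deserialized on the executors, where broadcastVar.value returns the broadcast object. "@transient private lazy val _value: T = readBroadcastBlock()" is not serialized and is computed lazily on first access; "private var checksums: Array[Int] = _" is not transient, so the checksum array is serialized on the driver and shipped with the handle.
  3. Each executor fetches all the pieces to reassemble the broadcast value. For each piece it first checks its own BlockManager and, if the piece is not there, fetches it from another BlockManager.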
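Point 2 can be checked on a toy class: the non-transient field travels with the serialized handle, the @transient lazy val does not and is recomputed on first access after deserialization, and a private writeObject() can refuse to serialize a destroyed handle, as TorrentBroadcast.writeObject does through assertValid(). ToyTorrentHandle and SerDe are hypothetical, and the lazy value here is a string rather than a value read from a BlockManager.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, ObjectInputStream, ObjectOutputStream}
import java.util.concurrent.atomic.AtomicInteger

class ToyTorrentHandle(val id: Long, val checksums: Array[Int]) extends Serializable {
  @volatile private var valid = true

  // Not serialized; computed on first access (standing in for readBroadcastBlock()).
  @transient lazy val value: String = {
    ToyTorrentHandle.reads.incrementAndGet()
    s"value of broadcast $id"
  }

  def destroy(): Unit = valid = false

  // Called reflectively by ObjectOutputStream because it is a private writeObject(ObjectOutputStream).
  private def writeObject(out: ObjectOutputStream): Unit = {
    if (!valid) throw new IOException(s"Attempted to serialize Broadcast($id) after it was destroyed")
    out.defaultWriteObject() // writes id, checksums and valid; skips the transient lazy value
  }
}

object ToyTorrentHandle {
  // Counts how many times the lazy value has been computed, across all instances.
  val reads = new AtomicInteger(0)
}

object SerDe {
  // Serialize and immediately deserialize, as shipping a task to an executor would.
  def roundTrip[T <: AnyRef](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }
}
```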

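Initially the driver is the only source of these pieces. As the executors' BlockManagers fetch different pieces from the driver (TorrentBroadcast deliberately makes executors fetch pieces in different orders: Random.shuffle(Seq.range(0, numBlocks))), the number of sources grows, and each executor can fetch a piece from any one of several sources, including the driver and other executors' BlockManagers. This spreads traffic more evenly across the cluster instead of making the driver the single source.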
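The fetch loop described above (random piece order, local store first, then a remote source, checksum verification, and re-storing the piece locally so this node becomes a source too) can be modeled with in-memory maps. This is a simplified sketch, not Spark's BlockManager: ToyBlockStore and ToyTorrentRead are hypothetical, pieces are plain byte arrays, and picking a random remote holder is an assumption made for illustration rather than how getRemoteBytes chooses a location.

```scala
import java.util.zip.Adler32
import scala.collection.mutable
import scala.util.Random

// In-memory stand-in for one node's BlockManager.
final class ToyBlockStore {
  private val blocks = mutable.HashMap.empty[String, Array[Byte]]
  def getLocal(blockId: String): Option[Array[Byte]] = synchronized { blocks.get(blockId) }
  def put(blockId: String, bytes: Array[Byte]): Unit = synchronized { blocks.update(blockId, bytes) }
}

object ToyTorrentRead {
  def checksum(bytes: Array[Byte]): Int = {
    val adler = new Adler32()
    adler.update(bytes, 0, bytes.length)
    adler.getValue().toInt
  }

  def readBlocks(
      broadcastId: Long,
      numBlocks: Int,
      checksums: Array[Int],
      local: ToyBlockStore,
      remotes: Seq[ToyBlockStore],
      rnd: Random): Array[Array[Byte]] = {
    val pieces = new Array[Array[Byte]](numBlocks)
    // Random order, so that executors do not all start with piece0.
    for (pid <- rnd.shuffle((0 until numBlocks).toList)) {
      val pieceId = s"broadcast_${broadcastId}_piece$pid"
      pieces(pid) = local.getLocal(pieceId).getOrElse {
        // Assumption for the sketch: pick any remote store that currently holds the piece.
        val holders = remotes.filter(_.getLocal(pieceId).isDefined).toList
        val source = rnd.shuffle(holders).headOption.getOrElse(
          throw new IllegalStateException(s"Failed to get $pieceId"))
        val bytes = source.getLocal(pieceId).get
        val sum = checksum(bytes)
        if (sum != checksums(pid)) {
          throw new IllegalStateException(s"corrupt remote block $pieceId: $sum != ${checksums(pid)}")
        }
        // Keep the piece locally so this node can serve it to others from now on.
        local.put(pieceId, bytes)
        bytes
      }
    }
    pieces
  }
}
```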

Further reading

How Spark Broadcast is implemented

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
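Platform statement: the content of this article (including any images or videos) was uploaded and published by the author and represents only the author's views. Jianshu is an information publishing platform and only provides information storage services.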
