目錄
前言
在RPC的領域里摸爬滾打了很長時間,是時候抽身出來看一看其他東西了。順著SparkEnv初始化的思路繼續(xù)看,下一個主要組件是廣播管理器BroadcastManager。本文就主要講解Spark中廣播機制的實現(xiàn)。
廣播變量是Spark兩種共享變量中的一種(另一種是累加器)。它適合處理多節(jié)點跨Stage的共享數(shù)據(jù),特別是輸入數(shù)據(jù)量較大的集合,可以提高效率。
廣播管理器BroadcastManager
BroadcastManager在SparkEnv中是直接初始化的,其代碼邏輯也很短,如下。
代碼#11.1 - o.a.s.broadcast.BroadcastManager類
private[spark] class BroadcastManager(
val isDriver: Boolean,
conf: SparkConf,
securityManager: SecurityManager)
extends Logging {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
initialize()
private def initialize() {
synchronized {
if (!initialized) {
broadcastFactory = new TorrentBroadcastFactory
broadcastFactory.initialize(isDriver, conf, securityManager)
initialized = true
}
}
}
def stop() {
broadcastFactory.stop()
}
private val nextBroadcastId = new AtomicLong(0)
private[broadcast] val cachedValues = {
new ReferenceMap(AbstractReferenceMap.HARD, AbstractReferenceMap.WEAK)
}
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean): Broadcast[T] = {
broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
def unbroadcast(id: Long, removeFromDriver: Boolean, blocking: Boolean) {
broadcastFactory.unbroadcast(id, removeFromDriver, blocking)
}
}
構造方法參數(shù)
BroadcastManager在構造時有三個參數(shù),分別是isDriver(是否為Driver節(jié)點)、conf(對應的SparkConf配置)、securityManager(對應的SecurityManager)。非常簡單,不再贅述。
屬性成員
BroadcastManager內有四個屬性成員:
- initialized表示BroadcastManager是否已經初始化完成。
- broadcastFactory持有廣播工廠的實例(類型是BroadcastFactory特征的實現(xiàn)類)。
- nextBroadcastId表示下一個廣播變量的唯一標識(AtomicLong類型的)。
- cachedValues用來緩存已廣播出去的變量。它屬于ReferenceMap類型,是apache-commons提供的一個弱引用映射數(shù)據(jù)結構。與我們常見的各種Map不同,它的鍵值對有可能會在GC過程中被回收。
初始化邏輯
initialize()方法做的事情也非常簡單,它首先判斷BroadcastManager是否已初始化。如果未初始化,就新建廣播工廠TorrentBroadcastFactory,將其初始化,然后將初始化標記設為true。
對外提供的方法
BroadcastManager提供的方法有兩個:newBroadcast()方法,用于創(chuàng)建一個新的廣播變量;以及unbroadcast()方法,將已存在的廣播變量取消廣播。它們都是簡單地調用了TorrentBroadcastFactory中的同名方法,因此我們必須通過閱讀TorrentBroadcastFactory的相關源碼,才能了解Spark廣播機制的細節(jié)。
廣播變量TorrentBroadcast
來看TorrentBroadcastFactory.newBroadcast()方法。
代碼#11.2 - o.a.s.broadcast.TorrentBroadcastFactory.newBroadcast()方法
override def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long): Broadcast[T] = {
new TorrentBroadcast[T](value_, id)
}
可見只是簡單地(真的很簡單嗎?)創(chuàng)建了一個TorrentBroadcast對象實例,它就是前面一直在說的“廣播變量”的廬山真面目。下面我們來仔細研究它。
屬性成員及參數(shù)初始化
這個類中的屬性不算少哦。
代碼#11.3 - o.a.s.broadcast.TorrentBroadcast類的屬性成員
@transient private lazy val _value: T = readBroadcastBlock()
@transient private var compressionCodec: Option[CompressionCodec] = _
@transient private var blockSize: Int = _
private val broadcastId = BroadcastBlockId(id)
private val numBlocks: Int = writeBlocks(obj)
private var checksumEnabled: Boolean = false
private var checksums: Array[Int] = _
private def setConf(conf: SparkConf) {
compressionCodec = if (conf.getBoolean("spark.broadcast.compress", true)) {
Some(CompressionCodec.createCodec(conf))
} else {
None
}
blockSize = conf.getSizeAsKb("spark.broadcast.blockSize", "4m").toInt * 1024
checksumEnabled = conf.getBoolean("spark.broadcast.checksum", true)
}
setConf(SparkEnv.get.conf)
- _value:廣播塊的具體數(shù)據(jù)。注意它由lazy關鍵字定義,因此是懶加載的,也就是在TorrentBroadcast構造時不會調用readBroadcastBlock()方法獲取數(shù)據(jù),而會推遲到第一次訪問_value時。
- compressionCodec:廣播塊的壓縮編解碼邏輯。當配置項spark.broadcast.compress為true時,會啟用壓縮。
- blockSize:廣播塊的大小。由spark.broadcast.blockSize配置項來控制,默認值4MB。
- broadcastId:廣播變量的ID。BroadcastBlockId是個結構非常簡單的case class,每產生一個新的廣播變量就會自增。
- numBlocks:該廣播變量包含的塊數(shù)量。它與_value不同,并沒有l(wèi)azy關鍵字定義,因此在TorrentBroadcast構造時就會直接調用writeBlocks()方法。
- checksumEnabled:是否允許對廣播塊計算校驗值,由spark.broadcast.checksum配置項控制,默認值true。
- checksums:廣播塊的校驗值。
廣播變量的寫入
上面已經提到在TorrentBroadcast構造時會直接調用writeBlocks()方法,來看一看它的代碼。
代碼#11.4 - o.a.s.broadcast.TorrentBroadcast.writeBlocks()方法
private def writeBlocks(value: T): Int = {
import StorageLevel._
val blockManager = SparkEnv.get.blockManager
if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
if (checksumEnabled) {
checksums = new Array[Int](blocks.length)
}
blocks.zipWithIndex.foreach { case (block, i) =>
if (checksumEnabled) {
checksums(i) = calcChecksum(block)
}
val pieceId = BroadcastBlockId(id, "piece" + i)
val bytes = new ChunkedByteBuffer(block.duplicate())
if (!blockManager.putBytes(pieceId, bytes, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
}
blocks.length
}
這個方法中涉及到了塊管理器BlockManager,它是Spark存儲子系統(tǒng)中的基礎組件,我們現(xiàn)在暫時不考慮它,后面還會對它進行十分詳盡的分析。writeBlocks()方法的執(zhí)行邏輯如下:
- 獲取BlockManager實例,調用其putSingle()方法將廣播數(shù)據(jù)作為單個對象寫入本地存儲。注意StorageLevel為MEMORY_AND_DISK,亦即在內存不足時會溢寫到磁盤,且副本數(shù)為1,不會進行復制。
- 調用blockifyObject()方法將廣播數(shù)據(jù)轉化為塊,即Spark存儲的基本單元。使用的序列化器為SparkEnv中初始化的JavaSerializer。
- 如果校驗值開關有效,就用calcChecksum()方法為每個塊計算校驗值。
- 為廣播數(shù)據(jù)切分成的每個塊(稱為piece)都生成一個帶"piece"的廣播ID,調用BlockManager.putBytes()方法將各個塊寫入MemoryStore(內存)或DiskStore(磁盤)。StorageLevel為MEMORY_AND_DISK_SER,寫入的數(shù)據(jù)會序列化。
- 最終返回塊的計數(shù)值。
上面提到的blockifyObject()、calcChecksum()方法的實現(xiàn)都比較簡單,就不再贅述。
廣播變量的讀取
先來看readBroadcastBlock()方法。
代碼#11.5 - o.a.s.broadcast.TorrentBroadcast.readBroadcastBlock()方法
private def readBroadcastBlock(): T = Utils.tryOrIOException {
TorrentBroadcast.synchronized {
val broadcastCache = SparkEnv.get.broadcastManager.cachedValues
Option(broadcastCache.get(broadcastId)).map(_.asInstanceOf[T]).getOrElse {
setConf(SparkEnv.get.conf)
val blockManager = SparkEnv.get.blockManager
blockManager.getLocalValues(broadcastId) match {
case Some(blockResult) =>
if (blockResult.data.hasNext) {
val x = blockResult.data.next().asInstanceOf[T]
releaseLock(broadcastId)
if (x != null) {
broadcastCache.put(broadcastId, x)
}
x
} else {
throw new SparkException(s"Failed to get locally stored broadcast data: $broadcastId")
}
case None =>
logInfo("Started reading broadcast variable " + id)
val startTimeMs = System.currentTimeMillis()
val blocks = readBlocks()
logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
try {
val obj = TorrentBroadcast.unBlockifyObject[T](
blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
val storageLevel = StorageLevel.MEMORY_AND_DISK
if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
if (obj != null) {
broadcastCache.put(broadcastId, obj)
}
obj
} finally {
blocks.foreach(_.dispose())
}
}
}
}
}
其執(zhí)行邏輯如下:
- 獲取BlockManager實例,調用其getLocalValues()方法將之前寫入的廣播數(shù)據(jù)對象取出。
- 如果能夠直接取得廣播數(shù)據(jù),就調用releaseLock()方法【實際上對應BlockManager.releaseLock(),又對應Object.notifyAll()】解開當前塊的鎖。這個鎖用來保證塊讀寫的互斥性。
- 如果不能直接取得廣播數(shù)據(jù),說明數(shù)據(jù)都已經序列化,并且有可能不在本地存儲。此時調用readBlocks()方法從本地和遠端同時獲取塊,然后調用unBlockifyObject()方法將塊轉換回廣播數(shù)據(jù)的對象。
- 再次調用BlockManager.putSingle()方法將廣播數(shù)據(jù)作為單個對象寫入本地存儲,再將其加入廣播緩存Map中,下次讀取時就不用大費周章了。
readBlocks()方法的具體實現(xiàn)如下所示。
代碼#11.6 - o.a.s.broadcast.TorrentBroadcast.readBlocks()方法
private def readBlocks(): Array[BlockData] = {
val blocks = new Array[BlockData](numBlocks)
val bm = SparkEnv.get.blockManager
for (pid <- Random.shuffle(Seq.range(0, numBlocks))) {
val pieceId = BroadcastBlockId(id, "piece" + pid)
logDebug(s"Reading piece $pieceId of $broadcastId")
bm.getLocalBytes(pieceId) match {
case Some(block) =>
blocks(pid) = block
releaseLock(pieceId)
case None =>
bm.getRemoteBytes(pieceId) match {
case Some(b) =>
if (checksumEnabled) {
val sum = calcChecksum(b.chunks(0))
if (sum != checksums(pid)) {
throw new SparkException(s"corrupt remote block $pieceId of $broadcastId:" +
s" $sum != ${checksums(pid)}")
}
}
if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(
s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
blocks(pid) = new ByteBufferBlockData(b, true)
case None =>
throw new SparkException(s"Failed to get $pieceId of $broadcastId")
}
}
}
blocks
}
該方法會首先對所有廣播數(shù)據(jù)的piece進行打散,然后對打散之后的每個piece執(zhí)行以下步驟:
- 調用BlockManager.getLocalBytes()方法,從本地獲取序列化的廣播數(shù)據(jù)塊。將獲取到的塊放入對應下標的位置,并釋放該塊的鎖。
- 如果本地沒有廣播數(shù)據(jù),就調用BlockManager.getRemoteBytes()方法從遠端(其他Executor或者Driver)獲取廣播數(shù)據(jù)塊。
- 對遠程獲取的塊計算校驗值,并與之前寫入時計算的校驗值比對。如果不同,說明傳輸發(fā)生錯誤,拋異常出去。
- 若一切正常,調用BlockManager.putBytes()方法,將各個塊寫入MemoryStore(內存)或DiskStore(磁盤),并將其放入對應下標的位置。最終返回所有讀取的塊。
廣播變量讀取的流程圖描述
上面單單通過文字敘述可能會令人費解,因此下面畫一個標準的Flow chart來描述它的過程。

總結
本文從廣播管理器BroadcastManager的初始化入手,揭示了廣播變量的本質——TorrentBroadcast,并通過引入塊管理器BlockManager的相關知識,詳細分析了廣播數(shù)據(jù)的寫入和讀取流程。