Spark Shuffle 服務(wù)

Spark shuffle模塊主要包含三個(gè)組件:

  • 負(fù)責(zé)shuffle數(shù)據(jù)讀寫的shuffleManager
  • 負(fù)責(zé)跟蹤shuffle數(shù)據(jù)地址的MapOutputTracker
  • 負(fù)責(zé)遠(yuǎn)程拉去數(shù)據(jù)的Shuffle RpcService

ShuffleManager

ShuffleManager在SparkEnv初始化,參見(jiàn)SparkEnv.scala#301, 有一種實(shí)現(xiàn)SortShuffleManager(spark1.6之前是存在HashShuffleManager的)

HashShuffleManager(spark1.6之前)

HashShuffleManager使用HashShuffleWriter和FileShuffleBlockResolver,map task會(huì)將每個(gè)reduce partition的結(jié)果都輸出成一個(gè)文件

問(wèn)題:如果shuffle個(gè)數(shù)過(guò)多,每個(gè)ShuffleMapTask會(huì)產(chǎn)生reduce parition個(gè)數(shù)文件,造成系統(tǒng)文件句柄耗盡。

SortShuffleManager

SortShuffleManager使用Writer有三種實(shí)現(xiàn):

  • SortShuffleWriter

類MapReduce實(shí)現(xiàn),內(nèi)存buffer一定數(shù)據(jù),超過(guò)閾值之后先在內(nèi)存排序,輸出文件。 如果數(shù)據(jù)量非常大,可能會(huì)輸出多個(gè)文件,需要做一次或者多次歸并排序

  • UnsafeShuffleWriter

如果支持序列化,就會(huì)使用這個(gè)writer。這個(gè)writer的優(yōu)勢(shì)是可以直接對(duì)序列化的數(shù)據(jù)進(jìn)行排序等操作,省去了大量反序列化的開(kāi)銷 先寫到memory,超過(guò)閾值就會(huì)spill。和SortShuffleWriter區(qū)別就是,使用的是shuffleExternalSorter,通過(guò)taskMemoryManager使用 Tungsten方式進(jìn)行內(nèi)存管理。同時(shí)對(duì)數(shù)據(jù)進(jìn)行排序。最后對(duì)spill的文件做一個(gè)merge。

merge的時(shí)候,是1~n partition id去遍歷spills文件,從每個(gè)spill文件讀取屬于該partition的數(shù)據(jù)。所以partition id之間有序,但是partition內(nèi)部不保證有序。

  • BypassMergeSortShuffleHandle

如果map side沒(méi)有使用Aggregator,并且partition數(shù)目小于spark.shuffle.sort.bypassMergeThreshold(default:200),就會(huì)使用這個(gè)writer。 類似HashShuffleWriter,不會(huì)將record先cache到memory,而是直接寫文件,對(duì)每個(gè)reduce partition單獨(dú)寫一個(gè)文件。最后合并到一個(gè)文件里面,同時(shí)寫index文件用于reduce的時(shí)候拉取。


image.png

MapOutputTracker

跟蹤每個(gè)stage里面每個(gè)parition的output的location信息

  • Driver端 MapOutputTrackerMaster
    每個(gè)task執(zhí)行完,都會(huì)將mapStatus返回driver,driver端DAGScheduler收到task完成事件,通知MapOutputTrackerMaster會(huì)記錄數(shù)據(jù)屬于哪個(gè)shuffleId和reduce partition.作業(yè)的所有mapStatus都存在master的shuffleStatuses字段
  • Executor上MapOutputTrackerWorker
    在reduce時(shí),worker根據(jù)ShuffledRDD的shuffleId從driver fetch這個(gè)id對(duì)應(yīng)的mapStatus


    image.png

RPcService

BlockManager 根據(jù)是否依賴外部shuffle服務(wù)(通過(guò)參數(shù)spark.shuffle.service.enabled,默認(rèn)false),決定初始化內(nèi)部或者外部shuffle client, 參見(jiàn)BlockManager.scala#128

// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
blockTransferService
}

內(nèi)置ShuffleService

NettyBlockTransferService 既是客戶端也是服務(wù)端。 服務(wù)端對(duì)應(yīng)的RpcHandler:NettyBlockRpcServer

主要支持兩個(gè)操作:

  • fetchBlocks 從remote node拉取blockId對(duì)應(yīng)的文件
  • uploadBlock 將一個(gè)local block上傳到remote node(用于block復(fù)制)

外部ShuffleService

  • 客戶端:ExternalShuffleClient
  • 服務(wù)端:
    • Standalone Mode: ExternalShuffleService
    • Spark on Yarn: YarnShuffleService
      TODO(交互圖)

通信協(xié)議

  • RegisterExecutor
  • OpenBlocks
    注意:此時(shí)Executor上還是會(huì)啟動(dòng)NettyBlockTransferService, 只不過(guò)只用于Block復(fù)制,不用于shuffle服務(wù)

Sorted Shuffle 寫入數(shù)據(jù)流程

  • Executor執(zhí)行ShuffleMapTask#runTask
  • 根據(jù)Sparkenv獲取shufflemanager為SortShuffleManager
  • 通過(guò)shuffleDepedency拿到BasedShuffleHandle再通過(guò)SortShuffleManager獲取對(duì)應(yīng)的SortShuffleWriter
  • SortShuffleWriter在寫入之前先調(diào)用ExternalSorter的insertAll對(duì)數(shù)據(jù)進(jìn)行聚合,根據(jù)shuffleDepedency中的partitioner,去匯聚同一個(gè)reduce partition的數(shù)據(jù)
  • SortShuffleWriter#write會(huì)在executor本地寫數(shù)據(jù)(memory或者spill到磁盤)
    最后生成兩個(gè)文件,data文件通過(guò)偏移量來(lái)區(qū)分不同reduce partition的數(shù)據(jù),index文件保存不同partition的offset
  • 生成mapstatus,作為task result序列化后返回給driver

Sorted Shuffle 拉取數(shù)據(jù)流程

image.png
  • 基于ShuffleRDD,通過(guò)BlockStoreShuffleReader,read的底層使用ShuffleBlockFetcherIterator
  • ShuffleBlockFetcherIterator首先根據(jù)blockManagerId劃分remote和local的請(qǐng)求,對(duì)于remote的數(shù)據(jù),會(huì)根據(jù)shuffleId去driver拉取mapstatus,根據(jù)mapstatus,去對(duì)應(yīng)的remote executor讀取數(shù)據(jù)
  • ShuffleBlockFetcherIterator對(duì)每個(gè)remote req,會(huì)使用ShuffleClient拉去Shuffle數(shù)據(jù),放在內(nèi)存Result隊(duì)列里面
  • BlockStoreShuffleReader#read不斷迭代ShuffleBlockFetcherIterator,將拉取完成的blocks,反序列化成key-value的iterator
  • 根據(jù)shuffleDependency的aggregator,將結(jié)果按照key匯聚到一個(gè)內(nèi)存中PartitionedAppendOnlyMap,如果內(nèi)存不足,就spill
  • 最后將內(nèi)存數(shù)據(jù)和磁盤數(shù)據(jù)歸并成一個(gè)完整的key/value iterator, 給ShuffledRDD


    image.png

    ShuffleBlockFetcherIterator 通過(guò)一個(gè)spark.reducer.maxSizeInFlight控制同時(shí)拉去的shuffle數(shù)據(jù)量總大小,防止內(nèi)存OOM

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。

友情鏈接更多精彩內(nèi)容