Spark shuffle模塊主要包含三個(gè)組件:
- 負(fù)責(zé)shuffle數(shù)據(jù)讀寫的shuffleManager
- 負(fù)責(zé)跟蹤shuffle數(shù)據(jù)地址的MapOutputTracker
- 負(fù)責(zé)遠(yuǎn)程拉去數(shù)據(jù)的Shuffle RpcService
ShuffleManager
ShuffleManager在SparkEnv初始化,參見(jiàn)SparkEnv.scala#301, 有一種實(shí)現(xiàn)SortShuffleManager(spark1.6之前是存在HashShuffleManager的)
HashShuffleManager(spark1.6之前)
HashShuffleManager使用HashShuffleWriter和FileShuffleBlockResolver,map task會(huì)將每個(gè)reduce partition的結(jié)果都輸出成一個(gè)文件
問(wèn)題:如果shuffle個(gè)數(shù)過(guò)多,每個(gè)ShuffleMapTask會(huì)產(chǎn)生reduce parition個(gè)數(shù)文件,造成系統(tǒng)文件句柄耗盡。
SortShuffleManager
SortShuffleManager使用Writer有三種實(shí)現(xiàn):
- SortShuffleWriter
類MapReduce實(shí)現(xiàn),內(nèi)存buffer一定數(shù)據(jù),超過(guò)閾值之后先在內(nèi)存排序,輸出文件。 如果數(shù)據(jù)量非常大,可能會(huì)輸出多個(gè)文件,需要做一次或者多次歸并排序
- UnsafeShuffleWriter
如果支持序列化,就會(huì)使用這個(gè)writer。這個(gè)writer的優(yōu)勢(shì)是可以直接對(duì)序列化的數(shù)據(jù)進(jìn)行排序等操作,省去了大量反序列化的開(kāi)銷 先寫到memory,超過(guò)閾值就會(huì)spill。和SortShuffleWriter區(qū)別就是,使用的是shuffleExternalSorter,通過(guò)taskMemoryManager使用 Tungsten方式進(jìn)行內(nèi)存管理。同時(shí)對(duì)數(shù)據(jù)進(jìn)行排序。最后對(duì)spill的文件做一個(gè)merge。
merge的時(shí)候,是1~n partition id去遍歷spills文件,從每個(gè)spill文件讀取屬于該partition的數(shù)據(jù)。所以partition id之間有序,但是partition內(nèi)部不保證有序。
- BypassMergeSortShuffleHandle
如果map side沒(méi)有使用Aggregator,并且partition數(shù)目小于spark.shuffle.sort.bypassMergeThreshold(default:200),就會(huì)使用這個(gè)writer。 類似HashShuffleWriter,不會(huì)將record先cache到memory,而是直接寫文件,對(duì)每個(gè)reduce partition單獨(dú)寫一個(gè)文件。最后合并到一個(gè)文件里面,同時(shí)寫index文件用于reduce的時(shí)候拉取。

MapOutputTracker
跟蹤每個(gè)stage里面每個(gè)parition的output的location信息
- Driver端 MapOutputTrackerMaster
每個(gè)task執(zhí)行完,都會(huì)將mapStatus返回driver,driver端DAGScheduler收到task完成事件,通知MapOutputTrackerMaster會(huì)記錄數(shù)據(jù)屬于哪個(gè)shuffleId和reduce partition.作業(yè)的所有mapStatus都存在master的shuffleStatuses字段 -
Executor上MapOutputTrackerWorker
在reduce時(shí),worker根據(jù)ShuffledRDD的shuffleId從driver fetch這個(gè)id對(duì)應(yīng)的mapStatus
image.png
RPcService
BlockManager 根據(jù)是否依賴外部shuffle服務(wù)(通過(guò)參數(shù)spark.shuffle.service.enabled,默認(rèn)false),決定初始化內(nèi)部或者外部shuffle client, 參見(jiàn)BlockManager.scala#128
// Client to read other executors' shuffle files. This is either an external service, or just the
// standard BlockTransferService to directly connect to other Executors.
private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager, securityManager.isAuthenticationEnabled(),
securityManager.isSaslEncryptionEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
blockTransferService
}
內(nèi)置ShuffleService
NettyBlockTransferService 既是客戶端也是服務(wù)端。 服務(wù)端對(duì)應(yīng)的RpcHandler:NettyBlockRpcServer
主要支持兩個(gè)操作:
- fetchBlocks 從remote node拉取blockId對(duì)應(yīng)的文件
- uploadBlock 將一個(gè)local block上傳到remote node(用于block復(fù)制)
外部ShuffleService
- 客戶端:ExternalShuffleClient
- 服務(wù)端:
- Standalone Mode: ExternalShuffleService
- Spark on Yarn: YarnShuffleService
TODO(交互圖)
通信協(xié)議
- RegisterExecutor
- OpenBlocks
注意:此時(shí)Executor上還是會(huì)啟動(dòng)NettyBlockTransferService, 只不過(guò)只用于Block復(fù)制,不用于shuffle服務(wù)
Sorted Shuffle 寫入數(shù)據(jù)流程
- Executor執(zhí)行ShuffleMapTask#runTask
- 根據(jù)Sparkenv獲取shufflemanager為SortShuffleManager
- 通過(guò)shuffleDepedency拿到BasedShuffleHandle再通過(guò)SortShuffleManager獲取對(duì)應(yīng)的SortShuffleWriter
- SortShuffleWriter在寫入之前先調(diào)用ExternalSorter的insertAll對(duì)數(shù)據(jù)進(jìn)行聚合,根據(jù)shuffleDepedency中的partitioner,去匯聚同一個(gè)reduce partition的數(shù)據(jù)
- SortShuffleWriter#write會(huì)在executor本地寫數(shù)據(jù)(memory或者spill到磁盤)
最后生成兩個(gè)文件,data文件通過(guò)偏移量來(lái)區(qū)分不同reduce partition的數(shù)據(jù),index文件保存不同partition的offset - 生成mapstatus,作為task result序列化后返回給driver
Sorted Shuffle 拉取數(shù)據(jù)流程

- 基于ShuffleRDD,通過(guò)BlockStoreShuffleReader,read的底層使用ShuffleBlockFetcherIterator
- ShuffleBlockFetcherIterator首先根據(jù)blockManagerId劃分remote和local的請(qǐng)求,對(duì)于remote的數(shù)據(jù),會(huì)根據(jù)shuffleId去driver拉取mapstatus,根據(jù)mapstatus,去對(duì)應(yīng)的remote executor讀取數(shù)據(jù)
- ShuffleBlockFetcherIterator對(duì)每個(gè)remote req,會(huì)使用ShuffleClient拉去Shuffle數(shù)據(jù),放在內(nèi)存Result隊(duì)列里面
- BlockStoreShuffleReader#read不斷迭代ShuffleBlockFetcherIterator,將拉取完成的blocks,反序列化成key-value的iterator
- 根據(jù)shuffleDependency的aggregator,將結(jié)果按照key匯聚到一個(gè)內(nèi)存中PartitionedAppendOnlyMap,如果內(nèi)存不足,就spill
-
最后將內(nèi)存數(shù)據(jù)和磁盤數(shù)據(jù)歸并成一個(gè)完整的key/value iterator, 給ShuffledRDD
image.png
ShuffleBlockFetcherIterator 通過(guò)一個(gè)spark.reducer.maxSizeInFlight控制同時(shí)拉去的shuffle數(shù)據(jù)量總大小,防止內(nèi)存OOM

