一.什么是shuffle
MapOutputTrancker用于跟蹤map任務(wù)的輸出狀態(tài),此狀態(tài)便于reduce任務(wù)定位到map輸出結(jié)果所在的節(jié)點(diǎn)地址,進(jìn)而獲取中間輸出結(jié)果,每個(gè)map任務(wù)或者reduce任務(wù),都會(huì)有其唯一的標(biāo)識(shí),分別為mapid和reduceid,每個(gè)reduce任務(wù)的輸入可能是多個(gè)map任務(wù)的輸出,因?yàn)閞educe可能會(huì)到多個(gè)map任務(wù)所在的節(jié)點(diǎn)上去拉取Block,這一過(guò)程叫做shuffle,每次shuffle的過(guò)程都有其唯一的標(biāo)識(shí)shuffleid.
二.MapOutputTrancker的創(chuàng)建方式
在Driver端和Executor端啟動(dòng)的同時(shí),都會(huì)創(chuàng)建MapOutputTracker的實(shí)例,不同的是Driver端創(chuàng)建的是MapOutputTrackerMaster,Executor端創(chuàng)建的是MapOutputTranckerWoker。
Driver端啟動(dòng)時(shí)會(huì)創(chuàng)建MapOutputTrackerMaster,之后創(chuàng)建MapOutputTrackerMasterEndpoint,并且注冊(cè)到Dispatcher中,端點(diǎn)名稱為MapOutputTrancker。
Executor端創(chuàng)建MapOutputTranckerWorker,不僅會(huì)和Driver端一樣,注冊(cè)端點(diǎn)信息等,而且會(huì)從遠(yuǎn)端Driver獲取之前在NettyRpcEnv的Dispatcher中注冊(cè)好的MapOutputTrackerMasterEndpoint的引用。
三.MapOutputTrancker的屬性
- trackerEndpoint:持有Driver端上MapOutputTrackerMasterEndpoint的引用Ref
- mapStatuses:用于維護(hù)各個(gè)map任務(wù)輸出的狀態(tài),類型為Map[Int,Array[MapStatus]],key為shuffleid,Array存儲(chǔ)著各個(gè)map任務(wù)對(duì)于的狀態(tài)信息mapStatus。由于各個(gè)MapOutputTranckerWoker會(huì)不斷向MapOutputTranckerMaster匯報(bào)本節(jié)點(diǎn)的Executor運(yùn)行的map任務(wù)狀態(tài)信息,因此MapOutputTranckerMaster中的mapStatuses中維護(hù)的信息是最新最全的。而MapOutputTrackerWorker的mapStatuses對(duì)于本節(jié)點(diǎn)上的map任務(wù)狀態(tài)是及時(shí)更新的,對(duì)于其他節(jié)點(diǎn)的map任務(wù)狀態(tài)則是一個(gè)緩沖,如果后續(xù)在獲取mapStatus時(shí),無(wú)法命中緩存,則向Drievr端的MapOutputTranckerMaster獲取最新的任務(wù)狀態(tài)信息。
- fetching:shuffle拉取的集合,用來(lái)記錄當(dāng)前Executor正在從哪些Map輸出的位置拉取數(shù)據(jù)。
四.獲取mapStatus的流程
1.首先 從當(dāng)前Executor中的MapOutputTracker的mapStatuses緩存中,獲取MapStatus數(shù)組,如果沒(méi)有則向遠(yuǎn)端Driver中的MapOutputTranckerMaster去獲取任務(wù)狀態(tài)信息。
2.然后 判斷fetching中是否已經(jīng)存在要獲取的shuffleid,如果有,這就說(shuō)明有其他線程對(duì)此shuffleid的數(shù)據(jù)進(jìn)行遠(yuǎn)程拉取了,這樣就等待其他線程拉取完畢,直到fetching中不存在要取的shuffleid,這時(shí)就從mapStatuses中再次獲取mapStatus集合。
3.如果還獲取不到,則說(shuō)明其他線程拉取失敗了,則需要自己去拉取數(shù)據(jù),首先將shuufleid加入fetching集合中,表示當(dāng)前shuffleid的任務(wù)狀態(tài)信息,已經(jīng)有線程在拉取了,之后會(huì)調(diào)用ackTracker方法,向MapOutputTrackerMasterEndpoint發(fā)送消息去獲取map任務(wù)的狀態(tài)信息。
4.之后 MapOutputTrackerMaster接受到該消息之后,將請(qǐng)求包裝成MapOutputMessage消息,放入到消息隊(duì)列,異步的去處理該消息。
首先會(huì)getSerializedMapOutputStatuses方法,查詢本地記錄shuffle對(duì)應(yīng)的Map輸出狀態(tài)。
在獲取的過(guò)程中需要為每個(gè)shuffleId分配一個(gè)分段鎖,因?yàn)檫@里支持并發(fā)調(diào)用,同一時(shí)間有多個(gè)線程需要獲取同一個(gè)shuffleId對(duì)應(yīng)的輸出,所以需要保證Map元數(shù)據(jù)信息只序列化或者廣播一次。所以在獲取鎖之前和得到鎖之后都需要再次查詢一下緩存,可能有其他線程已經(jīng)緩存了MapStatus。
如果緩存還是為空,則需要將MapStatus序列化或者包裝為Broadcast。對(duì)于序列化還是廣播,通過(guò)比較序列化后的結(jié)果大小是否超出spark.shuffle.mapOutput.minSizeForBroadcast,默認(rèn)值為512K。
序列化完成后,將此結(jié)果進(jìn)行緩存,并向MapOutputTrackerWorker返回結(jié)果。
MapOutputTrackerWorker的askTracker接收到返回的結(jié)果后結(jié)束阻塞,將數(shù)據(jù)反序列化成mapStatus集合緩存下來(lái),然后將shuffleid從fetching中移除,喚醒哪些在fetching鎖上等待的線程,使這些線程可以獲取自己需要的MapStatus數(shù)組。
5.最后 返回任務(wù)狀態(tài)信息mapStatus數(shù)組。
6.注意 MapOutputTrancker中會(huì)有線程池,區(qū)別于Dispatcher中的線程池,同時(shí)還有MessageLoop,和Dispatcher中非常相似。
五.ShuffleReader如何使用mapStatus
1.在ShuffleRDD的compute方法中,會(huì)獲取BlockStoreShuffleReader,然后在BlockStoreShuffleReader中,會(huì)調(diào)用mapOutputTracker.getMapSizesByExecutorId方法獲取一組二元組序列Seq[(BlockManagerId, Seq[(BlockId, Long)])],第一項(xiàng)代表了BlockManagerId,第二項(xiàng)描述了存儲(chǔ)于該BlockManager上的一組shuffle blocks。
2.getMapSizesByExecutorId會(huì)調(diào)用getStatuses方法獲取MapStatus集合,然后最后返回MapStatus集合。
3.最后根據(jù)執(zhí)行的分區(qū)范圍[startPartition, endPartition]將返回的結(jié)果Array[MapStatus]轉(zhuǎn)換成Seq[(BlockManagerId, Seq[(BlockId, Long)])]。
4.利用這個(gè)Seq[(BlockManagerId, Seq[(BlockId, Long)])],去指定的BlockManager中去拉取對(duì)應(yīng)的Block塊的數(shù)據(jù)用來(lái)迭代計(jì)算。