1. Overall message flow in Spark Streaming
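When the streaming engine starts, two message exchanges take place: one to start all stream receivers (StartAllReceivers) and one to register each receiver (RegisterReceiver). While stream data is being received and stored, an add-block message (AddBlock) is sent each time a data block has been stored. When Spark Streaming stops, a message telling all receivers to shut down (StopAllReceivers) must be sent. The whole flow is shown in the figure below: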

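The four messages above can be summarized with a small Python model. It is a sketch, not Spark code: the record names mirror Spark's message names, but their fields, the `message_flow` helper, and the `input-<stream>-<n>` block-id format are illustrative assumptions. It shows only who sends which message to whom, and in what order.

```python
from dataclasses import dataclass


# Toy records named after the four messages; the fields are simplified
# assumptions, not the fields of Spark's real message classes.
@dataclass(frozen=True)
class StartAllReceivers:
    stream_ids: tuple


@dataclass(frozen=True)
class RegisterReceiver:
    stream_id: int


@dataclass(frozen=True)
class AddBlock:
    stream_id: int
    block_id: str
    num_records: int


@dataclass(frozen=True)
class StopAllReceivers:
    pass


def message_flow(stream_ids, blocks_per_stream, records_per_block=10):
    """Return (sender, target, message) triples in lifecycle order (hypothetical helper)."""
    target = "ReceiverTrackerEndpoint"
    # 1. Engine start: the tracker asks its endpoint to launch every receiver.
    flow = [("ReceiverTracker", target, StartAllReceivers(tuple(stream_ids)))]
    # 2. Each receiver's supervisor registers before the receiver really starts.
    for sid in stream_ids:
        flow.append((f"ReceiverSupervisor-{sid}", target, RegisterReceiver(sid)))
    # 3. One AddBlock per stored block carries only that block's metadata.
    for sid in stream_ids:
        for n in range(blocks_per_stream):
            block = AddBlock(sid, f"input-{sid}-{n}", records_per_block)
            flow.append((f"ReceiverSupervisor-{sid}", target, block))
    # 4. Engine stop: all receivers are told to shut down.
    flow.append(("ReceiverTracker", target, StopAllReceivers()))
    return flow


if __name__ == "__main__":
    for sender, target, msg in message_flow([0, 1], blocks_per_stream=1):
        print(f"{sender} -> {target}: {msg}")
```

In a real application, RegisterReceiver and AddBlock messages from different receivers interleave; the fixed order here is a simplification for readability.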
2. Detailed message flow in Spark Streaming
(1) When the streaming engine starts, JobScheduler internally starts ReceiverTracker and the ReceiverTrackerEndpoint. Once ReceiverTracker is ready, it sends a StartAllReceivers message to the endpoint, telling it to distribute and start all stream receivers.
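A minimal sketch of step (1), assuming a single Python process in which method calls stand in for RPC. `ToyJobScheduler`, `ToyReceiverTracker`, and `ToyReceiverTrackerEndpoint` are hypothetical stand-ins for the classes named above, and the round-robin placement of receivers on executors is an assumption made for brevity. In Spark the StartAllReceivers message is sent asynchronously to the endpoint; here it is a direct call.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class StartAllReceivers:
    stream_ids: tuple


@dataclass
class ToyReceiverTrackerEndpoint:
    """Hypothetical endpoint: decides which executor runs each receiver."""
    executors: list
    placements: dict = field(default_factory=dict)

    def receive(self, msg):
        if not isinstance(msg, StartAllReceivers):
            raise ValueError(f"unexpected message: {msg!r}")
        # Assumption: plain round-robin placement. Spark's real scheduling
        # policy also considers preferred locations and load balance.
        for i, sid in enumerate(msg.stream_ids):
            self.placements[sid] = self.executors[i % len(self.executors)]


class ToyReceiverTracker:
    def __init__(self, stream_ids, executors):
        self.stream_ids = tuple(stream_ids)
        self.executors = list(executors)
        self.endpoint = None

    def start(self):
        if self.endpoint is not None:
            raise RuntimeError("ReceiverTracker already started")
        # Set up the endpoint first, then tell it to launch all receivers.
        self.endpoint = ToyReceiverTrackerEndpoint(self.executors)
        self.endpoint.receive(StartAllReceivers(self.stream_ids))


class ToyJobScheduler:
    def __init__(self, stream_ids, executors):
        self.receiver_tracker = ToyReceiverTracker(stream_ids, executors)

    def start(self):
        # Starting the scheduler starts the tracker and, through it, the endpoint.
        self.receiver_tracker.start()


scheduler = ToyJobScheduler(stream_ids=[0, 1, 2], executors=["exec-1", "exec-2"])
scheduler.start()
print(scheduler.receiver_tracker.endpoint.placements)
# {0: 'exec-1', 1: 'exec-2', 2: 'exec-1'}
```

The guard in `start` reflects that a tracker is started only once per streaming context; a second call fails instead of launching duplicate receivers.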


(2) Before a receiver starts, its ReceiverSupervisor sends a RegisterReceiver message to the ReceiverTrackerEndpoint, and the receiver continues starting only after registration succeeds. The registration code is in ReceiverSupervisor's startReceiver method.
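Step (2) is a synchronous request/reply: the supervisor asks the endpoint to register the receiver and only starts it if the answer is true. The sketch below models that handshake in plain Python; `ToyTrackerEndpoint`, `ToyReceiverSupervisor`, the `accepting` flag, and the state strings are hypothetical, loosely modeled on the startReceiver logic described above rather than copied from Spark.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class RegisterReceiver:
    stream_id: int
    host: str


class ToyTrackerEndpoint:
    """Hypothetical endpoint that answers registration requests with True/False."""

    def __init__(self, accepting=True):
        self.accepting = accepting  # False models a tracker that refuses receivers
        self.registered = {}

    def ask(self, msg):
        # Synchronous request/reply: the caller gets the answer before continuing.
        if isinstance(msg, RegisterReceiver) and self.accepting:
            self.registered[msg.stream_id] = msg.host
            return True
        return False


class ToyReceiverSupervisor:
    def __init__(self, stream_id, host, endpoint):
        self.stream_id = stream_id
        self.host = host
        self.endpoint = endpoint
        self.state = "Initialized"
        self.receiver_started = False  # stands in for actually starting the receiver

    def on_receiver_start(self):
        # Register with the driver-side endpoint before doing anything else.
        return self.endpoint.ask(RegisterReceiver(self.stream_id, self.host))

    def start_receiver(self):
        if self.on_receiver_start():
            self.state = "Started"
            self.receiver_started = True
        else:
            # The driver refused the registration, so the receiver never starts.
            self.state = "Stopped"


ok = ToyReceiverSupervisor(0, "host-a", ToyTrackerEndpoint(accepting=True))
ok.start_receiver()
refused = ToyReceiverSupervisor(1, "host-b", ToyTrackerEndpoint(accepting=False))
refused.start_receiver()
print(ok.state, refused.state)  # Started Stopped
```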

(3) While a receiver is receiving data, each time a data block has been stored, ReceiverSupervisor (which manages storing the received data) sends the block's metadata to the ReceiverTrackerEndpoint. ReceiverTracker then forwards this metadata to ReceivedBlockTracker, which is responsible for managing the metadata of received blocks. The AddBlock message is sent from the pushAndReportBlock method of ReceiverSupervisorImpl.
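Step (3) separates the data from its metadata: the block itself is stored on the receiver side, and only a small metadata record travels to the driver side. The sketch below illustrates that split; `ToySupervisor.push_and_report_block` is a loose, hypothetical analogue of pushAndReportBlock, and `ToyBlockTracker` stands in for ReceivedBlockTracker with simplified fields.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class ReceivedBlockInfo:
    # Simplified metadata record; Spark's own class carries more fields.
    stream_id: int
    block_id: str
    num_records: int


@dataclass(frozen=True)
class AddBlock:
    info: ReceivedBlockInfo


class ToyBlockTracker:
    """Hypothetical stand-in for ReceivedBlockTracker: metadata only, grouped by stream."""

    def __init__(self):
        self._blocks = defaultdict(list)

    def add_block(self, info):
        self._blocks[info.stream_id].append(info)
        return True

    def blocks_for(self, stream_id):
        return list(self._blocks[stream_id])


class ToyTrackerEndpoint:
    def __init__(self, block_tracker):
        self.block_tracker = block_tracker

    def ask(self, msg):
        # The endpoint just forwards AddBlock metadata to the block tracker.
        if isinstance(msg, AddBlock):
            return self.block_tracker.add_block(msg.info)
        raise ValueError(f"unexpected message: {msg!r}")


class ToySupervisor:
    def __init__(self, stream_id, endpoint):
        self.stream_id = stream_id
        self.endpoint = endpoint
        self.local_store = {}  # stands in for block storage on the executor
        self._next_id = 0

    def push_and_report_block(self, records):
        block_id = f"input-{self.stream_id}-{self._next_id}"
        self._next_id += 1
        # 1. Store the data itself where it was received.
        self.local_store[block_id] = list(records)
        # 2. Report only the metadata to the driver side.
        info = ReceivedBlockInfo(self.stream_id, block_id, len(records))
        return self.endpoint.ask(AddBlock(info))


tracker = ToyBlockTracker()
supervisor = ToySupervisor(3, ToyTrackerEndpoint(tracker))
supervisor.push_and_report_block(["a", "b"])
supervisor.push_and_report_block(["c"])
print([(i.block_id, i.num_records) for i in tracker.blocks_for(3)])
# [('input-3-0', 2), ('input-3-1', 1)]
```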

(4) When Spark Streaming stops, ReceiverTracker sends a message to deregister all receivers; on receiving it, the ReceiverTrackerEndpoint calls ReceiverTracker.stop to carry out the deregistration. During shutdown, ReceiverTracker sends the deregistration message twice, 10 s apart, to give the receivers time to stop.
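The bounded wait in step (4) can be sketched with Python threads. Assumptions: each receiver is a thread, the stop message is a `threading.Event`, and `wait_seconds` plays the role of the 10 s in the text (the usage sets it to 0.2 s so it runs quickly). For simplicity it sends a single stop signal followed by one bounded wait rather than reproducing the exact message sequence; all class names are hypothetical.

```python
import threading
import time


class ToyReceiver:
    """Hypothetical receiver running on its own thread until told to stop."""

    def __init__(self, stream_id, shutdown_delay=0.0):
        self.stream_id = stream_id
        self.shutdown_delay = shutdown_delay  # simulated cleanup time after a stop
        self._stop_requested = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        self._stop_requested.wait()      # "receive data" until a stop arrives
        time.sleep(self.shutdown_delay)  # then spend some time shutting down

    def request_stop(self):
        self._stop_requested.set()

    def wait_exit(self, timeout):
        self._thread.join(timeout)
        return not self._thread.is_alive()


class ToyReceiverTracker:
    def __init__(self, receivers, wait_seconds=10.0):
        self.receivers = list(receivers)
        self.wait_seconds = wait_seconds  # the text's 10 s grace period

    def start(self):
        for r in self.receivers:
            r.start()

    def stop(self):
        """Signal every receiver, wait up to wait_seconds in total, report stragglers."""
        for r in self.receivers:
            r.request_stop()
        deadline = time.monotonic() + self.wait_seconds
        still_running = []
        for r in self.receivers:
            remaining = max(0.0, deadline - time.monotonic())
            if not r.wait_exit(remaining):
                still_running.append(r.stream_id)
        return still_running


tracker = ToyReceiverTracker([ToyReceiver(0), ToyReceiver(1, shutdown_delay=5.0)], wait_seconds=0.2)
tracker.start()
print(tracker.stop())  # receiver 1 needs 5 s, longer than the 0.2 s grace period
```

The deadline is shared across all receivers, so the total wait never exceeds `wait_seconds` no matter how many receivers are slow.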

References
1. https://blog.csdn.net/zhanglh046/article/details/78505067
2. 《圖解Spark核心技術與案例實戰》