一、Spark Streaming的數(shù)據(jù)源
對(duì)于SparkStreaming中處理的數(shù)據(jù)無(wú)論是通過(guò)內(nèi)部接口獲取,還是通過(guò)Kafka、Flume、以及TCP Socket等外部數(shù)據(jù)源,對(duì)于這些數(shù)據(jù)的處理,這些整個(gè)過(guò)程的數(shù)據(jù)均抽象于DStream,如下圖所示:

不同的ReceiverInputDStream包含不同的流數(shù)據(jù)接收器Receiver(內(nèi)部類(lèi)),這些接收器繼承于Receiver。在StreamingContext啟動(dòng)過(guò)程中,ReceiverTracker會(huì)把流數(shù)據(jù)接收器Receiver分發(fā)到Executor上,在每個(gè)Executor上由ReceiverSupervisor啟動(dòng)對(duì)應(yīng)的Receiver。在Spark1.4及以前的版本中根據(jù)N個(gè)Receiver實(shí)例,在StreamingContext中創(chuàng)建一個(gè)作業(yè),該作業(yè)包含N個(gè)任務(wù),其創(chuàng)建結(jié)構(gòu)如下圖所示:

創(chuàng)建過(guò)程如下:
1、先遍歷ReceiverInputDStream,通過(guò)其getReceiver獲取需要啟動(dòng)的N個(gè)Receiver實(shí)例,然后把這些實(shí)例作為N份數(shù)據(jù),在StreamingContext創(chuàng)建一個(gè)RDD實(shí)例,該實(shí)例分為N個(gè)partition,每個(gè)partition對(duì)應(yīng)包含一個(gè)Receiver數(shù)據(jù)(即Receiver實(shí)例)。
2、在這里把Receiver所進(jìn)行的計(jì)算定義為func函數(shù),該函數(shù)以Receiver實(shí)例作為參數(shù)構(gòu)建ReceiverSupervisorImpl實(shí)例supervisor,構(gòu)造完畢后使用新線(xiàn)程啟動(dòng)該supervisor并阻塞該線(xiàn)程:


3、把ReceiverTracker盡可能地按照Receiver的首選位置分發(fā)到集群并啟動(dòng),啟動(dòng)完畢后Receiver會(huì)處于阻塞狀態(tài),持續(xù)不斷的接入流數(shù)據(jù)。
該Receiver分發(fā)方式在長(zhǎng)時(shí)間的運(yùn)行過(guò)程中,如果出現(xiàn)某個(gè)任務(wù)失敗,則Spark會(huì)重新發(fā)送該任務(wù)到其他Executor進(jìn)行重跑,但由于該分發(fā)過(guò)程屬于隨機(jī)分發(fā),無(wú)法實(shí)現(xiàn)集群的負(fù)載均衡,可能會(huì)出現(xiàn)某Worker節(jié)點(diǎn)運(yùn)行多個(gè)任務(wù),而某些Worker節(jié)點(diǎn)卻是空閑。而當(dāng)該任務(wù)的失敗次數(shù)超過(guò)規(guī)定的上限,會(huì)導(dǎo)致Receiver無(wú)法啟動(dòng),針對(duì)這些問(wèn)題,Spark1.5以及以后的版本,在StreamingContext中根據(jù)N個(gè)Receiver實(shí)例創(chuàng)建N個(gè)作業(yè),各個(gè)作業(yè)中只包含一個(gè)任務(wù),并加入了可插拔的Receiver分發(fā)策略,其結(jié)構(gòu)如下圖所示:

這樣在SparkStreaming中每個(gè)Receiver都有一個(gè)作業(yè)來(lái)分發(fā)(該作業(yè)紙包含一個(gè)任務(wù)),而且對(duì)于這僅有的一個(gè)任務(wù)只有在第一次啟動(dòng)時(shí),才嘗試啟動(dòng)Receiver。如果該任務(wù)失敗了,則不再?lài)L試啟動(dòng)Receiver,對(duì)應(yīng)的作業(yè)設(shè)置為完成狀態(tài),此時(shí)ReceiverTracker會(huì)新生成一個(gè)作業(yè),在其他Executor嘗試啟動(dòng),直到成功,這樣Receiver就不會(huì)受到任務(wù)失敗上限而無(wú)法啟動(dòng)。通過(guò)這種方式,SparkStreaming中所有的Receiver總是保持活性。
可拔插的Receiver分發(fā)策略在ReceiverSchedulingPolicy 類(lèi)定義,在Receiver分發(fā)之前會(huì)收集所有的InputDStream包含的所有Receiver實(shí)例和Executor,然后調(diào)用該類(lèi)中scheduleReceiver方法計(jì)算每個(gè)Receiver對(duì)應(yīng)的Executor。在該方法中以輪詢(xún)調(diào)度方式進(jìn)行分配,首先對(duì)存在首選位置的Receiver進(jìn)行處理,盡可能把Receiver運(yùn)行在首選位置機(jī)器進(jìn)行Receiver個(gè)數(shù)最少的Executor中,接著對(duì)于沒(méi)有首選位置的Receiver,則優(yōu)先分配到運(yùn)行Receiver個(gè)數(shù)最少的Executor中,分配完后返回調(diào)度好的Executor列表。源碼如下(對(duì)該函數(shù)的功能描述):

參考內(nèi)容
1、《圖解Spark核心技術(shù)與案例實(shí)戰(zhàn)》