Spark Streaming Source Code Walkthrough: Tracing the Runtime Source of the Spark Streaming Framework End to End

1. Case Review and Demo: Online Dynamic Computation of the Hottest Items per Category

We use Spark Streaming + Spark SQL to dynamically compute, online, the hottest items in each category. The code is as follows:

package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Uses Spark Streaming + Spark SQL to dynamically compute, online, the ranking of the
 * hottest items within each category of an e-commerce site -- for example, the three
 * hottest phones in the "phone" category and the three hottest TVs in the "TV"
 * category. This example is of great practical value in real production environments.
 *
 * @author DT大數(shù)據(jù)夢(mèng)工廠
 * Sina Weibo: http://weibo.com/ilovepains/
 *
 * Implementation: Spark Streaming + Spark SQL. The reason Spark Streaming can use ML,
 * SQL, GraphX and other functionality is that it exposes interfaces such as foreachRDD
 * and transform, which internally operate on RDDs. With the RDD as the cornerstone,
 * all other Spark functionality can be used directly, as easily as calling an API.
 * The data format is assumed to be: user item category, e.g. Rocky Samsung Android
 */
object OnlineTheTop3ItemForEachCategory2DB {

  def main(args: Array[String]) {
    /**
     * Step 1: Create the Spark configuration object SparkConf and set the runtime
     * configuration of the Spark program. For example, setMaster sets the URL of the
     * Master of the Spark cluster the program connects to; if set to "local", the
     * program runs locally, which is especially suitable for beginners with very
     * limited machine resources (e.g. only 1 GB of memory).
     */
    val conf = new SparkConf() // Create the SparkConf object
    // Set the application name, visible in the monitoring UI while the program runs
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")
    conf.setMaster("spark://Master:7077") // The program runs on the Spark cluster

    // Set batchDuration to control the frequency of Job generation, and create the
    // entry point for Spark Streaming execution
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")

    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
      (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))
    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_+_,
      _-_, Seconds(60), Seconds(20))

    categoryUserClickLogsDStream.foreachRDD { rdd => {
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })
        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))
        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
        categoryItemDF.registerTempTable("categoryItemTable")
        val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM" +
          " (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank" +
          " FROM categoryItemTable) subquery WHERE rank <= 3")
        resultDataFrame.show()
        val resultRowRDD = resultDataFrame.rdd
        resultRowRDD.foreachPartition { partitionOfRecords => {
          if (partitionOfRecords.isEmpty) {
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,client_count) " +
                "values('" + record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement()
              stmt.executeUpdate(sql)
            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse
          }
        }}
      }
    }}

    /**
     * Inside StreamingContext's start method, the JobScheduler's start method is
     * invoked, beginning the message loop. Inside JobScheduler.start, a JobGenerator
     * and a ReceiverTracker are constructed and their start methods are called:
     * 1. Once started, the JobGenerator keeps generating Jobs according to batchDuration.
     * 2. Once started, the ReceiverTracker first launches Receivers in the Spark cluster
     *    (actually, ReceiverSupervisors are started first in the Executors). After a
     *    Receiver receives data, it stores the data on the Executor via the
     *    ReceiverSupervisor and sends the data's metadata to the ReceiverTracker on
     *    the Driver, which manages the received metadata through its
     *    ReceivedBlockTracker.
     * Each batch interval produces a concrete Job. The Job here is not a Job in the
     * Spark Core sense; it is merely the DAG of RDDs generated from the DStreamGraph --
     * from a Java perspective, it is like a Runnable instance. To run, the Job must be
     * submitted to the JobScheduler, which uses a thread pool to find a separate thread
     * to submit the Job to the cluster (in that thread, an RDD action triggers the real
     * job run). Why a thread pool?
     * 1. Jobs are generated continuously, so a thread pool improves efficiency -- much
     *    like executing Tasks through a thread pool inside an Executor.
     * 2. The Jobs may be configured with FAIR scheduling, which also requires
     *    multithreading support.
     */
    ssc.start()
    ssc.awaitTermination()
  }
}
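The reduceByKeyAndWindow(_+_, _-_, Seconds(60), Seconds(20)) call above maintains the 60-second window incrementally: on each 20-second slide, the counts from the batch entering the window are added and the counts from the batch leaving it are subtracted, instead of re-reducing the whole window. A minimal pure-Scala sketch of that inverse-reduce idea (no Spark types; all names here are illustrative):

```scala
// Window state: per-key click counts over the batches currently inside the window.
def slide(window: Map[String, Int],
          entering: Map[String, Int],
          leaving: Map[String, Int]): Map[String, Int] = {
  val added = entering.foldLeft(window) { case (w, (k, v)) =>
    w.updated(k, w.getOrElse(k, 0) + v)               // reduceFunc: _ + _
  }
  added.map { case (k, v) => k -> (v - leaving.getOrElse(k, 0)) } // invReduceFunc: _ - _
       .filter(_._2 > 0)
}

val w0 = Map("phone_iphone" -> 3, "tv_sony" -> 2)
val w1 = slide(w0, entering = Map("phone_iphone" -> 1), leaving = Map("tv_sony" -> 2))
println(w1)  // Map(phone_iphone -> 4)
```

This is why an inverse function and checkpointing are required: the old window state must be kept and corrected, rather than recomputed from scratch.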

2. Walking Through the Spark Streaming Runtime Source via the Example

Based on the example above, we will roughly walk through the Spark source code, highlighting targeted pieces, to understand the main flow of its execution.

The code does not use SparkContext directly; it uses StreamingContext.

Let's look at this fragment of StreamingContext's source:

/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}

That's right -- createNewSparkContext simply creates a SparkContext:

private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}

This shows that Spark Streaming is itself just an application running on top of Spark.

At the very beginning of the example, an InputDStream has to be created from the data stream:

val userClickLogsDStream = ssc.socketTextStream("Master", 9999)

The socketTextStream method is defined as follows:

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

As you can see, it calls socketStream at the end.

socketStream is defined as follows:

/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

This actually creates a SocketInputDStream.

The SocketInputDStream class is as follows:

private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

SocketInputDStream extends ReceiverInputDStream and implements the getReceiver method, which returns a SocketReceiver object.

To summarize, the inheritance chain of SocketInputDStream is:

SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream.

A DStream is a template for generating RDDs; it lives at the logical level. When each batch interval arrives, these templates are instantiated with the batch data into RDDs and a DAG.
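This "template" idea can be sketched in plain Scala, with no Spark needed: a mock DStream holds a function from batch time to data, and instantiating it per interval fills a Time-to-RDD map, which is exactly the role generatedRDDs plays in the real source. All names in this sketch are illustrative, not Spark's:

```scala
import scala.collection.mutable.HashMap

// Illustrative stand-ins: a "batch time" and an "RDD" reduced to a plain Seq.
case class Time(ms: Long)
type FakeRDD[T] = Seq[T]

// A mock DStream: a logical template that, given a batch time, produces the
// concrete data for that interval and caches it, like generatedRDDs does.
class MockDStream[T](compute: Time => FakeRDD[T]) {
  val generatedRDDs = new HashMap[Time, FakeRDD[T]]()
  def getOrCompute(time: Time): FakeRDD[T] =
    generatedRDDs.getOrElseUpdate(time, compute(time))
}

val template = new MockDStream[Int](t => Seq(1, 2, 3).map(_ * t.ms.toInt))
println(template.getOrCompute(Time(5)))  // instantiated for batch time 5
println(template.generatedRDDs.size)     // the result is now cached per Time
```

The real getOrCompute below does the same get-or-compute-and-cache dance, plus persistence and checkpointing bookkeeping.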

Look at this fragment of DStream's source:

// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

Now look at DStream's getOrCompute:

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}

Its main job is to generate the RDD and then put the generated RDD into the HashMap. The details of RDD generation will be analyzed later.

So far we have roughly covered how the core concepts DStream and RDD are used in Spark Streaming.

The key classes embodying the execution flow of a Spark Streaming application are shown in the figure below.

Let's first look at StreamingContext's start(). The start() method starts the StreamingContext. Since a Spark application cannot have multiple SparkContext instances, the Spark Streaming framework checks the state at startup. The code is as follows:

/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Start the JobScheduler
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}

In the initial (INITIALIZED) state, it starts the JobScheduler.

Now look at the JobScheduler's startup in start(), which launches several components: the EventLoop, the StreamingListenerBus, the ReceiverTracker and the JobGenerator.

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the message-loop thread, which handles the JobScheduler's events.
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  // Start the listener bus, used to update the Streaming tab in the Spark UI.
  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  // Create the InputInfoTracker, which manages all the input streams and the statistics
  // of their input data; this information is observed through StreamingListeners.
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the ReceiverTracker, which handles data reception, data caching and block generation.
  receiverTracker.start()
  // Start the JobGenerator, which handles DStreamGraph initialization, DStream-to-RDD
  // conversion, Job generation and Job submission.
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

The JobScheduler's message-processing function processEvent handles three kinds of messages: a Job has started, a Job has completed, and an error report.

private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}

我們?cè)俅致缘胤治鲆幌翵obScheduler.start()中啟動(dòng)的工作。

先看JobScheduler.start()啟動(dòng)的第一項(xiàng)工作EventLoop。EventLoop用于處理JobScheduler的各種事件。

EventLoop中有事件隊(duì)列:

private?valeventQueue:?BlockingQueue[E]?=?new?LinkedBlockingDeque[E]()

還有一個(gè)線程處理隊(duì)列中的事件:

private val eventThread = new Thread(name) {
  setDaemon(true)

  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}

The onReceive and onError used by this thread were defined when the EventLoop was instantiated in JobScheduler.

The second piece started in JobScheduler.start() is the StreamingListenerBus, which asynchronously delivers StreamingListenerEvents to registered StreamingListeners and is used to update the Streaming tab in the Spark UI.

The following code dispatches the various events:
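The mechanics just described -- a blocking queue drained by a single daemon thread that dispatches each event to onReceive -- can be reproduced in a few lines of self-contained Scala. This is a simplified sketch (no onError path, illustrative names), not Spark's EventLoop itself:

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque}

// A stripped-down EventLoop: post() enqueues, one daemon thread dequeues and
// hands each event to the handler -- the same onReceive dispatch pattern.
class MiniEventLoop[E](name: String)(handler: E => Unit) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false
  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try { while (!stopped) handler(queue.take()) }
      catch { case _: InterruptedException => } // exit on interrupt
  }
  def start(): Unit = thread.start()
  def post(e: E): Unit = queue.put(e)
  def stop(): Unit = { stopped = true; thread.interrupt() }
}

val done = new CountDownLatch(3)
val loop = new MiniEventLoop[String]("mini-loop")(e => { println(s"got $e"); done.countDown() })
loop.start()
Seq("JobStarted", "JobCompleted", "ErrorReported").foreach(loop.post)
done.await()
loop.stop()
```

Decoupling producers (whoever calls post) from the single consumer thread is what lets JobScheduler and JobGenerator process their events strictly in order without extra locking.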

override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
  event match {
    case receiverStarted: StreamingListenerReceiverStarted =>
      listener.onReceiverStarted(receiverStarted)
    case receiverError: StreamingListenerReceiverError =>
      listener.onReceiverError(receiverError)
    case receiverStopped: StreamingListenerReceiverStopped =>
      listener.onReceiverStopped(receiverStopped)
    case batchSubmitted: StreamingListenerBatchSubmitted =>
      listener.onBatchSubmitted(batchSubmitted)
    case batchStarted: StreamingListenerBatchStarted =>
      listener.onBatchStarted(batchStarted)
    case batchCompleted: StreamingListenerBatchCompleted =>
      listener.onBatchCompleted(batchCompleted)
    case outputOperationStarted: StreamingListenerOutputOperationStarted =>
      listener.onOutputOperationStarted(outputOperationStarted)
    case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
      listener.onOutputOperationCompleted(outputOperationCompleted)
    case _ =>
  }
}

The third piece started in JobScheduler.start() is the ReceiverTracker.

The ReceiverTracker manages the Receivers of all the input streams: it launches them on Executors and tracks the data they receive; this information is observed through StreamingListeners.

In ReceiverTracker's start(), a ReceiverTrackerEndpoint RPC endpoint is instantiated internally.

def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

During the ReceiverTracker's startup, its launchReceivers method is called:

/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}

It calls the runDummySparkJob method to launch the first Job of the Spark Streaming framework, in which the collect action triggers execution of a Spark Job. This method ensures that every slave has registered, avoiding all the Receivers landing on a single node, so that subsequent computation is load-balanced.

/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

ReceiverTracker.launchReceivers() also calls endpoint.send(StartAllReceivers(receivers)), which makes the RPC endpoint send the StartAllReceivers message.

When the ReceiverTrackerEndpoint itself receives this message, it first determines, via the scheduling policy, which Executors each Receiver should run on, and then calls startReceiver(receiver, executors) to start the Receivers.

override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }

In the startReceiver method, ssc.sparkContext.submitJob submits a Job with startReceiverFunc as the function to run, because startReceiverFunc is executed on an Executor. Inside startReceiverFunc, a ReceiverSupervisorImpl object is instantiated, which manages and monitors the Receiver. This Job is the second Job the Spark Streaming framework starts for us, and it runs continuously, because supervisor.awaitTermination() blocks waiting for exit.

/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // Instantiate the Receiver's supervisor
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}

Next, look at the startup of ReceiverSupervisorImpl: it first starts all the registered BlockGenerator objects, then sends a RegisterReceiver message to the ReceiverTrackerEndpoint, and then calls the receiver's onStart method.

/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}

Its onStart():

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

Its startReceiver():

/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}

When the ReceiverTrackerEndpoint running on the Driver receives the RegisterReceiver message, it wraps streamId, typ, host, executorId and receiverEndpoint into a ReceiverTrackingInfo and saves it in the in-memory receiverTrackingInfos HashMap.

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }

/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }

  if (isTrackerStopping || isTrackerStopped) {
    return false
  }

  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
    // This receiver is registering and it's scheduled by
    // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
    scheduledLocations.get
  } else {
    // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
    // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
    scheduleReceiver(streamId)
  }

  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }

  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}

Now for the Receiver's startup. Taking ssc.socketTextStream("localhost", 9999) as the example, the object created is a SocketReceiver. Internally it starts a thread to connect to the socket server, and reads and stores the socket data.

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself if isStopped() returns false
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}

Next, look at the fourth piece started in JobScheduler.start(): the JobGenerator.

The JobGenerator has a RecurringTimer member, used to start the message system and the timer; it periodically sends a GenerateJobs message at each batchInterval.

// Periodically sends GenerateJobs messages, according to the batchInterval passed in
// when the StreamingContext was created
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
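The behavior of RecurringTimer -- posting a GenerateJobs-like message once every batch duration -- can be approximated with a ScheduledExecutorService. Spark's own implementation is a hand-rolled loop aligned to period boundaries, so the following is only a behavioral sketch with illustrative names:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

case class GenerateJobs(time: Long)

// Fires a callback with a period-aligned batch time every `periodMs` milliseconds,
// mimicking RecurringTimer posting GenerateJobs into the JobGenerator's event loop.
class MiniRecurringTimer(periodMs: Long)(callback: GenerateJobs => Unit) {
  private val scheduler = Executors.newSingleThreadScheduledExecutor()
  def start(): Unit = scheduler.scheduleAtFixedRate(
    () => callback(GenerateJobs(System.currentTimeMillis() / periodMs * periodMs)),
    periodMs, periodMs, TimeUnit.MILLISECONDS)
  def stop(): Unit = scheduler.shutdownNow()
}

val fired = new CountDownLatch(3)
val timer = new MiniRecurringTimer(100)(e => { println(e); fired.countDown() })
timer.start()
fired.await()   // three "batch" ticks have been generated
timer.stop()
```

The key point carried over from the real code: the timer does not generate Jobs itself; it only posts a message, so all Job generation still happens serially on the JobGenerator's message-loop thread.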

JobGenerator's start() method:

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the message-loop thread
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that periodically generates Jobs
    startFirstTime()
  }
}

The definition of startFirstTime() called in JobGenerator.start():

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

The definition of processEvent() used in JobGenerator.start():

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}

The definition of generateJobs:

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Obtain the concrete data received for this batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call DStreamGraph's generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}

The DStreamGraph's generateJobs method calls each output stream's generateJob method to produce the collection of Jobs.

// Output streams: the concrete action output operations
private val outputStreams = new ArrayBuffer[DStream[_]]()

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}

Now look at DStream's generateJob method. It calls getOrCompute to obtain the RDD into which the DStreamGraph has been instantiated for this batch interval. If there is an RDD, it wraps it in a jobFunc containing context.sparkContext.runJob(rdd, emptyFunc), and returns the wrapped Job.

/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

Next, look at JobScheduler's submitJobSet method, which submits a JobHandler for each Job to the thread pool. JobHandler implements the Runnable interface and ultimately calls job.run(). Looking at the Job class definition, the func invoked by run is the jobFunc passed in when the Job was constructed, which contains the context.sparkContext.runJob(rdd, emptyFunc) operation -- and that is what ultimately causes the Job to be submitted.

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._

  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

      ssc.sc.setJobDescription(
        s"""Streaming job from $batchLinkText""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}

A fragment of the Job class:

private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  def run() {
    _result = Try(func())
  }

Notes:

Source material: DT_大數(shù)據(jù)夢(mèng)工廠 (Spark release customization series)

For more exclusive content, follow the WeChat public account: DT_Spark

If you are interested in big data and Spark, you can listen free of charge to the public Spark course given by 王家林 every evening at 20:00, YY room number: 68917580
