1 Online Dynamic Computation of the Hottest Items per Category: Case Review and Demonstration
We use Spark Streaming together with Spark SQL to compute the hottest items in each category online and dynamically. The code is as follows:
package com.dt.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Uses Spark Streaming + Spark SQL to dynamically compute, online, the hottest items in each
 * category of an e-commerce site: for example, the three hottest phones in the "phone" category
 * and the three hottest TVs in the "TV" category. This example is highly relevant to real
 * production environments.
 *
 * @author DT大數(shù)據(jù)夢(mèng)工廠
 * Sina Weibo: http://weibo.com/ilovepains/
 *
 * Implementation: Spark Streaming + Spark SQL. The reason Spark Streaming can use ML, SQL,
 * GraphX and other functionality is that it exposes interfaces such as foreachRDD and transform,
 * which operate on RDDs. With RDDs as the foundation, all other Spark functionality can be used
 * as easily as calling an API.
 * The input data is assumed to have the format: user item category, e.g. Rocky Samsung Android
 */
object OnlineTheTop3ItemForEachCategory2DB {
  def main(args: Array[String]) {
    /**
     * Step 1: Create the SparkConf object, which holds the runtime configuration of the Spark
     * program. For example, setMaster sets the URL of the Spark cluster Master the program
     * connects to; if it is set to "local", the program runs locally, which suits beginners
     * on poorly equipped machines (e.g. with only 1 GB of memory).
     */
    val conf = new SparkConf() // create the SparkConf object
    // Set the application name, shown in the monitoring UI while the program runs.
    conf.setAppName("OnlineTheTop3ItemForEachCategory2DB")
    conf.setMaster("spark://Master:7077") // here the program runs on the Spark cluster
    // Set batchDuration to control how often jobs are generated, and create the
    // Spark Streaming entry point.
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("/root/Documents/SparkApps/checkpoint")
    val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
    val formattedUserClickLogsDStream = userClickLogsDStream.map(clickLog =>
      (clickLog.split(" ")(2) + "_" + clickLog.split(" ")(1), 1))
    val categoryUserClickLogsDStream = formattedUserClickLogsDStream.reduceByKeyAndWindow(_ + _,
      _ - _, Seconds(60), Seconds(20))
    categoryUserClickLogsDStream.foreachRDD { rdd =>
      if (rdd.isEmpty()) {
        println("No data inputted!!!")
      } else {
        val categoryItemRow = rdd.map(reducedItem => {
          val category = reducedItem._1.split("_")(0)
          val item = reducedItem._1.split("_")(1)
          val click_count = reducedItem._2
          Row(category, item, click_count)
        })
        val structType = StructType(Array(
          StructField("category", StringType, true),
          StructField("item", StringType, true),
          StructField("click_count", IntegerType, true)
        ))
        val hiveContext = new HiveContext(rdd.context)
        val categoryItemDF = hiveContext.createDataFrame(categoryItemRow, structType)
        categoryItemDF.registerTempTable("categoryItemTable")
        val resultDataFrame = hiveContext.sql("SELECT category,item,click_count FROM" +
          " (SELECT category,item,click_count,row_number()" +
          " OVER (PARTITION BY category ORDER BY click_count DESC) rank" +
          " FROM categoryItemTable) subquery WHERE rank <= 3")
        resultDataFrame.show()
        val resultRowRDD = resultDataFrame.rdd
        resultRowRDD.foreachPartition { partitionOfRecords =>
          if (partitionOfRecords.isEmpty) {
            println("This RDD is not null but partition is null")
          } else {
            // ConnectionPool is a static, lazily initialized pool of connections
            val connection = ConnectionPool.getConnection()
            partitionOfRecords.foreach(record => {
              val sql = "insert into categorytop3(category,item,click_count) " +
                "values('" + record.getAs("category") + "','" +
                record.getAs("item") + "'," + record.getAs("click_count") + ")"
              val stmt = connection.createStatement()
              stmt.executeUpdate(sql)
            })
            ConnectionPool.returnConnection(connection) // return to the pool for future reuse
          }
        }
      }
    }
    /**
     * StreamingContext.start() internally starts JobScheduler.start(), which runs the message
     * loop. Inside JobScheduler.start(), a JobGenerator and a ReceiverTracker are constructed,
     * and their start() methods are called:
     * 1. Once started, JobGenerator keeps generating Jobs, one per batchDuration.
     * 2. Once started, ReceiverTracker first launches Receivers in the Spark cluster (actually
     *    by first starting ReceiverSupervisors on the Executors). When a Receiver receives data,
     *    it stores it on the Executor via the ReceiverSupervisor and sends the data's metadata
     *    to the ReceiverTracker on the Driver, which manages the received metadata through the
     *    ReceivedBlockTracker.
     * Each batch interval produces a concrete Job. This "Job" is not a Job in the Spark Core
     * sense; it is merely the RDD DAG generated from the DStreamGraph. In Java terms it is like
     * a Runnable instance. To run it, the Job must be submitted to the JobScheduler, which uses
     * a thread pool to find a separate thread to submit the Job to the cluster (in that thread,
     * an RDD action triggers the actual job). Why a thread pool?
     * 1. Jobs are generated continuously, so a thread pool improves efficiency; this is similar
     *    to executing Tasks via a thread pool on the Executor.
     * 2. The Jobs may be configured with FAIR scheduling, which also needs multi-thread support.
     */
ssc.start()
ssc.awaitTermination()
}
}
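Before diving into the source, the computation itself is easy to verify in miniature. The sketch below, in plain Scala with no Spark, mimics what the job above computes: count clicks per "category_item" key over a window of logs, then keep the top 3 items per category, as the row_number() OVER (PARTITION BY ...) query does. The sample logs are invented for illustration.

```scala
// A minimal sketch (plain Scala, no Spark) of the streaming job's logic.
object Top3Sketch {
  // Hypothetical click logs in the "user item category" format assumed above.
  val clickLogs = Seq(
    "Rocky Samsung Android", "Rocky iPhone iOS", "Spark iPhone iOS",
    "Hadoop Samsung Android", "Flink iPhone iOS", "Kafka Nokia Symbian",
    "Storm Nokia Symbian", "Hive Huawei Android")

  // Same keying as the DStream map: (category_item, 1), then reduce by key.
  def windowCounts(logs: Seq[String]): Map[String, Int] =
    logs.map { log =>
      val fields = log.split(" ")
      (fields(2) + "_" + fields(1), 1)
    }.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

  // Top 3 items per category, highest click count first, like the SQL rank <= 3.
  def top3PerCategory(counts: Map[String, Int]): Map[String, Seq[(String, Int)]] =
    counts.toSeq
      .map { case (key, n) =>
        val Array(category, item) = key.split("_")
        (category, item, n)
      }
      .groupBy(_._1)
      .map { case (cat, rows) =>
        (cat, rows.sortBy(-_._3).take(3).map(r => (r._2, r._3)))
      }
}
```

Running windowCounts over one window and then top3PerCategory reproduces, on a toy scale, exactly the table the streaming job writes to categorytop3.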
2 Walking Through Spark Streaming's Runtime Source Code Based on the Case
Based on the case above, we will roughly analyze the Spark source code, highlighting targeted points, to understand the main runtime flow.
The code does not use SparkContext directly; it uses StreamingContext.
Let's look at a fragment of the StreamingContext source:
/**
 * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
 * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
 * @param batchDuration the time interval at which streaming data will be divided into batches
 */
def this(conf: SparkConf, batchDuration: Duration) = {
  this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
}
Indeed, createNewSparkContext creates the SparkContext:
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
  new SparkContext(conf)
}
This shows that Spark Streaming is also just an application running on Spark.
At the very start of the case, an InputDStream must be created from the data stream.
val userClickLogsDStream = ssc.socketTextStream("Master", 9999)
The socketTextStream method is defined as follows:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
At the end it calls socketStream, which is defined as follows:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as objects using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
This actually creates a SocketInputDStream. The SocketInputDStream class is as follows:
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
SocketInputDStream extends ReceiverInputDStream and implements the getReceiver method, which returns a SocketReceiver object.
To summarize SocketInputDStream's inheritance chain:
SocketInputDStream -> ReceiverInputDStream -> InputDStream -> DStream.
A DStream is a template for generating RDDs; it exists at the logical level. When each interval arrives, these templates are instantiated with the batch data into RDDs and a DAG.
Look at this fragment of the DStream source:
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
Now look at DStream's getOrCompute:
/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {
      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }
      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
Its main work is to generate the RDD and put it into the HashMap; the details of the RDD-generation process will be analyzed later.
So far we have roughly covered how the core concepts DStream and RDD are used in Spark Streaming.
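The memoize-by-batch-time idea behind getOrCompute can be sketched in plain Scala. In this illustrative stand-in, "Time" is just a Long and a Seq plays the role of the RDD; the point is only that each batch time's result is computed at most once and then served from the HashMap, as generatedRDDs does.

```scala
import scala.collection.mutable

// A simplified sketch of getOrCompute: per-batch results are memoized in a
// HashMap keyed by time, so each batch is materialized at most once.
class TemplateStream[T](compute: Long => Option[Seq[T]]) {
  private val generated = mutable.HashMap[Long, Seq[T]]()

  def getOrCompute(time: Long): Option[Seq[T]] =
    generated.get(time).orElse {
      val result = compute(time)
      result.foreach(r => generated.put(time, r)) // cache, like generatedRDDs.put
      result
    }
}
```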
The key classes in a Spark Streaming application's runtime flow are shown in the figure below.
First look at StreamingContext's start(). The start() method starts the StreamingContext; since a Spark application cannot have multiple SparkContext instances, the Spark Streaming framework checks its state at startup. The code is as follows:
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // start the JobScheduler
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
In the initial state, the JobScheduler is started.
Now look at JobScheduler's startup in start(), which launches several components: the EventLoop, the StreamingListenerBus, the ReceiverTracker, and the JobGenerator.
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started
  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the message-loop thread, which handles the JobScheduler's events.
  eventLoop.start()
  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)
  // Start the listener bus, used to update the Streaming tab in the Spark UI.
  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  // Create the InputInfoTracker, which manages all input streams and their input-data
  // statistics; this information is observed through StreamingListeners.
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the ReceiverTracker, which handles data reception, caching, and block generation.
  receiverTracker.start()
  // Start the JobGenerator, which initializes the DStreamGraph, converts DStreams to RDDs,
  // generates Jobs, and submits them for execution.
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
JobScheduler's message handler processEvent handles three kinds of messages: job started, job completed, and error reported.
private def processEvent(event: JobSchedulerEvent) {
  try {
    event match {
      case JobStarted(job, startTime) => handleJobStart(job, startTime)
      case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
      case ErrorReported(m, e) => handleError(m, e)
    }
  } catch {
    case e: Throwable =>
      reportError("Error in job scheduler", e)
  }
}
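This dispatch style, events as a sealed trait matched case by case, is a standard Scala pattern and can be reproduced in miniature. The event names below echo JobScheduler's but are illustrative stand-ins, not the real Spark classes.

```scala
// A sketch of the event-dispatch pattern used by processEvent: scheduler
// events modeled as a sealed trait, dispatched with a pattern match.
sealed trait MiniSchedulerEvent
case class MiniJobStarted(name: String, startTime: Long) extends MiniSchedulerEvent
case class MiniJobCompleted(name: String, completedTime: Long) extends MiniSchedulerEvent
case class MiniErrorReported(msg: String) extends MiniSchedulerEvent

object MiniSchedulerEvent {
  // Each case maps to its handler; here the "handler" just renders a string.
  def describe(event: MiniSchedulerEvent): String = event match {
    case MiniJobStarted(n, t)   => s"job $n started at $t"
    case MiniJobCompleted(n, t) => s"job $n completed at $t"
    case MiniErrorReported(m)   => s"error: $m"
  }
}
```

Because the trait is sealed, the compiler warns if a new event type is added but not handled, which is why this shape suits a scheduler's message protocol.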
我們?cè)俅致缘胤治鲆幌翵obScheduler.start()中啟動(dòng)的工作。
先看JobScheduler.start()啟動(dòng)的第一項(xiàng)工作EventLoop。EventLoop用于處理JobScheduler的各種事件。
EventLoop中有事件隊(duì)列:
private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()
and a thread that processes the events in the queue:
private val eventThread = new Thread(name) {
  setDaemon(true)
  override def run(): Unit = {
    try {
      while (!stopped.get) {
        val event = eventQueue.take()
        try {
          onReceive(event)
        } catch {
          case NonFatal(e) => {
            try {
              onError(e)
            } catch {
              case NonFatal(e) => logError("Unexpected error in " + name, e)
            }
          }
        }
      }
    } catch {
      case ie: InterruptedException => // exit even if eventQueue is not empty
      case NonFatal(e) => logError("Unexpected error in " + name, e)
    }
  }
}
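The EventLoop structure, a blocking queue drained by a daemon dispatch thread, is simple enough to sketch standalone in plain Scala (names and error handling are simplified relative to Spark's EventLoop):

```scala
import java.util.concurrent.LinkedBlockingQueue

// A minimal EventLoop sketch: a daemon thread takes events off a blocking
// queue and dispatches each one to onReceive, mirroring the code above.
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingQueue[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped) onReceive(queue.take())
      } catch {
        case _: InterruptedException => // exit when interrupted by stop()
      }
  }

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)   // like eventLoop.post(...)
  def stop(): Unit = { stopped = true; thread.interrupt() }

  protected def onReceive(event: E): Unit
}
```

As in Spark, the producer (post) and consumer (onReceive) are decoupled: callers never block on event handling, and all handling happens on one dedicated thread.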
The onReceive and onError used by this thread were defined when the EventLoop was instantiated in JobScheduler.
The second component started by JobScheduler.start() is the StreamingListenerBus, which asynchronously delivers StreamingListenerEvents to registered StreamingListeners, and is used to update the Streaming tab in the Spark UI.
The following code dispatches the various events:
override def onPostEvent(listener: StreamingListener, event: StreamingListenerEvent): Unit = {
  event match {
    case receiverStarted: StreamingListenerReceiverStarted =>
      listener.onReceiverStarted(receiverStarted)
    case receiverError: StreamingListenerReceiverError =>
      listener.onReceiverError(receiverError)
    case receiverStopped: StreamingListenerReceiverStopped =>
      listener.onReceiverStopped(receiverStopped)
    case batchSubmitted: StreamingListenerBatchSubmitted =>
      listener.onBatchSubmitted(batchSubmitted)
    case batchStarted: StreamingListenerBatchStarted =>
      listener.onBatchStarted(batchStarted)
    case batchCompleted: StreamingListenerBatchCompleted =>
      listener.onBatchCompleted(batchCompleted)
    case outputOperationStarted: StreamingListenerOutputOperationStarted =>
      listener.onOutputOperationStarted(outputOperationStarted)
    case outputOperationCompleted: StreamingListenerOutputOperationCompleted =>
      listener.onOutputOperationCompleted(outputOperationCompleted)
    case _ =>
  }
}
The third component started by JobScheduler.start() is the ReceiverTracker.
The ReceiverTracker manages all input streams and their input-data statistics; this information is observed through StreamingListeners.
In ReceiverTracker's start(), a ReceiverTrackerEndpoint RPC endpoint is instantiated internally.
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
During ReceiverTracker's startup, its launchReceivers method is called:
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })
  runDummySparkJob()
  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
It calls runDummySparkJob, which runs the first Job that the Spark Streaming framework launches; the collect action triggers the Spark job's execution. This method exists to ensure that every slave has registered, preventing all Receivers from ending up on one node, so that the subsequent computation is load-balanced.
/**
 * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
 * receivers to be scheduled on the same node.
 *
 * TODO Should poll the executor number and wait for executors according to
 * "spark.scheduler.minRegisteredResourcesRatio" and
 * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
 */
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
ReceiverTracker.launchReceivers() also calls endpoint.send(StartAllReceivers(receivers)), so the RPC endpoint sends itself the StartAllReceivers message.
When the ReceiverTrackerEndpoint receives this message, it first determines, per the scheduling policy, which Executor each Receiver should run on, and then calls startReceiver(receiver, executors) to start the Receivers.
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }
In startReceiver, ssc.sparkContext.submitJob is called with startReceiverFunc, because that function is executed on the Executor. startReceiverFunc instantiates a ReceiverSupervisorImpl object, which manages and monitors the Receiver. This is the second Job the Spark Streaming framework launches for us, and it runs continuously, because supervisor.awaitTermination() blocks until exit.
/**
 * Start a receiver along with its scheduled executors
 */
private def startReceiver(
    receiver: Receiver[_],
    scheduledLocations: Seq[TaskLocation]): Unit = {
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }
  val receiverId = receiver.streamId
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }
  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)
  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // instantiate the Receiver's supervisor
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }
  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
Next, the startup of ReceiverSupervisorImpl: it first starts all registered BlockGenerator objects, then sends a RegisterReceiver message to the ReceiverTrackerEndpoint, and then calls the receiver's onStart method.
/** Start the supervisor */
def start() {
  onStart()
  startReceiver()
}
Its onStart():
override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}
Its startReceiver():
/** Start receiver */
def startReceiver(): Unit = synchronized {
  try {
    if (onReceiverStart()) {
      logInfo("Starting receiver")
      receiverState = Started
      receiver.onStart()
      logInfo("Called receiver onStart")
    } else {
      // The driver refused us
      stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
    }
  } catch {
    case NonFatal(t) =>
      stop("Error starting receiver " + streamId, Some(t))
  }
}

override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}
When the ReceiverTrackerEndpoint running on the Driver receives the RegisterReceiver message, it wraps streamId, typ, host, executorId, and receiverEndpoint into a ReceiverTrackingInfo and saves it in the in-memory HashMap receiverTrackingInfos.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  // Remote messages
  case RegisterReceiver(streamId, typ, host, executorId, receiverEndpoint) =>
    val successful =
      registerReceiver(streamId, typ, host, executorId, receiverEndpoint, context.senderAddress)
    context.reply(successful)
  case AddBlock(receivedBlockInfo) =>
    if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
      walBatchingThreadPool.execute(new Runnable {
        override def run(): Unit = Utils.tryLogNonFatalError {
          if (active) {
            context.reply(addBlock(receivedBlockInfo))
          } else {
            throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
          }
        }
      })
    } else {
      context.reply(addBlock(receivedBlockInfo))
    }

/** Register a receiver */
private def registerReceiver(
    streamId: Int,
    typ: String,
    host: String,
    executorId: String,
    receiverEndpoint: RpcEndpointRef,
    senderAddress: RpcAddress
  ): Boolean = {
  if (!receiverInputStreamIds.contains(streamId)) {
    throw new SparkException("Register received for unexpected id " + streamId)
  }
  if (isTrackerStopping || isTrackerStopped) {
    return false
  }
  val scheduledLocations = receiverTrackingInfos(streamId).scheduledLocations
  val acceptableExecutors = if (scheduledLocations.nonEmpty) {
    // This receiver is registering and it's scheduled by
    // ReceiverSchedulingPolicy.scheduleReceivers. So use "scheduledLocations" to check it.
    scheduledLocations.get
  } else {
    // This receiver is scheduled by "ReceiverSchedulingPolicy.rescheduleReceiver", so calling
    // "ReceiverSchedulingPolicy.rescheduleReceiver" again to check it.
    scheduleReceiver(streamId)
  }
  def isAcceptable: Boolean = acceptableExecutors.exists {
    case loc: ExecutorCacheTaskLocation => loc.executorId == executorId
    case loc: TaskLocation => loc.host == host
  }
  if (!isAcceptable) {
    // Refuse it since it's scheduled to a wrong executor
    false
  } else {
    val name = s"${typ}-${streamId}"
    val receiverTrackingInfo = ReceiverTrackingInfo(
      streamId,
      ReceiverState.ACTIVE,
      scheduledLocations = None,
      runningExecutor = Some(ExecutorCacheTaskLocation(host, executorId)),
      name = Some(name),
      endpoint = Some(receiverEndpoint))
    receiverTrackingInfos.put(streamId, receiverTrackingInfo)
    listenerBus.post(StreamingListenerReceiverStarted(receiverTrackingInfo.toReceiverInfo))
    logInfo("Registered receiver for stream " + streamId + " from " + senderAddress)
    true
  }
}
As for starting the Receiver itself, take ssc.socketTextStream("localhost", 9999) as an example: it creates a SocketReceiver object, which internally starts a thread to connect to the socket server, read the socket data, and store it.
private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

  def onStop() {
    // There is nothing much to do as the thread calling receive()
    // is designed to stop by itself when isStopped() returns true
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while (!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
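The bytesToObjects converter that SocketReceiver receives (SocketReceiver.bytesToLines in the socketTextStream case) is just a function from an InputStream to an iterator of values. A plausible sketch of such a converter, written here for illustration rather than copied from Spark, can be tested against an in-memory stream with no socket at all:

```scala
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

// A sketch of a bytesToObjects-style converter like the one socketTextStream
// passes in: turn an InputStream into UTF-8, '\n'-delimited lines, lazily.
object LineConverter {
  def bytesToLines(input: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(input, StandardCharsets.UTF_8))
    // readLine() returns null at end of stream, which terminates the iterator.
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }
}
```

Because the result is a lazy Iterator, the receive() loop above can interleave `isStopped` checks with reads, stopping promptly without consuming the whole stream.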
Next, the fourth component started in JobScheduler.start(): the JobGenerator.
JobGenerator has a RecurringTimer member, which drives its message system and timer: it periodically sends a GenerateJobs message, once per batchInterval.
// Based on the batchInterval passed when the StreamingContext was created,
// periodically send GenerateJobs messages.
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
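The behavior of RecurringTimer can be approximated in a few lines of plain Scala. This sketch simply sleeps between firings; the real RecurringTimer aligns firing times to the clock and compensates for drift, but the GenerateJobs cadence is the same idea:

```scala
// A rough, illustrative stand-in for RecurringTimer: call callback(time)
// once per period on a daemon thread, like the GenerateJobs heartbeat.
class MiniRecurringTimer(periodMs: Long, callback: Long => Unit) {
  @volatile private var stopped = false

  private val thread = new Thread("mini-recurring-timer") {
    setDaemon(true)
    override def run(): Unit = {
      var t = System.currentTimeMillis()
      while (!stopped) {
        callback(t)        // in Spark this would post GenerateJobs(new Time(t))
        t += periodMs
        Thread.sleep(periodMs)
      }
    }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = stopped = true
}
```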
JobGenerator's start() method:
/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started
  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter
  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // start the message-loop thread
  eventLoop.start()
  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // start the timer that periodically generates Jobs
    startFirstTime()
  }
}
The startFirstTime() called in JobGenerator.start() is defined as:
/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}
The processEvent() used by JobGenerator's event loop is defined as:
/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
其中g(shù)enerateJobs的定義:
/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // fetch the concrete data for the given time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // call DStreamGraph's generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
  if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
    logInfo("Checkpointing graph for time " + time)
    ssc.graph.updateCheckpointData(time)
    checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
  }
}
DStreamGraph's generateJobs method calls each output stream's generateJob method to produce the collection of Jobs.
// output streams: the concrete action/output operations
private val outputStreams = new ArrayBuffer[DStream[_]]()
def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
Now look at DStream's generateJob method. It calls getOrCompute, which, when the interval arrives, instantiates the DStreamGraph with the batch data into an RDD. If an RDD is produced, it is wrapped in a jobFunc containing context.sparkContext.runJob(rdd, emptyFunc), and the wrapped Job is returned.
/**
 * Generate a SparkStreaming job for the given time. This is an internal method that
 * should not be called directly. This default implementation creates a job
 * that materializes the corresponding RDD. Subclasses of DStream may override this
 * to generate their own jobs.
 */
private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}
Next, look at JobScheduler's submitJobSet method, which submits a JobHandler to the thread pool. JobHandler implements the Runnable interface and ultimately calls job.run(). Looking at the Job class, the func invoked by run is the jobFunc passed in at construction, which contains the context.sparkContext.runJob(rdd, emptyFunc) operation that finally submits the job.
def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}

private class JobHandler(job: Job) extends Runnable with Logging {
  import JobScheduler._
  def run() {
    try {
      val formattedTime = UIUtils.formatBatchTime(
        job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
      val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
      val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"
      ssc.sc.setJobDescription(
        s"""Streaming job from $batchLinkText""")
      ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
      ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
      // We need to assign `eventLoop` to a temp variable. Otherwise, because
      // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
      // it's possible that when `post` is called, `eventLoop` happens to null.
      var _eventLoop = eventLoop
      if (_eventLoop != null) {
        _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          job.run()
        }
        _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
        }
      } else {
        // JobScheduler has been stopped.
      }
    } finally {
      ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
      ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
    }
  }
}
A fragment of the Job class:
private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  def run() {
    _result = Try(func())
  }
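The whole chain, a Job wrapping a deferred thunk, handed to a thread pool as a Runnable, can be condensed into a toy version. The names below (MiniJob, MiniScheduler) are illustrative stand-ins, not Spark classes:

```scala
import java.util.concurrent.Executors
import scala.util.Try

// A sketch tying the pieces together: a Job wraps the jobFunc thunk and
// records its result with Try, and the scheduler hands it to a thread pool
// as a Runnable, as submitJobSet / JobHandler / Job.run do above.
class MiniJob(val time: Long, func: () => Unit) {
  @volatile var result: Try[Unit] = _
  def run(): Unit = { result = Try(func()) } // like Job.run: _result = Try(func())
}

object MiniScheduler {
  private val jobExecutor = Executors.newFixedThreadPool(2) // like jobExecutor

  def submit(job: MiniJob): Unit =
    jobExecutor.execute(new Runnable { def run(): Unit = job.run() }) // like JobHandler

  def shutdown(): Unit = jobExecutor.shutdown()
}
```

The division of labor matches the source walked through above: generation (build the thunk) is decoupled from execution (run it on a pool thread), which is exactly why a batch's "Job" is only a Runnable-like description until the thread pool runs it and the RDD action fires.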
Note:
Source material: DT_大數(shù)據(jù)夢(mèng)工廠 (Spark release customization series).