本文基于spark 2.11
1. 前言
spark使用RDD來抽象的表示數(shù)據(jù),用戶使用RDD提供的一些算子編寫自己的spark application,使用RDD抽象表示數(shù)據(jù)要求對于輸入數(shù)據(jù)是靜態(tài)的,但是在流式數(shù)據(jù)處理中數(shù)據(jù)如同流水一樣不停的在管道中產(chǎn)生,這不符合RDD的要求。Spark Streaming的處理方式是,從輸入流中讀區(qū)數(shù)據(jù),將數(shù)據(jù)作為一個個batch保存起來,這樣就有了靜態(tài)的數(shù)據(jù),就可以用RDD來表示這些數(shù)據(jù),然后就可以基于RDD 創(chuàng)建任務(wù)了。
2. 基本原理
下面是一個從kafka讀取數(shù)據(jù)處理的代碼:
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
// batchDuration 設(shè)置為 1 秒,然后創(chuàng)建一個 streaming 入口
// 每1秒依據(jù)RDD中創(chuàng)建一次job,輸入RDD就從已經(jīng)已經(jīng)收集的batch中取。
val ssc = new StreamingContext(conf, Seconds(1))
val kafkaParams: Map[String, String] = Map("group.id" -> "test",...)
val topics = Map("test" -> 1)
val lines = KafkaUtils.createStream(
ssc,
kafkaParams,
topics, StorageLevel.MEMORY_AND_DISK)
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
wordCounts.foreachRDD(...)
ssc.start()
ssc.awaitTermination()
和之前基于RDD的的wordcount程序不同:
- KafkaUtils.createStream(...)創(chuàng)建出來的不是RDD,和是一個DStream的類,
- DStream同樣存在map、flatMap、reduceByKey這樣的轉(zhuǎn)換操作,但是它是從DStream到DStream的轉(zhuǎn)換。
- print在RDD里表示一種action,會觸發(fā)job的創(chuàng)建和提交,但是DStream的action操作不會,它的處理方式不同,后續(xù)會介紹。
- ssc.start會啟動一下組建:
- JobScheduler, 調(diào)度和追蹤job
- JobGenerator,由JobScheduler啟動,定時(初始化StreamingContext指定的時間)從DStream創(chuàng)建出job
- ReceiverTracker, 運(yùn)行在Driver上,收集 從各個receiver上報的流數(shù)據(jù)batch信息
- ReceiverSupervisor,由ReceiverTracker運(yùn)行發(fā)送消息使其在executor上運(yùn)行,接收Receiver匯報的batch數(shù)據(jù),然后將數(shù)據(jù)信息匯報給ReceiverTracker。
- Receiver, 運(yùn)行在executor上,由ReceiverSupervisor啟動,負(fù)責(zé)著流中讀區(qū)數(shù)據(jù),分成batch,匯報給ReceiverSupervisor。
從上介紹可以看出,job是在ssc.start過程中創(chuàng)建的,而且在運(yùn)行期間會根據(jù)用戶設(shè)置的duration不斷的創(chuàng)建。
下圖表示了使用kafka作為輸入源時的streaming工作期間的流程:

- Receiver1從kafka中讀入數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)發(fā)給ReceiverSupervisor
- ReceiverSupervisor,使用BlockManager存儲并管理數(shù)據(jù)信息。
- ReceiverSupervisor,將數(shù)據(jù)信息發(fā)送給運(yùn)行在Driver上的ReceiverTracker。
- JobGenerator,假設(shè)上面wordCount代碼中DStream之間的轉(zhuǎn)換看作一張DAG,DStreamGraph保存了所有的DAG。JobGenerate每隔一段時間從DStreamGraph中的DStream DAG生成RDD的DAG,然后提交RDD的job。RDD的數(shù)據(jù)則來自Generator根據(jù)ReceiverTracker中收集的batch數(shù)據(jù)信息。
2.1 DStream 到RDD的轉(zhuǎn)換
由于基于RDD計算的基于靜態(tài)的數(shù)據(jù),而數(shù)據(jù)是不斷產(chǎn)生的,spark streaming將輸入數(shù)據(jù)切成一個個batch,因此需要不斷的產(chǎn)生job去計算batch中的數(shù)據(jù)。
上面wordCount程序,描述了DStream之間的轉(zhuǎn)換,看起來幾乎和RDD之間的轉(zhuǎn)換是一樣的,JobGenerator運(yùn)行期間根據(jù)DStream不停創(chuàng)建RDD,再由RDD生成job 經(jīng)SparkContext提交運(yùn)行。DStream相當(dāng)于模板,RDD相當(dāng)于使用模板創(chuàng)造出的零件,而JobGenerator則相當(dāng)于操作模板的工人了。
下圖描述了DStream和RDD在運(yùn)行期間的關(guān)系:

可以看到DStream的子類都有一個RDD的對應(yīng)類,一句DStream生成的RDD DAG和DStream擁有一樣的轉(zhuǎn)換和依賴。采集輸入流中的一段數(shù)據(jù)作為RDD的源數(shù)據(jù)。
RDD#compute方法中完成輸入數(shù)據(jù)的計算,DStream也存在compute方法,但是其compute方法這是完成DStream到RDD的轉(zhuǎn)換。
2.2 ReceiverInputDStream
所有繼承DStream的類中,ReceiverInputDStream除了像其他DStream一樣創(chuàng)建出RDD以外,還需要返回一個Receiver負(fù)責(zé)接收收據(jù),例如ReceiverInputDStream的子類SocketInputDStream就能返回一個SocketReceiver的Receiver的實現(xiàn)類。
ReceiverInputDStream一般都是一個DStream DAG的源頭。
當(dāng)ReceiverTracker調(diào)用start啟動時,它會從DStreamGraph持有的DStream DAG中獲得所有的ReceiverInputDStream,然后取得Receiver,通過巧妙的方式將Reciver包裝成Task,然后發(fā)送到executor上執(zhí)行,然后在receiver端,Receiver和ReceiverSupervisor啟動接收數(shù)據(jù)。
在SparkStreaming(3) ReceiverTracker和Receiver中,啟動receiver時,receiver就是按上面方式獲得的。
2.3 output 操作
DStream和RDD有著類似的操作,map這種使得RDD轉(zhuǎn)換成新的RDD的操作稱為Transformation,foreach這種觸發(fā)job的創(chuàng)建和提交的操作稱為Action, DStream類似,Dstream到DStream的稱為Transformation, DStream的output操作有點類似rdd中的action操作,一個action意味著一個新的job被創(chuàng)建提交。DStream的output操作意味著一個DStream DAG模板的創(chuàng)建,也意味著到此處DStream轉(zhuǎn)換成RDD應(yīng)該觸發(fā)job,DStream常見的output操作有:
- saveAsTextFiles
- saveAsObjectFiles
- foreachRDD
等
3 DStreamGraph
DStreamGraph用來保存所有output操作生成DStream DAG。
比如下面是DStream#foreachRDD的代碼:
private def foreachRDD(
foreachFunc: (RDD[T], Time) => Unit,
displayInnerRDDOps: Boolean): Unit = {
// 調(diào)用了DStream#register()方法
new ForEachDStream(this,
context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
}
//register方法向DStreamGraph注冊當(dāng)前DStream
// 由于DStream保存了所有的父依賴,因此注冊當(dāng)前DStream
// 就能追溯出整個DStream DAG,相當(dāng)于注冊了DStream DAG
private[streaming] def register(): DStream[T] = {
ssc.graph.addOutputStream(this)
this
}
上面說DStream的output操作相當(dāng)于觸發(fā)一個DStream DAG模板的創(chuàng)建,而一個模板對應(yīng)一種job。 第一節(jié)wordcount代碼中分別有print和foreachRDD兩個output操作,因此DStreamGraph可以理解持有兩個DStream DAG,如下圖:

盡管創(chuàng)建出來的DStream DAG是一樣的,但是依然會創(chuàng)建出兩份RDD DAG,生成兩類job,
DStreamGraph還肩負(fù)著從根據(jù)注冊的DStreamDAG創(chuàng)建job的任務(wù),后續(xù)JobGenerator就是調(diào)用DStreamGraph完成創(chuàng)建job。下面是DstreamGraph創(chuàng)建job的方法:
def generateJobs(time: Time): Seq[Job] = {
logDebug("Generating jobs for time " + time)
val jobs = this.synchronized {
// 對每一個因output操作而注冊的DStream DAG生成job
outputStreams.flatMap { outputStream =>
val jobOption = outputStream.generateJob(time)
jobOption.foreach(_.setCallSite(outputStream.creationSite))
jobOption
}
}
logDebug("Generated " + jobs.length + " jobs for time " + time)
jobs
}
這是DStream#generateJob方法,time表示每一次生成job的時間
private[streaming] def generateJob(time: Time): Option[Job] = {
// getOrCompute將DStream轉(zhuǎn)換成RDD,轉(zhuǎn)換操作是從當(dāng)前
// DStream往上游追溯,追溯到源頭后在一次往下生成RDD的過程
// 是一次DFS的過程。
getOrCompute(time) match {
case Some(rdd) =>
val jobFunc = () => {
val emptyFunc = { (iterator: Iterator[T]) => {} }
// 創(chuàng)建到RDD后提交job
context.sparkContext.runJob(rdd, emptyFunc)
}
Some(new Job(time, jobFunc))
case None => None
}
}