Spark Streaming(1) - 基本原理

本文基于spark 2.11

1. 前言

spark使用RDD來抽象的表示數(shù)據(jù),用戶使用RDD提供的一些算子編寫自己的spark application,使用RDD抽象表示數(shù)據(jù)要求對于輸入數(shù)據(jù)是靜態(tài)的,但是在流式數(shù)據(jù)處理中數(shù)據(jù)如同流水一樣不停的在管道中產(chǎn)生,這不符合RDD的要求。Spark Streaming的處理方式是,從輸入流中讀區(qū)數(shù)據(jù),將數(shù)據(jù)作為一個個batch保存起來,這樣就有了靜態(tài)的數(shù)據(jù),就可以用RDD來表示這些數(shù)據(jù),然后就可以基于RDD 創(chuàng)建任務(wù)了。

2. 基本原理

下面是一個從kafka讀取數(shù)據(jù)處理的代碼:

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setMaster("local[2]").setAppName("KafkaWordCount")
// batchDuration 設(shè)置為 1 秒,然后創(chuàng)建一個 streaming 入口
// 每1秒依據(jù)RDD中創(chuàng)建一次job,輸入RDD就從已經(jīng)已經(jīng)收集的batch中取。
val ssc = new StreamingContext(conf, Seconds(1))

val kafkaParams: Map[String, String] = Map("group.id" -> "test",...)
val topics = Map("test" -> 1)
val lines = KafkaUtils.createStream(
      ssc,
      kafkaParams,
      topics,  StorageLevel.MEMORY_AND_DISK)
val words = lines.flatMap(_.split(" "))     
val pairs = words.map(word => (word, 1))    
val wordCounts = pairs.reduceByKey(_ + _)   
wordCounts.print()                      
wordCounts.foreachRDD(...)     
ssc.start()
ssc.awaitTermination()

和之前基于RDD的的wordcount程序不同:

  1. KafkaUtils.createStream(...)創(chuàng)建出來的不是RDD,和是一個DStream的類,
  2. DStream同樣存在map、flatMap、reduceByKey這樣的轉(zhuǎn)換操作,但是它是從DStream到DStream的轉(zhuǎn)換。
  3. print在RDD里表示一種action,會觸發(fā)job的創(chuàng)建和提交,但是DStream的action操作不會,它的處理方式不同,后續(xù)會介紹。
  4. ssc.start會啟動一下組建:
    • JobScheduler, 調(diào)度和追蹤job
    • JobGenerator,由JobScheduler啟動,定時(初始化StreamingContext指定的時間)從DStream創(chuàng)建出job
    • ReceiverTracker, 運(yùn)行在Driver上,收集 從各個receiver上報的流數(shù)據(jù)batch信息
    • ReceiverSupervisor,由ReceiverTracker運(yùn)行發(fā)送消息使其在executor上運(yùn)行,接收Receiver匯報的batch數(shù)據(jù),然后將數(shù)據(jù)信息匯報給ReceiverTracker。
    • Receiver, 運(yùn)行在executor上,由ReceiverSupervisor啟動,負(fù)責(zé)著流中讀區(qū)數(shù)據(jù),分成batch,匯報給ReceiverSupervisor。

從上介紹可以看出,job是在ssc.start過程中創(chuàng)建的,而且在運(yùn)行期間會根據(jù)用戶設(shè)置的duration不斷的創(chuàng)建。

下圖表示了使用kafka作為輸入源時的streaming工作期間的流程:

圖1 streaming工作流程
  1. Receiver1從kafka中讀入數(shù)據(jù),將數(shù)據(jù)轉(zhuǎn)發(fā)給ReceiverSupervisor
  2. ReceiverSupervisor,使用BlockManager存儲并管理數(shù)據(jù)信息。
  3. ReceiverSupervisor,將數(shù)據(jù)信息發(fā)送給運(yùn)行在Driver上的ReceiverTracker。
  4. JobGenerator,假設(shè)上面wordCount代碼中DStream之間的轉(zhuǎn)換看作一張DAG,DStreamGraph保存了所有的DAG。JobGenerate每隔一段時間從DStreamGraph中的DStream DAG生成RDD的DAG,然后提交RDD的job。RDD的數(shù)據(jù)則來自Generator根據(jù)ReceiverTracker中收集的batch數(shù)據(jù)信息。

2.1 DStream 到RDD的轉(zhuǎn)換

由于基于RDD計算的基于靜態(tài)的數(shù)據(jù),而數(shù)據(jù)是不斷產(chǎn)生的,spark streaming將輸入數(shù)據(jù)切成一個個batch,因此需要不斷的產(chǎn)生job去計算batch中的數(shù)據(jù)。

上面wordCount程序,描述了DStream之間的轉(zhuǎn)換,看起來幾乎和RDD之間的轉(zhuǎn)換是一樣的,JobGenerator運(yùn)行期間根據(jù)DStream不停創(chuàng)建RDD,再由RDD生成job 經(jīng)SparkContext提交運(yùn)行。DStream相當(dāng)于模板,RDD相當(dāng)于使用模板創(chuàng)造出的零件,而JobGenerator則相當(dāng)于操作模板的工人了。

下圖描述了DStream和RDD在運(yùn)行期間的關(guān)系:

DStream和RDD

可以看到DStream的子類都有一個RDD的對應(yīng)類,一句DStream生成的RDD DAG和DStream擁有一樣的轉(zhuǎn)換和依賴。采集輸入流中的一段數(shù)據(jù)作為RDD的源數(shù)據(jù)。

RDD#compute方法中完成輸入數(shù)據(jù)的計算,DStream也存在compute方法,但是其compute方法這是完成DStream到RDD的轉(zhuǎn)換。

2.2 ReceiverInputDStream

所有繼承DStream的類中,ReceiverInputDStream除了像其他DStream一樣創(chuàng)建出RDD以外,還需要返回一個Receiver負(fù)責(zé)接收收據(jù),例如ReceiverInputDStream的子類SocketInputDStream就能返回一個SocketReceiver的Receiver的實現(xiàn)類。

ReceiverInputDStream一般都是一個DStream DAG的源頭。

當(dāng)ReceiverTracker調(diào)用start啟動時,它會從DStreamGraph持有的DStream DAG中獲得所有的ReceiverInputDStream,然后取得Receiver,通過巧妙的方式將Reciver包裝成Task,然后發(fā)送到executor上執(zhí)行,然后在receiver端,Receiver和ReceiverSupervisor啟動接收數(shù)據(jù)。

在SparkStreaming(3) ReceiverTracker和Receiver中,啟動receiver時,receiver就是按上面方式獲得的。

2.3 output 操作

DStream和RDD有著類似的操作,map這種使得RDD轉(zhuǎn)換成新的RDD的操作稱為Transformation,foreach這種觸發(fā)job的創(chuàng)建和提交的操作稱為Action, DStream類似,Dstream到DStream的稱為Transformation, DStream的output操作有點類似rdd中的action操作,一個action意味著一個新的job被創(chuàng)建提交。DStream的output操作意味著一個DStream DAG模板的創(chuàng)建,也意味著到此處DStream轉(zhuǎn)換成RDD應(yīng)該觸發(fā)job,DStream常見的output操作有:

  1. saveAsTextFiles
  2. saveAsObjectFiles
  3. print
  4. foreachRDD

3 DStreamGraph

DStreamGraph用來保存所有output操作生成DStream DAG。
比如下面是DStream#foreachRDD的代碼:

private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
   // 調(diào)用了DStream#register()方法
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }

//register方法向DStreamGraph注冊當(dāng)前DStream
// 由于DStream保存了所有的父依賴,因此注冊當(dāng)前DStream
// 就能追溯出整個DStream DAG,相當(dāng)于注冊了DStream DAG
private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }

上面說DStream的output操作相當(dāng)于觸發(fā)一個DStream DAG模板的創(chuàng)建,而一個模板對應(yīng)一種job。 第一節(jié)wordcount代碼中分別有print和foreachRDD兩個output操作,因此DStreamGraph可以理解持有兩個DStream DAG,如下圖:


盡管創(chuàng)建出來的DStream DAG是一樣的,但是依然會創(chuàng)建出兩份RDD DAG,生成兩類job,

DStreamGraph還肩負(fù)著從根據(jù)注冊的DStreamDAG創(chuàng)建job的任務(wù),后續(xù)JobGenerator就是調(diào)用DStreamGraph完成創(chuàng)建job。下面是DstreamGraph創(chuàng)建job的方法:

def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      // 對每一個因output操作而注冊的DStream DAG生成job
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }


這是DStream#generateJob方法,time表示每一次生成job的時間

private[streaming] def generateJob(time: Time): Option[Job] = {
   // getOrCompute將DStream轉(zhuǎn)換成RDD,轉(zhuǎn)換操作是從當(dāng)前
   // DStream往上游追溯,追溯到源頭后在一次往下生成RDD的過程
   // 是一次DFS的過程。
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
         // 創(chuàng)建到RDD后提交job
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容