Structured Streaming 分析

StructedStreaming 流程分析

導(dǎo)言

Spark在2.*版本后加入StructedStreaming模塊,與流處理引擎Sparkstreaming一樣,用于處理流數(shù)據(jù)。但二者又有許多不同之處。

Sparkstreaming首次引入在0.*版本,其核心思想是利用spark批處理框架,以microbatch(以一段時間的流作為一個batch)的方式,完成對流數(shù)據(jù)的處理。

StructedStreaming誕生于2.*版本,主要用于處理結(jié)構(gòu)化流數(shù)據(jù),與Sparkstreaming不同的是StructedStrreaming不再是microbatch的處理方式,而是可以"不停的"循環(huán)從數(shù)據(jù)源獲取數(shù)據(jù)。從而實現(xiàn)真正的流處理。以dataset為代表的帶有結(jié)構(gòu)化(schema信息)的數(shù)據(jù)處理由于鎢絲計劃的完成,表現(xiàn)出更優(yōu)越的性能。同時Structedstreaming可以從數(shù)據(jù)中獲取時間(eventTime),從而可以針對流數(shù)據(jù)的生產(chǎn)時間而非收到數(shù)據(jù)的時間進行處理。

StructedStreaming的相關(guān)介紹可參考(http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html)。本文對StructedStreaming的流程/機制進行分析

開發(fā)structedStreaming應(yīng)用

StructedStreaming應(yīng)用開發(fā)流程

從官網(wǎng)/源碼中可以看到structedstreaming應(yīng)用的開發(fā)
除了spark的初始化工作,通常有三步與業(yè)務(wù)相關(guān)的操作:

  1. 獲取輸入數(shù)據(jù)源(可以理解為source)

     val lines = spark
     .readStream
     .format("kafka")
     .option("kafka.bootstrap.servers", bootstrapServers)
     .option(subscribeType, topics)
     .load()
     .selectExpr("CAST(value AS STRING)")
     .as[String]
    
  2. 根據(jù)業(yè)務(wù)邏輯對數(shù)據(jù)進行轉(zhuǎn)換處理 (業(yè)務(wù)處理)

 wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
  1. 將處理結(jié)果寫入第三方數(shù)據(jù)源,整個流應(yīng)用通過query.start啟動(可以理解為sink)
query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("checkpointLocation", checkpointLocation)
      .start()
      query.awaitTermination()

流數(shù)據(jù)的讀取

通過DataStreamReader類完成應(yīng)用層與不同的流source源的reader隔離。load方法會為應(yīng)用獲取數(shù)據(jù)的邏輯

在處理數(shù)據(jù)源時框架使用serviceload機制,將所有集成DataSourceRegister的類加載如內(nèi)存,判斷對應(yīng)source的shortName是否與設(shè)置的一致,如果一致,則實例化此類。并根據(jù)此類屬性生成對應(yīng)的dataframe。

當前支持的source源有如下:

Source名 Source源
MemorySource 測試用
TextSocketSource 用于展示使用
FileStreamSource 從固定目下下讀文件
KafkaSource kafka作為數(shù)據(jù)源
RateStreamSource 固定速率的消息生成器,自增長的long型和時間戳

流數(shù)據(jù)的寫出

數(shù)據(jù)的寫出需要選擇寫出模式以及寫出的sink源

寫出模式:append,update,complete。 Structed streaming對寫出模式的支持與數(shù)據(jù)處理時使用到的算子有關(guān)。需要根據(jù)需求,處理邏輯選合適的寫出模式??蓞⒖?http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes). Structed streaming對一些輸出模式和算子的支持情況的校驗可參考org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker

sink源的寫出:

在處理sink源時框架依然使用serviceload機制,將所有集成DataSourceRegister的類加載如內(nèi)存,判斷對應(yīng)source的shortName是否與設(shè)置的一致,如果一致,則實例化此類

當前實現(xiàn)的sink

Sink名 sink目的地
memorysink 測試用
foreachSink 需要實現(xiàn)foreachwriter,用于定制化sink
kafkaSink 寫出數(shù)據(jù)到kafka
fileformatSink 寫出數(shù)據(jù)到hdfs。支持ORC,parquet等

StructedStreaming深入理解

對于structed streaming有如上理解即可開發(fā)相關(guān)應(yīng)用。但structedstreaming的實現(xiàn)機制依然值得深究,尤其是structedstreaming是job是如何觸發(fā)機制,watermark是如何實現(xiàn)的,狀態(tài)數(shù)據(jù)是如何保存并用戶應(yīng)用恢復(fù)的。如下對這三個“問題”進行分析

About Trigger

與sparkstreaming基于定時器產(chǎn)生job然后調(diào)度的機制不同,structedstreaming實現(xiàn)了一套新的job觸發(fā)機制(trigger)。類似于flink這就是trigger機制。

trigger的設(shè)置

通過DataStreamWriter.trigger()完成對trigger設(shè)置。默認的trigger為ProcessingTime(interval),interval默認為0

trigger的分類

trigger有三種,OneTimeTrigger只會觸發(fā)一次計算。在流應(yīng)用中一般使用ProcessingTime和ContinuousTrigger兩種,下面對著兩種trigger進行對比

Trigger類 ProcessingTime Continuous
對應(yīng)execution MicroBatchExecution ContinuousExecution
工作模式 以一定間隔(interval)調(diào)度計算邏輯,間隔為0時,上批次調(diào)用完成后,立即進入下一批次調(diào)用一直調(diào)用,退化為類似sparkstreaming的micro batch的流處理 以一定間隔(interval)查看流計算狀態(tài)
支持API 支持API豐富,如匯聚,關(guān)聯(lián)等操作 僅簡單的projection類(map,select等)
備注 total-cores個數(shù)大于partition數(shù),task長時運行

ProcessingTime

在使用ProcessingTime Trigger時,對應(yīng)的執(zhí)行引擎為MicrobatchExecution。

Trigger調(diào)度機制如下:

override def execute(triggerHandler: () => Boolean): Unit = {
 while (true) {
  val triggerTimeMs = clock.getTimeMillis
  val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
  val terminated = !triggerHandler()
  if (intervalMs > 0) {
    val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
    if (batchElapsedTimeMs > intervalMs) {
      notifyBatchFallingBehind(batchElapsedTimeMs)
    }
    if (terminated) {
      return
    }
    clock.waitTillTime(nextTriggerTimeMs)
  } else {
    if (terminated) {
      return
    }
  }
}
}

ProcessingTime Trigger循環(huán)調(diào)度每執(zhí)行邏輯:

triggerExecutor.execute(() => {
  startTrigger()
  if (isActive) {
    reportTimeTaken("triggerExecution") {
      if (currentBatchId < 0) {
        // We'll do this initialization only once
        populateStartOffsets(sparkSessionForStream)
        ...
      } else {
        constructNextBatch()
      }
      if (dataAvailable) {
        currentStatus = currentStatus.copy(isDataAvailable = true)
        updateStatusMessage("Processing new data")
        runBatch(sparkSessionForStream)
      }
    }
    // Report trigger as finished and construct progress object.
    finishTrigger(dataAvailable)
    if (dataAvailable) {
      // Update committed offsets.
      commitLog.add(currentBatchId)
      committedOffsets ++= availableOffsets
      currentBatchId += 1
      sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
    } else {
      currentStatus = currentStatus.copy(isDataAvailable = false)
      updateStatusMessage("Waiting for data to arrive")
      Thread.sleep(pollingDelayMs)
    }
  }
  updateStatusMessage("Waiting for next trigger")
  isActive
})

ContinuousTrigger

在使用ContinuousTrigger時,對應(yīng)的執(zhí)行邏輯為continuousExecution。在調(diào)度時,Trigger退化為ProcessingTime Trigger。僅僅對執(zhí)行狀態(tài)查詢,記錄

Continuous執(zhí)行邏輯

    triggerExecutor.execute(() => {
        startTrigger()

        if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
          stopSources()
          if (queryExecutionThread.isAlive) {
            sparkSession.sparkContext.cancelJobGroup(runId.toString)
            queryExecutionThread.interrupt()
          }
          false
        } else if (isActive) {
          currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
          logInfo(s"New epoch $currentBatchId is starting.")
          true
        } else {
          false
        }
      })

在ContinuousDataSourceRDD的compute方法中可以看出,其計算邏輯如下:

* 通過一個名為**continuous-reader--${context.partitionId()}--" +
    s"${context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)}** 的線程實時獲取數(shù)據(jù),放入名為queue的隊列中。
* worker線程則長時間運行,在計算時則是從queue中實時獲取消息處理。

About waternark

StructedStreaming的與sparkstreaming相比一大特性就是支持基于數(shù)據(jù)中的時間戳的數(shù)據(jù)處理。也就是在處理數(shù)據(jù)時,可以對記錄中的字段的時間進行考慮。eventTime更好的代表數(shù)據(jù)本身的信息。
可以獲取消息本身的時間戳之后,就可以根據(jù)該時間戳來判斷消息的到達是否延遲(亂序)以及延遲的時間是否在容忍的范圍內(nèi)。該判斷方法是根據(jù)watermark機制來設(shè)置和判斷消息的有效性(延遲是否在可容忍范圍內(nèi))

屏幕快照 2018-06-12 下午10.15.59.png

watermark的設(shè)置

通過dataset.withWatermark()完成對watermark的設(shè)置

watermark的生成/更新

  1. 在driver內(nèi)注冊一個累加器eventTimeStats;

  2. 在一個批次計算內(nèi),executor的各task根據(jù)各自分區(qū)內(nèi)的消息的時間戳,來更新累加器

     executor中各task獲取分區(qū)的eventtime信息方式如下:
     在EventTimeWatermarkExec中的doExecute方法中
     iter.map { row =>
         eventTimeStats.add(getEventTime(row).getLong(0) / 1000)
         row
       }
      def add(eventTime: Long): Unit = {
         this.max = math.max(this.max, eventTime)
         this.min = math.min(this.min, eventTime)
         this.count += 1
         this.avg += (eventTime - avg) / count
     }
    
  3. 在driver端生成batch時,獲取各個操作/plan的watermark,找出操作的最小的watermark時間點,寫入offsetSeqMetadata,同時寫入offsetlog

     // 計算各plan的watermark
     lastExecution.executedPlan.collect {
               case e: EventTimeWatermarkExec => e
             }.zipWithIndex.foreach {
               case (e, index) if e.eventTimeStats.value.count > 0 =>
                 logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}")
                 val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs
                 val prevWatermarkMs = watermarkMsMap.get(index)
                 if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) {
                   watermarkMsMap.put(index, newWatermarkMs)
                 }
           //找出watermark中最小值      
         if(!watermarkMsMap.isEmpty) {
           val newWatermarkMs = watermarkMsMap.minBy(_._2)._2
           if (newWatermarkMs > batchWatermarkMs) {
             logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms")
             batchWatermarkMs = newWatermarkMs
           }
           //寫入offsetSeqMetadata
           offsetSeqMetadata = offsetSeqMetadata.copy(
         batchWatermarkMs = batchWatermarkMs,
         batchTimestampMs = triggerClock.getTimeMillis())
         //寫入offsetlog
         offsetLog.add(
       currentBatchId,
       availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)
    
  4. 根據(jù)watermark在讀消息時過濾數(shù)據(jù)

     StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> StoreAndJoinWithOtherSide中有如下操作:
     
     val nonLateRows =
     WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match {
       case Some(watermarkExpr) =>
         val predicate = newPredicate(watermarkExpr, inputAttributes)
         inputIter.filter { row => !predicate.eval(row) }
       case None =>
         inputIter
     }
    

About state:

流應(yīng)用中,如果有狀態(tài)相關(guān)的如匯聚,關(guān)聯(lián)等操作,需要再應(yīng)用中將部分數(shù)據(jù)進行緩存,structedstreaming中通過statestore來對數(shù)據(jù)緩存以備后續(xù)計算及異常恢復(fù)使用

當前的statestore的實現(xiàn)僅HDFSBackedStateStore,由HDFSBackedStateStoreProvider生成和管理; 每個HDFSBackedStateStoreProvider對應(yīng)一個目錄。該目錄為{checkpointLocation}/state/operatorId/partitionId/{storeName}.
其中checkpointLocation是query中設(shè)置的路徑,storeName是store分類,在關(guān)聯(lián)中有如如下joinSide-storeType(如left-keyToNumValues)
每個statestore對應(yīng)一個versionId.delta文件 {checkpointLocation}/state/operatorId/partitionId/{storeName}/versionId.delta。

狀態(tài)數(shù)據(jù)的寫入:

在在一些有狀態(tài)的操作如關(guān)聯(lián)匯聚等,部分數(shù)據(jù)需要保存以備后續(xù)計算使用,

store的put操作:
只有需要存儲部分狀態(tài)的操作/算子需要對狀態(tài)數(shù)據(jù)進行緩存。從源碼中查看,有如下算子:

StateStoreSaveExec
FlatMapGroupsWithStateExec
SymmetricHashJoinStateManager

以流關(guān)聯(lián)操作為例,介紹SymmetricHashJoinStateManager中的state寫流程如下:

1) 將數(shù)據(jù)寫入state文件:在StreamingSymmetricHashJoinExec的doExecute方法中,調(diào)用到processPartitions,會調(diào)用到OneSideHashJoiner的storeAndJoinWithOtherSide方法,會根據(jù)條件判斷該記錄是否寫入臨時文件的輸出流中。判斷條件condition ( !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow))
屏幕快照 2018-06-12 下午10.16.14.png
2) 在計算節(jié)結(jié)束后,將statestore數(shù)據(jù)寫入磁盤
    StreamingSymmetricHashJoinExec -> onOutputCompletion -> leftSideJoiner.commitStateAndGetMetrics -> joinStateManager.commit -> keyToNumValues.commit -> StateStoreHandler.commit -> HDFSBackedStateStore.commit

狀態(tài)數(shù)據(jù)的讀取:

在一些有狀態(tài)的操作如關(guān)聯(lián)匯聚等,需要對“歷史/之前批次”數(shù)據(jù)進行“緩存”,以備下次計算時,讀取使用。
有兩處讀取store的邏輯

1) statestoreRdd的compute方法
2)StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> OneSideHashJoiner.init -> SymmetricHashJoinStateManager.init -> KeyToNumValuesStore.init -> getStateStore -> stateStore.get ->storeProvider.getStore

狀態(tài)數(shù)據(jù)的管理/maintain
在executor內(nèi)部,對于每一個分片啟動一個線程定期“compact”中間數(shù)據(jù),周期由spark.sql.streaming.stateStore.maintenanceInterval參數(shù)控制,默認為60s,線程名 : state-store-maintenance-task 主要工作是掃描delta文件,生成snapshot文件,清理陳舊文件。

生成snapshot文件具體邏輯:
    1) 掃描目錄下的文件,找出delta文件當前最大的版本號Max(d)(delta文件的命名方式Int.delta,版本號為Int值,如10.delta,則版本號為10)
    2) 找出當前最大的snapshot文件的版本號Max(s)(delta文件的命名方式Int.snapshot,版本號為Int值,如10.snapshot,則版本號為10)
    3) 當Max(d) - Max(s) 大于spark.sql.streaming.stateStore.minDeltasForSnapshot(默認為10)時,進行打快照操作。否則,跳過。
陳舊文件清理:
    1) 找出當前文件的最大版本號Max(v)
    2) MaxversionToRetain =  Max(v) - spark.sql.streaming.minBatchesToRetain(默認100)時,當MaxversionToRetain > 0 時清理所有版本號小于MaxversionToRetain的文件。否則,跳過
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容