StructedStreaming 流程分析
導(dǎo)言
Spark在2.*版本后加入StructedStreaming模塊,與流處理引擎Sparkstreaming一樣,用于處理流數(shù)據(jù)。但二者又有許多不同之處。
Sparkstreaming首次引入在0.*版本,其核心思想是利用spark批處理框架,以microbatch(以一段時間的流作為一個batch)的方式,完成對流數(shù)據(jù)的處理。
StructedStreaming誕生于2.*版本,主要用于處理結(jié)構(gòu)化流數(shù)據(jù),與Sparkstreaming不同的是StructedStrreaming不再是microbatch的處理方式,而是可以"不停的"循環(huán)從數(shù)據(jù)源獲取數(shù)據(jù)。從而實現(xiàn)真正的流處理。以dataset為代表的帶有結(jié)構(gòu)化(schema信息)的數(shù)據(jù)處理由于鎢絲計劃的完成,表現(xiàn)出更優(yōu)越的性能。同時Structedstreaming可以從數(shù)據(jù)中獲取時間(eventTime),從而可以針對流數(shù)據(jù)的生產(chǎn)時間而非收到數(shù)據(jù)的時間進行處理。
StructedStreaming的相關(guān)介紹可參考(http://spark.apache.org/docs/2.3.0/structured-streaming-programming-guide.html)。本文對StructedStreaming的流程/機制進行分析
開發(fā)structedStreaming應(yīng)用
StructedStreaming應(yīng)用開發(fā)流程
從官網(wǎng)/源碼中可以看到structedstreaming應(yīng)用的開發(fā)
除了spark的初始化工作,通常有三步與業(yè)務(wù)相關(guān)的操作:
-
獲取輸入數(shù)據(jù)源(可以理解為source)
val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", bootstrapServers) .option(subscribeType, topics) .load() .selectExpr("CAST(value AS STRING)") .as[String] 根據(jù)業(yè)務(wù)邏輯對數(shù)據(jù)進行轉(zhuǎn)換處理 (業(yè)務(wù)處理)
wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
- 將處理結(jié)果寫入第三方數(shù)據(jù)源,整個流應(yīng)用通過query.start啟動(可以理解為sink)
query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.option("checkpointLocation", checkpointLocation)
.start()
query.awaitTermination()
流數(shù)據(jù)的讀取
通過DataStreamReader類完成應(yīng)用層與不同的流source源的reader隔離。load方法會為應(yīng)用獲取數(shù)據(jù)的邏輯
在處理數(shù)據(jù)源時框架使用serviceload機制,將所有集成DataSourceRegister的類加載如內(nèi)存,判斷對應(yīng)source的shortName是否與設(shè)置的一致,如果一致,則實例化此類。并根據(jù)此類屬性生成對應(yīng)的dataframe。
當前支持的source源有如下:
| Source名 | Source源 |
|---|---|
| MemorySource | 測試用 |
| TextSocketSource | 用于展示使用 |
| FileStreamSource | 從固定目下下讀文件 |
| KafkaSource | kafka作為數(shù)據(jù)源 |
| RateStreamSource | 固定速率的消息生成器,自增長的long型和時間戳 |
流數(shù)據(jù)的寫出
數(shù)據(jù)的寫出需要選擇寫出模式以及寫出的sink源
寫出模式:append,update,complete。 Structed streaming對寫出模式的支持與數(shù)據(jù)處理時使用到的算子有關(guān)。需要根據(jù)需求,處理邏輯選合適的寫出模式??蓞⒖?http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes). Structed streaming對一些輸出模式和算子的支持情況的校驗可參考org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
sink源的寫出:
在處理sink源時框架依然使用serviceload機制,將所有集成DataSourceRegister的類加載如內(nèi)存,判斷對應(yīng)source的shortName是否與設(shè)置的一致,如果一致,則實例化此類
當前實現(xiàn)的sink
| Sink名 | sink目的地 |
|---|---|
| memorysink | 測試用 |
| foreachSink | 需要實現(xiàn)foreachwriter,用于定制化sink |
| kafkaSink | 寫出數(shù)據(jù)到kafka |
| fileformatSink | 寫出數(shù)據(jù)到hdfs。支持ORC,parquet等 |
StructedStreaming深入理解
對于structed streaming有如上理解即可開發(fā)相關(guān)應(yīng)用。但structedstreaming的實現(xiàn)機制依然值得深究,尤其是structedstreaming是job是如何觸發(fā)機制,watermark是如何實現(xiàn)的,狀態(tài)數(shù)據(jù)是如何保存并用戶應(yīng)用恢復(fù)的。如下對這三個“問題”進行分析
About Trigger
與sparkstreaming基于定時器產(chǎn)生job然后調(diào)度的機制不同,structedstreaming實現(xiàn)了一套新的job觸發(fā)機制(trigger)。類似于flink這就是trigger機制。
trigger的設(shè)置
通過DataStreamWriter.trigger()完成對trigger設(shè)置。默認的trigger為ProcessingTime(interval),interval默認為0
trigger的分類
trigger有三種,OneTimeTrigger只會觸發(fā)一次計算。在流應(yīng)用中一般使用ProcessingTime和ContinuousTrigger兩種,下面對著兩種trigger進行對比
| Trigger類 | ProcessingTime | Continuous |
|---|---|---|
| 對應(yīng)execution | MicroBatchExecution | ContinuousExecution |
| 工作模式 | 以一定間隔(interval)調(diào)度計算邏輯,間隔為0時,上批次調(diào)用完成后,立即進入下一批次調(diào)用一直調(diào)用,退化為類似sparkstreaming的micro batch的流處理 | 以一定間隔(interval)查看流計算狀態(tài) |
| 支持API | 支持API豐富,如匯聚,關(guān)聯(lián)等操作 | 僅簡單的projection類(map,select等) |
| 備注 | total-cores個數(shù)大于partition數(shù),task長時運行 |
ProcessingTime
在使用ProcessingTime Trigger時,對應(yīng)的執(zhí)行引擎為MicrobatchExecution。
Trigger調(diào)度機制如下:
override def execute(triggerHandler: () => Boolean): Unit = {
while (true) {
val triggerTimeMs = clock.getTimeMillis
val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
val terminated = !triggerHandler()
if (intervalMs > 0) {
val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
if (batchElapsedTimeMs > intervalMs) {
notifyBatchFallingBehind(batchElapsedTimeMs)
}
if (terminated) {
return
}
clock.waitTillTime(nextTriggerTimeMs)
} else {
if (terminated) {
return
}
}
}
}
ProcessingTime Trigger循環(huán)調(diào)度每執(zhí)行邏輯:
triggerExecutor.execute(() => {
startTrigger()
if (isActive) {
reportTimeTaken("triggerExecution") {
if (currentBatchId < 0) {
// We'll do this initialization only once
populateStartOffsets(sparkSessionForStream)
...
} else {
constructNextBatch()
}
if (dataAvailable) {
currentStatus = currentStatus.copy(isDataAvailable = true)
updateStatusMessage("Processing new data")
runBatch(sparkSessionForStream)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
commitLog.add(currentBatchId)
committedOffsets ++= availableOffsets
currentBatchId += 1
sparkSession.sparkContext.setJobDescription(getBatchDescriptionString)
} else {
currentStatus = currentStatus.copy(isDataAvailable = false)
updateStatusMessage("Waiting for data to arrive")
Thread.sleep(pollingDelayMs)
}
}
updateStatusMessage("Waiting for next trigger")
isActive
})
ContinuousTrigger
在使用ContinuousTrigger時,對應(yīng)的執(zhí)行邏輯為continuousExecution。在調(diào)度時,Trigger退化為ProcessingTime Trigger。僅僅對執(zhí)行狀態(tài)查詢,記錄
Continuous執(zhí)行邏輯
triggerExecutor.execute(() => {
startTrigger()
if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
stopSources()
if (queryExecutionThread.isAlive) {
sparkSession.sparkContext.cancelJobGroup(runId.toString)
queryExecutionThread.interrupt()
}
false
} else if (isActive) {
currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
logInfo(s"New epoch $currentBatchId is starting.")
true
} else {
false
}
})
在ContinuousDataSourceRDD的compute方法中可以看出,其計算邏輯如下:
* 通過一個名為**continuous-reader--${context.partitionId()}--" +
s"${context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY)}** 的線程實時獲取數(shù)據(jù),放入名為queue的隊列中。
* worker線程則長時間運行,在計算時則是從queue中實時獲取消息處理。
About waternark
StructedStreaming的與sparkstreaming相比一大特性就是支持基于數(shù)據(jù)中的時間戳的數(shù)據(jù)處理。也就是在處理數(shù)據(jù)時,可以對記錄中的字段的時間進行考慮。eventTime更好的代表數(shù)據(jù)本身的信息。
可以獲取消息本身的時間戳之后,就可以根據(jù)該時間戳來判斷消息的到達是否延遲(亂序)以及延遲的時間是否在容忍的范圍內(nèi)。該判斷方法是根據(jù)watermark機制來設(shè)置和判斷消息的有效性(延遲是否在可容忍范圍內(nèi))

watermark的設(shè)置
通過dataset.withWatermark()完成對watermark的設(shè)置
watermark的生成/更新
在driver內(nèi)注冊一個累加器eventTimeStats;
-
在一個批次計算內(nèi),executor的各task根據(jù)各自分區(qū)內(nèi)的消息的時間戳,來更新累加器
executor中各task獲取分區(qū)的eventtime信息方式如下: 在EventTimeWatermarkExec中的doExecute方法中 iter.map { row => eventTimeStats.add(getEventTime(row).getLong(0) / 1000) row } def add(eventTime: Long): Unit = { this.max = math.max(this.max, eventTime) this.min = math.min(this.min, eventTime) this.count += 1 this.avg += (eventTime - avg) / count } -
在driver端生成batch時,獲取各個操作/plan的watermark,找出操作的最小的watermark時間點,寫入offsetSeqMetadata,同時寫入offsetlog
// 計算各plan的watermark lastExecution.executedPlan.collect { case e: EventTimeWatermarkExec => e }.zipWithIndex.foreach { case (e, index) if e.eventTimeStats.value.count > 0 => logDebug(s"Observed event time stats $index: ${e.eventTimeStats.value}") val newWatermarkMs = e.eventTimeStats.value.max - e.delayMs val prevWatermarkMs = watermarkMsMap.get(index) if (prevWatermarkMs.isEmpty || newWatermarkMs > prevWatermarkMs.get) { watermarkMsMap.put(index, newWatermarkMs) } //找出watermark中最小值 if(!watermarkMsMap.isEmpty) { val newWatermarkMs = watermarkMsMap.minBy(_._2)._2 if (newWatermarkMs > batchWatermarkMs) { logInfo(s"Updating eventTime watermark to: $newWatermarkMs ms") batchWatermarkMs = newWatermarkMs } //寫入offsetSeqMetadata offsetSeqMetadata = offsetSeqMetadata.copy( batchWatermarkMs = batchWatermarkMs, batchTimestampMs = triggerClock.getTimeMillis()) //寫入offsetlog offsetLog.add( currentBatchId, availableOffsets.toOffsetSeq(sources, offsetSeqMetadata) -
根據(jù)watermark在讀消息時過濾數(shù)據(jù)
StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> StoreAndJoinWithOtherSide中有如下操作: val nonLateRows = WatermarkSupport.watermarkExpression(watermarkAttribute, eventTimeWatermark) match { case Some(watermarkExpr) => val predicate = newPredicate(watermarkExpr, inputAttributes) inputIter.filter { row => !predicate.eval(row) } case None => inputIter }
About state:
流應(yīng)用中,如果有狀態(tài)相關(guān)的如匯聚,關(guān)聯(lián)等操作,需要再應(yīng)用中將部分數(shù)據(jù)進行緩存,structedstreaming中通過statestore來對數(shù)據(jù)緩存以備后續(xù)計算及異常恢復(fù)使用
當前的statestore的實現(xiàn)僅HDFSBackedStateStore,由HDFSBackedStateStoreProvider生成和管理; 每個HDFSBackedStateStoreProvider對應(yīng)一個目錄。該目錄為{storeName}.
其中checkpointLocation是query中設(shè)置的路徑,storeName是store分類,在關(guān)聯(lián)中有如如下storeType(如left-keyToNumValues)
每個statestore對應(yīng)一個versionId.delta文件 {storeName}/versionId.delta。
狀態(tài)數(shù)據(jù)的寫入:
在在一些有狀態(tài)的操作如關(guān)聯(lián)匯聚等,部分數(shù)據(jù)需要保存以備后續(xù)計算使用,
store的put操作:
只有需要存儲部分狀態(tài)的操作/算子需要對狀態(tài)數(shù)據(jù)進行緩存。從源碼中查看,有如下算子:
StateStoreSaveExec
FlatMapGroupsWithStateExec
SymmetricHashJoinStateManager
以流關(guān)聯(lián)操作為例,介紹SymmetricHashJoinStateManager中的state寫流程如下:
1) 將數(shù)據(jù)寫入state文件:在StreamingSymmetricHashJoinExec的doExecute方法中,調(diào)用到processPartitions,會調(diào)用到OneSideHashJoiner的storeAndJoinWithOtherSide方法,會根據(jù)條件判斷該記錄是否寫入臨時文件的輸出流中。判斷條件condition ( !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow))

2) 在計算節(jié)結(jié)束后,將statestore數(shù)據(jù)寫入磁盤
StreamingSymmetricHashJoinExec -> onOutputCompletion -> leftSideJoiner.commitStateAndGetMetrics -> joinStateManager.commit -> keyToNumValues.commit -> StateStoreHandler.commit -> HDFSBackedStateStore.commit
狀態(tài)數(shù)據(jù)的讀取:
在一些有狀態(tài)的操作如關(guān)聯(lián)匯聚等,需要對“歷史/之前批次”數(shù)據(jù)進行“緩存”,以備下次計算時,讀取使用。
有兩處讀取store的邏輯
1) statestoreRdd的compute方法
2)StreamingSymmetricHashJoinExec -> doExecutor -> processPartitions -> OneSideHashJoiner.init -> SymmetricHashJoinStateManager.init -> KeyToNumValuesStore.init -> getStateStore -> stateStore.get ->storeProvider.getStore
狀態(tài)數(shù)據(jù)的管理/maintain
在executor內(nèi)部,對于每一個分片啟動一個線程定期“compact”中間數(shù)據(jù),周期由spark.sql.streaming.stateStore.maintenanceInterval參數(shù)控制,默認為60s,線程名 : state-store-maintenance-task 主要工作是掃描delta文件,生成snapshot文件,清理陳舊文件。
生成snapshot文件具體邏輯:
1) 掃描目錄下的文件,找出delta文件當前最大的版本號Max(d)(delta文件的命名方式Int.delta,版本號為Int值,如10.delta,則版本號為10)
2) 找出當前最大的snapshot文件的版本號Max(s)(delta文件的命名方式Int.snapshot,版本號為Int值,如10.snapshot,則版本號為10)
3) 當Max(d) - Max(s) 大于spark.sql.streaming.stateStore.minDeltasForSnapshot(默認為10)時,進行打快照操作。否則,跳過。
陳舊文件清理:
1) 找出當前文件的最大版本號Max(v)
2) MaxversionToRetain = Max(v) - spark.sql.streaming.minBatchesToRetain(默認100)時,當MaxversionToRetain > 0 時清理所有版本號小于MaxversionToRetain的文件。否則,跳過