structured streaming
模型思想
Structured Streaming模型是把數(shù)據(jù)流當作一個沒有邊界的數(shù)據(jù)表來對待,這樣開發(fā)人員可以在流上使用Spark SQL進行流處理,可以使用離線spark程序的開發(fā)方式來開發(fā)流處理程序,降低了開發(fā)門檻。
這張圖就顯示了,在新來數(shù)據(jù)的時候,會把新的row追加到?jīng)]邊界的表里面。

我們以一個典型的worldCount程序為例,在流入數(shù)據(jù)的時候,會把數(shù)據(jù)寫到表里面,然后在這個表上執(zhí)行word count查詢后,把統(tǒng)計出的word count寫到結(jié)果表中并輸出。

Structured Streaming 返回的是 DataFrame/DataSet,我們可以對其應用各種操作,從無類型,類似 SQL 的操作(例如 select,where,groupBy)到類型化的 RDD 類操作(例如 map,filter,flatMap)。
很多操作也不在流上支持:
- 還不支持多個流聚集(即,流 DF 上的聚合鏈)。
- 不支持 limit 和 take(N)
- 不支持 Distinct
- sort 操作僅在聚合后在完整輸出模式下支持
- 流和靜態(tài)流的外連接支持是有條件的:
- 不支持帶有流 DataSet 的完全外連接
- 不支持右側(cè)的流的左外連接
- 不支持左側(cè)的流的右外部聯(lián)接
- 不支持兩個流之間的任何 join
- count() - 無法從流 DataSet 返回單個計數(shù)。
- foreach()
- show() - 可以輸出到控制臺Sink來代替。
窗口操作
較之于DStream的窗口操作,一個顯著的改進是新的窗口運算可以基于”事件時間(數(shù)據(jù)所代表的事件發(fā)生時的時間)”(Event Time)進行計算而不在是數(shù)據(jù)進入到流上的時間。
<font size=2>? Flink支持三種時間,還有一種是事件接入時間(Ingestion Time)</font>
下圖以10分鐘為窗口尺度,統(tǒng)計了12:00 - 12:10, 12:05 - 12:15, 12:10 - 12:20三個時間窗口上word count值。dog這個單詞在12:07收到,它會影響12:00 - 12:10, 12:05 - 12:15兩個窗口。
<font size=2>? Flink區(qū)分了滾動窗口和滑動窗口,好像還有基于活躍度的會話窗口</font>

數(shù)據(jù)延遲
數(shù)據(jù)延遲基本上是沒辦法避免的事情,Structured Streaming可以保證一條舊的數(shù)據(jù)進入到流上時,依然可以基于這些“遲到”的數(shù)據(jù)重新計算并更新計算結(jié)果,但是這樣會一個問題,即需要在流上維持一個很大時間跨度的數(shù)據(jù)集,這會消耗很大的資源。

Structured Streaming引入了一種叫watermarking的機制來應對這個問題。watermarking實際上就是數(shù)據(jù)的事件時間與在其流上能找到的最大事件時間的最大差值(Time-To-Live, TTL),如果這個差值超過了設定的閾值,就意味著數(shù)據(jù)太陳舊了,時效性超出了流計算應該關注的區(qū)間,不再參與計算。(超時機制,低于水位線的都淹死了)
- In every trigger, while aggregate the data, we also scan for the max value of event time in the trigger data
- After trigger completes, compute watermark = MAX(event time before trigger, max event time in trigger)

引擎跟蹤的最大事件時間是藍色虛線,每個觸發(fā)開始時設置watermark 為(最大事件時間 - 10分鐘)是紅線。

我們先看一個延遲到達但沒有超過watermark的例子:(12:09, cat) ,這個數(shù)據(jù)會最先進入12:05 - 12:15這個窗口(雖然正常情況下它在12:00-12:10這個窗口開啟時就應該已經(jīng)就緒了,顯然它是一個遲到的數(shù)據(jù))。
watermark設定為10分鐘話意味著有效的事件時間可以推后到12:14 - 10m = 12:04,因為12:14是這個窗口中接收到的最晚的時間,代表目標系統(tǒng)最后時刻的狀態(tài),由于12:09在12:04之后,所以也被計入。
另一個超出watermark的例子是(12:04, donkey),這個時候水位線是(12:21, owl),watermark為10分鐘意味著有效的事件時間可以推后到12:11,而(12:04, donkey)比這個值還要早,說明它”太舊了”,所以不會被更新到結(jié)果表中了。
<font size=2>? Flink支持將舊數(shù)據(jù)單獨列到一個地方,便于進行后續(xù)的訪問</font>
Output Mode
- Append模式:顧名思義,既然是Append,那就意味著它每次都是添加新的行,那么也就是說:它適用且只適用于那些一旦產(chǎn)生計算結(jié)果便永遠不會去修改的情形, 所以它能保證每一行數(shù)據(jù)只被數(shù)據(jù)一次。只有select,where,map,flatMap,filter,join等的查詢將支持Append模式。
- Complete模式:整張結(jié)果表在每次觸發(fā)時都會全量輸出!這顯然是是要支撐那些針對數(shù)據(jù)全集進行的計算,例如:聚合
-
Update模式:某種意義上是和Append模式針鋒相對的一個種模式,它只輸出上次trigger之后,發(fā)生了“更新”的數(shù)據(jù)的,這包含新生的數(shù)據(jù)和行發(fā)生了變化的行。對于數(shù)據(jù)庫類型的sink來說,這是一種理想的模式。
持續(xù)計算
spark 2.3中引入一種能夠達到毫秒級低延遲的計算模式:持續(xù)計算。
兩種計算模式如下:默認(micro-batches)

micro-batch
spark一直以micro-batches來處理,spark streaming 計算引擎階段性地檢查數(shù)據(jù)流,然后批量處理數(shù)據(jù)。它的延遲最低100 ms。

在處理一批數(shù)據(jù)之前,先把這一批數(shù)據(jù)記錄的偏移量寫到whl日志中(write head log)(用于下一批數(shù)據(jù)查詢), 等到把偏移量保存完成后開始計算,這樣就產(chǎn)生了延遲。

- 首先,要設置trigger,然后根據(jù)這個生成mini-batch
- 然后,使用 sql-engine 將dataframe轉(zhuǎn)化成rdd (邏輯執(zhí)行計劃->物理執(zhí)行計劃)
- 從source獲取數(shù)據(jù),處理,并寫入sink
- 循環(huán)上面的步驟,執(zhí)行下一個micro-batch
- 各個batch之間,需要獲取state,更新并傳入下一個batch
在這種體系結(jié)構(gòu)中,driver將檢查點保存到WAL日志中,該日志可以用于重新啟動查詢。請注意,下一個micro-batch中要處理的范圍偏移量將在其開始之前保存在日志中,以便獲得確定性的重新執(zhí)行和end-to-end語義。因此,source上可用的記錄可能必須等待當前micro-batch處理完成,然后記錄其偏移量,并在下一個micro-batch中進行處理。
設定trigger的觸發(fā)時間為100ms,不斷的對source寫入數(shù)據(jù),可以發(fā)現(xiàn)前一個trigger觸發(fā)的數(shù)據(jù)批次計算job如果沒有處理完,后一個job不會啟動,不會并行的去執(zhí)行job。每個trigger觸發(fā)時會啟動一個新的job計算當前批次數(shù)據(jù)。
持續(xù)計算
而新引入的持續(xù)計算模式下,不是階段性的發(fā)起task,而是spark發(fā)起一個長期運行的long-running task,持續(xù)地讀、計算、寫。對于保存數(shù)據(jù)記錄的偏移量,相當于在數(shù)據(jù)流流入spark的時候上打標記,兩個標記之間叫 epoch,跟階段的意思差不多,task在遇到一個標記的時候會異步的保存這個偏移量,對于持續(xù)計算是沒有影響的。

最低延遲在1ms以下。(at least once,需要自己處理)

暫時這種模式支持的操作有限,它主要支持Map,F(xiàn)ilter和Project。 不支持聚合操作,連接,窗口等。
- 設置為continuous trigger (check-point的時間間隔)
- 使用 sql-engine 將dataframe轉(zhuǎn)化成rdd (只進行一次)
- 從source獲取數(shù)據(jù),處理,并寫入sink
- 各partition long running task, 執(zhí)行完數(shù)據(jù)后并不會結(jié)束,會繼續(xù)拉取數(shù)據(jù)進行處理。
現(xiàn)在還處于試驗階段,不支持生產(chǎn)。社區(qū)進度也較慢。只支持map filter等操作,不支持聚合,想要聚合的話,必須coalesce(1)到一個partition上才可以。
2.4版本特性
- 在complete/append模式下,支持Limit操作。 [spark-24662]
- foreachBatch api, 參數(shù)是rdd,所以可以使用rdd來做一些暫時不支持的操作。它的數(shù)據(jù)可以寫入多個location里面。對于continuous,可以有foreach操作。 [spark-24565]
- 批流是兼容check-point文件的,只需要修改trigger即可。這里修改了一個bug,在轉(zhuǎn)化的時候數(shù)據(jù)是錯的。 [spark-25399]
- 在多個流聚合的時候,之前是選擇最小的watermark,現(xiàn)在可以選擇最小的或者最大的。 [spark-24730]
雜項
stateful操作
如果相互批次之間的數(shù)據(jù)并沒有相互影響,叫做stateless操作。但是,譬如count操作,則需要在批次之間傳遞數(shù)據(jù)。
因為流處理結(jié)果是不斷增長的。因此,不斷增長的結(jié)果需要存儲在容錯的State Store中。
State Store的目的是提供一個可靠的地方,引擎可以從那里讀取Structured Streaming聚合的中間結(jié)果。因此,即使driver出現(xiàn)故障,Spark也能將狀態(tài)恢復到故障前的狀態(tài)。State Store是由類似于HDFS的分布式文件系統(tǒng)支持的。為了保證可恢復性,必須至少存儲兩個最近版本。例如,如果批次#10在處理過程中失敗,那么State Store可能具有批次#9和批次#10一半的狀態(tài)。Spark將從批次#9開始重新計算,因為批次#9是最后一個成功完成的批次。同時,State Store中還存在對于舊狀態(tài)的垃圾回收機制。
State Store處理兩類文件:delta文件和snapshot文件。delta文件包含每個查詢執(zhí)行結(jié)果的狀態(tài)表示。它是由給定executor中注冊的行更改提供的tmp delta file構(gòu)造的(State Store與partition相關,每個executor在一個hash map中存儲狀態(tài)數(shù)據(jù))。tmp delta file的名稱遵從“temp-{Random.nextLong}”模式。最后,在調(diào)用commit方法時,為新版本創(chuàng)建最終的delta文件,其名稱遵從“version.delta”模式。最后,多個delta文件合并到snapshot文件中,這些文件的名稱遵從“version.snapshot”模式。
StateStore 本身也帶了 maintainess 即維護模塊,會周期性的在后臺將過去的狀態(tài)和最近若干版本的流水 log 進行合并,并把合并后的結(jié)果重新寫回到 HDFS:old_snapshot + delta_a + delta_b + … => lastest_snapshot。

StateStore 模塊提供了 分片的、分版本的、可遷移的、高可用 key-value store。
基于這個 StateStore 模塊,StreamExecution 實現(xiàn)了 增量的 持續(xù)查詢、和很好的故障恢復以維護 end-to-end exactly-once guarantees。
Structured Streaming 在編程模型上暴露給用戶的是,每次持續(xù)查詢看做面對全量數(shù)據(jù)(而不僅僅是本次執(zhí)行信收到的數(shù)據(jù)),所以每次執(zhí)行的結(jié)果是針對全量數(shù)據(jù)進行計算的結(jié)果。但是在實際執(zhí)行過程中,由于全量數(shù)據(jù)會越攢越多,那么每次對全量數(shù)據(jù)進行計算的代價和消耗會越來越大。
Structured Streaming 的做法是:轉(zhuǎn)全量為增量,即在每次執(zhí)行時,先從 StateStore 里 restore 出上次執(zhí)行后的狀態(tài),然后加入本執(zhí)行的新數(shù)據(jù),再進行計算。如果有狀態(tài)改變,將把改變的狀態(tài)重新 save 到 StateStore 里。所以 Structured Streaming 在編程模型上暴露給用戶的是,每次持續(xù)查詢看做面對全量數(shù)據(jù),但在具體實現(xiàn)上轉(zhuǎn)換為增量的持續(xù)查詢。
在continuous模式下,spark通過維護一組long-running task集合來持續(xù)對數(shù)據(jù)進行read,process,write操作,這樣就避免了task的創(chuàng)建銷毀等操作,并且checkpoint的操作也優(yōu)化為異步,這樣就極大的減少了延遲。這種模型不再是用基于批去模擬流,而是基于事件流的思路。
那么問題來了,spark是怎么實現(xiàn)的呢,long-running還好說,直接在task的compute中寫循環(huán)即可,那么checkpoint怎么做呢,要知道spark基于checkpoint實現(xiàn)容錯,當一個批處理完后,spark會寫一些offset,snapshot到hdfs。而現(xiàn)在沒有批的概念。
沒有批就要創(chuàng)造批,流是批的超集,我們需要定義一種規(guī)則在流中劃分出一個個批即可。這里spark采用了Chandy-Lamport algorithm來做批的劃分,從而實現(xiàn)分布式checkpoint。其原理是這樣的,在每個task的數(shù)據(jù)流事件中注入epoch marker事件,在driver端做整體epoch的自增維護。當task處理到epoch marker事件后就通知driver,當driver發(fā)現(xiàn)收集到的epoch marker數(shù)量等同于source和sink的partition數(shù)量,那么就說明這一個epoch已經(jīng)完成,driver端就可以把一個epoch的數(shù)據(jù)看做是一個批,從而進行checkpoint記錄。
容錯
streamExecution 從source獲取offset,并使用這個去拿去數(shù)據(jù),處理后寫入sink,最后commit,并將結(jié)果存儲至hdfs。
micro-batch 會在每個partition 把state狀態(tài)保存到hdfs。每個batch會寫版本號。
如果出錯的時候,可以從文件中恢復數(shù)據(jù)。
由于 exectutor 節(jié)點的故障可由 Spark 框架本身很好的 handle,不引起可用性問題,現(xiàn)在只討論 driver 故障恢復。
如果在某個執(zhí)行過程中發(fā)生 driver 故障,那么重新起來的 StreamExecution:
讀取 WAL offsetlog 恢復出最新的 offsets 等
讀取 batchCommitLog 決定是否需要重做最近一個批次
這樣即可保證每次執(zhí)行的計算結(jié)果,在 sink 這個層面,是 不重不丟 的 —— 即使中間發(fā)生過 1 次或以上的失效和恢復。
一致性語義

micro-batch 模式可以提供 end-to-end 的 exactly-once 語義。原因是因為在 input 端和 output 端都做了很多工作來進行保證,比如 input 端 replayable + wal,output 端寫入冪等。
continuous mode 只能提供 at-least-once, 它犧牲了一次性語義以減少延遲。
micro-batch 基于偏移和提交日志,而 continuous 僅使用提交的日志。
假設提交日志中的最后 batch / epoch id是#2,它對應于偏移量(4,5,6)(= 3個Kafka分區(qū))。偏移日志中的最新值是(7,8,9),它們對應于批次/時期#3。
continuous 執(zhí)行讀取提交日志并查看最后一個紀元#2。然后它檢索與其對應的偏移量((4,5,6))。所以它會重新處理(4,5,6)。
micro-batch 執(zhí)行更多步驟。首先,它獲取最后的偏移日志((7,8,9))。接下來,它將它們與最后提交的偏移量((4,5,6))進行比較。由于兩者具有不同的批次ID,因此保留對應于最近批次ID的偏移量以進行處理。
應用舉例
- trigger once 設置為周期啟動的job(cron),會自動處理offset,所以可以不間斷的處理數(shù)據(jù)。
- 低時延的更新,可以通過streaming模式,從redis上更新到下游。
- 結(jié)合維表,可以使用 stream 和 batch 進行 join/union。
- binlog-> streaming -> delta (采集mysql數(shù)據(jù)倒入hive)
- 數(shù)據(jù)寫入多個表
