本文是《Streaming Systems》第二章的輸出,這章的內(nèi)容也基本來(lái)自 Streaming 102: The world beyond batch(當(dāng)時(shí)還叫做 DataFlow 模型,現(xiàn)在已經(jīng)統(tǒng)稱為 Beam 模型了),DataFlow 在當(dāng)時(shí)是唯一一個(gè)能體現(xiàn)這篇文章理論的引擎,不過(guò)在現(xiàn)在,當(dāng)時(shí)的那些概念(觀點(diǎn))在很多系統(tǒng)中都有了相應(yīng)的體現(xiàn)。
RoadMap
為了更好地理解 Streaming System 的內(nèi)容,作者提出了五個(gè)基本的概念,其中有兩個(gè)已經(jīng)在第一章介紹過(guò)了:event-time 和 process-time 區(qū)分和聯(lián)系以及 window:
- 對(duì) event-time 的理解非常重要,可以說(shuō)它是后面的基礎(chǔ),如果你比較關(guān)注正確性和 event 實(shí)際發(fā)生的真實(shí)時(shí)間,那么分析數(shù)據(jù)與其 event time 的關(guān)系就非常重要,這種情況下 process time 是沒有太大意義的;
- window 是處理無(wú)限流的通用方法。
除了這兩個(gè)概念,還有三個(gè)重要的概念:
- Trigger:類似于一個(gè)控制信號(hào),它靈活地控制一個(gè) window 什么時(shí)候進(jìn)行觸發(fā)來(lái)輸出結(jié)果;
- Watermark:是關(guān)于輸入 完整性 定義的一個(gè)概念(是針對(duì) event-time 時(shí)間域),時(shí)間為 X 的 watermark 表示 event time 小于 X 的數(shù)據(jù)都被接收到了;
- Accumulation:這個(gè)比較難解釋,針對(duì)一個(gè)時(shí)間段,同一個(gè) window,多次結(jié)果輸出之間的關(guān)系(或者說(shuō)是:后到的數(shù)據(jù)如何應(yīng)影響之前的結(jié)果)。
理解上面這些概念之間的關(guān)系并不難,這里會(huì)通過(guò)下面四個(gè)問(wèn)題,把前面這五個(gè)概念串一下,這四個(gè)問(wèn)題也是每個(gè) Unbounded data processing system 都會(huì)面臨的問(wèn)題:
- What results are calculated?:This question is answered by the types of transformations within the pipeline(這段話我也沒有太理解,我個(gè)人的理解是:對(duì)結(jié)果進(jìn)行計(jì)算的操作是什么,答案是 transformation,它是一種抽象,可以是求和、構(gòu)建直方圖或 AI 模型訓(xùn)練等);
- Where in event time are results calculated?:計(jì)算什么時(shí)間范圍內(nèi)的數(shù)據(jù)(這個(gè)時(shí)間指的是 event-time)?解決方法是 event-time window;
- When in processing time are results materialized?:什么時(shí)間輸出結(jié)果(這里的時(shí)間指的是 process time)?解決方法是 trigger+watermark;
- How do refinements of results relate?:后到的數(shù)據(jù)如何影響之前的結(jié)果,對(duì)于同一個(gè) window,這些輸出結(jié)果之間的關(guān)系是什么樣的?這就是 Accumulation 做的事情。
本章下面將對(duì)上面的問(wèn)題做更深入一些探討。
Batch Foundations:What and Where
先看下 batch processing,batch process engine 就可以回答 what 和 where 的問(wèn)題。
What: Transformations
本文中所涉及的例子都會(huì)以 Beam 偽代碼的形式給出,不過(guò)它們跟 Flink 和 Spark 都很相似。在 Beam 中有兩個(gè)基本的概念抽象:
- PCollections:它代表了在并發(fā) transform 中可以處理的數(shù)據(jù)集(possibly massive ones);
- PTransforms:它是應(yīng)用到 PCollections 上,來(lái)執(zhí)行的轉(zhuǎn)換操作,會(huì)生成新的 PCollections。PTransforms 可以是對(duì)元素一個(gè)一個(gè)操作,也可以是聚集(agg)操作,還可以是包含其他 PTransforms 的聚合操作,如下圖所示:

本章會(huì)用一個(gè)示例做講解,示例使用的數(shù)據(jù)源如下:
> SELECT * FROM UserScores ORDER BY EventTime;
------------------------------------------------
| Name | Team | Score | EventTime | ProcTime |
------------------------------------------------
| Julie | TeamX | 5 | 12:00:26 | 12:05:19 |
| Frank | TeamX | 9 | 12:01:26 | 12:08:19 |
| Ed | TeamX | 7 | 12:02:26 | 12:05:39 |
| Julie | TeamX | 8 | 12:03:06 | 12:07:06 |
| Amy | TeamX | 3 | 12:03:39 | 12:06:13 |
| Fred | TeamX | 4 | 12:04:19 | 12:06:39 |
| Naomi | TeamX | 3 | 12:06:39 | 12:07:19 |
| Becky | TeamX | 8 | 12:07:26 | 12:08:39 |
| Naomi | TeamX | 1 | 12:07:46 | 12:09:00 |
------------------------------------------------
該數(shù)據(jù)源的 event-time 及 process time 的關(guān)系如下:

這里我們是希望計(jì)算這個(gè)球隊(duì)的總分?jǐn)?shù)。
這里使用了一個(gè)名叫 input 的 PCollection<KV<Team, Integer>> 作為輸入( input 是由 Team/Integer 作為鍵/值對(duì)組成的,Team 是球隊(duì)名,Interger 是每人的分?jǐn)?shù)),對(duì)于 batch process,其代碼實(shí)現(xiàn)如下:
PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals =
input.apply(Sum.integersPerKey());
執(zhí)行過(guò)程可以見:Classic batch processing(gif 動(dòng)圖,可以點(diǎn)擊查看)。
因?yàn)槭?batch pipeline,它直到接收到所有的輸入才會(huì)輸出計(jì)算結(jié)果,最后輸出的結(jié)果是48,從上面的圖中也可以看到:state 和 output 的矩形覆蓋了整個(gè) x 軸。如果我們要處理的數(shù)據(jù)源是 unbounded data,那么這種模式將無(wú)法 work,我們不可能等到所有輸入結(jié)束,這就是 window 要解決的問(wèn)題。
Where: Windowing
第一章已經(jīng)已經(jīng)介紹了,windowing 是沿著時(shí)間邊界對(duì)數(shù)據(jù)源進(jìn)行切片的過(guò)程,window 主要有以下三種:

window 在 Beam 中的使用非常簡(jiǎn)單,這里我們使用一個(gè) 2min 的 fixed window,其實(shí)現(xiàn)如下:
// Windowed summation code
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES)))
.apply(Sum.integersPerKey());
Beam 提供了一個(gè)在 batch 和 streaming 上統(tǒng)一的模型,在 batch engine 執(zhí)行上的代碼,其處理過(guò)程見:Windowed summation on a batch engine(gif 動(dòng)圖),與前面不同的是,這里切分的四個(gè)窗口上都有對(duì)應(yīng)的輸出。
Going Streamings: When and How
前面看到了 batch engine 下的 window 操作,但是在理想情況下,我們是希望延遲盡可能的低,并且希望能夠處理無(wú)限流的情況,這就是需要轉(zhuǎn)向 Streaming Engine,之前需要等待全部輸入完成再輸出結(jié)果的模式將無(wú)法接受。但是 Streaming Engine 也有一些問(wèn)題需要去解決,比如:結(jié)果什么時(shí)候輸出?數(shù)據(jù)的完整性怎么定義?這就需要 trigger 和 watermark 機(jī)制。
When: The Wonderful Thing Abort Triggers Are Wonderful Things!
Trigger declare when output for a window should happen in processing time.
上面這句話需要好好理解,盡管 trigger 本身可能是在 event-time 這個(gè)時(shí)間域上確定的,但是它最后的表現(xiàn)是:確定何時(shí)(指的是 process time)輸出一個(gè) window 的結(jié)果。雖然 trigger 可能有多種不同的語(yǔ)義,但從概念上講,通常是只有兩種類型的 trigger:
- Repeated update triggers:可以是每接收一條新數(shù)據(jù)就觸發(fā)一次,或者按 process time 周期性處理(比如:1min 一次),具體的設(shè)置取決于 latency 和 cost 之間的權(quán)衡;
- Completeness triggers:它代表一個(gè) window 內(nèi)的數(shù)據(jù)接收完成之后做一次觸發(fā),比如在批處理中,不過(guò)它是把整個(gè)批看做一個(gè) window。
Repeated update triggers 是 streaming system 中最常見的使用類型,也是最容易實(shí)現(xiàn)和理解的類型,提供的語(yǔ)義是 repeated updates to a materialized dataset(它適合的也是這種場(chǎng)景)。
Completeness triggers 使用得比較少,提供了一個(gè)跟典型批處理場(chǎng)景一致的 streaming 語(yǔ)義,它們也提供了一個(gè)工具處理晚到或丟失的數(shù)據(jù),這個(gè)在后面的 watermark 中間介紹。
首先看一個(gè) repeated update trigger 的例子,這里跟之前的代碼比添加的內(nèi)容是:每接收一條新的 record 就做一次 trigger,示例如下:
// Triggering repeatedly with every record
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly(AfterCount(1))));
.apply(Sum.integersPerKey());
執(zhí)行過(guò)程見:Per-record triggering on a streaming engine,如果下游只是獲取最新的數(shù)據(jù),那么這種類型的 trigger 是可以滿足需求(它會(huì)把最新的結(jié)果寫到結(jié)果表中,下游每次獲取時(shí)拿到的都是最新的數(shù)據(jù)),而且隨著時(shí)間的推移結(jié)果將會(huì)變得更加準(zhǔn)確。
上面這種模式在應(yīng)對(duì)大規(guī)模的數(shù)據(jù)集時(shí),是非常消耗資源的,而根據(jù) process-time 周期性地輸出結(jié)果是更加適合大數(shù)據(jù)量場(chǎng)景,比如:按分鐘輸出。在 Beam 中這又分為兩種情況:
- aligned delays:delay 的時(shí)間點(diǎn)完全是根據(jù) process-time 切分的;
- unaligned delays:delay 是跟該窗口觀察到數(shù)據(jù)的 process-time 相關(guān)的。
這里看下第一種情況,代碼實(shí)現(xiàn)如下:
// Triggering on aligned two-minute processing-time boundaries
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly(AlignedDelay(TWO_MINUTES)))
.apply(Sum.integersPerKey());
它的執(zhí)行過(guò)程見:Two-minute aligned delay triggers (i.e., microbatching)(動(dòng)圖),最后的結(jié)果如下圖(從動(dòng)圖中截取的):

按照 process-time 每 2min 就觸發(fā)一次結(jié)果輸出,這種 aligned delays 的優(yōu)點(diǎn)是:有點(diǎn)像 spark streaming,它的輸出時(shí)間點(diǎn)是可預(yù)測(cè)的,我們會(huì)同時(shí)得到所有 window 的更新。但是缺點(diǎn)也很明顯,所有的更新同時(shí)觸發(fā),這會(huì)導(dǎo)致負(fù)載不均衡,峰值的負(fù)載非常高。
因此,有了另一個(gè)可替代的方案:unaligned delays,示例如下:
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(Repeatedly(UnalignedDelay(TWO_MINUTES))
.apply(Sum.integersPerKey());
其執(zhí)行過(guò)程見:Two-minute unaligned delay triggers(動(dòng)圖),截取了動(dòng)圖中最后的一個(gè)結(jié)果,如下圖所示:

這里也是每 2min 就觸發(fā)一次結(jié)果輸出,但它初始時(shí)間是由該 window 接收的第一條數(shù)據(jù)的時(shí)間決定,因?yàn)檩斎霐?shù)據(jù)在時(shí)間分布上是打散的,最后會(huì)使得觸發(fā)的時(shí)間點(diǎn)相對(duì)來(lái)說(shuō)比較均衡。關(guān)于延遲的話,這兩者延遲的差異可能為正也可能為負(fù),最后平均下來(lái)應(yīng)該是差不多的。對(duì)于大規(guī)模數(shù)據(jù)集場(chǎng)景,這種模式顯然是更加適合的。
最后,總結(jié)一下,repeated update trigger 適合這樣的場(chǎng)景:只是簡(jiǎn)單地、周期性地隨時(shí)間更新結(jié)果輸出;對(duì)【最終會(huì)不斷接近準(zhǔn)確性,但什么時(shí)間達(dá)到準(zhǔn)確性是不知道的】這種特性是可以接受的。由于分布式系統(tǒng)的一些特性,這會(huì)導(dǎo)致 process-time 和 event-time 之間的延遲無(wú)法預(yù)測(cè),我們也很難推理出什么時(shí)候當(dāng)前的輸出是由一個(gè)完整的、正確的輸入計(jì)算得到的,我們需要有一種方法去推理【完整性】而不是盲目相信輸出的結(jié)果。
When: Watermarks
Watermarks are a supporting aspect of the answer to the question: “When in processing time are results materialized?” Watermarks are temporal notions of input completeness in the event-time domain.
Watermark 要解決的問(wèn)題如上面的引用所示,這也是關(guān)于 watermark 的解釋。
我們可以把 watermark 當(dāng)作一個(gè)函數(shù):輸入一個(gè) process time,它返回一個(gè) event time。假設(shè)輸出的 event-time 是 E,它表示所有 event time 小于 E 的輸入都已經(jīng)接收到了,也就是說(shuō),event time 小于 E 的數(shù)據(jù)將會(huì)被認(rèn)為不會(huì)再接收到。根據(jù) watermark 的類型,分為以下兩種:
- perfect watermark: 在這種情況下,我們知道所有輸入數(shù)據(jù)的情況,因此也就不會(huì)有任何的數(shù)據(jù)延遲;
- heuristic watermark:對(duì)于分布式的數(shù)據(jù)源,了解所有輸入數(shù)據(jù)的情況是不現(xiàn)實(shí)的,heuristic watermark 利用數(shù)據(jù)源的已有情況(partition、ordering、文章增長(zhǎng)率等)盡可能地估計(jì)其進(jìn)度,它也意味著有時(shí)候輸出的結(jié)果是錯(cuò)誤的,因?yàn)闀?huì)有 late data。
Watermark 提供了一個(gè)與輸入相關(guān)的 Completeness 概念,watermark 也成為構(gòu)建了前面介紹的 completeness trigger 的基礎(chǔ)。這里有一個(gè)示例:
// Watermark completeness trigger
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark()))
.apply(Sum.integersPerKey());
Perfect watermark 是一個(gè)理想情況,在現(xiàn)實(shí)中,達(dá)到 perfect watermark 的代價(jià)是非常大的,所以我們一般會(huì)選擇一個(gè) heuristic watermark。當(dāng)然,無(wú)論哪種情況,當(dāng) watermark 到達(dá) window 的結(jié)尾時(shí),當(dāng)前 window 的結(jié)果就可以輸出了(認(rèn)為該窗口的所有數(shù)據(jù)都已經(jīng)接收到了,可以做進(jìn)一步的處理),正如上面的例子所示,但是兩種類型的 watermark 最后的計(jì)算結(jié)果是不一樣的(執(zhí)行過(guò)程見:Windowed summation on a streaming engine with perfect (left) and heuristic (right) watermarks,動(dòng)圖),程序執(zhí)行結(jié)束時(shí)的情況如下圖所示:

在右邊的結(jié)果中,9 成了 late data,被丟棄掉,也導(dǎo)致了最后的結(jié)果是不準(zhǔn)確的。而且在實(shí)際中,基于 event-time 的 watermark 的選擇本來(lái)就是一個(gè)難題,如果按照 process-time 去 delay 一段時(shí)間,也同樣會(huì)導(dǎo)致這個(gè)問(wèn)題。關(guān)于 watermark 有兩個(gè)典型的問(wèn)題:
- Too Slow:如上面圖中左邊所示,watermark 需要等待的時(shí)間會(huì)非常長(zhǎng),如果 watermark 由于未到的數(shù)據(jù)而延遲,那么它會(huì)導(dǎo)致最后輸出結(jié)果的延遲;
- Too Fast:如果 watermark 發(fā)出的比較早,可能會(huì)導(dǎo)致 event-time 小于 watermark 的數(shù)據(jù)(輸入時(shí)有延遲)成為 late data,從而導(dǎo)致結(jié)果的不準(zhǔn)確性,如上面圖中右邊所示,對(duì)于比較關(guān)心正確性場(chǎng)景,僅僅依賴 watermark 來(lái)確定什么時(shí)候輸出遠(yuǎn)遠(yuǎn)是不夠的(也就是用戶需要按照自己的策略處理 late data)。
這里對(duì)完整性的概念做了進(jìn)一步的講述,但是對(duì)于無(wú)限、無(wú)序的數(shù)據(jù)源來(lái)說(shuō),僅僅做到完整性是不夠的,因?yàn)椴豢赡軆H僅依賴完整性就能達(dá)到低延遲和完整性輸出的目標(biāo)。如果想要同時(shí)做到兩者應(yīng)該怎么做呢?到這里我們清楚的是:
- repeated update trigger:提供低延遲的 update,但無(wú)法解決完整性的問(wèn)題;
- watermark:提供完整性的概念,但通常伴隨著高延遲。
為什么不把兩者結(jié)合起來(lái)呢?
When: Early/On-Time/Late Triggers FTW
前面已經(jīng)介紹了兩種類型的 trigger:repeated update triggers 和 completeness/watermark triggers,在實(shí)際中,兩者單獨(dú)使用是不夠的,通常是結(jié)合在一起使用的。Beam 提供了一個(gè)標(biāo)準(zhǔn) watermark trigger 的擴(kuò)展,它可以在 watermark 的任何一邊支持 repeated update watermark。這里把這種復(fù)雜的 trigger 分為了以下三種情況:
| 類型 | 說(shuō)明 | 解釋 | 特點(diǎn) |
|---|---|---|---|
| Zero or more early panes | which are the result of a repeated update trigger that periodically fires up until the watermark passes the end of the window | 在 watermark 達(dá)到 window 的結(jié)尾處(end)前會(huì)不斷觸發(fā)(repeated update trigger),它意味著在這之后將不會(huì)再觸發(fā),所以是【early pane】 | 彌補(bǔ)了 watermark too slow 的問(wèn)題 |
| A single on-time pane | which is the result of the completeness/watermark trigger firing after the watermark passes the end of the window | 在 watermark 到達(dá) window 結(jié)尾處進(jìn)行觸發(fā)(它是 Completeness/watermark trigger 觸發(fā)的結(jié)果) | 這個(gè)觸發(fā)是比較特殊的,因?yàn)樗隽思俣ǎ合到y(tǒng)現(xiàn)在認(rèn)為這個(gè) window的輸入是已經(jīng)完成的 |
| Zero or more late panes | which are the result of another (possibly different) repeated update trigger that periodically fires any time late data arrive after the watermark has passed the end of the window | 它是另一種類型的 repeated update trigger,在 watermark 到達(dá) window 結(jié)尾處再觸發(fā),對(duì)于 perfect watermark,它觸發(fā)0次;但是對(duì)于 heuristic watermark,任何晚到的數(shù)據(jù)都會(huì)導(dǎo)致一個(gè) late firing。 | 它彌補(bǔ)了 watermark too fast 的不足。 |
這里以一個(gè)示例來(lái)說(shuō)明,下面的示例添加兩種類型的 trigger:一個(gè)是每分鐘周期性的 early firing,另一個(gè)是每條數(shù)據(jù)來(lái)就觸發(fā)的 late firing,實(shí)現(xiàn)如下所示:
// Early, on-time, and late firings via the early/on-time/late API
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AfterCount(1))))
.apply(Sum.integersPerKey());
執(zhí)行過(guò)程見:Windowed summation on a streaming engine with early, on-time, and late firings(動(dòng)圖),在動(dòng)圖中最后截取的一張圖如下所示:

無(wú)論是哪種的 watermark,隨著時(shí)間推移都有了很大的改進(jìn),相對(duì)來(lái)說(shuō),也減少了相應(yīng)的延遲。但這里還有一個(gè)問(wèn)題——window 的生命周期,每個(gè) window 都是維護(hù)一定狀態(tài)信息的,這是需要占用一定資源,在很多情況下,該窗口是不能一直存活的,對(duì)于 perfect watermark 是沒有問(wèn)題,但是對(duì)于 heuristic watermark 就不是那么容易了,到現(xiàn)在為止,我們還沒有一個(gè)很好的方法去評(píng)估一個(gè)窗口應(yīng)該保留的周期。
When: Allowed Lateness
Any real-world out-of-order processing system needs to provide some way to bound the lifetimes of the windows it’s processing. A clean and concise way of doing this is by defining a horizon on the allowed lateness within the system; that is, placing a bound on how late any given record may be (relative to the watermark) for the system to bother processing it; any data that arrives after this horizon are simply dropped.
需要給 window 設(shè)置生命周期的原因在前面已經(jīng)講述過(guò)了,每個(gè) window 的 state 是不可能無(wú)限期保留的,一個(gè)是存儲(chǔ)資源不允許,另一個(gè)是舊的數(shù)據(jù)隨著時(shí)間其價(jià)值也會(huì)下降。
任何處理真實(shí)、亂序數(shù)據(jù)的處理系統(tǒng),都需要提供一個(gè)方法來(lái)定義處理 window 的生命周期邊界,一個(gè)簡(jiǎn)單的方法是:對(duì)于這個(gè)系允許延遲的時(shí)間點(diǎn)上定義一個(gè)界限(horizon)。超過(guò)這個(gè)界限的數(shù)據(jù)將會(huì)被丟棄,這相當(dāng)于也限制了 window 狀態(tài)的保留時(shí)間(watermark 超過(guò) the end of window 后再超過(guò)自定義的時(shí)間后,window 就可以徹底關(guān)閉了,相關(guān)狀態(tài)也會(huì)被清除),有了這個(gè)之后,系統(tǒng)就不會(huì)
為不關(guān)心的數(shù)據(jù)(延遲太多,以及沒有意義的數(shù)據(jù))浪費(fèi)資源。
在 event-time 時(shí)間域上指定一個(gè) horizon 雖然看起來(lái)有些奇怪,但它卻是目前可行方案中最好的一個(gè),它可以降低 window 錯(cuò)過(guò)處理晚到數(shù)據(jù)的機(jī)會(huì)。如果使用 process-time,中間出現(xiàn)了 crash(導(dǎo)致延遲了幾分鐘),可能會(huì)導(dǎo)致這些應(yīng)該處理的數(shù)據(jù)變成了 late data 而沒有及時(shí)去處理。
這里的 watermark,指的是 low watermark,它試圖找到系統(tǒng)中已知的、最老的未處理 record 的 event-time。不管 event-time skew 如何變化,low watermark 總是去尋找系統(tǒng)中已知的、最舊的未處理 event 的 event-time。
關(guān)于 low watermark,可以通過(guò)這個(gè)例子理解:對(duì)于 event-time 為 12:00 的那批數(shù)據(jù),可能分布在 process-time 上的多個(gè)位置,假設(shè) process-time 是在 12:00~12:10之間,假設(shè)現(xiàn)在 process-time 為12:10,當(dāng)前已知的最舊的數(shù)據(jù)的 event-time 為 12:00,那么這個(gè)時(shí)間點(diǎn)的 low watermark 就是 12:00;
并不是所有系統(tǒng)的 watermark 都是 low watermark,比如 structured streaming 里的 watermark 指的是 high watermark,它表示系統(tǒng)已知的最新 record 的 event time。在處理延遲時(shí),系統(tǒng)可以回收那些比 high watermark 老得超過(guò)一個(gè)自定義閾值的 window。它確定了 event-time skew 的最大值,超過(guò)這個(gè) skew window 的數(shù)據(jù)都會(huì)被丟棄。
關(guān)于high watermark,這里也通過(guò)一個(gè)例子來(lái)理解:假設(shè)現(xiàn)在 process-time 為12:10,當(dāng)前已知的最新的數(shù)據(jù)的 event-time 為 12:10,那么這個(gè)時(shí)間點(diǎn)的 high watermark 就是 12:10(可能 low watermark 是 12:00,如前面 low watermark 中的例子)。
「allowed lateness」和「watermark」之間的關(guān)系有些微妙,這里讓我們?cè)倏匆粋€(gè)示例,與前面相比,這里又增加了一個(gè) 1min 的 lateness horizon:
// Early/on-time/late firings with allowed lateness
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AfterCount(1)))
.withAllowedLateness(ONE_MINUTE))
.apply(Sum.integersPerKey());
處理過(guò)程見:Allowed lateness with early/on-time/late firings(動(dòng)圖),下面是最后面得一張截圖:

當(dāng) watermark 到達(dá)一個(gè) window 的 lateness horizon 時(shí),比如:對(duì)于 event-time 為 [12:00~12:02] 的 window,其 lateness horizon 的 even-time 為12:03(圖中的虛線),也就是說(shuō),當(dāng) event-time 超過(guò) 12:03 時(shí),這個(gè) window 就徹底關(guān)閉了,該 window 的狀態(tài)將不會(huì)再維護(hù)了。
- 如果 watermark 是 perfect watermark,那么這里將 lateness horizon 設(shè)置為0就是最佳的;
- 在指定 lateness horizon 的規(guī)則時(shí),有一個(gè)需要注意的是:對(duì)于那種需要在隨著時(shí)間統(tǒng)計(jì)全局?jǐn)?shù)據(jù)的例子(比如:統(tǒng)計(jì)網(wǎng)站隨著時(shí)間變化的總訪問(wèn)次數(shù)),這種情況下這個(gè) window 是跟鍵值綁定在一起的,只要這個(gè)鍵值的數(shù)量控制在一定范圍內(nèi),就沒有必要通過(guò) lateness horizon 去限制 window 的生命周期。
How: Accumulation
When triggers are used to produce multiple panes for a single window over time, we find ourselves confronted with the last question: “How do refinements of results relate?”
在前面的例子中,每個(gè) pane 的新數(shù)據(jù)都建立在緊鄰的前一個(gè) pane 的數(shù)據(jù)之上。但是,實(shí)際上是有三種不同的更新模式:
| 類型 | 說(shuō)明 | 適用場(chǎng)景 |
|---|---|---|
| Discarding | 一旦一個(gè) pane 是物化之后,那么存儲(chǔ)的 state 將會(huì)被丟棄,會(huì)生成一個(gè)新的 pane,它是不依賴之前的 state | 它適用這種場(chǎng)景,比如:如果下游 consumer 是自己做一些聚合操作,由下游自己去做相應(yīng)的處理,比如:求和操作 |
| Accumulating | 之前的 state 會(huì)持續(xù)保留,未來(lái)的輸入會(huì)反映到已經(jīng)存在的 state 上,比如:最后結(jié)果更新到 k/v 存儲(chǔ)上,像 HBase 或 Bigtable | 適合這種場(chǎng)景:使用新值直接覆蓋之前的結(jié)果,新值是包含目前看到的所有數(shù)據(jù) |
| Accumulating and retracting | 不但會(huì) accumulate,還會(huì)修改之前錯(cuò)誤的結(jié)果(之前的錯(cuò)誤結(jié)果可能已經(jīng)對(duì)下游產(chǎn)生了影響,比如 group 操作修改前后可能去到不同的分組、對(duì)于動(dòng)態(tài)窗口,舊值可能會(huì)涉及多個(gè) window) | 這種機(jī)制最復(fù)雜,但功能也是最強(qiáng)大的 |
這里來(lái)看一個(gè)使用 Discarding 的示例,這種模式下,沒有一個(gè) window 是重疊的,如果當(dāng)前的 window 有新的數(shù)據(jù)來(lái),即使這個(gè) window 之前已經(jīng)關(guān)閉了,這里還會(huì)再新建一個(gè)窗口,將新到來(lái)的數(shù)據(jù)放到這個(gè)窗口里,發(fā)送到下游,對(duì)于這個(gè)時(shí)間點(diǎn)的 window 來(lái)說(shuō),它們都是獨(dú)立存在的。
// Discarding mode version of early/on-time/late firings
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AtCount(1)))
.discardingFiredPanes())
.apply(Sum.integersPerKey());
處理過(guò)程見:Discarding mode version of early/on-time/late firings on a streaming engine。
這里看下 Accumulating and retracting 模式,其處理過(guò)程見:Accumulating and retracting mode version of early/late firings on a streaming engine。
// Accumulating and retracting mode version of early/on-time/late firings
PCollection<KV<Team, Integer>> totals = input
.apply(Window.into(FixedWindows.of(TWO_MINUTES))
.triggering(
AfterWatermark()
.withEarlyFirings(AlignedDelay(ONE_MINUTE))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());
這種把這三種模式列出來(lái)做一個(gè)對(duì)比,如下圖所示,它們?nèi)甙错樞蚱浯鎯?chǔ)和計(jì)算成本是逐漸變高的。具體應(yīng)該選擇哪種模式,是需要在正確性、latency 和 cost 之間做一個(gè) trade-off。

Summary

參考:
- 《Streaming Systems》,本文的圖片和內(nèi)容都來(lái)自這本書的第二章。