第二章 The What, Where, When, and How of Data Processing

本文是《Streaming Systems》第二章的輸出,這章的內(nèi)容也基本來(lái)自 Streaming 102: The world beyond batch(當(dāng)時(shí)還叫做 DataFlow 模型,現(xiàn)在已經(jīng)統(tǒng)稱為 Beam 模型了),DataFlow 在當(dāng)時(shí)是唯一一個(gè)能體現(xiàn)這篇文章理論的引擎,不過(guò)在現(xiàn)在,當(dāng)時(shí)的那些概念(觀點(diǎn))在很多系統(tǒng)中都有了相應(yīng)的體現(xiàn)。

RoadMap

為了更好地理解 Streaming System 的內(nèi)容,作者提出了五個(gè)基本的概念,其中有兩個(gè)已經(jīng)在第一章介紹過(guò)了:event-time 和 process-time 區(qū)分和聯(lián)系以及 window:

  1. 對(duì) event-time 的理解非常重要,可以說(shuō)它是后面的基礎(chǔ),如果你比較關(guān)注正確性和 event 實(shí)際發(fā)生的真實(shí)時(shí)間,那么分析數(shù)據(jù)與其 event time 的關(guān)系就非常重要,這種情況下 process time 是沒有太大意義的;
  2. window 是處理無(wú)限流的通用方法。

除了這兩個(gè)概念,還有三個(gè)重要的概念:

  1. Trigger:類似于一個(gè)控制信號(hào),它靈活地控制一個(gè) window 什么時(shí)候進(jìn)行觸發(fā)來(lái)輸出結(jié)果;
  2. Watermark:是關(guān)于輸入 完整性 定義的一個(gè)概念(是針對(duì) event-time 時(shí)間域),時(shí)間為 X 的 watermark 表示 event time 小于 X 的數(shù)據(jù)都被接收到了;
  3. Accumulation:這個(gè)比較難解釋,針對(duì)一個(gè)時(shí)間段,同一個(gè) window,多次結(jié)果輸出之間的關(guān)系(或者說(shuō)是:后到的數(shù)據(jù)如何應(yīng)影響之前的結(jié)果)。

理解上面這些概念之間的關(guān)系并不難,這里會(huì)通過(guò)下面四個(gè)問(wèn)題,把前面這五個(gè)概念串一下,這四個(gè)問(wèn)題也是每個(gè) Unbounded data processing system 都會(huì)面臨的問(wèn)題:

  1. What results are calculated?:This question is answered by the types of transformations within the pipeline(這段話我也沒有太理解,我個(gè)人的理解是:對(duì)結(jié)果進(jìn)行計(jì)算的操作是什么,答案是 transformation,它是一種抽象,可以是求和、構(gòu)建直方圖或 AI 模型訓(xùn)練等);
  2. Where in event time are results calculated?:計(jì)算什么時(shí)間范圍內(nèi)的數(shù)據(jù)(這個(gè)時(shí)間指的是 event-time)?解決方法是 event-time window;
  3. When in processing time are results materialized?:什么時(shí)間輸出結(jié)果(這里的時(shí)間指的是 process time)?解決方法是 trigger+watermark;
  4. How do refinements of results relate?:后到的數(shù)據(jù)如何影響之前的結(jié)果,對(duì)于同一個(gè) window,這些輸出結(jié)果之間的關(guān)系是什么樣的?這就是 Accumulation 做的事情。

本章下面將對(duì)上面的問(wèn)題做更深入一些探討。

Batch Foundations:What and Where

先看下 batch processing,batch process engine 就可以回答 what 和 where 的問(wèn)題。

What: Transformations

本文中所涉及的例子都會(huì)以 Beam 偽代碼的形式給出,不過(guò)它們跟 Flink 和 Spark 都很相似。在 Beam 中有兩個(gè)基本的概念抽象:

  1. PCollections:它代表了在并發(fā) transform 中可以處理的數(shù)據(jù)集(possibly massive ones);
  2. PTransforms:它是應(yīng)用到 PCollections 上,來(lái)執(zhí)行的轉(zhuǎn)換操作,會(huì)生成新的 PCollections。PTransforms 可以是對(duì)元素一個(gè)一個(gè)操作,也可以是聚集(agg)操作,還可以是包含其他 PTransforms 的聚合操作,如下圖所示:
Types of transformations

本章會(huì)用一個(gè)示例做講解,示例使用的數(shù)據(jù)源如下:

> SELECT * FROM UserScores ORDER BY EventTime;
------------------------------------------------
| Name  | Team  | Score | EventTime | ProcTime |
------------------------------------------------
| Julie | TeamX |     5 |  12:00:26 | 12:05:19 |
| Frank | TeamX |     9 |  12:01:26 | 12:08:19 |
| Ed    | TeamX |     7 |  12:02:26 | 12:05:39 |
| Julie | TeamX |     8 |  12:03:06 | 12:07:06 |
| Amy   | TeamX |     3 |  12:03:39 | 12:06:13 |
| Fred  | TeamX |     4 |  12:04:19 | 12:06:39 |
| Naomi | TeamX |     3 |  12:06:39 | 12:07:19 |
| Becky | TeamX |     8 |  12:07:26 | 12:08:39 |
| Naomi | TeamX |     1 |  12:07:46 | 12:09:00 |
------------------------------------------------

該數(shù)據(jù)源的 event-time 及 process time 的關(guān)系如下:

Nine input records, plotted in both event time and processing time

這里我們是希望計(jì)算這個(gè)球隊(duì)的總分?jǐn)?shù)。

這里使用了一個(gè)名叫 input 的 PCollection<KV<Team, Integer>> 作為輸入( input 是由 Team/Integer 作為鍵/值對(duì)組成的,Team 是球隊(duì)名,Interger 是每人的分?jǐn)?shù)),對(duì)于 batch process,其代碼實(shí)現(xiàn)如下:

PCollection<String> raw = IO.read(...);
PCollection<KV<Team, Integer>> input = raw.apply(new ParseFn());
PCollection<KV<Team, Integer>> totals =
  input.apply(Sum.integersPerKey());

執(zhí)行過(guò)程可以見:Classic batch processing(gif 動(dòng)圖,可以點(diǎn)擊查看)。

因?yàn)槭?batch pipeline,它直到接收到所有的輸入才會(huì)輸出計(jì)算結(jié)果,最后輸出的結(jié)果是48,從上面的圖中也可以看到:state 和 output 的矩形覆蓋了整個(gè) x 軸。如果我們要處理的數(shù)據(jù)源是 unbounded data,那么這種模式將無(wú)法 work,我們不可能等到所有輸入結(jié)束,這就是 window 要解決的問(wèn)題。

Where: Windowing

第一章已經(jīng)已經(jīng)介紹了,windowing 是沿著時(shí)間邊界對(duì)數(shù)據(jù)源進(jìn)行切片的過(guò)程,window 主要有以下三種:

Example windowing strategies

window 在 Beam 中的使用非常簡(jiǎn)單,這里我們使用一個(gè) 2min 的 fixed window,其實(shí)現(xiàn)如下:

// Windowed summation code
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES)))
  .apply(Sum.integersPerKey());

Beam 提供了一個(gè)在 batch 和 streaming 上統(tǒng)一的模型,在 batch engine 執(zhí)行上的代碼,其處理過(guò)程見:Windowed summation on a batch engine(gif 動(dòng)圖),與前面不同的是,這里切分的四個(gè)窗口上都有對(duì)應(yīng)的輸出。

Going Streamings: When and How

前面看到了 batch engine 下的 window 操作,但是在理想情況下,我們是希望延遲盡可能的低,并且希望能夠處理無(wú)限流的情況,這就是需要轉(zhuǎn)向 Streaming Engine,之前需要等待全部輸入完成再輸出結(jié)果的模式將無(wú)法接受。但是 Streaming Engine 也有一些問(wèn)題需要去解決,比如:結(jié)果什么時(shí)候輸出?數(shù)據(jù)的完整性怎么定義?這就需要 trigger 和 watermark 機(jī)制。

When: The Wonderful Thing Abort Triggers Are Wonderful Things!

Trigger declare when output for a window should happen in processing time.

上面這句話需要好好理解,盡管 trigger 本身可能是在 event-time 這個(gè)時(shí)間域上確定的,但是它最后的表現(xiàn)是:確定何時(shí)(指的是 process time)輸出一個(gè) window 的結(jié)果。雖然 trigger 可能有多種不同的語(yǔ)義,但從概念上講,通常是只有兩種類型的 trigger:

  1. Repeated update triggers:可以是每接收一條新數(shù)據(jù)就觸發(fā)一次,或者按 process time 周期性處理(比如:1min 一次),具體的設(shè)置取決于 latency 和 cost 之間的權(quán)衡;
  2. Completeness triggers:它代表一個(gè) window 內(nèi)的數(shù)據(jù)接收完成之后做一次觸發(fā),比如在批處理中,不過(guò)它是把整個(gè)批看做一個(gè) window。

Repeated update triggers 是 streaming system 中最常見的使用類型,也是最容易實(shí)現(xiàn)和理解的類型,提供的語(yǔ)義是 repeated updates to a materialized dataset(它適合的也是這種場(chǎng)景)。

Completeness triggers 使用得比較少,提供了一個(gè)跟典型批處理場(chǎng)景一致的 streaming 語(yǔ)義,它們也提供了一個(gè)工具處理晚到或丟失的數(shù)據(jù),這個(gè)在后面的 watermark 中間介紹。

首先看一個(gè) repeated update trigger 的例子,這里跟之前的代碼比添加的內(nèi)容是:每接收一條新的 record 就做一次 trigger,示例如下:

// Triggering repeatedly with every record
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
                .triggering(Repeatedly(AfterCount(1))));
  .apply(Sum.integersPerKey());

執(zhí)行過(guò)程見:Per-record triggering on a streaming engine,如果下游只是獲取最新的數(shù)據(jù),那么這種類型的 trigger 是可以滿足需求(它會(huì)把最新的結(jié)果寫到結(jié)果表中,下游每次獲取時(shí)拿到的都是最新的數(shù)據(jù)),而且隨著時(shí)間的推移結(jié)果將會(huì)變得更加準(zhǔn)確。

上面這種模式在應(yīng)對(duì)大規(guī)模的數(shù)據(jù)集時(shí),是非常消耗資源的,而根據(jù) process-time 周期性地輸出結(jié)果是更加適合大數(shù)據(jù)量場(chǎng)景,比如:按分鐘輸出。在 Beam 中這又分為兩種情況:

  1. aligned delays:delay 的時(shí)間點(diǎn)完全是根據(jù) process-time 切分的;
  2. unaligned delays:delay 是跟該窗口觀察到數(shù)據(jù)的 process-time 相關(guān)的。

這里看下第一種情況,代碼實(shí)現(xiàn)如下:

//  Triggering on aligned two-minute processing-time boundaries
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(Repeatedly(AlignedDelay(TWO_MINUTES)))
  .apply(Sum.integersPerKey());

它的執(zhí)行過(guò)程見:Two-minute aligned delay triggers (i.e., microbatching)(動(dòng)圖),最后的結(jié)果如下圖(從動(dòng)圖中截取的):

Two-minute aligned delay triggers

按照 process-time 每 2min 就觸發(fā)一次結(jié)果輸出,這種 aligned delays 的優(yōu)點(diǎn)是:有點(diǎn)像 spark streaming,它的輸出時(shí)間點(diǎn)是可預(yù)測(cè)的,我們會(huì)同時(shí)得到所有 window 的更新。但是缺點(diǎn)也很明顯,所有的更新同時(shí)觸發(fā),這會(huì)導(dǎo)致負(fù)載不均衡,峰值的負(fù)載非常高。

因此,有了另一個(gè)可替代的方案:unaligned delays,示例如下:

PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(Repeatedly(UnalignedDelay(TWO_MINUTES))
  .apply(Sum.integersPerKey());

其執(zhí)行過(guò)程見:Two-minute unaligned delay triggers(動(dòng)圖),截取了動(dòng)圖中最后的一個(gè)結(jié)果,如下圖所示:

Two-minute unaligned delay triggers

這里也是每 2min 就觸發(fā)一次結(jié)果輸出,但它初始時(shí)間是由該 window 接收的第一條數(shù)據(jù)的時(shí)間決定,因?yàn)檩斎霐?shù)據(jù)在時(shí)間分布上是打散的,最后會(huì)使得觸發(fā)的時(shí)間點(diǎn)相對(duì)來(lái)說(shuō)比較均衡。關(guān)于延遲的話,這兩者延遲的差異可能為正也可能為負(fù),最后平均下來(lái)應(yīng)該是差不多的。對(duì)于大規(guī)模數(shù)據(jù)集場(chǎng)景,這種模式顯然是更加適合的。

最后,總結(jié)一下,repeated update trigger 適合這樣的場(chǎng)景:只是簡(jiǎn)單地、周期性地隨時(shí)間更新結(jié)果輸出;對(duì)【最終會(huì)不斷接近準(zhǔn)確性,但什么時(shí)間達(dá)到準(zhǔn)確性是不知道的】這種特性是可以接受的。由于分布式系統(tǒng)的一些特性,這會(huì)導(dǎo)致 process-time 和 event-time 之間的延遲無(wú)法預(yù)測(cè),我們也很難推理出什么時(shí)候當(dāng)前的輸出是由一個(gè)完整的、正確的輸入計(jì)算得到的,我們需要有一種方法去推理【完整性】而不是盲目相信輸出的結(jié)果。

When: Watermarks

Watermarks are a supporting aspect of the answer to the question: “When in processing time are results materialized?” Watermarks are temporal notions of input completeness in the event-time domain.

Watermark 要解決的問(wèn)題如上面的引用所示,這也是關(guān)于 watermark 的解釋。

我們可以把 watermark 當(dāng)作一個(gè)函數(shù):輸入一個(gè) process time,它返回一個(gè) event time。假設(shè)輸出的 event-time 是 E,它表示所有 event time 小于 E 的輸入都已經(jīng)接收到了,也就是說(shuō),event time 小于 E 的數(shù)據(jù)將會(huì)被認(rèn)為不會(huì)再接收到。根據(jù) watermark 的類型,分為以下兩種:

  1. perfect watermark: 在這種情況下,我們知道所有輸入數(shù)據(jù)的情況,因此也就不會(huì)有任何的數(shù)據(jù)延遲;
  2. heuristic watermark:對(duì)于分布式的數(shù)據(jù)源,了解所有輸入數(shù)據(jù)的情況是不現(xiàn)實(shí)的,heuristic watermark 利用數(shù)據(jù)源的已有情況(partition、ordering、文章增長(zhǎng)率等)盡可能地估計(jì)其進(jìn)度,它也意味著有時(shí)候輸出的結(jié)果是錯(cuò)誤的,因?yàn)闀?huì)有 late data。

Watermark 提供了一個(gè)與輸入相關(guān)的 Completeness 概念,watermark 也成為構(gòu)建了前面介紹的 completeness trigger 的基礎(chǔ)。這里有一個(gè)示例:

// Watermark completeness trigger
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(AfterWatermark()))
  .apply(Sum.integersPerKey());

Perfect watermark 是一個(gè)理想情況,在現(xiàn)實(shí)中,達(dá)到 perfect watermark 的代價(jià)是非常大的,所以我們一般會(huì)選擇一個(gè) heuristic watermark。當(dāng)然,無(wú)論哪種情況,當(dāng) watermark 到達(dá) window 的結(jié)尾時(shí),當(dāng)前 window 的結(jié)果就可以輸出了(認(rèn)為該窗口的所有數(shù)據(jù)都已經(jīng)接收到了,可以做進(jìn)一步的處理),正如上面的例子所示,但是兩種類型的 watermark 最后的計(jì)算結(jié)果是不一樣的(執(zhí)行過(guò)程見:Windowed summation on a streaming engine with perfect (left) and heuristic (right) watermarks,動(dòng)圖),程序執(zhí)行結(jié)束時(shí)的情況如下圖所示:

Windowed summation on a streaming engine with perfect (left) and heuristic (right) watermarks

在右邊的結(jié)果中,9 成了 late data,被丟棄掉,也導(dǎo)致了最后的結(jié)果是不準(zhǔn)確的。而且在實(shí)際中,基于 event-time 的 watermark 的選擇本來(lái)就是一個(gè)難題,如果按照 process-time 去 delay 一段時(shí)間,也同樣會(huì)導(dǎo)致這個(gè)問(wèn)題。關(guān)于 watermark 有兩個(gè)典型的問(wèn)題:

  1. Too Slow:如上面圖中左邊所示,watermark 需要等待的時(shí)間會(huì)非常長(zhǎng),如果 watermark 由于未到的數(shù)據(jù)而延遲,那么它會(huì)導(dǎo)致最后輸出結(jié)果的延遲;
  2. Too Fast:如果 watermark 發(fā)出的比較早,可能會(huì)導(dǎo)致 event-time 小于 watermark 的數(shù)據(jù)(輸入時(shí)有延遲)成為 late data,從而導(dǎo)致結(jié)果的不準(zhǔn)確性,如上面圖中右邊所示,對(duì)于比較關(guān)心正確性場(chǎng)景,僅僅依賴 watermark 來(lái)確定什么時(shí)候輸出遠(yuǎn)遠(yuǎn)是不夠的(也就是用戶需要按照自己的策略處理 late data)。

這里對(duì)完整性的概念做了進(jìn)一步的講述,但是對(duì)于無(wú)限、無(wú)序的數(shù)據(jù)源來(lái)說(shuō),僅僅做到完整性是不夠的,因?yàn)椴豢赡軆H僅依賴完整性就能達(dá)到低延遲和完整性輸出的目標(biāo)。如果想要同時(shí)做到兩者應(yīng)該怎么做呢?到這里我們清楚的是:

  1. repeated update trigger:提供低延遲的 update,但無(wú)法解決完整性的問(wèn)題;
  2. watermark:提供完整性的概念,但通常伴隨著高延遲。

為什么不把兩者結(jié)合起來(lái)呢?

When: Early/On-Time/Late Triggers FTW

前面已經(jīng)介紹了兩種類型的 trigger:repeated update triggers 和 completeness/watermark triggers,在實(shí)際中,兩者單獨(dú)使用是不夠的,通常是結(jié)合在一起使用的。Beam 提供了一個(gè)標(biāo)準(zhǔn) watermark trigger 的擴(kuò)展,它可以在 watermark 的任何一邊支持 repeated update watermark。這里把這種復(fù)雜的 trigger 分為了以下三種情況:

類型 說(shuō)明 解釋 特點(diǎn)
Zero or more early panes which are the result of a repeated update trigger that periodically fires up until the watermark passes the end of the window 在 watermark 達(dá)到 window 的結(jié)尾處(end)前會(huì)不斷觸發(fā)(repeated update trigger),它意味著在這之后將不會(huì)再觸發(fā),所以是【early pane】 彌補(bǔ)了 watermark too slow 的問(wèn)題
A single on-time pane which is the result of the completeness/watermark trigger firing after the watermark passes the end of the window 在 watermark 到達(dá) window 結(jié)尾處進(jìn)行觸發(fā)(它是 Completeness/watermark trigger 觸發(fā)的結(jié)果) 這個(gè)觸發(fā)是比較特殊的,因?yàn)樗隽思俣ǎ合到y(tǒng)現(xiàn)在認(rèn)為這個(gè) window的輸入是已經(jīng)完成的
Zero or more late panes which are the result of another (possibly different) repeated update trigger that periodically fires any time late data arrive after the watermark has passed the end of the window 它是另一種類型的 repeated update trigger,在 watermark 到達(dá) window 結(jié)尾處再觸發(fā),對(duì)于 perfect watermark,它觸發(fā)0次;但是對(duì)于 heuristic watermark,任何晚到的數(shù)據(jù)都會(huì)導(dǎo)致一個(gè) late firing。 它彌補(bǔ)了 watermark too fast 的不足。

這里以一個(gè)示例來(lái)說(shuō)明,下面的示例添加兩種類型的 trigger:一個(gè)是每分鐘周期性的 early firing,另一個(gè)是每條數(shù)據(jù)來(lái)就觸發(fā)的 late firing,實(shí)現(xiàn)如下所示:

// Early, on-time, and late firings via the early/on-time/late API
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(AfterWatermark()
                 .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                 .withLateFirings(AfterCount(1))))
  .apply(Sum.integersPerKey());

執(zhí)行過(guò)程見:Windowed summation on a streaming engine with early, on-time, and late firings(動(dòng)圖),在動(dòng)圖中最后截取的一張圖如下所示:

Windowed summation on a streaming engine with early, on-time, and late firings

無(wú)論是哪種的 watermark,隨著時(shí)間推移都有了很大的改進(jìn),相對(duì)來(lái)說(shuō),也減少了相應(yīng)的延遲。但這里還有一個(gè)問(wèn)題——window 的生命周期,每個(gè) window 都是維護(hù)一定狀態(tài)信息的,這是需要占用一定資源,在很多情況下,該窗口是不能一直存活的,對(duì)于 perfect watermark 是沒有問(wèn)題,但是對(duì)于 heuristic watermark 就不是那么容易了,到現(xiàn)在為止,我們還沒有一個(gè)很好的方法去評(píng)估一個(gè)窗口應(yīng)該保留的周期。

When: Allowed Lateness

Any real-world out-of-order processing system needs to provide some way to bound the lifetimes of the windows it’s processing. A clean and concise way of doing this is by defining a horizon on the allowed lateness within the system; that is, placing a bound on how late any given record may be (relative to the watermark) for the system to bother processing it; any data that arrives after this horizon are simply dropped.

需要給 window 設(shè)置生命周期的原因在前面已經(jīng)講述過(guò)了,每個(gè) window 的 state 是不可能無(wú)限期保留的,一個(gè)是存儲(chǔ)資源不允許,另一個(gè)是舊的數(shù)據(jù)隨著時(shí)間其價(jià)值也會(huì)下降。

任何處理真實(shí)、亂序數(shù)據(jù)的處理系統(tǒng),都需要提供一個(gè)方法來(lái)定義處理 window 的生命周期邊界,一個(gè)簡(jiǎn)單的方法是:對(duì)于這個(gè)系允許延遲的時(shí)間點(diǎn)上定義一個(gè)界限(horizon)。超過(guò)這個(gè)界限的數(shù)據(jù)將會(huì)被丟棄,這相當(dāng)于也限制了 window 狀態(tài)的保留時(shí)間(watermark 超過(guò) the end of window 后再超過(guò)自定義的時(shí)間后,window 就可以徹底關(guān)閉了,相關(guān)狀態(tài)也會(huì)被清除),有了這個(gè)之后,系統(tǒng)就不會(huì)
為不關(guān)心的數(shù)據(jù)(延遲太多,以及沒有意義的數(shù)據(jù))浪費(fèi)資源。

在 event-time 時(shí)間域上指定一個(gè) horizon 雖然看起來(lái)有些奇怪,但它卻是目前可行方案中最好的一個(gè),它可以降低 window 錯(cuò)過(guò)處理晚到數(shù)據(jù)的機(jī)會(huì)。如果使用 process-time,中間出現(xiàn)了 crash(導(dǎo)致延遲了幾分鐘),可能會(huì)導(dǎo)致這些應(yīng)該處理的數(shù)據(jù)變成了 late data 而沒有及時(shí)去處理。

這里的 watermark,指的是 low watermark,它試圖找到系統(tǒng)中已知的、最老的未處理 record 的 event-time。不管 event-time skew 如何變化,low watermark 總是去尋找系統(tǒng)中已知的、最舊的未處理 event 的 event-time。

關(guān)于 low watermark,可以通過(guò)這個(gè)例子理解:對(duì)于 event-time 為 12:00 的那批數(shù)據(jù),可能分布在 process-time 上的多個(gè)位置,假設(shè) process-time 是在 12:00~12:10之間,假設(shè)現(xiàn)在 process-time 為12:10,當(dāng)前已知的最舊的數(shù)據(jù)的 event-time 為 12:00,那么這個(gè)時(shí)間點(diǎn)的 low watermark 就是 12:00;

并不是所有系統(tǒng)的 watermark 都是 low watermark,比如 structured streaming 里的 watermark 指的是 high watermark,它表示系統(tǒng)已知的最新 record 的 event time。在處理延遲時(shí),系統(tǒng)可以回收那些比 high watermark 老得超過(guò)一個(gè)自定義閾值的 window。它確定了 event-time skew 的最大值,超過(guò)這個(gè) skew window 的數(shù)據(jù)都會(huì)被丟棄。

關(guān)于high watermark,這里也通過(guò)一個(gè)例子來(lái)理解:假設(shè)現(xiàn)在 process-time 為12:10,當(dāng)前已知的最新的數(shù)據(jù)的 event-time 為 12:10,那么這個(gè)時(shí)間點(diǎn)的 high watermark 就是 12:10(可能 low watermark 是 12:00,如前面 low watermark 中的例子)。

「allowed lateness」和「watermark」之間的關(guān)系有些微妙,這里讓我們?cè)倏匆粋€(gè)示例,與前面相比,這里又增加了一個(gè) 1min 的 lateness horizon:

// Early/on-time/late firings with allowed lateness
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AfterCount(1)))
               .withAllowedLateness(ONE_MINUTE))
 .apply(Sum.integersPerKey());

處理過(guò)程見:Allowed lateness with early/on-time/late firings(動(dòng)圖),下面是最后面得一張截圖:

Allowed lateness with early/on-time/late firings

當(dāng) watermark 到達(dá)一個(gè) window 的 lateness horizon 時(shí),比如:對(duì)于 event-time 為 [12:00~12:02] 的 window,其 lateness horizon 的 even-time 為12:03(圖中的虛線),也就是說(shuō),當(dāng) event-time 超過(guò) 12:03 時(shí),這個(gè) window 就徹底關(guān)閉了,該 window 的狀態(tài)將不會(huì)再維護(hù)了。

  1. 如果 watermark 是 perfect watermark,那么這里將 lateness horizon 設(shè)置為0就是最佳的;
  2. 在指定 lateness horizon 的規(guī)則時(shí),有一個(gè)需要注意的是:對(duì)于那種需要在隨著時(shí)間統(tǒng)計(jì)全局?jǐn)?shù)據(jù)的例子(比如:統(tǒng)計(jì)網(wǎng)站隨著時(shí)間變化的總訪問(wèn)次數(shù)),這種情況下這個(gè) window 是跟鍵值綁定在一起的,只要這個(gè)鍵值的數(shù)量控制在一定范圍內(nèi),就沒有必要通過(guò) lateness horizon 去限制 window 的生命周期。

How: Accumulation

When triggers are used to produce multiple panes for a single window over time, we find ourselves confronted with the last question: “How do refinements of results relate?”

在前面的例子中,每個(gè) pane 的新數(shù)據(jù)都建立在緊鄰的前一個(gè) pane 的數(shù)據(jù)之上。但是,實(shí)際上是有三種不同的更新模式:

類型 說(shuō)明 適用場(chǎng)景
Discarding 一旦一個(gè) pane 是物化之后,那么存儲(chǔ)的 state 將會(huì)被丟棄,會(huì)生成一個(gè)新的 pane,它是不依賴之前的 state 它適用這種場(chǎng)景,比如:如果下游 consumer 是自己做一些聚合操作,由下游自己去做相應(yīng)的處理,比如:求和操作
Accumulating 之前的 state 會(huì)持續(xù)保留,未來(lái)的輸入會(huì)反映到已經(jīng)存在的 state 上,比如:最后結(jié)果更新到 k/v 存儲(chǔ)上,像 HBase 或 Bigtable 適合這種場(chǎng)景:使用新值直接覆蓋之前的結(jié)果,新值是包含目前看到的所有數(shù)據(jù)
Accumulating and retracting 不但會(huì) accumulate,還會(huì)修改之前錯(cuò)誤的結(jié)果(之前的錯(cuò)誤結(jié)果可能已經(jīng)對(duì)下游產(chǎn)生了影響,比如 group 操作修改前后可能去到不同的分組、對(duì)于動(dòng)態(tài)窗口,舊值可能會(huì)涉及多個(gè) window) 這種機(jī)制最復(fù)雜,但功能也是最強(qiáng)大的

這里來(lái)看一個(gè)使用 Discarding 的示例,這種模式下,沒有一個(gè) window 是重疊的,如果當(dāng)前的 window 有新的數(shù)據(jù)來(lái),即使這個(gè) window 之前已經(jīng)關(guān)閉了,這里還會(huì)再新建一個(gè)窗口,將新到來(lái)的數(shù)據(jù)放到這個(gè)窗口里,發(fā)送到下游,對(duì)于這個(gè)時(shí)間點(diǎn)的 window 來(lái)說(shuō),它們都是獨(dú)立存在的。

//  Discarding mode version of early/on-time/late firings
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AtCount(1)))
               .discardingFiredPanes())
  .apply(Sum.integersPerKey());

處理過(guò)程見:Discarding mode version of early/on-time/late firings on a streaming engine

這里看下 Accumulating and retracting 模式,其處理過(guò)程見:Accumulating and retracting mode version of early/late firings on a streaming engine。

// Accumulating and retracting mode version of early/on-time/late firings
PCollection<KV<Team, Integer>> totals = input
  .apply(Window.into(FixedWindows.of(TWO_MINUTES))
               .triggering(
                 AfterWatermark()
                   .withEarlyFirings(AlignedDelay(ONE_MINUTE))
                   .withLateFirings(AtCount(1)))
               .accumulatingAndRetractingFiredPanes())
  .apply(Sum.integersPerKey());

這種把這三種模式列出來(lái)做一個(gè)對(duì)比,如下圖所示,它們?nèi)甙错樞蚱浯鎯?chǔ)和計(jì)算成本是逐漸變高的。具體應(yīng)該選擇哪種模式,是需要在正確性、latency 和 cost 之間做一個(gè) trade-off。

Side-by-side comparison of accumulation modes

Summary

總結(jié)

參考:

  1. 《Streaming Systems》,本文的圖片和內(nèi)容都來(lái)自這本書的第二章。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 像流星劃過(guò)天際,燦爛的消亡。 歲月未存慈悲,愿這個(gè)年紀(jì)我已不再將就 有些事情無(wú)法強(qiáng)求 該來(lái)的總會(huì)來(lái)該走的也無(wú)法挽留...
    拼命活著活著拼命閱讀 1,386評(píng)論 0 1
  • 有位客人到某人家里做客,看見主人家的灶上煙囪是直的,旁邊又有很多木材??腿烁嬖V主人說(shuō),煙囪要改曲,木材須移去,否則...
    ___Alone獨(dú)自_ad1c閱讀 318評(píng)論 0 1
  • 風(fēng)吹雪不知身該去往何處,不知不覺,提劍便走到了城郊。身后一黑影緊緊相隨。 風(fēng)吹雪沒有回頭,道:“我知道自己在做...
    靠學(xué)習(xí)吃飯的小朋友閱讀 1,130評(píng)論 2 3
  • 一 那一年,她十歲。 三年自然災(zāi)害導(dǎo)致饑民無(wú)數(shù),寒冬臘月不要說(shuō)無(wú)米下鍋,就是連野菜也無(wú)處可尋。萬(wàn)般無(wú)奈的父親決...
    慕天寒閱讀 496評(píng)論 2 19
  • 01引起恐懼癥的原因 不僅僅只有創(chuàng)傷性事件,還有替代經(jīng)歷和被告知經(jīng)歷。 02關(guān)于特定對(duì)象恐懼癥的治療方法 ①學(xué)會(huì)放...
    掃地_閱讀 287評(píng)論 0 4

友情鏈接更多精彩內(nèi)容