本篇是《Streaming Systems》第一章的總結(jié)輸出。
在大數(shù)據(jù)領(lǐng)域,最近幾年 Streaming data processing 變得越來越重要、越來越為人所知,下面是其中的一些原因:
- 商業(yè)對(duì)于實(shí)時(shí)性要求更高,而 streaming 是可以達(dá)到其低延遲目標(biāo)的;
- 當(dāng)前,數(shù)據(jù)量巨大、無限的數(shù)據(jù)集越來越普遍,原生就為無限流設(shè)計(jì)的系統(tǒng)更適合處理這種場(chǎng)景的數(shù)據(jù);
- 實(shí)時(shí)地處理數(shù)據(jù),將會(huì)使得資源負(fù)載更加均衡,其資源消耗也更可控(包括:可預(yù)測(cè))。
當(dāng)然 Streaming System 在開始的時(shí)候并不是那么成熟,特別是跟 batch system 對(duì)比。但是,現(xiàn)在 Streaming System 已經(jīng)是非常成熟了,而業(yè)內(nèi)對(duì)其的理解還停留在過去,很多人并沒有真正理解 Streaming 的含義。
本篇文章主要聚焦以下幾點(diǎn):
- Terminology:對(duì)于 Streaming 做一個(gè)準(zhǔn)確的定義;
- Capabilities:指出現(xiàn)有 Streaming System 的一些問題,并提出一些新的觀點(diǎn)(系統(tǒng)設(shè)計(jì)者在設(shè)計(jì)時(shí)為了滿足未來數(shù)據(jù)處理的需求應(yīng)該考慮一下);
- Time Domains:介紹數(shù)據(jù)處理領(lǐng)域兩個(gè) time domain 的概念(process time 和 event time)。
1. What is Streaming?
什么是 Streaming?Streaming 這個(gè)詞在今天應(yīng)用得非常廣泛,它可能會(huì)給 Streaming System 中的 Streaming 帶來一些誤導(dǎo)。Streaming 很長(zhǎng)一段時(shí)間都跟 無限流、近似結(jié)果等概念聯(lián)系在一起,這實(shí)際上模糊了 Streaming 的真正含義,那些概念僅限于過去對(duì) Streaming 的描述(比如:近似結(jié)果),但是現(xiàn)在 Streaming 發(fā)展得已經(jīng)很成熟了。如果 Streaming System 可以做到像 Batch System 在生產(chǎn)環(huán)境的準(zhǔn)確性,那么 Streaming 將會(huì)是另外一種含義。
Streaming System:A type of data processing engine that is designed with infinite datasets in mind.
如果你想要表達(dá) low-latency,approximate 或者 speculative results 的含義,最好直接使用它們,而不是用 Streaming 來表示。
1.1. 如何定義 dataset
關(guān)于dataset 的定義,從兩個(gè)維度來說:cardinality 和 constitution。
cardinality 主要是關(guān)于 dataset 大小(或量級(jí))的定義,可以分為:
- Bounded data:
A type of dataset that is finite in size; - Unbounded data:
A type of dataset that is infinite in size。
constitution 定義了與 dataset 交互的方式,可以分為:
- Table:
A holistic view of a dataset at a specific point in time. SQL systems have traditionally dealt in tables(在某個(gè)特定的時(shí)間點(diǎn)上可以看到全部的數(shù)據(jù)); - Stream:
An element-by-element view of the evolution of a dataset over time. The MapReduce lineage of data processing systems have traditionally dealt in streams(隨著時(shí)間數(shù)據(jù)會(huì)有相應(yīng)的變化)。
后面會(huì)詳細(xì)講述 Table 和 Stream 的內(nèi)容,在這之前,重點(diǎn)會(huì)在 Stream 上,它現(xiàn)在已經(jīng)是流和批的基礎(chǔ)(batch 是一種特殊的 stream),也最能體現(xiàn) Streaming Systems 面臨的特有挑戰(zhàn)。
1.2. Streaming 的局限性被夸大了
Streaming 過去一直被認(rèn)為是一個(gè)可以提供 low-latency、inaccurate 或 speculative results 的服務(wù),特別是在與 batch system 提供的準(zhǔn)確性做對(duì)比之后,大家?guī)缀醪粫?huì)把 Streaming 與強(qiáng)一致性、準(zhǔn)確性聯(lián)系在一起,比如最開始的 Lambda 架構(gòu)。
Lambda 架構(gòu)在當(dāng)時(shí)提出了之后,取得了非常大的成功,在業(yè)內(nèi)應(yīng)用非常廣泛,但其最大的缺點(diǎn)就是 —— 業(yè)務(wù)需要維護(hù)兩套系統(tǒng)。Kreps 后來首先質(zhì)疑了 Lambda 架構(gòu),接著又提出了 Kappa 架構(gòu)。但是如果是作者的話,他會(huì)更加激進(jìn)一些,直接提出:Streaming System 可以提供批功能的超集。
Batch System 的目標(biāo)是高吞吐、節(jié)省資源,并針對(duì)批的場(chǎng)景做了大量的優(yōu)化,并不是天生為無限流設(shè)計(jì)的。在 DataFlow 中,我們使用同一個(gè)模型做批和流的處理,針對(duì)不同的場(chǎng)景也做了相應(yīng)具體的優(yōu)化,雖然目前是運(yùn)行不同的 Runner 中。
1.3. 一個(gè) Streaming System 需要加強(qiáng)的地方
Streaming 的成熟發(fā)展,將使得 Lambda 架構(gòu)變成歷史,但 Streaming System 想要在批處理領(lǐng)域做得比 Batch System 更好,還需要在下面兩個(gè)地方上加強(qiáng)。
1.3.1. Correctness
它的核心就是狀態(tài)存儲(chǔ)的一致性,需要有一個(gè) checkpoint 機(jī)制來保證 state 的一致性。最開始是 Spark Streaming 提供了這樣的機(jī)制,現(xiàn)在 Streaming System 從本質(zhì)上已經(jīng)有了很大的提高,強(qiáng)一致性已經(jīng)成為 Streaming System 的必備要求(不能保證一致性的 Streaming System 未來必然是會(huì)被淘汰的)。
1.3.2. Tools for reasoning about time
對(duì)于無限、無序數(shù)據(jù)流的處理,需要 goods tools for reasoning about time(不太好翻譯,理解就行,簡(jiǎn)單來說就是,關(guān)于 time 推理的定義,比如:應(yīng)該怎么定義才能保證數(shù)據(jù)的完整性,一個(gè)窗口應(yīng)該何時(shí)關(guān)閉?),這里先看下 time domain 的重要概念。
1.4. Process Time VS Event Time
關(guān)于 time domain 有兩種類型:
- Event Time:
This is the time at which events actually occurred; - Processing Time:
This is the time at which events are observed in the system。
理想情況下,Process Time 和 Event Time 應(yīng)該是相等的(如下圖中那條虛線),但在實(shí)際情況下,它們之間差距的影響因素非常多,可能跟軟件、硬件或數(shù)據(jù)有關(guān),并且這個(gè)差異還是毫無規(guī)律可循的,如下圖所示:

Process Time 和 Event Time 給 Streaming Processing 帶來了很多的問題,由于 unbounded dataset 的無邊界特性,這些系統(tǒng)會(huì)提供一些把輸入數(shù)據(jù)源進(jìn)行 window 操作的概念。window 操作的本質(zhì)還是把無限流按時(shí)間切片,對(duì)于那些關(guān)注 Event Time 的應(yīng)用來說,肯定是不能按照 Process Time 進(jìn)行 window 操作的(Process Time 和 Event Time 之間的無關(guān)性會(huì)導(dǎo)致 window 之間的不準(zhǔn)確性)。
之前的系統(tǒng)都依賴了【完整性】的概念,但如果把這個(gè)概念應(yīng)用無限流上時(shí)就會(huì)有一些問題,假如在 window 中使用了 Event Time,那么應(yīng)該如何定義數(shù)據(jù)的完整性呢?怎么決定什么時(shí)候拿到了 X (event time)之前的所有數(shù)據(jù)?與其想著如何去把無限流切片,不如我們思考一下:如何設(shè)計(jì)一種新的系統(tǒng)來適應(yīng)這種不確定性?
2. Data Processing Patterns
這一節(jié)主要是關(guān)于 Bounded data 和 Unbounded data 處理的常見模式(優(yōu)點(diǎn)類似于設(shè)計(jì)模式,但又不完全相同)。
2.1. Bounded Data
Bounded Data 的處理模型非常簡(jiǎn)單,比如:可以使用 MapReduce 去做。

2.2. Unbounded Data:Batch
先看下使用 Batch engine 去處理 Unbounded Data 的情況。
2.2.1. Fixed windows
處理模型簡(jiǎn)單來說就是微批處理(像 Spark Streaming 這種),這種模型依然會(huì)有完整性(Completeness)的問題,直到所有的數(shù)據(jù)都到了再進(jìn)行處理?還是每當(dāng)有新數(shù)據(jù)來,就把這個(gè) window 的整個(gè) batch 都重新計(jì)算一遍?

2.2.2. Sessions
使用 Batch 引擎去處理 Session 類型的任務(wù)時(shí),還是通過切分 batch 來做,如果 batch 太大,結(jié)果可能會(huì)帶來相應(yīng)的延遲;如果通過額外的邏輯把新的數(shù)據(jù)合并到之前的 session 窗口,又會(huì)帶來非常大的復(fù)雜性。

2.3. Unbounded Data:Streaming
在現(xiàn)實(shí)中,對(duì)于 Unbounded Data,它不僅僅是無限流,還可能有以下特點(diǎn):
- 對(duì)于 event time 而言,是強(qiáng)無序的,這意味著如果想根據(jù) event time 做一些事情,還需要基于 event time 做一些 sort 操作;
- event time 的跨度非常大,很難假設(shè)在某個(gè)時(shí)間點(diǎn)大部分?jǐn)?shù)據(jù)已經(jīng)全部到了。
對(duì)于上面這種類型的數(shù)據(jù)源,有幾種方案可以做相應(yīng)的處理,這里詳細(xì)介紹一下這些方法(跟具體的場(chǎng)景有關(guān)系)。
2.3.1. Time-agnostic
這種是最基本的場(chǎng)景,對(duì)時(shí)間不敏感(對(duì) time 沒有太大要求),下面舉一些這種類型的例子。
2.3.1.2. Filtering
過濾是種比較簡(jiǎn)單的場(chǎng)景,在任何情況下,只依賴某一個(gè)元素(可能是多元素組合后的元素)進(jìn)行過濾,在處理的時(shí)候,直接按照條件過濾即可,event-time 與 process-time 的偏差對(duì)其沒什么影響。

2.3.1.2. Inner joins
Time-agnostic 的另一個(gè)常見場(chǎng)景就是 inner join 了。當(dāng)對(duì)兩個(gè)無限流數(shù)據(jù)源做 join 時(shí),如果只關(guān)心兩個(gè)數(shù)據(jù)源到達(dá)時(shí) join 的結(jié)果,那么 time 就沒有太大影響了(如果一個(gè)數(shù)據(jù)源的數(shù)據(jù)先到,先做緩存,但如果另一個(gè)數(shù)據(jù)源的數(shù)據(jù)一直不來,這時(shí)候可以引入 timeout 機(jī)制來解決這個(gè)問題)。

2.3.2. Approximation algorithms
近似算法,像:近似 Top-N、Streaming K-means 等等,在數(shù)據(jù)的處理時(shí)候,即使有時(shí)間上的傾斜(event-time 與 process-time 有偏差)對(duì)結(jié)果影響也不是很大。它的優(yōu)點(diǎn)是:開銷低+本身就是為無限流而設(shè)計(jì);缺點(diǎn)是:數(shù)據(jù)規(guī)模的限制(已經(jīng)完成的計(jì)算,很難再引入新到的數(shù)據(jù)),結(jié)果的近似性也限制了它們的應(yīng)用。

近似算法也有 time 的概念,只不過依賴的是 process time,即使使用了跨 event-time 的數(shù)據(jù)去訓(xùn)練它們,通常情況下意義也不大(本質(zhì)上還是 time-agnostic 類型)。
2.3.3. Windowing
window 的類型有以下幾種:

- Fixed Window:按時(shí)間切成固定大小的 window,是 aligned window 的一種;
- Sliding Window:也是一種 Fixed Window,但它有 fixed length 和 fixed period 兩個(gè)設(shè)置;
- Sessions:一種 unaligned window,長(zhǎng)度是未知的,一種動(dòng)態(tài)的 window,比如分析用戶的行為等。
window 在 process-time 和 event-time 下都是有意義的,這里分別來看下這兩種情況。
Windowing by processing time
按 Process Time 切分 window 在實(shí)現(xiàn)上非常簡(jiǎn)單,只需要先緩存到來的數(shù)據(jù),等到達(dá)到 process time 然后關(guān)閉窗口,做相應(yīng)的計(jì)算即可。window 的完整性也很容易評(píng)估,因?yàn)槲覀冎?window 應(yīng)該什么時(shí)候關(guān)閉(適合那些不需要 event-time 的場(chǎng)景,比如統(tǒng)計(jì) qps、pv、uv 等)。

這種方式最大的缺點(diǎn)就是,無法處理 event-time 相關(guān)的場(chǎng)景。而 Event Time 延遲的場(chǎng)景非常普遍,比如:手機(jī) app 經(jīng)常需要上報(bào)統(tǒng)計(jì)的信息,如果手機(jī)突然掉線幾天,等在線之后,這時(shí)候上報(bào)的數(shù)據(jù)可能是幾 hour /天前的數(shù)據(jù)。
Windowing by event time
對(duì)于需要保證 event-time 正確性的場(chǎng)景是使用 event-time window 的重要原因。

但是使用 vent-time window 并不是完全沒有沒有性能消耗的,它有兩個(gè)明顯的缺點(diǎn):
- Buffering:相當(dāng)于擴(kuò)展了 window 的長(zhǎng)度,需要緩存更多的數(shù)據(jù)(有些情況只需要增量緩存,并不緩存全部原始的數(shù)據(jù),類似于先進(jìn)行小范圍的聚合);
- Completeness:一個(gè) window 的完整性怎么定義?一個(gè)通用的解決方法就是使用 watermark 來去做,但是對(duì)于那些需要精確計(jì)算的場(chǎng)景(比如:計(jì)費(fèi))又該怎么做呢(后面的文章會(huì)介紹)?
3. 總結(jié)
本章主要講述的內(nèi)容:
- Streaming 的清晰定義,對(duì)于 dataset 兩個(gè)維度的定義;
- 評(píng)估設(shè)計(jì)良好的 streaming 和 batch system,指出 streaming 是 batch 的超集,Lambda 架構(gòu)提出的前提是 streaming system 的功能還比較弱;
- 提出兩個(gè)重要的地方(如果 streaming system 想在 batch 領(lǐng)域趕上 batch system 的話):正確性和 Tools for reasoning about time;
- event-time 和 process-time 的差異;
- 當(dāng)前對(duì)于 Bounded data 和 Unbounded data,常用的處理模式。
有誤的地方,歡迎指正~
參考: