The world beyond batch: Streaming 101

前言

今天流式數(shù)據(jù)處理在大數(shù)據(jù)領域是一件大事,理由如下:

????1、企業(yè)渴望更加及時的數(shù)據(jù),而且采用流式處理是降低延遲的很好的辦法。

????2、在現(xiàn)代企業(yè)中大的且無界的( unbounded)數(shù)據(jù)集變得更加普遍,且這些數(shù)據(jù)更容易被一個針對無界數(shù)據(jù)設計的系統(tǒng)所使用。

隨著時間的推移,和數(shù)據(jù)到達pipeline,數(shù)據(jù)處理的工作負載將更均勻地分布,從而產(chǎn)生更一致和可預測的資源消耗。

盡管是商業(yè)驅(qū)動激發(fā)了人們對流式系統(tǒng)的興趣,但是相對于批處理系統(tǒng)而言,現(xiàn)存的大多數(shù)流式系統(tǒng)還不成熟,而近來在這個領域很多激動人心的進展·。

作為一個在谷歌從事大規(guī)模流式系統(tǒng) (MillWheel,?Cloud Dataflow)工作5年多的人,至少可以這么說,我為流式系統(tǒng)的時代思潮感到高興。我對于如何幫助人們了解流式系統(tǒng)可以做什么以及怎樣更好的使用流式系統(tǒng)很感興趣,特別是考慮到現(xiàn)有批處理系統(tǒng)和流式系統(tǒng)之間的語義鴻溝。為了實現(xiàn)這個目標,?且OReilly的工作人員邀請我寫一篇關于我之前兩篇文章的續(xù)集: Say Goodbye to Batch? 以及 ?Strata + Hadoop World London 2015

在此,我想說的有很多,所以我把它分成了兩個部分:

Streaming 101: 第一篇文章將介紹一些基本的背景信息,并在深入了解時間域( time domains?)的細節(jié)之前,先澄清一些術語,以及數(shù)據(jù)處理(批處理和流處理)中通用的方法和高級概念。

The Dataflow Model:第二部分主要介紹在Cloud Dataflow中使用的統(tǒng)一的批處理和流處理模型,通過一個具體的例子來進行深入, 在此之后,我將通過對現(xiàn)有批處理和流系統(tǒng)的簡短語義比較來結(jié)束本文。

一、Background

首先,我將介紹一些重要的背景信息,這些信息將幫助構(gòu)建我想要討論的其他主題,主要針對三個特定的主題:

術語(Terminology): 要準確地談論復雜的話題,需要對術語進行精確的定義, 對于那些當前已有很多語義的術語(在不同語境下有不同的含義的術語),我將盡量在提到它們的時候明確我所要表達的意思。

Capabilities:我將評論流式系統(tǒng)經(jīng)常被人們詬病的缺點。 我也將提出我認為的數(shù)據(jù)處理系統(tǒng)構(gòu)建人員為滿足未來數(shù)據(jù)消費者的需求所需要采用的思路。

時間域(Time domains): 我將介紹與數(shù)據(jù)處理相關的兩個主要時間域, 展示它們之間的關系,并指出這兩個時間域帶來的一些困難。

二、Terminology: What is streaming?

在進一步討論之前,我想先講一件事: what is streaming?“Streaming”這個詞在今天可以指代很多不同的東西,這會導致我們對流式系統(tǒng)產(chǎn)生誤解,或者說對流式系統(tǒng)究竟能做什么產(chǎn)生誤解。?因此,我有必要稍微精確地定義這個術語。

問題的關鍵是,許多事情都應該用它們是什么來描述(例如,無界數(shù)據(jù)處理、近似結(jié)果等),從歷史的角度描述他們是如何實現(xiàn)的。Streaming究竟是什么,這在術語上缺乏精確的定義。在某些情況下,這給流式系統(tǒng)本身帶來和負擔,這意味著它們的能力僅限于流式系統(tǒng)經(jīng)常被描述的那些特性,比如流式系統(tǒng)的結(jié)果都是近似的或推測的。 考慮到設計良好的流式系統(tǒng)能夠產(chǎn)生正確、一致和可重復的結(jié)果,正如任何現(xiàn)有的批處理引擎一樣。我更傾向于將Streaming這個術語定義為一個非常特定的含義: 一種設計時考慮到無限的數(shù)據(jù)集的數(shù)據(jù)處理引擎。僅此而已( 為了完整起見,也許有必要指出這個定義既包括真正的流實現(xiàn),也包括微批處理實現(xiàn))。

至于“Streaming”的其他常見用法,下面是我經(jīng)常聽到的一些詞匯,每個詞匯都有更精確的描述性,我建議我們作為一個社區(qū)應該嘗試采用這些詞匯:

Unbounded data: 一種不斷增長的,本質(zhì)上是無限的數(shù)據(jù)集。 這些數(shù)據(jù)通常被稱為“流數(shù)據(jù)( streaming data.)”。 然而,對于術語Streaming或Batch等這些術語在描述數(shù)據(jù)集時是有問題的。因為如上所述,它們意味著使用某種類型的執(zhí)行引擎來處理這些數(shù)據(jù)集。 實際上,這兩種類型的數(shù)據(jù)集之間的關鍵區(qū)別在于它們的有限性,因此,最好使用能夠捕捉這種區(qū)別的術語來描述它們。 因此,我將把無限的“Streaming”數(shù)據(jù)集稱為Unbounded data(無界數(shù)據(jù)),把有限的“batch”數(shù)據(jù)集稱為bounded data(有界數(shù)據(jù))。

Unbounded data processing:? 應用于上述無界數(shù)據(jù)類型的一種正在被使用的數(shù)據(jù)處理模式。雖然我個人很喜歡使用?Streaming?這個術語來描述這種類型的數(shù)據(jù)處理,但是在這種情況下使用Streaming也意味著使用流執(zhí)行引擎,這更像是一種誤導; 自從批處理系統(tǒng)最初被構(gòu)想出來以來,就一直使用批處理引擎的重復運行來處理無界數(shù)據(jù)( 相反,設計良好的流系統(tǒng)完全能夠在有界數(shù)據(jù)上處理“ batch”類型的工作負載)。 因此,為了清晰起見,我將簡單地將其稱為無界數(shù)據(jù)處理(Unbounded data processing)

Low-latency, approximate, and/or speculative results: 這些用來描述計算結(jié)果的詞通常與流引擎( streaming engines)相關聯(lián), 傳統(tǒng)上,批處理系統(tǒng)的設計并沒有考慮到低延遲或推測結(jié)果,這是一個歷史遺留問題,僅此而已。 當然,批量引擎完全能夠產(chǎn)生近似的結(jié)果。 因此,與上面的術語一樣,將計算結(jié)果描述為它們是什么( low-latency, approximate, and/or speculative)要比描述它們在歷史上是如何表現(xiàn)的(通過流引擎)好得多。

從這里開始,每當我使用術語“Streaming”時,您都可以放心地認為我指的是為無界數(shù)據(jù)集( unbounded data)設計的執(zhí)行引擎,僅此而已,.當我說到上面的任何其他術語時,我會明確地說到無界數(shù)據(jù)( unbounded data)、無界數(shù)據(jù)( bounded data)處理或低延遲/近似/推測結(jié)果(?low-latency / approximate / speculative results). 這些是我們在 Cloud Dataflow中采用的術語,我鼓勵其他人采取類似的觀點。

三、On the greatly exaggerated limitations of streaming

接下來,讓我們討論一下流式系統(tǒng)能做什么和不能做什么,重點是能做什么: 在這些帖子中,我想要表達的最重要的事情之一就是:一個設計良好的流式系統(tǒng)應該具有怎樣的能力。 流式系統(tǒng)長期以來一直處于一個小眾市場,提供低延遲、不準確/投機的結(jié)果,通常需要與更強大的批處理系統(tǒng)一起才能提供最終正確的結(jié)果,即Lambda體系結(jié)構(gòu)( Lambda Architecture.)對于那些還不熟悉Lambda體系結(jié)構(gòu)的人來說,其基本思想是在批處理系統(tǒng)的同時運行一個流式系統(tǒng),這兩個系統(tǒng)基本上執(zhí)行相同的計算邏輯。 流系統(tǒng)提供了低延遲、不準確的結(jié)果( 要么是因為使用了近似算法,要么是因為流系統(tǒng)本身沒有提供正確性保證)且一段時間之后,批處理系統(tǒng)開始運行,并提供正確的輸出。這個架構(gòu)最初由Twitter的Nathan Marz (Storm的創(chuàng)始人)提出,它最終非常成功,因為它在當時是一個非常棒的想法。?但是流式引擎在正確性方面有點令人失望,且正如您所預期的那樣,批處理引擎天生就很笨拙,而Lambda提供了一種讓您吃到蛋糕的方法。 不幸的是,維護Lambda系統(tǒng)是一個麻煩: 您需要構(gòu)建、提供和維護 pipelines?的兩個獨立版本(batch layer?和 speed layer),然后在最后以某種方式合并來自兩個 pipelines?的結(jié)果(serving layer)。

作為一個花了數(shù)年時間致力于強一致性流引擎的人,我還發(fā)現(xiàn)Lambda體系結(jié)構(gòu)的整個原則有點令人討厭。 我是Jay Kreps( Jay Kreps’?Questioning the Lambda Architecture)的追隨者,當Lambda Architecture出現(xiàn)時同樣對它存有質(zhì)疑。Jay Kreps這篇文章是第一個反對雙重模式執(zhí)行必要性的文章;令人高興的是, Kreps使用Kafka這樣的可重復使用數(shù)據(jù)的系統(tǒng)作為流相互連接的橋梁,解決了流式系統(tǒng)可重復性的問題,甚至提出了Kappa體系結(jié)構(gòu),這意味著一個設計良好的系統(tǒng)僅需要使用運行一個管道,且這個系統(tǒng)是針對當前的工作而構(gòu)建的。 我原則上完全支持這個概念。坦率地說, 更進一步,我認為設計良好的流系統(tǒng)實際上提供了嚴格的批處理功能的超集,即設計良好的流式系統(tǒng)完全可以執(zhí)行批處理任務,所以現(xiàn)在應該不需要批處理系統(tǒng)了。值得稱贊的是Flink的工作人員把這個想法放在了心上,并且建立了一個Flink系統(tǒng),他是?all-streaming-all-the-time?,甚至是在“批處理”模式下。所有這一切的必然結(jié)果是:流系統(tǒng)的廣泛成熟,加上用于無限數(shù)據(jù)處理的健壯框架,最終將使得Lambda體系架構(gòu)成為大數(shù)據(jù)歷史上的老古董。 我相信現(xiàn)在是實現(xiàn)這一目標的時候了,因為要做到這一點,即在Streaming自己的模式下?lián)魯atch,你真的只需要兩件事:

Correctness?:在這方面Streaming可以做到與批處理一樣的效果。?核心在于,正確性歸結(jié)為一致的存儲。流系統(tǒng)需要一種關于隨著時間對checkpoint狀態(tài)進行持久化的方法(Kreps在他的《Why local state is a fundamental primitive in stream processing》 中談到了這一點),并且它必須經(jīng)過良好的設計以在機器故障時保持一致。 當Sparkstreaming幾年前首次出現(xiàn)在公共大數(shù)據(jù)領域時, 在一個原本黑暗的 streaming?世界里,它就像是一座燈塔,尤其是關于streaming?的一致性。 值得慶幸的是,從那以后情況有所改善,但值得注意的是,許多流系統(tǒng)仍然試圖在沒有強一致性的情況下生存;我真的不能相信 at-most-once processing仍然是一個問題,但事實上它確實仍是一個問題。 重申一下,因為這一點很重要:?對于exactly-once processing而言強一致性是必須的,這是正確性的必要條件,這是任何系統(tǒng)都必須具備的條件,它將有機會滿足或超過批處理系統(tǒng)的能力,除非你真的不在乎結(jié)果, 我懇請您不要使用任何不能提供強一致性狀態(tài)的流系統(tǒng)。 批處理系統(tǒng)不需要您提前驗證它們是否能夠生成正確的答案; 不要浪費你的時間在哪些不能滿足同樣標準的流系統(tǒng)上。 如果你想知道如何在流系統(tǒng)中獲得強一致性,我建議你看下 ?MillWheel和 Spark Streaming?兩篇論文都花了大量時間討論一致性問題。 鑒于其他地方和很多文獻中有關于這個主題的大量高質(zhì)量的內(nèi)容,我將不再在這篇文章中進一步介紹它。

Tools for reasoning about time?:這方面Streaming將超越batch。 對于處理event-time skew不固定、無界、無序的數(shù)據(jù),好的時間推理工具是必不可少的。 越來越多的現(xiàn)代數(shù)據(jù)集顯示了這些特性(event-time skew不固定、無界、無序),而現(xiàn)有的批處理系統(tǒng)(以及大多數(shù)流系統(tǒng))缺乏必要的工具來應對它們帶來的困難, 我將用這篇文章的剩余部分和下一篇文章的大部分時間來解釋和關注這一點。 首先,我們將對時域的重要概念有一個基本的了解,然后我們將更深入地了解我所說的無界的(unbounded)、無序的(unordered)、?event-time skew不固定(varying event-time skew)的數(shù)據(jù)。 接下來,我們將使用批處理和流系統(tǒng),研究有界和無界數(shù)據(jù)處理的常用方法。

四、Event time vs. processing time

要有效地談論無界數(shù)據(jù)( unbounded data?)處理,需要清楚地了解所涉及的時間域。在任何數(shù)據(jù)處理系統(tǒng)中,我們通常關心兩個時間域:

事件時間(Event time): 這是事件實際發(fā)生的時間。

處理時間(Processing time):?流式系統(tǒng)中觀察到事件的時間

不是所有用例都關心事件時間( event times?)(如果你的用例不關心,萬歲!你的生活更輕松),但很多人會。?例如隨著時間的推移描述用戶行為、大多數(shù)賬單應用程序和許多類型的異常檢測。

在理想情況下, event time?和?processing time總是相等的,即事件發(fā)生時立即進行處理。 然而,實際情況并非如此,event time?和?processing time之間的偏差不僅是非零的,而且通常是關于底層輸入源、執(zhí)行引擎和硬件特性的復雜的可變函數(shù)。同時影響因素包括:

1、共享資源限制,如非專用環(huán)境中的網(wǎng)絡擁塞、網(wǎng)絡分區(qū)或共享CPU

2、軟件原因,如分布式系統(tǒng)邏輯、爭用等

3、數(shù)據(jù)本身的特性,包括鍵分布、吞吐量的變化或無序方差(例如,一架飛機上的人,他們的手機在整個飛行過程中脫機使用后,都使用離線的飛行模式)

因此,如果在任何實際系統(tǒng)中繪制 event time?和?processing time?的進度,通常會得到類似于圖1中的紅線的結(jié)果

圖1:時域映射示例。X軸表示系統(tǒng)中 event time的完整性,即 event time中所有小于X的數(shù)據(jù)都被觀測到。y軸表示 processing time的進度,即數(shù)據(jù)處理系統(tǒng)執(zhí)行時觀察到的正常時鐘時間。

斜率為1的黑色虛線表示理想情況,其中 processing time和event time完全相等。 紅線代表現(xiàn)實情況。在這個例子中,系統(tǒng)在 processing time的開始有一點延遲,在中間向理想的方向轉(zhuǎn)變,然后再次延遲到最后一點。 理想值與紅線之間的水平距離是processing time與event time之間的偏差,這種偏差實質(zhì)上是pipeline處理引入的延遲。

由于processing time與event time之間的映射不是靜態(tài)的,這意味著如果您關心數(shù)據(jù)的event time,那么您不能僅通過在pipeline中觀察到的數(shù)據(jù)的上下文來分析數(shù)據(jù)。 不幸的是,現(xiàn)有的大多數(shù)系統(tǒng)針對無界數(shù)據(jù)操作都是采用這種方式。 為了處理無界數(shù)據(jù)集的無限特性,這些系統(tǒng)通常提供了一些數(shù)據(jù)窗口的概念。 下面我們將深入討論窗口,但它實際上意味著沿著時間邊界將數(shù)據(jù)集分割成有限的片段。

如果您關心正確性,且對在 event times上下文中分析數(shù)據(jù)感興趣,那么您就不能像大多數(shù)現(xiàn)有系統(tǒng)所做的那樣使用processing time(即,processing time??windowing)來分析數(shù)據(jù)。 由于processing time與event time之間沒有一致的相關性,一些event times數(shù)據(jù)最終會出現(xiàn)在錯誤的processing time窗口中(?例如,由于分布式系統(tǒng)固有的滯后,許多類型的輸入源的在線/離線特性等原因) 就像把正確性扔出窗外一樣。我們將在下面的一些例子以及下一篇文章中更詳細地討論這個問題。

不幸的是,按event times劃分窗口時,情況也不是那么樂觀。在無界數(shù)據(jù)的背景下,無序和可變傾斜將導致event times的完整性問題: 在processing time與event time之間缺乏可預測的映射,對于給定事件時間點X,在pipeline中您如何確定何時觀察到所有eventtime為X的所有數(shù)據(jù)? 對于許多真實世界的數(shù)據(jù)源,您無法確定,因為event time為X的所有數(shù)據(jù)可能會延遲到達pipeline(由于網(wǎng)絡,或其他原因),且這個延遲時多久沒有人知道。目前使用的絕大多數(shù)數(shù)據(jù)處理系統(tǒng)都依賴于某種完整性的概念,這使得它們在應用于無界數(shù)據(jù)集時處于非常不利的地位。

我建議,與其試圖將無界數(shù)據(jù)整理為最終完整的有限批信息,不如設計一些工具,讓我們生活在這些復雜數(shù)據(jù)集所帶來的不確定性世界中:新數(shù)據(jù)會到達,舊數(shù)據(jù)可能會被收回或更新。我們構(gòu)建的任何系統(tǒng)都應該能夠自己處理這些事實,完整性的概念是一種方便的最優(yōu)化的方案,而不是語義上的需要。

在深入研究我們?nèi)绾问褂?Cloud Dataflow模型構(gòu)建這樣的系統(tǒng)之前,讓我們先完成一個更有用的背景:常見的數(shù)據(jù)處理模式:

五、Data processing patterns

現(xiàn)在,我們已經(jīng)有了足夠的背景知識,當前可以開始研究那些常用的核心的可以跨有界和無界數(shù)據(jù)處理的模式。 我們將在我們關心的兩種主要引擎的上下文中查看針對這兩種類型數(shù)據(jù)的處理,以及與之相關的情況( 批處理和流處理,在這種情況下,我實際上是把微批處理和流處理混在一起,因為這兩者之間的區(qū)別在這個級別上不是很重要)。

5.1、Bounded data

處理有界數(shù)據(jù)非常簡單,可能每個人都很熟悉。 在下面的圖表中,我們從左邊開始,有一個充滿熵(熱力學的概念,可以簡單的認為是比較雜亂的)的數(shù)據(jù)集。 我們通過一些數(shù)據(jù)處理引擎運行它( 典型的批處理,盡管一個設計良好的流引擎也能很好地工作),例如Mapreduce. 右邊的是一個新的結(jié)構(gòu)化數(shù)據(jù)集,且具有更大的內(nèi)在價值:

Figure 2: Bounded data processing with a classic batch engine.

左邊的有限的非結(jié)構(gòu)化數(shù)據(jù)池通過數(shù)據(jù)處理引擎運行,從而在右邊生成相應的結(jié)構(gòu)化數(shù)據(jù)。

當然,作為這個方案的一部分,你實際上可以計算的東西有無數(shù)的變化,但是整個模型非常簡單。?更有趣的是處理無界數(shù)據(jù)集的任務。 現(xiàn)在讓我們看看通常處理無界數(shù)據(jù)的各種方法:首先是傳統(tǒng)批處理引擎使用的方法,然后是為無界數(shù)據(jù)設計的系統(tǒng)(如大多數(shù)流媒體或微批處理引擎)常采用的方法。

5.2、Unbounded data — batch

批處理引擎雖然在設計時沒有明確考慮到無界數(shù)據(jù),但自從第一次構(gòu)思批處理系統(tǒng)以來,就被用于處理無界數(shù)據(jù)集。 正如人們可能預期的那樣,這種方法將無界數(shù)據(jù)分割成適合批處理的有界數(shù)據(jù)集的集合。

5.2.1、Fixed windows

使用批處理引擎的重復運行來處理無界數(shù)據(jù)集的最常見方法是將輸入數(shù)據(jù)窗口設置為固定大小的窗口,然后將每個窗口作為獨立的有界數(shù)據(jù)源來處理。 特別是對像日志類型的輸入源,事件可以寫入目錄和文件,且目錄或文件的名字編碼與窗口相對應,這樣事情乍一看似乎很簡單,因為你實際上已經(jīng)完成了基于時間的shuffer,數(shù)據(jù)已經(jīng)被分配到它所屬時間窗口對應的文件中。

然而,在現(xiàn)實中,大多數(shù)系統(tǒng)仍然有一個完整性問題需要處理: 如果出于網(wǎng)絡分區(qū)的原因,您的一些事件在發(fā)送到日志文件的途中出現(xiàn)了延遲怎么辦? 如果您的事件是全局收集的,并且必須在處理之前轉(zhuǎn)移到一個公共位置,那該怎么辦? 如果你的活動來自移動設備呢? 這意味著可能需要某種緩解措施( 例如,延遲處理直到您確定所有事件都已被收集,或者每當有延遲到達的數(shù)據(jù)進入窗口時,就重新處理給定窗口的整個批處理)

Figure 3: Unbounded data processing via ad hoc fixed windows with a classic batch engine.

無界數(shù)據(jù)集預先收集到有限的固定大小的有界數(shù)據(jù)窗口中,然后通過連續(xù)運行經(jīng)典批處理引擎進行處理

5.2.2、Sessions

當您試圖使用批處理引擎將無界數(shù)據(jù)處理為更復雜的窗口策略(如 Sessions)時,這種方法就更容易失敗。 Sessions通常定義為活動周期(例如,對于特定用戶),以不活躍的間隙結(jié)束。 在使用典型的批處理引擎計算 Sessions時,Sessions通常會被不同的批次所分割,如下圖中的紅色標記所示。 通過增加批大小可以減少Session被分割的次數(shù),但代價是增加延遲。 另一種選擇是添加額外的邏輯,以便從之前的的運行結(jié)果中縫合會話,但代價是進一步增加復雜性。

Figure 4: Unbounded data processing into sessions via ad hoc fixed windows with a classic batch engine.

無界數(shù)據(jù)集預先收集到有限的固定大小的有界數(shù)據(jù)窗口中,然后通過連續(xù)運行經(jīng)典批處理引擎將這些窗口細分為動態(tài)會話窗口。

無論哪種方式,使用傳統(tǒng)的批處理引擎來計算會話都不太理想, 一個更好的方法是以流的方式構(gòu)建會話,我們將在后面看到這一點。

5.3、Unbounded data — streaming

與大多數(shù)基于批處理的無界數(shù)據(jù)處理方法相反,流系統(tǒng)是為無界數(shù)據(jù)構(gòu)建的。 正如我前面提到的,對于許多真實世界的分布式輸入源,您不僅發(fā)現(xiàn)自己在處理無界數(shù)據(jù),而且還在處理以下數(shù)據(jù):

Highly unordered with respect to event times, 這意味著,如果您希望在數(shù)據(jù)發(fā)生的上下文中(即基于event time時間域)分析數(shù)據(jù),那么您的pipeline需要某種基于時間的shuffle?Of varying event time skew?:也就是說,在給定的event?time X內(nèi),你無法在一定時間Y內(nèi)看到大部分數(shù)據(jù)(或者說這個時間Y也是varying的)。

在處理具有這些特征的數(shù)據(jù)時,可以采用幾種方法, 我通常將這些方法分為四類:

1、Time-agnostic

2、Approximation

3、Windowing by processing time

4、Windowing by event time

我們現(xiàn)在花點時間來看看這些方法。

5.3.1、Time-agnostic

Time-agnostic processing在以下情況中使用: 時間是無關緊要的,所有相關邏輯都是數(shù)據(jù)驅(qū)動的。 由于這種情況下所有任務內(nèi)容都是由更多數(shù)據(jù)的到來決定的,所以流引擎除了基本的數(shù)據(jù)傳遞之外,實際上沒有什么特別的任務需要處理。 因此,基本上所有現(xiàn)有的流系統(tǒng)都支持開箱即用的與時間無關的場景( 當然,對于那些關心正確性的人來說,?system-to-system的變動具有一致的擔保)。 批處理系統(tǒng)也非常適合于對無界數(shù)據(jù)源進行時間無關的處理,只需將無界數(shù)據(jù)源分割為任意的有界數(shù)據(jù)集序列,并獨立地處理這些數(shù)據(jù)集。 在本節(jié)中,我們將會看到幾個具體的例子,但是考慮到對于時間無關數(shù)據(jù)處理的簡單性,在此之后,我們不會在這上面花費更多的時間。

(1)Filtering

時間無關處理的一種基本形式是過濾, 假設您正在處理Web流量日志,您只需要特定域名下的流量,其他域名下的流量數(shù)據(jù)過濾掉,?你可以在每個數(shù)據(jù)記錄到達時查看它,看看它是否屬于自己所需要的域名,如果不屬于,則將其刪除。?對于這類處理任務,在任何時候都只依賴于單個元素,所以數(shù)據(jù)源是無界的、無序的和具有 varying event time skew?的事實與過濾本身是無關的。

Figure 5: Filtering unbounded data

包含不同類型的數(shù)據(jù)集合(從左到右流動)被過濾為 包含單一類型的同構(gòu)集合

(2)Inner-joins

另一個與時間無關的例子是內(nèi)連接(或hashjoin)。 當連接兩個無界數(shù)據(jù)源時,如果您只關心來自兩個數(shù)據(jù)源的元素到達時連接的結(jié)果,即邏輯上就不存在時間元素。 當從一個源中看到一個值時,您可以簡單地對其狀態(tài)進行緩沖存儲;當?shù)诙€來自另一個源的值到達時,您只需要輸出連接后的記錄( 實際上,您可能希望針對未成功進行JOIN連接的數(shù)據(jù)使用某種垃圾收集策略,這種策略可能是基于時間的。但是對于不存在或很少的沒有完成JOIN的數(shù)據(jù)的情況,這樣的事情可能不是問題)

Figure 6: Performing an inner join on unbounded data.

當觀察到來自兩個源的匹配元素時,就會產(chǎn)生連接

切換到某種外部連接的語義時,將引入我們討論過的數(shù)據(jù)完整性問題: 一旦你看到了連接的一邊,你怎么知道另一邊是否會到達? 說實話,你不知道。 所以你必須引入“超時”的概念,它引入了時間元素, 這個時間元素本質(zhì)上是窗口的一種形式,我們稍后會更仔細地研究它。

5.3.2、Approximation algorithms

Figure 7: Computing approximations on unbounded data.

數(shù)據(jù)通過一種復雜的算法運行,生成的輸出數(shù)據(jù)或多或少與另一端期望的結(jié)果相似

第二種主要的方法是近似算法,例如 approximate Top-N,?streaming K-means,?等等。 它們獲取無限的輸入源,并提供輸出數(shù)據(jù),如果你瞇著眼看,這些數(shù)據(jù)或多或少有點像你希望得到的結(jié)果。 近似算法的優(yōu)點是,從設計上來說,它們的開銷很低,而且是為無界數(shù)據(jù)設計的。 缺點是它們的數(shù)量有限,算法本身往往很復雜( 這使得召喚新的元素變得很困難) 它們的近似性質(zhì)限制了它們的實用性。

值得注意的是:這些算法在設計中確實有一些時間元素( 某種固有的衰變), 由于它們在數(shù)據(jù)到達pipeline時就開始處理數(shù)據(jù),所以時間元素通常是基于processing time的。 對于那些在近似上提供某種可證明誤差界限的算法來說,這一點尤為重要。 如果這些錯誤界限是基于按順序到達的數(shù)據(jù)來預測的,那么當您使用 varying event-time skew的無序數(shù)據(jù)來訓練算法時,它們實際上沒有任何意義。這需要謹記。

近似算法本身是一個很吸引人的主題,但因為它們本質(zhì)上是時間無關處理的另一個例子( 調(diào)制算法本身的時間特性), 它們使用起來非常簡單,因此在我們目前關注的問題上不值得進一步關注。

5.3.3、Windowing

其余兩種處理無界數(shù)據(jù)的方法都是窗口的變體。 在深入研究它們之間的差異之前,我應該明確說明我所說的窗口化是什么意思,因為我只簡單地提到過它。 窗口就是從數(shù)據(jù)源獲取數(shù)據(jù)(要么無界,要么有界) 然后沿著時間邊界把它切成有限的塊進行處理。 下圖顯示了三種不同的窗口模式:

Figure 8: Example windowing strategies

每個示例顯示三個不同的鍵,突出顯示對齊窗口(應用于所有數(shù)據(jù))和未對齊窗口(應用于數(shù)據(jù)子集)之間的區(qū)別

?Fixed windows:固定窗口就是把時間切分成具有固定時間長度的時間片段,通常(正如圖8所示),固定窗口的時間段被應用于整個數(shù)據(jù)集,這就是對齊窗口( aligned?windows)的例子。在某些情況下,最好對不同的數(shù)據(jù)子集( (e.g., per key))進行?phase-shift the windows,以便隨著時間的推移更均勻分配窗口負載, 相反,這是一個未對齊窗口的例子,因為它們在數(shù)據(jù)上是不同的。

?Sliding windows:是固定窗口的一種推廣,滑動窗口由固定長度和固定周期定義,如果周期小于長度,那么窗口重疊。 如果周期等于長度,就有固定的窗口。 如果周期大于長度,就會有一個奇怪的抽樣窗口,它只查看隨時間變化的數(shù)據(jù)子集。 與固定窗口一樣,滑動窗口通常是對齊的,盡管在某些用例中出于性能優(yōu)化的考慮,它可能是不對齊的。 注意,圖8中的滑動窗口是為了給人一種滑動運動的感覺而繪制的; 實際上,所有五個窗口都適用于整個數(shù)據(jù)集。

?Sessions:? 一個滑動窗口的例子, 會話由一系列事件組成,這些事件的終止時間間隔大于某些超時時間, 會話通常用于分析用戶在一段時間內(nèi)的行為,方法是將一系列與時間相關的事件組合在一起(?例如,一次觀看的一系列視頻)。 會話很有趣,因為它們的長度不能預先定義, 它們依賴于所涉及的實際數(shù)據(jù)。 它們也是未對齊窗口的典型例子,因為在不同的數(shù)據(jù)子集之間,會話實際上從來不是相同的( 例如,不同的用戶)。

討論的兩個時間域——processing time與event time——本質(zhì)上是我們關心的[2]。 在這兩個領域中,窗口都是有意義的,因此我們將詳細研究每個時間域,并了解它們之間的區(qū)別。 由于processing time窗口在現(xiàn)有系統(tǒng)中更為常見,我將從這里開始。

(1)Windowing by processing time

Figure 9: Windowing into fixed windows by processing time.

數(shù)據(jù)根據(jù)到達pipeline的順序被收集到窗口中

當通過processing time劃分窗口時,系統(tǒng)基本上會將傳入的數(shù)據(jù)緩沖到窗口中,直到經(jīng)過了一定的processing time。 例如,在窗口長度為5分鐘的固定窗口的情況下,系統(tǒng)將為5分鐘的processing time緩沖數(shù)據(jù), 在此之后,它將把在這五分鐘內(nèi)觀察到的所有數(shù)據(jù)當作一個窗口,發(fā)送到下游進行處理。 processing time窗口有幾個很好的特性:

?這很簡單。這個實現(xiàn)非常簡單,因為您從來不用隨著時間的推移對數(shù)據(jù)進行shuffle(不用關心遲到的數(shù)據(jù))。你只需要在它們到達時進行緩沖,并在窗口關閉時將它們發(fā)送到下游。

?如果你想要根據(jù)觀察到的數(shù)據(jù)來推斷某些信息,processing time窗口正式你所需要的, 許多監(jiān)控場景都屬于這一類。 想象一下,跟蹤發(fā)送到全球范圍的Web服務的每秒請求數(shù)。 為了檢測中斷而計算這些請求的速率是processing time窗口一種完美的使用場景。

撇開好的方面不談,processing time窗口有一個非常大的缺點, 如果相關數(shù)據(jù)具有event time,那么如果processing time窗口要反映這些事件實際發(fā)生的時間,那么這些數(shù)據(jù)必須按照event time順序到達。 不幸的是,在許多實際的分布式輸入源中event time有序的數(shù)據(jù)并不常見。

舉個簡單的例子,想象一下任何一個移動應用程序,它收集統(tǒng)計數(shù)據(jù)以供以后處理。 如果給定的移動設備在任何一段時間內(nèi)都可能處于離線狀態(tài)(短暫的連接中斷,飛越國家時的飛行模式,等等), 在此期間記錄的數(shù)據(jù)將不會被上傳,直到設備再次上線。 這意味著數(shù)據(jù)可能以分鐘、小時、天、周或更長時間的 event time skew到達pipeline。在使用processing time劃分窗口時,基本上不可能從這樣的數(shù)據(jù)集中得出任何有用的推論。 另一個例子是,當整個系統(tǒng)運行良好時,許多分布式輸入源似乎會提供event time有序的數(shù)據(jù)(或者幾乎有序). 不幸的是,當輸入源處于健康狀態(tài)時,event time skew很低,這并不意味著它將始終保持這種狀態(tài)。 考慮一個全局服務的例子,它需要處理從多個大洲收集的數(shù)據(jù)。 如果網(wǎng)絡在帶寬受限的跨大陸線路上出現(xiàn)問題(遺憾的是,這種情況非常普遍)這些網(wǎng)絡問題會進一步降低帶寬和/或增加延遲,那么您的輸入數(shù)據(jù)中有一部分數(shù)據(jù)可能會出現(xiàn)比以前更大的skew。 如果通過processing time對這些數(shù)據(jù)進行窗口化處理,那個該窗口不再代表其所對應時間段內(nèi)實際發(fā)生的數(shù)據(jù); 相反,這是事件到達處理pipeline時的時間窗口,其中的數(shù)據(jù)是舊數(shù)據(jù)和當前數(shù)據(jù)的任意組合。

在這兩種情況下,我們真正想要的是通過event time來劃分窗口數(shù)據(jù),這種方式對事件到達的順序是可以保證的,?即我們真正想要的是event time窗口。

(2)Windowing by event time

event time窗口是在需要以有限個時間塊來觀察數(shù)據(jù)源時使用的窗口,這些數(shù)據(jù)源反映了這些事件實際發(fā)生的時間。 這是劃分窗口的黃金標準。 遺憾的是,目前使用的大多數(shù)數(shù)據(jù)處理系統(tǒng)缺乏對它的良好的支持( 盡管任何具有良好一致性模型的系統(tǒng),如Hadoop或SparkStreaming,都可以作為構(gòu)建這樣一個窗口系統(tǒng)的合理基礎)

這張圖顯示了一個將無限源按照長度為一小時的固定窗進行窗口劃分的例子:

Figure 10: Windowing into fixed windows by event time.

數(shù)據(jù)被收集到窗口中根據(jù)數(shù)據(jù)被pipeline發(fā)現(xiàn)的時間。白色箭頭顯示processing time窗口中到達的示例數(shù)據(jù),這些數(shù)據(jù)與它們所屬的event time窗口不同

圖中的白色實線顯示了兩個特定的數(shù)據(jù)。 這兩個數(shù)據(jù)都是在processing time窗口中到達的,它們所在的processing time窗口與它們所屬的event time窗口不匹配。 因此,如果這些數(shù)據(jù)被窗口化到processing time窗口中,而processing time窗口又與event time有關,那么計算出來的結(jié)果就不正確了。因此使用event time的一個好處就是,保證event time的正確性。

event time窗口在無界數(shù)據(jù)源上的另一個好處是,您可以創(chuàng)建動態(tài)大小的窗口,例如session, 沒有像之前那種,在固定窗口上生成session時所產(chǎn)生的對session的任意分割(正如前面session例子中所看到的)。

Figure 11: Windowing into session windows by event time.

數(shù)據(jù)被收集到會話窗口,根據(jù)相應事件發(fā)生的時間捕獲活動的突發(fā)事件。白色箭頭再次指出將數(shù)據(jù)放入正確的event time位置所需的時間shuffer。

當然,強大的語義很少免費提供(是有代價的),event time窗口也不例外。 因為窗口的生存時間(在處理過程中)通常比窗口本身的實際長度長(因為會有遲到的數(shù)據(jù)),event time 窗口有兩個明顯的缺點,:

Buffering: 由于窗口壽命的延長,需要更多的數(shù)據(jù)緩沖。 值得慶幸的是,持久存儲通常是大多數(shù)數(shù)據(jù)處理系統(tǒng)所依賴的資源類型中最便宜的一種( 其他的主要是CPU、網(wǎng)絡帶寬和RAM)。 因此,在使用任何設計良好的數(shù)據(jù)處理系統(tǒng)(具有強一致的持久狀態(tài)和良好的內(nèi)存緩存層)時,這個問題通常不像人們想象的那么嚴重, 此外,許多有用的聚合不需要緩沖整個輸入集(例如,sum或average),而是可以使用存儲在持久化狀態(tài)中的更小的中間聚合數(shù)據(jù)增量地執(zhí)行。

Completeness: 由于我們通常無法很好地知道什么時候看到了給定窗口的所有數(shù)據(jù),那么我們?nèi)绾沃来翱诘慕Y(jié)果什么時候可以物化呢?事實上,我們無法知道。 對于許多類型的輸入,系統(tǒng)可以通過諸如MillWheel watermarks?(我將在第2部分中詳細討論)之類的東西給出一個相當精確的啟發(fā)式窗口進行估計。 但在要求數(shù)據(jù)絕對正確情況下(例如,計費),唯一真正的選擇是為管道構(gòu)建器提供一種方法,以便在pipeline builder想要物化windows的結(jié)果時表達這些結(jié)果,以及如何隨著時間的推移對這些結(jié)果進行細化。 處理窗口完整性(或缺少窗口完整性)是一個很吸引人的主題,但在具體的示例中(我們將在下一節(jié)中討論)可能會得到最好的探討。

六、Conclusion

Whew!?這是很多信息。你們中間有走到這地步的,當受人稱贊。 在這一點上,我們已經(jīng)大致完成了我想要介紹的內(nèi)容的一半,因此,退一步,重述我到目前為止所講的內(nèi)容,并讓事情在進入第2部分之前稍作休息是合理的。 這一切的好處是,第1部分是枯燥的內(nèi)容;第2部分是樂趣真正開始的地方。

七、Recap

明確的術語,明確地縮小了“Streaming”的定義,例如:只適用于執(zhí)行引擎,同時使用更多的描述性術語,如無界數(shù)據(jù)( unbounded data)和近似/推測的結(jié)果( approximate/speculative results),這些不同的概念通常被歸類在“Streaming”的范疇

評估了設計良好的批處理和流系統(tǒng)的相對功能,假設流處理實際上是批處理的超集(即流處理系統(tǒng)完全可以勝任所有批處理任務),以及Lambda體系結(jié)構(gòu)(基于流處理不如批處理的假設)等概念,隨著流處理系統(tǒng)的成熟,這些概念注定要退役。

提出了Streaming系統(tǒng)追趕并最終超越批處理所必需的兩個高級概念, 它們分別是正確性( correctness)和時間推理的工具( tools for reasoning about time)。

建立event time與processing time的重要區(qū)別, 描述這些差異在分析數(shù)據(jù)時所帶來的困難, 并提出了一種方法上的轉(zhuǎn)變,從完整性的概念轉(zhuǎn)變?yōu)楹唵蔚剡m應數(shù)據(jù)隨時間的變化。

通過批處理和流引擎,研究了當今常用的有界和無界數(shù)據(jù)的主要數(shù)據(jù)處理方法, 將無界方法大致分類: time-agnostic,?approximation,?windowing by processing time, and?windowing by event time.

八、Next time

本文為我將在第2部分中探討的具體示例提供了必要的上下文。文章中大致包括如下內(nèi)容:

從概念上看,我們是如何分解數(shù)據(jù)流模型中數(shù)據(jù)處理的概念的: what,?where,?when, and?how.

詳細介紹如何跨多個場景處理簡單、具體的示例數(shù)據(jù)集, 突出 Dataflow Model支持的多個用例,以及所涉及的具體api。 這些示例將幫助您理解本文中介紹的event time與processing time的概念,同時還將探索新的概念,如 ?watermarks.

比較現(xiàn)有的數(shù)據(jù)處理系統(tǒng)的重要特征,這些特征的在這兩個文章中都有提及,以便于更好的在這些數(shù)據(jù)系統(tǒng)中進行選擇,并鼓勵對不足的地方進行改進,我的終極目標是改善數(shù)據(jù)處理系統(tǒng),特別是整個大數(shù)據(jù)社區(qū)中的流式系統(tǒng),。

應該是一段美好的時光。到時候見!

???未完待續(xù) ??

微信公眾號
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容