【Data Flow】The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing

Preface

Finally finished the translation; now I can start reading the paper. So happy!

Main text

Event time for a given event essentially never changes, but processing time changes constantly for each event as it flows through the pipeline and time marches ever forward. This is an important distinction when it comes to robustly analyzing events in the context of when they occurred.

During processing, the realities of the systems in use (communication delays, scheduling algorithms, time spent processing, pipeline serialization, etc.) result in an inherent and dynamically changing amount of skew between the two domains. Global progress metrics, such as punctuations or watermarks, provide a good way to visualize this skew. For our purposes, we’ll consider something like MillWheel’s watermark, which is a lower bound (often heuristically established) on event times that have been processed by the pipeline. As we’ve made very clear above, notions of completeness are generally incompatible with correctness, so we won’t rely on watermarks as such. They do, however, provide a useful notion of when the system thinks it likely that all data up to a given point in event time have been observed, and thus find application in not only visualizing skew, but in monitoring overall system health and progress, as well as making decisions around progress that do not require complete accuracy, such as basic garbage collection policies.

In an ideal world, time domain skew would always be zero; we would always be processing all events immediately as they happen. Reality is not so favorable, however, and often what we end up with looks more like Figure 2. Starting around 12:00, the watermark starts to skew more away from real time as the pipeline lags, diving back close to real time around 12:02, then lagging behind again noticeably by the time 12:03 rolls around. This dynamic variance in skew is very common in distributed data processing systems, and will play a big role in defining what functionality is necessary for providing correct, repeatable results.

2. DATAFLOW MODEL

In this section, we will define the formal model for the system and explain why its semantics are general enough to subsume the standard batch, micro-batch, and streaming models, as well as the hybrid streaming and batch semantics of the Lambda Architecture. For code examples, we will use a simplified variant of the Dataflow Java SDK, which itself is an evolution of the FlumeJava API.

2.1 Core Primitives

To begin with, let us consider primitives from the classic batch model. The Dataflow SDK has two core transforms that operate on the (key,value) pairs flowing through the system:

• ParDo for generic parallel processing. Each input element to be processed (which itself may be a finite collection) is provided to a user-defined function (called a DoFn in Dataflow), which can yield zero or more output elements per input. For example, consider an operation which expands all prefixes of the input key, duplicating the value across them:

• GroupByKey for key-grouping (key, value) pairs.

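The two primitives above can be illustrated with plain Python stand-ins; the helper names (`par_do`, `group_by_key`, `expand_prefixes`) are illustrative only, not Dataflow SDK identifiers:

```python
from collections import defaultdict

def expand_prefixes(key, value):
    """A DoFn-style function: for one input pair, yield the value
    under every prefix of the key (zero or more outputs per input)."""
    for i in range(1, len(key) + 1):
        yield (key[:i], value)

def par_do(pairs, do_fn):
    """ParDo: apply do_fn element-wise; this extends naturally to unbounded input."""
    for key, value in pairs:
        yield from do_fn(key, value)

def group_by_key(pairs):
    """GroupByKey: collect all values for each key before downstream reduction."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

pairs = [("fix", 1), ("fit", 2)]
expanded = list(par_do(pairs, expand_prefixes))
# ("fix", 1) expands to ("f", 1), ("fi", 1), ("fix", 1), and similarly for ("fit", 2)
grouped = group_by_key(expanded)
# grouped == {"f": [1, 2], "fi": [1, 2], "fix": [1], "fit": [2]}
```

Note how `group_by_key` must buffer everything for a key before it can emit, which is exactly why grouping over an unbounded source needs windowing.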
The ParDo operation operates element-wise on each input element, and thus translates naturally to unbounded data. The GroupByKey operation, on the other hand, collects all data for a given key before sending them downstream for reduction. If the input source is unbounded, we have no way of knowing when it will end. The common solution to this problem is to window the data.

2.2 Windowing

Systems which support grouping typically redefine their GroupByKey operation to essentially be GroupByKeyAndWindow. Our primary contribution here is support for unaligned windows, for which there are two key insights. The first is that it is simpler to treat all windowing strategies as unaligned from the perspective of the model, and allow underlying implementations to apply optimizations relevant to the aligned cases where applicable. The second is that windowing can be broken apart into two related operations:

• Set<Window> AssignWindows(T datum), which assigns the element to zero or more windows. This is essentially the Bucket Operator from Li [22].

• Set<Window> MergeWindows(Set<Window> windows), which merges windows at grouping time. This allows data-driven windows to be constructed over time as data arrive and are grouped together.

For any given windowing strategy, the two operations are intimately related; sliding window assignment requires sliding window merging, sessions window assignment requires sessions window merging, etc.

Note that, to support event-time windowing natively, instead of passing (key,value) pairs through the system, we now pass (key, value, event time, window) 4-tuples. Elements are provided to the system with event-time timestamps (which may also be modified at any point in the pipeline), and are initially assigned to a default global window, covering all of event time, providing semantics that match the defaults in the standard batch model.

2.2.1 Window Assignment

From the model’s perspective, window assignment creates a new copy of the element in each of the windows to which it has been assigned. For example, consider windowing a dataset by sliding windows of two-minute width and one- minute period, as shown in Figure 3 (for brevity, timestamps are given in HH:MM format).

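Sliding-window assignment as in Figure 3 can be sketched as follows, with timestamps as integer minutes; the function name is hypothetical, and this assumes aligned windows whose starts are multiples of the period:

```python
def assign_sliding_windows(timestamp, width=2, period=1):
    """Return every [start, start + width) window containing the timestamp.
    Aligned sliding windows: window starts are multiples of the period."""
    windows = []
    # The earliest window that can still contain this timestamp starts at
    # the first period boundary strictly after (timestamp - width).
    start = ((timestamp - width) // period + 1) * period
    while start <= timestamp:
        windows.append((start, start + width))
        start += period
    return windows

# A datum at 12:01 (minute 721) is copied into both overlapping windows,
# [12:00, 12:02) and [12:01, 12:03).
assert assign_sliding_windows(721) == [(720, 722), (721, 723)]
```

Each returned window would hold its own copy of the element, which is why assignment can happen anywhere upstream of the grouping step.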
In this case, each of the two (key,value) pairs is duplicated to exist in both of the windows that overlapped the element’s timestamp. Since windows are associated directly with the elements to which they belong, this means window assignment can happen anywhere in the pipeline before grouping is applied. This is important, as the grouping operation may be buried somewhere downstream inside a composite transformation (e.g. Sum.integersPerKey()).

2.2.2 Window Merging

Window merging occurs as part of the GroupByKeyAndWindow operation, and is best explained in the context of an example. We will use session windowing since it is our motivating use case. Figure 4 shows four example data, three for k1 and one for k2, as they are windowed by session, with a 30-minute session timeout. All are initially placed in a default global window by the system. The sessions implementation of AssignWindows puts each element into a single window that extends 30 minutes beyond its own timestamp; this window denotes the range of time into which later events can fall if they are to be considered part of the same session. We then begin the GroupByKeyAndWindow operation, which is really a five-part composite operation:

• DropTimestamps - Drops element timestamps, as only the window is relevant from here on out.

• GroupByKey - Groups (value, window) tuples by key.

• MergeWindows - Merges the set of currently buffered windows for a key. The actual merge logic is defined by the windowing strategy. In this case, the windows for v1 and v4 overlap, so the sessions windowing strategy merges them into a single new, larger session, as indicated in bold.

• GroupAlsoByWindow - For each key, groups values by window. After merging in the prior step, v1 and v4 are now in identical windows, and thus are grouped together at this step.

• ExpandToElements - Expands per-key, per-window groups of values into (key, value, event time, window) tuples, with new per-window timestamps. In this example, we set the timestamp to the end of the window, but any timestamp greater than or equal to the timestamp of the earliest event in the window is valid with respect to watermark correctness.

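The session variants of the two windowing operations can be sketched like this (30-minute timeout, timestamps as integer minutes); this illustrates the semantics only and is not SDK code:

```python
def assign_session_window(timestamp, gap=30):
    # Each element initially gets a proto-session window extending
    # `gap` minutes beyond its own timestamp.
    return (timestamp, timestamp + gap)

def merge_session_windows(windows):
    """MergeWindows for sessions: merge every overlapping proto-session
    into a single larger session."""
    merged = []
    for start, end in sorted(windows):
        if merged and start <= merged[-1][1]:
            # Overlaps the previous session: extend it.
            merged[-1] = (merged[-1][0], max(merged[-1][1], end))
        else:
            merged.append((start, end))
    return merged

w1 = assign_session_window(0)    # (0, 30)
w4 = assign_session_window(14)   # (14, 44), overlaps w1
w2 = assign_session_window(90)   # (90, 120), a separate session
assert merge_session_windows([w1, w4, w2]) == [(0, 44), (90, 120)]
```

Because merging happens at grouping time, later arrivals can keep extending a session for as long as each new proto-session overlaps the merged one.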
2.2.3 API

As a brief example of the use of windowing in practice, consider the following Cloud Dataflow SDK code to calculate keyed integer sums:

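The SDK listing itself is not reproduced in this text; the same keyed-sum computation, sketched in plain Python (the function name is illustrative, standing in for the SDK's Sum.integersPerKey()):

```python
from collections import defaultdict

def sum_integers_per_key(pairs):
    """Equivalent of a GroupByKey followed by per-key summation."""
    sums = defaultdict(int)
    for key, value in pairs:
        sums[key] += value
    return dict(sums)

assert sum_integers_per_key([("k1", 5), ("k2", 7), ("k1", 3)]) == {"k1": 8, "k2": 7}
```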
To do the same thing, but windowed into sessions with a 30-minute timeout as in Figure 4, one would add a single Window.into call before initiating the summation:

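Putting the windowing pieces together, the session-windowed variant of the summation might look like this Python sketch (the SDK's Window.into/Sessions call is paraphrased; 30-minute timeout, timestamps as integer minutes, all names hypothetical):

```python
from collections import defaultdict

def session_sums(data, gap=30):
    """data: (key, value, event_time) triples, timestamps in minutes.
    Returns {(key, (start, end)): sum} keyed by merged session window."""
    by_key = defaultdict(list)
    for key, value, t in data:
        # AssignWindows: one proto-session [t, t + gap) per element.
        by_key[key].append((t, t + gap, value))
    result = {}
    for key, items in by_key.items():
        sessions = []  # each entry: [start, end, running_sum]
        for start, end, value in sorted(items):
            if sessions and start <= sessions[-1][1]:
                # MergeWindows: overlapping proto-sessions join one session.
                sessions[-1][1] = max(sessions[-1][1], end)
                sessions[-1][2] += value
            else:
                sessions.append([start, end, value])
        for start, end, total in sessions:
            result[(key, (start, end))] = total
    return result

data = [("k1", 5, 0), ("k1", 9, 14), ("k1", 8, 100)]
assert session_sums(data) == {("k1", (0, 44)): 14, ("k1", (100, 130)): 8}
```

The only conceptual change from the un-windowed sum is the assignment and merge steps; the per-key summation itself is untouched, mirroring how a single Window.into call is the only addition to the pipeline.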
2.3 Triggers & Incremental Processing

The ability to build unaligned, event-time windows is an improvement, but now we have two more shortcomings to address:

• We need some way of providing support for tuple- and processing-time-based windows, otherwise we have regressed our windowing semantics relative to other systems in existence.

• We need some way of knowing when to emit the results for a window. Since the data are unordered with respect to event time, we require some other signal to tell us when the window is done.

The problem of tuple- and processing-time-based windows we will address in Section 2.4, once we have built up a solution to the window completeness problem. As to window completeness, an initial inclination for solving it might be to use some sort of global event-time progress metric, such as watermarks. However, watermarks themselves have two major shortcomings with respect to correctness:

• They are sometimes too fast, meaning there may be late data that arrives behind the watermark. For many distributed data sources, it is intractable to derive a completely perfect event time watermark, and thus impossible to rely on it solely if we want 100% correctness in our output data.

• They are sometimes too slow. Because they are a global progress metric, the watermark can be held back for the entire pipeline by a single slow datum. And even for healthy pipelines with little variability in event-time skew, the baseline level of skew may still be multiple minutes or more, depending upon the input source. As a result, using watermarks as the sole signal for emitting window results is likely to yield higher latency of overall results than, for example, a comparable Lambda Architecture pipeline.

For these reasons, we postulate that watermarks alone are insufficient. A useful insight in addressing the completeness problem is that the Lambda Architecture effectively sidesteps the issue: it does not solve the completeness problem by somehow providing correct answers faster; it simply provides the best low-latency estimate of a result that the streaming pipeline can provide, with the promise of eventual consistency and correctness once the batch pipeline runs. If we want to do the same thing from within a single pipeline (regardless of execution engine), then we will need a way to provide multiple answers (or panes) for any given window. We call this feature triggers, since they allow the specification of when to trigger the output results for a given window.

In a nutshell, triggers are a mechanism for stimulating the production of GroupByKeyAndWindow results in response to internal or external signals. They are complementary to the windowing model, in that they each affect system behaviour along a different axis of time:

• Windowing determines where in event time data are grouped together for processing.

• Triggering determines when in processing time the results of groupings are emitted as panes.

Our systems provide predefined trigger implementations for triggering at completion estimates (e.g. watermarks, including percentile watermarks, which provide useful semantics for dealing with stragglers in both batch and streaming execution engines when you care more about processing a minimum percentage of the input data quickly than processing every last piece of it), at points in processing time, and in response to data arriving (counts, bytes, data punctuations, pattern matching, etc.). We also support composing triggers into logical combinations (and, or, etc.), loops, sequences, and other such constructions. In addition, users may define their own triggers utilizing both the underlying primitives of the execution runtime (e.g. watermark timers, processing-time timers, data arrival, composition support) and any other relevant external signals (data injection requests, external progress metrics, RPC completion callbacks, etc.). We will look more closely at examples in Section 2.4.

In addition to controlling when results are emitted, the triggers system provides a way to control how multiple panes for the same window relate to each other, via three different refinement modes:

• Discarding: Upon triggering, window contents are discarded, and later results bear no relation to previous results. This mode is useful in cases where the downstream consumer of the data (either internal or external to the pipeline) expects the values from various trigger fires to be independent (e.g. when injecting into a system that generates a sum of the values injected). It is also the most efficient in terms of amount of data buffered, though for associative and commutative operations which can be modeled as a Dataflow Combiner, the efficiency delta will often be minimal. For our video sessions use case, this is not sufficient, since it is impractical to require downstream consumers of our data to stitch together partial sessions.

• Accumulating: Upon triggering, window contents are left intact in persistent state, and later results become a refinement of previous results. This is useful when the downstream consumer expects to overwrite old values with new ones when receiving multiple results for the same window, and is effectively the mode used in Lambda Architecture systems, where the streaming pipeline produces low-latency results, which are then overwritten in the future by the results from the batch pipeline. For video sessions, this might be sufficient if we are simply calculating sessions and then immediately writing them to some output source that supports updates (e.g. a database or key/value store).

• Accumulating & Retracting: Upon triggering, in addition to the Accumulating semantics, a copy of the emitted value is also stored in persistent state. When the window triggers again in the future, a retraction for the previous value will be emitted first, followed by the new value as a normal datum. Retractions are necessary in pipelines with multiple serial GroupByKeyAndWindow operations, since the multiple results generated by a single window over subsequent trigger fires may end up on separate keys when grouped downstream. In that case, the second grouping operation will generate incorrect results for those keys unless it is informed via a retraction that the effects of the original output should be reversed. Dataflow Combiner operations that are also reversible can support retractions efficiently via an uncombine method. For video sessions, this mode is the ideal. If we are performing aggregations downstream from session creation that depend on properties of the sessions themselves, for example detecting unpopular ads (such as those which are viewed for less than five seconds in a majority of sessions), initial results may be invalidated as inputs evolve over time, e.g. as a significant number of offline mobile viewers come back online and upload session data. Retractions provide a way for us to adapt to these types of changes in complex pipelines with multiple serial grouping stages. (A simple implementation of retraction processing requires deterministic operations; non-determinism can be supported with additional complexity and cost. We have seen use cases that require this, such as probabilistic modeling.)

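The three refinement modes can be illustrated with a toy sum whose window's trigger fires twice; the helper below is a sketch of the semantics, not SDK behavior, and all names are hypothetical:

```python
def panes(batches, mode):
    """Emit one output pane per trigger firing over the same window.
    batches: the values seen between successive firings of the trigger."""
    emitted, total = [], 0
    for batch in batches:
        new_total = total + sum(batch)
        if mode == "discarding":
            emitted.append(sum(batch))      # each pane is independent
        elif mode == "accumulating":
            emitted.append(new_total)       # each pane refines the last
        elif mode == "accumulating_retracting":
            if emitted:
                emitted.append(-total)      # retract the previous pane first
            emitted.append(new_total)       # then emit the new refinement
        total = new_total
    return emitted

firings = [[3, 4], [5]]   # window fires after seeing 3 and 4, again after 5
assert panes(firings, "discarding") == [7, 5]
assert panes(firings, "accumulating") == [7, 12]
assert panes(firings, "accumulating_retracting") == [7, -7, 12]
```

A downstream consumer that simply sums everything it receives gets the right answer only in discarding mode or, with retractions applied, in the third mode; accumulating mode instead relies on the consumer overwriting the old value.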
2.4 Examples

We will now consider a series of examples that highlight the plurality of useful output patterns supported by the Dataflow Model. We will look at each example in the context of the integer summation pipeline from Section 2.2.3:

Let us assume we have an input source from which we are observing ten data points, each themselves small integer values. We will consider them in the context of both bounded and unbounded data sources. For diagrammatic simplicity, we will assume all these data are for the same key; in a real pipeline, the types of operations we describe here would be happening in parallel for multiple keys. Figure 5 diagrams how these data relate together along both axes of time we care about. The X axis plots the data in event time (i.e. when the events actually occurred), while the Y axis plots the data in processing time (i.e. when the pipeline observes them). All examples assume execution on our streaming engine unless otherwise specified.

Many of the examples will also depend on watermarks, in which cases we will include them in our diagrams. We will graph both the ideal watermark and an example actual watermark. The straight dotted line with slope of one represents the ideal watermark, i.e. if there were no event-time skew and all events were processed by the system as they occurred. Given the vagaries of distributed systems, skew is a common occurrence; this is exemplified by the meandering path the actual watermark takes in Figure 5, represented by the darker, dashed line. Note also that the heuristic nature of this watermark is exemplified by the single “l(fā)ate” datum with value 9 that appears behind the watermark.

If we were to process these data in a classic batch system using the described summation pipeline, we would wait for all the data to arrive, group them together into one bundle (since these data are all for the same key), and sum their values to arrive at total result of 51. This result is represented by the darkened rectangle in Figure 6, whose area covers the ranges of event and processing time included in the sum (with the top of the rectangle denoting when in processing time the result was materialized). Since classic batch processing is event-time agnostic, the result is contained within a single global window covering all of event time. And since outputs are only calculated once all inputs are received, the result covers all of processing time for the execution.

Note the inclusion of watermarks in this diagram. Though not typically used for classic batch processing, watermarks would semantically be held at the beginning of time until all data had been processed, then advanced to infinity. An important point to note is that one can get identical semantics to classic batch by running the data through a streaming system with watermarks progressed in this manner.

Now let us say we want to convert this pipeline to run over an unbounded data source. In Dataflow, the default triggering semantics are to emit windows when the watermark passes them. But when using the global window with an unbounded input source, we are guaranteed that will never happen, since the global window covers all of event time. As such, we will need to either trigger by something other than the default trigger, or window by something other than the global window. Otherwise, we will never get any output.

Let us first look at changing the trigger, since this will allow us to generate conceptually identical output (a global per-key sum over all time), but with periodic updates. In this example, we apply a Window.trigger operation that repeatedly fires on one-minute periodic processing-time boundaries. We also specify Accumulating mode so that our global sum will be refined over time (this assumes we have an output sink into which we can simply overwrite previous results for the key with new results, e.g. a database or key/value store). Thus, in Figure 7, we generate updated global sums once per minute of processing time. Note how the semi-transparent output rectangles overlap, since Accumulating panes build upon prior results by incorporating overlapping regions of processing time:

If we instead wanted to generate the delta in sums once per minute, we could switch to Discarding mode, as in Figure 8. Note that this effectively gives the processing-time windowing semantics provided by many streaming systems. The output panes no longer overlap, since their results incorporate data from independent regions of processing time.

Another, more robust way of providing processing-time windowing semantics is to simply assign arrival time as event times at data ingress, then use event time windowing. A nice side effect of using arrival time event times is that the system has perfect knowledge of the event times in flight, and thus can provide perfect (i.e. non-heuristic) watermarks, with no late data. This is an effective and cost-efficient way of processing unbounded data for use cases where true event times are not necessary or available.

Before we look more closely at other windowing options, let us consider one more change to the triggers for this pipeline. The other common windowing mode we would like to model is tuple-based windows. We can provide this sort of functionality by simply changing the trigger to fire after a certain number of data arrive, say two. In Figure 9, we get five outputs, each containing the sum of two adjacent (by processing time) data. More sophisticated tuple-based windowing schemes (e.g. sliding tuple-based windows) require custom windowing strategies, but are otherwise supported.

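A count-based trigger over the global window amounts to chunking the arriving data, in processing-time order, into groups of N and emitting one pane per group. A sketch, using an illustrative arrival order for the ten data points (the order is an assumption; only the per-pair sums matter):

```python
def count_triggered_sums(values, count=2):
    """Fire a pane with the sum of each `count` consecutive arrivals."""
    out = []
    for i in range(0, len(values), count):
        chunk = values[i:i + count]
        if len(chunk) == count:   # the trigger fires only once `count` data arrive
            out.append(sum(chunk))
    return out

arrivals = [7, 3, 4, 3, 8, 3, 9, 8, 1, 5]   # ten data in processing-time order
assert count_triggered_sums(arrivals) == [10, 7, 11, 17, 6]
```

This yields five outputs, each the sum of two adjacent (by processing time) data, matching the shape of the Figure 9 result.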
Let us now return to the other option for supporting unbounded sources: switching away from global windowing. To start with, let us window the data into fixed, two-minute Accumulating windows:

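Fixed windows admit a much simpler assignment, since each element falls into exactly one window. A minimal sketch with timestamps as integer minutes (the function name is hypothetical):

```python
from collections import defaultdict

def fixed_window_sums(data, width=2):
    """data: (key, value, event_time) triples.
    Assign each element to its single [start, start + width) window and sum."""
    sums = defaultdict(int)
    for key, value, t in data:
        start = (t // width) * width    # round down to a window boundary
        sums[(key, (start, start + width))] += value
    return dict(sums)

data = [("k1", 5, 0), ("k1", 7, 1), ("k1", 3, 2)]
assert fixed_window_sums(data) == {("k1", (0, 2)): 12, ("k1", (2, 4)): 3}
```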
With no trigger strategy specified, the system would use the default trigger: effectively, a watermark trigger that repeats in order to handle late data.

The watermark trigger fires when the watermark passes the end of the window in question. Both batch and streaming engines implement watermarks, as detailed in Section 3.1. The Repeat call in the trigger is used to handle late data; should any data arrive after the watermark, they will instantiate the repeated watermark trigger, which will fire immediately since the watermark has already passed.

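The repeated watermark trigger's behavior can be sketched as: fire when the watermark passes the end of the window, and fire again immediately for any datum that arrives after that. The event encoding and function name below are assumptions made for illustration:

```python
def run_window(events, window_end):
    """events: ('data', value) and ('watermark', time) pairs, in
    processing-time order. Returns the sums emitted by a repeated
    watermark-style trigger for one window."""
    total, passed, fired = 0, False, []
    for kind, x in events:
        if kind == "data":
            total += x
            if passed:
                # Late datum: the re-instantiated watermark trigger fires
                # immediately, since the watermark has already passed.
                fired.append(total)
        elif kind == "watermark" and x >= window_end and not passed:
            passed = True
            fired.append(total)   # the on-time firing as the watermark passes
    return fired

# The [12:00, 12:02) window of Figure 12: the 5 is on time, the 9 is late.
events = [("data", 5), ("watermark", 122), ("data", 9)]
assert run_window(events, window_end=122) == [5, 14]
```

The first pane (5) is emitted when the watermark passes minute 122; the late 9 then retriggers the window with the updated sum of 14.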
Figures 10–12 each characterize this pipeline on a different type of runtime engine. We will first observe what execution of this pipeline would look like on a batch engine. Given our current implementation, the data source would have to be a bounded one, so as with the classic batch example above, we would wait for all data in the batch to arrive. We would then process the data in event-time order, with windows being emitted as the simulated watermark advances, as in Figure 10:

Now imagine executing a micro-batch engine over this data source with one minute micro-batches. The system would gather input data for one minute, process them, and repeat. Each time, the watermark for the current batch would start at the beginning of time and advance to the end of time (technically jumping from the end time of the batch to the end of time instantaneously, since no data would exist for that period). We would thus end up with a new watermark for every micro-batch round, and corresponding outputs for all windows whose contents had changed since the last round. This provides a very nice mix of latency and eventual correctness, as in Figure 11:

Next, consider this pipeline executed on a streaming engine, as in Figure 12. Most windows are emitted when the watermark passes them. Note however that the datum with value 9 is actually late relative to the watermark. For whatever reason (mobile input source being offline, network partition, etc.), the system did not realize that datum had not yet been injected, and thus, having observed the 5, allowed the watermark to proceed past the point in event time that would eventually be occupied by the 9. Hence, once the 9 finally arrives, it causes the first window (for event-time range [12:00, 12:02)) to retrigger with an updated sum:

This output pattern is nice in that we have roughly one output per window, with a single refinement in the case of the late datum. But the overall latency of results is noticeably worse than the micro-batch system, on account of having to wait for the watermark to advance; this is the case of watermarks being too slow from Section 2.3.


If we want lower latency via multiple partial results for all of our windows, we can add in some additional, processing-time-based triggers to provide us with regular updates until the watermark actually passes, as in Figure 13. This yields somewhat better latency than the micro-batch pipeline, since data are accumulated in windows as they arrive instead of being processed in small batches. Given strongly-consistent micro-batch and streaming engines, the choice between them (as well as the choice of micro-batch size) really becomes just a matter of latency versus cost, which is exactly one of the goals we set out to achieve with this model.

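The repeated processing-time trigger can be sketched as below (an illustrative accumulation-mode model, not a real engine's API; `process_with_early_firings` and the `'tick'` events are invented): periodic ticks emit early partial sums, and the watermark later emits the on-time result:

```python
WINDOW = 120  # two-minute fixed event-time windows, in seconds (illustrative)

def process_with_early_firings(events):
    """events: ('data', event_time, value), ('tick',) for a repeated
    processing-time trigger firing, or ('watermark', t). Each firing
    reports the window's full accumulated sum so far."""
    sums, out, dirty, fired = {}, [], set(), set()
    for e in events:
        if e[0] == 'data':
            _, t, v = e
            w = t - t % WINDOW
            sums[w] = sums.get(w, 0) + v
            dirty.add(w)                    # changed since the last tick
        elif e[0] == 'tick':                # early, speculative results
            for w in sorted(dirty):
                out.append((w, sums[w], 'early'))
            dirty.clear()
        else:                               # watermark passed: on-time result
            for w in sorted(sums):
                if w + WINDOW <= e[1] and w not in fired:
                    fired.add(w)
                    out.append((w, sums[w], 'on-time'))
    return out
```

Each tick emits whatever has accumulated so far, so downstream consumers see regular refinements well before the watermark confirms the window, trading extra (cheap) partial outputs for lower latency.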

As one final exercise, let us update our example to satisfy the video sessions requirements (modulo the use of summation as the aggregation operation, which we will maintain for diagrammatic consistency; switching to another aggregation would be trivial), by updating to session windowing with a one minute timeout and enabling retractions. This highlights the composability provided by breaking the model into four pieces (what you are computing, where in event time you are computing it, when in processing time you are observing the answers, and how those answers relate to later refinements), and also illustrates the power of reverting previous values which otherwise might be left uncorrelated to the value offered as replacement.


In this example, we output initial singleton sessions for values 5 and 7 at the first one-minute processing-time boundary. At the second minute boundary, we output a third session with value 10, built up from the values 3, 4, and 3. When the value of 8 is finally observed, it joins the two sessions with values 7 and 10. As the watermark passes the end of this new combined session, retractions for the 7 and 10 sessions are emitted, as well as a normal datum for the new session with value 25. Similarly, when the 9 arrives (late), it joins the session with value 5 to the session with value 25. The repeated watermark trigger then immediately emits retractions for the 5 and the 25, followed by a combined session of value 39. A similar dance occurs for the values 3, 8, and 1, ultimately ending with a retraction for an initial 3 session, followed by a combined session of 12.

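The session-merging-and-retraction dance above can be modeled in a few lines (a sketch with an invented `merge_sessions` helper and invented timestamps, not the paper's implementation): when a new datum's gap-extended interval bridges sessions that were already emitted, we first emit retractions (negated sums) for those sessions, then a single combined session:

```python
GAP = 60  # one-minute session timeout, in seconds of event time

def merge_sessions(emitted, t, v):
    """emitted: list of live [start, end, total] sessions already output
    downstream. A new datum (t, v) opens a proto-session [t, t + GAP];
    any emitted session it touches is retracted and folded in.
    Returns (new_emitted, outputs), retractions first."""
    lo, hi, total, keep, outs = t, t + GAP, v, [], []
    for s in emitted:
        if s[0] <= hi and lo <= s[1]:           # touches the growing session
            outs.append((s[0], s[1], -s[2]))    # retraction: negated sum
            lo, hi, total = min(lo, s[0]), max(hi, s[1]), total + s[2]
        else:
            keep.append(s)
    keep.append([lo, hi, total])
    outs.append((lo, hi, total))                # the combined replacement
    return keep, outs
```

Starting from emitted sessions with sums 7 and 10, a bridging datum of 8 produces retractions of -7 and -10 followed by one combined session of 25, mirroring the first merge in the walkthrough; the later merges (to 39, and to 12) follow the same pattern.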

Postscript

Most of this article is my own translation; for a few long or difficult sentences I consulted the Eudic dictionary, Google Translate, and, most importantly, the partially completed translation from Alibaba Cloud linked below.

流計算精品翻譯: The Dataflow Model

