概述
Apache Flink 是一個為生產(chǎn)環(huán)境而生的流處理器,具有易于使用的 API,可以用于定義高級流分析程序。
Flink 的 API 在數(shù)據(jù)流上具有非常靈活的窗口定義,使其在其他開源流處理框架中脫穎而出。
Windows定義
按固定時間區(qū)間計算該區(qū)間的值,例如15s計算匯總一次:

無窮的流,數(shù)據(jù)不間斷的,例如有累計數(shù)據(jù)的需求,按上圖的邏輯是處理不到的。換一種思路,每隔 15 秒,我們都將與上一次的結(jié)果進(jìn)行 sum 操作(滑動聚合):

流是無界的,我們不能限制流,所以上述方案也解決不了需求,但可以在有一個有界的范圍內(nèi)處理無界的流數(shù)據(jù)。
那么按一分鐘一個時間窗口計算,相當(dāng)于一個定義了一個 Window(窗口),window 的界限是1分鐘,且每分鐘內(nèi)的數(shù)據(jù)互不干擾,因此也可以稱為翻滾(不重合)窗口

第一分鐘的數(shù)量為8,第二分鐘是22,第三分鐘是27。。。這樣,1個小時內(nèi)會有60個window。
再考慮一種情況,每30秒統(tǒng)計一次過去1分鐘的數(shù)量之和:

通常來講,Window 就是用來對一個無限的流設(shè)置一個有限的集合,在有界的數(shù)據(jù)集上進(jìn)行操作的一種機(jī)制。window 又可以分為基于時間(Time-based)的 window 以及基于數(shù)量(Count-based)的 window。
Flink窗口類型
對于窗口的操作主要分為兩種,分別對于Keyedstream和Datastream。他們的主要區(qū)別也僅僅在于建立窗口的時候一個為.window(...),一個為.windowAll(...)。對于Keyedstream的窗口來說,他可以使得多任務(wù)并行計算,每一個logical key stream將會被獨(dú)立的進(jìn)行處理。
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...)/.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
按照窗口的Assigner來分,窗口可以分為
Tumbling window, sliding window,session window,global window,custom window每種窗口又可分別基于processing time和event time。
還有一種window叫做count window,依據(jù)元素到達(dá)的數(shù)量進(jìn)行分配,之后也會提到。
窗口的生命周期開始在第一個屬于這個窗口的元素到達(dá)的時候,結(jié)束于第一個不屬于這個窗口的元素到達(dá)的時候。
窗口的操作
Tumbling window
固定相同間隔分配窗口,每個窗口之間沒有重疊。

tumbling time windows(翻滾時間窗口)
data.keyBy(1)
.timeWindow(Time.minutes(1)) //tumbling time window 每分鐘統(tǒng)計一次數(shù)量和
.sum(1);
sliding time windows(滑動時間窗口)
固定相同間隔分配窗口,只不過每個窗口之間有重疊。窗口重疊的部分如果比窗口小,窗口將會有多個重疊,即一個元素可能被分配到多個窗口里去。

data.keyBy(1)
.timeWindow(Time.minutes(1), Time.seconds(30)) //sliding time window 每隔 30s 統(tǒng)計過去一分鐘的數(shù)量和
.sum(1);
那么流處理器如何解釋時間?
Apache Flink 具有三個不同的時間概念,即 processing time, event time 和 ingestion time。

默認(rèn)采用:
TimeCharacteristic.ProcessingTime
我們可以設(shè)置為其他:
1. env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
2. env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
3. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
指定為EventTime的source需要自己定義event time以及emit watermark,或者在source之外通過assignTimestampsAndWatermarks在程序手工指定
Watermark解釋
Count Windows
Apache Flink 還提供計數(shù)窗口功能。如果計數(shù)窗口設(shè)置的為 100 ,那么將會在窗口中收集 100 個事件,并在添加第 100 個元素時計算窗口的值。
tumbling count window
data.keyBy(1)
.countWindow(100) //統(tǒng)計每 100 個元素的數(shù)量之和
.sum(1);
sliding count window
data.keyBy(1)
.countWindow(100, 10) //每 10 個元素統(tǒng)計過去 100 個元素的數(shù)量之和
.sum(1);
Session window
主要是根據(jù)活動的事件進(jìn)行窗口化,他們通常不重疊,也沒有一個固定的開始和結(jié)束時間。一個session window關(guān)閉通常是由于一段時間沒有收到元素。在這種用戶交互事件流中,我們首先想到的是將事件聚合到會話窗口中(一段用戶持續(xù)活躍的周期),由非活躍的間隙分隔開。

// 靜態(tài)間隔時間
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(10)));
// 動態(tài)時間
WindowedStream<MovieRate, Integer, TimeWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(EventTimeSessionWindows.withDynamicGap(()));
Global window
同keyed的元素分配到一個窗口里
WindowedStream<MovieRate, Integer, GlobalWindow> Rates = rates
.keyBy(MovieRate::getUserId)
.window(GlobalWindows.create());
Flink 的窗口機(jī)制

到達(dá)窗口操作符的元素被傳遞給 WindowAssigner。WindowAssigner 將元素分配給一個或多個窗口,可能會創(chuàng)建新的窗口。窗口本身只是元素列表的標(biāo)識符,它可能提供一些可選的元信息,例如 TimeWindow 中的開始和結(jié)束時間。注意,元素可以被添加到多個窗口,這也意味著一個元素可以同時在多個窗口存在。
每個窗口都擁有一個 Trigger(觸發(fā)器),該 Trigger(觸發(fā)器) 決定何時計算和清除窗口。當(dāng)先前注冊的計時器超時時,將為插入窗口的每個元素調(diào)用觸發(fā)器。在每個事件上,觸發(fā)器都可以決定觸發(fā)(即、清除(刪除窗口并丟棄其內(nèi)容),或者啟動并清除窗口。一個窗口可以被求值多次,并且在被清除之前一直存在。注意,在清除窗口之前,窗口將一直消耗內(nèi)存。
當(dāng) Trigger(觸發(fā)器) 觸發(fā)時,可以將窗口元素列表提供給可選的 Evictor,Evictor 可以遍歷窗口元素列表,并可以決定從列表的開頭刪除首先進(jìn)入窗口的一些元素。然后其余的元素被賦給一個計算函數(shù),如果沒有定義 Evictor,觸發(fā)器直接將所有窗口元素交給計算函數(shù)。
-
計算函數(shù)接收 Evictor 過濾后的窗口元素,并計算窗口的一個或多個元素的結(jié)果。 DataStream API 接受不同類型的計算函數(shù),包括預(yù)定義的聚合函數(shù),如 sum(),min(),max(),以及 ReduceFunction,F(xiàn)oldFunction 或 WindowFunction。
- 窗口函數(shù)就是這四個:ReduceFunction,AggregateFunction,F(xiàn)oldFunction,ProcessWindowFunction。前兩個執(zhí)行得更有效,因為Flink可以增量地聚合每個到達(dá)窗口的元素。
- Flink必須在調(diào)用函數(shù)之前在內(nèi)部緩沖窗口中的所有元素,所以使用ProcessWindowFunction進(jìn)行操作效率不高。不過ProcessWindowFunction可以跟其他的窗口函數(shù)結(jié)合使用,其他函數(shù)接受增量信息,ProcessWindowFunction接受窗口的元數(shù)據(jù)。
這些是構(gòu)成 Flink 窗口機(jī)制的組件。 接下來我們逐步演示如何使用 DataStream API 實現(xiàn)自定義窗口邏輯。 我們從 DataStream [IN] 類型的流開始,并使用 key 選擇器函數(shù)對其分組,該函數(shù)將 key 相同類型的數(shù)據(jù)分組在一塊。
SingleOutputStreamOperator<xxx> data = env.addSource(...);
data.keyBy();
自定義Windows
Window Assigner
負(fù)責(zé)將元素分配到不同的 window。
Window API 提供了自定義的 WindowAssigner 接口,我們可以實現(xiàn) WindowAssigner 的方法
public abstract Collection<W> assignWindows(T element, long timestamp)
同時,對于基于 Count 的 window 而言,默認(rèn)采用了 GlobalWindow 的 window assigner,例如:
keyBy.window(GlobalWindows.create())
Trigger(觸發(fā)器)
觸發(fā)器定義了窗口何時準(zhǔn)備好被窗口處理。每個窗口分配器默認(rèn)都有一個觸發(fā)器,如果默認(rèn)的觸發(fā)器不符合你的要求,就可以使用trigger(...)自定義觸發(fā)器。
通常來說,默認(rèn)的觸發(fā)器適用于多種場景。例如,多有的event-time窗口分配器都有一個EventTimeTrigger作為默認(rèn)觸發(fā)器。該觸發(fā)器在watermark通過窗口末尾時出發(fā)。
PS:GlobalWindow默認(rèn)的觸發(fā)器時NeverTrigger,該觸發(fā)器從不出發(fā),所以在使用GlobalWindow時必須自定義觸發(fā)器。
Evictor(驅(qū)逐器-可選)
Evictors可以在觸發(fā)器觸發(fā)之后以及窗口函數(shù)被應(yīng)用之前和/或之后可選擇的移除元素。使用Evictor可以防止預(yù)聚合,因為窗口的所有元素都必須在應(yīng)用計算邏輯之前先傳給Evictor進(jìn)行處理。
通過 apply WindowFunction 來返回 DataStream 類型數(shù)據(jù)
利用 Flink 的內(nèi)部窗口機(jī)制和 DataStream API 可以實現(xiàn)自定義的窗口邏輯,例如 session window。