Flink的 Window 操作
Window是無限數(shù)據(jù)流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。本文主要聚焦于在Flink中如何進行窗口操作,以及程序員如何從window提供的功能中獲得最大的收益。
窗口化的Flink程序的一般結構如下,第一個代碼段中是分組的流,而第二段是非分組的流。正如我們所見,唯一的區(qū)別是分組的stream調用keyBy(…)和window(…),而非分組的stream中window()換成了windowAll(…),這些也將貫穿都這一頁的其他部分中。
Keyed Windows
stream.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness()] <- optional, else zero
.reduce/fold/apply() <- required: "function"
Non-Keyed Windows
stream.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness()] <- optional, else zero
.reduce/fold/apply() <- required: "function"
在上面的例子中,方括號[]內的命令是可選的,這表明Flink允許你根據(jù)最符合你的要求來定義自己的window邏輯。
Window 的生命周期
簡單地說,當一個屬于window的元素到達之后這個window就創(chuàng)建了,而當當前時間(事件或者處理時間)為window的創(chuàng)建時間跟用戶指定的延遲時間相加時,窗口將被徹底清除。Flink 確保了只清除基于時間的window,其他類型的window不清除,例如:全局window(詳情:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners) 。例如:對于一個每5分鐘創(chuàng)建無覆蓋的(即 翻滾窗口)窗口,允許一個1分鐘的時延的窗口策略,F(xiàn)link將會在12:00到12:05這段時間內第一個元素到達時創(chuàng)建窗口,當水印通過12:06時,移除這個窗口。
此外,每個 Window 都有一個Trigger(觸發(fā)器,詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers) 和一個附屬于 Window 的函數(shù)(例如: WindowFunction, ReduceFunction 及 FoldFunction),詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-functions 。函數(shù)里包含了應用于窗口(Window)內容的計算,而Trigger(觸發(fā)器)則指定了函數(shù)在什么條件下可被應用(函數(shù)何時被觸發(fā)),一個觸發(fā)策略可以是 "當窗口中的元素個數(shù)超過4個時" 或者 "當水印達到窗口的邊界時"。觸發(fā)器還可以決定在窗口創(chuàng)建和刪除之間的任意時刻清除窗口的內容,本例中的清除僅指清除窗口的內容而不是窗口的元數(shù)據(jù),也就是說新的數(shù)據(jù)還是可以被添加到當前的window中。
除了上面的提到之外,你還可以指定一個驅逐者(Evictor,詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#evictors ), Evictor
將在觸發(fā)器觸發(fā)之后或者在函數(shù)被應用之前或者之后,清楚窗口中的元素。
接下來我們將更深入的去了解上述的部件,我們從上述片段的主要部分開始(如:Keyed vs Non-Keyed Windows, Window Assigner, 及 Window Function),然后是可選部分。
分組和非分組Windows (Keyed vs Non-Keyed Windows)
首先,第一件事是指定你的數(shù)據(jù)流是分組的還是未分組的,這個必須在定義 window 之前指定好。使用 keyBy(...) 會將你的無限數(shù)據(jù)流拆分成邏輯分組的數(shù)據(jù)流,如果 keyBy(...) 函數(shù)不被調用的話,你的數(shù)據(jù)流將不是分組的。
在分組數(shù)據(jù)流中,任何正在傳入的事件的屬性都可以被當做key(更多詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#specifying-keys ),分組數(shù)據(jù)流將你的window計算通過多任務并發(fā)執(zhí)行,以為每一個邏輯分組流在執(zhí)行中與其他的邏輯分組流是獨立地進行的。
在非分組數(shù)據(jù)流中,你的原始數(shù)據(jù)流并不會拆分成多個邏輯流并且所有的window邏輯將在一個任務中執(zhí)行,并發(fā)度為1。
窗口分配器(Window Assingers)
指定完你的數(shù)據(jù)流是分組的還是非分組的之后,接下來你需要定義一個窗口分配器(window assigner),窗口分配器定義了元素如何分配到窗口中,這是通過在分組數(shù)據(jù)流中調用window(...)或者非分組數(shù)據(jù)流中調用windowAll(...)時你選擇的窗口分配器(WindowAssigner)來指定的。WindowAssigner是負責將每一個到來的元素分配給一個或者多個窗口(window),Flink 提供了一些常用的預定義窗口分配器,即:滾動窗口、滑動窗口、會話窗口和全局窗口。你也可以通過繼承WindowAssigner類來自定義自己的窗口。所有的內置窗口分配器(除了全局窗口 global window)都是通過時間來分配元素到窗口中的,這個時間要么是處理的時間,要么是事件發(fā)生的時間。請看一下我們的 event time (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html )部分來了解更多處理時間和事件時間的區(qū)別及時間戳(timestamp)和水印(watermark)是如何產(chǎn)生的。
接下來我們將展示Flink的預定義窗口分配器是如何工作的,以及它們在DataStream程序中是如何使用的。接下來我們將展示Flink的預定義窗口分配器是如何工作的,以及它們在DataStream程序中是如何使用的。下圖中展示了每個分配器是如何工作的,紫色圓圈代表著數(shù)據(jù)流中的一個元素,這些元素是通過一些key進行分區(qū)(在本例中是 user1,user2,user3), X軸顯示的是時間進度。
滾動窗口
滾動窗口分配器將每個元素分配的一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,并且不會出現(xiàn)重疊。例如:如果你指定了一個5分鐘大小的滾動窗口,當前窗口將被評估并將按下圖說明每5分鐘創(chuàng)建一個新的窗口。
![滾動窗口][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/tumbling-windows.svg ]
下面的代碼片段展示了如何使用滾動窗口。
Java 代碼
DataStream<T> input = ...;
滾動事件時間窗口( tumbling event-time windows )
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
滾動處理時間窗口(tumbling processing-time windows)
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>);
每日偏移8小時的滾動事件時間窗口(daily tumbling event-time windows offset by -8 hours. )
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
Scala 代碼:
val input:DataStream[T] =
滾動事件時間窗口(tumbling event-time windows)
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
滾動處理時間窗口(tumbling processing-time windows)
input
.keyBy(<key selector>)
.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.<windowed transformation>(<window function>)
每日偏移8小時的滾動事件時間窗口(daily tumbling event-time windows offset by -8 hours. )
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等其中的一個來指定。
在上面最后的例子中,滾動窗口分配器還接受了一個可選的偏移參數(shù),可以用來改變窗口的排列。例如,沒有偏移的話按小時的滾動窗口將按時間紀元來對齊,也就是說你將一個如: 1:00:00.000~1:59:59.999,2:00:00.000~2:59:59.999等,如果你想改變一下,你可以指定一個偏移,如果你指定了一個15分鐘的偏移,你將得到1:15:00.000~2:14:59.999,2:15:00.000~3:14:59.999等。時間偏移一個很大的用處是用來調準非0時區(qū)的窗口,例如:在中國你需要指定一個8小時的時間偏移。
滑動窗口(Sliding Windows)
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數(shù)來配置,另一個窗口滑動參數(shù)控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數(shù)小于滾動參數(shù)的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動,那么每個窗口中5分鐘的窗口里包含著上個10分鐘產(chǎn)生的數(shù)據(jù),如下圖所示:
![][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/sliding-windows.svg]
下面的代碼片段中展示了如何使用滑動窗口:
Java 代碼:
DataStream<T> input = ...;
滑動事件時間窗口
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
滑動處理時間窗口
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>);
//偏移8小時的滑動處理時間窗口(sliding processing-time windows offset by -8 hours)
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
Scala 代碼:
val input: DataStream[T] = ...
// 滑動事件時間窗口(sliding event-time windows)
input
.keyBy(<key selector>)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
//滑動處理時間窗口(sliding processing-time windows)
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.<windowed transformation>(<window function>)
// 偏移8小時的滑動處理時間窗口(sliding processing-time windows offset by -8 hours)
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)
時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等來指定。
正如上述例子所示,滑動窗口分配器也有一個可選的偏移參數(shù)來改變窗口的對齊。例如,沒有偏移參數(shù),按小時的窗口,有30分鐘的滑動,將根據(jù)時間紀元來對齊,也就是說你將得到如下的窗口1:00:00.001:59:59.999,1:30:00.0002:29:59.999等。而如果你想改變窗口的對齊,你可以給定一個偏移,如果給定一個15分鐘的偏移,你將得到如下的窗口:1:15:00.000~2:14.59.999, 1:45:00.000~2:44:59.999等。時間偏移一個很大的用處是用來調準非0時區(qū)的窗口,例如:在中國你需要指定一個8小時的時間偏移。
會話窗口(Session Windows)
session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況。相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產(chǎn)生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度。當這個非活躍周期產(chǎn)生,那么當前的session將關閉并且后續(xù)的元素將被分配到新的session窗口中去。
![會話窗口][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/session-windows.svg]
下面的代碼片段中展示了如何使用session窗口
Java代碼:
DataStream<T> input = ...;
// 事件時間會話窗口(event-time session windows)
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
// 處理時間會話窗口(processing-time session windows)
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>);
Scala代碼:
val input: DataStream[T] = ...
// 事件時間會話窗口(event-time session windows)
input
.keyBy(<key selector>)
.window(EventTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
// 處理時間會話窗口(processing-time session windows)
input
.keyBy(<key selector>)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
.<windowed transformation>(<window function>)
時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等來指定。
注意: 因為session看窗口沒有一個固定的開始和結束,他們的評估與滑動窗口和滾動窗口不同。在內部,session操作為每一個到達的元素創(chuàng)建一個新的窗口,并合并間隔時間小于指定非活動間隔的窗口。為了進行合并,session窗口的操作需要指定一個合并觸發(fā)器(Trigger)和一個合并窗口函數(shù)(Window Function),如:ReduceFunction或者WindowFunction(FoldFunction不能合并)。
全局窗口(Global Windows)
全局窗口分配器將所有具有相同key的元素分配到同一個全局窗口中,這個窗口模式僅適用于用戶還需自定義觸發(fā)器的情況。否則,由于全局窗口沒有一個自然的結尾,無法執(zhí)行元素的聚合,將不會有計算被執(zhí)行。
![全局窗口][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/non-windowed.svg]
下面的代碼片段展示了如何使用全局窗口:
Java 代碼:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>);
Scala代碼:
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(GlobalWindows.create())
.<windowed transformation>(<window function>)
窗口函數(shù)(Window Functions)
定義完窗口分配器后,我們還需要為每一個窗口指定我們需要執(zhí)行的計算,這是窗口的責任,當系統(tǒng)決定一個窗口已經(jīng)準備好執(zhí)行之后,這個窗口函數(shù)將被用來處理窗口中的每一個元素(可能是分組的)。請參考:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers 來了解當一個窗口準備好之后,F(xiàn)link是如何決定的。
window函數(shù)可以是ReduceFunction, FoldFunction 或者 WindowFunction 中的一個。前面兩個更高效一些(),因為在每個窗口中增量地對每一個到達的元素執(zhí)行聚合操作。一個 WindowFunction 可以獲取一個窗口中的所有元素的一個迭代以及哪個元素屬于哪個窗口的額外元信息。
有WindowFunction的窗口化操作會比其他的操作效率要差一些,因為Flink內部在調用函數(shù)之前會將窗口中的所有元素都緩存起來。這個可以通過WindowFunction和ReduceFunction或者FoldFunction結合使用來獲取窗口中所有元素的增量聚合和WindowFunction接收的額外的窗口元數(shù)據(jù),接下來我們將看一看每一種變體的示例。
ReduceFunction
ReduceFunction指定了如何通過兩個輸入的參數(shù)進行合并輸出一個同類型的參數(shù)的過程,F(xiàn)link使用ReduceFunction來對窗口中的元素進行增量聚合。
一個ReduceFunction 可以通過如下的方式來定義和使用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce(new ReduceFunction<Tuple2<String, Long>> {
public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
}
});
Scala 代碼:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }
上面的例子是將窗口所有元素中元組的第二個屬性進行累加操作。
FoldFunction
FoldFunction 指定了一個輸入元素如何與一個輸出類型的元素合并的過程,這個FoldFunction 會被每一個加入到窗口中的元素和當前的輸出值增量地調用,第一個元素是與一個預定義的類型為輸出類型的初始值合并。
一個FoldFunction可以通過如下的方式定義和調用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("", new FoldFunction<Tuple2<String, Long>, String>> {
public String fold(String acc, Tuple2<String, Long> value) {
return acc + value.f1;
}
});
Scala 代碼:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.fold("") { (acc, v) => acc + v._2 }
上面例子追加所有輸入的長整型到一個空的字符串中。
注意 fold()不能應用于回話窗口或者其他可合并的窗口中。
窗口函數(shù) —— 一般用法(WindowFunction - The Generic Case)
一個WindowFunction將獲得一個包含了window中的所有元素迭代(Iterable),并且提供所有窗口函數(shù)的最大靈活性。這些帶來了性能的成本和資源的消耗,因為window中的元素無法進行增量迭代,而是緩存起來直到window被認為是可以處理時為止。
WindowFunction的使用說明如下:
Java 代碼:
public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
/**
// Evaluates the window and outputs none or several elements.
// @param key The key for which this window is evaluated.
// @param window The window that is being evaluated.
// @param input The elements in the window being evaluated.
// @param out A collector for emitting elements.
// @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}
Scala 代碼:
trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
/**
// Evaluates the window and outputs none or several elements.
//
// @param key The key for which this window is evaluated.
// @param window The window that is being evaluated.
// @param input The elements in the window being evaluated.
// @param out A collector for emitting elements.
// @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}
一個WindowFunction可以按如下方式來定義和使用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction());
/* ... */
public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
long count = 0;
for (Tuple<String, Long> in: input) {
count++;
}
out.collect("Window: " + window + "count: " + count);
}
}
Scala 代碼:
val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.apply(new MyWindowFunction())
/* ... */
class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = {
var count = 0L
for (in <- input) {
count = count + 1
}
out.collect(s"Window $window count: $count")
}
}
上面的例子展示了統(tǒng)計一個window中元素個數(shù)的WindowFunction,此外,還將window的信息添加到輸出中。
注意:使用WindowFunction來做簡單的聚合操作如計數(shù)操作,性能是相當差的。下一章節(jié)我們將展示如何將ReduceFunction跟WindowFunction結合起來,來獲取增量聚合和添加到WindowFunction中的信息。
ProcessWindowFunction
在使用WindowFunction的地方你也可以用ProcessWindowFunction,這跟WindowFunction很類似,除了接口允許查詢跟多關于context的信息,context是window評估發(fā)生的地方。
下面是ProcessWindowFunction的接口:
Java 代碼:
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
/**
// Evaluates the window and outputs none or several elements.
//
// @param key The key for which this window is evaluated.
// @param context The context in which the window is being evaluated.
// @param elements The elements in the window being evaluated.
// @param out A collector for emitting elements.
//
// @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
public abstract void process(
KEY key,
Context context,
Iterable<IN> elements,
Collector<OUT> out) throws Exception;
/**
// The context holding window metadata
*/
public abstract class Context {
/**
// @return The window that is being evaluated.
*/
public abstract W window();
}
}
Scala 代碼:
abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
/**
// Evaluates the window and outputs none or several elements.
//
// @param key The key for which this window is evaluated.
// @param context The context in which the window is being evaluated.
// @param elements The elements in the window being evaluated.
// @param out A collector for emitting elements.
// @throws Exception The function may throw exceptions to fail the program and trigger recovery.
*/
@throws[Exception]
def process(
key: KEY,
context: Context,
elements: Iterable[IN],
out: Collector[OUT])
/**
// The context holding window metadata
*/
abstract class Context {
/**
// @return The window that is being evaluated.
*/
def window: W
}
}
ProcessWindowFunction可以通過如下方式調用:
Java 代碼:
DataStream<Tuple2<String, Long>> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction());`
Scala 代碼:
`val input: DataStream[(String, Long)] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.process(new MyProcessWindowFunction())
有增量聚合功能的WindowFunction (WindowFunction with Incremental Aggregation)
WindowFunction可以跟ReduceFunction或者FoldFunction結合來增量地對到達window中的元素進行聚合,當window關閉之后,WindowFunction就能提供聚合結果。當獲取到WindowFunction額外的window元信息后就可以進行增量計算窗口了。
標注:你也可以使用ProcessWindowFunction替換WindowFunction來進行增量窗口聚合。
使用FoldFunction 進行增量窗口聚合(Incremental Window Aggregation with FoldFunction)
下面的例子展示了一個增量的FoldFunction如何跟一個WindowFunction結合,來獲取窗口的事件數(shù),并同時返回窗口的key和窗口的最后時間。
Java 代碼:
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())
// Function definitions
private static class MyFoldFunction
implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
Integer cur = acc.getField(2);
acc.setField(2, cur + 1);
return acc;
}
}
private static class MyWindowFunction
implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
public void apply(String key,
TimeWindow window,
Iterable<Tuple3<String, Long, Integer>> counts,
Collector<Tuple3<String, Long, Integer>> out) {
Integer count = counts.iterator().next().getField(2);
out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
}
}
Scala 代碼:
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.fold (
("", 0L, 0),
(acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
( key: String,
window: TimeWindow,
counts: Iterable[(String, Long, Int)],
out: Collector[(String, Long, Int)] ) =>
{
val count = counts.iterator.next()
out.collect((key, window.getEnd, count._3))
}
)
使用ReduceFunction進行增量窗口聚合(Incremental Window Aggregation with ReduceFunction)
下面例子展示了一個增量額ReduceFunction如何跟一個WindowFunction結合,來獲取窗口中最小的事件和窗口的開始時間。
Java 代碼:
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(new MyReduceFunction(), new MyWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyWindowFunction
implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void apply(String key,
TimeWindow window,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
}
}
Scala 代碼:
val input: DataStream[SensorReading] = ...
input
.keyBy(<key selector>)
.timeWindow(<window assigner>)
.reduce(
(r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
( key: String,
window: TimeWindow,
minReadings: Iterable[SensorReading],
out: Collector[(Long, SensorReading)] ) =>
{
val min = minReadings.iterator.next()
out.collect((window.getStart, min))
}
)
觸發(fā)器(Triggers)
觸發(fā)器決定了一個窗口何時可以被窗口函數(shù)處理,每一個窗口分配器都有一個默認的觸發(fā)器,如果默認的觸發(fā)器不能滿足你的需要,你可以通過調用trigger(...)來指定一個自定義的觸發(fā)器。觸發(fā)器的接口有5個方法來允許觸發(fā)器處理不同的事件:
*onElement()方法,每個元素被添加到窗口時調用
*onEventTime()方法,當一個已注冊的事件時間計時器啟動時調用
*onProcessingTime()方法,當一個已注冊的處理時間計時器啟動時調用
*onMerge()方法,與狀態(tài)性觸發(fā)器相關,當使用會話窗口時,兩個觸發(fā)器對應的窗口合并時,合并兩個觸發(fā)器的狀態(tài)。
*最后一個clear()方法執(zhí)行任何需要清除的相應窗口
上面的方法中有兩個需要注意的地方:
1)第一、三通過返回一個TriggerResult來決定如何操作調用他們的事件,這些操作可以是下面操作中的一個;
CONTINUE:什么也不做
FIRE:觸發(fā)計算
PURGE:清除窗口中的數(shù)據(jù)
FIRE_AND_PURGE:觸發(fā)計算并清除窗口中的數(shù)據(jù)
2)這些函數(shù)可以被用來為后續(xù)的操作注冊處理時間定時器或者事件時間計時器
觸發(fā)和清除(Fire and Purge)
一旦一個觸發(fā)器決定一個窗口已經(jīng)準備好進行處理,它將觸發(fā)并返回FIRE或者FIRE_AND_PURGE。這是窗口操作發(fā)送當前窗口結果的信號,給定一個擁有一個WindowFunction的窗口那么所有的元素都將發(fā)送到WindowFunction中(可能之后還會發(fā)送到驅逐器(Evitor)中)。有ReduceFunction或者FoldFunction的Window僅僅發(fā)送他們的急切聚合結果。
當一個觸發(fā)器觸發(fā)時,它可以是FIRE或者FIRE_AND_PURGE,如果是FIRE的話,將保持window中的內容,FIRE_AND_PURGE的話,會清除window的內容。默認情況下,預實現(xiàn)的觸發(fā)器僅僅是FIRE,不會清除window的狀態(tài)。
注意:清除操作僅清除window的內容,并留下潛在的窗口元信息和完整的觸發(fā)器狀態(tài)。
窗口分配器默認的觸發(fā)器(Default Triggers of WindowAssigners)
默認的觸發(fā)器適用于許多種情況,例如:所有的事件時間分配器都有一個EventTimeTrigger作為默認的觸發(fā)器,這個觸發(fā)器僅在當水印通過窗口的最后時間時觸發(fā)。
注意:GlobalWindow默認的觸發(fā)器是NeverTrigger,是永遠不會觸發(fā)的,因此,如果你使用的是GlobalWindow的話,你需要定義一個自定義觸發(fā)器。
注意:通過調用trigger(...)來指定一個觸發(fā)器你就重寫了WindowAssigner的默認觸發(fā)器。例如:如果你為TumblingEventTimeWindows指定了一個CountTrigger,你就不會再通過時間來獲取觸發(fā)了,而是通過計數(shù)?,F(xiàn)在,如果你想通過時間和計數(shù)來觸發(fā)的話,你需要寫你自己自定義的觸發(fā)器。
內置的和自定義的觸發(fā)器(Build-in and Custom Triggers)
Flink有一些內置的觸發(fā)器:
*EventTimeTrigger(前面提到過)觸發(fā)是根據(jù)由水印衡量的事件時間的進度來的
*ProcessingTimeTrigger 根據(jù)處理時間來觸發(fā)
*CountTrigger 一旦窗口中的元素個數(shù)超出了給定的限制就會觸發(fā)
*PurgingTrigger 作為另一個觸發(fā)器的參數(shù)并將它轉換成一個清除類型
如果你想實現(xiàn)一個自定義的觸發(fā)器,你需要查看一下這個抽象類Trigger(https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ),請注意,這個API還在優(yōu)化中,后續(xù)的Flink版本可能會改變。
驅逐器(Evictors)
Flink的窗口模型允許指定一個除了WindowAssigner和Trigger之外的可選參數(shù)Evitor,這個可以通過調用evitor(...)方法(在這篇文檔的開頭展示過)來實現(xiàn)。這個驅逐器(evitor)可以在觸發(fā)器觸發(fā)之前或者之后,或者窗口函數(shù)被應用之前清理窗口中的元素。為了達到這個目的,Evitor接口有兩個方法:
/**
// Optionally evicts elements. Called before windowing function.
//
// @param elements The elements currently in the pane.
// @param size The current number of elements in the pane.
// @param window The {@link Window}
// @param evictorContext The context for the Evictor
///
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
// Optionally evicts elements. Called after windowing function.
//
// @param elements The elements currently in the pane.
// @param size The current number of elements in the pane.
// @param window The {@link Window}
// @param evictorContext The context for the Evictor
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
evitorBefore()方法包含了在window function之前被應用的驅逐邏輯,而evitorAfter()方法包含了在window function之后被應用的驅逐邏輯。在window function應用之前被驅逐的元素將不會再被window function處理。
Flink有三個預實現(xiàn)的驅逐器,他們是:
CountEvitor:在窗口中保持一個用戶指定數(shù)量的元素,并在窗口的開始處丟棄剩余的其他元素
DeltaEvitor: 通過一個DeltaFunction和一個閾值,計算窗口緩存中最近的一個元素和剩余的所有元素的delta值,并清除delta值大于或者等于閾值的元素
TimeEvitor:使用一個interval的毫秒數(shù)作為參數(shù),對于一個給定的窗口,它會找出元素中的最大時間戳max_ts,并清除時間戳小于max_tx - interval的元素。
默認情況下:所有預實現(xiàn)的evitor都是在window function前應用它們的邏輯
注意:指定一個Evitor要防止預聚合,因為窗口中的所有元素必須得在計算之前傳遞到驅逐器中
注意:Flink 并不保證窗口中的元素是有序的,所以驅逐器可能從窗口的開始處清除,元素到達的先后不是那么必要。
允許延遲(Allowed Lateness)
當處理事件時間的window時,可能會出現(xiàn)元素到達晚了,F(xiàn)link用來與事件時間聯(lián)系的水印已經(jīng)過了元素所屬的窗口的最后時間。可以查看事件時間(event time https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html )尤其是晚到元素(late elements https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements )來了解Flink如何處理事件時間的討論。
默認情況下,當水印已經(jīng)過了窗口的最后時間時晚到的元素會被丟棄。然而,F(xiàn)link允許為窗口操作指定一個最大允許時延,允許時延指定了元素可以晚到多長時間,默認情況下是0。水印已經(jīng)過了窗口最后時間后才來的元素,如果還未到窗口最后時間加時延時間,那么元素任然添加到窗口中。如果依賴觸發(fā)器的使用的話,晚到但是未丟棄的元素可能會導致窗口再次被觸發(fā)。
為了達到這個目的,F(xiàn)link將保持窗口的狀態(tài)直到允許時延的發(fā)生,一旦發(fā)生,F(xiàn)link將清除Window,刪除window的狀態(tài),如Window 生命周期章節(jié)中所描述的那樣。
默認情況下,允許時延為0,也就是說水印之后到達的元素將被丟棄。
你可以按如下方式來指定一個允許時延:
Java 代碼:
DataStream<T> input = ...;
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>);
Scala 代碼:
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.<windowed transformation>(<window function>)
注意:當使用GlobalWindows分配器時,沒有數(shù)據(jù)會被認為是延遲的,因為Global Window的最后時間是Long.MAX_VALUE。
以側輸出來獲取延遲數(shù)據(jù)(Getting Late Data as a Site Output)
使用Flink的側輸出(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html )特性,你可以獲得一個已經(jīng)被丟棄的延遲數(shù)據(jù)流。
首先你需要在窗口化的數(shù)據(jù)流中調用sideOutputLateData(OutputTag)指定你需要獲取延遲數(shù)據(jù),然后,你就可以在window 操作的結果中獲取到側輸出流了。
代碼如下:
Java 代碼:
final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
DataStream<T> result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);
Scala代碼:
val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
.keyBy(<key selector>)
.window(<window assigner>)
.allowedLateness(<time>)
.sideOutputLateData(lateOutputTag)
.<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)
延遲元素考慮(Late elements considerations)
當指定一個允許延遲大于0時,window以及window中的內容將會繼續(xù)保持即使水印已經(jīng)達到了window的最后時間。在這種情況下,當一個延遲事件到來而未丟棄時,它可能會觸發(fā)window中的其他觸發(fā)器。這些觸發(fā)叫做late firings,因為它們是由延遲事件觸發(fā)的,并相對于window中第一個觸發(fā)即主觸發(fā)而言。對于session window而言,late firing還會進一步導致window的合并,因為它們橋接了兩個之前存在差距,而未合并的window。
有用狀態(tài)大小的考慮(Useful state size considerations)
window 可以定義一個很長的周期(例如:一天、一周或者一月),因此積累了相當大的狀態(tài)。這里有些規(guī)則,當估計你的窗口計算的存儲要求時,需要記住。
1、Flink會在每個窗口中為每個屬于它的元素創(chuàng)建一份備份,鑒于此,滾動窗口保存了每個元素的一個備份,與此相反,滑動窗口會為每個元素創(chuàng)建幾個備份,如Window Assigner章節(jié)所述。因此,一個窗口大小為1天,滑動大小為1秒的滑動窗口可能就不是個好的策略了。
2、FoldFunction和ReduceFunction可以制定reduce的存儲需求,因為它們預聚合元素并且每個窗口只保存一個值。相反,只有WindowFunction需要累積所有的元素。
3、使用Evitor需要避免任何預聚合操作,因為窗口中的所有元素都需要在應用于計算之前傳遞到evitor中