摘要
Flink 認(rèn)為 Batch 是 Streaming 的一個(gè)特例,所以 Flink 底層引擎是一個(gè)流式引擎,在上面實(shí)現(xiàn)了流處理和批處理。而窗口(window)就是從 Streaming 到 Batch 的一個(gè)橋梁。Flink 提供了非常完善的窗口機(jī)制,這是我認(rèn)為的 Flink 最大的亮點(diǎn)之一(其他的亮點(diǎn)包括消息亂序處理,和 checkpoint 機(jī)制)。本文我們將介紹流式處理中的窗口概念,介紹 Flink 內(nèi)建的一些窗口和 Window API,最后討論下窗口在底層是如何實(shí)現(xiàn)的。
什么是 Window
在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個(gè)消息就處理一次,但是有時(shí)我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點(diǎn)擊了我們的網(wǎng)頁(yè)。在這種情況下,我們必須定義一個(gè)窗口,用來收集最近一分鐘內(nèi)的數(shù)據(jù),并對(duì)這個(gè)窗口內(nèi)的數(shù)據(jù)進(jìn)行計(jì)算。
窗口可以是時(shí)間驅(qū)動(dòng)的(Time Window,例如:每30秒鐘),也可以是數(shù)據(jù)驅(qū)動(dòng)的(Count Window,例如:每一百個(gè)元素)。一種經(jīng)典的窗口分類可以分成:翻滾窗口(Tumbling Window,無重疊),滾動(dòng)窗口(Sliding Window,有重疊),和會(huì)話窗口(Session Window,活動(dòng)間隙)。
我們舉個(gè)具體的場(chǎng)景來形象地理解不同窗口的概念。假設(shè),淘寶網(wǎng)會(huì)記錄每個(gè)用戶每次購(gòu)買的商品個(gè)數(shù),我們要做的是統(tǒng)計(jì)不同窗口中用戶購(gòu)買商品的總數(shù)。下圖給出了幾種經(jīng)典的窗口切分概述圖:

上圖中,raw data stream 代表用戶的購(gòu)買行為流,圈中的數(shù)字代表該用戶本次購(gòu)買的商品個(gè)數(shù),事件是按時(shí)間分布的,所以可以看出事件之間是有time gap的。Flink 提供了上圖中所有的窗口類型,下面我們會(huì)逐一進(jìn)行介紹。
Time Window
就如名字所說的,Time Window 是根據(jù)時(shí)間對(duì)數(shù)據(jù)流進(jìn)行分組的。這里我們涉及到了流處理中的時(shí)間問題,時(shí)間問題和消息亂序問題是緊密關(guān)聯(lián)的,這是流處理中現(xiàn)存的難題之一,我們將在后續(xù)的 EventTime 和消息亂序處理 中對(duì)這部分問題進(jìn)行深入探討。這里我們只需要知道 Flink 提出了三種時(shí)間的概念,分別是event time(事件時(shí)間:事件發(fā)生時(shí)的時(shí)間),ingestion time(攝取時(shí)間:事件進(jìn)入流處理系統(tǒng)的時(shí)間),processing time(處理時(shí)間:消息被計(jì)算處理的時(shí)間)。Flink 中窗口機(jī)制和時(shí)間類型是完全解耦的,也就是說當(dāng)需要改變時(shí)間類型時(shí)不需要更改窗口邏輯相關(guān)的代碼。
-
Tumbling Time Window
如上圖,我們需要統(tǒng)計(jì)每一分鐘中用戶購(gòu)買的商品的總數(shù),需要將用戶的行為事件按每一分鐘進(jìn)行切分,這種切分被成為翻滾時(shí)間窗口(Tumbling Time Window)。翻滾窗口能將數(shù)據(jù)流切分成不重疊的窗口,每一個(gè)事件只能屬于一個(gè)窗口。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):
// Stream of (userId, buyCnt)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by userId
.keyBy(0)
// tumbling time window of 1 minute length
.timeWindow(Time.minutes(1))
// compute sum over buyCnt
.sum(1)
-
Sliding Time Window
但是對(duì)于某些應(yīng)用,它們需要的窗口是不間斷的,需要平滑地進(jìn)行窗口聚合。比如,我們可以每30秒計(jì)算一次最近一分鐘用戶購(gòu)買的商品總數(shù)。這種窗口我們稱為滑動(dòng)時(shí)間窗口(Sliding Time Window)。在滑窗中,一個(gè)元素可以對(duì)應(yīng)多個(gè)窗口。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):
val slidingCnts: DataStream[(Int, Int)] = buyCnts
.keyBy(0)
// sliding time window of 1 minute length and 30 secs trigger interval
.timeWindow(Time.minutes(1), Time.seconds(30))
.sum(1)
Count Window
Count Window 是根據(jù)元素個(gè)數(shù)對(duì)數(shù)據(jù)流進(jìn)行分組的。
-
Tumbling Count Window
當(dāng)我們想要每100個(gè)用戶購(gòu)買行為事件統(tǒng)計(jì)購(gòu)買總數(shù),那么每當(dāng)窗口中填滿100個(gè)元素了,就會(huì)對(duì)窗口進(jìn)行計(jì)算,這種窗口我們稱之為翻滾計(jì)數(shù)窗口(Tumbling Count Window),上圖所示窗口大小為3個(gè)。通過使用 DataStream API,我們可以這樣實(shí)現(xiàn):
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val tumblingCnts: DataStream[(Int, Int)] = buyCnts
// key stream by sensorId
.keyBy(0)
// tumbling count window of 100 elements size
.countWindow(100)
// compute the buyCnt sum
.sum(1)
-
Sliding Count Window
當(dāng)然Count Window 也支持 Sliding Window,雖在上圖中未描述出來,但和Sliding Time Window含義是類似的,例如計(jì)算每10個(gè)元素計(jì)算一次最近100個(gè)元素的總和,代碼示例如下。
val slidingCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// sliding count window of 100 elements size and 10 elements trigger interval
.countWindow(100, 10)
.sum(1)
Session Window
在這種用戶交互事件流中,我們首先想到的是將事件聚合到會(huì)話窗口中(一段用戶持續(xù)活躍的周期),由非活躍的間隙分隔開。如上圖所示,就是需要計(jì)算每個(gè)用戶在活躍期間總共購(gòu)買的商品數(shù)量,如果用戶30秒沒有活動(dòng)則視為會(huì)話斷開(假設(shè)raw data stream是單個(gè)用戶的購(gòu)買行為流)。Session Window 的示例代碼如下:
// Stream of (userId, buyCnts)
val buyCnts: DataStream[(Int, Int)] = ...
val sessionCnts: DataStream[(Int, Int)] = vehicleCnts
.keyBy(0)
// session window based on a 30 seconds session gap interval
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(30)))
.sum(1)
一般而言,window 是在無限的流上定義了一個(gè)有限的元素集合。這個(gè)集合可以是基于時(shí)間的,元素個(gè)數(shù)的,時(shí)間和個(gè)數(shù)結(jié)合的,會(huì)話間隙的,或者是自定義的。Flink 的 DataStream API 提供了簡(jiǎn)潔的算子來滿足常用的窗口操作,同時(shí)提供了通用的窗口機(jī)制來允許用戶自己定義窗口分配邏輯。下面我們會(huì)對(duì) Flink 窗口相關(guān)的 API 進(jìn)行剖析。
剖析 Window API
得益于 Flink Window API 松耦合設(shè)計(jì),我們可以非常靈活地定義符合特定業(yè)務(wù)的窗口。Flink 中定義一個(gè)窗口主要需要以下三個(gè)組件。
-
Window Assigner:用來決定某個(gè)元素被分配到哪個(gè)/哪些窗口中去。
-
Trigger:觸發(fā)器。決定了一個(gè)窗口何時(shí)能夠被計(jì)算或清除,每個(gè)窗口都會(huì)擁有一個(gè)自己的Trigger。
-
Evictor:可以譯為“驅(qū)逐者”。在Trigger觸發(fā)之后,在窗口被處理之前,Evictor(如果有Evictor的話)會(huì)用來剔除窗口中不需要的元素,相當(dāng)于一個(gè)filter。
上述三個(gè)組件的不同實(shí)現(xiàn)的不同組合,可以定義出非常復(fù)雜的窗口。Flink 中內(nèi)置的窗口也都是基于這三個(gè)組件構(gòu)成的,當(dāng)然內(nèi)置窗口有時(shí)候無法解決用戶特殊的需求,所以 Flink 也暴露了這些窗口機(jī)制的內(nèi)部接口供用戶實(shí)現(xiàn)自定義的窗口。下面我們將基于這三者探討窗口的實(shí)現(xiàn)機(jī)制。
Window 的實(shí)現(xiàn)
下圖描述了 Flink 的窗口機(jī)制以及各組件之間是如何相互工作的。

首先上圖中的組件都位于一個(gè)算子(window operator)中,數(shù)據(jù)流源源不斷地進(jìn)入算子,每一個(gè)到達(dá)的元素都會(huì)被交給 WindowAssigner。WindowAssigner 會(huì)決定元素被放到哪個(gè)或哪些窗口(window),可能會(huì)創(chuàng)建新窗口。因?yàn)橐粋€(gè)元素可以被放入多個(gè)窗口中,所以同時(shí)存在多個(gè)窗口是可能的。注意,
Window本身只是一個(gè)ID標(biāo)識(shí)符,其內(nèi)部可能存儲(chǔ)了一些元數(shù)據(jù),如TimeWindow中有開始和結(jié)束時(shí)間,但是并不會(huì)存儲(chǔ)窗口中的元素。窗口中的元素實(shí)際存儲(chǔ)在 Key/Value State 中,key為Window,value為元素集合(或聚合值)。為了保證窗口的容錯(cuò)性,該實(shí)現(xiàn)依賴了 Flink 的 State 機(jī)制(參見 state 文檔)。
每一個(gè)窗口都擁有一個(gè)屬于自己的 Trigger,Trigger上會(huì)有定時(shí)器,用來決定一個(gè)窗口何時(shí)能夠被計(jì)算或清除。每當(dāng)有元素加入到該窗口,或者之前注冊(cè)的定時(shí)器超時(shí)了,那么Trigger都會(huì)被調(diào)用。Trigger的返回結(jié)果可以是 continue(不做任何操作),fire(處理窗口數(shù)據(jù)),purge(移除窗口和窗口中的數(shù)據(jù)),或者 fire + purge。一個(gè)Trigger的調(diào)用結(jié)果只是fire的話,那么會(huì)計(jì)算窗口并保留窗口原樣,也就是說窗口中的數(shù)據(jù)仍然保留不變,等待下次Trigger fire的時(shí)候再次執(zhí)行計(jì)算。一個(gè)窗口可以被重復(fù)計(jì)算多次知道它被 purge 了。在purge之前,窗口會(huì)一直占用著內(nèi)存。
當(dāng)Trigger fire了,窗口中的元素集合就會(huì)交給Evictor(如果指定了的話)。Evictor 主要用來遍歷窗口中的元素列表,并決定最先進(jìn)入窗口的多少個(gè)元素需要被移除。剩余的元素會(huì)交給用戶指定的函數(shù)進(jìn)行窗口的計(jì)算。如果沒有 Evictor 的話,窗口中的所有元素會(huì)一起交給函數(shù)進(jìn)行計(jì)算。
計(jì)算函數(shù)收到了窗口的元素(可能經(jīng)過了 Evictor 的過濾),并計(jì)算出窗口的結(jié)果值,并發(fā)送給下游。窗口的結(jié)果值可以是一個(gè)也可以是多個(gè)。DataStream API 上可以接收不同類型的計(jì)算函數(shù),包括預(yù)定義的sum(),min(),max(),還有 ReduceFunction,FoldFunction,還有WindowFunction。WindowFunction 是最通用的計(jì)算函數(shù),其他的預(yù)定義的函數(shù)基本都是基于該函數(shù)實(shí)現(xiàn)的。
Flink 對(duì)于一些聚合類的窗口計(jì)算(如sum,min)做了優(yōu)化,因?yàn)榫酆项惖挠?jì)算不需要將窗口中的所有數(shù)據(jù)都保存下來,只需要保存一個(gè)result值就可以了。每個(gè)進(jìn)入窗口的元素都會(huì)執(zhí)行一次聚合函數(shù)并修改result值。這樣可以大大降低內(nèi)存的消耗并提升性能。但是如果用戶定義了 Evictor,則不會(huì)啟用對(duì)聚合窗口的優(yōu)化,因?yàn)?Evictor 需要遍歷窗口中的所有元素,必須要將窗口中所有元素都存下來。
源碼分析
上述的三個(gè)組件構(gòu)成了 Flink 的窗口機(jī)制。為了更清楚地描述窗口機(jī)制,以及解開一些疑惑(比如 purge 和 Evictor 的區(qū)別和用途),我們將一步步地解釋 Flink 內(nèi)置的一些窗口(Time Window,Count Window,Session Window)是如何實(shí)現(xiàn)的。
Count Window 實(shí)現(xiàn)
Count Window 是使用三組件的典范,我們可以在 KeyedStream 上創(chuàng)建 Count Window,其源碼如下所示:
// tumbling count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size) {
return window(GlobalWindows.create()) // create window stream using GlobalWindows
.trigger(PurgingTrigger.of(CountTrigger.of(size))); // trigger is window size
}
// sliding count window
public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size)) // evictor is window size
.trigger(CountTrigger.of(slide)); // trigger is slide size
}
第一個(gè)函數(shù)是申請(qǐng)翻滾計(jì)數(shù)窗口,參數(shù)為窗口大小。第二個(gè)函數(shù)是申請(qǐng)滑動(dòng)計(jì)數(shù)窗口,參數(shù)分別為窗口大小和滑動(dòng)大小。它們都是基于 GlobalWindows 這個(gè) WindowAssigner 來創(chuàng)建的窗口,該assigner會(huì)將所有元素都分配到同一個(gè)global window中,所有GlobalWindows的返回值一直是 GlobalWindow 單例?;旧献远x的窗口都會(huì)基于該assigner實(shí)現(xiàn)。
翻滾計(jì)數(shù)窗口并不帶evictor,只注冊(cè)了一個(gè)trigger。該trigger是帶purge功能的 CountTrigger。也就是說每當(dāng)窗口中的元素?cái)?shù)量達(dá)到了 window-size,trigger就會(huì)返回fire+purge,窗口就會(huì)執(zhí)行計(jì)算并清空窗口中的所有元素,再接著儲(chǔ)備新的元素。從而實(shí)現(xiàn)了tumbling的窗口之間無重疊。
滑動(dòng)計(jì)數(shù)窗口的各窗口之間是有重疊的,但我們用的 GlobalWindows assinger 從始至終只有一個(gè)窗口,不像 sliding time assigner 可以同時(shí)存在多個(gè)窗口。所以trigger結(jié)果不能帶purge,也就是說計(jì)算完窗口后窗口中的數(shù)據(jù)要保留下來(供下個(gè)滑窗使用)。另外,trigger的間隔是slide-size,evictor的保留的元素個(gè)數(shù)是window-size。也就是說,每個(gè)滑動(dòng)間隔就觸發(fā)一次窗口計(jì)算,并保留下最新進(jìn)入窗口的window-size個(gè)元素,剔除舊元素。
假設(shè)有一個(gè)滑動(dòng)計(jì)數(shù)窗口,每2個(gè)元素計(jì)算一次最近4個(gè)元素的總和,那么窗口工作示意圖如下所示:

圖中所示的各個(gè)窗口邏輯上是不同的窗口,但在物理上是同一個(gè)窗口。該滑動(dòng)計(jì)數(shù)窗口,trigger的觸發(fā)條件是元素個(gè)數(shù)達(dá)到2個(gè)(每進(jìn)入2個(gè)元素就會(huì)觸發(fā)一次),evictor保留的元素個(gè)數(shù)是4個(gè),每次計(jì)算完窗口總和后會(huì)保留剩余的元素。所以第一次觸發(fā)trigger是當(dāng)元素5進(jìn)入,第三次觸發(fā)trigger是當(dāng)元素2進(jìn)入,并驅(qū)逐5和2,計(jì)算剩余的4個(gè)元素的總和(22)并發(fā)送出去,保留下2,4,9,7元素供下個(gè)邏輯窗口使用。
Time Window 實(shí)現(xiàn)
同樣的,我們也可以在 KeyedStream 上申請(qǐng) Time Window,其源碼如下所示:
// tumbling time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
// sliding time window
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size, Time slide) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(SlidingProcessingTimeWindows.of(size, slide));
} else {
return window(SlidingEventTimeWindows.of(size, slide));
}
}
在方法體內(nèi)部會(huì)根據(jù)當(dāng)前環(huán)境注冊(cè)的時(shí)間類型,使用不同的WindowAssigner創(chuàng)建window。可以看到,EventTime和IngestTime都使用了XXXEventTimeWindows這個(gè)assigner,因?yàn)镋ventTime和IngestTime在底層的實(shí)現(xiàn)上只是在Source處為Record打時(shí)間戳的實(shí)現(xiàn)不同,在window operator中的處理邏輯是一樣的。
這里我們主要分析sliding process time window,如下是相關(guān)源碼:
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
private static final long serialVersionUID = 1L;
private final long size;
private final long slide;
private SlidingProcessingTimeWindows(long size, long slide) {
this.size = size;
this.slide = slide;
}
@Override
public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
timestamp = System.currentTimeMillis();
List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
// 對(duì)齊時(shí)間戳
long lastStart = timestamp - timestamp % slide;
for (long start = lastStart;
start > timestamp - size;
start -= slide) {
// 當(dāng)前時(shí)間戳對(duì)應(yīng)了多個(gè)window
windows.add(new TimeWindow(start, start + size));
}
return windows;
}
...
}
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
@Override
// 每個(gè)元素進(jìn)入窗口都會(huì)調(diào)用該方法
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
// 注冊(cè)定時(shí)器,當(dāng)系統(tǒng)時(shí)間到達(dá)window end timestamp時(shí)會(huì)回調(diào)該trigger的onProcessingTime方法
ctx.registerProcessingTimeTimer(window.getEnd());
return TriggerResult.CONTINUE;
}
@Override
// 返回結(jié)果表示執(zhí)行窗口計(jì)算并清空窗口
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
return TriggerResult.FIRE_AND_PURGE;
}
...
}
首先,SlidingProcessingTimeWindows會(huì)對(duì)每個(gè)進(jìn)入窗口的元素根據(jù)系統(tǒng)時(shí)間分配到(size / slide)個(gè)不同的窗口,并會(huì)在每個(gè)窗口上根據(jù)窗口結(jié)束時(shí)間注冊(cè)一個(gè)定時(shí)器(相同窗口只會(huì)注冊(cè)一份),當(dāng)定時(shí)器超時(shí)時(shí)意味著該窗口完成了,這時(shí)會(huì)回調(diào)對(duì)應(yīng)窗口的Trigger的onProcessingTime方法,返回FIRE_AND_PURGE,也就是會(huì)執(zhí)行窗口計(jì)算并清空窗口。整個(gè)過程示意圖如下:

如上圖所示橫軸代表時(shí)間戳(為簡(jiǎn)化問題,時(shí)間戳從0開始),第一條record會(huì)被分配到[-5,5)和[0,10)兩個(gè)窗口中,當(dāng)系統(tǒng)時(shí)間到5時(shí),就會(huì)計(jì)算[-5,5)窗口中的數(shù)據(jù),并將結(jié)果發(fā)送出去,最后清空窗口中的數(shù)據(jù),釋放該窗口資源。
Session Window 實(shí)現(xiàn)
Session Window 是一個(gè)需求很強(qiáng)烈的窗口機(jī)制,但Session也比之前的Window更復(fù)雜,所以 Flink 也是在即將到來的 1.1.0 版本中才支持了該功能。由于篇幅問題,我們將在后續(xù)的 Session Window 的實(shí)現(xiàn) 中深入探討 Session Window 的實(shí)現(xiàn)。


