窗口是流式計(jì)算中非常重要的一個概念, 很多常見的功能都是通過各種窗口實(shí)現(xiàn)的, 比如每5分鐘統(tǒng)計(jì)一下剛?cè)?小時的熱度. Flink DataStream API 將窗口獨(dú)立成 Operator. 每個窗口算子包含了以下幾個部分:
Windows Assigner
指定窗口的類型, 定義如何將數(shù)據(jù)流分配到一個或者多個窗口
Windows Trigger
指定窗口觸發(fā)的時機(jī), 定義窗口滿足什么樣的條件觸發(fā)計(jì)算
Evictor
用戶數(shù)據(jù)剔除
Lateness
標(biāo)記是否處理遲到的數(shù)據(jù), 當(dāng)遲到數(shù)據(jù)到達(dá)窗口中是否觸發(fā)計(jì)算
Output Tag
標(biāo)記輸出標(biāo)簽, 然后再通過 getSideOutput 將窗口中的數(shù)據(jù)根據(jù)標(biāo)簽輸出
Windows Function
定義窗口上的數(shù)據(jù)處理的邏輯, 例如對數(shù)據(jù)進(jìn)行sum
2. Window Assigner
首先最需要了解的就是 windows Assigner了, 我們想要一個什么樣的窗口劃分, 主要就是通過他來實(shí)現(xiàn)的.
根據(jù) flink 上游的數(shù)據(jù)集是否為 KeyedStream 類型 來做分別的處理. 如果使用了keyBy( ) 則對應(yīng)使用window( ) 來處理, 否則可以使用 windowAll( )來使用
Flink 可以支持兩種類型的窗口, 分別是基于時間的窗口和基于數(shù)量的窗口.基于時間的意思就是按照時間去劃分窗口,同理,基于數(shù)量的也是根據(jù)窗口中的數(shù)量來做切分的. 對應(yīng)的分別就是 timeWindow() 和 countWindow() 來使用, 下面的示例主要使用 timeWindow() 來演示.
對于不同的 Window Assigner, 還可以把窗口劃分為4大類, 分別是 滾動窗口(Tumbling Windows) / 滑動窗口(Sliding Window) / 會話窗口(Session Window) 和 全局窗口(Global Window).
滾動窗口
DataStream API 提供基于 EventTime 和 ProcessingTime 的兩種類型的 Tumbling window.對應(yīng)的 Assigner 分別是 TumblingEventTimeWindow 和 ProcessingEventTimeWindow . 舉例如下,完整代碼見Github.
// 使用ProcessTime的滾動時間窗口, 長度為10s
stream.keyBy(x -> x.f1)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).process(...)
// 使用ProcessTime的滾動時間窗口, 長度為10s
stream.keyBy(x ->x.f1).window(TumblingEventTimeWindows.of(Time.seconds(10))).process(...)
使用 window(TumblingProcessingTimeWindows.of(Time.seconds(10))) 的方法有點(diǎn)啰嗦, Flink 還提供了timeWindow( ) 的 API 來簡化這一行代碼.
// 直接使用 timeWindow API 便可實(shí)現(xiàn)滾動窗口的操作, 參數(shù)依舊是窗口的長度
// 窗口類型的時間由 time characteristic 確定, 如果指定為 event time,那么窗口也會自動用這個時間
input.keyBy(x -> x.f1).timeWindow(Time.seconds(10));
滑動窗口
滑動窗口顧名思義就是一個在不斷往后滑動的窗口, 比如說 每5分鐘 統(tǒng)計(jì)一個 最近一小時的時間, 那么就需要用滑動窗口來做處理. 滑動窗口主要是依靠 window size 和 slide time 來確定. 與滾動窗口類似的, flink 也提供了對應(yīng)不同時間的 Assigner API(SlidingEventTimeWindow / SlidingEventTimeWindow), 語法基本類似, 只是由原本的一個參數(shù)(窗口長度) 變?yōu)榱藘蓚€參數(shù)(窗口長度和滑動時間), 同樣的, 為了簡化代碼, 依然可以使用timeWindow() 來簡化.
// 兩個參數(shù)分別是 窗口長度 和 滑動時間, 窗口時間類型依舊通過time characteristic 確定
input.keyBy(x -> x.f1).timeWindow(Time.seconds(10), Time.seconds(1))
會話窗口
會話窗口主要是將某段時間內(nèi)活躍度較高的數(shù)據(jù)聚合成一個窗口計(jì)算. 觸發(fā)條件是 Session Gap. 在規(guī)定的時間內(nèi)沒有數(shù)據(jù)接入則認(rèn)為這個窗口結(jié)束,然后觸發(fā)窗口計(jì)算. Session Gap 除了固定間隔的方式, 也可以動態(tài)抽取.
// 創(chuàng)建 Session Window, 間隔為 3s
DataStream<Tuple3<String, Long, Integer>> aggregated = source
.keyBy(0)
.window(EventTimeSessionWindows.withGap(Time.seconds(3L)))
.sum(2);
全局窗口
全局窗口將所有key的數(shù)據(jù)分配到單個窗口中計(jì)算結(jié)果.
// 創(chuàng)建 GlobalWindow
input.keyBy(1)
.window(GlobalWindows.create())
.sum(1);
上面就是構(gòu)建不同的窗口的方法了, 下文會介紹在有了窗口之后怎樣對窗口中的數(shù)據(jù)做處理