窗口是流式計(jì)算中非常常用的算子之一,通過(guò)窗口可以將無(wú)限流切分成有限流,然后在每個(gè)窗口之上使用計(jì)算函數(shù),可以實(shí)現(xiàn)非常靈活的操作。Flink提供了豐富的窗口操作,除此之外,用戶還可以根據(jù)自己的處理場(chǎng)景自定義窗口。通過(guò)本文,你可以了解到:
- 窗口的基本概念和簡(jiǎn)單使用
- 內(nèi)置Window Assigners的分類、源碼及使用
- Window Function的分類及使用
- 窗口的組成部分及生命周期源碼解讀
- 完整的窗口使用Demo案例
Quick Start
是什么
Window(窗口)是處理無(wú)界流的核心算子,Window可以將數(shù)據(jù)流分為固定大小的"桶(buckets)"(即通過(guò)按照固定時(shí)間或長(zhǎng)度將數(shù)據(jù)流切分成不同的窗口),在每一個(gè)窗口上,用戶可以使用一些計(jì)算函數(shù)對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行處理,從而得到一定時(shí)間范圍內(nèi)的統(tǒng)計(jì)結(jié)果。比如統(tǒng)計(jì)每隔5分鐘輸出最近一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品,這樣就可以使用一個(gè)小時(shí)的時(shí)間窗口將數(shù)據(jù)限定在固定時(shí)間范圍內(nèi),然后可以對(duì)該范圍內(nèi)的有界數(shù)據(jù)執(zhí)行聚合處理。
根據(jù)作用的數(shù)據(jù)流(DataStream、KeyedStream),Window可以分為兩種:Keyed Windows與Non-Keyed Windows。其中Keyed Windows是在KeyedStream上使用window(…)操作,產(chǎn)生一個(gè)WindowedStream。Non-Keyed Windows是在DataStream上使用windowAll(…)操作,產(chǎn)生一個(gè)AllWindowedStream。具體的轉(zhuǎn)換關(guān)系如下圖所示。注意:一般不推薦使用AllWindowedStream,因?yàn)樵谄胀魃线M(jìn)行窗口操作,會(huì)將所有分區(qū)的流都匯集到單個(gè)的Task中,即并行度為1,從而會(huì)影響性能。

如何用
上面我們介紹了什么是窗口,那么該如何使用窗口呢?具體如下面的代碼片段:
Keyed Windows
stream
.keyBy(...) // keyedStream上使用window
.window(...) // 必選: 指定窗口分配器( window assigner)
[.trigger(...)] // 可選: 指定觸發(fā)器(trigger),如果不指定,則使用默認(rèn)值
[.evictor(...)] // 可選: 指定清除器(evictor),如果不指定,則沒(méi)有
[.allowedLateness(...)] // 可選: 指定是否延遲處理數(shù)據(jù),如果不指定,默認(rèn)使用0
[.sideOutputLateData(...)] // 可選: 配置side output,如果不指定,則沒(méi)有
.reduce/aggregate/fold/apply() // 必選: 指定窗口計(jì)算函數(shù)
[.getSideOutput(...)] // 可選: 從side output中獲取數(shù)據(jù)
Non-Keyed Windows
stream
.windowAll(...) // 必選: 指定窗口分配器( window assigner)
[.trigger(...)] // 可選: 指定觸發(fā)器(trigger),如果不指定,則使用默認(rèn)值
[.evictor(...)] // 可選: 指定清除器(evictor),如果不指定,則沒(méi)有
[.allowedLateness(...)] // 可選: 指定是否延遲處理數(shù)據(jù),如果不指定,默認(rèn)使用0
[.sideOutputLateData(...)] // 可選: 配置side output,如果不指定,則沒(méi)有
.reduce/aggregate/fold/apply() // 必選: 指定窗口計(jì)算函數(shù)
[.getSideOutput(...)] // 可選: 從side output中獲取數(shù)據(jù)
簡(jiǎn)寫(xiě)window操作
上面的代碼片段中,要在keyedStream上使用window(…)或者在DataStream上使用windowAll(…),需要傳入一個(gè)window assigner的參數(shù),關(guān)于window assigner下文會(huì)進(jìn)行詳細(xì)解釋。如下面代碼片段:
// -------------------------------------------
// Keyed Windows
// -------------------------------------------
stream
.keyBy(id)
.window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S的滾動(dòng)窗口
.reduce(MyReduceFunction)
// -------------------------------------------
// Non-Keyed Windows
// -------------------------------------------
stream
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S的滾動(dòng)窗口
.reduce(MyReduceFunction)
上面的代碼可以簡(jiǎn)寫(xiě)為:
// -------------------------------------------
// Keyed Windows
// -------------------------------------------
stream
.keyBy(id)
.timeWindow(Time.seconds(5)) // 5S的滾動(dòng)窗口
.reduce(MyReduceFunction)
// -------------------------------------------
// Non-Keyed Windows
// -------------------------------------------
stream
.timeWindowAll(Time.seconds(5)) // 5S的滾動(dòng)窗口
.reduce(MyReduceFunction)
關(guān)于上面的簡(jiǎn)寫(xiě),以KeyedStream為例,對(duì)于看一下具體的KeyedStream源碼片段,可以看出底層調(diào)用的還是非簡(jiǎn)寫(xiě)時(shí)的代碼。關(guān)于timeWindowAll()的代碼也是一樣的,可以參考DataStream源碼,這里不再贅述。
// 會(huì)根據(jù)用戶的使用的時(shí)間類型,調(diào)用不同的內(nèi)置window Assigner
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
return window(TumblingProcessingTimeWindows.of(size));
} else {
return window(TumblingEventTimeWindows.of(size));
}
}
Window Assigners
分類
WindowAssigner負(fù)責(zé)將輸入的數(shù)據(jù)分配到一個(gè)或多個(gè)窗口,F(xiàn)link內(nèi)置了許多WindowAssigner,這些WindowAssigner可以滿足大部分的使用場(chǎng)景。比如tumbling windows, sliding windows, session windows , global windows。如果這些內(nèi)置的WindowAssigner不能滿足你的需求,可以通過(guò)繼承WindowAssigner類實(shí)現(xiàn)自定義的WindowAssigner。
上面的WindowAssigner是基于時(shí)間的(time-based windows),除此之外,F(xiàn)link還提供了基于數(shù)量的窗口(count-based windows),即根據(jù)窗口的元素?cái)?shù)量定義窗口大小,這種情況下,如果數(shù)據(jù)存在亂序,將導(dǎo)致窗口計(jì)算結(jié)果不確定。本文重點(diǎn)介紹基于時(shí)間的窗口使用,由于篇幅有限,關(guān)于基于數(shù)量的窗口將不做討論。

使用介紹
下面將會(huì)對(duì)Flink內(nèi)置的四種基于時(shí)間的windowassigner,進(jìn)行一一分析。
Tumbling Windows
- 圖解
Tumbling Windows(滾動(dòng)窗口)是將數(shù)據(jù)分配到確定的窗口中,根據(jù)固定時(shí)間或大小進(jìn)行切分,每個(gè)窗口有固定的大小且窗口之間不存在重疊(如下圖所示)。這種比較簡(jiǎn)單,適用于按照周期統(tǒng)計(jì)某一指標(biāo)的場(chǎng)景。
關(guān)于時(shí)間的選擇,可以使用Event Time或者Processing Time,分別對(duì)應(yīng)的window assigner為:TumblingEventTimeWindows、TumblingProcessingTimeWindows。用戶可以使用window assigner的of(size)方法指定時(shí)間間隔,其中時(shí)間單位可以是Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x)等。

- 使用
// 使用EventTime
datastream
.keyBy(id)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new MyProcessFunction())
// 使用processing-time
datastream
.keyBy(id)
.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
.process(new MyProcessFunction())
Sliding Windows
- 圖解
Sliding Windows(滑動(dòng)窗口)在滾動(dòng)窗口之上加了一個(gè)滑動(dòng)窗口的時(shí)間,這種類型的窗口是會(huì)存在窗口重疊的(如下圖所示)。滾動(dòng)窗口是按照窗口固定的時(shí)間大小向前滾動(dòng),而滑動(dòng)窗口是根據(jù)設(shè)定的滑動(dòng)時(shí)間向前滑動(dòng)。窗口之間的重疊部分的大小取決于窗口大小與滑動(dòng)的時(shí)間大小,當(dāng)滑動(dòng)時(shí)間小于窗口時(shí)間大小時(shí)便會(huì)出現(xiàn)重疊。當(dāng)滑動(dòng)時(shí)間大于窗口時(shí)間大小時(shí),會(huì)出現(xiàn)窗口不連續(xù)的情況,導(dǎo)致數(shù)據(jù)可能不屬于任何一個(gè)窗口。當(dāng)兩者相等時(shí),其功能就和滾動(dòng)窗口相同了?;瑒?dòng)窗口的使用場(chǎng)景是:用戶根據(jù)設(shè)定的統(tǒng)計(jì)周期來(lái)計(jì)算指定窗口時(shí)間大小的指標(biāo),比如每隔5分鐘輸出最近一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品。
關(guān)于時(shí)間的選擇,可以使用Event Time或者Processing Time,分別對(duì)應(yīng)的window assigner為:SlidingEventTimeWindows、SlidingProcessingTimeWindows。用戶可以使用window assigner的of(size)方法指定時(shí)間間隔,其中時(shí)間單位可以是Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x)等。

- 使用
// 使用EventTime
datastream
.keyBy(id)
.window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new MyProcessFunction())
// 使用processing-time
datastream
.keyBy(id)
.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
.process(new MyProcessFunction())
Session Windows
- 圖解
Session Windows(會(huì)話窗口)主要是將某段時(shí)間內(nèi)活躍度較高的數(shù)據(jù)聚合成一個(gè)窗口進(jìn)行計(jì)算,窗口的觸發(fā)的條件是Session Gap,是指在規(guī)定的時(shí)間內(nèi)如果沒(méi)有數(shù)據(jù)活躍接入,則認(rèn)為窗口結(jié)束,然后觸發(fā)窗口計(jì)算結(jié)果。需要注意的是如果數(shù)據(jù)一直不間斷地進(jìn)入窗口,也會(huì)導(dǎo)致窗口始終不觸發(fā)的情況。與滑動(dòng)窗口、滾動(dòng)窗口不同的是,Session Windows不需要有固定窗口大小(window size)和滑動(dòng)時(shí)間(slide time),只需要定義session gap,來(lái)規(guī)定不活躍數(shù)據(jù)的時(shí)間上限即可。如下圖所示。Session Windows窗口類型比較適合非連續(xù)型數(shù)據(jù)處理或周期性產(chǎn)生數(shù)據(jù)的場(chǎng)景,根據(jù)用戶在線上某段時(shí)間內(nèi)的活躍度對(duì)用戶行為數(shù)據(jù)進(jìn)行統(tǒng)計(jì)。
關(guān)于時(shí)間的選擇,可以使用Event Time或者Processing Time,分別對(duì)應(yīng)的window assigner為:EventTimeSessionWindows和ProcessTimeSessionWindows。用戶可以使用window assigner的withGap()方法指定時(shí)間間隔,其中時(shí)間單位可以是Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x)等。

- 使用
// 使用EventTime
datastream
.keyBy(id)
.window((EventTimeSessionWindows.withGap(Time.minutes(15)))
.process(new MyProcessFunction())
// 使用processing-time
datastream
.keyBy(id)
.window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))
.process(new MyProcessFunction())
注意:由于session window的開(kāi)始時(shí)間與結(jié)束時(shí)間取決于接收的數(shù)據(jù)。windowassigner不會(huì)立即分配所有的元素到正確的窗口,SessionWindow會(huì)為每個(gè)接收的元素初始化一個(gè)以該元素的時(shí)間戳為開(kāi)始時(shí)間的窗口,使用session gap作為窗口大小,然后再合并重疊部分的窗口。所以, session window 操作需要指定用于合并的 Trigger 和 Window Function,比如ReduceFunction, AggregateFunction, or ProcessWindowFunction。
Global Windows
- 圖解
Global Windows(全局窗口)將所有相同的key的數(shù)據(jù)分配到單個(gè)窗口中計(jì)算結(jié)果,窗口沒(méi)有起始和結(jié)束時(shí)間,窗口需要借助于Triger來(lái)觸發(fā)計(jì)算,如果不對(duì)Global Windows指定Triger,窗口是不會(huì)觸發(fā)計(jì)算的。因此,使用Global Windows需要非常慎重,用戶需要非常明確自己在整個(gè)窗口中統(tǒng)計(jì)出的結(jié)果是什么,并指定對(duì)應(yīng)的觸發(fā)器,同時(shí)還需要有指定相應(yīng)的數(shù)據(jù)清理機(jī)制,否則數(shù)據(jù)將一直留在內(nèi)存中。

- 使用
datastream
.keyBy(id)
.window(GlobalWindows.create())
.process(new MyProcessFunction())
Window Functions
分類
Flink提供了兩大類窗口函數(shù),分別為增量聚合函數(shù)和全量窗口函數(shù)。其中增量聚合函數(shù)的性能要比全量窗口函數(shù)高,因?yàn)樵隽烤酆洗翱谑腔谥虚g結(jié)果狀態(tài)計(jì)算最終結(jié)果的,即窗口中只維護(hù)一個(gè)中間結(jié)果狀態(tài),不要緩存所有的窗口數(shù)據(jù)。相反,對(duì)于全量窗口函數(shù)而言,需要對(duì)所以進(jìn)入該窗口的數(shù)據(jù)進(jìn)行緩存,等到窗口觸發(fā)時(shí)才會(huì)遍歷窗口內(nèi)所有數(shù)據(jù),進(jìn)行結(jié)果計(jì)算。如果窗口數(shù)據(jù)量比較大或者窗口時(shí)間較長(zhǎng),就會(huì)耗費(fèi)很多的資源緩存數(shù)據(jù),從而導(dǎo)致性能下降。
-
增量聚合函數(shù)
包括:ReduceFunction、AggregateFunction和FoldFunction
-
全量窗口函數(shù)
包括:ProcessWindowFunction
使用介紹
ReduceFunction
輸入兩個(gè)相同類型的數(shù)據(jù)元素按照指定的計(jì)算方法進(jìn)行聚合,然后輸出類型相同的一個(gè)結(jié)果元素。要求輸入元素的數(shù)據(jù)類型與輸出元素的數(shù)據(jù)類型必須一致。實(shí)現(xiàn)的效果是使用上一次的結(jié)果值與當(dāng)前值進(jìn)行聚合。具體使用案例如下:
public class ReduceFunctionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 模擬數(shù)據(jù)源
SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
Tuple3.of(1L, 10, 1588491228L),
Tuple3.of(1L, 15, 1588491229L),
Tuple3.of(1L, 20, 1588491238L),
Tuple3.of(1L, 25, 1588491248L),
Tuple3.of(2L, 10, 1588491258L),
Tuple3.of(2L, 30, 1588491268L),
Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
return element.f2 * 1000;
}
});
input
.map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
@Override
public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {
return Tuple2.of(value.f0, value.f1);
}
})
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce(new ReduceFunction<Tuple2<Long, Integer>>() {
@Override
public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
// 根據(jù)第一個(gè)元素分組,求第二個(gè)元素的累計(jì)和
return Tuple2.of(value1.f0, value1.f1 + value2.f1);
}
}).print();
env.execute("ReduceFunctionExample");
}
}
AggregateFunction
與ReduceFunction相似,AggregateFunction也是基于中間狀態(tài)計(jì)算結(jié)果的增量計(jì)算函數(shù),相比ReduceFunction,AggregateFunction在窗口計(jì)算上更加靈活,但是實(shí)現(xiàn)稍微復(fù)雜,需要實(shí)現(xiàn)AggregateFunction接口,重寫(xiě)四個(gè)方法。其最大的優(yōu)勢(shì)就是中間結(jié)果的數(shù)據(jù)類型和最終的結(jié)果類型不依賴于輸入的數(shù)據(jù)類型。關(guān)于AggregateFunction的源碼,如下所示:
/**
* @param <IN> 輸入元素的數(shù)據(jù)類型
* @param <ACC> 中間聚合結(jié)果的數(shù)據(jù)類型
* @param <OUT> 最終聚合結(jié)果的數(shù)據(jù)類型
*/
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {
/**
* 創(chuàng)建一個(gè)新的累加器
*/
ACC createAccumulator();
/**
* 將新的數(shù)據(jù)與累加器進(jìn)行聚合,返回一個(gè)新的累加器
*/
ACC add(IN value, ACC accumulator);
/**
從累加器中計(jì)算最終結(jié)果并返回
*/
OUT getResult(ACC accumulator);
/**
* 合并兩個(gè)累加器并返回結(jié)果
*/
ACC merge(ACC a, ACC b);
}
具體使用代碼案例如下:
public class AggregateFunctionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 模擬數(shù)據(jù)源
SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
Tuple3.of(1L, 10, 1588491228L),
Tuple3.of(1L, 15, 1588491229L),
Tuple3.of(1L, 20, 1588491238L),
Tuple3.of(1L, 25, 1588491248L),
Tuple3.of(2L, 10, 1588491258L),
Tuple3.of(2L, 30, 1588491268L),
Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
return element.f2 * 1000;
}
});
input.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new MyAggregateFunction()).print();
env.execute("AggregateFunctionExample");
}
private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>,Tuple2<Long,Integer>,Tuple2<Long,Integer>> {
/**
* 創(chuàng)建一個(gè)累加器,初始化值
* @return
*/
@Override
public Tuple2<Long, Integer> createAccumulator() {
return Tuple2.of(0L,0);
}
/**
*
* @param value 輸入的元素值
* @param accumulator 中間結(jié)果值
* @return
*/
@Override
public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {
return Tuple2.of(value.f0,value.f1 + accumulator.f1);
}
/**
* 獲取計(jì)算結(jié)果值
* @param accumulator
* @return
*/
@Override
public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
return Tuple2.of(accumulator.f0,accumulator.f1);
}
/**
* 合并中間結(jié)果值
* @param a 中間結(jié)果值a
* @param b 中間結(jié)果值b
* @return
*/
@Override
public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
return Tuple2.of(a.f0,a.f1 + b.f1);
}
}
}
FoldFunction
FoldFunction定義了如何將窗口中的輸入元素與外部的元素合并的邏輯,該接口已標(biāo)記過(guò)時(shí),建議用戶使用AggregateFunction來(lái)替換使用FoldFunction。
public class FoldFunctionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 模擬數(shù)據(jù)源
SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
Tuple3.of(1L, 10, 1588491228L),
Tuple3.of(1L, 15, 1588491229L),
Tuple3.of(1L, 20, 1588491238L),
Tuple3.of(1L, 25, 1588491248L),
Tuple3.of(2L, 10, 1588491258L),
Tuple3.of(2L, 30, 1588491268L),
Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
return element.f2 * 1000;
}
});
input.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.fold("用戶",new FoldFunction<Tuple3<Long, Integer, Long>,String>() {
@Override
public String fold(String accumulator, Tuple3<Long, Integer, Long> value) throws Exception {
// 為第一個(gè)元素的值拼接一個(gè)"用戶"字符串,進(jìn)行輸出
return accumulator + value.f0 ;
}
}).print();
env.execute("FoldFunctionExample");
}
}
ProcessWindowFunction
前面提到的ReduceFunction和AggregateFunction都是基于中間狀態(tài)實(shí)現(xiàn)增量計(jì)算的窗口函數(shù)。有些時(shí)候需要使用整個(gè)窗口的所有數(shù)據(jù)進(jìn)行計(jì)算,比如求中位數(shù)和眾數(shù)。另外,ProcessWindowFunction的Context對(duì)象可以訪問(wèn)窗口的一些元數(shù)據(jù)信息,比如窗口結(jié)束時(shí)間、水位線等。ProcessWindowsFunction能夠更加靈活地支持基于窗口全部數(shù)據(jù)元素的結(jié)果計(jì)算。
在系統(tǒng)內(nèi)部,由ProcessWindowFunction處理的窗口會(huì)將所有已分配的數(shù)據(jù)存儲(chǔ)到ListState中,通過(guò)將數(shù)據(jù)收集起來(lái)且提供對(duì)于窗口的元數(shù)據(jù)及其他一些特性的訪問(wèn)和使用,應(yīng)用場(chǎng)景比ReduceFunction和AggregateFunction更加廣泛。關(guān)于ProcessWindowFunction抽象類的源碼,如下所示:
/**
* @param <IN> 輸入的數(shù)據(jù)類型
* @param <OUT> 輸出的數(shù)據(jù)類型
* @param <KEY> key的數(shù)據(jù)類型
* @param <W> window的類型
*/
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
private static final long serialVersionUID = 1L;
/**
* 計(jì)算窗口數(shù)據(jù),輸出0個(gè)或多個(gè)元素
* @param key 窗口的key
* @param context 窗口的上下文
* @param elements 窗口內(nèi)的所有元素
* @param out 輸出元素的collector對(duì)象
* @throws Exception
*/
public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
/**
* 當(dāng)窗口被銷毀時(shí),刪除狀態(tài)
* @param context
* @throws Exception
*/
public void clear(Context context) throws Exception {}
//context可以訪問(wèn)窗口的元數(shù)據(jù)信息.
public abstract class Context implements java.io.Serializable {
//返回當(dāng)前被計(jì)算的窗口
public abstract W window();
// 返回當(dāng)前processing time.
public abstract long currentProcessingTime();
// 返回當(dāng)前event-time 水位線.
public abstract long currentWatermark();
//每個(gè)key和每個(gè)window的狀態(tài)訪問(wèn)器
public abstract KeyedStateStore windowState();
// 每個(gè)key的global state的狀態(tài)訪問(wèn)器.
public abstract KeyedStateStore globalState();
/**
* 向side output輸出數(shù)據(jù)
* @param outputTag the {@code OutputTag} side output 輸出的標(biāo)識(shí).
* @param value 輸出的數(shù)據(jù).
*/
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
具體的使用案例如下:
public class ProcessWindowFunctionExample {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 模擬數(shù)據(jù)源
SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
Tuple3.of(1L, 10, 1588491228L),
Tuple3.of(1L, 15, 1588491229L),
Tuple3.of(1L, 20, 1588491238L),
Tuple3.of(1L, 25, 1588491248L),
Tuple3.of(2L, 10, 1588491258L),
Tuple3.of(2L, 30, 1588491268L),
Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
return element.f2 * 1000;
}
});
input.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new MyProcessWindowFunction())
.print();
}
private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<Long, Integer, Long>,Tuple3<Long,String,Integer>,Long,TimeWindow> {
@Override
public void process(
Long aLong,
Context context,
Iterable<Tuple3<Long, Integer, Long>> elements,
Collector<Tuple3<Long, String, Integer>> out) throws Exception {
int count = 0;
for (Tuple3<Long, Integer, Long> in: elements) {
count++;
}
// 統(tǒng)計(jì)每個(gè)窗口數(shù)據(jù)個(gè)數(shù),加上窗口輸出
out.collect(Tuple3.of(aLong,"" + context.window(),count));
}
}
}
增量聚合函數(shù)和ProcessWindowFunction整合
ProcessWindowFunction提供了很強(qiáng)大的功能,但是唯一的缺點(diǎn)就是需要更大的狀態(tài)存儲(chǔ)數(shù)據(jù)。在很多時(shí)候,增量聚合的使用是非常頻繁的,那么如何實(shí)現(xiàn)既支持增量聚合又支持訪問(wèn)窗口元數(shù)據(jù)的操作呢?可以將ReduceFunction和AggregateFunction與ProcessWindowFunction整合在一起使用。通過(guò)這種組合方式,分配給窗口的元素會(huì)立即被執(zhí)行計(jì)算,當(dāng)窗口觸發(fā)時(shí),會(huì)把聚合的結(jié)果傳給ProcessWindowFunction,這樣ProcessWindowFunction的process方法的Iterable參數(shù)被就只有一個(gè)值,即增量聚合的結(jié)果。
- ReduceFunction與ProcessWindowFunction組合
public class ReduceProcessWindowFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 模擬數(shù)據(jù)源
SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
Tuple3.of(1L, 10, 1588491228L),
Tuple3.of(1L, 15, 1588491229L),
Tuple3.of(1L, 20, 1588491238L),
Tuple3.of(1L, 25, 1588491248L),
Tuple3.of(2L, 10, 1588491258L),
Tuple3.of(2L, 30, 1588491268L),
Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
return element.f2 * 1000;
}
});
input.map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
@Override
public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {
return Tuple2.of(value.f0, value.f1);
}
})
.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.reduce(new MyReduceFunction(),new MyProcessWindowFunction())
.print();
env.execute("ProcessWindowFunctionExample");
}
private static class MyReduceFunction implements ReduceFunction<Tuple2<Long, Integer>> {
@Override
public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
//增量求和
return Tuple2.of(value1.f0,value1.f1 + value2.f1);
}
}
private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
@Override
public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
// 將求和之后的結(jié)果附帶窗口結(jié)束時(shí)間一起輸出
out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
}
}
}
- AggregateFunction與ProcessWindowFunction組合
public class AggregateProcessWindowFunction {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 模擬數(shù)據(jù)源
SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
Tuple3.of(1L, 10, 1588491228L),
Tuple3.of(1L, 15, 1588491229L),
Tuple3.of(1L, 20, 1588491238L),
Tuple3.of(1L, 25, 1588491248L),
Tuple3.of(2L, 10, 1588491258L),
Tuple3.of(2L, 30, 1588491268L),
Tuple3.of(2L, 20, 1588491278L))
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
@Override
public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
return element.f2 * 1000;
}
});
input.keyBy(t -> t.f0)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.aggregate(new MyAggregateFunction(),new MyProcessWindowFunction())
.print();
env.execute("AggregateFunctionExample");
}
private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
/**
* 創(chuàng)建一個(gè)累加器,初始化值
*
* @return
*/
@Override
public Tuple2<Long, Integer> createAccumulator() {
return Tuple2.of(0L, 0);
}
/**
* @param value 輸入的元素值
* @param accumulator 中間結(jié)果值
* @return
*/
@Override
public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {
return Tuple2.of(value.f0, value.f1 + accumulator.f1);
}
/**
* 獲取計(jì)算結(jié)果值
*
* @param accumulator
* @return
*/
@Override
public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
return Tuple2.of(accumulator.f0, accumulator.f1);
}
/**
* 合并中間結(jié)果值
*
* @param a 中間結(jié)果值a
* @param b 中間結(jié)果值b
* @return
*/
@Override
public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
return Tuple2.of(a.f0, a.f1 + b.f1);
}
}
private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
@Override
public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
// 將求和之后的結(jié)果附帶窗口結(jié)束時(shí)間一起輸出
out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
}
}
}
window 生命周期解讀
生命周期圖解
窗口從創(chuàng)建到執(zhí)行窗口計(jì)算再到被清除,需要經(jīng)過(guò)一系列的過(guò)程,這個(gè)過(guò)程就是窗口的生命周期。
首先,當(dāng)一個(gè)元素進(jìn)入窗口算子之前,會(huì)由WindowAssigner分配該元素進(jìn)入哪個(gè)或哪幾個(gè)窗口,如果窗口不存在,則創(chuàng)建窗口。
其次,數(shù)據(jù)進(jìn)入了窗口,這時(shí)要看有沒(méi)有使用增量聚合函數(shù),如果使用了增量聚合函數(shù)ReduceFunction或AggregateFunction,新加入窗口的元素會(huì)立即觸發(fā)增量計(jì)算,計(jì)算的結(jié)果作為窗口的內(nèi)容。如果沒(méi)有使用增量聚合函數(shù),則會(huì)將進(jìn)入窗口的數(shù)據(jù)存儲(chǔ)到ListState狀態(tài)中,進(jìn)一步等待窗口觸發(fā)時(shí),遍歷窗口元素進(jìn)行聚合計(jì)算。
然后,每個(gè)元素在進(jìn)入窗口之后會(huì)傳遞至該窗口的觸發(fā)器,觸發(fā)器決定了窗口何時(shí)被執(zhí)行計(jì)算及何時(shí)需要清除自身和保存的內(nèi)容。觸發(fā)器可以根據(jù)已分配的元素或注冊(cè)的計(jì)時(shí)器來(lái)決定某些特定時(shí)刻執(zhí)行窗口計(jì)算或清除窗口內(nèi)容。
最后,觸發(fā)器成功觸發(fā)之后的操作取決于使用的窗口函數(shù),如果使用的是增量聚合函數(shù),如ReduceFunction或AggregateFunction,則會(huì)直接輸出聚合的結(jié)果。如果只包含一個(gè)全量窗口函數(shù),如ProcessWindowFunction,則會(huì)作用窗口的所有元素,執(zhí)行計(jì)算,輸出結(jié)果。如果組合使用了ReduceFunction和ProcessWindowFunction,即組合使用了增量聚合窗口函數(shù)和全量窗口函數(shù),全量窗口函數(shù)會(huì)作用于增量聚合函數(shù)的聚合值,然后再輸出最終的結(jié)果。
- 情況1:僅使用增量聚合窗口函數(shù)

- 情況2:僅使用全量窗口函數(shù)

- 情況3:組合使用增量聚合窗口函數(shù)與全量窗口函數(shù)

分配器(Window Assigners)
WindowAssigner的作用是將輸入的元素分配到一個(gè)或多個(gè)窗口,當(dāng)WindowAssigner將第一個(gè)元素分配到窗口時(shí),就會(huì)創(chuàng)建該窗口,所以一個(gè)窗口一旦被創(chuàng)建,窗口中必然至少有一個(gè)元素。Flink內(nèi)置了很多WindowAssigners,本文主要討論基于時(shí)間的WindowAssigners,這些分配器都繼承了WindowAssigner抽象類。關(guān)于常用的分配器,上文已經(jīng)做了詳細(xì)解釋。下面先來(lái)看一下繼承關(guān)系圖:

接下來(lái),將會(huì)對(duì)WindowAssigner抽象類的源碼進(jìn)行分析,具體代碼如下:
```java
/**
* WindowAssigner分配一個(gè)元素到0個(gè)或多個(gè)窗口
* 在一個(gè)窗口算子內(nèi)部,元素是按照key進(jìn)行分組的(使用KeyedStream),
* 相同key和window的元素集合稱之為一個(gè)pane(格子)
* @param <T> 要分配元素的數(shù)據(jù)類型
* @param <W> window的類型:TimeWindow、GlobalWindow
*/
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 返回一個(gè)向其分配元素的窗口集合
* @param element 待分配的元素
* @param timestamp 元素的時(shí)間戳
* @param context WindowAssignerContext對(duì)象
* @return
*/
public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
/**
* 返回一個(gè)與該WindowAssigner相關(guān)的默認(rèn)trigger(觸發(fā)器)
* @param env 執(zhí)行環(huán)境
* @return
*/
public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);
/**
* 返回一個(gè)窗口序列化器
* @param executionConfig
* @return
*/
public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
/**
* 如果元素是基于event time分配到窗口的,則返回true
* @return
*/
public abstract boolean isEventTime();
/**
* 該Context允許訪問(wèn)當(dāng)前的處理時(shí)間processing time
*/
public abstract static class WindowAssignerContext {
/**
* 返回當(dāng)前的處理時(shí)間
*/
public abstract long getCurrentProcessingTime();
}
}
觸發(fā)器(Triggers)
數(shù)據(jù)接入窗口后,窗口是否觸發(fā)WindowFunciton計(jì)算,取決于窗口是否滿足觸發(fā)條件。Triggers就是決定窗口何時(shí)觸發(fā)計(jì)算并輸出結(jié)果的條件,Triggers可以根據(jù)時(shí)間或者具體的數(shù)據(jù)條件進(jìn)行觸發(fā),比如進(jìn)入窗口元素的個(gè)數(shù)或者進(jìn)入窗口的某些特定的元素值等。前面討論的內(nèi)置WindowAssigner都有各自默認(rèn)的觸發(fā)器,當(dāng)使用的是Processing Time時(shí),則當(dāng)處理時(shí)間超過(guò)窗口結(jié)束時(shí)間時(shí)會(huì)被觸發(fā)。當(dāng)使用Event Time時(shí),當(dāng)水位線超過(guò)窗口結(jié)束時(shí)間時(shí)會(huì)被觸發(fā)。
Flink在內(nèi)部提供很多內(nèi)置的觸發(fā)器,常用的主要有EventTimeTrigger、ProcessTimeTrigger以及CountTrigger等。每種每種觸發(fā)器都對(duì)應(yīng)于不同的Window Assigner,例如Event Time類型的Windows對(duì)應(yīng)的觸發(fā)器是EventTimeTrigger,其基本原理是判斷當(dāng)前的Watermark是否超過(guò)窗口的EndTime,如果超過(guò)則觸發(fā)對(duì)窗口內(nèi)數(shù)據(jù)的計(jì)算,反之不觸發(fā)計(jì)算。關(guān)于上面分析的內(nèi)置WindowAssigner的默認(rèn)trigger,可以從各自的源碼中看到,具體羅列如下:
| 分配器 | 對(duì)應(yīng)的源碼 | 默認(rèn)觸發(fā)器 |
|---|---|---|
| TumblingEventTimeWindows | public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }</object,> | EventTimeTrigger |
| TumblingProcessingTimeWindows | public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }</object,> | ProcessingTimeTrigger |
| SlidingEventTimeWindows | public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }</object,> | EventTimeTrigger |
| SlidingProcessingTimeWindows | public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }</object,> | ProcessingTimeTrigger |
| EventTimeSessionWindows | public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }</object,> | EventTimeTrigger |
| ProcessingTimeSessionWindows | public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }</object,> | ProcessingTimeTrigger |
| GlobalWindows | public Trigger <object, globalwindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return new NeverTrigger(); }</object,> | NeverTrigger |
這些Trigger都繼承了Trigger抽象類,具體的繼承關(guān)系,如下圖:

關(guān)于這些內(nèi)置的Trigger的具體解釋如下:
| Trigger | 解釋 |
|---|---|
| EventTimeTrigger | 當(dāng)前的Watermark是否超過(guò)窗口的EndTime,如果超過(guò)則觸發(fā)對(duì)窗口內(nèi)數(shù)據(jù)的計(jì)算,反之不觸發(fā)計(jì)算; |
| ProcessTimeTrigger | 當(dāng)前的Processing Time是否超過(guò)窗口的EndTime,如果超過(guò)則觸發(fā)對(duì)窗口內(nèi)數(shù)據(jù)的計(jì)算,反之不觸發(fā)計(jì)算; |
| ContinuousEventTimeTrigger | 根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前EventTime,觸發(fā)窗口計(jì)算; |
| ContinuousProcessingTimeTrigger | 根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前ProcessTime,觸發(fā)窗口計(jì)算; |
| CountTrigger | 根據(jù)窗口的數(shù)據(jù)條數(shù)是否超過(guò)設(shè)定的閾值確定是否觸發(fā)窗口計(jì)算; |
| DeltaTrigger | 根據(jù)窗口的數(shù)據(jù)計(jì)算出來(lái)的Delta指標(biāo)是否超過(guò)指定的閾值,判斷是否觸發(fā)窗口計(jì)算 |
| PurgingTrigger | 可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型觸發(fā)器,計(jì)算完成后數(shù)據(jù)將被清理。 |
關(guān)于抽象類Trigger的源碼解釋如下:
/**
* @param <T> 元素的數(shù)據(jù)類型
* @param <W> Window的類型
*/
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {
private static final long serialVersionUID = -4104633972991191369L;
/**
* 每個(gè)元素被分配到窗口時(shí)都會(huì)調(diào)用該方法,返回一個(gè)TriggerResult枚舉
* 該枚舉包含很多觸發(fā)的類型:CONTINUE、FIRE_AND_PURGE、FIRE、PURGE
*
* @param element 進(jìn)入窗口的元素
* @param timestamp 進(jìn)入窗口元素的時(shí)間戳
* @param window 窗口
* @param ctx 上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器(timer)回調(diào)函數(shù)
* @return
* @throws Exception
*/
public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
/**
* 當(dāng)使用TriggerContext注冊(cè)的processing-time計(jì)時(shí)器被觸發(fā)時(shí),會(huì)調(diào)用該方法
*
* @param time 觸發(fā)計(jì)時(shí)器的時(shí)間戳
* @param window 計(jì)時(shí)器觸發(fā)的window
* @param ctx 上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器(timer)回調(diào)函數(shù)
* @return
* @throws Exception
*/
public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* 當(dāng)使用TriggerContext注冊(cè)的event-time計(jì)時(shí)器被觸發(fā)時(shí),會(huì)調(diào)用該方法
*
* @param time 觸發(fā)計(jì)時(shí)器的時(shí)間戳
* @param window 計(jì)時(shí)器觸發(fā)的window
* @param ctx 上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器(timer)回調(diào)函數(shù)
* @return
* @throws Exception
*/
public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
/**
* 如果觸發(fā)器支持合并觸發(fā)器狀態(tài),將返回true
*
* @return
*/
public boolean canMerge() {
return false;
}
/**
* 當(dāng)多個(gè)窗口被合并成一個(gè)窗口時(shí),會(huì)調(diào)用該方法
*
* @param window 合并之后的window
* @param ctx 上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器回調(diào)函數(shù),也可以訪問(wèn)狀態(tài)
* @throws Exception
*/
public void onMerge(W window, OnMergeContext ctx) throws Exception {
throw new UnsupportedOperationException("This trigger does not support merging.");
}
/**
* 清除所有Trigger持有的窗口狀態(tài)
* 當(dāng)窗口被銷毀時(shí),調(diào)用該方法
*
* @param window
* @param ctx
* @throws Exception
*/
public abstract void clear(W window, TriggerContext ctx) throws Exception;
/**
* Context對(duì)象,傳給Trigger的方法參數(shù)中,用于注冊(cè)計(jì)時(shí)器回調(diào)函數(shù)和處理狀態(tài)
*/
public interface TriggerContext {
// 返回當(dāng)前處理時(shí)間
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
// 返回當(dāng)前水位線時(shí)間戳
long getCurrentWatermark();
// 注冊(cè)一個(gè)processing-time的計(jì)時(shí)器
void registerProcessingTimeTimer(long time);
// 注冊(cè)一個(gè)EventTime計(jì)時(shí)器
void registerEventTimeTimer(long time);
// 刪除一個(gè)processing-time的計(jì)時(shí)器
void deleteProcessingTimeTimer(long time);
// 刪除一個(gè)EventTime計(jì)時(shí)器
void deleteEventTimeTimer(long time);
/**
* 提取狀態(tài)當(dāng)前Trigger的窗口和Key的狀態(tài)
*/
<S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);
// 與getPartitionedState功能相同,該方法已被標(biāo)記過(guò)時(shí)
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
// 同getPartitionedState功能,該方法已被標(biāo)記過(guò)時(shí)
@Deprecated
<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
}
// TriggerContext的擴(kuò)展
public interface OnMergeContext extends TriggerContext {
// 合并每個(gè)window的狀態(tài),狀態(tài)必須支持合并
<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
}
}
上面的源碼可以看出,每當(dāng)觸發(fā)器調(diào)用時(shí),會(huì)產(chǎn)生一個(gè)TriggerResult對(duì)象,該對(duì)象是一個(gè)枚舉類,其包括的屬性決定了作用在窗口上的操作是什么??偣灿兴姆N行為:CONTINUE、FIRE_AND_PURGE、FIRE、PURGE,關(guān)于每種類型的具體含義,我們先看一下TriggerResult源碼:
/**
* 觸發(fā)器方法的結(jié)果類型,決定在窗口上執(zhí)行什么操作,比如是否調(diào)用window function
* 或者是否需要銷毀窗口
* 注意:如果一個(gè)Trigger返回的是FIRE或者FIRE_AND_PURGE,但是窗口中沒(méi)有任何元素,則窗口函數(shù)不會(huì)被調(diào)用
*/
public enum TriggerResult {
// 什么都不做,當(dāng)前不觸發(fā)計(jì)算,繼續(xù)等待
CONTINUE(false, false),
// 執(zhí)行 window function,輸出結(jié)果,之后清除所有狀態(tài)
FIRE_AND_PURGE(true, true),
// 執(zhí)行 window function,輸出結(jié)果,窗口不會(huì)被清除,數(shù)據(jù)繼續(xù)保留
FIRE(true, false),
// 清除窗口內(nèi)部數(shù)據(jù),但不觸發(fā)計(jì)算
PURGE(false, true);
}
清除器(Evictors)
Evictors是一個(gè)可選的組件,其主要作用是對(duì)進(jìn)入WindowFuction前后的數(shù)據(jù)進(jìn)行清除處理。Flink內(nèi)置了三種Evictors:分別為CountEvictor、DeltaEvictor、TimeEvitor。如果用戶不指定Evictors,也不會(huì)有默認(rèn)值。
- CountEvictor:保持在窗口中具有固定數(shù)量的元素,將超過(guò)指定窗口元素?cái)?shù)量的數(shù)據(jù)在窗口計(jì)算前剔除;
- DeltaEvictor:通過(guò)定義DeltaFunction和指定threshold,并計(jì)算Windows中的元素與最新元素之間的Delta大小,如果超過(guò)threshold則將當(dāng)前數(shù)據(jù)元素剔除;
- TimeEvictor:通過(guò)指定時(shí)間間隔,將當(dāng)前窗口中最新元素的時(shí)間減去Interval,然后將小于該結(jié)果的數(shù)據(jù)全部剔除,其本質(zhì)是將具有最新時(shí)間的數(shù)據(jù)選擇出來(lái),刪除過(guò)時(shí)的數(shù)據(jù)。
Evictors繼承關(guān)系圖如下:

關(guān)于Evictors接口的源碼,如下:
/**
* 在WindowFunction計(jì)算之前或者之后進(jìn)行清除窗口元素
* @param <T> 元素的數(shù)據(jù)類型
* @param <W> 窗口類型
*/
@PublicEvolving
public interface Evictor<T, W extends Window> extends Serializable {
/**
* 選擇性剔除元素,在windowing function之前調(diào)用
* @param elements 窗口中的元素
* @param size 窗口中元素個(gè)數(shù)
* @param window 窗口
* @param evictorContext
*/
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
* 選擇性剔除元素,在windowing function之后調(diào)用
* @param elements 窗口中的元素.
* @param size 窗口中元素個(gè)數(shù).
* @param window 窗口
* @param evictorContext
*/
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
// 傳遞給Evictor方法參數(shù)的值
interface EvictorContext {
// 返回當(dāng)前processing time
long getCurrentProcessingTime();
MetricGroup getMetricGroup();
// 返回當(dāng)前的水位線時(shí)間戳
long getCurrentWatermark();
}
}
小結(jié)
本文首先給出了窗口使用的快速入門(mén),介紹了窗口的基本概念、分類及簡(jiǎn)單使用。然后對(duì)Flink內(nèi)置的Window Assigner進(jìn)行了一一解讀,并給出了圖解與使用的代碼片段。接著對(duì)Flink的Window Function進(jìn)行介紹,包括窗口函數(shù)的分類及詳細(xì)使用案例。最后分析了Window生命周期所涉及的組件,并對(duì)每個(gè)組件的源碼進(jìn)行分析。