上一節(jié)主要是大致介紹了下 flink 的窗口組成, 以及如何去劃分窗口的. 那么這一篇文章主要是對剩下的內(nèi)容做一下總結(jié), 說一下如何對窗口內(nèi)的數(shù)據(jù)做處理.
Window Function
Window Assigner 的作用是劃分窗口的, 而 Window Function 就是對窗口內(nèi)的數(shù)據(jù)做處理的一個過程. Flink 提供了 4 種類型的 Window Function, 分別是 ReduceFunction / AggregateFunction / FoldFunction / ProcessWindowFunction. 另外, 這四類還根據(jù)計算原理的不同分為增量聚合函數(shù)和全量窗口函數(shù). 增量的計算性能比較高, 主要是基于中間狀態(tài)的計算結(jié)果, 窗口中只維護(hù)中間結(jié)果的狀態(tài)值.
1. ReduceFunction (增量)
對輸入的兩個相同類型的元素按照指定的計算方式進(jìn)行聚合, 通過實(shí)現(xiàn) ReduceFunction 接口就可以在reduce( ) 函數(shù)內(nèi)部進(jìn)行聚合操作了.
// 將Tuple2 按照 f1 進(jìn)行 keyBy, 之后將 f0字符合并起來
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
return new Tuple2<>(t1.f0 + t2.f0, t1.f1);
}
});
當(dāng)然也可以使用匿名函數(shù)的方式,寫起來會更加簡潔.上述代碼可以改為:
input.keyBy(x -> x.f1).timeWindow(Time.seconds(10), Time.seconds(1))
.reduce((t1,t2) -> new Tuple2<>(t1.f0 + t2.f0, t1.f1));
2. AggregateFunction (增量)
AggregateFunction 相對于ReduceFunction更加靈活,但是實(shí)現(xiàn)起來也更復(fù)雜, AggregateFunction有 4 個需要復(fù)寫的方法, 其中createAccumulator( ) 定義累加器, add( ) 定義數(shù)據(jù)的添加邏輯, getResult( ) 定義了根據(jù) accumulator 計算結(jié)果的邏輯, merge()方法定義合并 accumulator 的邏輯.
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
// 自定義一個AggregateFunciton, 將相同標(biāo)號 f1 的數(shù)據(jù)的 f0字符串字段合并在一起
// ("hello", 1L) + ("world", 1L) = ("hello world", 1L)
.aggregate(new MyAggregateFunction());
通過自定義的 MyAggregateFunction() 來實(shí)現(xiàn) AggregateFunction 接口
public static class MyAggregateFunction implements AggregateFunction<Tuple2<String, Long>, String, String>{
@Override
public String createAccumulator() {
// 初始化累加器
return "";
}
@Override
public String add(Tuple2<String, Long> t, String s) {
// 輸入數(shù)據(jù)與累加器的合并
return s + " " +t.f0;
}
@Override
public String getResult(String s) {
// 得到累加器的結(jié)果
return s.trim();
}
@Override
public String merge(String s, String acc1) {
// 合并累加器
return s + " " + acc1;
}
}
3. FoldFunction (增量)
FoldFunction定義了如何將窗口中的輸入元素與外部的元素合并的邏輯
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1)).fold("flink", (acc, t) ->t.f0 + acc);
FoldFunction在新版本已經(jīng)被標(biāo)記@Deprecated了, 建議使用AggregateFunction代替
4. ProcessWindowFunction (全量)
ProcessWindowFunction 相較于其他的 Window Function, 可以實(shí)現(xiàn)一些更復(fù)雜的計算, 比如基于整個窗口做某些指標(biāo)計算 或者需要操作窗口中的狀態(tài)數(shù)據(jù)和窗口元數(shù)據(jù). Flink 提供了 ProcessWindowFunction 這個抽象類, 繼承此類就可以實(shí)現(xiàn)ProcessWindowFunction, 其中, 必須要實(shí)現(xiàn) process( ) 方法, 這是處理窗口數(shù)據(jù)的主要方法.還在一下跟窗口數(shù)據(jù)相關(guān)的方法可以有選擇的實(shí)現(xiàn).
public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Long, Long>, String, Long, TimeWindow> {
@Override
public void process(Long s, Context context, Iterable<Tuple3<String, Long, Long>> elements, Collector<String> out) throws Exception {
// 統(tǒng)計每個窗口內(nèi)的所有數(shù)據(jù)的 f0字段加起來共有多少個單詞
// 也就做單個窗口的 wordcount
Long count = 0L;
for (Tuple3<String, Long, Long> element : elements) {
count += element.f0.split(" ").length;
}
out.collect("window: " + context.window() + " word count: " + count);
}
}
5. 增量與全量共同使用
增量聚合函數(shù)雖然性能好, 但是靈活性不如全量函數(shù), 例如對窗口狀態(tài)數(shù)據(jù)的操作以及對窗口中的元數(shù)據(jù)信息的獲取. 但是如果用 ProcessWindowFunction 去完成一些基礎(chǔ)的增量計算相對比較浪費(fèi)資源, 因此可以兩者結(jié)合的方式來實(shí)現(xiàn).
input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1))
// 第一個Function為 ReduceFunction, 取窗口的最小值
.reduce((r1, r2) -> {
return r1.f0 < r2.f0 ? r1 : r2;
// 第二個Function為 ProcessWindowFunction, 獲取窗口的時間信息
}, new ProcessWindowFunction<Tuple2<Long, Long>, String, Long, TimeWindow>() {
@Override
public void process(Long aLong, Context context, Iterable<Tuple2<Long, Long>> elements, Collector<String> out) throws Exception {
out.collect("window: " + context.window());
}
}).print();
Flink 窗口中的其他組件
除了 Window Assigner 和 Window Function外,Flink的窗口中還有 Triger窗口觸發(fā)器, 其負(fù)責(zé)判斷何時將窗口中的數(shù)據(jù)取出做計算, flink已經(jīng)默認(rèn)為各種類型的窗口實(shí)現(xiàn)了 triger. 用戶也可以自己手動指定. Evictors 是數(shù)據(jù)剔除器, 目的是把窗口中的數(shù)據(jù)按照需求做一定的剔除. Flink也有 API 針對延遲數(shù)據(jù)做處理, 延遲的數(shù)據(jù)可以丟棄也可以通過sideOutputLateDate( ) 方法處理.
1. Triger 窗口觸發(fā)器
EventTimeTrigger: 通過對比 watermark 和窗口 EndTime 確定是否觸發(fā)窗口
ProcessTimeTrigger: 通過對比 ProcessTime 和窗口 EndTime 確定是否觸發(fā)窗口
ContinuousEventTimeTrigger: 根據(jù)間隔時間周期性觸發(fā)窗口
ContinuousEventTimeTrigger: 同上, 區(qū)別是使用ProcessTime
CountTrigger: 根據(jù)接入數(shù)量是否超過閾值
DeltaTrigger: 根據(jù)計算出來的 Delta 指標(biāo)是否超過指定的 Threshold
PurgingTrigger: 可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型觸發(fā)器
2. Evictors觸發(fā)器
CountEvictor: 保持固定數(shù)量的數(shù)據(jù), 超過的剔除
DeltaEvictor: 通過定義 delta 和 threshold , 計算兩個數(shù)據(jù)之間的 delta 值, 超過則剔除
TimeEvictor: 指定時間間隔, 將當(dāng)前窗口中的最新元素的時間減去Interval, 然后將小于該結(jié)果的數(shù)據(jù)全部剔除