flink使用06-如何處理窗口內(nèi)的數(shù)據(jù)

上一節(jié)主要是大致介紹了下 flink 的窗口組成, 以及如何去劃分窗口的. 那么這一篇文章主要是對剩下的內(nèi)容做一下總結(jié), 說一下如何對窗口內(nèi)的數(shù)據(jù)做處理.

Window Function

Window Assigner 的作用是劃分窗口的, 而 Window Function 就是對窗口內(nèi)的數(shù)據(jù)做處理的一個過程. Flink 提供了 4 種類型的 Window Function, 分別是 ReduceFunction / AggregateFunction / FoldFunction / ProcessWindowFunction. 另外, 這四類還根據(jù)計算原理的不同分為增量聚合函數(shù)和全量窗口函數(shù). 增量的計算性能比較高, 主要是基于中間狀態(tài)的計算結(jié)果, 窗口中只維護(hù)中間結(jié)果的狀態(tài)值.

1. ReduceFunction (增量)

對輸入的兩個相同類型的元素按照指定的計算方式進(jìn)行聚合, 通過實(shí)現(xiàn) ReduceFunction 接口就可以在reduce( ) 函數(shù)內(nèi)部進(jìn)行聚合操作了.

// 將Tuple2 按照 f1 進(jìn)行 keyBy, 之后將 f0字符合并起來
input.keyBy(x -> x.f1)
    .timeWindow(Time.seconds(10), Time.seconds(1))
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> t1, Tuple2<String, Long> t2) throws Exception {
            return new Tuple2<>(t1.f0 + t2.f0, t1.f1);
            }
        });

當(dāng)然也可以使用匿名函數(shù)的方式,寫起來會更加簡潔.上述代碼可以改為:

input.keyBy(x -> x.f1).timeWindow(Time.seconds(10), Time.seconds(1))
    .reduce((t1,t2) -> new Tuple2<>(t1.f0 + t2.f0, t1.f1));

2. AggregateFunction (增量)

AggregateFunction 相對于ReduceFunction更加靈活,但是實(shí)現(xiàn)起來也更復(fù)雜, AggregateFunction有 4 個需要復(fù)寫的方法, 其中createAccumulator( ) 定義累加器, add( ) 定義數(shù)據(jù)的添加邏輯, getResult( ) 定義了根據(jù) accumulator 計算結(jié)果的邏輯, merge()方法定義合并 accumulator 的邏輯.

input.keyBy(x -> x.f1)
    .timeWindow(Time.seconds(10), Time.seconds(1))
    // 自定義一個AggregateFunciton, 將相同標(biāo)號 f1 的數(shù)據(jù)的 f0字符串字段合并在一起
    // ("hello", 1L) + ("world", 1L) = ("hello world", 1L)
    .aggregate(new MyAggregateFunction());

通過自定義的 MyAggregateFunction() 來實(shí)現(xiàn) AggregateFunction 接口

public static class MyAggregateFunction implements AggregateFunction<Tuple2<String, Long>, String, String>{
        @Override
        public String createAccumulator() {
            // 初始化累加器
            return "";
        }
        @Override
        public String add(Tuple2<String, Long> t, String s) {
            // 輸入數(shù)據(jù)與累加器的合并
            return s + " " +t.f0;
        }
        @Override
        public String getResult(String s) {
            // 得到累加器的結(jié)果
            return s.trim();
        }
        @Override
        public String merge(String s, String acc1) {
            // 合并累加器
            return s + " " + acc1;
        }
    }

3. FoldFunction (增量)

FoldFunction定義了如何將窗口中的輸入元素與外部的元素合并的邏輯

input.keyBy(x -> x.f1)
.timeWindow(Time.seconds(10), Time.seconds(1)).fold("flink", (acc, t) ->t.f0 + acc);

FoldFunction在新版本已經(jīng)被標(biāo)記@Deprecated了, 建議使用AggregateFunction代替

4. ProcessWindowFunction (全量)

ProcessWindowFunction 相較于其他的 Window Function, 可以實(shí)現(xiàn)一些更復(fù)雜的計算, 比如基于整個窗口做某些指標(biāo)計算 或者需要操作窗口中的狀態(tài)數(shù)據(jù)和窗口元數(shù)據(jù). Flink 提供了 ProcessWindowFunction 這個抽象類, 繼承此類就可以實(shí)現(xiàn)ProcessWindowFunction, 其中, 必須要實(shí)現(xiàn) process( ) 方法, 這是處理窗口數(shù)據(jù)的主要方法.還在一下跟窗口數(shù)據(jù)相關(guān)的方法可以有選擇的實(shí)現(xiàn).

public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String, Long, Long>, String, Long, TimeWindow> {
    @Override
    public void process(Long s, Context context, Iterable<Tuple3<String, Long, Long>>       elements, Collector<String> out) throws Exception {
    // 統(tǒng)計每個窗口內(nèi)的所有數(shù)據(jù)的 f0字段加起來共有多少個單詞
    // 也就做單個窗口的 wordcount
    Long count = 0L;
    for (Tuple3<String, Long, Long> element : elements) {
    count += element.f0.split(" ").length;
    }
    out.collect("window: " + context.window() + " word count: " + count);
    }
}

5. 增量與全量共同使用

增量聚合函數(shù)雖然性能好, 但是靈活性不如全量函數(shù), 例如對窗口狀態(tài)數(shù)據(jù)的操作以及對窗口中的元數(shù)據(jù)信息的獲取. 但是如果用 ProcessWindowFunction 去完成一些基礎(chǔ)的增量計算相對比較浪費(fèi)資源, 因此可以兩者結(jié)合的方式來實(shí)現(xiàn).

input.keyBy(x -> x.f1)
    .timeWindow(Time.seconds(10), Time.seconds(1))
    // 第一個Function為 ReduceFunction, 取窗口的最小值
    .reduce((r1, r2) -> {
    return r1.f0 < r2.f0 ? r1 : r2;
    // 第二個Function為 ProcessWindowFunction, 獲取窗口的時間信息
    }, new ProcessWindowFunction<Tuple2<Long, Long>, String, Long, TimeWindow>() {
    @Override
    public void process(Long aLong, Context context, Iterable<Tuple2<Long, Long>>       elements, Collector<String> out) throws Exception {
    out.collect("window: " + context.window()); 
        }
    }).print();

Flink 窗口中的其他組件

除了 Window Assigner 和 Window Function外,Flink的窗口中還有 Triger窗口觸發(fā)器, 其負(fù)責(zé)判斷何時將窗口中的數(shù)據(jù)取出做計算, flink已經(jīng)默認(rèn)為各種類型的窗口實(shí)現(xiàn)了 triger. 用戶也可以自己手動指定. Evictors 是數(shù)據(jù)剔除器, 目的是把窗口中的數(shù)據(jù)按照需求做一定的剔除. Flink也有 API 針對延遲數(shù)據(jù)做處理, 延遲的數(shù)據(jù)可以丟棄也可以通過sideOutputLateDate( ) 方法處理.

1. Triger 窗口觸發(fā)器

EventTimeTrigger: 通過對比 watermark 和窗口 EndTime 確定是否觸發(fā)窗口

ProcessTimeTrigger: 通過對比 ProcessTime 和窗口 EndTime 確定是否觸發(fā)窗口

ContinuousEventTimeTrigger: 根據(jù)間隔時間周期性觸發(fā)窗口

ContinuousEventTimeTrigger: 同上, 區(qū)別是使用ProcessTime

CountTrigger: 根據(jù)接入數(shù)量是否超過閾值

DeltaTrigger: 根據(jù)計算出來的 Delta 指標(biāo)是否超過指定的 Threshold

PurgingTrigger: 可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型觸發(fā)器

2. Evictors觸發(fā)器

CountEvictor: 保持固定數(shù)量的數(shù)據(jù), 超過的剔除

DeltaEvictor: 通過定義 delta 和 threshold , 計算兩個數(shù)據(jù)之間的 delta 值, 超過則剔除

TimeEvictor: 指定時間間隔, 將當(dāng)前窗口中的最新元素的時間減去Interval, 然后將小于該結(jié)果的數(shù)據(jù)全部剔除

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容