透過(guò)窗口看無(wú)限數(shù)據(jù)流——Flink的Window全面解析

窗口是流式計(jì)算中非常常用的算子之一,通過(guò)窗口可以將無(wú)限流切分成有限流,然后在每個(gè)窗口之上使用計(jì)算函數(shù),可以實(shí)現(xiàn)非常靈活的操作。Flink提供了豐富的窗口操作,除此之外,用戶還可以根據(jù)自己的處理場(chǎng)景自定義窗口。通過(guò)本文,你可以了解到:

  • 窗口的基本概念和簡(jiǎn)單使用
  • 內(nèi)置Window Assigners的分類、源碼及使用
  • Window Function的分類及使用
  • 窗口的組成部分及生命周期源碼解讀
  • 完整的窗口使用Demo案例

Quick Start

是什么

Window(窗口)是處理無(wú)界流的核心算子,Window可以將數(shù)據(jù)流分為固定大小的"桶(buckets)"(即通過(guò)按照固定時(shí)間或長(zhǎng)度將數(shù)據(jù)流切分成不同的窗口),在每一個(gè)窗口上,用戶可以使用一些計(jì)算函數(shù)對(duì)窗口內(nèi)的數(shù)據(jù)進(jìn)行處理,從而得到一定時(shí)間范圍內(nèi)的統(tǒng)計(jì)結(jié)果。比如統(tǒng)計(jì)每隔5分鐘輸出最近一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品,這樣就可以使用一個(gè)小時(shí)的時(shí)間窗口將數(shù)據(jù)限定在固定時(shí)間范圍內(nèi),然后可以對(duì)該范圍內(nèi)的有界數(shù)據(jù)執(zhí)行聚合處理。

根據(jù)作用的數(shù)據(jù)流(DataStream、KeyedStream),Window可以分為兩種:Keyed WindowsNon-Keyed Windows。其中Keyed Windows是在KeyedStream上使用window(…)操作,產(chǎn)生一個(gè)WindowedStream。Non-Keyed Windows是在DataStream上使用windowAll(…)操作,產(chǎn)生一個(gè)AllWindowedStream。具體的轉(zhuǎn)換關(guān)系如下圖所示。注意:一般不推薦使用AllWindowedStream,因?yàn)樵谄胀魃线M(jìn)行窗口操作,會(huì)將所有分區(qū)的流都匯集到單個(gè)的Task中,即并行度為1,從而會(huì)影響性能。

如何用

上面我們介紹了什么是窗口,那么該如何使用窗口呢?具體如下面的代碼片段:

Keyed Windows

stream
       .keyBy(...)               // keyedStream上使用window
       .window(...)              // 必選: 指定窗口分配器( window assigner)
      [.trigger(...)]            // 可選: 指定觸發(fā)器(trigger),如果不指定,則使用默認(rèn)值
      [.evictor(...)]            // 可選: 指定清除器(evictor),如果不指定,則沒(méi)有
      [.allowedLateness(...)]    // 可選: 指定是否延遲處理數(shù)據(jù),如果不指定,默認(rèn)使用0 
      [.sideOutputLateData(...)] // 可選: 配置side output,如果不指定,則沒(méi)有
       .reduce/aggregate/fold/apply() // 必選: 指定窗口計(jì)算函數(shù)
      [.getSideOutput(...)]      // 可選: 從side output中獲取數(shù)據(jù)

Non-Keyed Windows

stream
       .windowAll(...)           // 必選: 指定窗口分配器( window assigner)
      [.trigger(...)]            // 可選: 指定觸發(fā)器(trigger),如果不指定,則使用默認(rèn)值
      [.evictor(...)]            // 可選: 指定清除器(evictor),如果不指定,則沒(méi)有
      [.allowedLateness(...)]    // 可選: 指定是否延遲處理數(shù)據(jù),如果不指定,默認(rèn)使用0
      [.sideOutputLateData(...)] // 可選: 配置side output,如果不指定,則沒(méi)有
       .reduce/aggregate/fold/apply() // 必選: 指定窗口計(jì)算函數(shù)
      [.getSideOutput(...)]      // 可選: 從side output中獲取數(shù)據(jù)

簡(jiǎn)寫(xiě)window操作

上面的代碼片段中,要在keyedStream上使用window(…)或者在DataStream上使用windowAll(…),需要傳入一個(gè)window assigner的參數(shù),關(guān)于window assigner下文會(huì)進(jìn)行詳細(xì)解釋。如下面代碼片段:

// -------------------------------------------
//  Keyed Windows
// -------------------------------------------
stream
       .keyBy(id)               
       .window(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S的滾動(dòng)窗口
       .reduce(MyReduceFunction)
// -------------------------------------------
//  Non-Keyed Windows
// -------------------------------------------
stream               
       .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) // 5S的滾動(dòng)窗口
       .reduce(MyReduceFunction)
    

上面的代碼可以簡(jiǎn)寫(xiě)為:

// -------------------------------------------
//  Keyed Windows
// -------------------------------------------
stream
       .keyBy(id)               
       .timeWindow(Time.seconds(5)) // 5S的滾動(dòng)窗口
       .reduce(MyReduceFunction)
// -------------------------------------------
//  Non-Keyed Windows
// -------------------------------------------
stream               
       .timeWindowAll(Time.seconds(5)) // 5S的滾動(dòng)窗口
       .reduce(MyReduceFunction)
    

關(guān)于上面的簡(jiǎn)寫(xiě),以KeyedStream為例,對(duì)于看一下具體的KeyedStream源碼片段,可以看出底層調(diào)用的還是非簡(jiǎn)寫(xiě)時(shí)的代碼。關(guān)于timeWindowAll()的代碼也是一樣的,可以參考DataStream源碼,這里不再贅述。

// 會(huì)根據(jù)用戶的使用的時(shí)間類型,調(diào)用不同的內(nèi)置window Assigner
public WindowedStream<T, KEY, TimeWindow> timeWindow(Time size) {
        if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
            return window(TumblingProcessingTimeWindows.of(size));
        } else {
            return window(TumblingEventTimeWindows.of(size));
        }
    }

Window Assigners

分類

WindowAssigner負(fù)責(zé)將輸入的數(shù)據(jù)分配到一個(gè)或多個(gè)窗口,F(xiàn)link內(nèi)置了許多WindowAssigner,這些WindowAssigner可以滿足大部分的使用場(chǎng)景。比如tumbling windows, sliding windows, session windows , global windows。如果這些內(nèi)置的WindowAssigner不能滿足你的需求,可以通過(guò)繼承WindowAssigner類實(shí)現(xiàn)自定義的WindowAssigner。

上面的WindowAssigner是基于時(shí)間的(time-based windows),除此之外,F(xiàn)link還提供了基于數(shù)量的窗口(count-based windows),即根據(jù)窗口的元素?cái)?shù)量定義窗口大小,這種情況下,如果數(shù)據(jù)存在亂序,將導(dǎo)致窗口計(jì)算結(jié)果不確定。本文重點(diǎn)介紹基于時(shí)間的窗口使用,由于篇幅有限,關(guān)于基于數(shù)量的窗口將不做討論。

使用介紹

下面將會(huì)對(duì)Flink內(nèi)置的四種基于時(shí)間的windowassigner,進(jìn)行一一分析。

Tumbling Windows

  • 圖解

Tumbling Windows(滾動(dòng)窗口)是將數(shù)據(jù)分配到確定的窗口中,根據(jù)固定時(shí)間或大小進(jìn)行切分,每個(gè)窗口有固定的大小且窗口之間不存在重疊(如下圖所示)。這種比較簡(jiǎn)單,適用于按照周期統(tǒng)計(jì)某一指標(biāo)的場(chǎng)景。

關(guān)于時(shí)間的選擇,可以使用Event Time或者Processing Time,分別對(duì)應(yīng)的window assigner為:TumblingEventTimeWindows、TumblingProcessingTimeWindows。用戶可以使用window assigner的of(size)方法指定時(shí)間間隔,其中時(shí)間單位可以是Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x)等。

  • 使用
// 使用EventTime
datastream
           .keyBy(id)
           .window(TumblingEventTimeWindows.of(Time.seconds(10)))
           .process(new MyProcessFunction())
// 使用processing-time
datastream
           .keyBy(id)
           .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
           .process(new MyProcessFunction())

Sliding Windows

  • 圖解

Sliding Windows(滑動(dòng)窗口)在滾動(dòng)窗口之上加了一個(gè)滑動(dòng)窗口的時(shí)間,這種類型的窗口是會(huì)存在窗口重疊的(如下圖所示)。滾動(dòng)窗口是按照窗口固定的時(shí)間大小向前滾動(dòng),而滑動(dòng)窗口是根據(jù)設(shè)定的滑動(dòng)時(shí)間向前滑動(dòng)。窗口之間的重疊部分的大小取決于窗口大小與滑動(dòng)的時(shí)間大小,當(dāng)滑動(dòng)時(shí)間小于窗口時(shí)間大小時(shí)便會(huì)出現(xiàn)重疊。當(dāng)滑動(dòng)時(shí)間大于窗口時(shí)間大小時(shí),會(huì)出現(xiàn)窗口不連續(xù)的情況,導(dǎo)致數(shù)據(jù)可能不屬于任何一個(gè)窗口。當(dāng)兩者相等時(shí),其功能就和滾動(dòng)窗口相同了?;瑒?dòng)窗口的使用場(chǎng)景是:用戶根據(jù)設(shè)定的統(tǒng)計(jì)周期來(lái)計(jì)算指定窗口時(shí)間大小的指標(biāo),比如每隔5分鐘輸出最近一小時(shí)內(nèi)點(diǎn)擊量最多的前 N 個(gè)商品。

關(guān)于時(shí)間的選擇,可以使用Event Time或者Processing Time,分別對(duì)應(yīng)的window assigner為:SlidingEventTimeWindows、SlidingProcessingTimeWindows。用戶可以使用window assigner的of(size)方法指定時(shí)間間隔,其中時(shí)間單位可以是Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x)等。

  • 使用
// 使用EventTime
datastream
           .keyBy(id)
           .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
           .process(new MyProcessFunction())
// 使用processing-time
datastream
           .keyBy(id)
           .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
           .process(new MyProcessFunction())

Session Windows

  • 圖解

Session Windows(會(huì)話窗口)主要是將某段時(shí)間內(nèi)活躍度較高的數(shù)據(jù)聚合成一個(gè)窗口進(jìn)行計(jì)算,窗口的觸發(fā)的條件是Session Gap,是指在規(guī)定的時(shí)間內(nèi)如果沒(méi)有數(shù)據(jù)活躍接入,則認(rèn)為窗口結(jié)束,然后觸發(fā)窗口計(jì)算結(jié)果。需要注意的是如果數(shù)據(jù)一直不間斷地進(jìn)入窗口,也會(huì)導(dǎo)致窗口始終不觸發(fā)的情況。與滑動(dòng)窗口、滾動(dòng)窗口不同的是,Session Windows不需要有固定窗口大小(window size)和滑動(dòng)時(shí)間(slide time),只需要定義session gap,來(lái)規(guī)定不活躍數(shù)據(jù)的時(shí)間上限即可。如下圖所示。Session Windows窗口類型比較適合非連續(xù)型數(shù)據(jù)處理或周期性產(chǎn)生數(shù)據(jù)的場(chǎng)景,根據(jù)用戶在線上某段時(shí)間內(nèi)的活躍度對(duì)用戶行為數(shù)據(jù)進(jìn)行統(tǒng)計(jì)。

關(guān)于時(shí)間的選擇,可以使用Event Time或者Processing Time,分別對(duì)應(yīng)的window assigner為:EventTimeSessionWindows和ProcessTimeSessionWindows。用戶可以使用window assigner的withGap()方法指定時(shí)間間隔,其中時(shí)間單位可以是Time.milliseconds(x)、Time.seconds(x)或Time.minutes(x)等。

  • 使用
// 使用EventTime
datastream
           .keyBy(id)
           .window((EventTimeSessionWindows.withGap(Time.minutes(15)))
           .process(new MyProcessFunction())
// 使用processing-time
datastream
           .keyBy(id)
           .window(ProcessingTimeSessionWindows.withGap(Time.minutes(15)))
           .process(new MyProcessFunction())

注意:由于session window的開(kāi)始時(shí)間與結(jié)束時(shí)間取決于接收的數(shù)據(jù)。windowassigner不會(huì)立即分配所有的元素到正確的窗口,SessionWindow會(huì)為每個(gè)接收的元素初始化一個(gè)以該元素的時(shí)間戳為開(kāi)始時(shí)間的窗口,使用session gap作為窗口大小,然后再合并重疊部分的窗口。所以, session window 操作需要指定用于合并的 TriggerWindow Function,比如ReduceFunction, AggregateFunction, or ProcessWindowFunction

Global Windows

  • 圖解

Global Windows(全局窗口)將所有相同的key的數(shù)據(jù)分配到單個(gè)窗口中計(jì)算結(jié)果,窗口沒(méi)有起始和結(jié)束時(shí)間,窗口需要借助于Triger來(lái)觸發(fā)計(jì)算,如果不對(duì)Global Windows指定Triger,窗口是不會(huì)觸發(fā)計(jì)算的。因此,使用Global Windows需要非常慎重,用戶需要非常明確自己在整個(gè)窗口中統(tǒng)計(jì)出的結(jié)果是什么,并指定對(duì)應(yīng)的觸發(fā)器,同時(shí)還需要有指定相應(yīng)的數(shù)據(jù)清理機(jī)制,否則數(shù)據(jù)將一直留在內(nèi)存中。

  • 使用
datastream
    .keyBy(id)
    .window(GlobalWindows.create())
    .process(new MyProcessFunction())

Window Functions

分類

Flink提供了兩大類窗口函數(shù),分別為增量聚合函數(shù)和全量窗口函數(shù)。其中增量聚合函數(shù)的性能要比全量窗口函數(shù)高,因?yàn)樵隽烤酆洗翱谑腔谥虚g結(jié)果狀態(tài)計(jì)算最終結(jié)果的,即窗口中只維護(hù)一個(gè)中間結(jié)果狀態(tài),不要緩存所有的窗口數(shù)據(jù)。相反,對(duì)于全量窗口函數(shù)而言,需要對(duì)所以進(jìn)入該窗口的數(shù)據(jù)進(jìn)行緩存,等到窗口觸發(fā)時(shí)才會(huì)遍歷窗口內(nèi)所有數(shù)據(jù),進(jìn)行結(jié)果計(jì)算。如果窗口數(shù)據(jù)量比較大或者窗口時(shí)間較長(zhǎng),就會(huì)耗費(fèi)很多的資源緩存數(shù)據(jù),從而導(dǎo)致性能下降。

  • 增量聚合函數(shù)

    包括:ReduceFunction、AggregateFunction和FoldFunction

  • 全量窗口函數(shù)

    包括:ProcessWindowFunction

使用介紹

ReduceFunction

輸入兩個(gè)相同類型的數(shù)據(jù)元素按照指定的計(jì)算方法進(jìn)行聚合,然后輸出類型相同的一個(gè)結(jié)果元素。要求輸入元素的數(shù)據(jù)類型與輸出元素的數(shù)據(jù)類型必須一致。實(shí)現(xiàn)的效果是使用上一次的結(jié)果值與當(dāng)前值進(jìn)行聚合。具體使用案例如下:

public class ReduceFunctionExample {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模擬數(shù)據(jù)源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
                Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
                return element.f2 * 1000;
            }
        });

        input
                .map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
                    @Override
                    public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {
                        return Tuple2.of(value.f0, value.f1);
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<Long, Integer>>() {
                    @Override
                    public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
                        // 根據(jù)第一個(gè)元素分組,求第二個(gè)元素的累計(jì)和
                        return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                    }
                }).print();

        env.execute("ReduceFunctionExample");
    }
}

AggregateFunction

與ReduceFunction相似,AggregateFunction也是基于中間狀態(tài)計(jì)算結(jié)果的增量計(jì)算函數(shù),相比ReduceFunction,AggregateFunction在窗口計(jì)算上更加靈活,但是實(shí)現(xiàn)稍微復(fù)雜,需要實(shí)現(xiàn)AggregateFunction接口,重寫(xiě)四個(gè)方法。其最大的優(yōu)勢(shì)就是中間結(jié)果的數(shù)據(jù)類型和最終的結(jié)果類型不依賴于輸入的數(shù)據(jù)類型。關(guān)于AggregateFunction的源碼,如下所示:

/** 
*  @param <IN>  輸入元素的數(shù)據(jù)類型
 * @param <ACC> 中間聚合結(jié)果的數(shù)據(jù)類型
 * @param <OUT> 最終聚合結(jié)果的數(shù)據(jù)類型
 */
@PublicEvolving
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable {

    /**
     * 創(chuàng)建一個(gè)新的累加器
     */
    ACC createAccumulator();

    /**
     * 將新的數(shù)據(jù)與累加器進(jìn)行聚合,返回一個(gè)新的累加器
     */
    ACC add(IN value, ACC accumulator);

    /**
     從累加器中計(jì)算最終結(jié)果并返回
     */
    OUT getResult(ACC accumulator);

    /**
     * 合并兩個(gè)累加器并返回結(jié)果
     */
    ACC merge(ACC a, ACC b);
}

具體使用代碼案例如下:

public class AggregateFunctionExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模擬數(shù)據(jù)源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
                Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
                return element.f2 * 1000;
            }
        });

        input.keyBy(0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .aggregate(new MyAggregateFunction()).print();
        env.execute("AggregateFunctionExample");

    }

    private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>,Tuple2<Long,Integer>,Tuple2<Long,Integer>> {
        /**
         * 創(chuàng)建一個(gè)累加器,初始化值
         * @return
         */
        @Override
        public Tuple2<Long, Integer> createAccumulator() {
            return Tuple2.of(0L,0);
        }

        /**
         *
         * @param value 輸入的元素值
         * @param accumulator 中間結(jié)果值
         * @return
         */
        @Override
        public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {
            return Tuple2.of(value.f0,value.f1 + accumulator.f1);
        }

        /**
         * 獲取計(jì)算結(jié)果值
         * @param accumulator
         * @return
         */
        @Override
        public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
            return Tuple2.of(accumulator.f0,accumulator.f1);
        }

        /**
         * 合并中間結(jié)果值
         * @param a 中間結(jié)果值a
         * @param b 中間結(jié)果值b
         * @return
         */
        @Override
        public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
            return Tuple2.of(a.f0,a.f1 + b.f1);
        }
    }
}

FoldFunction

FoldFunction定義了如何將窗口中的輸入元素與外部的元素合并的邏輯,該接口已標(biāo)記過(guò)時(shí),建議用戶使用AggregateFunction來(lái)替換使用FoldFunction。

public class FoldFunctionExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模擬數(shù)據(jù)源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
                Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
                return element.f2 * 1000;
            }
        });

        input.keyBy(0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .fold("用戶",new FoldFunction<Tuple3<Long, Integer, Long>,String>() {
                 @Override
                 public String fold(String accumulator, Tuple3<Long, Integer, Long> value) throws Exception {
                    // 為第一個(gè)元素的值拼接一個(gè)"用戶"字符串,進(jìn)行輸出
                     return accumulator + value.f0 ;
                 }
             }).print();

        env.execute("FoldFunctionExample");

    }
}

ProcessWindowFunction

前面提到的ReduceFunction和AggregateFunction都是基于中間狀態(tài)實(shí)現(xiàn)增量計(jì)算的窗口函數(shù)。有些時(shí)候需要使用整個(gè)窗口的所有數(shù)據(jù)進(jìn)行計(jì)算,比如求中位數(shù)和眾數(shù)。另外,ProcessWindowFunction的Context對(duì)象可以訪問(wèn)窗口的一些元數(shù)據(jù)信息,比如窗口結(jié)束時(shí)間、水位線等。ProcessWindowsFunction能夠更加靈活地支持基于窗口全部數(shù)據(jù)元素的結(jié)果計(jì)算。

在系統(tǒng)內(nèi)部,由ProcessWindowFunction處理的窗口會(huì)將所有已分配的數(shù)據(jù)存儲(chǔ)到ListState中,通過(guò)將數(shù)據(jù)收集起來(lái)且提供對(duì)于窗口的元數(shù)據(jù)及其他一些特性的訪問(wèn)和使用,應(yīng)用場(chǎng)景比ReduceFunction和AggregateFunction更加廣泛。關(guān)于ProcessWindowFunction抽象類的源碼,如下所示:

/**
 * @param <IN> 輸入的數(shù)據(jù)類型
 * @param <OUT> 輸出的數(shù)據(jù)類型
 * @param <KEY> key的數(shù)據(jù)類型
 * @param <W> window的類型
 */
@PublicEvolving
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;
    /**
     * 計(jì)算窗口數(shù)據(jù),輸出0個(gè)或多個(gè)元素
     * @param key 窗口的key
     * @param context 窗口的上下文
     * @param elements 窗口內(nèi)的所有元素
     * @param out 輸出元素的collector對(duì)象
     * @throws Exception
     */
    public abstract void process(KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception;
    /**
     * 當(dāng)窗口被銷毀時(shí),刪除狀態(tài)
     * @param context
     * @throws Exception
     */
    public void clear(Context context) throws Exception {}
    //context可以訪問(wèn)窗口的元數(shù)據(jù)信息.
    public abstract class Context implements java.io.Serializable {
    //返回當(dāng)前被計(jì)算的窗口
        public abstract W window();
    // 返回當(dāng)前processing time. 
        public abstract long currentProcessingTime();
    // 返回當(dāng)前event-time 水位線.
        public abstract long currentWatermark();
    //每個(gè)key和每個(gè)window的狀態(tài)訪問(wèn)器
        public abstract KeyedStateStore windowState();
    // 每個(gè)key的global state的狀態(tài)訪問(wèn)器.
        public abstract KeyedStateStore globalState();
        /**
         * 向side output輸出數(shù)據(jù)
         * @param outputTag the {@code OutputTag}  side output 輸出的標(biāo)識(shí).
         * @param value 輸出的數(shù)據(jù).
         */
        public abstract <X> void output(OutputTag<X> outputTag, X value);
    }
}

具體的使用案例如下:

public class ProcessWindowFunctionExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模擬數(shù)據(jù)源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
                Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
                return element.f2 * 1000;
            }
        });

        input.keyBy(t -> t.f0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .process(new MyProcessWindowFunction())
             .print();
    }

    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<Long, Integer, Long>,Tuple3<Long,String,Integer>,Long,TimeWindow> {
        @Override
        public void process(
                Long aLong,
                Context context,
                Iterable<Tuple3<Long, Integer, Long>> elements,
                Collector<Tuple3<Long, String, Integer>> out) throws Exception {
            int count = 0;
            for (Tuple3<Long, Integer, Long> in: elements) {
                count++;
            }
            // 統(tǒng)計(jì)每個(gè)窗口數(shù)據(jù)個(gè)數(shù),加上窗口輸出
            out.collect(Tuple3.of(aLong,"" + context.window(),count));
        }
    }
}

增量聚合函數(shù)和ProcessWindowFunction整合

ProcessWindowFunction提供了很強(qiáng)大的功能,但是唯一的缺點(diǎn)就是需要更大的狀態(tài)存儲(chǔ)數(shù)據(jù)。在很多時(shí)候,增量聚合的使用是非常頻繁的,那么如何實(shí)現(xiàn)既支持增量聚合又支持訪問(wèn)窗口元數(shù)據(jù)的操作呢?可以將ReduceFunction和AggregateFunction與ProcessWindowFunction整合在一起使用。通過(guò)這種組合方式,分配給窗口的元素會(huì)立即被執(zhí)行計(jì)算,當(dāng)窗口觸發(fā)時(shí),會(huì)把聚合的結(jié)果傳給ProcessWindowFunction,這樣ProcessWindowFunction的process方法的Iterable參數(shù)被就只有一個(gè)值,即增量聚合的結(jié)果。

  • ReduceFunction與ProcessWindowFunction組合
public class ReduceProcessWindowFunction {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模擬數(shù)據(jù)源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
                Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L)).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
                return element.f2 * 1000;
            }
        });

        input.map(new MapFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>>() {
            @Override
            public Tuple2<Long, Integer> map(Tuple3<Long, Integer, Long> value) {
                return Tuple2.of(value.f0, value.f1);
            }
        })
             .keyBy(t -> t.f0)
             .window(TumblingEventTimeWindows.of(Time.seconds(10)))
             .reduce(new MyReduceFunction(),new MyProcessWindowFunction())
             .print();

        env.execute("ProcessWindowFunctionExample");
    }

    private static class MyReduceFunction implements ReduceFunction<Tuple2<Long, Integer>> {
        @Override
        public Tuple2<Long, Integer> reduce(Tuple2<Long, Integer> value1, Tuple2<Long, Integer> value2) throws Exception {
            //增量求和
            return Tuple2.of(value1.f0,value1.f1 + value2.f1);
        }
    }

    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
        @Override
        public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
            // 將求和之后的結(jié)果附帶窗口結(jié)束時(shí)間一起輸出
            out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
        }
    }
}
  • AggregateFunction與ProcessWindowFunction組合
public class AggregateProcessWindowFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 模擬數(shù)據(jù)源
        SingleOutputStreamOperator<Tuple3<Long, Integer, Long>> input = env.fromElements(
                Tuple3.of(1L, 10, 1588491228L),
                Tuple3.of(1L, 15, 1588491229L),
                Tuple3.of(1L, 20, 1588491238L),
                Tuple3.of(1L, 25, 1588491248L),
                Tuple3.of(2L, 10, 1588491258L),
                Tuple3.of(2L, 30, 1588491268L),
                Tuple3.of(2L, 20, 1588491278L))
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple3<Long, Integer, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple3<Long, Integer, Long> element) {
                        return element.f2 * 1000;
                    }
                });

        input.keyBy(t -> t.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .aggregate(new MyAggregateFunction(),new MyProcessWindowFunction())
                .print();

        env.execute("AggregateFunctionExample");

    }

    private static class MyAggregateFunction implements AggregateFunction<Tuple3<Long, Integer, Long>, Tuple2<Long, Integer>, Tuple2<Long, Integer>> {
        /**
         * 創(chuàng)建一個(gè)累加器,初始化值
         *
         * @return
         */
        @Override
        public Tuple2<Long, Integer> createAccumulator() {
            return Tuple2.of(0L, 0);
        }

        /**
         * @param value       輸入的元素值
         * @param accumulator 中間結(jié)果值
         * @return
         */
        @Override
        public Tuple2<Long, Integer> add(Tuple3<Long, Integer, Long> value, Tuple2<Long, Integer> accumulator) {
            return Tuple2.of(value.f0, value.f1 + accumulator.f1);
        }

        /**
         * 獲取計(jì)算結(jié)果值
         *
         * @param accumulator
         * @return
         */
        @Override
        public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
            return Tuple2.of(accumulator.f0, accumulator.f1);
        }

        /**
         * 合并中間結(jié)果值
         *
         * @param a 中間結(jié)果值a
         * @param b 中間結(jié)果值b
         * @return
         */
        @Override
        public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
            return Tuple2.of(a.f0, a.f1 + b.f1);
        }
    }

    private static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<Long,Integer>,Tuple3<Long,Integer,String>,Long,TimeWindow> {
        @Override
        public void process(Long aLong, Context ctx, Iterable<Tuple2<Long, Integer>> elements, Collector<Tuple3<Long, Integer, String>> out) throws Exception {
            // 將求和之后的結(jié)果附帶窗口結(jié)束時(shí)間一起輸出
            out.collect(Tuple3.of(aLong,elements.iterator().next().f1,"window_end" + ctx.window().getEnd()));
        }
    }
}

window 生命周期解讀

生命周期圖解

窗口從創(chuàng)建到執(zhí)行窗口計(jì)算再到被清除,需要經(jīng)過(guò)一系列的過(guò)程,這個(gè)過(guò)程就是窗口的生命周期。

首先,當(dāng)一個(gè)元素進(jìn)入窗口算子之前,會(huì)由WindowAssigner分配該元素進(jìn)入哪個(gè)或哪幾個(gè)窗口,如果窗口不存在,則創(chuàng)建窗口。

其次,數(shù)據(jù)進(jìn)入了窗口,這時(shí)要看有沒(méi)有使用增量聚合函數(shù),如果使用了增量聚合函數(shù)ReduceFunction或AggregateFunction,新加入窗口的元素會(huì)立即觸發(fā)增量計(jì)算,計(jì)算的結(jié)果作為窗口的內(nèi)容。如果沒(méi)有使用增量聚合函數(shù),則會(huì)將進(jìn)入窗口的數(shù)據(jù)存儲(chǔ)到ListState狀態(tài)中,進(jìn)一步等待窗口觸發(fā)時(shí),遍歷窗口元素進(jìn)行聚合計(jì)算。

然后,每個(gè)元素在進(jìn)入窗口之后會(huì)傳遞至該窗口的觸發(fā)器,觸發(fā)器決定了窗口何時(shí)被執(zhí)行計(jì)算及何時(shí)需要清除自身和保存的內(nèi)容。觸發(fā)器可以根據(jù)已分配的元素或注冊(cè)的計(jì)時(shí)器來(lái)決定某些特定時(shí)刻執(zhí)行窗口計(jì)算或清除窗口內(nèi)容。

最后,觸發(fā)器成功觸發(fā)之后的操作取決于使用的窗口函數(shù),如果使用的是增量聚合函數(shù),如ReduceFunction或AggregateFunction,則會(huì)直接輸出聚合的結(jié)果。如果只包含一個(gè)全量窗口函數(shù),如ProcessWindowFunction,則會(huì)作用窗口的所有元素,執(zhí)行計(jì)算,輸出結(jié)果。如果組合使用了ReduceFunction和ProcessWindowFunction,即組合使用了增量聚合窗口函數(shù)和全量窗口函數(shù),全量窗口函數(shù)會(huì)作用于增量聚合函數(shù)的聚合值,然后再輸出最終的結(jié)果。

  • 情況1:僅使用增量聚合窗口函數(shù)
  • 情況2:僅使用全量窗口函數(shù)
  • 情況3:組合使用增量聚合窗口函數(shù)與全量窗口函數(shù)

分配器(Window Assigners)

WindowAssigner的作用是將輸入的元素分配到一個(gè)或多個(gè)窗口,當(dāng)WindowAssigner將第一個(gè)元素分配到窗口時(shí),就會(huì)創(chuàng)建該窗口,所以一個(gè)窗口一旦被創(chuàng)建,窗口中必然至少有一個(gè)元素。Flink內(nèi)置了很多WindowAssigners,本文主要討論基于時(shí)間的WindowAssigners,這些分配器都繼承了WindowAssigner抽象類。關(guān)于常用的分配器,上文已經(jīng)做了詳細(xì)解釋。下面先來(lái)看一下繼承關(guān)系圖:

接下來(lái),將會(huì)對(duì)WindowAssigner抽象類的源碼進(jìn)行分析,具體代碼如下:

```java
/**
 * WindowAssigner分配一個(gè)元素到0個(gè)或多個(gè)窗口
 * 在一個(gè)窗口算子內(nèi)部,元素是按照key進(jìn)行分組的(使用KeyedStream),
 * 相同key和window的元素集合稱之為一個(gè)pane(格子)
 * @param <T> 要分配元素的數(shù)據(jù)類型
 * @param <W> window的類型:TimeWindow、GlobalWindow
 */
@PublicEvolving
public abstract class WindowAssigner<T, W extends Window> implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * 返回一個(gè)向其分配元素的窗口集合
     * @param element 待分配的元素
     * @param timestamp 元素的時(shí)間戳
     * @param context WindowAssignerContext對(duì)象
     * @return
     */
    public abstract Collection<W> assignWindows(T element, long timestamp, WindowAssignerContext context);
    /**
     * 返回一個(gè)與該WindowAssigner相關(guān)的默認(rèn)trigger(觸發(fā)器)
     * @param env 執(zhí)行環(huán)境
     * @return
     */
    public abstract Trigger<T, W> getDefaultTrigger(StreamExecutionEnvironment env);

    /**
     * 返回一個(gè)窗口序列化器
     * @param executionConfig
     * @return
     */
    public abstract TypeSerializer<W> getWindowSerializer(ExecutionConfig executionConfig);
    /**
     * 如果元素是基于event time分配到窗口的,則返回true
     * @return
     */
    public abstract boolean isEventTime();
    /**
     * 該Context允許訪問(wèn)當(dāng)前的處理時(shí)間processing time
     */
    public abstract static class WindowAssignerContext {

        /**
         * 返回當(dāng)前的處理時(shí)間
         */
        public abstract long getCurrentProcessingTime();
    }
}

觸發(fā)器(Triggers)

數(shù)據(jù)接入窗口后,窗口是否觸發(fā)WindowFunciton計(jì)算,取決于窗口是否滿足觸發(fā)條件。Triggers就是決定窗口何時(shí)觸發(fā)計(jì)算并輸出結(jié)果的條件,Triggers可以根據(jù)時(shí)間或者具體的數(shù)據(jù)條件進(jìn)行觸發(fā),比如進(jìn)入窗口元素的個(gè)數(shù)或者進(jìn)入窗口的某些特定的元素值等。前面討論的內(nèi)置WindowAssigner都有各自默認(rèn)的觸發(fā)器,當(dāng)使用的是Processing Time時(shí),則當(dāng)處理時(shí)間超過(guò)窗口結(jié)束時(shí)間時(shí)會(huì)被觸發(fā)。當(dāng)使用Event Time時(shí),當(dāng)水位線超過(guò)窗口結(jié)束時(shí)間時(shí)會(huì)被觸發(fā)。

Flink在內(nèi)部提供很多內(nèi)置的觸發(fā)器,常用的主要有EventTimeTrigger、ProcessTimeTrigger以及CountTrigger等。每種每種觸發(fā)器都對(duì)應(yīng)于不同的Window Assigner,例如Event Time類型的Windows對(duì)應(yīng)的觸發(fā)器是EventTimeTrigger,其基本原理是判斷當(dāng)前的Watermark是否超過(guò)窗口的EndTime,如果超過(guò)則觸發(fā)對(duì)窗口內(nèi)數(shù)據(jù)的計(jì)算,反之不觸發(fā)計(jì)算。關(guān)于上面分析的內(nèi)置WindowAssigner的默認(rèn)trigger,可以從各自的源碼中看到,具體羅列如下:

分配器 對(duì)應(yīng)的源碼 默認(rèn)觸發(fā)器
TumblingEventTimeWindows public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }</object,> EventTimeTrigger
TumblingProcessingTimeWindows public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }</object,> ProcessingTimeTrigger
SlidingEventTimeWindows public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }</object,> EventTimeTrigger
SlidingProcessingTimeWindows public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }</object,> ProcessingTimeTrigger
EventTimeSessionWindows public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return EventTimeTrigger.create(); }</object,> EventTimeTrigger
ProcessingTimeSessionWindows public Trigger <object, timewindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return ProcessingTimeTrigger.create(); }</object,> ProcessingTimeTrigger
GlobalWindows public Trigger <object, globalwindow="" style="font-size: inherit; color: inherit; line-height: inherit; margin: 0px; padding: 0px;">getDefaultTrigger(StreamExecutionEnvironment env) { return new NeverTrigger(); }</object,> NeverTrigger

這些Trigger都繼承了Trigger抽象類,具體的繼承關(guān)系,如下圖:

關(guān)于這些內(nèi)置的Trigger的具體解釋如下:

Trigger 解釋
EventTimeTrigger 當(dāng)前的Watermark是否超過(guò)窗口的EndTime,如果超過(guò)則觸發(fā)對(duì)窗口內(nèi)數(shù)據(jù)的計(jì)算,反之不觸發(fā)計(jì)算;
ProcessTimeTrigger 當(dāng)前的Processing Time是否超過(guò)窗口的EndTime,如果超過(guò)則觸發(fā)對(duì)窗口內(nèi)數(shù)據(jù)的計(jì)算,反之不觸發(fā)計(jì)算;
ContinuousEventTimeTrigger 根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前EventTime,觸發(fā)窗口計(jì)算;
ContinuousProcessingTimeTrigger 根據(jù)間隔時(shí)間周期性觸發(fā)窗口或者Window的結(jié)束時(shí)間小于當(dāng)前ProcessTime,觸發(fā)窗口計(jì)算;
CountTrigger 根據(jù)窗口的數(shù)據(jù)條數(shù)是否超過(guò)設(shè)定的閾值確定是否觸發(fā)窗口計(jì)算;
DeltaTrigger 根據(jù)窗口的數(shù)據(jù)計(jì)算出來(lái)的Delta指標(biāo)是否超過(guò)指定的閾值,判斷是否觸發(fā)窗口計(jì)算
PurgingTrigger 可以將任意觸發(fā)器作為參數(shù)轉(zhuǎn)換為Purge類型觸發(fā)器,計(jì)算完成后數(shù)據(jù)將被清理。

關(guān)于抽象類Trigger的源碼解釋如下:

/**
 * @param <T> 元素的數(shù)據(jù)類型
 * @param <W> Window的類型
 */
@PublicEvolving
public abstract class Trigger<T, W extends Window> implements Serializable {

    private static final long serialVersionUID = -4104633972991191369L;
    /**
     * 每個(gè)元素被分配到窗口時(shí)都會(huì)調(diào)用該方法,返回一個(gè)TriggerResult枚舉
     * 該枚舉包含很多觸發(fā)的類型:CONTINUE、FIRE_AND_PURGE、FIRE、PURGE
     *
     * @param element   進(jìn)入窗口的元素
     * @param timestamp 進(jìn)入窗口元素的時(shí)間戳
     * @param window    窗口
     * @param ctx       上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器(timer)回調(diào)函數(shù)
     * @return
     * @throws Exception
     */
    public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
    /**
     * 當(dāng)使用TriggerContext注冊(cè)的processing-time計(jì)時(shí)器被觸發(fā)時(shí),會(huì)調(diào)用該方法
     *
     * @param time   觸發(fā)計(jì)時(shí)器的時(shí)間戳
     * @param window 計(jì)時(shí)器觸發(fā)的window
     * @param ctx    上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器(timer)回調(diào)函數(shù)
     * @return
     * @throws Exception
     */
    public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
    /**
     * 當(dāng)使用TriggerContext注冊(cè)的event-time計(jì)時(shí)器被觸發(fā)時(shí),會(huì)調(diào)用該方法
     *
     * @param time   觸發(fā)計(jì)時(shí)器的時(shí)間戳
     * @param window 計(jì)時(shí)器觸發(fā)的window
     * @param ctx    上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器(timer)回調(diào)函數(shù)
     * @return
     * @throws Exception
     */
    public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
    /**
     * 如果觸發(fā)器支持合并觸發(fā)器狀態(tài),將返回true
     *
     * @return
     */
    public boolean canMerge() {
        return false;
    }

    /**
     * 當(dāng)多個(gè)窗口被合并成一個(gè)窗口時(shí),會(huì)調(diào)用該方法
     *
     * @param window 合并之后的window
     * @param ctx    上下文對(duì)象,可以注冊(cè)計(jì)時(shí)器回調(diào)函數(shù),也可以訪問(wèn)狀態(tài)
     * @throws Exception
     */
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        throw new UnsupportedOperationException("This trigger does not support merging.");
    }
    /**
     * 清除所有Trigger持有的窗口狀態(tài)
     * 當(dāng)窗口被銷毀時(shí),調(diào)用該方法
     *
     * @param window
     * @param ctx
     * @throws Exception
     */
    public abstract void clear(W window, TriggerContext ctx) throws Exception;
    /**
     * Context對(duì)象,傳給Trigger的方法參數(shù)中,用于注冊(cè)計(jì)時(shí)器回調(diào)函數(shù)和處理狀態(tài)
     */
    public interface TriggerContext {
        // 返回當(dāng)前處理時(shí)間
        long getCurrentProcessingTime();
        MetricGroup getMetricGroup();
        // 返回當(dāng)前水位線時(shí)間戳
        long getCurrentWatermark();
        // 注冊(cè)一個(gè)processing-time的計(jì)時(shí)器
        void registerProcessingTimeTimer(long time);
        // 注冊(cè)一個(gè)EventTime計(jì)時(shí)器
        void registerEventTimeTimer(long time);
        //  刪除一個(gè)processing-time的計(jì)時(shí)器
        void deleteProcessingTimeTimer(long time);
        // 刪除一個(gè)EventTime計(jì)時(shí)器
        void deleteEventTimeTimer(long time);
        /**
         * 提取狀態(tài)當(dāng)前Trigger的窗口和Key的狀態(tài)
         */
        <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor);

        // 與getPartitionedState功能相同,該方法已被標(biāo)記過(guò)時(shí)
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, Class<S> stateType, S defaultState);
        // 同getPartitionedState功能,該方法已被標(biāo)記過(guò)時(shí)
        @Deprecated
        <S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
    }
    // TriggerContext的擴(kuò)展
    public interface OnMergeContext extends TriggerContext {
        // 合并每個(gè)window的狀態(tài),狀態(tài)必須支持合并
        <S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
    }
}

上面的源碼可以看出,每當(dāng)觸發(fā)器調(diào)用時(shí),會(huì)產(chǎn)生一個(gè)TriggerResult對(duì)象,該對(duì)象是一個(gè)枚舉類,其包括的屬性決定了作用在窗口上的操作是什么??偣灿兴姆N行為:CONTINUE、FIRE_AND_PURGE、FIRE、PURGE,關(guān)于每種類型的具體含義,我們先看一下TriggerResult源碼:

/**
 * 觸發(fā)器方法的結(jié)果類型,決定在窗口上執(zhí)行什么操作,比如是否調(diào)用window function
 * 或者是否需要銷毀窗口
 * 注意:如果一個(gè)Trigger返回的是FIRE或者FIRE_AND_PURGE,但是窗口中沒(méi)有任何元素,則窗口函數(shù)不會(huì)被調(diào)用
 */
public enum TriggerResult {

    // 什么都不做,當(dāng)前不觸發(fā)計(jì)算,繼續(xù)等待
    CONTINUE(false, false),

    // 執(zhí)行 window function,輸出結(jié)果,之后清除所有狀態(tài)
    FIRE_AND_PURGE(true, true),

    // 執(zhí)行 window function,輸出結(jié)果,窗口不會(huì)被清除,數(shù)據(jù)繼續(xù)保留
    FIRE(true, false),
    
    // 清除窗口內(nèi)部數(shù)據(jù),但不觸發(fā)計(jì)算
    PURGE(false, true);
    
}

清除器(Evictors)

Evictors是一個(gè)可選的組件,其主要作用是對(duì)進(jìn)入WindowFuction前后的數(shù)據(jù)進(jìn)行清除處理。Flink內(nèi)置了三種Evictors:分別為CountEvictor、DeltaEvictor、TimeEvitor。如果用戶不指定Evictors,也不會(huì)有默認(rèn)值。

  • CountEvictor:保持在窗口中具有固定數(shù)量的元素,將超過(guò)指定窗口元素?cái)?shù)量的數(shù)據(jù)在窗口計(jì)算前剔除;
  • DeltaEvictor:通過(guò)定義DeltaFunction和指定threshold,并計(jì)算Windows中的元素與最新元素之間的Delta大小,如果超過(guò)threshold則將當(dāng)前數(shù)據(jù)元素剔除;
  • TimeEvictor:通過(guò)指定時(shí)間間隔,將當(dāng)前窗口中最新元素的時(shí)間減去Interval,然后將小于該結(jié)果的數(shù)據(jù)全部剔除,其本質(zhì)是將具有最新時(shí)間的數(shù)據(jù)選擇出來(lái),刪除過(guò)時(shí)的數(shù)據(jù)。

Evictors繼承關(guān)系圖如下:

關(guān)于Evictors接口的源碼,如下:

/**
 * 在WindowFunction計(jì)算之前或者之后進(jìn)行清除窗口元素
 * @param <T> 元素的數(shù)據(jù)類型
 * @param <W> 窗口類型
 */
@PublicEvolving
public interface Evictor<T, W extends Window> extends Serializable {
    /**
     * 選擇性剔除元素,在windowing function之前調(diào)用
     * @param elements 窗口中的元素
     * @param size  窗口中元素個(gè)數(shù)
     * @param window 窗口
     * @param evictorContext
     */
    void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    /**
     * 選擇性剔除元素,在windowing function之后調(diào)用
     * @param elements 窗口中的元素.
     * @param size 窗口中元素個(gè)數(shù).
     * @param window 窗口
     * @param evictorContext
     */
    void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
    // 傳遞給Evictor方法參數(shù)的值
    interface EvictorContext {
        // 返回當(dāng)前processing time
        long getCurrentProcessingTime();
        MetricGroup getMetricGroup();
        // 返回當(dāng)前的水位線時(shí)間戳
        long getCurrentWatermark();
    }
}

小結(jié)

本文首先給出了窗口使用的快速入門(mén),介紹了窗口的基本概念、分類及簡(jiǎn)單使用。然后對(duì)Flink內(nèi)置的Window Assigner進(jìn)行了一一解讀,并給出了圖解與使用的代碼片段。接著對(duì)Flink的Window Function進(jìn)行介紹,包括窗口函數(shù)的分類及詳細(xì)使用案例。最后分析了Window生命周期所涉及的組件,并對(duì)每個(gè)組件的源碼進(jìn)行分析。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容