基于Apache Flink的流處理 第六章 基于時(shí)間和窗口的算子

1、配置時(shí)間特性

  • ProcessingTime,指算子根據(jù)處理機(jī)器的系統(tǒng)時(shí)鐘決定數(shù)據(jù)流當(dāng)前的時(shí)間。無(wú)須等待水位線來(lái)驅(qū)動(dòng)事件時(shí)間前進(jìn)。
  • EventTime,蒜子根據(jù)數(shù)據(jù)自身包含的信息決定當(dāng)前時(shí)間。依靠水位線聲明某個(gè)時(shí)間間隔內(nèi)所有時(shí)間戳都已經(jīng)接受時(shí),事件時(shí)間窗口才觸發(fā)
  • IngestionTime,指定每個(gè)接受的記錄都把數(shù)據(jù)源算子的處理時(shí)間作為事件時(shí)間的時(shí)間戳并自動(dòng)生成水位線。

默認(rèn)情況下是處理時(shí)間,設(shè)置其他時(shí)間特性使用

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

1.1 分配時(shí)間戳和生成水位線

  • 水位線是當(dāng)前事件攜帶的一個(gè)時(shí)間戳,用來(lái)觸發(fā)計(jì)時(shí)器(比如窗口),通常比當(dāng)前事件的時(shí)間早一些,告知系統(tǒng)該事件來(lái)時(shí),只處理更早一個(gè)時(shí)間點(diǎn)之前的數(shù)據(jù)。

周期性水位分配器

  • 系統(tǒng)默認(rèn)200毫秒發(fā)一個(gè)水位線
  • Flink會(huì)根據(jù)設(shè)定的間隔,調(diào)用getCurrentWatermark()方法,如果該方法返回值非空,且它的時(shí)間戳大于上一個(gè)水位線的時(shí)間戳,那么算子就會(huì)發(fā)出一個(gè)新的水位線
  • 也可以使用BoundedOutOfOrdernessTimeStampExtractor來(lái)替代下面這段
env.getConfig().setAutoWatermarkInterval(100);

DataStream<SensorReading> dataStream = inputStream.map(line -> {
                    String[] fields = line.split(",");
                    return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
                })

                // 亂序數(shù)據(jù)設(shè)置時(shí)間戳和watermark
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {

                    private final long maxOutBoundary = 3 * 1000L;
                    private long currentMaxTimestamp = Integer.MIN_VALUE;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - maxOutBoundary);
                    }

                    @Override
                    public long extractTimestamp(SensorReading sensorReading, long l) {
                        //獲取當(dāng)前記錄的時(shí)間戳
                        long currentTs = sensorReading.getTimestamp() * 1000L;
                        // 更新最大的時(shí)間戳
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
                        // 返回記錄的時(shí)間戳
                        return currentTs;
                    }
                });

定點(diǎn)水位線分配器

  • 接口中的checkAndGetNextWatermark()方法會(huì)在針對(duì)每個(gè)事件的extractTimestamp()方法后立刻調(diào)用。決定是否生成新水位線,如果該方法返回一個(gè)非空、且大于之前值的水位線。算子就會(huì)將這個(gè)新水位線發(fā)出

http://www.itdecent.cn/p/e6c7957d76d9

class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

1.2 水位線、延遲及完整性

  • 水位線可用于平衡延遲和結(jié)果的完整性。
  • 它們控制著在執(zhí)行某些計(jì)算前需要等待數(shù)據(jù)到達(dá)的時(shí)間。

2、處理函數(shù)

  • 處理函數(shù)可以訪問(wèn)記錄的時(shí)間戳和水位線,并支持注冊(cè)在將來(lái)某個(gè)特定時(shí)間出發(fā)的計(jì)時(shí)器

2.1 處理函數(shù)接口

  • 以KeyedProcessFunction為例,其中processElement會(huì)對(duì)每個(gè)到來(lái)的記錄進(jìn)行處理,其中的Context可以訪問(wèn)到水位等信息,第一個(gè)參數(shù)是輸入,第三個(gè)是輸出。
  • 在processElement中可以注冊(cè)計(jì)數(shù)器registerProcessingTimeTimer,然后在onTimer這里寫觸發(fā)時(shí)干什么
dataStream.keyBy("id")
                .process( new MyProcess() )
                .print();

public static class MyProcess extends KeyedProcessFunction<Tuple, SensorReading, Integer> {
        ValueState<Long> tsTimerState;

        @Override
        public void open(Configuration parameters) throws Exception {
            tsTimerState =  getRuntimeContext().getState(new ValueStateDescriptor<Long>("ts-timer", Long.class));
        }

        @Override
        public void processElement(SensorReading value, Context ctx, Collector<Integer> out) throws Exception {
            out.collect(value.getId().length());

            // context
            // Timestamp of the element currently being processed or timestamp of a firing timer.
            ctx.timestamp();
            // Get key of the element being processed.
            ctx.getCurrentKey();
            //            ctx.output();
            ctx.timerService().currentProcessingTime();
            ctx.timerService().currentWatermark();
            // 在5處理時(shí)間的5秒延遲后觸發(fā)
            ctx.timerService().registerProcessingTimeTimer( ctx.timerService().currentProcessingTime() + 5000L);
            tsTimerState.update(ctx.timerService().currentProcessingTime() + 1000L);
            //            ctx.timerService().registerEventTimeTimer((value.getTimestamp() + 10) * 1000L);
            // 刪除指定時(shí)間觸發(fā)的定時(shí)器
            //            ctx.timerService().deleteProcessingTimeTimer(tsTimerState.value());
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
            System.out.println(timestamp + " 定時(shí)器觸發(fā)");
            ctx.getCurrentKey();
            //            ctx.output();
            ctx.timeDomain();
        }

        @Override
        public void close() throws Exception {
            tsTimerState.clear();
        }
    }

2.2 時(shí)間服務(wù)和計(jì)時(shí)器

  • Context和onTimerContext對(duì)象中的TimerService提供了一下方法:
    • currentProcessingTime()
    • currentWatermark()
    • registerProcessingTimeTimer(long timestamp)
    • registerEventTimeTimer(long timestamp)
    • deleteProcessingTimeTimer(long timestamp)
    • deleteEventTimeTimer(long timestamp)
  • 計(jì)時(shí)器處罰會(huì)調(diào)用onTimer()回調(diào)函數(shù)
  • 系統(tǒng)對(duì)于processElement()和onTimer()兩個(gè)方法的調(diào)用時(shí)同步的
  • 每個(gè)鍵值可以有多個(gè)計(jì)時(shí)器,但具體到每個(gè)時(shí)間戳只能有一個(gè)。
  • 默認(rèn)會(huì)將計(jì)時(shí)器的時(shí)間戳放到一個(gè)優(yōu)先隊(duì)列中。
  • 所有計(jì)時(shí)器會(huì)和其他狀態(tài)一起寫入檢查點(diǎn)。

2.3 副輸出

  • 可以發(fā)出多條數(shù)據(jù)流
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
      @Override
      public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
        // 判斷溫度,大于30度,高溫流輸出到主流;小于低溫流輸出到側(cè)輸出流
        if (value.getTemperature() > 30) {
          out.collect(value);
        } else {
          ctx.output(lowTempTag, value);
        }
      }
    });

highTempStream.getSideOutput(lowTempTag).print("low-temp");

3、窗口算子

  • 基于時(shí)間的窗口分配器會(huì)根據(jù)元素時(shí)間的時(shí)間戳或當(dāng)前處理時(shí)間將其分配到一個(gè)或多個(gè)窗口,每個(gè)時(shí)間窗口都有一個(gè)開(kāi)始時(shí)間戳和結(jié)束時(shí)間戳
  • 所有內(nèi)置的窗口分配器都提供了一個(gè)默認(rèn)的觸發(fā)器,一旦時(shí)間超過(guò)了窗口的結(jié)束時(shí)間就會(huì)觸發(fā)窗口計(jì)算
  • 時(shí)間區(qū)間是左閉右開(kāi)的

3.1 滾動(dòng)窗口

滾動(dòng)窗口不會(huì)重疊,滑動(dòng)窗口可以重疊,會(huì)話窗口是根據(jù)沒(méi)有收到信息的間隔來(lái)劃定窗口。滾動(dòng)窗口比較常用。https://ashiamd.github.io/docsify-notes/#/study/BigData/Flink/%E5%B0%9A%E7%A1%85%E8%B0%B7Flink%E5%85%A5%E9%97%A8%E5%88%B0%E5%AE%9E%E6%88%98-%E5%AD%A6%E4%B9%A0%E7%AC%94%E8%AE%B0?id=_622-timewindow

這里使用.window就可以創(chuàng)建默認(rèn)窗口了,類型為TimeWindow。

DataStream<Integer> resultStream = dataStream.keyBy("id")
                .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))

3.2 在窗口上應(yīng)用的函數(shù)

aggregateFunction

  • 以增量方式應(yīng)用于窗口內(nèi)的元素,狀態(tài)只有一個(gè)值
  • 第一個(gè)參數(shù)是輸入,第三個(gè)是輸出,第二個(gè)是累加用于存儲(chǔ)中間值的類型
DataStream<Integer> resultStream = dataStream.keyBy("id")
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .aggregate(new AggregateFunction<SensorReading, Integer, Integer>() {

                    // 新建的累加器
                    @Override
                    public Integer createAccumulator() {
                        return 0;
                    }

                    // 每個(gè)數(shù)據(jù)在上次的基礎(chǔ)上累加
                    @Override
                    public Integer add(SensorReading value, Integer accumulator) {
                        return accumulator + 1;
                    }

                    // 返回結(jié)果值
                    @Override
                    public Integer getResult(Integer accumulator) {
                        return accumulator;
                    }

                    // 分區(qū)合并結(jié)果(TimeWindow一般用不到,SessionWindow可能需要考慮合并)
                    @Override
                    public Integer merge(Integer a, Integer b) {
                        return a + b;
                    }
                });

ProcessWindowFunction

  • 就是窗口的處理函數(shù),可以獲取到環(huán)境信息
SingleOutputStreamOperator<SensorReading> minTempStream = keyStream
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<SensorReading>() {
                    private final long maxOutBoundary = 4 * 1000L;
                    private long currentMaxTimestamp = Integer.MIN_VALUE;

                    @Nullable
                    @Override
                    public Watermark getCurrentWatermark() {
                        return new Watermark(currentMaxTimestamp - maxOutBoundary);
                    }

                    @Override
                    public long extractTimestamp(SensorReading sensorReading, long l) {
                        //獲取當(dāng)前記錄的時(shí)間戳
                        long currentTs = sensorReading.getTimestamp() * 1000L;
                        // 更新最大的時(shí)間戳
                        currentMaxTimestamp = Math.max(currentMaxTimestamp, currentTs);
                        // 返回記錄的時(shí)間戳
                        return currentTs;
                    }
                })
                .keyBy(SensorReading::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(2)))
                .process(new ProcessWindowFunction<SensorReading, SensorReading, String, TimeWindow>() {
                    @Override
                    public void process(String s, ProcessWindowFunction<SensorReading, SensorReading, String, TimeWindow>.Context context, Iterable<SensorReading> iterable, Collector<SensorReading> collector) throws Exception {
                        System.out.println(context.currentWatermark());
                    }
                });

3.3 自定義窗口算子

  • 這個(gè)就是自定義窗口,不實(shí)用現(xiàn)成的窗口函數(shù)
  • 主要的幾個(gè)組件是分配器、觸發(fā)器和移除器
窗口相關(guān)組件流程

分配器 windowAssigner

  • 會(huì)返回0個(gè)或多個(gè)窗口對(duì)象
  • 會(huì)提供默認(rèn)的觸發(fā)器
new WindowAssigner<SensorReading, Window>() {
                    
    @Override
     public Collection<Window> assignWindows(SensorReading sensorReading, long l, WindowAssignerContext windowAssignerContext) {
         // long starttime = l - (l % windowsize);
         // long endtime = starttime + endtime;
         // collector.singletonList(new TimeWindow(starttime, endtime));
     }

     @Override
     public Trigger<SensorReading, Window> getDefaultTrigger(StreamExecutionEnvironment streamExecutionEnvironment) {
         return EventTimeTrigger.create();
     }

     @Override
     public TypeSerializer<Window> getWindowSerializer(ExecutionConfig executionConfig) {
          return null;
      }

      @Override
      public boolean isEventTime() {
          return false;
      }
}

觸發(fā)器

  • 定義何時(shí)對(duì)窗口進(jìn)行計(jì)算并發(fā)出結(jié)果
  • 默認(rèn)觸發(fā)器會(huì)在處理時(shí)間或水位線超過(guò)了窗口結(jié)束邊界的時(shí)間戳?xí)r觸發(fā)
  • 每次觸發(fā)器調(diào)用都會(huì)生成一個(gè)TriggerResult,可以是以下幾個(gè)值:CONTINUE(什么都不做)、FIRE(發(fā)出結(jié)果)、PURGE(清楚窗口內(nèi)容并刪除窗口自身及元數(shù)據(jù))、FIRE_AND_PURGE
  • trigger本身還可以操縱context去注冊(cè)計(jì)時(shí)器(是計(jì)時(shí)器不是觸發(fā)器,就是ontimer的那個(gè)東西)
SingleOutputStreamOperator<PageViewCount> uvStream = dataStream
      .filter(data -> "pv".equals(data.getBehavior()))
      .timeWindowAll(Time.hours(1))
      .trigger(new MyTrigger())
      .process(new UvCountResultWithBloomFliter());

public static class MyTrigger extends Trigger<UserBehavior, TimeWindow> {
    @Override
    public TriggerResult onElement(UserBehavior element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
      // 每一條數(shù)據(jù)來(lái)到,直接觸發(fā)窗口計(jì)算,并且直接清空窗口
      return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
      return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    }
  }

移除器

  • 用于在窗口執(zhí)行計(jì)算前或后從窗口中刪除元素
new Evictor<SensorReading, TimeWindow>() {
                    @Override
                    public void evictBefore(Iterable<TimestampedValue<SensorReading>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {
                        
                    }

                    @Override
                    public void evictAfter(Iterable<TimestampedValue<SensorReading>> iterable, int i, TimeWindow timeWindow, EvictorContext evictorContext) {

                    }
                }

4、基于時(shí)間的雙流Join

  • 基于時(shí)間間隔的Join,對(duì)兩條流中擁有相同鍵值以及彼此之間時(shí)間戳不超過(guò)某一指定間隔的事件進(jìn)行Join
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process (new ProcessJoinFunction<Integer, Integer, String(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(first + "," + second);
        }
    });
  • 基于窗口的join
input1.join(input2)
.where() // 左流限制條件
.equalTo() // 右流限制條件
.window()
.apply() // 具體join的joinFunction

5、處理遲到的數(shù)據(jù)

5.1 用副輸出收集遲到的數(shù)據(jù)

OutputTag<SensorReading> outputTag = new OutputTag<SensorReading>("late") {
    };

    // 基于事件時(shí)間的開(kāi)窗聚合,統(tǒng)計(jì)15秒內(nèi)溫度的最小值
    SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
      .timeWindow(Time.seconds(15))
      .sideOutputLateData(outputTag)
      .minBy("temperature");

    minTempStream.print("minTemp");
    minTempStream.getSideOutput(outputTag).print("late");

    env.execute();

5.2 基于遲到數(shù)據(jù)更新結(jié)果

  • allowedLateness 指定這個(gè)之后水位線超過(guò)窗口結(jié)束時(shí)間仍會(huì)保留數(shù)據(jù) 這里指定時(shí)間這么久
  • 會(huì)觸發(fā)多次計(jì)算,水位線到了會(huì)觸發(fā),遲到數(shù)據(jù)來(lái)了也會(huì)觸發(fā)

https://blog.csdn.net/lmalds/article/details/55259718

    SingleOutputStreamOperator<SensorReading> minTempStream = dataStream.keyBy("id")
      .timeWindow(Time.seconds(15))
      .allowedLateness(Time.minutes(1))
      .sideOutputLateData(outputTag)
      .minBy("temperature");
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容