Flink 基礎(chǔ) - 窗口 Window API 編程實(shí)例

前言

通過(guò)上一篇文章,我們基本了解了窗口的概念,以及常見(jiàn)的用法,本篇繼續(xù)學(xué)習(xí),flink 的window api 操作

窗口分配 - window() 方法

  • 我們可以使用 window() 來(lái)定義一個(gè)窗口,然后基于窗口做一些處理,注意 window() 操作必須在 keyBy() 之后才能用
  • flink 提供了更加簡(jiǎn)單的 timeWindow()countWindow() 用于定義時(shí)間窗口和計(jì)數(shù)窗口

windowAll 方法

上文說(shuō)到,window()必須用在 keyBy() 之后,但是 windowAll() 可以直接應(yīng)用在 DataStream 類(lèi)型上,但是這個(gè)相當(dāng)于,把所有數(shù)據(jù)都放到一個(gè)窗口運(yùn)行,是一個(gè)全局操作,會(huì)把所有數(shù)據(jù)都放到相同的下游算子,相當(dāng)于并行度設(shè)置為了1,不推薦使用

timeWindow

timeWindow ,傳一個(gè)參數(shù)時(shí),會(huì)設(shè)置為滾動(dòng)窗口,傳兩個(gè)參數(shù),就是滑動(dòng)窗口

如何操作窗口中收集的數(shù)據(jù)

增量聚合函數(shù)

  • 每條數(shù)據(jù)進(jìn)來(lái)就計(jì)算,保持一個(gè)簡(jiǎn)單的狀態(tài)
  • ReduceFunction, AggregateFunction
    例如上一篇的wordCount 案例,就是增量聚合,即來(lái)一條數(shù)據(jù),處理一條,還有類(lèi)似的,比如 min(),sum() 等算子操作,都是增量聚合
    ReduceFunction 的案例就是上一篇的word count ,這里主要以 AggregateFunction 為例 ,依然是實(shí)現(xiàn)一個(gè) word count 的案例,測(cè)試過(guò)程省略
package com.lxs.flink.realtime.window;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.util.Objects;

/**
 * User: lixinsong
 * Date: 2021/1/14
 * Description:
 */

public class AggregateFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).flatMap(new FlatMapFunction<String, Tuple2<String, Integer >>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer >> collector) throws Exception {
                String[] arr = s.split(",");
                for (String a: arr) {
                    collector.collect(Tuple2.of(a, 1));
                }
            }
        });

        // 統(tǒng)計(jì)每10s的窗口,輸入數(shù)據(jù)的詞頻
        DataStream<Tuple2<String, Integer>> wordCount = dataStream.keyBy(0).timeWindow(Time.seconds(10)).aggregate(new AggregateFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> createAccumulator() {
                // 創(chuàng)建累加器,初始值給一個(gè) null
                return null;
            }

            @Override
            public Tuple2<String, Integer> add(Tuple2<String, Integer> s1, Tuple2<String, Integer> s2) {
                // 累加器進(jìn)行累加, 需要注意,因?yàn)槔奂悠鞒跏贾禐?NULL,這里需要人為判斷,如果為null,就設(shè)置為當(dāng)前傳入的數(shù)據(jù)值
                if (Objects.isNull(s2)) {
                    return s1;
                }
                s2.f1 = s2.f1 + 1;   // word count 累加器 + 1 
                return s2;
            }

            @Override
            public Tuple2<String, Integer> getResult(Tuple2<String, Integer> acc) {
                return acc;
            }

            @Override
            public Tuple2<String, Integer> merge(Tuple2<String, Integer> s, Tuple2<String, Integer> acc1) {
                // 滾動(dòng)窗口不會(huì)走到這個(gè)函數(shù),一般會(huì)話窗口需要這個(gè)方法
                return null;
            }
        });

        wordCount.print("AggregateFunctionTest");
        env.execute("test");
    }
}

需要注意的是 AggregateFunction 有三個(gè)組成部分,分別是 輸入值,累加值,輸出值, 即實(shí)現(xiàn)的子方法,都是在實(shí)現(xiàn)累加器的功能,AggregateFunction 源碼如下

image.png

全窗口函數(shù)

  • 先把窗口數(shù)據(jù)收集起來(lái),等到計(jì)算時(shí)遍歷所有數(shù)據(jù)
  • ProcessWindowFunction, WindowFunction

先以 WindowFunction 舉例,依然是 word count

package com.lxs.flink.realtime.window;

import com.lxs.utils.KafkaUtils;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.IterableUtils;

import java.util.Iterator;

/**
 * User: lixinsong
 * Date: 2021/1/14
 * Description:
 */

public class WindowFunctionTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Integer>> dataStream = env.addSource(KafkaUtils.initKafka("lxs")).flatMap(new FlatMapFunction<String, Tuple2<String, Integer >>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer >> collector) throws Exception {
                String[] arr = s.split(",");
                for (String a: arr) {
                    collector.collect(Tuple2.of(a, 1));
                }
            }
        });

        DataStream<Tuple2<String, Integer>> wordCount = dataStream.keyBy(0).timeWindow(Time.seconds(10)).apply(new WindowFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple, TimeWindow>() {
            @Override
            // 全窗口函數(shù),將窗口的數(shù)據(jù)收集,統(tǒng)一處理,所以這里的 input 是一個(gè) Iterable 對(duì)象
            public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Integer>> input, Collector<Tuple2<String, Integer>> out) throws Exception {
                int count = 0;
                String word = "";
                for (Tuple2<String, Integer> s : input) {
                    word = s.f0;
                    count += 1;
                }
                out.collect(Tuple2.of(word, count));
            }
        });
        wordCount.print("windowFunction test");
        env.execute("test");
    }
}

這里主要是在 timeWindow() 方法后面使用了apply() 算子,實(shí)現(xiàn)了 WindowFunction 接口,主要是實(shí)現(xiàn)了 apply()方法,先看下 WindowFunction 源碼,可以看出WindowFunction 有更多的窗口信息,有輸入數(shù)據(jù)(IN),輸出數(shù)據(jù)(OUT),key(keyBy()的返回),窗口信息(W)

image.png

其他 API

  • trigger() 觸發(fā)器,定義窗口什么時(shí)間關(guān)閉,觸發(fā)計(jì)算并輸出結(jié)果
  • evictor() 移除器
  • allowedLateness() 允許處理遲到數(shù)據(jù),窗口延遲關(guān)閉
  • sideOutputLateData 將遲到數(shù)據(jù)放入側(cè)輸出流
  • getSideOutput() 獲取側(cè)輸出流

例如側(cè)輸出流案例如下,但是需要注意,側(cè)輸出流必須用在時(shí)間語(yǔ)義下,否則,我們無(wú)法界定什么數(shù)據(jù)算作遲到數(shù)據(jù),應(yīng)該放到側(cè)輸出流

OutputTag<Tuple2<String, Integer>> late = new OutputTag<Tuple2<String, Integer>>("late"){};
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStream.keyBy(0)
                .timeWindow(Time.seconds(10))  // 設(shè)置 10 秒的滾動(dòng)窗口
                .allowedLateness(Time.seconds(2))    // 允許數(shù)據(jù)遲到 2s,窗口延遲2s關(guān)閉
                .sideOutputLateData(late)    // 如果延遲2s還有遲到數(shù)據(jù),就放到側(cè)輸出流
                .sum(1);
        sum.getSideOutput(late).print("late");
        sum.print("wordCount test");
        env.execute("test");

總結(jié)

Keyed Window

// Keyed Window
stream
       .keyBy(...)               <-  按照一個(gè)Key進(jìn)行分組
       .window(...)              <-  將數(shù)據(jù)流中的元素分配到相應(yīng)的窗口中
      [.trigger(...)]            <-  指定觸發(fā)器Trigger(可選)
      [.evictor(...)]            <-  指定清除器Evictor(可選)
       .reduce/aggregate/process()      <-  窗口處理函數(shù)Window Function

// Non-Keyed Window
stream
       .windowAll(...)           <-  不分組,將數(shù)據(jù)流中的所有元素分配到相應(yīng)的窗口中
      [.trigger(...)]            <-  指定觸發(fā)器Trigger(可選)
      [.evictor(...)]            <-  指定清除器Evictor(可選)
       .reduce/aggregate/process()      <-  窗口處理函數(shù)Window Function
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容