Flink 使用之數(shù)據(jù)分流

Flink 使用介紹相關(guān)文檔目錄

Flink 使用介紹相關(guān)文檔目錄

背景

數(shù)據(jù)分流在Flink中叫做旁路輸出(side output)。Flink的工作流程可以理解為是一條流水線。我們編寫的程序是流水線上的各個工序。然而,在一些場景中,我們將一部分滿足特定條件的數(shù)據(jù)從主線分出去,形成旁路輸出,好比是流水線中發(fā)現(xiàn)不合格的產(chǎn)品必須要剔除一樣。本篇為大家?guī)鞦link使用旁路輸出的方法。

OutputTag

Flink可以支持1個或多個side output。不同的side output使用OutputTag區(qū)分。OutputTag是side output數(shù)據(jù)流的標記,和數(shù)據(jù)流嚴格對應(yīng)。

OutputTag使用如下方式創(chuàng)建:

public static final OutputTag<X> someTag = new OutputTag<X>("tag-name"){};

其中泛型X為旁路輸出流的數(shù)據(jù)類型,tag-name為tag的名稱。

需要注意的是我們必須使用匿名內(nèi)部類的方式創(chuàng)建OutputTag。下面這種寫法是錯誤的:

public static final OutputTag<X> someTag = new OutputTag<X>("tag-name");

這是因為Flink內(nèi)部使用clazz.getGenericSuperclass()方式獲取父類的Type,返回的類型是ParameterizedType,從中可以獲取到泛型類型(代碼位于TypeExtractor::getParameterType)。如果不使用匿名內(nèi)部類則無法獲取到泛型類型,會引發(fā)錯誤。

遲到數(shù)據(jù)分流

下面這個例子是將遲到的數(shù)據(jù)單獨從旁路輸出。代碼和分析如下所示:

public class OutOfOrderDemo {
    // 創(chuàng)建tag
    public static final OutputTag<Tuple2<String, Integer>> lateTag = new OutputTag<Tuple2<String, Integer>>("late"){};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        // 示例數(shù)據(jù),其中D亂序,I來遲(H到來的時候認為15000ms之前的數(shù)據(jù)已經(jīng)到齊)
        SingleOutputStreamOperator<Tuple2<String, Integer>> source = executionEnvironment.fromElements(
                new Tuple2<>("A", 0),
                new Tuple2<>("B", 1000),
                new Tuple2<>("C", 2000),
                new Tuple2<>("D", 7000),
                new Tuple2<>("E", 3000),
                new Tuple2<>("F", 4000),
                new Tuple2<>("G", 5000),
                new Tuple2<>("H", 20000),
                new Tuple2<>("I", 8000)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Integer>>forGenerator(new WatermarkGeneratorSupplier<Tuple2<String, Integer>>() {
            // 這里自定義WatermarkGenerator的原因是Flink按照運行時間周期發(fā)送watermark,但我們的例子是單次執(zhí)行的,可以認為數(shù)據(jù)是一瞬間到來
            // 因此我們改寫為每到來一條數(shù)據(jù)發(fā)送一次watermark,watermark的時間戳為數(shù)據(jù)的事件事件減去5000毫秒,意思是最多容忍數(shù)據(jù)來遲5000毫秒
            @Override
            public WatermarkGenerator<Tuple2<String, Integer>> createWatermarkGenerator(Context context) {
                return new WatermarkGenerator<Tuple2<String, Integer>>() {
                    @Override
                    public void onEvent(Tuple2<String, Integer> event, long eventTimestamp, WatermarkOutput output) {
                        long watermark = eventTimestamp - 5000L < 0 ? 0L : eventTimestamp - 5000L;
                        output.emitWatermark(new Watermark(watermark));
                    }

                    @Override
                    public void onPeriodicEmit(WatermarkOutput output) {

                    }
                };
            }
        // 取第二個字段為watermark
        }).withTimestampAssigner((element, timestamp) -> element.f1));

        // 窗口大小5秒,允許延遲5秒
        // watermark和allowedLateness的區(qū)別是,watermark決定了什么時候窗口數(shù)據(jù)觸發(fā)計算,allowedLateness決定什么數(shù)據(jù)被認為是lateElement,從而發(fā)送到sideOutput
        // 設(shè)置side output tag
        source.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))).allowedLateness(Time.seconds(5)).sideOutputLateData(lateTag).process(new ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>() {
            @Override
            public void process(ProcessAllWindowFunction<Tuple2<String, Integer>, Object, TimeWindow>.Context context, Iterable<Tuple2<String, Integer>> elements, Collector<Object> out) throws Exception {
                Iterator<Tuple2<String, Integer>> iterator = elements.iterator();
                System.out.println("--------------------");
                while(iterator.hasNext()) {
                    System.out.println(iterator.next());
                }
            }
        // 打印sideoutput流內(nèi)容
        }).getSideOutput(lateTag).process(new ProcessFunction<Tuple2<String, Integer>, Object>() {
            @Override
            public void processElement(Tuple2<String, Integer> value, ProcessFunction<Tuple2<String, Integer>, Object>.Context ctx, Collector<Object> out) throws Exception {
                System.out.println("Late element: " + value);
            }
        });

        executionEnvironment.execute();
    }
}

從執(zhí)行結(jié)果可以發(fā)現(xiàn)打印出了Late element: (I,8000)。表明I為遲到的元素,它被發(fā)往旁路輸出。

條件分流

除了遲到數(shù)據(jù)輸出之外,用戶還可以根據(jù)自定義條件將數(shù)據(jù)分流。下面的例子根據(jù)一個整數(shù)數(shù)據(jù)流元素的奇偶性,分發(fā)到不同的side output中。代碼如下:

public class SplitDemo {
    public static final OutputTag<Integer> evenTag = new OutputTag<Integer>("even"){};
    public static final OutputTag<Integer> oddTag = new OutputTag<Integer>("odd"){};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Integer> source = executionEnvironment.fromElements(1, 2, 3, 4, 5);

        SingleOutputStreamOperator<Integer> process = source.process(new ProcessFunction<Integer, Integer>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, Integer>.Context ctx, Collector<Integer> out) throws Exception {
                if (value % 2 == 0) {
                    // 這里不使用out.collect,而是使用ctx.output
                    // 這個方法多了一個參數(shù),可以指定output tag,從而實現(xiàn)數(shù)據(jù)分流
                    ctx.output(evenTag, value);
                } else {
                    ctx.output(oddTag, value);
                }
            }
        });

        // 依賴OutputTag獲取對應(yīng)的旁路輸出
        DataStream<Integer> evenStream = process.getSideOutput(evenTag);
        DataStream<Integer> oddStream = process.getSideOutput(oddTag);

        // 分別打印兩個旁路輸出流中的數(shù)據(jù)
        evenStream.process(new ProcessFunction<Integer, String>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect("Even: " + value);
            }
        }).print();

        oddStream.process(new ProcessFunction<Integer, String>() {
            @Override
            public void processElement(Integer value, ProcessFunction<Integer, String>.Context ctx, Collector<String> out) throws Exception {
                out.collect("Odd: " + value);
            }
        }).print();

        executionEnvironment.execute();
    }
}

條件分流在實際開發(fā)中用途非常廣泛。比如將數(shù)據(jù)分類發(fā)往不同的下游sink。也可以將不符合規(guī)范無法處理的異常數(shù)據(jù)通過旁路輸出收集起來,便于問題的收集和定位。

本博客為作者原創(chuàng),歡迎大家參與討論和批評指正。如需轉(zhuǎn)載請注明出處。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容