flink watermark的原理和實(shí)踐

1. Watermark概念

watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個(gè)隱藏屬性。通常基于Event Time的數(shù)據(jù),自身都包含一個(gè)timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來(lái)實(shí)現(xiàn)。我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來(lái)的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說(shuō)late element)。但是對(duì)于late element,我們又不能無(wú)限期的等下去,必須要有個(gè)機(jī)制來(lái)保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。此時(shí)就是watermark發(fā)揮作用了,它表示當(dāng)達(dá)到watermark到達(dá)之后,在watermark之前的數(shù)據(jù)已經(jīng)全部達(dá)到(即使后面還有延遲的數(shù)據(jù)).watermark的示意圖如下.


image.png
2. 生成EventTime和Watermark

以下這個(gè)程序的功能是實(shí)現(xiàn)計(jì)算相同時(shí)間窗口出現(xiàn)相同單詞的統(tǒng)計(jì).在這個(gè)過程中,自定義實(shí)現(xiàn)了時(shí)間戳和Watermark.

public class DataStreamDemo {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  //設(shè)置時(shí)間分配器

        env.setParallelism(1);  //設(shè)置并行度
        env.getConfig().setAutoWatermarkInterval(9000);//每9秒發(fā)出一個(gè)watermark

        DataStream<String> text = env.socketTextStream("localhost", 9900);

        DataStream<Tuple3<String, Long, Integer>> counts = text.filter(new FilterClass()).map(new LineSplitter())
                .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple3<String, Long, Integer>>() {

                    private long currentMaxTimestamp = 0l;

                    private final long maxOutOfOrderness = 10000l;   //這個(gè)控制失序已經(jīng)延遲的度量
                    //獲取EventTime
                    @Override
                    public long extractTimestamp(Tuple3<String, Long, Integer> element, long previousElementTimestamp) {
                        long timestamp = element.f1;
                        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                        System.out.println(
                                "get timestamp is " + timestamp + " currentMaxTimestamp " + currentMaxTimestamp);
                        return timestamp;
                    }
                    //獲取Watermark
                    @Override
                    public Watermark getCurrentWatermark() {
                        System.out.println("wall clock is " + System.currentTimeMillis() + " new watermark "
                                + (currentMaxTimestamp - maxOutOfOrderness));
                        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                    }
                }).keyBy(0).timeWindow(Time.seconds(20))
                // .allowedLateness(Time.seconds(10))   
                .sum(2);

        counts.print();
        env.execute("Window WordCount");

    }

//    自定義獲取timesStamp
//    private static class MyTimestamp extends AscendingTimestampExtractor<Tuple3<String, Long, Integer>> {
//
//        private static final long serialVersionUID = 1L;
//
//        public long extractAscendingTimestamp(Tuple3<String, Long, Integer> element) {
//
//            return element.f1;
//        }
//
//    }

    //構(gòu)造出element以及它的event time.然后把次數(shù)賦值為1
    public static final class LineSplitter implements MapFunction<String, Tuple3<String, Long, Integer>> {

        @Override
        public Tuple3<String, Long, Integer> map(String value) throws Exception {
            // TODO Auto-generated method stub
            String[] tokens = value.toLowerCase().split("\\W+");

            long eventtime = Long.parseLong(tokens[1]);

            return new Tuple3<String, Long, Integer>(tokens[0], eventtime, 1);
        }
    }

    //過濾掉為null和whitespace的字符串
    public static final class FilterClass implements FilterFunction<String> {

        @Override
        public boolean filter(String value) throws Exception {

            if (StringUtils.isNullOrWhitespaceOnly(value)) {
                return false;
            } else {
                return true;
            }
        }

    }
}
  • maxOutOfOrderness 這個(gè)參數(shù)在設(shè)置的時(shí)候往往根據(jù)經(jīng)驗(yàn)來(lái).MaxOutOfOrderness設(shè)置的太小,而自身數(shù)據(jù)發(fā)送時(shí)由于網(wǎng)絡(luò)等原因?qū)е聛y序或者late太多,那么最終的結(jié)果就是會(huì)有很多單條的數(shù)據(jù)在window中被觸發(fā),數(shù)據(jù)的正確性影響太大。如果設(shè)置太大,導(dǎo)致設(shè)置的Watermark太小,使得Watermark沒有用,因?yàn)樵驹诤芏痰臅r(shí)間內(nèi),一個(gè)窗口的所有的數(shù)據(jù)都到達(dá)了,但是不得不等Watermark一點(diǎn)點(diǎn)變大, 才能觸發(fā)計(jì)算.
3. EventTime按順序的情況

運(yùn)行上述程序.初始化的時(shí)候,由于沒有輸入,watermark為-10000


image.png

在9900監(jiān)聽端口,輸入aa 1522827199000(2018-04-04 15:33:19),重復(fù)3次.得到如下圖所示結(jié)果.現(xiàn)在的watermark是1522827189000(2018-04-04 15:33:09),即為最大的currentMaxTimestamp-10000.在這里生命下,aa 的時(shí)間2018-04-04 15:33:19在2018-04-04 15:33:0-2018-04-04 15:33:20這個(gè)窗口中.因?yàn)槲以O(shè)置的窗口為20s.不管怎樣怎樣,窗口是確定的.初始化設(shè)置后,就一直不會(huì)變.而且窗口是左閉右開的區(qū)間.


image.png

接著輸入bb 1522827299000(2018-04-04 15:34:59),此時(shí)的watermark為1522827289000(2018-04-04 15:34:49),此時(shí)的watermark超過了aa所在窗口的endtime(2018-04-04 15:33:20).那么會(huì)觸發(fā)計(jì)算,從而會(huì)有下面的輸出.在這里強(qiáng)調(diào)下,觸發(fā)計(jì)算的時(shí)間點(diǎn)是

  • watermark超過了window的endtime.
  • 在該window中有數(shù)據(jù).

只有同時(shí)滿足這兩個(gè)條件,就會(huì)觸發(fā)計(jì)算.


image.png
4. EventTime不按順序的情況

輸入如下數(shù)據(jù):

aa 1522827261000(2018-04-04 15:34:21)
aa 1522827251000(2018-04-04 15:34:11)
aa 1522827252000(2018-04-04 15:34:12)
ee 1522827291000(2018-04-04 15:34:51)

由于最后一個(gè)ee的輸入,改變了watermark,使得當(dāng)前的時(shí)間戳為2018-04-04 15:34:41.那么此時(shí)會(huì)觸發(fā)前面兩個(gè)窗口的計(jì)算.計(jì)算結(jié)果如下.


image.png

Note:此時(shí),如果還輸入之前的時(shí)間窗口的aa 1522827261000.是不會(huì)觸發(fā)計(jì)算的,它會(huì)丟棄掉數(shù)據(jù),這在后面會(huì)引入allowedLateness參數(shù).原窗口中的內(nèi)容不會(huì)立即被刪除,而是會(huì)再次等待一段時(shí)間,即watermark小于end-time + allowedLateness時(shí),后續(xù)的該窗口的數(shù)據(jù)到達(dá)時(shí)會(huì)納入到原窗口,再次觸發(fā)計(jì)算.而watermark >= end_time + allowedLateness,后續(xù)的還有屬于該窗口的數(shù)據(jù)到達(dá)時(shí),那么這種數(shù)據(jù)只能被刪除了,因?yàn)橄到y(tǒng)不會(huì)無(wú)限制的等下去,這既會(huì)增加window buffer的大小,也會(huì)引起不必要的性能下降.

5. 總結(jié)

通過以上例子,我們可以看到如何通過watermark對(duì)順序以及亂序數(shù)據(jù)的處理.以及如何在watermark觸發(fā)之后還能通過 allowedLateness對(duì)延遲做一些補(bǔ)償.watermark的設(shè)計(jì)思想和用途很光,我也只是淺嘗輒止.

參考文章:
flink watermark介紹
Flink流計(jì)算編程--watermark(水位線)簡(jiǎn)介

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容