Time/WaterMark in Flink

Preface

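In Flink, processing stream data under different time notions has different meanings and yields different results. A figure from the official documentation shows very clearly where Processing Time, Event Time and Ingestion Time each occur:

[Figure: Event Time, Ingestion Time and Processing Time along a Flink dataflow, from the Flink documentation]
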
Each of these three time notions is described below.

ProcessTime

Flink abstracts data-processing operations as Transformation Operators. A Dataflow starts with a Source Operator and ends with a Sink Operator. All of these operators run on the host nodes of the Flink cluster, so when time-related processing is based on the ProcessTime notion, it depends on the system clock (System Clock) of the host node the Flink program runs on.
Here what matters is the processing time (Process Time). For a Time Window operation, for example, windows are assigned according to the system clock of the host node where the current operator runs. Every time a window is created, its start and end times are computed from processing time, independent of the event time carried by the incoming elements. All subsequent operations (Functions) applied to the window therefore work on windows with processing-time semantics.
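To make the processing-time rule concrete, here is a minimal plain-Java sketch (not Flink code) of how a one-minute processing-time window could be chosen. The window-start arithmetic mirrors Flink's TimeWindow.getWindowStartWithOffset with a zero offset. The class name ProcessingTimeWindowSketch and the injectable LongSupplier clock are illustrative assumptions; the clock stands in for the host's System Clock.

```java
import java.util.function.LongSupplier;

// Sketch only (not Flink code): how a processing-time tumbling window could be chosen.
final class ProcessingTimeWindowSketch {
    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset(timestamp, 0, windowSize).
    static long windowStart(long timestamp, long windowSize) {
        return timestamp - (timestamp + windowSize) % windowSize;
    }

    // Returns {start, end} of the window an element joins when it is processed "now".
    // The element's own event time is deliberately ignored; only the operator's clock matters.
    static long[] assign(long eventTimeOfElement, long windowSize, LongSupplier clock) {
        long now = clock.getAsLong();
        long start = windowStart(now, windowSize);
        return new long[] {start, start + windowSize};
    }

    public static void main(String[] args) {
        long twoHoursAgo = System.currentTimeMillis() - 7_200_000L;
        // The host clock (System::currentTimeMillis) decides the window, not twoHoursAgo.
        long[] w = assign(twoHoursAgo, 60_000L, System::currentTimeMillis);
        System.out.println("window = [" + w[0] + ", " + w[1] + ")");
    }
}
```

Because the element's own timestamp is never consulted, a record stamped two hours ago still lands in the current minute.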
A typical ProcessTime scenario: suppose we need real-time statistics and monitoring of user behavior in an app. Users run the app on many different devices, so data may not arrive in real time. For example, a user's phone runs out of battery and the behavior records are uploaded in one batch two hours later. If we compute statistics at one-minute granularity, such records are badly delayed. Suppose we waited for them to be uploaded before computing, without any way to predict when they will arrive. The result for each minute might then not be available until hours or even days later, which is not what we want. Moreover, the processing system may not have the capacity to hold such a large volume of data. Given the business requirement, we only need the records that arrive within each minute. We generate one-minute windows from the processing system's own processing time (Process Time), assign records to those windows and compute the results, without considering the event time carried by the records themselves.

EventTime

Elements in a stream may carry an immutable event-time (Event Time) attribute, fixed at the moment the behavior the element represents actually occurs. The simplest case, and the easiest to understand, is this: all stream data entering Flink is produced in other external systems, receives its event time when produced, and is then transmitted into Flink. In theory (if all systems shared the same clock) the event timestamp is earlier than the timestamp at which Flink processes the element. In practice, however, out-of-order and late-arriving records are very common.
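By contrast, an event-time window is picked from the element's own timestamp, so arrival order does not change which window an element joins. The plain-Java sketch below is a hypothetical illustration (class EventTimeWindowSketch is not part of Flink). It uses the start-of-window arithmetic of Flink's tumbling windows with zero offset and groups timestamps, given in arrival order, by window.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch only: event-time tumbling windows are chosen from each element's own timestamp.
final class EventTimeWindowSketch {
    static long windowStart(long eventTime, long windowSize) {
        return eventTime - (eventTime + windowSize) % windowSize;
    }

    // Groups event timestamps (given in arrival order) by the start of their window.
    static Map<Long, List<Long>> assignAll(long[] eventTimesInArrivalOrder, long windowSize) {
        Map<Long, List<Long>> windows = new LinkedHashMap<>();
        for (long ts : eventTimesInArrivalOrder) {
            windows.computeIfAbsent(windowStart(ts, windowSize), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        // 59_000 arrives after 61_000 (out of order) but still joins window [0, 60000).
        System.out.println(assignAll(new long[] {61_000L, 59_000L, 120_500L}, 60_000L));
    }
}
```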
Under the EventTime notion, processing progress depends on the data itself, not on the system clock of the host node where an operator runs. A mechanism is therefore needed to track that progress. For example, once an event-time Time Window has been created, how do we determine that all elements belonging to it have arrived? Once we know they have, we can process all elements of that window as required (aggregation, grouping, etc.). This is what the WaterMark mechanism is for: it measures processing progress, that is, it expresses how complete the arrived data is.
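A WaterMark carries a timestamp, say X. It asserts that event time in the stream has advanced to X: elements with event time Y <= X are assumed to have all arrived, and any such element that still shows up afterwards counts as late. Put the other way round, which may be easier to grasp: an event-time window covering [start, end) can be computed once a WaterMark with X >= end - 1 arrives. From then on, every element still expected has Y > X and therefore belongs to a later window. If event times arrive in order, the WaterMark can simply follow the latest event time. Whenever it passes the end of the current window, that window is computed and emitted, and subsequent elements are assigned to a new window.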
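The trigger rule can be sketched in plain Java. This is a simplified model, not Flink's implementation. canFire restates the condition Flink's EventTimeTrigger uses: fire once the window's maxTimestamp (end - 1) is <= the current WaterMark. For an in-order stream, the WaterMark is assumed here to trail the latest event time by 1 ms so that further elements with the same timestamp are not late. WatermarkTriggerSketch and its methods are illustrative names.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: event-time windows fired by a WaterMark on an in-order stream.
final class WatermarkTriggerSketch {
    // Fire once the window's max timestamp (end - 1) is covered by the WaterMark.
    static boolean canFire(long windowEnd, long watermark) {
        return windowEnd - 1 <= watermark;
    }

    // Returns "start=count" for each window, in the order the windows fire.
    static List<String> run(long[] orderedEventTimes, long windowSize) {
        TreeMap<Long, Integer> pending = new TreeMap<>(); // window start -> element count
        List<String> fired = new ArrayList<>();
        for (long ts : orderedEventTimes) {
            long start = ts - (ts + windowSize) % windowSize;
            pending.merge(start, 1, Integer::sum);
            long watermark = ts - 1; // in-order: only elements with event time >= ts are still expected
            Iterator<Map.Entry<Long, Integer>> it = pending.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, Integer> w = it.next();
                if (!canFire(w.getKey() + windowSize, watermark)) break; // later windows cannot fire either
                fired.add(w.getKey() + "=" + w.getValue());
                it.remove();
            }
        }
        return fired;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[] {1_000L, 30_000L, 59_999L, 61_000L, 125_000L}, 60_000L));
    }
}
```

The window [0, 60000) only fires when the element at 61000 moves the WaterMark past 59999, and [60000, 120000) fires when 125000 arrives.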
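As this shows, the WaterMark mechanism makes event-time stream processing very flexible: components and processing strategies can be chosen to fit the business requirements. For example, say the current window is triggered at time t1 because the WaterMark X has passed its end. If the stream is out of order, an element whose event time still falls in that window (so Y <= X) may arrive later, at time t2. What then? By default Flink drops such a late element. If the window triggered at t1 no longer exists but we still want the late element included in its computation, we can implement a custom Trigger to meet the needs of the specific business scenario.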
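Besides a custom Trigger, Flink's windowed streams also offer allowedLateness(...), which keeps a fired window's state for a while longer. The sketch below restates, with plain longs, the lateness test Flink's WindowOperator applies to event-time windows: an element is dropped when (end - 1) + allowedLateness <= the current WaterMark. allowedLateness defaults to 0, so by default any element whose window has already fired is dropped. The class LatenessSketch is hypothetical and only models the decision, not the window state.

```java
// Sketch only: the lateness decision for an element of an event-time tumbling window.
final class LatenessSketch {
    static boolean isLate(long eventTime, long windowSize, long allowedLateness, long currentWatermark) {
        long start = eventTime - (eventTime + windowSize) % windowSize;
        long maxTimestamp = start + windowSize - 1; // window [start, start + windowSize)
        return maxTimestamp + allowedLateness <= currentWatermark;
    }

    public static void main(String[] args) {
        long size = 60_000L;
        long watermarkAtT2 = 61_000L; // window [0, 60000) already fired at t1
        System.out.println(isLate(59_000L, size, 0L, watermarkAtT2));      // true: dropped by default
        System.out.println(isLate(59_000L, size, 10_000L, watermarkAtT2)); // false: within allowed lateness
    }
}
```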

IngestionTime

IngestionTime is the time at which data enters the Flink stream processing system. It depends on the system clock of the host node where the Source Operator runs, which assigns an Ingestion Time to each arriving record. Under the IngestionTime notion, when there are several Source Operators, each one assigns Ingestion Time from its own local system clock. All subsequent time-related operations use the Ingestion Time stored in the record.
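Compared with EventTime, IngestionTime cannot handle out-of-order or late-arriving events relative to when they actually occurred. In exchange, the program does not have to specify how WaterMarks are generated; Flink generates them automatically.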
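A minimal sketch of the ingestion-time idea, under simplifying assumptions: the source stamps each record with its own clock (an injectable LongSupplier here, hypothetical). One source's stamps never go backwards on a monotonic clock, so the WaterMark can simply trail the latest stamp by 1 ms. This illustrates why no WaterMark strategy has to be written; it is not how Flink actually schedules its automatic ingestion-time WaterMarks.

```java
import java.util.function.LongSupplier;

// Sketch only: a source that stamps records with ingestion time and derives a WaterMark from it.
final class IngestionTimeSketch {
    static final class Stamped {
        final String payload;
        final long ingestionTime;
        Stamped(String payload, long ingestionTime) {
            this.payload = payload;
            this.ingestionTime = ingestionTime;
        }
    }

    private final LongSupplier sourceClock; // stands in for the source host's system clock
    private long watermark = Long.MIN_VALUE;

    IngestionTimeSketch(LongSupplier sourceClock) {
        this.sourceClock = sourceClock;
    }

    Stamped ingest(String record) {
        long now = sourceClock.getAsLong();
        // Assuming a monotonic clock, later records get stamps >= now,
        // so nothing older than now - 1 is still expected from this source.
        watermark = Math.max(watermark, now - 1);
        return new Stamped(record, now);
    }

    long currentWatermark() {
        return watermark;
    }
}
```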

Using EventTime and WaterMark to compute statistics on different types of user behavior

Method 1: assigning timestamps by calling assignTimestampsAndWatermarks()

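Setup: the TimeWindow size is 1 minute (60000 ms). The maximum out-of-orderness tolerated when generating WaterMarks (how late an element may arrive) is 50 seconds (50000 ms). To simulate stream elements whose event time is earlier than the processing system's clock, event times are shifted 2 minutes (120000 ms) into the past.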
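With these parameters one can estimate when a window's result appears. The sketch below (hypothetical DemoTimingSketch) makes four simplifying assumptions. Event time = wall-clock time - 120000 ms, as in the source below. WaterMark = largest event time seen - 50000 ms. A window fires once the WaterMark reaches end - 1. The periodic WaterMark emission interval is ignored.

```java
// Sketch only: back-of-the-envelope timing for the demo parameters.
final class DemoTimingSketch {
    // Earliest wall-clock time at which the event-time window starting at windowStart can fire.
    static long earliestFireWallClock(long windowStart, long windowSize,
                                      long maxOutOfOrderness, long sourceLateness) {
        long windowMaxTimestamp = windowStart + windowSize - 1;          // last ms inside the window
        long requiredMaxEventTime = windowMaxTimestamp + maxOutOfOrderness; // WaterMark must reach windowMaxTimestamp
        return requiredMaxEventTime + sourceLateness;                    // event time lags wall clock by sourceLateness
    }

    public static void main(String[] args) {
        long start = 1_553_169_300_000L; // a window start taken from the sample output
        long fire = earliestFireWallClock(start, 60_000L, 50_000L, 120_000L);
        System.out.println("fires " + (fire - start) + " ms after the window start (wall clock)");
    }
}
```

This gives 229999 ms: the window's events are produced between 120 s and 180 s after its start on the wall clock, so the result appears about 50 s after the last of them.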
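First, a custom source simulates the user-behavior data. It shifts every event time back by the same fixed latenessMills, so it simulates delayed data; within one source instance the events stay in timestamp order: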

import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;

/**
 * @Auther: dalan
 * @Date: 19-3-21 16:44
 * @Description:
 */
public class StringLineEventSource extends RichParallelSourceFunction<String> {
    /** logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(StringLineEventSource.class);

    public Long latenessMills = 0L; // how far (ms) each event time is shifted into the past
    private volatile boolean running = true;

    public StringLineEventSource(){super();}
    public StringLineEventSource(Long latenessMills){super(); this.latenessMills = latenessMills;}

    private List<String> channelSet = Arrays.asList("a", "b", "c", "d"); // channels (simulated data)
    private  List<String> behaviorTypes = Arrays.asList("INSTALL", "OPEN",
                                                "BROWSE", "CLICK",
                                                "PURCHASE", "CLOSE", "UNINSTALL"); // behavior types
    private Random rand = new Random(9527); // fixed seed: every run picks the same channel/type sequence

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        long numElements = Long.MAX_VALUE;
        long count = 0L;

        while (running && count < numElements){
            String channel = channelSet.get(rand.nextInt(channelSet.size()));
            List<String> event = generateEvent(); // event time = current time - latenessMills
            LOGGER.info(event.toString());
            String ts = event.get(0);
            String id = event.get(1);
            String behaviorType = event.get(2);

            // record line: eventTime \t channel \t userId \t behaviorType
            String result = StringUtils.join(Arrays.asList(ts, channel, id, behaviorType),"\t");
            ctx.collect(result);

            count += 1;
            TimeUnit.MILLISECONDS.sleep(5L); // at most about 200 events per second per source instance
        }
    }

    private List<String> generateEvent() {
        Long delayedTimestamp = Instant.ofEpochMilli(System.currentTimeMillis())
                .minusMillis(latenessMills)
                .toEpochMilli(); // shift the event time into the past by latenessMills
        // <timestamp, id (random UUID), behaviorType>
        return  Arrays.asList(delayedTimestamp.toString(),
                              UUID.randomUUID().toString(),
                              behaviorTypes.get(rand.nextInt(behaviorTypes.size())));
    }


    @Override
    public void cancel() {
        this.running = false;
    }
}
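The source above subtracts the same latenessMills from every timestamp, so only a constant delay is simulated. To exercise the out-of-orderness bound, one could add random jitter. The helper below is a hypothetical addition, not part of the original source. Keeping maxJitterMillis <= the extractor's maxOutOfOrderness should keep jittered events from being dropped as late, assuming the wall clock does not go backwards.

```java
import java.util.Random;

// Hypothetical variant of generateEvent()'s timestamp logic: fixed lateness plus random jitter.
final class JitteredTimestampSketch {
    static long delayedTimestamp(long nowMillis, long latenessMillis, long maxJitterMillis, Random rand) {
        // jitter in [0, maxJitterMillis); 0 when jitter is disabled
        long jitter = maxJitterMillis <= 0 ? 0L : Math.floorMod(rand.nextLong(), maxJitterMillis);
        return nowMillis - latenessMillis - jitter;
    }

    public static void main(String[] args) {
        Random rand = new Random(9527);
        long now = System.currentTimeMillis();
        for (int i = 0; i < 5; i++) {
            // consecutive calls can now go backwards in event time, i.e. out of order
            System.out.println(delayedTimestamp(now + i * 5L, 120_000L, 40_000L, rand));
        }
    }
}
```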

Each stream element is a string record line with the fields event time, channel, user id and behavior type. In the Flink program, assignTimestampsAndWatermarks() is called on the DataStream<String> to assign timestamps and generate WaterMarks. Then Tumbling Windows (which never overlap) are applied to the keyed stream to process the records. Finally, the results are written to Kafka.
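Stripped of Flink, the job computes a count per (window, channel, behavior type). The in-memory sketch below (hypothetical KeyedWindowCountSketch) replays that computation over a list of source lines and emits the same tab-separated format as the job. It ignores WaterMarks and lateness and simply counts every line, so it shows the shape of the result rather than the streaming behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Sketch only: the job's keyed tumbling-window count, replayed in memory.
// Input:  "ts\tchannel\tid\tbehaviorType"
// Output: "windowStart\twindowEnd\tchannel\tbehaviorType\tcount"
final class KeyedWindowCountSketch {
    static List<String> countPerWindow(List<String> lines, long windowSize) {
        Map<String, long[]> counts = new TreeMap<>(); // sorted keys for a stable output order
        for (String line : lines) {
            String[] f = line.split("\t");
            long ts = Long.parseLong(f[0]);
            long start = ts - (ts + windowSize) % windowSize; // tumbling window, offset 0
            String key = start + "\t" + (start + windowSize) + "\t" + f[1] + "\t" + f[3];
            counts.computeIfAbsent(key, k -> new long[1])[0]++;
        }
        List<String> out = new ArrayList<>();
        counts.forEach((k, c) -> out.add(k + "\t" + c[0]));
        return out;
    }
}
```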


import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;

/**
 * @Auther: dalan
 * @Date: 19-3-21 16:43
 * @Description:
 */
public class UserDefineWaterMark {
    /** logger */
    private static final Logger LOGGER = LoggerFactory.getLogger(UserDefineWaterMark.class);

    // main
    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        Long sourceLatenessMillis = params.getLong("source-lateness-millis");
        Long maxLaggedTimeMillis = params.getLong("window-lagged-millis");
        Long windowSizeMillis = params.getLong("window-size-millis");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // use event time
        DataStream<String> streams = env.addSource(new StringLineEventSource(sourceLatenessMillis));

        // parse the input records, assign timestamps/WaterMarks, then count per key and window
        DataStream<String> inputMap = ((DataStreamSource<String>) streams)
        .setParallelism(1)
        .assignTimestampsAndWatermarks( // assign timestamps and generate WaterMarks
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(maxLaggedTimeMillis)){
                    @Override
                    public long extractTimestamp(String s) {
                        return NumberUtils.toLong(s.split("\t")[0]);
                    }
                })
        .setParallelism(2)
        .map(new MapFunction<String, Tuple2<Tuple2<String,String>, Long>>() {
            @Override
            public Tuple2<Tuple2<String,String>, Long> map(String value) throws Exception {
                String[] arr = value.split("\t");
                String channel = arr[1];
                return new Tuple2<Tuple2<String,String>, Long>(Tuple2.of(channel, arr[3]), 1L);
            }
        })
        .setParallelism(2)
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis)))
        .process(new ProcessWindowFunction<Tuple2<Tuple2<String, String>, Long>, Tuple6<String, Long, Long, String, String, Long>, Tuple, TimeWindow>() {
            @Override
            public void process(Tuple tuple, Context context, Iterable<Tuple2<Tuple2<String, String>, Long>> iterable,
                                Collector<Tuple6<String, Long, Long, String, String, Long>> collector) throws Exception {
                long count = 0;
                Tuple2<String,String> tuple2 = null;
                for (Tuple2<Tuple2<String, String>, Long> in : iterable){
                    tuple2 = in.f0;
                    count++;
                }

                // keyBy(0) flattens the nested key, so the key Tuple holds (channel, behaviorType)
                LOGGER.info("window===" + tuple.toString());
                collector.collect(new Tuple6<>(tuple2.f0, context.window().getStart(), context.window().getEnd(),
                        tuple.getField(0).toString(), tuple.getField(1).toString(), count));
            }
        })
        .setParallelism(4)
        .map(tt -> {
            // output line: windowStart \t windowEnd \t channel \t behaviorType \t count
            return StringUtils.join(Arrays.asList(tt.f1, tt.f2, tt.f3, tt.f4, tt.f5), "\t");
        })
        .setParallelism(3);

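        // broker list and topic come from --bootstrap.servers / --window-result-topic (with defaults)
        inputMap.addSink(new FlinkKafkaProducer011<String>(
                params.get("bootstrap.servers", "localhost:9092"),
                params.get("window-result-topic", "windowed-result-topic"),
                new SimpleStringSchema()))
                .setParallelism(3);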

        //inputMap.print();

        env.execute("EventTime and WaterMark Demo");
    }
}

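Build the jar with mvn clean package -DskipTests, or run the main method directly. Either way, pass the following program arguments (--zookeeper.connect is not read by the code above; the Kafka producer only needs the brokers):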

--window-result-topic  windowed-result-topic
--zookeeper.connect  localhost:2181
--bootstrap.servers localhost:9092
--source-lateness-millis 120000 # source lateness: event time lags wall-clock time by 2 minutes
--window-lagged-millis 50000    # maximum out-of-orderness used to generate WaterMarks
--window-size-millis 60000       # window size

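The output looks like this. Each line contains window start, window end (epoch milliseconds), channel, behavior type and count. All rows belong to the same one-minute window, and 4 channels × 7 behavior types give the 28 keys: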

1553169300000   1553169360000   b   UNINSTALL   440
1553169300000   1553169360000   c   CLOSE   389
1553169300000   1553169360000   b   CLICK   445
1553169300000   1553169360000   b   OPEN    423
1553169300000   1553169360000   a   OPEN    363
1553169300000   1553169360000   b   CLOSE   431
1553169300000   1553169360000   a   PURCHASE    433
1553169300000   1553169360000   d   PURCHASE    427
1553169300000   1553169360000   a   INSTALL 388
1553169300000   1553169360000   c   BROWSE  381
1553169300000   1553169360000   d   CLOSE   446
1553169300000   1553169360000   c   UNINSTALL   427
1553169300000   1553169360000   a   CLICK   391
1553169300000   1553169360000   b   BROWSE  429
1553169300000   1553169360000   a   UNINSTALL   377
1553169300000   1553169360000   d   INSTALL 397
1553169300000   1553169360000   a   CLOSE   423
1553169300000   1553169360000   c   PURCHASE    437
1553169300000   1553169360000   a   BROWSE  388
1553169300000   1553169360000   d   UNINSTALL   379
1553169300000   1553169360000   d   BROWSE  431
1553169300000   1553169360000   b   INSTALL 434
1553169300000   1553169360000   c   CLICK   415
1553169300000   1553169360000   c   INSTALL 455
1553169300000   1553169360000   d   OPEN    426
1553169300000   1553169360000   c   OPEN    399
1553169300000   1553169360000   d   CLICK   402
1553169300000   1553169360000   b   PURCHASE    442
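The first two columns are epoch milliseconds. A small hypothetical helper (plain Java, java.time) makes a row readable and shows that the window is the aligned minute [11:55, 11:56) UTC on 2019-03-21:

```java
import java.time.Instant;

// Sketch only: render one output row of the job with readable UTC times.
final class WindowRowFormatter {
    static String describe(String row) {
        String[] f = row.trim().split("\\s+"); // columns may be separated by tabs or spaces
        Instant start = Instant.ofEpochMilli(Long.parseLong(f[0]));
        Instant end = Instant.ofEpochMilli(Long.parseLong(f[1]));
        return "[" + start + ", " + end + ") channel=" + f[2] + " type=" + f[3] + " count=" + f[4];
    }

    public static void main(String[] args) {
        System.out.println(describe("1553169300000\t1553169360000\tb\tUNINSTALL\t440"));
    }
}
```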

In the code above we used Flink's built-in BoundedOutOfOrdernessTimestampExtractor to assign timestamps and generate WaterMarks. We only implemented the logic that extracts the timestamp from an event record. The actual WaterMark generation uses the default logic provided by BoundedOutOfOrdernessTimestampExtractor, whose source is:

package org.apache.flink.streaming.api.functions.timestamps;

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        // initial "largest event time seen", chosen so the first WaterMark is Long.MIN_VALUE without underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness;
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // largest event time seen minus the maximum allowed out-of-orderness
        if (potentialWM >= lastEmittedWatermark) { // only move forward: update if not smaller than the last emitted WaterMark
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) { // track the largest event time among arrived elements
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

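In getCurrentWatermark() and extractTimestamp(), lastEmittedWatermark is the timestamp carried by the WaterMark. It is always computed from currentMaxTimestamp, the largest event time among the elements that have entered Flink so far, minus maxOutOfOrderness, the externally configured maximum out-of-orderness. currentMaxTimestamp never decreases and lastEmittedWatermark is only ever moved forward, so the WaterMark timestamps produced by this implementation are non-strictly monotonically increasing (non-decreasing). Flink calls extractTimestamp() for every element and getCurrentWatermark() periodically. With the EventTime characteristic, the default interval is 200 ms, configurable via ExecutionConfig.setAutoWatermarkInterval().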
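The same arithmetic can be replayed without Flink types to check the non-decreasing property. The sketch below (hypothetical BoundedOutOfOrdernessSketch) follows the logic of the BoundedOutOfOrdernessTimestampExtractor listing: onElement plays the role of extractTimestamp and currentWatermark the role of getCurrentWatermark.

```java
// Sketch only: BoundedOutOfOrdernessTimestampExtractor's WaterMark arithmetic with plain longs.
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrderness;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessSketch(long maxOutOfOrderness) {
        if (maxOutOfOrderness < 0) throw new IllegalArgumentException("must not be negative");
        this.maxOutOfOrderness = maxOutOfOrderness;
        // first WaterMark is Long.MIN_VALUE without underflow
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrderness;
    }

    // Per element: remember the largest event time seen so far.
    long onElement(long eventTime) {
        if (eventTime > currentMaxTimestamp) currentMaxTimestamp = eventTime;
        return eventTime;
    }

    // Periodically: candidate = largest event time - bound; never move backwards.
    long currentWatermark() {
        long potential = currentMaxTimestamp - maxOutOfOrderness;
        if (potential >= lastEmittedWatermark) lastEmittedWatermark = potential;
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch wm = new BoundedOutOfOrdernessSketch(50_000L);
        for (long ts : new long[] {100_000L, 90_000L, 160_000L, 150_000L}) {
            wm.onElement(ts);
            System.out.println("event " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

With a 50000 ms bound, the events 100000, 90000, 160000, 150000 yield WaterMarks 50000, 50000, 110000, 110000. The out-of-order elements neither advance nor pull back the WaterMark.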
