Flink--EventTime中WaterMark知識點掃盲

  • 基于flink-1.8.1
  • 基于flink官網(wǎng)

概述

  • 實時計算中,數(shù)據(jù)時間比較敏感。有eventTime和processTime區(qū)分,一般來說eventTime是從原始的消息中提取過來的,processTime是Flink自己提供的,F(xiàn)link中一個亮點就是可以基于eventTime計算,這個功能很有用,因為實時數(shù)據(jù)可能會經(jīng)過比較長的鏈路,多少會有延時,并且有很大的不確定性,對于一些需要精確體現(xiàn)事件變化趨勢的場景中,單純使用processTime顯然是不合理的。


    Flink-eventtime.png

Flink WaterMark介紹

  • watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個隱藏屬性。通?;贓vent Time的數(shù)據(jù),自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實現(xiàn)。
  • 流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。
  • 但是對于late element,我們又不能無限期的等下去,必須要有個機(jī)制來保證一個特定的時間后,必須觸發(fā)window去進(jìn)行計算了。這個特別的機(jī)制,就是watermark。

并行流中的watermark

  • 在source functions處或者之后生成watermark,source functions的parallel subtask 通常會獨立的生成watermark;這些watermarks定義了該特定parallel source的事件事件;
  • 對于流式處理中的,當(dāng)watermarks達(dá)到某個算子時,watermark會將event time提前。每當(dāng)算子將流中的event time提前,這個算子都會為下游算子生成新的watermark;
  • 一些算子會有多個source stream, 例如,一個union,或者跟隨keyBy(...)或partition(...)函數(shù)的算子。當(dāng)前輸入stream中的event time是多個source stream中的event time的最小值;(Such an operator’s current event time is the minimum of its input streams’ event times);由于input stream更新了event time,算子同樣會更新event time。如下圖所示:
    Flink-eventtime.png

生成 Timestamps / Watermarks

分配Timestamps

  • 為了處理事件時間,F(xiàn)link需要知道事件的時間戳,這意味著流中的每個元素都需要分配其事件時間戳。 這通常通過從元素中的某個字段訪問/提取時間戳來完成。
  • 時間戳分配與生成watermark密切相關(guān),watermark告訴系統(tǒng)事件時間的進(jìn)展。
  • 有兩種方法可以分配時間戳并生成水印:
    • 直接在數(shù)據(jù)流源中;
    • 通過時間戳分配器/watermatk生成器:在Flink中,時間戳分配器還定義要發(fā)出的watermark;
  • Attention:Both timestamps and watermarks are specified as milliseconds since the Java epoch of 1970-01-01T00:00:00Z.

Source Functions with Timestamps and Watermarks

  • source functions可以直接為它們生成的元素分配時間戳,它們也可以發(fā)出watermark。當(dāng)完成此操作時,不需要再使用時間戳分配器。請注意,如果使用時間戳分配器,則將覆蓋源提供的任何時間戳和水印。
  • 要直接為source中的元素分配時間戳,源必須在SourceContext上使用collectWithTimestamp(...)方法。 要生成水印,源必須調(diào)用emitWatermark(水?。┕δ?。
  • 語法:
    java
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
    while (/* condition */) {
        MyType next = getNext();
        ctx.collectWithTimestamp(next, next.getEventTimestamp());

        if (next.hasWatermarkTime()) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
        }
    }
}

scala

override def run(ctx: SourceContext[MyType]): Unit = {
    while (/* condition */) {
        val next: MyType = getNext()
        ctx.collectWithTimestamp(next, next.eventTimestamp)

        if (next.hasWatermarkTime) {
            ctx.emitWatermark(new Watermark(next.getWatermarkTime))
        }
    }
}

Timestamp Assigners / Watermark Generators

  • Timestamp Assigners獲取stream并生成帶有帶時間戳元素和水印的新流。 如果原始stream中已經(jīng)有時間戳和/或水印,則時間戳分配器會覆蓋它們。
  • 時間戳分配器通常在數(shù)據(jù)源之后立即指定,但并非嚴(yán)格要求這樣做。 例如,常見的模式是在時間戳分配器之前解析(MapFunction)和過濾(FilterFunction)。 在任何情況下,需要在事件時間的第一個操作之前指定時間戳分配器(例如第一個窗口操作)。 作為一種特殊情況,當(dāng)使用Kafka作為流式傳輸作業(yè)的源時,F(xiàn)link允許在源(或消費者)本身內(nèi)指定時間戳分配器/水印發(fā)射器。 有關(guān)如何執(zhí)行此操作的更多信息,請參閱Kafka Connector文檔。
  • java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

DataStream<MyEvent> stream = env.readFile(
        myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
        FilePathFilter.createDefaultFilter(), typeInfo);

DataStream<MyEvent> withTimestampsAndWatermarks = stream
        .filter( event -> event.severity() == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

withTimestampsAndWatermarks
        .keyBy( (event) -> event.getGroup() )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) -> a.add(b) )
        .addSink(...);
  • scala
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[MyEvent] = env.readFile(
         myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
         FilePathFilter.createDefaultFilter())

val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
        .filter( _.severity == WARNING )
        .assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

withTimestampsAndWatermarks
        .keyBy( _.getGroup )
        .timeWindow(Time.seconds(10))
        .reduce( (a, b) => a.add(b) )
        .addSink(...)

With Periodic Watermarks周期性watermark

定時提取watermark,這種方式會定時提取更新wartermark。

  • AssignerWithPeriodicWatermarks定期分配時間戳并生成水印(可能取決于流元素,或純粹基于處理時間)。
  • 生成水印的間隔(每n毫秒)由ExecutionConfig.setAutoWatermarkInterval(...)定義。 每次調(diào)用分配器的getCurrentWatermark()方法,如果返回的水印非空且大于前一個水印,則會發(fā)出新的水印。
  • 這里我們展示了兩個使用周期性水印生成的時間戳分配器的簡單示例。 請注意,F(xiàn)link附帶了一個BoundedOutOfOrdernessTimestampExtractor,類似于下面顯示的BoundedOutOfOrdernessGenerator;
  • java
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxOutOfOrderness = 3500; // 3.5 seconds

    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {

    private final long maxTimeLag = 5000; // 5 seconds

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current time minus the maximum time lag
        return new Watermark(System.currentTimeMillis() - maxTimeLag);
    }
}
  • scala
/**
 * This generator generates watermarks assuming that elements arrive out of order,
 * but only to a certain degree. The latest elements for a certain timestamp t will arrive
 * at most n milliseconds after the earliest elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxOutOfOrderness = 3500L // 3.5 seconds

    var currentMaxTimestamp: Long = _

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
    }
}

/**
 * This generator generates watermarks that are lagging behind processing time by a fixed amount.
 * It assumes that elements arrive in Flink after a bounded delay.
 */
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {

    val maxTimeLag = 5000L // 5 seconds

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current time minus the maximum time lag
        new Watermark(System.currentTimeMillis() - maxTimeLag)
    }
}

With Punctuated Watermarks

伴隨event的到來就提取watermark,就是每一個event到來的時候,就會提取一次Watermark。這樣的方式當(dāng)然設(shè)置watermark更為精準(zhǔn),但是當(dāng)數(shù)據(jù)量大的時候,頻繁的更新wartermark會比較影響性能。通常情況下采用定時提取就足夠了。

  • 當(dāng)生成watermark的過程中某個event指示器 可能生成新wateramrk,請使用AssignerWithPunctuatedWatermarks。 對于此類,F(xiàn)link將首先調(diào)用extractTimestamp(...)方法為元素分配時間戳,然后立即調(diào)用該元素上的checkAndGetNextWatermark(...)方法。
  • checkAndGetNextWatermark(...)方法傳遞在extractTimestamp(...)方法中分配的時間戳,并可以決定是否要生成watermark。 每當(dāng)checkAndGetNextWatermark(...)方法返回非空watermark,并且該watermark大于最新的先前watermark時,將發(fā)出該新watermark。
  • demo
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {

    @Override
    public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
        return element.getCreationTime();
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
        return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
    }
}

  • scala
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {

    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        element.getCreationTime
    }

    override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
        if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
    }
}

  • 注意:可以在每個事件上生成水印。 然而,因為每個水印在下游引起一些計算,所以過多的水印會降低性能.

Timestamps per Kafka Partition

  • 當(dāng)使用Apache Kafka作為數(shù)據(jù)源時,每個Kafka分區(qū)可能具有簡單的事件時間模式(升序時間戳或有界無序)。但是,當(dāng)從Kafka消費流數(shù)據(jù)時,多個分區(qū)通常并行消費,交錯來自分區(qū)的事件并破壞每個分區(qū)模式(這是Kafka的消費者客戶端工作的固有方式)。
  • 在這種情況下,您可以使用Flink的Kafka分區(qū)感知水印生成。使用該功能,根據(jù)Kafka分區(qū)在Kafka使用者內(nèi)部生成水印,并且每個分區(qū)水印的合并方式與在流shuffle上合并水印的方式相同。
  • 例如,如果事件時間戳嚴(yán)格按每個Kafka分區(qū)升序,則使用升序時間戳水印生成器生成每分區(qū)水印將產(chǎn)生完美的整體水印。
  • 下圖顯示了如何使用per-Kafka分區(qū)水印生成,以及在這種情況下水印如何通過流數(shù)據(jù)流傳播。
  • demo
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {

    @Override
    public long extractAscendingTimestamp(MyType element) {
        return element.eventTimestamp();
    }
});

DataStream<MyType> stream = env.addSource(kafkaSource);
  • scala
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
    def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})

val stream: DataStream[MyType] = env.addSource(kafkaSource)
kafka.png

Pre-defined Timestamp Extractors / Watermark Emitters

  • 這部分比較簡單,是兩個類的講解,詳見官網(wǎng);
  • 建議大家認(rèn)真閱讀一下官網(wǎng);

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容