時(shí)間語義、Event Time和Watermark機(jī)制原理與實(shí)踐

[TOC]
在流處理中,時(shí)間是一個(gè)非常核心的概念,是整個(gè)系統(tǒng)的基石。比如,我們經(jīng)常會(huì)遇到這樣的需求:給定一個(gè)時(shí)間窗口,比如一個(gè)小時(shí),統(tǒng)計(jì)時(shí)間窗口的內(nèi)數(shù)據(jù)指標(biāo)。那如何界定哪些數(shù)據(jù)將進(jìn)入這個(gè)窗口呢?在窗口的定義之前,首先需要確定一個(gè)應(yīng)用使用什么樣的時(shí)間語義。

本文將介紹Flink的Event Time、Processing Time和Ingestion Time三種時(shí)間語義,接著會(huì)詳細(xì)介紹Event Time和Watermark的工作機(jī)制,以及如何對(duì)數(shù)據(jù)流設(shè)置Event Time并生成Watermark。

Flink的三種時(shí)間語義


image.png

如上圖所示,F(xiàn)link支持三種時(shí)間語義:

Event Time
Event Time指的是數(shù)據(jù)流中每個(gè)元素或者每個(gè)事件自帶的時(shí)間屬性,一般是事件發(fā)生的時(shí)間。由于事件從發(fā)生到進(jìn)入Flink時(shí)間算子之間有很多環(huán)節(jié),一個(gè)較早發(fā)生的事件因?yàn)檠舆t可能較晚到達(dá),因此使用Event Time意味著事件到達(dá)有可能是亂序的。

使用Event Time時(shí),最理想的情況下,我們可以一直等待所有的事件到達(dá)后再進(jìn)行時(shí)間窗口的處理。假設(shè)一個(gè)時(shí)間窗口內(nèi)的所有數(shù)據(jù)都已經(jīng)到達(dá),基于Event Time的流處理會(huì)得到正確且一致的結(jié)果:無論我們是將同一個(gè)程序部署在不同的計(jì)算環(huán)境還是在相同的環(huán)境下多次計(jì)算同一份數(shù)據(jù),都能夠得到同樣的計(jì)算結(jié)果。我們根本不同擔(dān)心亂序到達(dá)的問題。但這只是理想情況,現(xiàn)實(shí)中無法實(shí)現(xiàn),因?yàn)槲覀兗炔恢谰烤挂榷嚅L時(shí)間才能確認(rèn)所有事件都已經(jīng)到達(dá),更不可能無限地一直等待下去。在實(shí)際應(yīng)用中,當(dāng)涉及到對(duì)事件按照時(shí)間窗口進(jìn)行統(tǒng)計(jì)時(shí),F(xiàn)link會(huì)將窗口內(nèi)的事件緩存下來,直到接收到一個(gè)Watermark,以確認(rèn)不會(huì)有更晚數(shù)據(jù)的到達(dá)。Watermark意味著在一個(gè)時(shí)間窗口下,F(xiàn)link會(huì)等待一個(gè)有限的時(shí)間,這在一定程度上降低了計(jì)算結(jié)果的絕對(duì)準(zhǔn)確性,而且增加了系統(tǒng)的延遲。在流處理領(lǐng)域,比起其他幾種時(shí)間語義,使用Event Time的好處是某個(gè)事件的時(shí)間是確定的,這樣能夠保證計(jì)算結(jié)果在一定程度上的可預(yù)測(cè)性。

一個(gè)基于Event Time的Flink程序中必須定義Event Time,以及如何生成Watermark。我們可以使用元素中自帶的時(shí)間,也可以在元素到達(dá)Flink后人為給Event Time賦值。

使用Event Time的優(yōu)勢(shì)是結(jié)果的可預(yù)測(cè)性,缺點(diǎn)是緩存較大,增加了延遲,且調(diào)試和定位問題更復(fù)雜。

Processing Time
對(duì)于某個(gè)算子來說,Processing Time指算子使用當(dāng)前機(jī)器的系統(tǒng)時(shí)鐘來定義時(shí)間。在Processing Time的時(shí)間窗口場(chǎng)景下,無論事件什么時(shí)候發(fā)生,只要該事件在某個(gè)時(shí)間段達(dá)到了某個(gè)算子,就會(huì)被歸結(jié)到該窗口下,不需要Watermark機(jī)制。對(duì)于一個(gè)程序在同一個(gè)計(jì)算環(huán)境來說,每個(gè)算子都有一定的耗時(shí),同一個(gè)事件的Processing Time,第n個(gè)算子和第n+1個(gè)算子不同。如果一個(gè)程序在不同的集群和環(huán)境下執(zhí)行時(shí),限于軟硬件因素,不同環(huán)境下前序算子處理速度不同,對(duì)于下游算子來說,事件的Processing Time也會(huì)不同,不同環(huán)境下時(shí)間窗口的計(jì)算結(jié)果會(huì)發(fā)生變化。因此,Processing Time在時(shí)間窗口下的計(jì)算會(huì)有不確定性。

Processing Time只依賴當(dāng)前執(zhí)行機(jī)器的系統(tǒng)時(shí)鐘,不需要依賴Watermark,無需緩存。Processing Time是實(shí)現(xiàn)起來非常簡(jiǎn)單也是延遲最小的一種時(shí)間語義。

Ingestion Time
Ingestion Time是事件到達(dá)Flink Souce的時(shí)間。從Source到下游各個(gè)算子中間可能有很多計(jì)算環(huán)節(jié),任何一個(gè)算子的處理速度快慢可能影響到下游算子的Processing Time。而Ingestion Time定義的是數(shù)據(jù)流最早進(jìn)入Flink的時(shí)間,因此不會(huì)被算子處理速度影響。

Ingestion Time通常是Event Time和Processing Time之間的一個(gè)折中方案。比起Event Time,Ingestion Time可以不需要設(shè)置復(fù)雜的Watermark,因此也不需要太多緩存,延遲較低。比起Processing Time,Ingestion Time的時(shí)間是Souce賦值的,一個(gè)事件在整個(gè)處理過程從頭至尾都使用這個(gè)時(shí)間,而且后續(xù)算子不受前序算子處理速度的影響,計(jì)算結(jié)果相對(duì)準(zhǔn)確一些,但計(jì)算成本稍高。

設(shè)置時(shí)間語義
在Flink中,我們需要在執(zhí)行環(huán)境層面設(shè)置使用哪種時(shí)間語義。下面的代碼使用Event Time:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

如果想用另外兩種時(shí)間語義,需要替換為:TimeCharacteristic.ProcessingTime和TimeCharacteristic.IngestionTime。

Event Time和Watermark
Flink的三種時(shí)間語義中,Processing Time和Ingestion Time都可以不用設(shè)置Watermark。如果我們要使用Event Time語義,以下兩項(xiàng)配置缺一不可:第一,使用一個(gè)時(shí)間戳為數(shù)據(jù)流中每個(gè)事件的Event Time賦值;第二,生成Watermark。

實(shí)際上,Event Time是每個(gè)事件的元數(shù)據(jù),F(xiàn)link并不知道每個(gè)事件的發(fā)生時(shí)間是什么,我們必須要為每個(gè)事件的Event Time賦值一個(gè)時(shí)間戳。關(guān)于時(shí)間戳,包括Flink在內(nèi)的絕大多數(shù)系統(tǒng)都支持Unix時(shí)間戳系統(tǒng)(Unix time或Unix epoch)。Unix時(shí)間戳系統(tǒng)以1970-01-01 00:00:00.000 為起始點(diǎn),其他時(shí)間記為距離該起始時(shí)間的整數(shù)差值,一般是毫秒(millisecond)精度。

有了Event Time時(shí)間戳,我們還必須生成Watermark。Watermark是Flink插入到數(shù)據(jù)流中的一種特殊的數(shù)據(jù)結(jié)構(gòu),它包含一個(gè)時(shí)間戳,并假設(shè)后續(xù)不會(huì)有小于該時(shí)間戳的數(shù)據(jù)。下圖展示了一個(gè)亂序數(shù)據(jù)流,其中方框是單個(gè)事件,方框中的數(shù)字是其對(duì)應(yīng)的Event Time時(shí)間戳,圓圈為Watermark,圓圈中的數(shù)字為Watermark對(duì)應(yīng)的時(shí)間戳。


image.png
  • Watermark與事件的時(shí)間戳緊密相關(guān)。一個(gè)時(shí)間戳為T的Watermark假設(shè)后續(xù)到達(dá)的事件時(shí)間戳都大于T。
  • 假如Flink算子接收到一個(gè)違背上述規(guī)則的事件,該事件將被認(rèn)定為遲到數(shù)據(jù),如上圖中時(shí)間戳為19的事件比Watermark(20)更晚到達(dá)。Flink提供了一些其他機(jī)制來處理遲到數(shù)據(jù)。
  • Watermark時(shí)間戳必須單調(diào)遞增,以保證時(shí)間不會(huì)倒流。
  • Watermark機(jī)制允許用戶來控制準(zhǔn)確度和延遲。Watermark設(shè)置得與事件時(shí)間戳相距緊湊,會(huì)產(chǎn)生不少遲到數(shù)據(jù),影響計(jì)算結(jié)果的準(zhǔn)確度,整個(gè)應(yīng)用的延遲很低;Watermark設(shè)置得非常寬松,準(zhǔn)確度能夠得到提升,但應(yīng)用的延遲較高,因?yàn)镕link必須等待更長的時(shí)間才進(jìn)行計(jì)算。

Flink的Watermark細(xì)節(jié)介紹

Watermark是什么?從不同的維度可以有不同的理解

  1. 從Watermark的計(jì)算角度看:可以將Watermark理解為一個(gè)函數(shù):
    ,它的輸入是當(dāng)前的系統(tǒng)時(shí)間,輸出是一個(gè)Event Time(一個(gè)時(shí)間戳),而且輸出的這個(gè)時(shí)間戳是嚴(yán)格單調(diào)遞增的。這樣看,Watermark就是一個(gè)函數(shù)。
  2. 從Watermark的具體形式來看:可以將Watermark當(dāng)成一個(gè)個(gè)時(shí)間戳,值就是1中輸出的那個(gè)時(shí)間戳。
  3. 從Watermark流轉(zhuǎn)的角度看:可以將Watermark理解成夾雜在正常流事件中的一個(gè)個(gè)特殊事件。

這3種描述方式,看似不同,實(shí)則一樣,只是從不同的角度去看了而已。不管怎么理解,我們必須知道:流處理系統(tǒng)規(guī)定,如果某個(gè)時(shí)刻Watermark的值為T1,那系統(tǒng)就認(rèn)為凡是早于或等于T1時(shí)間的事件都已經(jīng)收到了。注意,這個(gè)就是Watermark所代表的含義,實(shí)際因?yàn)楝F(xiàn)實(shí)中各種情況,未必能嚴(yán)格做到這樣,但目標(biāo)就是要達(dá)到上面規(guī)定的這樣,或者無限逼近。

Why?
為什么需要Watermark?這個(gè)也有很多種描述方式,往大了說就是提供一種理論上解決分布式系統(tǒng)中消息亂序問題(這是分布式系統(tǒng)中一個(gè)經(jīng)典難題)的方案。說小點(diǎn)就是在有狀態(tài)的流計(jì)算中,當(dāng)我們關(guān)注事件的順序或者完整性時(shí),需要有這么一種機(jī)制能實(shí)現(xiàn)這個(gè)需求。

這里的完整性我舉個(gè)例子解釋一下:比如我們基于事件發(fā)生時(shí)間統(tǒng)計(jì)每5min的用戶PV總量,那比如12:00-12:05這個(gè)5min的統(tǒng)計(jì)該在什么時(shí)間點(diǎn)計(jì)算呢?假設(shè)沒有Watermark這個(gè)概念,你就永遠(yuǎn)不知道什么時(shí)候12:00-12:05區(qū)間的所有事件才全到齊。你不能假定收到12:00-12:05的數(shù)據(jù)就認(rèn)為之前的數(shù)據(jù)已經(jīng)全部來了,因?yàn)閿?shù)據(jù)可能延遲+亂序了。而Watermark就是為了解決這個(gè)問題而提出的,當(dāng)你收到Watermark的值為12:00-12:05的事件時(shí),你就可以認(rèn)為早于這個(gè)時(shí)間的數(shù)據(jù)已經(jīng)都到了,數(shù)據(jù)已經(jīng)完整了,可以進(jìn)行12:00-12:05這個(gè)5min區(qū)間的數(shù)據(jù)計(jì)算了。至于如何保證,這個(gè)是框架要做的事情(當(dāng)然一般需要開發(fā)者參與)。

Where?
哪里需要Watermark?這里我給一個(gè)簡(jiǎn)單粗暴的結(jié)論,當(dāng)同時(shí)滿足下面兩個(gè)條件的時(shí)候才會(huì)需要Watermark:

  • 計(jì)算中使用了時(shí)間相關(guān)的算子(time-based operations),其實(shí)再明確點(diǎn),就是使用了Window的時(shí)候(注:Flink的Global Window除外,這個(gè)Window不是基于時(shí)間的)。
  • 1中使用的時(shí)間相關(guān)的算子選擇使用事件時(shí)間,即Event Time(注:如果是Flink的話,也包含Ingestion Time)。

這里再解釋一下2。前文我們介紹過有兩種時(shí)間Event Time和Processing Time(Flink獨(dú)有的Ingestion Time在Watermark這里可以歸結(jié)為Event Time,后文不再另行說明),時(shí)間相關(guān)的算子選擇時(shí)間時(shí)必然是二選一。并不是選擇Processing Time的時(shí)候就沒有Watermark了,只是這個(gè)時(shí)候Processing Time自身就是一個(gè)完美的Watermark(因?yàn)闀r(shí)間一去不復(fù)返,Processing Time永遠(yuǎn)是單調(diào)遞增的),并不需要產(chǎn)生單獨(dú)的Watermark了。所以在Processing Time里面,你可以認(rèn)為Watermark沒有意義了,所以去掉了,或者認(rèn)為Processing Time自身就是Watermark都行。

實(shí)戰(zhàn)

場(chǎng)景介紹
為了方便說明,我構(gòu)造了一個(gè)簡(jiǎn)單的場(chǎng)景,假設(shè)有一個(gè)設(shè)備產(chǎn)生了一組事件,如下:

{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

一共9個(gè)事件,id是事件名稱,timestamp是設(shè)備端事件真實(shí)產(chǎn)生的時(shí)間。也就是事件真實(shí)產(chǎn)生順序是:

event1, event2, event3, event4, event5, event6, event7, event8, event9

但在傳輸過程中因?yàn)楦鞣N現(xiàn)實(shí)原因亂序了,到Flink這里的時(shí)候,事件順序變成了:

event1, event2, event4, event5, event7, event3, event6, event8, event9

現(xiàn)在我們要做的事情就是計(jì)算每5秒中的事件個(gè)數(shù),以此來判斷事件高峰期。

說明:

  1. 這個(gè)計(jì)算是非常有代表性的,比如電商統(tǒng)計(jì)每小時(shí)的pv就能知道每天用戶高峰期發(fā)生在哪幾個(gè)時(shí)段,這里為了方便說明問題,把問題簡(jiǎn)化了,并且為了快速出結(jié)果,把時(shí)間粒度縮短為5秒鐘。
  2. 計(jì)算時(shí),要想結(jié)果準(zhǔn)確,就不能使用Processing Time,這樣如果數(shù)據(jù)從產(chǎn)生到被處理延遲比較大的話,最終計(jì)算的結(jié)果也會(huì)不準(zhǔn)確。除非這個(gè)延遲可控或者可接受,則可簡(jiǎn)單的使用Processing Time,否則就必須用Event Time進(jìn)行計(jì)算。
Flink提供的Watermark機(jī)制

Flink提供了3種方式來生成Watermark:

  1. 在Source中生成Watermark;
  2. 通過AssignerWithPeriodicWatermarks生成Watermark;
  3. 通過AssignerWithPunctuatedWatermarks生成Watermark;

前面介紹過了Watermark是在使用Event Time的場(chǎng)景下才使用的,所以給事件增加Event Time和生成Watermark是一對(duì)操作,一般都是一起使用的。方式1是直接在Flink的最源頭Source那里就生成了Event Time和Watermark。方式2和方式3則是流處理中的某一步驟(可以理解為一個(gè)特殊點(diǎn)的算子),它的輸入是流,輸出還是流,只不過經(jīng)過這個(gè)流之后事件就會(huì)有Event Timestamp和Watermark了,一般這一步放在Source之后,最晚也要在時(shí)間算子之前,也就是Window之前。而且他兩的優(yōu)先級(jí)高,如果Source中生成了Watermark,后面又使用了方式2或3,則會(huì)覆蓋之前的Event Timestamp和Watermark。

下面我們分別介紹每種方式。

Watermark In Source

package com.niyanchun.watermark;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * Assign timestamp and watermark at Source Function Demo.
 *
 * @author NiYanchun
 **/
public class AssignAtSourceDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

//    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .timeWindowAll(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }


  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOutOfOrderEvents().forEach(e -> {
        System.out.println(e);
        long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
        ctx.collectWithTimestamp(e, timestampInMills);
        ctx.emitWatermark(new Watermark(timestampInMills));
      });

      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }


  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOutOfOrderEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 這里把消息打亂,模擬實(shí)際中的消息亂序
    // 真實(shí)的消息產(chǎn)生順序是(根據(jù)時(shí)間戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打亂之后的消息順序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {

    @Override
    public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));

      int count = 0;
      for (JSONObject element : elements) {
        System.out.println(element.getString("id"));
        count++;
      }
      System.out.println("Total:" + count);
    }
  }
}

這里自定義了一個(gè)Source,然后接了一個(gè)Window(timeWindowAll),做了一個(gè)簡(jiǎn)單的處理,最終輸出。這里需要注意一個(gè)點(diǎn):timeWindowAll底層其實(shí)是定義了一個(gè)TumblingWindows,至于使用Processing Time(TumblingProcessingTimeWindows),還是Event Time(TumblingEventTimeWindows)則由env.setStreamTimeCharacteristic來確定的,該選項(xiàng)的默認(rèn)值是TimeCharacteristic.ProcessingTime,即使用Processing Time。

作為演示,修改一下上面代碼,先使用Processing Time,看下結(jié)果:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

window{2020-05-24 20:12:30 - 2020-05-24 20:12:35}
event1
event2
event4
event5
event7
event3
event6
event8
event9
Total:9

Process finished with exit code 0

可以看到,只有一個(gè)Window,其范圍是window{2020-05-24 20:12:30 - 2020-05-24 20:12:35},即我代碼運(yùn)行的時(shí)間,顯然這樣的統(tǒng)計(jì)結(jié)果是沒有意義的,因?yàn)樗w現(xiàn)不出業(yè)務(wù)真正的高峰期。后面我們只討論使用Event Time的情況。

現(xiàn)在重新改為env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);,然后運(yùn)行:

event in source:
{"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
{"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
{"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
{"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
{"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
{"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
{"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
{"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
{"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
event1
event2
event4
Total:3

window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
event5
event7
event6
event8
event9
Total:5

Process finished with exit code 0

我們看下現(xiàn)在的輸出,有兩個(gè)Window:window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}和window{2020-05-24 12:00:05 - 2020-05-24 12:00:10},可以看到就是5秒鐘一個(gè)Window。然后12:00:00-12:00:05這個(gè)Window里面包含了3個(gè)事件:event1,event2,event4;12:00:05-12:00:10這個(gè)Window里面包含了5個(gè)事件:event5、event7、event6、event8、event9。

從這個(gè)結(jié)果看event3丟了,其它數(shù)據(jù)都在,為什么呢?如果我說因?yàn)閑vent3亂序了,排在了后邊,你肯定會(huì)說event6也排到了event7后邊,為什么event6卻沒有丟呢?要解釋清楚這個(gè)問題還需要涉及到觸發(fā)器以及窗口的原理和機(jī)制,為了保證行文的連貫性,這里我先直接給出結(jié)論:因?yàn)榇翱谀J(rèn)的觸發(fā)器實(shí)現(xiàn)機(jī)制是本該在一個(gè)窗口內(nèi)的數(shù)據(jù)亂序了以后,只要在這個(gè)窗口結(jié)束(即被觸發(fā))之前來,那是不影響的,不認(rèn)為是遲到數(shù)據(jù),不會(huì)被丟掉;但如果這個(gè)窗口已經(jīng)結(jié)束了才來,就會(huì)被丟掉了。比如event3本應(yīng)該屬于12:00:00-12:00:05這個(gè)窗口,當(dāng)event5這條數(shù)據(jù)來的時(shí)候,這個(gè)窗口就就認(rèn)為數(shù)據(jù)完整了,于是觸發(fā)計(jì)算,接著就銷毀了。等event3來的時(shí)候已經(jīng)是12:00:05-12:00:10窗口了,所以它直接被丟掉了。也就是在時(shí)間窗口這里,對(duì)于“亂序”的定義不是要求每個(gè)到來事件的時(shí)間戳都嚴(yán)格升序,而是看屬于這個(gè)窗口的事件能否在窗口時(shí)間范圍內(nèi)來,如果能來,就不算亂序,至于在這個(gè)時(shí)間范圍內(nèi)來的先后順序無所謂。這個(gè)其實(shí)也是合理的。

另外還有兩個(gè)細(xì)節(jié)點(diǎn)要注意一下:

  • 當(dāng)Source是有界數(shù)據(jù)時(shí),當(dāng)所有數(shù)據(jù)發(fā)送完畢后,系統(tǒng)會(huì)自動(dòng)發(fā)一個(gè)值為Long.MAX_VALUE的Watermark,表示數(shù)據(jù)發(fā)送完了。
  • Window是一個(gè)左閉右開區(qū)間,比如12:00:00的數(shù)據(jù)屬于12:00:00-12:00:05窗口,而12:00:05的數(shù)據(jù)屬于12:00:05-12:00:10窗口。
AssignerWithPeriodicWatermarks && AssignerWithPunctuatedWatermarks

AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks其實(shí)非常像,哪怕是用法都非常像,他兩個(gè)的主要區(qū)別是Watermark的產(chǎn)生機(jī)制或者時(shí)機(jī):AssignerWithPriodicWatermarks是根據(jù)一個(gè)固定的時(shí)間周期性的產(chǎn)生Watermark,而AssignerWithPunctuatedWatermarks則是由事件驅(qū)動(dòng),然后代碼自己控制何時(shí)以何種方式產(chǎn)生Watermark,比如一個(gè)event就產(chǎn)生一個(gè),還是幾個(gè)event產(chǎn)生一個(gè),或者滿足什么條件時(shí)產(chǎn)生Watermark等,就是用戶可以靈活控制。

package com.niyanchun.watermark;

import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.text.Format;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.List;

/**
 * Assign timestamp and watermark at Source Function Demo.
 *
 * @author NiYanchun
 **/
public class AssignerWatermarksDemo {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.setParallelism(1);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    env.addSource(new CustomSource())
        .assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())
//        .assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())
        .timeWindowAll(Time.seconds(5))
        .process(new CustomProcessFunction())
        .print();

    env.execute();
  }

  public static class CustomSource extends RichSourceFunction<JSONObject> {

    @Override
    public void run(SourceContext<JSONObject> ctx) throws Exception {
      System.out.println("event in source:");
      getOutOfOrderEvents().forEach(e -> {
        System.out.println(e);
        ctx.collect(e);
      });

      try {
        Thread.sleep(2000);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    }

    @Override
    public void cancel() {

    }
  }
  
  /**
   * generate out of order events
   *
   * @return List<JSONObject>
   */
  private static List<JSONObject> getOutOfOrderEvents() {
    // 2020-05-24 12:00:00
    JSONObject event1 = new JSONObject().fluentPut("id", "event1")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
    // 2020-05-24 12:00:01
    JSONObject event2 = new JSONObject().fluentPut("id", "event2")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
    // 2020-05-24 12:00:03
    JSONObject event3 = new JSONObject().fluentPut("id", "event3")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
    // 2020-05-24 12:00:04
    JSONObject event4 = new JSONObject().fluentPut("id", "event4")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
    // 2020-05-24 12:00:05
    JSONObject event5 = new JSONObject().fluentPut("id", "event5")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
    // 2020-05-24 12:00:06
    JSONObject event6 = new JSONObject().fluentPut("id", "event6")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
    // 2020-05-24 12:00:07
    JSONObject event7 = new JSONObject().fluentPut("id", "event7")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
    // 2020-05-24 12:00:08
    JSONObject event8 = new JSONObject().fluentPut("id", "event8")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
    // 2020-05-24 12:00:09
    JSONObject event9 = new JSONObject().fluentPut("id", "event9")
        .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));

    // 可以把消息打亂,模擬實(shí)際中的消息亂序。
    // 真實(shí)的消息產(chǎn)生順序是(根據(jù)時(shí)間戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
    // 打亂之后的消息順序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
    return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
  }

  public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {

    @Override
    public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
      TimeWindow window = context.window();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));

      int count = 0;
      for (JSONObject element : elements) {
        System.out.println(element.getString("id"));
        count++;
      }
      System.out.println("Total:" + count);
    }
  }

  /**
   * AssignerWithPeriodicWatermarks demo
   */
  public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
          System.currentTimeMillis(), sdf.format(currentTimestamp)));
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      long timestamp = ((DateTime) element.get("timestamp")).getMillis();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
      currentTimestamp = timestamp;
      return timestamp;
    }
  }

  /**
   * AssignerWithPunctuatedWatermarks demo.
   */
  public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
          System.currentTimeMillis(), sdf.format(currentTimestamp)));
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      long timestamp = ((DateTime) element.get("timestamp")).getMillis();
      Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
      currentTimestamp = timestamp;
      return timestamp;
    }
  }
}

先分別看下AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks部分吧:

  /**
   * AssignerWithPeriodicWatermarks demo
   */
  public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
      // 省略一些邏輯
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      // 省略一些邏輯
      return timestamp;
    }
  }

  /**
   * AssignerWithPunctuatedWatermarks demo.
   */
  public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {

    private long currentTimestamp;

    @Nullable
    @Override
    public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
      // 省略一些邏輯
      return new Watermark(currentTimestamp);
    }

    @Override
    public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
      // 省略一些邏輯
      return timestamp;
    }
  }

為了突出重點(diǎn),我刪掉了具體實(shí)現(xiàn)??梢钥吹竭@兩個(gè)類都有一個(gè)extractTimestamp方法,這個(gè)方法每個(gè)Event都會(huì)調(diào)用,作用就是給Event賦一個(gè)Event Time。另外一個(gè)方法稍微有點(diǎn)差異,AssignerWithPeriodicWatermarks的方法叫g(shù)etCurrentWatermark(),而AssignerWithPunctuatedWatermarks的方法是checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp),它們的主要區(qū)別是方法的調(diào)用機(jī)制:

  • getCurrentWatermark()沒有參數(shù),它是框架根據(jù)用戶設(shè)置的固定時(shí)間周期性的調(diào)用。這個(gè)固定的時(shí)間可以通過以下方式設(shè)置:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

ExecutionConfig executionConfig = env.getConfig();
executionConfig.setAutoWatermarkInterval(500);

上面的代碼設(shè)置每500毫秒調(diào)用一次getCurrentWatermark(),即每500毫秒產(chǎn)生一個(gè)Watermark。不顯式的設(shè)置的話,默認(rèn)值是0,但實(shí)際效果是每200ms調(diào)用一次。

  • checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp)有兩個(gè)參數(shù):一個(gè)是event,一個(gè)是extractTimestamp方法返回的時(shí)間戳。這個(gè)方法被調(diào)用的時(shí)間點(diǎn)是:每個(gè)事件來了先調(diào)用extractTimestamp,然后馬上調(diào)用checkAndGetNextWatermark。在checkAndGetNextWatermark中你可以通過返回值控制是否產(chǎn)生新的Watermark,如果你不想返回新的Watermark,可以返回null或者一個(gè)小于等于上一個(gè)Watermark的時(shí)間戳,這樣就相當(dāng)于本次不返回Watermark或者返回的Watermark不是遞增的被丟棄了,繼續(xù)使用原來的Watermark。因?yàn)閃atermark不能為null,且必須單調(diào)遞增。

AssignerWithPriodicWatermarks和AssignerWithPunctuatedWatermarks的區(qū)別就這些,最佳實(shí)踐的話我個(gè)人覺得優(yōu)先考慮AssignerWithPriodicWatermarks,如果不能滿足需求,再考慮AssignerWithPunctuatedWatermarks。一方面是前者簡(jiǎn)單一些,另一方面是一般沒有必要每個(gè)事件就計(jì)算一個(gè)Watermark,這樣會(huì)增加不是很有必要的計(jì)算量。

遲到數(shù)據(jù)

從上面的部分看到event3因?yàn)檫t到被默默的丟掉了,現(xiàn)實(shí)中數(shù)據(jù)是重要資產(chǎn),肯定是不能隨便丟棄的。Flink提供了兩種解決方案:

  1. 允許一定的延遲。這個(gè)延遲可以在兩個(gè)地方設(shè)置:第一種是可以在上面的AssignerWithXXXWatermarks方法里面給計(jì)算出的時(shí)間戳減去一個(gè)時(shí)間,這個(gè)時(shí)間就是你允許延遲的時(shí)間。第二種就是在時(shí)間窗口那里可以通過allowedLateness來設(shè)置一個(gè)允許的延遲時(shí)間,

但允許一定延遲的方式只能治標(biāo),不能治本。我們只能根據(jù)實(shí)際情況允許一定限度的延遲,但總歸是有個(gè)限度的,原因主要有兩個(gè):1)延遲太高會(huì)喪失實(shí)時(shí)性,如果你的場(chǎng)景對(duì)實(shí)時(shí)性要求比較高,那就無法設(shè)置太大的延遲。2)延遲實(shí)際是延長了窗口的生命周期,所以資源消耗會(huì)增加。

  1. 在Window那里通過sideOutputLateData將遲到的數(shù)據(jù)以流的形式旁路出去。這個(gè)是治本的手段,它沒有時(shí)間的限制,如果有遲到數(shù)據(jù),就會(huì)發(fā)送到這個(gè)單獨(dú)的流里面去,然后可以為這個(gè)流單獨(dú)設(shè)置處理方式。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容