Flink WaterMark 詳解

背景

image

實(shí)時(shí)計(jì)算中,數(shù)據(jù)時(shí)間比較敏感。有eventTime和processTime區(qū)分,一般來說eventTime是從原始的消息中提取過來的,processTime是Flink自己提供的,F(xiàn)link中一個亮點(diǎn)就是可以基于eventTime計(jì)算,這個功能很有用,因?yàn)閷?shí)時(shí)數(shù)據(jù)可能會經(jīng)過比較長的鏈路,多少會有延時(shí),并且有很大的不確定性,對于一些需要精確體現(xiàn)事件變化趨勢的場景中,單純使用processTime顯然是不合理的。

概念

watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個隱藏屬性。通?;贓vent Time的數(shù)據(jù),自身都包含一個timestamp.watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。

流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個過程和時(shí)間的。雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的,但是也不排除由于網(wǎng)絡(luò)、背壓等原因,導(dǎo)致亂序的產(chǎn)生(out-of-order或者說late element)。

但是對于late element,我們又不能無限期的等下去,必須要有個機(jī)制來保證一個特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了。這個特別的機(jī)制,就是watermark。

window劃分

window的設(shè)定無關(guān)數(shù)據(jù)本身,而是系統(tǒng)定義好了的。
window是flink中劃分?jǐn)?shù)據(jù)一個基本單位,window的劃分方式是固定的,默認(rèn)會根據(jù)自然時(shí)間劃分window,并且劃分方式是前閉后開。

window劃分 w1 w2 w3
3s [00:00:00~00:00:03) [00:00:03~00:00:06) [00:00:06~00:00:09)
5s [00:00:00~00:00:05) [00:00:05~00:00:10) [00:00:10~00:00:15)
10s [00:00:00~00:00:10) [00:00:10~00:00:20) [00:00:20~00:00:30)
1min [00:00:00~00:01:00) [00:01:00~00:02:00) [00:02:00~00:03:00)

示例

如果設(shè)置最大允許的亂序時(shí)間是10s,滾動時(shí)間窗口為5s

{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}
//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]

觸達(dá)改記錄的時(shí)間窗口應(yīng)該為2019-03-26 16:25:20~2019-03-26 16:25:25
即當(dāng)有數(shù)據(jù)eventTime >= 2019-03-26 16:25:35 時(shí)

{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
//(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)

后面會詳細(xì)講解。_

提取watermark

watermark的提取工作在taskManager中完成,意味著這項(xiàng)工作是并行進(jìn)行的的,而watermark是一個全局的概念,就是一個整個Flink作業(yè)之后一個warkermark。

AssignerWithPeriodicWatermarks

定時(shí)提取watermark,這種方式會定時(shí)提取更新wartermark。

//默認(rèn)200ms
public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }
}

AssignerWithPunctuatedWatermarks

伴隨event的到來就提取watermark,就是每一個event到來的時(shí)候,就會提取一次Watermark。
這樣的方式當(dāng)然設(shè)置watermark更為精準(zhǔn),但是當(dāng)數(shù)據(jù)量大的時(shí)候,頻繁的更新wartermark會比較影響性能。
通常情況下采用定時(shí)提取就足夠了。

使用

設(shè)置數(shù)據(jù)流時(shí)間特征

//設(shè)置為事件時(shí)間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

默認(rèn)為TimeCharacteristic.ProcessingTime,默認(rèn)水位線更新每隔200ms

入口文件

val env = StreamExecutionEnvironment.getExecutionEnvironment

//便于測試,并行度設(shè)置為1
env.setParallelism(1)

//env.getConfig.setAutoWatermarkInterval(9000)

//設(shè)置為事件時(shí)間
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

//設(shè)置source 本地socket
val text: DataStream[String] = env.socketTextStream("localhost", 9000)


val lateText = new OutputTag[(String, String, Long, Long)]("late_data")

val value = text.filter(new MyFilterNullOrWhitespace)
.flatMap(new MyFlatMap)
.assignTimestampsAndWatermarks(new MyWaterMark)
.map(x => (x.name, x.datetime, x.timestamp, 1L))
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sideOutputLateData(lateText)
//.sum(2)
.apply(new MyWindow)
//.window(TumblingEventTimeWindows.of(Time.seconds(3)))
//.apply(new MyWindow)
value.getSideOutput(lateText).map(x => {
"延遲數(shù)據(jù)|name:" + x._1 + "|datetime:" + x._2
}).print()

value.print()

env.execute("watermark test")

class MyWaterMark extends AssignerWithPeriodicWatermarks[EventObj] {

  val maxOutOfOrderness = 10000L // 3.0 seconds
  var currentMaxTimestamp = 0L

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  /**
    * 用于生成新的水位線,新的水位線只有大于當(dāng)前水位線才是有效的
    *
    * 通過生成水印的間隔(每n毫秒)定義 ExecutionConfig.setAutoWatermarkInterval(...)。
    * getCurrentWatermark()每次調(diào)用分配器的方法,如果返回的水印非空并且大于先前的水印,則將發(fā)出新的水印。
    *
    * @return
    */
  override def getCurrentWatermark: Watermark = {
    new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness)
  }

  /**
    * 用于從消息中提取事件時(shí)間
    *
    * @param element                  EventObj
    * @param previousElementTimestamp Long
    * @return
    */
  override def extractTimestamp(element: EventObj, previousElementTimestamp: Long): Long = {

    currentMaxTimestamp = Math.max(element.timestamp, currentMaxTimestamp)

    val id = Thread.currentThread().getId
    println("currentThreadId:" + id + ",key:" + element.name + ",eventTime:[" + element.datetime + "],currentMaxTimestamp:[" + sdf.format(currentMaxTimestamp) + "],watermark:[" + sdf.format(getCurrentWatermark().getTimestamp) + "]")

    element.timestamp
  }
}

代碼詳解

  1. 設(shè)置為事件時(shí)間
  2. 接受本地socket數(shù)據(jù)
  3. 抽取timestamp生成watermark,打印(線程id,key,eventTime,currentMaxTimestamp,watermark)
  4. event time每隔3秒觸發(fā)一次窗口,打?。╧ey,窗口內(nèi)元素個數(shù),窗口內(nèi)最早元素的時(shí)間,窗口內(nèi)最晚元素的時(shí)間,窗口自身開始時(shí)間,窗口自身結(jié)束時(shí)間)

試驗(yàn)

第一次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}

輸出

|currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]

匯總

Key EventTime currentMaxTimestamp Watermark
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14

第二次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:27","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:27],currentMaxTimestamp:[2019-03-26 16:25:27],watermark:[2019-03-26 16:25:17]

匯總

Key EventTime currentMaxTimestamp Watermark
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17

隨著EventTime的升高,Watermark升高。

第三次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:34","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:34],currentMaxTimestamp:[2019-03-26 16:25:34],watermark:[2019-03-26 16:25:24]

匯總

Key EventTime currentMaxTimestamp Watermark
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24

到這里,window仍然沒有被觸發(fā),此時(shí)watermark的時(shí)間已經(jīng)等于了第一條數(shù)據(jù)的Event Time了。

第四次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}
image

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]
(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)
image

匯總

Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)

直接證明了window的設(shè)定無關(guān)數(shù)據(jù)本身,而是系統(tǒng)定義好了的。
輸入的數(shù)據(jù)中,根據(jù)自身的Event Time,將數(shù)據(jù)劃分到不同的window中,如果window中有數(shù)據(jù),則當(dāng)watermark時(shí)間>=Event Time時(shí),就符合了window觸發(fā)的條件了,最終決定window觸發(fā),還是由數(shù)據(jù)本身的Event Time所屬的window中的window_end_time決定。

當(dāng)最后一條數(shù)據(jù)16:25:35到達(dá)是,Watermark提升到16:25:25,此時(shí)窗口16:25:20~16:25:25中有數(shù)據(jù),Window被觸發(fā)。

第五次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:37","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:37],currentMaxTimestamp:[2019-03-26 16:25:37],watermark:[2019-03-26 16:25:27]

匯總

Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)
zhangsan 2019-03-26 16:25:37 2019-03-26 16:25:37 2019-03-26 16:25:27

此時(shí),watermark時(shí)間雖然已經(jīng)達(dá)到了第二條數(shù)據(jù)的時(shí)間,但是由于其沒有達(dá)到第二條數(shù)據(jù)所在window的結(jié)束時(shí)間,所以window并沒有被觸發(fā)。

第二條數(shù)據(jù)所在的window時(shí)間是:[2019-03-26 16:25:25,2019-03-26 16:25:30)

第六次

數(shù)據(jù)

{"datetime":"2019-03-26 16:25:40","name":"zhangsan"}

輸出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:40],currentMaxTimestamp:[2019-03-26 16:25:40],watermark:[2019-03-26 16:25:30]
(zhangsan,1,2019-03-26 16:25:27,2019-03-26 16:25:27,2019-03-26 16:25:25,2019-03-26 16:25:30)

匯總

Key EventTime currentMaxTimestamp Watermark WindowStartTime WindowEndTime
zhangsan 2019-03-26 16:25:24 2019-03-26 16:25:24 2019-03-26 16:25:14
zhangsan 2019-03-26 16:25:27 2019-03-26 16:25:27 2019-03-26 16:25:17
zhangsan 2019-03-26 16:25:34 2019-03-26 16:25:34 2019-03-26 16:25:24
zhangsan 2019-03-26 16:25:35 2019-03-26 16:25:35 2019-03-26 16:25:25 [2019-03-26 16:25:20 2019-03-26 16:25:25)
zhangsan 2019-03-26 16:25:37 2019-03-26 16:25:37 2019-03-26 16:25:27
zhangsan 2019-03-26 16:25:40 2019-03-26 16:25:40 2019-03-26 16:25:30 [2019-03-26 16:25:25 2019-03-26 16:25:30)

結(jié)論

window的觸發(fā)要符合以下幾個條件:

  1. watermark時(shí)間 >= window_end_time
  2. 在[window_start_time,window_end_time)中有數(shù)據(jù)存在

同時(shí)滿足了以上2個條件,window才會觸發(fā)。
watermark是一個全局的值,不是某一個key下的值,所以即使不是同一個key的數(shù)據(jù),其warmark也會增加.

多并行度

image

總結(jié)

Flink如何處理亂序?

watermark+window機(jī)制。window中可以對input進(jìn)行按照Event Time排序,使得完全按照Event Time發(fā)生的順序去處理數(shù)據(jù),以達(dá)到處理亂序數(shù)據(jù)的目的。

Flink何時(shí)觸發(fā)window?

對于late element太多的數(shù)據(jù)而言

  1. Event Time < watermark時(shí)間

對于out-of-order以及正常的數(shù)據(jù)而言

  1. watermark時(shí)間 >= window_end_time
  2. 在[window_start_time,window_end_time)中有數(shù)據(jù)存在

Flink應(yīng)該如何設(shè)置最大亂序時(shí)間?

結(jié)合自己的業(yè)務(wù)以及數(shù)據(jù)情況去設(shè)置。

image

參考

Flink WaterMark(水位線)分布式執(zhí)行理解
Flink流計(jì)算編程--watermark(水位線)簡介
The Dataflow Model

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Watermark 是 Flink 實(shí)時(shí)處理計(jì)算平臺的一個重要概念,也是 Google 的著名實(shí)時(shí)計(jì)算論文 The...
    hongyuzhou閱讀 2,751評論 2 13
  • 1. Watermark概念 watermark是一種衡量Event Time進(jìn)展的機(jī)制,它是數(shù)據(jù)本身的一個隱藏屬...
    maskwang520閱讀 18,394評論 0 6
  • Window是無限數(shù)據(jù)流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們...
    尼小摩閱讀 3,556評論 0 13
  • 聽完樊登老師講的《即興演講》、《演講的力量》、《高效演講》最少的一本書,都聽了兩遍,其他都是三四遍。和女...
    蘇小濤閱讀 222評論 0 0
  • 按照官方網(wǎng)站的介紹,IPFS編譯一般無法通過。原因基本是中間下載包失敗。所以你最好有一個能訪問ipfs.io的代理...
    樂樂_6272閱讀 1,728評論 0 0

友情鏈接更多精彩內(nèi)容