Apache Flink——Watermark 水位線

前言

在流數(shù)據(jù)處理應(yīng)用中,一個(gè)很重要、也很常見的操作就是窗口計(jì)算。所謂的“窗口”,一般就是劃定的一段時(shí)間范圍,也就是“時(shí)間窗”;對在這范圍內(nèi)的數(shù)據(jù)進(jìn)行處理,就是所謂的窗口計(jì)算。所以窗口和時(shí)間往往是分不開的。

基本概念是什么

  • Window:Window是處理無界流的關(guān)鍵,Windows將流拆分為一個(gè)個(gè)有限大小的buckets,可以可以在每一個(gè)buckets中進(jìn)行計(jì)算。
  • start_time、end_time:當(dāng)Window時(shí)時(shí)間窗口的時(shí)候,每個(gè)window都會有一個(gè)開始時(shí)間和結(jié)束時(shí)間(前開后閉),這個(gè)時(shí)間是系統(tǒng)時(shí)間。
  • event-time: 事件發(fā)生時(shí)間,是事件發(fā)生所在設(shè)備的當(dāng)?shù)貢r(shí)間,比如一個(gè)點(diǎn)擊事件的時(shí)間發(fā)生時(shí)間,是用戶點(diǎn)擊操作所在的手機(jī)或電腦的時(shí)間。
  • Watermarks:可以把他理解為一個(gè)水位線,等于evevtTime - delay(比如規(guī)定為20分鐘),一旦Watermarks大于了某個(gè)window的end_time,就會觸發(fā)此window的計(jì)算,Watermarks就是用來觸發(fā)window計(jì)算的。

推遲窗口觸發(fā)的時(shí)間,實(shí)現(xiàn)方式:通過當(dāng)前窗口中最大的eventTime-延遲時(shí)間所得到的Watermark與窗口原始觸發(fā)時(shí)間進(jìn)行對比,當(dāng)Watermark大于窗口原始觸發(fā)時(shí)間時(shí)則觸發(fā)窗口執(zhí)行!我們知道,流處理從事件產(chǎn)生,到流經(jīng)source,再到operator,中間是有一個(gè)過程和時(shí)間的,雖然大部分情況下,流到operator的數(shù)據(jù)都是按照事件產(chǎn)生的時(shí)間順序來的,但是也不排除由于網(wǎng)絡(luò)、分布式等原因,導(dǎo)致亂序的產(chǎn)生,所謂亂序,就是指Flink接收到的事件的先后順序不是嚴(yán)格按照事件的Event Time順序排列的。

那么此時(shí)出現(xiàn)一個(gè)問題,一旦出現(xiàn)亂序,如果只根據(jù)eventTime決定window的運(yùn)行,我們不能明確數(shù)據(jù)是否全部到位,但又不能無限期的等下去,此時(shí)必須要有個(gè)機(jī)制來保證一個(gè)特定的時(shí)間后,必須觸發(fā)window去進(jìn)行計(jì)算了,這個(gè)特別的機(jī)制,就是Watermark。

Watermark是一種衡量Event Time進(jìn)展的機(jī)制。 Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來實(shí)現(xiàn)。 數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。 Watermark可以理解成一個(gè)延遲觸發(fā)機(jī)制,我們可以設(shè)置Watermark的延時(shí)時(shí)長t,每次系統(tǒng)會校驗(yàn)已經(jīng)到達(dá)的數(shù)據(jù)中最大的maxEventTime,然后認(rèn)定eventTime小于maxEventTime - t的所有數(shù)據(jù)都已經(jīng)到達(dá),如果有窗口的停止時(shí)間等于maxEventTime – t,那么這個(gè)窗口被觸發(fā)執(zhí)行。 有序流的Watermarker如下圖所示:(Watermark設(shè)置為0)

亂序流的Watermarker如下圖所示:(Watermark設(shè)置為2)

當(dāng)Flink接收到數(shù)據(jù)時(shí),會按照一定的規(guī)則去生成Watermark,這條Watermark就等于當(dāng)前所有到達(dá)數(shù)據(jù)中的maxEventTime - 延遲時(shí)長,也就是說,Watermark是由數(shù)據(jù)攜帶的,一旦數(shù)據(jù)攜帶的Watermark比當(dāng)前未觸發(fā)的窗口的停止時(shí)間要晚,那么就會觸發(fā)相應(yīng)窗口的執(zhí)行。由于Watermark是由數(shù)據(jù)攜帶的,因此,如果運(yùn)行過程中無法獲取新的數(shù)據(jù),那么沒有被觸發(fā)的窗口將永遠(yuǎn)都不被觸發(fā)。

上圖中,我們設(shè)置的允許最大延遲到達(dá)時(shí)間為2s,所以時(shí)間戳為5s的事件對應(yīng)的Watermark是3s,時(shí)間戳為9s的事件的Watermark是7s,如果我們的窗口1是1s-3s,窗口2是4s-6s,那么時(shí)間戳為5s的事件到達(dá)時(shí)的Watermarker恰好觸發(fā)窗口1,時(shí)間戳為9s的事件到達(dá)時(shí)的Watermark觸發(fā)窗口2。

Watermark 就是觸發(fā)前一窗口的“關(guān)窗時(shí)間”,一旦觸發(fā)關(guān)門那么以當(dāng)前時(shí)刻為準(zhǔn)在窗口范圍內(nèi)的所有所有數(shù)據(jù)都會收入窗中。只要沒有達(dá)到水位那么不管現(xiàn)實(shí)中的時(shí)間推進(jìn)了多久都不會觸發(fā)關(guān)窗。

一、時(shí)間語義

時(shí)間本身就有著“流”的特性,它可以用來判斷事件發(fā)生的先后以及間隔;所以如果我們想要?jiǎng)澏ù翱趤硎占瘮?shù)據(jù),一般就需要基于時(shí)間。對于批處理來說,這似乎沒什么討論的必要,因?yàn)閿?shù)據(jù)都收集好了,想怎么劃分窗口都可以;而對于流處理來說,如果想處理更加實(shí)時(shí),就必須對時(shí)間有更加精細(xì)的控制。

1.1 Flink 中的時(shí)間語義

對于一臺機(jī)器而言,“時(shí)間”自然就是指系統(tǒng)時(shí)間。但Flink 是一個(gè)分布式處理系統(tǒng),分布式架構(gòu)最大的特點(diǎn),就是節(jié)點(diǎn)彼此獨(dú)立、互不影響,這帶來了更高的吞吐量和容錯(cuò)性;但有利必有弊,最大的問題也來源于此。

在分布式系統(tǒng)中,節(jié)點(diǎn)“各自為政”,是沒有統(tǒng)一時(shí)鐘的,數(shù)據(jù)和控制信息都通過網(wǎng)絡(luò)進(jìn)
行傳輸。因?yàn)榫W(wǎng)絡(luò)傳輸會有延遲,而且這延遲是不確定的,所以 JobManager 作為管理者統(tǒng)一向所有 TaskManager 發(fā)送同步時(shí)鐘信號, 發(fā)出的同步信號無法同時(shí)到達(dá)所有節(jié)點(diǎn);想要擁有一個(gè)全局統(tǒng)一的時(shí)鐘,在分布式系統(tǒng)里是做不到的。

另一個(gè)麻煩的問題是,在流式處理的過程中,數(shù)據(jù)是在不同的節(jié)點(diǎn)間不停流動的,這同樣也會有網(wǎng)絡(luò)傳輸?shù)难舆t。這樣一來,當(dāng)上下游任務(wù)需要跨節(jié)點(diǎn)傳輸數(shù)據(jù)時(shí),它們對于“時(shí)間”的理解也會有所不同。例如,上游任務(wù)在 8 點(diǎn) 59 分 59 秒發(fā)出一條數(shù)據(jù),到下游要做窗口計(jì)算時(shí)已經(jīng)是 9 點(diǎn)零 1 秒了,那這條數(shù)據(jù)到底該不該被收到 8 點(diǎn)~9 點(diǎn)的窗口呢?

所以,當(dāng)我們希望對數(shù)據(jù)按照時(shí)間窗口來進(jìn)行收集計(jì)算時(shí),“時(shí)間”到底以誰為標(biāo)準(zhǔn)就非常重要了。

在事件發(fā)生之后,生成的數(shù)據(jù)被收集起來,首先進(jìn)入分布式消息隊(duì)列,然后被 Flink 系統(tǒng)中的 Source 算子讀取消費(fèi),進(jìn)而向下游的轉(zhuǎn)換算子(窗口算子)傳遞,最終由窗口算子進(jìn)行計(jì)算處理。

這里有兩個(gè)非常重要的時(shí)間點(diǎn):一個(gè)是數(shù)據(jù)產(chǎn)生的時(shí)間,我們把它叫作“事件時(shí)間”(Event Time);另一個(gè)是數(shù)據(jù)真正被處理的時(shí)刻,叫作“處理時(shí)間”(Processing Time)。我們所定義的窗口操作,到底是以那種時(shí)間作為衡量標(biāo)準(zhǔn),就是所謂的“時(shí)間語義”(Notions of Time)。由于分布式系統(tǒng)中網(wǎng)絡(luò)傳輸?shù)难舆t和時(shí)鐘漂移,處理時(shí)間相對事件發(fā)生的時(shí)間會有所滯后。

1.1.1 處理時(shí)間(Processing Time)

處理時(shí)間的概念非常簡單,就是指執(zhí)行處理操作的機(jī)器的系統(tǒng)時(shí)間。

如果我們以它作為衡量標(biāo)準(zhǔn),那么數(shù)據(jù)屬于哪個(gè)窗口就很明顯了:只看窗口任務(wù)處理這條數(shù)據(jù)時(shí),當(dāng)前的系統(tǒng)時(shí)間即可。

這種方法非常簡單粗暴,不需要各個(gè)節(jié)點(diǎn)之間進(jìn)行協(xié)調(diào)同步,也不需要考慮數(shù)據(jù)在流中的位置,簡單來說就是“我的地盤聽我的”。所以處理時(shí)間是最簡單的時(shí)間語義。

1.1.2 事件時(shí)間(Event Time)

事件時(shí)間,是指每個(gè)事件在對應(yīng)的設(shè)備上發(fā)生的時(shí)間,也就是數(shù)據(jù)生成的時(shí)間。

數(shù)據(jù)一旦產(chǎn)生,這個(gè)時(shí)間自然就確定了,所以它可以作為一個(gè)屬性嵌入到數(shù)據(jù)中。這其實(shí)就是這條數(shù)據(jù)記錄的“時(shí)間戳”(Timestamp)。

在事件時(shí)間語義下,我們對于時(shí)間的衡量,就不看任何機(jī)器的系統(tǒng)時(shí)間了,而是依賴于數(shù)據(jù)本身產(chǎn)生的時(shí)間。

由于流處理中數(shù)據(jù)是源源不斷產(chǎn)生的,一般來說,先產(chǎn)生的數(shù)據(jù)也會先被處理,所以當(dāng)任務(wù)不停地接到數(shù)據(jù)時(shí),它們的時(shí)間戳也基本上是不斷增長的,就可以代表時(shí)間的推進(jìn)。

這里有個(gè)前提,就是“先產(chǎn)生的數(shù)據(jù)先被處理”,這要求我們可以保證數(shù)據(jù)到達(dá)的順序。但是由于分布式系統(tǒng)中網(wǎng)絡(luò)傳輸延遲的不確定性,實(shí)際應(yīng)用中我們要面對的數(shù)據(jù)流往往是亂序的。在這種情況下,就不能簡單地把數(shù)據(jù)自帶的時(shí)間戳當(dāng)作時(shí)鐘了,而需要用另外的標(biāo)志來表示事件時(shí)間進(jìn)展,在 Flink 中把它叫作事件時(shí)間的“水位線”(Watermarks)。

實(shí)際項(xiàng)目開發(fā)中大多數(shù)都是使用的事件時(shí)間

二、水位線(Watermark)

在實(shí)際應(yīng)用中,一般會采用事件時(shí)間語義。而水位線,就是基于事件時(shí)間提出的概念。一個(gè)數(shù)據(jù)產(chǎn)生的時(shí)刻,就是流處理中事件觸發(fā)的時(shí)間點(diǎn),這就是“事件時(shí)間”,一般都會以時(shí)間戳的形式作為一個(gè)字段記錄在數(shù)據(jù)里。這個(gè)時(shí)間就像商品的“生產(chǎn)日期”一樣,一旦產(chǎn)生就是固定的,印在包裝袋上,不會因?yàn)檫\(yùn)輸輾轉(zhuǎn)而變化。如果我們想要統(tǒng)計(jì)一段時(shí)間內(nèi)的數(shù)據(jù),需要?jiǎng)澐謺r(shí)間窗口,這時(shí)只要判斷一下時(shí)間戳就可以知道數(shù)據(jù)屬于哪個(gè)窗口了。

明確了一個(gè)數(shù)據(jù)的所屬窗口,還不能直接進(jìn)行計(jì)算。因?yàn)榇翱谔幚淼氖怯薪鐢?shù)據(jù),我們需要等窗口的數(shù)據(jù)都到齊了,才能計(jì)算出最終的統(tǒng)計(jì)結(jié)果。

對于時(shí)間窗口來說這很明顯:到了窗口的結(jié)束時(shí)間,自然就應(yīng)該收集到了所有數(shù)據(jù),就可以觸發(fā)計(jì)算輸出結(jié)果了。

2.1 水位線的定義

在事件時(shí)間的語義下,不依賴系統(tǒng)時(shí)間,而是基于數(shù)據(jù)自帶的時(shí)間戳去定義一個(gè)時(shí)鐘,用來表示當(dāng)前時(shí)間的進(jìn)展。

在數(shù)據(jù)流中加入一個(gè)時(shí)鐘標(biāo)記,記錄當(dāng)前的事件時(shí)間,這個(gè)標(biāo)記可以直接廣播到下游,當(dāng)下游任務(wù)收到這個(gè)標(biāo)記,就可以更新自己的時(shí)鐘了,這種類似于水流中用來做標(biāo)志的記號,在Flink中被稱為水位線。

有序流中的水位線

在理想狀態(tài)下,數(shù)據(jù)應(yīng)該按照它們生成的先后順序、排好隊(duì)進(jìn)入流中。在實(shí)際應(yīng)用中,如果當(dāng)前數(shù)據(jù)量非常大,可能會有很多數(shù)據(jù)的時(shí)間戳是相同的,這時(shí)每來一條時(shí)間就提取時(shí)間戳、插入水位線就做了大量的無用功。而且即使時(shí)間戳不同,同時(shí)涌來的時(shí)間差會非常?。ū热鐜缀撩耄?,往往對處理計(jì)算沒什么影響,故為了提高效率,一般會采取每隔一段時(shí)間生成一個(gè)水位線(對應(yīng)于時(shí)間戳)。這個(gè)每隔的時(shí)間周期指的是處理時(shí)間(系統(tǒng)時(shí)間)

亂序流中的水位線

在分布式系統(tǒng)中,數(shù)據(jù)在節(jié)點(diǎn)間的傳輸,會因?yàn)榫W(wǎng)絡(luò)傳輸延遲的不確定性,導(dǎo)致順序發(fā)生改變,這就是所謂的亂序。為了從亂序流中插入水位線,我們就需要定義一個(gè)規(guī)則:插入新的水位線時(shí),先判斷一下時(shí)間戳是否比之前的大,否則就不再生成新的水位線。

如果考慮到大量數(shù)據(jù)同時(shí)到來的處理效率,我們同樣可以周期性地生成水位線,這時(shí)只需要保存一下之前所有數(shù)據(jù)中最大時(shí)間戳,需要插入水位線時(shí),就直接以它作為時(shí)間戳生成新的水位線。

但也有一個(gè)問題,我們無法正確處理“遲到”的數(shù)據(jù)。為了讓窗口能正確收集到遲到的數(shù)據(jù),我們可以等上幾秒,也就是用當(dāng)前已有數(shù)據(jù)的最大時(shí)間戳減去幾秒,就是要插入的水位線的時(shí)間戳。

水位線的特性

水位線就代表了當(dāng)前的事件時(shí)間時(shí)鐘,而且可以在數(shù)據(jù)的時(shí)間戳基礎(chǔ)上加一些延遲來保證不丟數(shù)據(jù),這一點(diǎn)對于亂序流的正確處理非常重要。

  • 水位線是插入到數(shù)據(jù)流中的一個(gè)標(biāo)記,可以認(rèn)為是一個(gè)特殊的數(shù)據(jù)
  • 水位線主要的內(nèi)容是一個(gè)時(shí)間戳,用來表示當(dāng)前事件時(shí)間的進(jìn)展
  • 水位線是基于數(shù)據(jù)的時(shí)間戳生成
  • 水位線的時(shí)間戳必須單調(diào)遞增,以確保任務(wù)的事件時(shí)間時(shí)鐘一直向前推進(jìn)
  • 水位線可以通過設(shè)置延遲,來保證正確處理亂序數(shù)據(jù)
  • 一個(gè)水位線t,表示在當(dāng)前流中事件時(shí)間已經(jīng)達(dá)到了時(shí)間戳t,代表t之前的所有數(shù)據(jù)都到齊了,之后流中不會出現(xiàn)時(shí)間戳t’ <=t 的數(shù)據(jù)

水位線是Flink流處理中保證結(jié)果正確性的核心機(jī)制,它往往會跟窗口一起配合,完成堆亂序數(shù)據(jù)的正確處理。

2.2 水位線的生成

計(jì)算處理更快、實(shí)時(shí)性更強(qiáng)、計(jì)算準(zhǔn)確性盡可能得到保障,我們就需要設(shè)置合理的水位線。

水位線生成策略

在Flink的DataStream API中,有一個(gè)單獨(dú)用于生成水位線的方法:assignTimestampsAndWatermarks(),它主要用來為流中的數(shù)據(jù)分配時(shí)間戳,并生成水位線來指示事件時(shí)間:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        WatermarkStrategy<T> watermarkStrategy)

上述方法需要傳入一個(gè)watermarkStrategy參數(shù),這就是所謂的水位線生成策略

public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,
    WatermarkGeneratorSupplier<T>{
        @Override
        TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
        
        @Override
        WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
  • TimestampAssigner:主要負(fù)責(zé)從流中數(shù)據(jù)元素的某個(gè)字段中提取時(shí)間戳,并分配給元素。時(shí)間戳的分配是生成水位線的基礎(chǔ)。

  • WatermarkGenerator:主要負(fù)責(zé)按照既定方式,基于時(shí)間戳生成水位線。在WatermarkGenerator接口中有兩個(gè)方法:onEvent,onPeriodicEmit。

    • onEvent:每個(gè)事件(數(shù)據(jù))到來都會調(diào)用的方法,它的參數(shù)有當(dāng)前事件、時(shí)間戳,以及允許發(fā)出水位線的一個(gè)WatermarkOutput,可以基于事件做出各種操作。
    • onPeriodicEmit:周期性調(diào)用的方法,可以由WatermarkOutput發(fā)出水位線。周期時(shí)間為處理時(shí)間,可以調(diào)用環(huán)境配置的…setAutoWatermarkInterval()方法來設(shè)置,默認(rèn)為200ms。
public interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);
    void onPeriodicEmit(WatermarkOutput output);
}
env.getConfig().setAutoWatermarkInterval(100);

2.3 Flink內(nèi)置水位線生成器

WatermarkStrategy 這個(gè)接口是一個(gè)生成水位線策略的抽象,讓我們可以靈活地實(shí)現(xiàn)自己的需求;但看起來有些復(fù)雜,如果想要自己實(shí)現(xiàn)應(yīng)該還是比較麻煩的。Flink 提供了內(nèi)置的水位線生成器(WatermarkGenerator),不僅開箱即用簡化了編程,而且也為我們自定義水位線策略提供了模板。

這兩個(gè)生成器可以通過調(diào)用 WatermarkStrategy 的靜態(tài)輔助方法來創(chuàng)建。它們都是周期性生成水位線的,分別對應(yīng)著處理有序流和亂序流的場景。

2.3.1 有序流

對于有序流,主要特點(diǎn)就是時(shí)間戳單調(diào)增長(Monotonously Increasing Timestamps),所以永遠(yuǎn)不會出現(xiàn)遲到數(shù)據(jù)的問題。這是周期性生成水位線的最簡單的場景,直接調(diào)用
WatermarkStrategy.forMonotonousTimestamps()方法就可以實(shí)現(xiàn)。簡單來說,就是直接拿當(dāng)前最大的時(shí)間戳作為水位線就可以了。

import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.ArrayList;
import java.util.List;

/**
 * @Author: huangyibo
 * @Date: 2022/7/3 1:42
 * @Description: 有序流的Watermark生成
 */

public class WaterMark {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //設(shè)置生成水位線的時(shí)間間隔
        env.getConfig().setAutoWatermarkInterval(100);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bobo","./cart",2000L));
        list.add(new Event("Alice","./cart",3000L));
        list.add(new Event("Bobo","./prod?id=1",4000L));
        list.add(new Event("Bobo","./prod?id=2",4500L));

        SingleOutputStreamOperator<Event> streamOperator = env.fromCollection(list)
                //有序流的Watermark生成
                .assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.getTimestamp();
                            }
                        }));

        streamOperator.print();
        
        env.execute();
    }
}

上面代碼中我們調(diào)用.withTimestampAssigner()方法,將數(shù)據(jù)中的 timestamp 字段提取出來,作為時(shí)間戳分配給數(shù)據(jù)元素;然后用內(nèi)置的有序流水位線生成器構(gòu)造出了生成策略。這樣,提取出的數(shù)據(jù)時(shí)間戳,就是我們處理計(jì)算的事件時(shí)間。

這里需要注意的是,時(shí)間戳和水位線的單位,必須都是毫秒。

2.3.2 亂序流

由于亂序流中需要等待遲到數(shù)據(jù)到齊,所以必須設(shè)置一個(gè)固定量的延遲時(shí)間(Fixed
Amount of Lateness)。這時(shí)生成水位線的時(shí)間戳,就是當(dāng)前數(shù)據(jù)流中最大的時(shí)間戳減去延遲的結(jié)果,相當(dāng)于把表調(diào)慢,當(dāng)前時(shí)鐘會滯后于數(shù)據(jù)的最大時(shí)間戳。調(diào)用WatermarkStrategy.forBoundedOutOfOrderness()方法就可以實(shí)現(xiàn)。這個(gè)方法需要傳入一個(gè) maxOutOfOrderness 參數(shù),表示“最大亂序程度”,它表示數(shù)據(jù)流中亂序數(shù)據(jù)時(shí)間戳的最大差值;如果我們能確定亂序程度,那么設(shè)置對應(yīng)時(shí)間長度的延遲,就可以等到所有的亂序數(shù)據(jù)了。

import com.yibo.flink.datastream.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

/**
 * @Author: huangyibo
 * @Date: 2022/7/3 1:42
 * @Description: 亂序流的Watermark生成
 */

public class WaterMark {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //設(shè)置生成水位線的時(shí)間間隔
        env.getConfig().setAutoWatermarkInterval(100);

        List<Event> list = new ArrayList<>();
        list.add(new Event("Mary","./home",1000L));
        list.add(new Event("Bobo","./cart",2000L));
        list.add(new Event("Alice","./cart",3000L));
        list.add(new Event("Bobo","./prod?id=1",4000L));
        list.add(new Event("Bobo","./prod?id=2",4500L));

        //亂序流的Watermark生成
        SingleOutputStreamOperator<Event> streamOperator = env.fromCollection(list)
                // 插入水位線的邏輯
                .assignTimestampsAndWatermarks(
                        // 針對亂序流插入水位線,延遲時(shí)間設(shè)置為 2s
                        WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        // 抽取時(shí)間戳的邏輯
                        .withTimestampAssigner((SerializableTimestampAssigner<Event>) (element, recordTimestamp) -> element.getTimestamp())
                );

        streamOperator.print();

        env.execute();
    }
}

事實(shí)上,有序流的水位線生成器本質(zhì)上和亂序流是一樣的,相當(dāng)于延遲設(shè)為 0 的亂序流水位線生成器,兩者完全等同:

WatermarkStrategy.forMonotonousTimestamps()
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(0))

注意:亂序流中生成的水位線真正的時(shí)間戳,其實(shí)是當(dāng)前最大時(shí)間戳 - 延遲時(shí)間 - 1,這里單位是毫秒。

public void onPeriodicEmit(WatermarkOutput output) {
    output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
}

2.4 自定義水位線策略

一般來說,F(xiàn)link 內(nèi)置的水位線生成器就可以滿足應(yīng)用需求了。不過有時(shí)我們的業(yè)務(wù)邏輯
可能非常復(fù)雜,這時(shí)對水位線生成的邏輯也有更高的要求,我們就必須自定義實(shí)現(xiàn)水位線策略WatermarkStrategy 了。

在 WatermarkStrategy 中,時(shí)間戳分配器 TimestampAssigner 都是大同小異的,指定字段提取時(shí)間戳就可以了;而不同策略的關(guān)鍵就在于 WatermarkGenerator 的實(shí)現(xiàn)。整體說來,F(xiàn)link有兩種不同的生成水位線的方式:一種是周期性的(Periodic),另一種是斷點(diǎn)式的(Punctuated)。

WatermarkGenerator接口中有兩個(gè)方法:onEvent()、onPeriodicEmit(),前者是在每個(gè)時(shí)間到來時(shí)調(diào)用,后者由框架周期性調(diào)用。周期性調(diào)用的方法中發(fā)出水位線,自然就是周期性生成水位線;而在事件觸發(fā)的方法中發(fā)出水位線,自然就是斷點(diǎn)式生成了。兩種方式的不同就集中體現(xiàn)在這兩個(gè)方法的實(shí)現(xiàn)上。

2.4.1 周期性水位線生成器(Periodic Generator)

周期性生成器一般是通過onEvent()觀察判斷輸入的事件,而在onPeriodicEmit()里發(fā)出水位線。

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CustomWatermarkTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        env.addSource(new ClickSource())
                .assignTimestampsAndWatermarks(new CustomWatermarkStrategy()).print();

        env.execute();
    }

    public static class CustomWatermarkStrategy implements WatermarkStrategy<Event> {
        @Override
        public TimestampAssigner<Event> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return new SerializableTimestampAssigner<Event>() {
                @Override
                public long extractTimestamp(Event element, long recordTimestamp) {
                    return element.timestamp; // 告訴程序數(shù)據(jù)源里的時(shí)間戳是哪一個(gè)字段
                }
            };
        }

        @Override
        public WatermarkGenerator<Event>
        createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new CustomPeriodicGenerator();
        }
    }

    public static class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
        private Long delayTime = 5000L; // 延遲時(shí)間
        private Long maxTs = Long.MIN_VALUE + delayTime + 1L; // 觀察到的最大時(shí)間戳

        @Override
        public void onEvent(Event event, long eventTimestamp, WatermarkOutput
                output) {
            // 每來一條數(shù)據(jù)就調(diào)用一次
            maxTs = Math.max(event.timestamp, maxTs); // 更新最大時(shí)間戳
        }

        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            // 發(fā)射水位線,默認(rèn) 200ms 調(diào)用一次
            output.emitWatermark(new Watermark(maxTs - delayTime - 1L));
        }
    }
}

們在 onPeriodicEmit()里調(diào)用 output.emitWatermark(),就可以發(fā)出水位線了;這個(gè)方法
由系統(tǒng)框架周期性地調(diào)用,默認(rèn) 200ms 一次。所以水位線的時(shí)間戳是依賴當(dāng)前已有數(shù)據(jù)的最大時(shí)間戳的(這里的實(shí)現(xiàn)與內(nèi)置生成器類似,也是減去延遲時(shí)間再減 1),但具體什么時(shí)候生成與數(shù)據(jù)無關(guān)。

2.4.2 斷點(diǎn)式水位線生成器(Punctuated Generator)

斷點(diǎn)式生成器會不停地檢測 onEvent()中的事件,當(dāng)發(fā)現(xiàn)帶有水位線信息的特殊事件時(shí),
就立即發(fā)出水位線。一般來說,斷點(diǎn)式生成器不會通過 onPeriodicEmit()發(fā)出水位線。

public class CustomPunctuatedGenerator implements WatermarkGenerator<Event> {
    @Override
    public void onEvent(Event r, long eventTimestamp, WatermarkOutput output) {
        // 只有在遇到特定的 itemId 時(shí),才發(fā)出水位線
        if (r.user.equals("Mary")) {
            output.emitWatermark(new Watermark(r.timestamp - 1));
        }
    }
    
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // 不需要做任何事情,因?yàn)槲覀冊?onEvent 方法中發(fā)射了水位線
    }
}

在 onEvent()中判斷當(dāng)前事件的 user 字段,只有遇到“Mary”這個(gè)特殊的值時(shí),才調(diào)用output.emitWatermark()發(fā)出水位線。這個(gè)過程是完全依靠事件來觸發(fā)的,所以水位線的生成一定在某個(gè)數(shù)據(jù)到來之后。

2.5 在自定義數(shù)據(jù)源中發(fā)送水位線

我們也可以在自定義的數(shù)據(jù)源中抽取事件時(shí)間,然后發(fā)送水位線。這里要注意的是,在自 定義數(shù)據(jù)源中發(fā)送了水位線以后,就不能再在程序中使用 assignTimestampsAndWatermarks 方 法 來 生 成 水 位 線 了 。 在 自 定 義 數(shù) 據(jù) 源 中 生 成 水 位 線 和 在 程 序 中 使 用 assignTimestampsAndWatermarks 方法生成水位線二者只能取其一。

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

import java.util.Calendar;
import java.util.Random;

public class EmitWatermarkInSourceFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new ClickSourceWithWatermark()).print();
        env.execute();
    }

    // 泛型是數(shù)據(jù)源中的類型
    public static class ClickSourceWithWatermark implements SourceFunction<Event> {
        private boolean running = true;

        @Override
        public void run(SourceContext<Event> sourceContext) throws Exception {
            Random random = new Random();
            String[] userArr = {"Mary", "Bob", "Alice"};
            String[] urlArr = {"./home", "./cart", "./prod?id=1"};
            while (running) {
                long currTs = Calendar.getInstance().getTimeInMillis(); // 毫秒時(shí)間戳
                String username = userArr[random.nextInt(userArr.length)];
                String url = urlArr[random.nextInt(urlArr.length)];
                Event event = new Event(username, url, currTs);
                // 使用 collectWithTimestamp 方法將數(shù)據(jù)發(fā)送出去,并指明數(shù)據(jù)中的時(shí)間戳的字段
                sourceContext.collectWithTimestamp(event, event.timestamp);
                // 發(fā)送水位線
                sourceContext.emitWatermark(new Watermark(event.timestamp - 1L));
                Thread.sleep(1000L);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    }
}

在自定義水位線中生成水位線相比 assignTimestampsAndWatermarks 方法更加靈活,可以任意的產(chǎn)生周期性的、非周期性的水位線,以及水位線的大小也完全由我們自定義。所以非常適合用來編寫 Flink 的測試程序,測試 Flink 的各種各樣的特性。

2.6 水位線的傳遞

在實(shí)際應(yīng)用中往往上下游都有多個(gè)并行子任務(wù),為了統(tǒng)一推進(jìn)事件時(shí)間的進(jìn)展,我們要求上游任務(wù)處理完水位線、時(shí)鐘改變之后,要把當(dāng)前的水位線廣播給所有的下游任務(wù)。這樣,后續(xù)任務(wù)就不需要依賴原始數(shù)據(jù)中的時(shí)間戳,也可以知道當(dāng)前事件時(shí)間了。

如圖 所示,當(dāng)前任務(wù)的上游,有四個(gè)并行子任務(wù),所以會接收到來自四個(gè)分區(qū)的水位線;而下游有三個(gè)并行子任務(wù),所以會向三個(gè)分區(qū)發(fā)出水位線。具體過程如下:

  • 1、上游并行子任務(wù)發(fā)來不同的水位線,當(dāng)前任務(wù)會為每一個(gè)分區(qū)設(shè)置一個(gè)“分區(qū)水位線” (Partition Watermark),這是一個(gè)分區(qū)時(shí)鐘;而當(dāng)前任務(wù)自己的時(shí)鐘,就是所有分區(qū)時(shí)鐘里最小的那個(gè)。

  • 2、當(dāng)有一個(gè)新的水位線(第一分區(qū)的 4)從上游傳來時(shí),當(dāng)前任務(wù)會首先更新對應(yīng)的分區(qū)時(shí)鐘;然后再次判斷所有分區(qū)時(shí)鐘中的最小值,如果比之前大,說明事件時(shí)間有了進(jìn)展,當(dāng)前任務(wù)的時(shí)鐘也就可以更新了。這里要注意,更新后的任務(wù)時(shí)鐘,并不一定是新來的那個(gè)分區(qū)水位線,比如這里改變的是第一分區(qū)的時(shí)鐘,但最小的分區(qū)時(shí)鐘是第三分區(qū)的 3,于是當(dāng)前任務(wù)時(shí)鐘就推進(jìn)到了 3。當(dāng)時(shí)鐘有進(jìn)展時(shí),當(dāng)前任務(wù)就會將自己的時(shí)鐘以水位線的形式,廣播給下游所有子任務(wù)。

  • 3、再次收到新的水位線(第二分區(qū)的 7)后,執(zhí)行同樣的處理流程。首先將第二個(gè)分區(qū)時(shí)鐘更新為 7,然后比較所有分區(qū)時(shí)鐘;發(fā)現(xiàn)最小值沒有變化,那么當(dāng)前任務(wù)的時(shí)鐘也不變,也不會向下游任務(wù)發(fā)出水位線。

  • 4、同樣道理,當(dāng)又一次收到新的水位線(第三分區(qū)的 6)之后,第三個(gè)分區(qū)時(shí)鐘更新為6,同時(shí)所有分區(qū)時(shí)鐘最小值變成了第一分區(qū)的 4,所以當(dāng)前任務(wù)的時(shí)鐘推進(jìn)到 4,并發(fā)出時(shí)間戳為 4 的水位線,廣播到下游各個(gè)分區(qū)任務(wù)。

水位線在上下游任務(wù)之間的傳遞,非常巧妙地避免了分布式系統(tǒng)中沒有統(tǒng)一時(shí)鐘的問題,每個(gè)任務(wù)都以“處理完之前所有數(shù)據(jù)”為標(biāo)準(zhǔn)來確定自己的時(shí)鐘,就可以保證窗口處理的結(jié)果總是正確的。對于有多條流合并之后進(jìn)行處理的場景,水位線傳遞的規(guī)則是類似的。

2.7 水位線的總結(jié)

水位線在事件時(shí)間的世界里面,承擔(dān)了時(shí)鐘的角色,是唯一的時(shí)間尺度。

水位線的默認(rèn)計(jì)算公式:水位線 = 觀察到的最大事件時(shí)間 – 最大延遲時(shí)間 – 1 毫秒

在數(shù)據(jù)流開始之前,F(xiàn)link 會插入一個(gè)大小是負(fù)無窮大(在 Java 中是-Long.MAX_VALUE)的水位線,而在數(shù)據(jù)流結(jié)束時(shí),F(xiàn)link 會插入一個(gè)正無窮大(Long.MAX_VALUE)的水位線,保證所有的窗口閉合以及所有的定時(shí)器都被觸發(fā)。

對于離線數(shù)據(jù)集,F(xiàn)link 也會將其作為流讀入,也就是一條數(shù)據(jù)一條數(shù)據(jù)的讀取。在這種
情況下,F(xiàn)link 對于離線數(shù)據(jù)集,只會插入兩次水位線,也就是在最開始處插入負(fù)無窮大的水位線,在結(jié)束位置插入一個(gè)正無窮大的水位線。因?yàn)橹恍枰迦雰纱嗡痪€,就可以保證計(jì)算的正確,無需在數(shù)據(jù)流的中間插入水位線了。

水位線的重要性在于它的邏輯時(shí)鐘特性,而邏輯時(shí)鐘這個(gè)概念可以說是分布式系統(tǒng)里面最為重要的概念之一了,理解透徹了對理解各種分布式系統(tǒng)非常有幫助。具體可以參考 Leslie Lamport 的論文。

參考:
https://blog.csdn.net/weixin_47491957/article/details/124400164

https://blog.csdn.net/mynameisgt/article/details/124205582

https://zhuanlan.zhihu.com/p/364013202

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容