一、flink支持的時間概念類型
1.1、流計算與時間屬性密不可分
相比較于離線計算,流計算往往離不開討論時間這個概念,因為離線計算是有界數(shù)據(jù),用戶輸入一個固定的數(shù)據(jù)集,離線計算引擎會輸出一個固定的結(jié)果,這中間涉及的中間狀態(tài),包括時間的把握,用戶都不操心。但是流計算是無界數(shù)據(jù),并且實時引擎一直都在進行計算,正是因為如此,計算出來的結(jié)果每時每刻都在變化,因為數(shù)據(jù)量不可控,所以要關(guān)注狀態(tài),因為要保證計算的準確性,所以要關(guān)注事件屬性。目前Flink支持的時間屬性分別有Event Time,Ingestion Time,Processing Time,大致關(guān)系如下圖。

1.2、事件時間(Event Time)
1.2.1、事件時間基本概念
事件時間是每個獨立事件在產(chǎn)生它的設(shè)備上發(fā)生的時間, 簡單來說這個時間在進入flink之前就已經(jīng)存在,一般來講都是使用消息創(chuàng)建的時間戳(當使用事件時間時,需要人為指定一個時間字段),正是因為這個關(guān)系,所以事件時間能夠判斷消息的有序性。基于Event Time的時間概念,數(shù)據(jù)處理過程當中,數(shù)據(jù)依賴于自己本身產(chǎn)生的時間,而不是Flink系統(tǒng)中Operator所在節(jié)點的系統(tǒng)時鐘,這樣能夠借助于事件產(chǎn)生的時間信息來還原事件的先后關(guān)系,并且用事件時間統(tǒng)計出來的指標是最精確的。
1.2.2、事件時間能夠解決亂序問題?
理想狀態(tài)下,我們都希望數(shù)據(jù)流按照數(shù)據(jù)產(chǎn)生的先后順序,進入flink系統(tǒng),然后按順序被處理,按順序輸出,但實際上,數(shù)據(jù)流大多存在亂序問題,這個亂序可能存在以下幾個地方:
-》數(shù)據(jù)源-》flink系統(tǒng)
-》Source Operator-》transform Operator
-》transform Operator->Sink Operator
這也正是數(shù)據(jù)流從輸入到輸出必經(jīng)的一條線路。
假設(shè)現(xiàn)在有一條亂序的數(shù)據(jù)流(10,9,8,5,7,6,4,3,2,1),很容易發(fā)現(xiàn)5這條消息延后了,如果現(xiàn)在我定義事件時間和滾動窗口結(jié)合使用,并且定義窗口大小為5,那么第一個窗口范圍為(1-5),假設(shè)這里先不考慮水印(Watermark),當1,2,3,4消息落在了第一個窗口,當6消息到來時,flink判斷6消息之前的消息都已經(jīng)到達,則觸發(fā)一個窗口開始計算,這樣5這條消息都被丟失了,所以光靠事件時間并不能解決亂序問題,就像我剛才提及的水印,也只是能夠再一定程度上保證不亂序,關(guān)于水印,下面會詳細介紹。
1.2.3、為啥事件時間統(tǒng)計出來的指標是最準確的?
假設(shè)現(xiàn)在要計算一個指標,1:00-2:00的交易額,這里涉及到一個時間的區(qū)間,假設(shè)使用的是處理時間,那么窗口觸發(fā)的依據(jù)將會是Operator的本地時鐘,如果這時候消息發(fā)生亂序,很可能存在一部分數(shù)據(jù)本身時間戳是在1:00-2:00區(qū)間的,但是由于亂序,導致一部分數(shù)據(jù)沒有落到這個窗口,那么這樣計算出來的數(shù)據(jù)就不準確了。如果采用事件時間的話,面對亂序的數(shù)據(jù)流,我們結(jié)合著水印(Watermark),能夠一定程度上解決亂序的問題,在使用事件時間時,會伴隨著設(shè)置一個時間間隔,這個間隔也叫作對延遲到達數(shù)據(jù)流的容忍度,間隔越大,相對于來講容忍度就越大。具體怎么解決,下面詳解。
1.2.4、事件時間在什么場景下使用最合適?
事件時間一般用于對于時間很敏感的指標使用,例如上訴提及的1:00-2:00的交易額,有一個明確的時間區(qū)間,0:59的交易數(shù)據(jù)不能被統(tǒng)計,這種就適合用事件時間。如果對時間不是特別敏感,例如一些流量數(shù)據(jù),可能只是做一些過濾等,這種就可以使用處理時間。從性能角度講處理時間會比使用事件時間高很多。因為結(jié)合著事件時間的窗口,因為水印的存在,往往會晚觸發(fā)窗口的計算,并且flink還要維護水印,這些都是額外的開銷。
1.2.5、事件時間和窗口結(jié)合使用,窗口大小是不是絕對的?
假設(shè)使用事件時間+滾動窗口,并且窗口大小設(shè)置為10min,水印設(shè)置為1min,那么是不是這個窗口最晚11min以后會觸發(fā)計算,其實不是這樣的,如果換成攝入時間和處理時間的,那么確實是這樣的。假象一下存在這樣極端的場景(事實上確實存在),消息的數(shù)量出現(xiàn)坡峰,flink集群處于消費消息積壓的狀態(tài),數(shù)據(jù)本身也有時間戳,那么10min大小的窗口,過了10min以后,并沒有完全把本身時間戳是這10min內(nèi)的數(shù)據(jù)都消費完,那么這時候由于窗口還沒達到觸發(fā)計算的條件(最新的消息時間戳-時間間隔>=窗口的上限),還是處于接受數(shù)據(jù)的狀態(tài),那么這個時間就大于10min了。說完了積壓,當然也就存在另外一種情況,就是消息亂序的非常厲害,比如5min之后才應(yīng)該到的數(shù)據(jù),提前到了,會提前觸發(fā)窗口進行計算,那么這樣就小于10min了。所以說窗口大小不是絕對的。
1.3、攝入時間(Ingestion Time)
攝入時間(Ingestion Time)其實就是數(shù)據(jù)進入flink系統(tǒng)的時間,Ingestion Time依賴于Source Operator的本地時鐘,Source Operator也算是進行流計算的第一道關(guān)卡,拿進入flink系統(tǒng)的時間作為后續(xù)窗口觸發(fā)的條件,也能在一定程度上保證消息的有序性,上訴提到過,消息亂序有三個位置,事件時間+水印屬于可以在一定程度上并且在三個位置都可以保證消息的有序性。而攝入時間(Ingestion Time)無法保證數(shù)據(jù)源-》flink系統(tǒng)這一段線路的有序性,哪怕是你亂序到達flink系統(tǒng)的,flink系統(tǒng)也會按照Source Operator的本地時鐘來給你打上有序的時間戳。但是攝入時間(Ingestion Time)可以保證你在flink內(nèi)部處理是有序的,因為流計算是分布式的,并且是多個算子相互協(xié)同計算,很可能內(nèi)部出現(xiàn)亂序,并且偌大的flink集群,部分機器時鐘不同步,網(wǎng)絡(luò)延遲的,都是可能存在的,使用攝入時間(Ingestion Time),這個時間在進入flink系統(tǒng)也帶上了,后續(xù)不會因為在不同的Operator進行處理而被修改。
1.4、處理時間(Processing Time)
處理時間就相對好理解一點,處理時間就是數(shù)據(jù)在進行操作算子計算的過程中所在的那臺執(zhí)行機的本地時鐘,由于flink集群存在很多執(zhí)行機,那么都會單獨維護自己的處理時間。使用處理時間是完全不能夠保證消息的有序性的,所以對時間準確性有要求的話,不要使用處理時間。處理時間的優(yōu)點就是性能會相對好一些。
二、EventTime和Watermark
2.1、概述
Flink支持EventTime這個時間屬性,相對于其他流式計算框架,算是一大優(yōu)點。EventTime和Watermark主要就是為了解決,在面對消息存在亂序的情況下,盡可能的保證每條消息能夠準確的落在所屬的窗口,即使你是延遲到達,這樣子才可以保證每個窗口數(shù)據(jù)的完整性,最紅指標的準確性。Flink會用最新的事件時間減去固定時間間隔(用戶自己設(shè)置的)作為Watermark,該時間間隔為用戶外部配置的支持最大延遲到達的時間長度,也就是說不會有事件超過該間隔到達,否則就認為是遲到事件或者是異常事件。
2.2、順序事件中的Watermark
理想狀態(tài)下,如果數(shù)據(jù)元素的事件時間是有序的,那么水位線也會隨著時間逐漸增長,直至觸發(fā)窗口計算。

如上圖所示,窗口大小為5,時間間隔為1,那么當接收到6這個事件時間以后,6-1=5>=5,則觸發(fā)上一個窗口進行計算??梢园l(fā)現(xiàn)在有序的數(shù)據(jù)流面前,Watermark只是起了一個標記作用,反而會因為容忍的時間間隔,導致窗口晚計算。
2.3、亂序事件中的Watermark
在現(xiàn)實生活中,數(shù)據(jù)元素往往不是有序的,這時候就需要Watermark出場,見下圖:

上圖的窗口大小為20,時間間隔為4,可以發(fā)現(xiàn)當24進入flink以后,水位線等于24-4=20>=20,滿足窗口觸發(fā)條件了,但是這時19這個事件時間還未進入這個窗口,這就表示被丟棄了。通過這張圖說明,容忍的時間間隔短了,如果設(shè)置成5,那么19就不會被丟棄,但是這種需求往往是無止盡的,我們在做流計算時,應(yīng)該要想著如何去解決數(shù)據(jù)流產(chǎn)生亂序的根本原因,而不是一味的增加容忍時間。
2.4、并行(分布式)數(shù)據(jù)流中的Watermark
上述例子都是單個task來解釋,這是為了幫助前期好理解,但實際情況上并不是這樣的,F(xiàn)link是一個分布式的流式計算框架,必定存在單個算子,也有多個task并發(fā)運行的情況,所以實際情況見下圖:

Watermark在Source Operator中生成,并且在每個Source Operation的子Task中都會獨立生成Watermark。在Source Operator的子任務(wù)中生成后就會更新該Task的Watermark,且會逐步更新下游算子中的Watermark水位線,隨后一致保持在該并發(fā)之中,直至下一次的Watermark的生成,并對前面的Watermark進行覆蓋。如圖所示,黃色的小框框標的都是當前這個算子子任務(wù)的Watermark。仔細看其實會發(fā)現(xiàn)上下游關(guān)系存在多對一或者一對多,那么逐步更新下游算子中的Watermark水位線時就會出現(xiàn)多個Watermark同時更新一個算子Task的當前水位線,這時Flink會選擇最小的水位線來更新,這也是為了極大的保證數(shù)據(jù)不會因為延遲而被丟失了。
三、單純延遲和亂序?qū)α饔嬎愕挠绊?/h1>
3.1、單純延遲
單純延遲指的是,數(shù)據(jù)還是有序的,只不過因為各種原因來得非常慢,或者數(shù)據(jù)量很大,flink消費消息造成了積壓,這種都可以稱作為單純性的延遲,如果這種使用攝入時間或者處理時間,對于對時間區(qū)間有嚴格要求的指標,就完成不能夠保證了。如果使用事件時間的話,那么造成的直觀影響就是每個窗口觸發(fā)計算的時間都會延遲(相對于絕對時間),其實剛才也有介紹過,那么這種情況,直觀的影響就是數(shù)據(jù)產(chǎn)出非常慢,無法達到時效性。
3.2、亂序流
如果是數(shù)據(jù)流亂序了,那么可以使用事件時間+Watermark,可以在一定程度上保證亂序的數(shù)據(jù)流也能落在正確的窗口上。
四、如何科學的指定Watermark
之前有介紹過事件時間+Watermark可以在一定程度上保證數(shù)據(jù)流亂序的準確性,那么這個準確性如何,以及設(shè)置完Watermark以后,flink運行的性能如何,其實對這個Watermark的設(shè)置非常有考究。
一般在進行流計算任務(wù)開發(fā)的時候,如果有用到事件時間,都會手動設(shè)置一個Watermark,也稱之為容忍度,但是這個時間過短,可能還是會造成一部分數(shù)據(jù)丟失,過長,會導致窗口觸發(fā)計算時間延后,對性能影響較大,所以這個時間間隔,需要科學的測量。
一般來講,使用事件時間,數(shù)據(jù)本身就有一個時間戳,這個時間往往是數(shù)據(jù)產(chǎn)生的時間,我們在flink任務(wù)當中,當數(shù)據(jù)流進入flink系統(tǒng)以后,手動加一下時間戳,這樣就得到兩個時間戳了,我們可以讓測試任務(wù)上線跑一段時間,然后將這一段時間的數(shù)據(jù)落到關(guān)系型數(shù)據(jù)庫,進行簡單的數(shù)據(jù)分析,可以看出最大的延遲消息延遲了多少,百分比啥的,都是可以分析出來的,有了這個根據(jù),在來設(shè)置時間間隔,就更加運籌帷幄了。