Event Time / Processing Time / Ingestion Time
Flink 在流應(yīng)用中支持不同的時(shí)間概念:
- Processing Time: 處理時(shí)間是指執(zhí)行相應(yīng)操作所在的機(jī)器的系統(tǒng)時(shí)間。
當(dāng)一個(gè)流應(yīng)用使用“處理時(shí)間”的概念時(shí),所有的基于時(shí)間的操作符(如 時(shí)間window)會(huì)使用執(zhí)行該操作符所在的機(jī)器的時(shí)間。一個(gè)按小時(shí)聚合數(shù)據(jù)的window函數(shù)會(huì)包含兩個(gè)整點(diǎn)時(shí)間之間,所有到達(dá)該操作符的數(shù)據(jù)。例如,如果一個(gè)應(yīng)用在上午9:15開始運(yùn)行,那么第一個(gè)按小時(shí)聚合的window會(huì)聚合9:15到10:00的數(shù)據(jù),下一個(gè)window會(huì)聚合10:00到11:00的數(shù)據(jù),以此類推。
處理時(shí)間是最簡(jiǎn)單的時(shí)間概念,它不需要在數(shù)據(jù)流與機(jī)器間進(jìn)行協(xié)調(diào)。它提供了最好的性能與最低的延遲。但是,在分布式,異步的環(huán)境中,處理時(shí)間不會(huì)提供結(jié)果一致性,因?yàn)榻Y(jié)果會(huì)受到各種因素的影響,如:數(shù)據(jù)到達(dá)系統(tǒng)的速度(如從消息隊(duì)列消費(fèi)的速度),系統(tǒng)內(nèi)兩個(gè)operator間的數(shù)據(jù)流的速度,以及停電(有計(jì)劃停電或其他因素)。(注:這很容易理解,processing time語(yǔ)義下,消息的速度會(huì)對(duì)結(jié)果產(chǎn)生很大的影響,比如window操作符,同樣的數(shù)據(jù),如果消息速度快,則第一個(gè)window會(huì)計(jì)算100個(gè)數(shù)據(jù),消息速度慢,則第一個(gè)window可能只會(huì)計(jì)算10個(gè)數(shù)據(jù),顯然兩種情況下的window的計(jì)算結(jié)果不會(huì)相同) - Event Time:事件時(shí)間是指數(shù)據(jù)從設(shè)備中產(chǎn)生的時(shí)間。這個(gè)時(shí)間一般都內(nèi)置在數(shù)據(jù)中,作為事件時(shí)間戳,在進(jìn)入flink前就存在,且可以被flink從數(shù)據(jù)中抽取出來(lái)。在Event time語(yǔ)義中,時(shí)間的快慢取決于數(shù)據(jù),而不是系統(tǒng)時(shí)間。Event Time語(yǔ)義的程序必須定義如何產(chǎn)生 Event Time Watermarks,watermark機(jī)制用于標(biāo)識(shí)當(dāng)前事件時(shí)間的進(jìn)度(即:在事件時(shí)間語(yǔ)義下,當(dāng)前時(shí)間是多少)。關(guān)于watermark機(jī)制會(huì)在本文后面介紹。
在理想情況下,事件時(shí)間語(yǔ)義下的處理可以做到結(jié)果的一致性與完整性,而不管數(shù)據(jù)到達(dá)的時(shí)間與順序。然而,除非數(shù)據(jù)確定是按(時(shí)間戳)順序到達(dá)的,否則事件時(shí)間語(yǔ)義下的處理會(huì)因?yàn)榈却赡墚a(chǎn)生的亂序(遲到)數(shù)據(jù)而造成一定的延遲。但是又由于它只能等待一個(gè)有限的一段時(shí)間,因此它限制了應(yīng)用在事件時(shí)間語(yǔ)義下的結(jié)果完整性的程度。
假設(shè)所有的數(shù)據(jù)都已經(jīng)到達(dá),事件時(shí)間語(yǔ)義下操作符會(huì)按照預(yù)期的那樣執(zhí)行操作,并產(chǎn)生準(zhǔn)確且一致性的結(jié)果,即便因?yàn)橛羞t到數(shù)據(jù)而造成數(shù)據(jù)的亂序,或是處理歷史數(shù)據(jù)。例如,按小時(shí)聚合的window僅會(huì)包含該window所代表的時(shí)間段的數(shù)據(jù),不管數(shù)據(jù)到達(dá)的window的順序或是它們?cè)谡鎸?shí)世界的什么時(shí)間被處理(注:這里與processing time形成對(duì)比,對(duì)于歷史數(shù)據(jù),event time語(yǔ)義下的window不管何時(shí)運(yùn)行應(yīng)用,每次都會(huì)聚合相同的數(shù)據(jù)進(jìn)入同一個(gè)window,這是因?yàn)閷?duì)window的定義是按照數(shù)據(jù)內(nèi)置的時(shí)間戳來(lái)定義的,而在processing time語(yǔ)義下處理歷史數(shù)據(jù),則可能不應(yīng)該在同一個(gè)window中執(zhí)行聚合操作的數(shù)據(jù)被分配到了同一個(gè)window,造成結(jié)果的不準(zhǔn)確,多次運(yùn)行也可能出現(xiàn)每次結(jié)果都不一致)
需要注意的是,有些時(shí)候,在事件時(shí)間語(yǔ)義下處理實(shí)時(shí)數(shù)據(jù)時(shí),為了保證及時(shí)處理,會(huì)使用一些processing time 操作符(注:這一句話現(xiàn)在理解了,在讀完整篇文檔后,就會(huì)有所理解,詳細(xì)的解釋,在后面的“空轉(zhuǎn)的Source”部分) -
Ingestion time:攝入時(shí)間是指數(shù)據(jù)進(jìn)入flink的時(shí)間。在Source操作符中,每個(gè)數(shù)據(jù)都會(huì)獲取source操作符的當(dāng)前時(shí)間作為其時(shí)間戳,下游的基于時(shí)間操作符(如window)處理時(shí)使用的就是這個(gè)時(shí)間。
攝入時(shí)間的概念介于事件時(shí)間與處理時(shí)間之間。對(duì)比于處理時(shí)間,攝入時(shí)間比其消耗更多一些,但是提供可預(yù)測(cè)的結(jié)果。因?yàn)閿z入時(shí)間使用一個(gè)穩(wěn)定的時(shí)間戳(由source指定),不同的window操作符操作該數(shù)據(jù)時(shí),都會(huì)被引用相同的時(shí)間戳;而處理時(shí)間語(yǔ)義下的window操作符可能會(huì)將同一個(gè)數(shù)據(jù)放入不同的window中(取決于本地系統(tǒng)時(shí)間或者數(shù)據(jù)傳輸延遲)。
對(duì)比于事件時(shí)間,攝入時(shí)間語(yǔ)義下的程序不能處理亂序或者遲到數(shù)據(jù),但是程序不再需要指定如何生成watermark。
在flink內(nèi)部,對(duì)攝入時(shí)間的處理更像是使用自動(dòng)分配timestamp以及自動(dòng)生成watermark的事件時(shí)間。
設(shè)置時(shí)間語(yǔ)義
Flink DataStream 程序的第一部分通常會(huì)設(shè)置 時(shí)間語(yǔ)義(time characteristic)。這個(gè)設(shè)置定義了生成數(shù)據(jù)流的數(shù)據(jù)源的行為(如,是否需要指定timestamp),以及window操作符應(yīng)該使用哪種時(shí)間概念對(duì)數(shù)據(jù)進(jìn)行處理。
下面的例子展示了Flink程序按小時(shí)聚合數(shù)據(jù)。window的行為和時(shí)間語(yǔ)義相對(duì)應(yīng)。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.addSource(new FlinkKafkaConsumer09<MyEvent>(topic, schema, props));
stream
.keyBy( (event) -> event.getUser() )
.timeWindow(Time.hours(1))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
需要注意的是,想要在事件時(shí)間語(yǔ)義下運(yùn)行該程序,需要使用能夠?qū)γ總€(gè)數(shù)據(jù)定義事件時(shí)間以及生成watermark的source,如果source不滿足這個(gè)條件,程序需要在source后,指定timestamp assigner以及watermark generator。這些函數(shù)描述了如何從數(shù)據(jù)中提取事件時(shí)間以及該程序所能夠處理的數(shù)據(jù)最大亂序程度。
接下來(lái)的部分描述了timestamp與watermark背后的一些機(jī)制。對(duì)于如何使用timestamp assigner以及watermark generator,可以參閱 Generating Timestamps / Watermarks
Event Time 與 Watermarks
注意:Flink實(shí)現(xiàn)了Dataflow Model中的許多技術(shù)。為了更好的介紹 event time 與 watermarks,建議先閱讀下面的文章:
- Streaming 101 by Tyler Akidau
- The Dataflow Model paper
支持event time 的流處理器需要一個(gè)方法來(lái)衡量當(dāng)前event time時(shí)間標(biāo)尺下的處理進(jìn)度。例如,一個(gè)按小時(shí)聚合數(shù)據(jù)的window操作,流處理器需要在event time超過window的end time后,提醒window你可以執(zhí)行操作并關(guān)閉window了。
event time與processing time(注:這一段話中,將processing time替換為 系統(tǒng)時(shí)間 會(huì)更好理解寫2)是獨(dú)立處理的。例如:在一個(gè)應(yīng)用中(注:處理實(shí)時(shí)數(shù)據(jù)時(shí)),當(dāng)前event time可能稍稍落后于processing time,但是兩者都會(huì)以相同的速度流逝。但是另一方面,另一個(gè)流程序可能會(huì)在幾秒內(nèi)就能夠處理在event time語(yǔ)義下的幾周的數(shù)據(jù),比如讀取kafka topic中的歷史數(shù)據(jù)。
(注:為了簡(jiǎn)單的理解event time與processing time 的不同,可以理解為event time是以數(shù)據(jù)自帶的時(shí)間戳的時(shí)間作為時(shí)間坐標(biāo)系或者時(shí)間標(biāo)尺,而processing time是以真實(shí)世界中,程序正在運(yùn)行時(shí)的時(shí)間作為時(shí)間坐標(biāo)系或時(shí)間標(biāo)尺)
Flink中衡量event time的概念稱作 watermark。Watermark作為數(shù)據(jù)流的一部分,并且會(huì)攜帶一個(gè)時(shí)間戳 t。Watermark(t) 表示當(dāng)前event time坐標(biāo)系下的數(shù)據(jù)流已經(jīng)達(dá)到了時(shí)間t,這意味著應(yīng)該不會(huì)再有時(shí)間戳小于等于t的數(shù)據(jù)出現(xiàn)(注:也就是不會(huì)再有遲到數(shù)據(jù))
(注:為什么需要watermark?processing time是真實(shí)世界的時(shí)間,時(shí)間是平滑流逝的,而event time是從數(shù)據(jù)中抽取出來(lái)的時(shí)間,是離散的時(shí)間,因此引入watermark的概念,就是為了分隔不同的數(shù)據(jù)到不同的window中去,并且告知window數(shù)據(jù)全部到達(dá),可以執(zhí)行計(jì)算了。一個(gè)很簡(jiǎn)單的例子:一個(gè)window要聚合event time 在 10:00:00 到 10:10:00的數(shù)據(jù),而真實(shí)流入window操作符的event time可能不會(huì)這么湊巧,最后一個(gè)數(shù)據(jù)的時(shí)間戳恰好就是 10:10:00,因此watermark的作用就是告知window,event time的坐標(biāo)系下,已經(jīng)把所有合適的數(shù)據(jù)聚合起來(lái)了,你這個(gè)window可以執(zhí)行計(jì)算了)
下面的圖展示了流中數(shù)據(jù)的時(shí)間戳以及流中的watermark。示例中的數(shù)據(jù)是按照時(shí)間戳的順序進(jìn)行的排列,意味著watermark在流中是周期性產(chǎn)生的。

對(duì)于處理亂序的數(shù)據(jù)流,watermark起到很關(guān)鍵的作用,如下圖,數(shù)據(jù)并沒有按照時(shí)間戳的順序排列好。一般來(lái)說(shuō),watermark意味著在watermark產(chǎn)生的時(shí)刻起,所有時(shí)間戳小于watermark的數(shù)據(jù)都已經(jīng)到達(dá)了。一旦watermark到達(dá)一個(gè)操作符,操作符就會(huì)將它內(nèi)部的event time時(shí)間表調(diào)整到watermark所代表的時(shí)間。

注意event time的值要么來(lái)自剛剛流入source的數(shù)據(jù)的內(nèi)置時(shí)間戳,要么是來(lái)自該數(shù)據(jù)所觸發(fā)的watermark的時(shí)間戳。
并發(fā)流中的Watermark
watermark在source function中生成或是在其后通過方法來(lái)生成。source的每個(gè)并發(fā)的subtask一般都會(huì)生成各自的watermark。這些watermark定義了在某個(gè)source中的event time的值(即,在event time語(yǔ)義下,當(dāng)前時(shí)間是幾點(diǎn))。
因?yàn)閣atermark會(huì)隨著數(shù)據(jù)流在程序中流動(dòng),因此當(dāng)它到達(dá)某個(gè)operator后,就會(huì)修改這個(gè)operator的event time的值。一旦operator修改了event time的值,就會(huì)生成一個(gè)新的watermark向下游傳播。
一些操作符會(huì)消費(fèi)多個(gè)數(shù)據(jù)流;如:union操作,或者是緊跟著keyBy/partition函數(shù)的操作符。這些操作符當(dāng)前的event time是所有數(shù)據(jù)流中最小的event time值。當(dāng)輸入流更新時(shí)間后,操作符也會(huì)更新時(shí)間。
下圖展示了并發(fā)流中watermark的流動(dòng)以及操作符如何追蹤event time的。

注意:kafka source支持 per-partition watermark,你可以參閱這里
遲到數(shù)據(jù)
事實(shí)上,即便watermark的概念可以理解為,所有時(shí)間戳小于watermark的數(shù)據(jù)都已經(jīng)到達(dá)。但是真實(shí)情況下,確實(shí)會(huì)有數(shù)據(jù)違反了這樣的約定,這意味著,即便生成了watermark(t),但是仍然會(huì)有時(shí)間戳t' <= t的數(shù)據(jù)在隨后進(jìn)入數(shù)據(jù)流。事實(shí)上,在許多真實(shí)案例中,某些數(shù)據(jù)確實(shí)會(huì)延遲到達(dá),這使得定義一個(gè)所有數(shù)據(jù)都已經(jīng)進(jìn)入數(shù)據(jù)流的時(shí)間變得不可能。此外,即便遲到數(shù)據(jù)的遲到時(shí)間是有最大時(shí)間界限的,但是推遲太長(zhǎng)時(shí)間再生成watermark(這樣才能保證遲到的數(shù)據(jù)被正確處理)也不是我們期望的,因?yàn)檫@會(huì)造成處理的延遲。
針對(duì)這個(gè)情況,流程序需要明確遲到數(shù)據(jù)。遲到數(shù)據(jù)是指那些,當(dāng)數(shù)據(jù)到達(dá)時(shí),系統(tǒng)event time表的時(shí)間已經(jīng)流過了這個(gè)時(shí)間戳的數(shù)據(jù)??梢圆殚?Allowed Lateness 文檔學(xué)習(xí)如何處理event time語(yǔ)義下window操作符的遲到數(shù)據(jù)。
空轉(zhuǎn)的source Idling Source
目前,僅使用event time watermark generator時(shí),如果沒有數(shù)據(jù)流入,就不會(huì)有watermark產(chǎn)生。這意味著,如果數(shù)據(jù)流突然沒有數(shù)據(jù)流入了,event time不會(huì)再向前流動(dòng)(系統(tǒng)中的所有操作符的event time不會(huì)更新)。如:window操作符不會(huì)被新的watermark觸發(fā)計(jì)算操作,也就不會(huì)有這window的結(jié)果輸出。
為了規(guī)避這個(gè)問題,可以使用 periodic watermark assigner,它并不僅僅基于數(shù)據(jù)的時(shí)間戳生成watermark。一個(gè)示例的解決方法可以是:當(dāng)檢測(cè)到有一段時(shí)間沒有數(shù)據(jù)流入時(shí),可以使用processing time生成watermark。
Source可以使用 SourceFunction.SourceContext#markAsTemporarilyIdle來(lái)定義怎樣情況下該source處于空轉(zhuǎn)狀態(tài)。更多信息可以參閱Javadoc的文檔以及 StreamStatus。
調(diào)試watermark
參閱 Debugging Window & Event Time 部分學(xué)習(xí)如何在運(yùn)行時(shí)調(diào)試。
操作符如何處理watermark
操作符需要處理完給定的watermark后才可以將其轉(zhuǎn)發(fā)給下游操作符。如:WindowOperator 首先會(huì)計(jì)算那個(gè)window應(yīng)該被觸發(fā),并且直到得到所有應(yīng)該觸發(fā)計(jì)算的window的結(jié)構(gòu)后,才會(huì)吧watermark發(fā)送到下游。換句話說(shuō),觸發(fā)watermark生成的數(shù)據(jù),應(yīng)該比watermark更早的加入流(注:這樣才會(huì)保證計(jì)算結(jié)果的完整性和正確性)。
相同的規(guī)則適用于 TwoInpuStreamOperatro 。不同的是,這時(shí),operator中的watermark是其所有輸入流中watermark最小的那個(gè)。
詳細(xì)的過程在這幾個(gè)接口的實(shí)現(xiàn)類的方法中被定義:OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 與TwoInputStreamOperator#processWatermark2
