7.一文搞懂Flink中窗口的概念

1.前言

在上一篇文章當(dāng)中說了,如果需要進(jìn)行雙流join操作,可以選擇在窗口的范圍內(nèi)進(jìn)行,join操作會(huì)以窗口范圍內(nèi)的所有數(shù)據(jù)做inner join,然后將匹配到的所有數(shù)據(jù)交給計(jì)算函數(shù)進(jìn)行處理,這就是窗口join的執(zhí)行方式,但是這里也有一個(gè)之前沒有提到過的概念,那就是“窗口”。

窗口在數(shù)據(jù)計(jì)算的過程中很常見,它要做的實(shí)際上就是在沒有盡頭的數(shù)據(jù)流中切割出一段一段的范圍區(qū)間,然后對這個(gè)區(qū)間的數(shù)據(jù)進(jìn)行相應(yīng)的計(jì)算工作。接下來,我們就本著這個(gè)理念出發(fā),去聊聊Flink中窗口到底是什么。


2.時(shí)間語義

看到這里你可能會(huì)有個(gè)疑問,為什么我要聊窗口,但是這里卻寫的是時(shí)間語義呢?其實(shí)這不難理解,大家想一下,窗口本身就是在流式數(shù)據(jù)中切割出來的一個(gè)一個(gè)的小部分,這里會(huì)有一個(gè)邏輯切割的動(dòng)作,那這個(gè)動(dòng)作依賴的是什么呢?那就是時(shí)間概念啦,不過要注意一下,F(xiàn)link是以事件為驅(qū)動(dòng)的,這里說的時(shí)間二字,大家可以理解為Flink對流動(dòng)數(shù)據(jù)的一個(gè)判斷依據(jù),也可以說是數(shù)據(jù)自身的順序標(biāo)識!如果不去使用時(shí)間語義,那在流動(dòng)中的數(shù)據(jù)就沒有先后之分,數(shù)據(jù)開窗的結(jié)果就會(huì)產(chǎn)生誤差。

在Flink中一共有三個(gè)時(shí)間語義,它們分別是souce讀取數(shù)據(jù)時(shí)的攝入時(shí)間、數(shù)據(jù)進(jìn)入到程序中的處理時(shí)間、數(shù)據(jù)本身自帶的時(shí)間戳的事件時(shí)間。在這三種語義中,攝入時(shí)間出現(xiàn)的頻率極低,我本身并沒有使用過,所以不做講解。而處理時(shí)間和事件時(shí)間就能夠滿足極大多數(shù)場景的要求了,這里也就圍繞事件事件和處理時(shí)間來說。

2.1 處理時(shí)間

處理時(shí)間是個(gè)好東西,因?yàn)樗仁录r(shí)間要簡單很多。因?yàn)樗菙?shù)據(jù)進(jìn)入到Flink程序的時(shí)間,所以數(shù)據(jù)肯定是秉承著先后順序的,也就是滿足時(shí)間上的單調(diào)遞增的,實(shí)際上就是順序流。順序流理所當(dāng)然的不會(huì)發(fā)生數(shù)據(jù)亂序的情況嘛。在開窗的時(shí)候,不需要各個(gè)節(jié)點(diǎn)的協(xié)調(diào),所以邏輯上更加清晰明顯。

2.2 事件時(shí)間

事件時(shí)間就要復(fù)雜一些了,可以想一下。事件時(shí)間本身就是數(shù)據(jù)被創(chuàng)建時(shí)候的時(shí)間戳信息,雖然從埋點(diǎn)出發(fā)的時(shí)候順序保持單調(diào)。但是Flink畢竟是分布式的計(jì)算,必然會(huì)出現(xiàn)因?yàn)閭鬏數(shù)膯栴}和不同服務(wù)器之間性能上的差異而產(chǎn)生的數(shù)據(jù)亂序,讓后產(chǎn)生的數(shù)據(jù)比先產(chǎn)生的數(shù)據(jù)先進(jìn)入到程序,這也就導(dǎo)致了無法用其本身自帶的時(shí)間戳進(jìn)行數(shù)據(jù)現(xiàn)后的判斷了,因此如果選擇了事件時(shí)間語義,就要通過一個(gè)名叫水位線的機(jī)制來標(biāo)明當(dāng)前數(shù)據(jù)流的時(shí)間進(jìn)展。


3.水位線

既然上面已經(jīng)提到了,事件時(shí)間通過水位線機(jī)制來判斷數(shù)據(jù)流的時(shí)間進(jìn)展,那就要先談?wù)勊痪€之后才能去聊窗口了??吹竭@里大家不妨想一下,為什么事件時(shí)間要引入一個(gè)水位線的機(jī)制才能使用窗口計(jì)算呢?而處理時(shí)間就不用這么麻煩呢?這是因?yàn)榇翱谟?jì)算是一種在流式數(shù)據(jù)中進(jìn)行批計(jì)算的方式,它需要湊夠所有屬于當(dāng)前開窗范圍內(nèi)的數(shù)據(jù)都來了,才能出發(fā)計(jì)算。處理時(shí)間中沒有亂序情況,所以當(dāng)選用處理時(shí)間語義的時(shí)候,屬于當(dāng)前窗口的最后一條數(shù)據(jù)來了之后窗口即可觸發(fā)計(jì)算。但是事件時(shí)間可能會(huì)出現(xiàn)屬于當(dāng)前窗口的數(shù)據(jù)在能夠觸發(fā)窗口計(jì)算的數(shù)據(jù)后面到來的情況發(fā)生,這就會(huì)導(dǎo)致數(shù)據(jù)來了窗口已經(jīng)不見了,不僅造成了數(shù)據(jù)丟失還影響了計(jì)算結(jié)果。

這就引入了水位線的作用,為了避免上述情況的發(fā)生,我們可以通過在數(shù)據(jù)流中加入一個(gè)標(biāo)記來表明當(dāng)前數(shù)據(jù)流的時(shí)間進(jìn)展,這個(gè)標(biāo)記就是水位線機(jī)制。而且水位線機(jī)制還能以廣播的形式發(fā)送給下游所有任務(wù),也就是不會(huì)發(fā)生在下游多并行度的情況下,一個(gè)水位線標(biāo)識只進(jìn)入到了一個(gè)并行度而導(dǎo)致其他并行度中的窗口無法觸發(fā)計(jì)算的情況發(fā)生。

3.1 順序流中的水位線

那既然水位線機(jī)制是為了指明亂序流中的時(shí)間進(jìn)展而指定的,那順序流中是不是就不要水位線的概念了呢?實(shí)際上不然,水位線本身就是用來標(biāo)記時(shí)間進(jìn)展的,并不是單純的為了事件時(shí)間而服務(wù)。只不過在順序流生成水位線的時(shí)候可以指定水位線周期生成,因?yàn)槿绻縼硪粭l數(shù)據(jù)就生成一條水位線,就會(huì)導(dǎo)致出現(xiàn)大量一致的水位線,這不僅會(huì)浪費(fèi)性能,還能增加無用的集群壓力。

3.2 亂序流中的水位線

亂序流中的水位線有個(gè)問題需要考慮,因?yàn)樗痪€是從時(shí)間戳中提取出來的,如果正好趕上從遲到數(shù)據(jù)中生成水位線,就會(huì)造成“時(shí)光倒流”的情況發(fā)生,所以就需要在生成水位線之后對時(shí)間戳做一個(gè)判斷,判斷當(dāng)前水位線是否大于之前的那個(gè),只有大于才會(huì)插入。而且還會(huì)有個(gè)問題,那就是當(dāng)觸發(fā)窗口計(jì)算的數(shù)據(jù)來了之后依然有遲到的數(shù)據(jù)沒有過來呢,可是窗口卻觸發(fā)了計(jì)算。那這條數(shù)據(jù)就會(huì)丟失,所以在指明亂序流事件語義的時(shí)候,還要加入一個(gè)延遲時(shí)間的概念,讓窗口即使讀到了能夠觸發(fā)計(jì)算的數(shù)據(jù),卻能不觸發(fā)計(jì)算,繼續(xù)等待一段時(shí)間。只有當(dāng)延遲的時(shí)間到達(dá)了,才會(huì)觸發(fā)計(jì)算,銷毀窗口。如果還有遲到的數(shù)據(jù),只能放到側(cè)輸出流中做保存操作了。

3.3 生成水位線的策略

在flink中,有一個(gè)專門的方法用來生成水位線,這個(gè)方法是assignTimestampsAndWatermarks。它用來為流中的數(shù)據(jù)分配時(shí)間戳,并生成水位線來指示事件時(shí)間。如果想周期生成水位線,可以在環(huán)境變量的配置參數(shù)中通過調(diào)用setAutoWatermarkInterval方法來完成。

在Flink內(nèi)部有兩個(gè)能夠開箱即用的水位線生成器,分別對應(yīng)單調(diào)和亂序,其實(shí)在底層這兩種方法實(shí)現(xiàn)方式是類似的,如果對于亂序流中傳入的延遲時(shí)間參數(shù)為0的話,那這兩種水位線生成器就沒有什么差別了。

3.4 水位線的傳遞

水位線的傳遞是一件比較頭疼的事情,大家試想一下如果上游任務(wù)有5條并行度,下游任務(wù)有3條并行度。當(dāng)上游向下游傳遞數(shù)據(jù)的時(shí)候,水位線也是向下游傳遞的。3個(gè)并行度接收5個(gè)并行度發(fā)送過來的數(shù)據(jù),那就證明肯定會(huì)有一個(gè)到兩個(gè)的下游并行子任務(wù)(3并行度)需要接收一個(gè)以上的上游并行子任務(wù)(5并行度),那這個(gè)時(shí)候流淌在水位線里面的水位線怎么傳遞?怎么搞?

其實(shí)這種情況是這個(gè)樣子的,在多個(gè)上游任務(wù)向一個(gè)下游任務(wù)發(fā)送數(shù)據(jù)的時(shí)候,下游任務(wù)會(huì)給每一個(gè)向它發(fā)送數(shù)據(jù)的上游任務(wù)設(shè)置一個(gè)“分區(qū)水位線”,而自己要接收的水位線就是當(dāng)前分區(qū)水位線中最小的那個(gè)。上游的任務(wù)每做一次水位線更新,下游子任務(wù)就會(huì)從更新后的水位線中拿到最小的那個(gè)作為自己的水位線。

4.窗口

前面說了這么多,終于要到最重要的窗口這一塊啦。之前也說了,窗口計(jì)算是在流式數(shù)據(jù)上切割下來一塊一塊的范圍數(shù)據(jù)做批處理的,單純的普通切割沒辦法滿足所有的場景,所以需要從多個(gè)方面去考慮窗口的開窗類型。

4.1 按時(shí)間開窗 與 按數(shù)據(jù)數(shù)量開窗

4.1.1時(shí)間窗口

按照時(shí)間開窗的方法是通過窗口的開始時(shí)間和結(jié)束時(shí)間進(jìn)行窗口的切割的,整體采用的是左閉右開的取數(shù)范圍,窗口的數(shù)據(jù)范圍是開窗時(shí)間到關(guān)窗時(shí)間-1毫秒,這個(gè)區(qū)間,即:[開窗時(shí)間,關(guān)窗時(shí)間-1)

按照時(shí)間的窗口可以分為滾動(dòng)、滑動(dòng)、會(huì)話、全局。
滾動(dòng):固定范圍開窗,窗口不存在重疊部分。

處理時(shí)間 -window(TumblingProcessingTimeWindows.of(Time.seconds(5),Time.hours(-8))) //定義為中國時(shí)區(qū)
事件時(shí)間 -window(TumblingEventTimeWindows.of(Time.seconds(5)))

滑動(dòng):按照固定范圍和滑動(dòng)步長進(jìn)行開窗,窗口存在重疊部分。

.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    
window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))

會(huì)話:按照固定時(shí)間沒有來數(shù)據(jù)而關(guān)閉窗口。

//處理時(shí)間會(huì)話窗口
.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))

window(EventTimeSessionWindows.withGap(Time.seconds(10)))

全局:它是將所有的數(shù)據(jù)匯聚到一個(gè)窗口,并且需要自定義觸發(fā)邏輯,否則永遠(yuǎn)都不會(huì)觸發(fā)計(jì)算。

4.1.2計(jì)數(shù)窗口

計(jì)數(shù)窗口就是按照元素的格式進(jìn)行數(shù)據(jù)截取的,當(dāng)數(shù)據(jù)達(dá)到個(gè)數(shù)的時(shí)候就觸發(fā)窗口的計(jì)算。在底層代碼實(shí)現(xiàn)上,是通過全局窗口來實(shí)現(xiàn)的。
滾動(dòng)計(jì)數(shù)窗口:

stream.keyBy(...)
.countWindow(10)

滑動(dòng)計(jì)數(shù)窗口

stream.keyBy(...)
.countWindow(10,3)

事件時(shí)間會(huì)話窗口

stream.keyBy(...)
.window(GlobalWindows.create()); 
//需要自定義觸發(fā)計(jì)算

4.2 窗口計(jì)算

在介紹完了窗口如何分配之后,就可以進(jìn)行下一個(gè)步驟了。那就是窗口是如何進(jìn)行計(jì)算的,不能處理數(shù)據(jù)的時(shí)候只進(jìn)行劃分不進(jìn)行計(jì)算吧。
計(jì)算邏輯需要根據(jù) 當(dāng)前數(shù)據(jù)是否被key by方法處理過來劃分,是按鍵分區(qū)還是非按鍵分區(qū)。只有這樣判斷之后,才好繼續(xù)。

4.2.1 按鍵分區(qū)狀態(tài)

按鍵分區(qū)狀態(tài)通過調(diào)用window方法完成計(jì)算。如果數(shù)據(jù)被key by處理過后,那么數(shù)據(jù)就會(huì)按照key進(jìn)行邏輯分區(qū),在keyed流上做開窗計(jì)算的時(shí)候,每一個(gè)key都會(huì)有一個(gè)自己的計(jì)算窗口。

4.2.2 非按鍵分區(qū)狀態(tài)

非按鍵分區(qū)狀態(tài)通過調(diào)用windowall方法完成計(jì)算,如果沒有被key by處理過后的數(shù)據(jù)進(jìn)行開窗,那么所有的數(shù)據(jù)都會(huì)進(jìn)入到當(dāng)前的這個(gè)窗口,也就會(huì)讓窗口的并行度變成了 1 .

5.窗口函數(shù)

如果要對數(shù)據(jù)流做窗口計(jì)算,就先要生成其水位線,然后指定窗口內(nèi)數(shù)據(jù)的分配原則,在這一切都做完之后,就需要進(jìn)行具體窗口的計(jì)算邏輯編寫了。

窗口函數(shù)可以按照計(jì)算方式的不同劃分為增量聚合窗口函數(shù)和全窗口函數(shù)。

5.1 增量聚合函數(shù)

增量聚合函數(shù)是對窗口數(shù)據(jù)的計(jì)算方式,具體可以分為兩大類:歸約函數(shù)和聚合函數(shù)

5.1.1 歸約函數(shù)

歸約函數(shù)中保存的是窗口內(nèi)數(shù)據(jù)的聚合值,每當(dāng)有新的數(shù)據(jù)進(jìn)入到窗口之后,就會(huì)觸發(fā)歸約計(jì)算,讓新進(jìn)入的數(shù)據(jù)歸約到已經(jīng)聚合的數(shù)據(jù)結(jié)果中,但是要求進(jìn)來的數(shù)據(jù)類型要和歸約的數(shù)據(jù)數(shù)據(jù)類型一致。

5.1.2 聚合函數(shù)

聚合函數(shù)與歸約函數(shù)類型,當(dāng)窗口選擇使用聚合函數(shù)進(jìn)行計(jì)算的時(shí)候,如果選用聚合計(jì)算,就能夠讓中間聚合的值的數(shù)據(jù)類型與進(jìn)入數(shù)據(jù)流中的數(shù)據(jù)類型不一致

5.2 全窗口函數(shù)

全窗口函數(shù)和增量聚合函數(shù)的區(qū)別在于,增量聚合函數(shù)是來一條數(shù)據(jù)進(jìn)行一次計(jì)算,而全窗口函數(shù)是等到數(shù)據(jù)都來了之后再進(jìn)行計(jì)算。雖然與增量聚合函數(shù)相比,全窗口函數(shù)在進(jìn)行計(jì)算的時(shí)候?qū)涸斐傻膲毫Ρ容^高,但是選用這種計(jì)算規(guī)則能夠獲取運(yùn)行時(shí)的上下文信息。

在函數(shù)種類方面,全窗口函數(shù)可以分為 窗口函數(shù) 和處理窗口函數(shù)。但是窗口函數(shù)的功能被處理窗口函數(shù)給全覆蓋了,所以可以直接使用處理窗口函數(shù)。通過這個(gè)函數(shù),能夠獲得窗口內(nèi)數(shù)據(jù)的迭代器信息。

5.3 窗口函數(shù)聯(lián)合使用

上面說了,增量聚合窗口和全窗口函數(shù)都有各自的計(jì)算邏輯,前者壓力小但是無法獲得上下文信息,后者壓力大但是可以獲得上下文信息。如果想要同時(shí)擁有二者的能力就可以將這兩種函數(shù)聯(lián)合在一起使用。前者進(jìn)行當(dāng)前窗口的數(shù)據(jù)計(jì)算,后者負(fù)責(zé)接收前面窗口的計(jì)算結(jié)果,雖然這會(huì)讓全窗口函數(shù)中的迭代器中只有一個(gè)數(shù)據(jù),但是卻能夠獲得運(yùn)行時(shí)上下文信息。


6.怎么保證窗口計(jì)算的時(shí)候數(shù)據(jù)不丟失

Flink中為了保證窗口計(jì)算的時(shí)候,數(shù)據(jù)不會(huì)丟失給數(shù)據(jù)設(shè)置了三重保障。
1.水位線延遲時(shí)間
在使用亂序流時(shí)間語義的時(shí)候,可以指定水位線的延遲時(shí)間,讓窗口能夠在延遲的區(qū)間內(nèi)等待遲到的數(shù)據(jù)。
2.允許遲到數(shù)據(jù)
在窗口計(jì)算的時(shí)候,開啟允許遲到數(shù)據(jù)。這樣窗口就在被水位線觸發(fā)計(jì)算之后,繼續(xù)等待一段時(shí)間,當(dāng)有新的數(shù)據(jù)進(jìn)入到窗口時(shí),會(huì)觸發(fā)二次計(jì)算。
3.開啟遲到數(shù)據(jù)進(jìn)入到側(cè)輸出流中,雖然有了上面兩種保障的支持,數(shù)據(jù)丟失的可能性已經(jīng)很低了。但是萬一有漏網(wǎng)之魚就不太好了,所以可以通過這個(gè)功能,讓遲到的數(shù)據(jù)進(jìn)入到側(cè)輸出流中進(jìn)行保存。


7.其他API

1.觸發(fā)器
用來自定義窗口何時(shí)觸發(fā)計(jì)算的。
2.移除器
在窗口函數(shù)執(zhí)行之前,移除某些數(shù)據(jù)
3.允許延遲
讓數(shù)據(jù)在水位線延遲之后到達(dá)的數(shù)據(jù)依然能夠進(jìn)入到窗口中。
4.側(cè)輸出流
讓在允許延遲之后到達(dá)的數(shù)據(jù)進(jìn)入通過這個(gè)方法,輸入到對應(yīng)的topic中。


8.結(jié)語
今天的這篇文章字?jǐn)?shù)有點(diǎn)多,但是卻基本上涵蓋了所有與窗口相關(guān)的內(nèi)容,所以花點(diǎn)時(shí)間看也是應(yīng)該的。截至到目前為止,已經(jīng)將Flink中很多的特性都進(jìn)行了講解,除了基礎(chǔ)計(jì)算算子之外。窗口、狀態(tài)、檢查點(diǎn)、多留轉(zhuǎn)換都說完了。那接下來就講講Flink中的一些理論知識。當(dāng)理論知識講完了,也就沒什么好說的了,搞定這些內(nèi)容的話,F(xiàn)link一定程度上的使用也沒有問題了。到時(shí)我會(huì)繼續(xù)講講與FlinkSQL相關(guān)的內(nèi)容的,如果我寫的東西對大家有幫助,還希望能夠點(diǎn)贊、投幣、轉(zhuǎn)發(fā)、收藏哦!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容