1.前言
Flink一直是被成為了有狀態(tài)的流計算,并且還能夠通過對狀態(tài)的合理的調控與配置,來實現端到端的狀態(tài)一致性,實現數據的精準一次性傳輸。這是非常厲害的一個功能,我在前面也說了,Flink是如何完成狀態(tài)的保存的,也表明了要在這一篇文章當中從頭的聊一聊狀態(tài)編程到底是什么,都有什么類型的狀態(tài),在使用時到底該如何去使用。
2.什么是狀態(tài)
我一直都很不喜歡書本上那種非常教條的表述方式,因為它看起來總讓我覺得缺點什么,在我的思維中,如果想要表名一件事情,如果能在說概念之前拿一個現實生活中的例子去帶入效果會好很多。
例子:我們可以試想一下,Flink是一個數據計算框架,用來處理流入到它內部的所有數據。那如果把我們自己當作是Flink,而我們每一天的生活當作流入Flink中的數據是不是也可行呢?我們每天經歷的不同種生活,可以認為是一條條的數據流,我們需要對它們進行處理才能夠完成我們每天的正常生活。接下來我就用這個角色來代入Flink,去講講狀態(tài)對于Flink到底是個什么東西。
比如我今天早晨要去上班,上班的路上需要做地鐵。如果我的腦子里面沒有到公司的具體路線,我根本不會知道要坐上哪一條現路才能到達公司,這就相當于我無法完成從家到公司的這個程序任務。但是如果我知道這條路線的話,我就可以很順利的到達我的工位,開始進行當天的工作。這也就表明了我完成了從家到公司的這個程序任務,而我在從家到公司的路上所使用到的這條路線,就是“狀態(tài)”?。?!
所以,在Flink計算的過程中,有一些任務需要依賴一些其他的數據內容才能夠計算出正確的結果,這些被依賴的數據實際上就是Flink中的狀態(tài)。
3.狀態(tài)的分類
在Flink處理數據的過程中,有的時候需要依賴其他的數據才能夠獲得結果,有的不需要依賴其他的數據就能獲得到計算結果。那這些不需要依賴其他數據就能夠得到結果的算子就叫無狀態(tài)算子,反之則是有狀態(tài)的算子。
3.1 托管狀態(tài)和原始狀態(tài)
上面說的有狀態(tài)算子和無狀態(tài)算子只是針對于不同的計算場景而劃分的,并沒有固定且具體的無狀態(tài)或有狀態(tài)算子。但是狀態(tài)的類型被官方進行了劃分,分為托管狀態(tài)和原始狀態(tài),原始狀態(tài)是需要我們自己定義的,注意這個自己定義相當復雜,無論是狀態(tài)的存儲訪問權限、故障恢復之類的東西都需要開發(fā)人員自己動手,這需要相當強的功力,我本人肯定是不會的,所以也沒有辦法講給大家。
托管狀態(tài)就是Flink內部為我們實現的、已經給予了一系類完整實現的完整功能。也是我本人經常用到的一種狀態(tài)類型,在內部還能夠按照數據流是否按key進行分區(qū)而劃分成為了算子狀態(tài)和鍵控狀態(tài)。
3.1.1 算子狀態(tài)
在Flink計算過程中,會按照設定好的并行度對計算任務進行并行處理,每一個并行度就是一個并行子任務。算子狀態(tài)所覆蓋的范圍就是每一個并行子任務,這一個并行度中的所有計算流程都能夠訪問到相同的狀態(tài)。
3.1.2 按鍵分區(qū)狀態(tài)(鍵控狀態(tài))
這種狀態(tài)類型就是非常常見的一種狀態(tài)了,當數據被keyby傳入的鍵值選擇器處理后,數據會按照不同的鍵值流入到不同的邏輯分區(qū)中,每一個邏輯分區(qū)中的鍵都是一樣的,鍵控狀態(tài)也是按照這個key來進行維護的,也就是說一個key對應了一組狀態(tài),不同key之間的狀態(tài)是無法讀取的。
3.2 鍵控狀態(tài)與算子狀態(tài)
3.2.1 鍵控狀態(tài)隔離狀態(tài)方式
鍵控狀態(tài)針對于狀態(tài)的隔離方式,是通過把當前key與屬于這個key中的數據綁定成為一個map形式,這樣就能夠保證后來的數據可以通過這個map來找到自己所對應的狀態(tài)內容了。
3.2.2 算子狀態(tài)隔離狀態(tài)方式
算子狀態(tài)與鍵控狀態(tài)相比,是一種比較底層的狀態(tài),因為所有不需要考慮key的算子一旦使用了狀態(tài),都可以被稱為算子狀態(tài)。并且Flink的故障恢復也是靠著狀態(tài)來完成的。但是由于算子狀態(tài)應用的場景比較少,所以名氣不如鍵控狀態(tài)大。在數據處理的過程中,只要是這條數據被發(fā)送到了一個并行子任務,被一個算子處理的時候,處理過程中一旦使用狀態(tài),那這個狀態(tài)就會被鎖定到這個算子身上。因此,無論key是否一樣,只要是被發(fā)送到了統(tǒng)一并行子任務,那么所有的數據都能夠共享各個算子各自的狀態(tài)。
4.狀態(tài)類型
4.1 算子狀態(tài)類型
算子狀態(tài)類型支持三種不同的類型結構:ListState、UnionListState、BroadcastState,咱么就按照這三種狀態(tài)類型分別進行闡述。
4.1.1 ListState
在算子狀態(tài)當中,會按照不同的并行子任務來維護一個ListState,這個并行任務上的所有狀態(tài)項都會被放到這個ListState當中,每一個狀態(tài)項自然就是List中的一個最小元素。如果發(fā)生了并行度改變的情況,就會把所有并行度上的每一個狀態(tài)項都收集起來,形成一個擁有當前任務的所有并行子任務的所有狀態(tài)的一個大List,然后按照重新規(guī)劃好的并行度進行輪詢分配。
4.1.2 UnionListState
這個狀態(tài)是ListState的升級版本,它與ListState之間的區(qū)別就在于在發(fā)生并行度變化的時候,并不是形成一張大List進行輪詢分配,而是將這個大List以廣播的形式發(fā)送給更新并行度之后的所有子任務,讓它們按照自己的需要自行拿取和保留。
4.1.3 BroadcastState
廣播狀態(tài)是一個非常有用的狀態(tài),它能夠將廣播流中的數據發(fā)送給每一個并行子任務。所以在使用過程中,通過把規(guī)則數據按照廣播變量的方式提供給每一個并行子任務,來實現符合規(guī)則內容的處理。
4.2 鍵控狀態(tài)類型
鍵控狀態(tài)的類型與算子狀態(tài)的結構類型要復雜一些,數量上也會多一些,具體可以分為
ValueState、ListState、MapState、reduceingState、AggregateState
4.2.1 ValueState
值狀態(tài)是算子狀態(tài)中最基本的一個,根據名稱我們能夠得知,值狀態(tài)就是以一個值來設置狀態(tài)的。舉個例子,如果你想得出一個學校每個班級身高超過180cm的學生信息,就可以把這個180cm設置成為值狀態(tài),然后通過計算得出想要的結果。
4.2.2 ListState
列表狀態(tài)實際上就是多個值狀態(tài)組合而成。舉個例子:如果你想得出身高為180com、170cm、160cm這三種身高的學生信息,那就把這三種身高組成列表狀態(tài)。然后用這個列表狀態(tài)去處理數據。
4.2.3 MapState
映射狀態(tài)在我看起來是比較靈活的一種狀態(tài),它的狀態(tài)都是一個個的kv鍵值對。舉個例子:如果你想得到所有身高為180、體重為160的學生信息,就能夠通過這個狀態(tài)類型來計算并得到。
4.2.4 reduceingState
歸約狀態(tài)有點類似于值狀態(tài),只不過這種狀態(tài)類型是對每條數據的歸約結果。值得注意的是,歸約狀態(tài)的數據類型要與傳入的參數是一致的。
4.2.5 AggregateState
聚合狀態(tài)和歸約狀態(tài)沒差多少,只不過是能夠讓聚合的狀態(tài)與傳入的數據類型可以不一樣。
4.3 廣播狀態(tài)
廣播狀態(tài)在底層是一個以kv形式存在的狀態(tài)類型,因此如果想創(chuàng)建廣播變量,需要傳入一個MapState狀態(tài)描述去完成創(chuàng)建。你只要記住,如果你在做計算的時候,想要用依賴一些少量的數據做規(guī)則匹配,那么選擇廣播狀態(tài)基本上是不會錯的。
4.4 狀態(tài)生存時間
有的時候,隨著計算時間的不斷增長,計算時所使用的狀態(tài)也就會越來越大。大量的狀態(tài)會占據很大的內存空間,影響集群的性能。因此需要規(guī)定出一個狀態(tài)的生存時間的概念,一旦到達時間就會將過期的狀態(tài)擦除、或者進行狀態(tài)無效化。具體的使用方法、步驟如下:
1.創(chuàng)建一個StateTtlConfig配置對象,并傳入對應的參數
2.調用狀態(tài)描述器的enableTimeToLive方法,將第一步驟的對象傳入。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(10)) //傳入的是狀態(tài)的生存時間
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)//定義什么時候觸發(fā)失效時間的判斷
//OnCreateAndWrite(默認)創(chuàng)建和寫入的時候觸發(fā) OnReadAndWrite無論讀寫都會創(chuàng)建
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) //定義失效狀態(tài)的可見性
//NeverReturnExpired 永遠不返回過期狀態(tài) ReturnExpireDefNotCleanedUp如果過去狀態(tài)還沒被系統(tǒng)清理,就還能返回
.build();
ValueStateDescriptor<String> stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);
stateDescriptor.enableTimeToLive(ttlConfig);
5.狀態(tài)的具體使用
5.1 廣播狀態(tài)具體使用
1.創(chuàng)建狀態(tài)描述器
2.將數據流聲明為廣播狀態(tài)
3.廣播流與常規(guī)流連接
4.對連接后的流做計算
//1.創(chuàng)建狀態(tài)描述器
MapStateDescriptor<String,String> resultDescripter = new MapStateDecriptorM<>("ruler-name",String.class,String.class);
//2.將數據流聲明為廣播狀態(tài)
BroadcastStream<Event> ruleBroadcastStram = dataStream.broadcast(resultDescripter);
//3.廣播流與常規(guī)流連接 4.對連接后的流做計算
stream.connect(ruleBroadcastStram).process(new KeyedBroadcastProcessFunction()) --針對監(jiān)控流
stream.connect(ruleBroadcastStram).process(new BroadcastProcessFunction()) --針對常規(guī)流
在連接流里面還有兩個要實現的方法,分別為processElement和processBroadcastElement;后者是對廣播流中的數據進行處理的,前者則是定義如何根據廣播流里面的狀態(tài)數據做對應的處理計算的。因此processElement無法獲得可以修改的狀態(tài)數據內容。
5.2鍵控狀態(tài)
鍵控狀態(tài)的集種狀態(tài)的使用方法基本一樣,都是要先傳入對應的狀態(tài)描述器,然后在對狀態(tài)內容進行獲取,然后按照不同類型的狀態(tài)做對應的計算。這部分內容我會放到后面的實戰(zhàn)系列去詳細的寫,有需求看起來也會更加方便一些。