Flink的可靠性保證 - 狀態(tài)存儲

一 為什么需要State存儲

與批計(jì)算相比,State是流計(jì)算特有的,批計(jì)算的failover機(jī)制,是失敗后重新計(jì)算;流計(jì)算在大多數(shù)場景下是增量計(jì)算,數(shù)據(jù)逐條處理,每次計(jì)算是在上一次計(jì)算結(jié)果之上進(jìn)行處理的,這就要求對上一次的計(jì)算結(jié)果進(jìn)行存儲,當(dāng)因?yàn)闄C(jī)器,網(wǎng)絡(luò),臟數(shù)據(jù)等原因?qū)е鲁绦蝈e(cuò)誤的時(shí)候,可以重啟Job進(jìn)行state恢復(fù)。Flink就是基于state存儲,通過CheckPoint機(jī)制來保證數(shù)據(jù)的準(zhǔn)確性。

此外,State存儲的內(nèi)容還有流計(jì)算過程中計(jì)算節(jié)點(diǎn)的中間結(jié)果或元數(shù)據(jù)屬性,比如Window方面的操作,需要累加數(shù)據(jù);在aggregation過程中的中間聚合結(jié)果;在以Apache Kafka作為數(shù)據(jù)源時(shí)候,記錄已經(jīng)讀取數(shù)據(jù)的offset等

二 State存儲的實(shí)現(xiàn)

Flink內(nèi)部有三種state的存儲實(shí)現(xiàn),如果不做配置,F(xiàn)link默認(rèn)使用的是MemoryStateBackend,三種實(shí)現(xiàn)分別是:

1 基于內(nèi)存的MemoryStateBackend- 在debug模式使用,在生產(chǎn)模式下不建議使用;

2基于HDFS的FsStateBackend –基于分布式文件系統(tǒng)的持久化,每次讀寫都產(chǎn)生網(wǎng)絡(luò)IO,整體性能不太好;

3基于RocksDB的RocksDBStateBackend - 本地文件+異步HDFS持久化,當(dāng)前版本在生產(chǎn)環(huán)境下使用的

選擇用RocksDB+HDFS的方式進(jìn)行State的存儲,State存儲分兩個(gè)階段,首先本地存儲到RocksDB,然后異步的同步到遠(yuǎn)程的HDFS。 這樣的而設(shè)計(jì)既消除了HeapStateBackend的局限(內(nèi)存有限,宕機(jī)數(shù)據(jù)丟失),也減少了純分布式存儲的網(wǎng)絡(luò)IO開銷。RocksDBStateBackend存儲value的大小是有限制的,

RocksDB’s的bridge API是基于byte[]的,所以這種state存儲支持的每個(gè)key的value最大不超過2^31,有些merge操作的值可能會超過2^31 bytes,這點(diǎn)要注意。

StateBackend的控制粒度到j(luò)ob級別,如果想為所有job設(shè)置StateBackend,可以通過更改flink-conf.yaml文件里state.backend的值 ,上述3類sate backend對應(yīng)的值是;

jobmanager(MemoryStateBackend),filesystem(FsStateBackend),和 rocksdb(RocksDBStateBackend);也可為單獨(dú)的job設(shè)置StateBackend的方法:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));

KeyedState和OperatorState:

KeyedState - 這里面的key就是我們用KeyBy(x)里面的key,key與key之間的State是不可見的。KeyedState只能用在KeyedStream上的數(shù)據(jù)處理上。OperatorState是和Operator聯(lián)系在一起的,比如Source Connector的實(shí)現(xiàn)中會用OperatorState來記錄source數(shù)據(jù)讀取的offset。

無論選用何種stateBackend,這些state都是優(yōu)先存在本機(jī)上,當(dāng)計(jì)算并行度發(fā)生變化,這些state也會被重新分發(fā)到不同機(jī)器上去。

OperatorState的分發(fā),需要在Source Connector中實(shí)現(xiàn),重點(diǎn)是把source的partition重新分配,并把之前記錄的每個(gè)partition的offset也告訴新分配到的Source Connector。

KeyedState存的數(shù)據(jù)量比較大,如果調(diào)整并發(fā)度,copy的東西可能比較多,F(xiàn)link為了避免過多的拷貝,采用了一個(gè)keygroup的機(jī)制。每個(gè)key通過hash方法分配到不同的keygroup中,當(dāng)并發(fā)度調(diào)整的時(shí)候,調(diào)整粒度是keygroup,也就是一個(gè)key通過hash后所在的keygroup保持不變。

RawState 和 ?Managed State

managed state是我們常用的那些ValueState,ListState,MapState等,這些State類型,由Flink控制它們的數(shù)據(jù)結(jié)構(gòu)和存取方法。

Raw state是在自己實(shí)現(xiàn)operator的時(shí)候使用,相當(dāng)于自定義state類型,自己控制數(shù)據(jù)結(jié)構(gòu)和存取方式。

State可以手動地刪除已存的值,也可以設(shè)置Time-To-Live (TTL),讓state過期自動失效。存取State前,要先創(chuàng)建StateDescriptor,StateDescriptor含有state名稱和state值的數(shù)據(jù)類型,有時(shí)候還需要自定義函數(shù)。在keyedstream上應(yīng)用的函數(shù),存取state的時(shí)候,key是由Flink自動提供的,直接使用xxx.value()函數(shù)就可以取到當(dāng)前key對應(yīng)的值;由Flink自動控制的話,可以統(tǒng)一控制state和stream的分區(qū)。

歡迎閱讀,有問題可以通過郵件kaiyuan0989愛特163.com一起探討。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容