一、歷史變遷
- 在Flink 1.0.0時(shí)期
提供了RocksDB的支持,這個(gè)版本之前所有的狀態(tài)都只能存在進(jìn)程的內(nèi)存里面,這個(gè)內(nèi)存總有存不下的一天,如果存不下則會(huì)發(fā)生OOM。如果想要存更多數(shù)據(jù)、更大量State就要用到RocksDB。RocksDB是一款基于文件的嵌入式數(shù)據(jù)庫(kù),它會(huì)把數(shù)據(jù)存到磁盤(pán),但是同時(shí)它又提供高效讀寫(xiě)能力。所以使用RocksDB不會(huì)發(fā)生OOM這種事情。
在Flink1.1.0里面,提供了純異步化的RocksDB的snapshot。以前版本在做RocksDB的snapshot時(shí)它會(huì)同步阻塞主數(shù)據(jù)流的處理,很影響吞吐量,即每當(dāng)checkpoint時(shí)主數(shù)據(jù)流就會(huì)卡住。純異步化處理之后不會(huì)卡住數(shù)據(jù)流,于是吞吐量也得到了提升。
- 在Flink 1.3.0時(shí)期: 增量checkpoint
引入了增量的checkpoint這個(gè)比較重要的功能。只有基于增量的checkpoint才能更好地支持含有超大State的Job。如果每一次都把全量上TB的State都刷到遠(yuǎn)程的HDFS上那么這個(gè)效率是很低下的。而增量checkpoint只是把checkpoint間隔新增的那些狀態(tài)發(fā)到遠(yuǎn)程做存儲(chǔ),每一次checkpoint發(fā)的數(shù)據(jù)就少了很多,效率得到提高。在這個(gè)版本里面還引入了一個(gè)細(xì)粒度的recovery,細(xì)粒度的recovery在做恢復(fù)的時(shí)候,有時(shí)不需要對(duì)整個(gè)Job做恢復(fù),可能只需要恢復(fù)這個(gè)Job中的某一個(gè)子圖,這樣便能夠提高恢復(fù)效率。
- 在Flink 1.5.0時(shí)期:Local Recovery
引入了Task local 的State的recovery。因?yàn)榛赾heckpoint機(jī)制,會(huì)把State持久化地存儲(chǔ)到某一個(gè)遠(yuǎn)程存儲(chǔ),比如HDFS,當(dāng)發(fā)生Failover的時(shí)候需要重新把這個(gè)數(shù)據(jù)從遠(yuǎn)程HDFS再download下來(lái),如果這個(gè)狀態(tài)特別大那么該download操作的過(guò)程就會(huì)很漫長(zhǎng),導(dǎo)致Failover恢復(fù)所花的時(shí)間會(huì)很長(zhǎng)。Task local state recovery提供的機(jī)制是當(dāng)Job發(fā)生Failover之后,能夠保證該Job狀態(tài)在本地不會(huì)丟失,進(jìn)行恢復(fù)時(shí)只需在本地直接恢復(fù),不需從遠(yuǎn)程HDFS重新把狀態(tài)download下來(lái),于是就提升了Failover recovery的效率。
二、Asynchronous State Snapshots
我們注意到上面描述的機(jī)制意味著當(dāng) operator 向后端存儲(chǔ)快照時(shí),會(huì)停止處理輸入的數(shù)據(jù)。這種同步操作會(huì)在每次快照創(chuàng)建時(shí)引入延遲。
我們完全可以在存儲(chǔ)快照時(shí),讓 operator 繼續(xù)處理數(shù)據(jù),讓快照存儲(chǔ)在后臺(tái)異步運(yùn)行。為了做到這一點(diǎn),operator 必須能夠生成一個(gè)后續(xù)修改不影響之前狀態(tài)的狀態(tài)對(duì)象。例如 RocksDB 中使用的寫(xiě)時(shí)復(fù)制( copy-on-write )類型的數(shù)據(jù)結(jié)構(gòu)。
接收到輸入的 barrier 時(shí),operator異步快照復(fù)制出的狀態(tài)(注:checkpoint的同步部分,復(fù)制狀態(tài)可能會(huì)花費(fèi)較多的時(shí)間,這也是為什么checkpoint同步部分時(shí)間很長(zhǎng)的原因)。然后立即發(fā)射 barrier 到輸出流,繼續(xù)正常的流處理。一旦后臺(tái)異步快照完成,它就會(huì)向 checkpoint coordinator(JobManager)確認(rèn) checkpoint 完成?,F(xiàn)在 checkpoint 完成的充分條件是:所有 sink 接收到了 barrier,所有有狀態(tài) operator 都確認(rèn)完成了狀態(tài)備份(可能會(huì)比 sink 接收到 barrier 晚)。
RocksDBStateBackend 模式對(duì)于較大的 Key 進(jìn)行更新操作時(shí)序列化和反序列化耗時(shí)很多??梢钥紤]使用 FsStateBackend 模式替代。
三、理解Checkpoint


heavy alignments


如何選用合適的狀態(tài)后端
