?? state和checkpoint? 關(guān)系:
1、state一般指一個具體的task/operator的狀態(tài)【state數(shù)據(jù)默認保存在java的堆內(nèi)存中】
2、而checkpoint是把state數(shù)據(jù)持久化存儲了,表示了一個Flink Job在一個特定時刻的一份全局狀態(tài)快照,即包含了所有task/operator的狀態(tài),保存在hdfs
?? state 分類
1、Keyed State:基于KeyedStream上的狀態(tài)。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state。
保存state的數(shù)據(jù)結(jié)構(gòu)
ValueState<T>:即類型為T的單值狀態(tài)。這個狀態(tài)與對應的key綁定,是最簡單的狀態(tài)了。它可以通過update方法更新狀態(tài)值,通過value()方法獲取狀態(tài)值
ListState<T>:即key上的狀態(tài)值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍歷狀態(tài)值
ReducingState<T>:這種狀態(tài)通過用戶傳入的reduceFunction,每次調(diào)用add方法添加值的時候,會調(diào)用reduceFunction,最后合并到一個單一的狀態(tài)值
MapState<UK, UV>:即狀態(tài)值為一個map。用戶通過put或putAll方法添加元素
2、Operator State: Key無關(guān)的State,與Operator綁定的state,整個operator只對應一個state?
? 保存state的數(shù)據(jù)結(jié)構(gòu) ListState<T>,代表:Kafka Connector
?? checkpoint 配置
checkpoint的checkPointMode有兩種,Exactly-once和At-least-once
Exactly-once對于大多數(shù)應用來說是最合適的。At-least-once可能用在某些延遲超低的應用程序(始終延遲為幾毫秒)
??State Backend(狀態(tài)的后端存儲)
state會保存在taskmanager的內(nèi)存中,
checkpoint會存儲在JobManager的內(nèi)存中。
state 的store和checkpoint的位置取決于State Backend的配置
env.setStateBackend(…) ,有3種
1 MemoryStateBackend
2 FsStateBackend
3? RocksDBStateBackend
??修改State Backend的兩種方式
第一種: 修改當前任務代碼
env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));
或者new MemoryStateBackend()
或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】
第二種:全局調(diào)整
修改flink-conf.yaml
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints
注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)
● 多個Checkpoint及從checkpoint恢復
在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數(shù)
state.checkpoints.num-retained: 20
退回指定的checkpoint
flink run -s? \
hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata? ?\
flink-job.jar
● Savepoint?
checkPoint :應用定時觸發(fā),用于保存狀態(tài),會過期
內(nèi)部應用失敗重啟的時候使用
savePoint:用戶手動執(zhí)行,是指向Checkpoint的指針,不會過期在升級的情況下使用
注意:為了能夠在作業(yè)的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦程序員通過 uid(String) 方法手動的給算子賦予 ID,這些 ID 將用于確定每一個算子的狀態(tài)范圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。只要這些 ID 沒有改變就能從保存點(savepoint)將程序恢復回來。而這些自動生成的 ID 依賴于程序的結(jié)構(gòu),并且對代碼的更改是很敏感的。因此,強烈建議用戶手動的設置 ID
savepoint使用
1:在flink-conf.yaml中配置Savepoint存儲位置
不是必須設置,但是設置后,后面創(chuàng)建指定Job的Savepoint時,可以不用在手動執(zhí)行命令時指定Savepoint的位置
state.savepoints.dir: hdfs://namenode:9000/flink/savepoints
2:觸發(fā)一個savepoint【直接觸發(fā)或者在cancel的時候觸發(fā)】
bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid參數(shù)】
bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid參數(shù)】
3:從指定的savepoint啟動job
bin/flink run -s savepointPath [runArgs]
。