state和checkpoint和savepoint

?? state和checkpoint? 關(guān)系:

1、state一般指一個具體的task/operator的狀態(tài)【state數(shù)據(jù)默認保存在java的堆內(nèi)存中】

2、而checkpoint是把state數(shù)據(jù)持久化存儲了,表示了一個Flink Job在一個特定時刻的一份全局狀態(tài)快照,即包含了所有task/operator的狀態(tài),保存在hdfs

?? state 分類

1、Keyed State:基于KeyedStream上的狀態(tài)。這個狀態(tài)是跟特定的key綁定的,對KeyedStream流上的每一個key,都對應一個state。

保存state的數(shù)據(jù)結(jié)構(gòu)

ValueState<T>:即類型為T的單值狀態(tài)。這個狀態(tài)與對應的key綁定,是最簡單的狀態(tài)了。它可以通過update方法更新狀態(tài)值,通過value()方法獲取狀態(tài)值

ListState<T>:即key上的狀態(tài)值為一個列表。可以通過add方法往列表中附加值;也可以通過get()方法返回一個Iterable<T>來遍歷狀態(tài)值

ReducingState<T>:這種狀態(tài)通過用戶傳入的reduceFunction,每次調(diào)用add方法添加值的時候,會調(diào)用reduceFunction,最后合并到一個單一的狀態(tài)值

MapState<UK, UV>:即狀態(tài)值為一個map。用戶通過put或putAll方法添加元素

2、Operator State: Key無關(guān)的State,與Operator綁定的state,整個operator只對應一個state?

? 保存state的數(shù)據(jù)結(jié)構(gòu) ListState<T>,代表:Kafka Connector

?? checkpoint 配置

checkpoint的checkPointMode有兩種,Exactly-once和At-least-once

Exactly-once對于大多數(shù)應用來說是最合適的。At-least-once可能用在某些延遲超低的應用程序(始終延遲為幾毫秒)


??State Backend(狀態(tài)的后端存儲)


state會保存在taskmanager的內(nèi)存中,

checkpoint會存儲在JobManager的內(nèi)存中。

state 的store和checkpoint的位置取決于State Backend的配置

env.setStateBackend(…) ,有3種

1 MemoryStateBackend

2 FsStateBackend

3? RocksDBStateBackend


??修改State Backend的兩種方式

第一種: 修改當前任務代碼

env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints"));

或者new MemoryStateBackend()

或者new RocksDBStateBackend(filebackend, true);【需要添加第三方依賴】

第二種:全局調(diào)整

修改flink-conf.yaml

state.backend: filesystem

state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints

注意:state.backend的值可以是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend)

● 多個Checkpoint及從checkpoint恢復

在Flink的配置文件conf/flink-conf.yaml中,添加如下配置,指定最多需要保存Checkpoint的個數(shù)

state.checkpoints.num-retained: 20

退回指定的checkpoint

flink run -s? \

hdfs://namenode:9000/flink/checkpoints/467e17d2cc343e6c56255d222bae3421/chk-56/_metadata? ?\

flink-job.jar

● Savepoint?

checkPoint :應用定時觸發(fā),用于保存狀態(tài),會過期

內(nèi)部應用失敗重啟的時候使用

savePoint:用戶手動執(zhí)行,是指向Checkpoint的指針,不會過期在升級的情況下使用

注意:為了能夠在作業(yè)的不同版本之間以及 Flink 的不同版本之間順利升級,強烈推薦程序員通過 uid(String) 方法手動的給算子賦予 ID,這些 ID 將用于確定每一個算子的狀態(tài)范圍。如果不手動給各算子指定 ID,則會由 Flink 自動給每個算子生成一個 ID。只要這些 ID 沒有改變就能從保存點(savepoint)將程序恢復回來。而這些自動生成的 ID 依賴于程序的結(jié)構(gòu),并且對代碼的更改是很敏感的。因此,強烈建議用戶手動的設置 ID

savepoint使用

1:在flink-conf.yaml中配置Savepoint存儲位置

不是必須設置,但是設置后,后面創(chuàng)建指定Job的Savepoint時,可以不用在手動執(zhí)行命令時指定Savepoint的位置

state.savepoints.dir: hdfs://namenode:9000/flink/savepoints

2:觸發(fā)一個savepoint【直接觸發(fā)或者在cancel的時候觸發(fā)】

bin/flink savepoint jobId [targetDirectory] [-yid yarnAppId]【針對on yarn模式需要指定-yid參數(shù)】

bin/flink cancel -s [targetDirectory] jobId [-yid yarnAppId]【針對on yarn模式需要指定-yid參數(shù)】

3:從指定的savepoint啟動job

bin/flink run -s savepointPath [runArgs]



。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容