Official documentation
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/
All functions and operators in Flink can be stateful. Stateful functions store data across the processing of individual elements/events.
To make state fault tolerant, Flink needs to checkpoint it. Checkpoints allow Flink to recover both the state and the position in the stream, giving the application the same semantics as a failure-free execution.
There are two kinds of state:
- Keyed State. Keyed State is always relative to keys and can only be used in functions and operators on a KeyedStream.
- Operator State. With Operator State (or non-keyed state), each operator state is bound to one parallel operator instance. The Kafka connector is a good motivating example for the use of Operator State in Flink: each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its Operator State.
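To make Keyed State concrete, here is a trimmed version of the CountWindowAverage example from the Flink documentation: a RichFlatMapFunction that keeps a per-key (count, sum) pair in ValueState and emits the average every second element. It is a sketch and needs the Flink streaming dependency on the classpath.

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountWindowAverage
        extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {

    // keyed state: one (count, running sum) pair per key
    private transient ValueState<Tuple2<Long, Long>> sum;

    @Override
    public void open(Configuration config) {
        ValueStateDescriptor<Tuple2<Long, Long>> descriptor =
                new ValueStateDescriptor<>(
                        "average",
                        TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}));
        sum = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<Long, Long> input,
                        Collector<Tuple2<Long, Long>> out) throws Exception {
        Tuple2<Long, Long> current = sum.value();
        if (current == null) {
            current = Tuple2.of(0L, 0L);
        }
        current.f0 += 1;          // count
        current.f1 += input.f1;   // running sum
        sum.update(current);

        // every second element per key, emit the average and reset
        if (current.f0 >= 2) {
            out.collect(new Tuple2<>(input.f0, current.f1 / current.f0));
            sum.clear();
        }
    }
}
```

The function is used on a keyed stream, e.g. `stream.keyBy(0).flatMap(new CountWindowAverage())`; the runtime scopes the ValueState to the current key automatically.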
Checkpoints
https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#overview
Enabling and configuring checkpointing
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);
// advanced options:
// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
Related configuration options
Some related options can be set in conf/flink-conf.yaml:
- state.backend: the backend used to store operator state checkpoints if checkpointing is enabled. Supported backends:
- jobmanager: In-memory state, backup to JobManager’s/ZooKeeper’s memory. Should be used only for minimal state (Kafka offsets) or testing and local debugging.
- filesystem: State is in-memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all filesystems supported by Flink, for example HDFS, S3, …
- state.backend.fs.checkpointdir: the directory for storing checkpoints, in any file system supported by Flink. Note: the state backend must be accessible from the JobManager; use file:// only for local setups.
- state.backend.rocksdb.checkpointdir: the local directory for storing RocksDB files.
- state.checkpoints.dir: the target directory for the meta data of externalized checkpoints.
- state.checkpoints.num-retained: the number of completed checkpoints to retain.
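Taken together, a minimal flink-conf.yaml fragment using the file-system backend might look like this (the HDFS paths are illustrative, not prescribed):

```yaml
# keep state on the TaskManagers, snapshot to HDFS
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
# where externalized checkpoint meta data goes
state.checkpoints.dir: hdfs:///flink/ext-checkpoints
# keep the last 3 completed checkpoints
state.checkpoints.num-retained: 3
```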
Resuming from an externalized checkpoint
A job may be resumed from an externalized checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).
$ bin/flink run -s :checkpointMetaDataPath [:runArgs]
Note:
This directory will then contain the checkpoint meta data required to restore the checkpoint. For the MemoryStateBackend, this meta data file will be self-contained and no further files are needed.
FsStateBackend and RocksDBStateBackend write separate data files and only write the paths to these files into the meta data file. These data files are stored at the path given to the state back-end during construction.
Directory Structure
The target directory can be configured via state.checkpoints.dir, for example:
state.checkpoints.dir: hdfs:///checkpoints/
Alternatively, the directory can be specified per job when constructing the state backend (verified to work):
env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));
Differences between checkpoints and savepoints
Compared to savepoints, checkpoints:
- use a state backend specific (low-level) data format,
- may be incremental,
- do not support Flink specific features like rescaling.
Savepoints
Triggering Savepoints
When triggering a savepoint, a new savepoint directory beneath the target directory is created. In there, the data as well as the meta data will be stored. For example with a FsStateBackend or RocksDBStateBackend:
# Savepoint target directory
/savepoints/
# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/
# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata
# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...
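For reference, savepoints are triggered and restored from the CLI. The commands below follow the Flink 1.4 documentation; the job id, target directory, and savepoint path are placeholders to fill in:

```shell
# Trigger a savepoint for a running job
bin/flink savepoint :jobId [:targetDirectory]

# Cancel a job and take a savepoint at the same time
bin/flink cancel -s [:targetDirectory] :jobId

# Resume a job from the savepoint's meta data path
bin/flink run -s :savepointPath [:runArgs]
```

The same `run -s` form works with an externalized checkpoint's meta data path, as noted above.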