Flink Notes: State Persistence

Official documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/state/

All functions and operators in Flink can be stateful. Stateful functions store data across the processing of individual elements/events, making state a building block for more elaborate operations.

To make state fault tolerant, Flink needs to checkpoint the state. Checkpoints allow Flink to recover both the state and the positions in the streams, giving the application the same semantics as a failure-free execution.

There are two basic kinds of state:

  • Keyed state: always relative to keys, and can only be used in functions and operators on a KeyedStream.
  • Operator state (non-keyed state): each operator state is bound to one parallel operator instance. The Kafka connector is a good motivating example: each parallel instance of the Kafka consumer maintains a map of topic partitions and offsets as its operator state.

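As a concrete sketch of keyed state, the snippet below is modelled on the ValueState example in the Flink documentation; the class and field names here are my own, and it assumes the Flink streaming API is on the classpath:

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Counts events per key; the ValueState is automatically scoped to the
// key of the current element, so no manual key lookup is needed.
public class CountPerKey extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
        ValueStateDescriptor<Long> descriptor =
            new ValueStateDescriptor<>("count", Long.class);
        count = getRuntimeContext().getState(descriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> input, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = count.value();              // null on first access for this key
        long updated = (current == null ? 0L : current) + 1;
        count.update(updated);                     // checkpointed along with the job
        out.collect(Tuple2.of(input.f0, updated));
    }
}
```

Applied as `stream.keyBy(0).flatMap(new CountPerKey())`; because the function runs on a KeyedStream, Flink stores one counter per distinct key.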
Checkpoints

https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/state/checkpoints.html#overview

Enabling and configuring checkpointing

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// start a checkpoint every 1000 ms
env.enableCheckpointing(1000);

// advanced options:

// set mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// make sure 500 ms of progress happen between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// checkpoints have to complete within one minute, or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
// (the checkpoint is kept instead of deleted when the job is cancelled)
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

Related configuration options

Some related parameters are configured in conf/flink-conf.yaml:

  • state.backend: the backend used to store operator state checkpoints if checkpointing is enabled. Supported backends:
    • jobmanager: in-memory state, backed up to the JobManager's/ZooKeeper's memory. Should be used only for minimal state (e.g. Kafka offsets) or for testing and local debugging.
    • filesystem: state is kept in memory on the TaskManagers, and state snapshots are stored in a file system. Supported are all file systems supported by Flink, for example HDFS, S3, …
  • state.backend.fs.checkpointdir: the directory for storing checkpoints, in a file system supported by Flink. Note: the state backend must be accessible from the JobManager; use file:// only for local setups.
  • state.backend.rocksdb.checkpointdir
  • state.checkpoints.dir
  • state.checkpoints.num-retained
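Putting these options together, a minimal flink-conf.yaml fragment using the filesystem backend might look like the following (the HDFS paths and retention count are illustrative, not recommendations):

```yaml
# conf/flink-conf.yaml -- illustrative values
state.backend: filesystem
state.backend.fs.checkpointdir: hdfs:///flink/checkpoints
state.checkpoints.dir: hdfs:///flink/ext-checkpoints
state.checkpoints.num-retained: 3
```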

Resuming from an externalized checkpoint

A job may be resumed from an externalized checkpoint just as from a savepoint by using the checkpoint’s meta data file instead (see the savepoint restore guide). Note that if the meta data file is not self-contained, the jobmanager needs to have access to the data files it refers to (see Directory Structure above).

$ bin/flink run -s :checkpointMetaDataPath [:runArgs]

Note:
This directory will then contain the checkpoint meta data required to restore the checkpoint. For the MemoryStateBackend, this meta data file will be self-contained and no further files are needed.

FsStateBackend and RocksDBStateBackend write separate data files and only write the paths to these files into the meta data file. These data files are stored at the path given to the state back-end during construction.

Directory Structure

The checkpoint target directory can be configured via state.checkpoints.dir. For example:
state.checkpoints.dir: hdfs:///checkpoints/

The directory can also be set per job by passing it to the state backend's constructor (verified to work):

env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data/"));

Differences between checkpoints and savepoints

Compared to savepoints, checkpoints:

  • use a state backend specific (low-level) data format,
  • may be incremental,
  • do not support Flink specific features like rescaling.

Savepoints

Triggering Savepoints

When triggering a savepoint, a new savepoint directory beneath the target directory is created. In there, the data as well as the meta data will be stored. For example with a FsStateBackend or RocksDBStateBackend:

# Savepoint target directory
/savepoints/

# Savepoint directory
/savepoints/savepoint-:shortjobid-:savepointid/

# Savepoint file contains the checkpoint meta data
/savepoints/savepoint-:shortjobid-:savepointid/_metadata

# Savepoint state
/savepoints/savepoint-:shortjobid-:savepointid/...
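Savepoints are triggered and restored through the CLI. The commands below follow the Flink 1.4 savepoint documentation; the job id and paths are placeholders to be filled in:

```shell
# Trigger a savepoint for a running job (target directory is optional
# if a default is configured via state.savepoints.dir)
bin/flink savepoint :jobId [:targetDirectory]

# Cancel a job while taking a savepoint
bin/flink cancel -s [:targetDirectory] :jobId

# Resume from a savepoint (same mechanism as resuming from an
# externalized checkpoint's meta data file)
bin/flink run -s :savepointPath [:runArgs]
```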