State Backend Support
Local State
Local state backends maintain all state in local memory or out-of-core, within an embedded key-value database such as RocksDB. Out-of-core access is preferred in production deployments, as in that case state size is limited only by the quota of the local filesystem allocated to each node. When a snapshot operation is triggered, a copy of the current local state is written to a durable storage layer (e.g., a distributed file system directory). Local state backends can support asynchronous and incremental snapshotting (Section 4.2) and yield considerable read/write performance, as they exploit data locality while eliminating the need for distributed or transactional coordination. (When writing local state, a task does not need to coordinate transactions with other tasks, because on failover the globally consistent state is reloaded from the remote snapshot. Since the state lives locally, reads and writes are efficient; the drawback is that every snapshot must write a copy of the state to remote storage.)
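A minimal Python sketch of the local model described above, assuming a plain dict as the task's state and another dict standing in for the durable DFS directory. The class name, the `chk-<id>/task-<id>` path layout, and JSON serialization are illustrative assumptions, not Flink's API. Reads and writes touch only task-local memory; only `snapshot()` writes a copy to remote storage, and `restore()` reloads that copy after a failover.

```python
import json


class LocalStateBackend:
    """Hypothetical toy local backend, not Flink's API.

    State lives in a task-local dict. `durable_store` is a plain dict standing
    in for a DFS directory (path -> bytes). Keys must be strings because the
    snapshot copy is JSON-encoded.
    """

    def __init__(self, durable_store):
        self.state = {}
        self.durable_store = durable_store

    def put(self, key, value):
        # Purely local write: no coordination with other tasks.
        self.state[key] = value

    def get(self, key):
        return self.state.get(key)

    def snapshot(self, checkpoint_id, task_id):
        # Write a full copy of the current state to durable storage.
        path = f"chk-{checkpoint_id}/task-{task_id}"
        self.durable_store[path] = json.dumps(self.state).encode("utf-8")
        return path

    def restore(self, path):
        # On failover, reload the globally consistent copy from durable storage.
        self.state = json.loads(self.durable_store[path].decode("utf-8"))


if __name__ == "__main__":
    dfs = {}
    backend = LocalStateBackend(dfs)
    backend.put("count", 3)
    path = backend.snapshot(checkpoint_id=1, task_id=0)
    backend.put("count", 7)           # change made after the snapshot
    recovered = LocalStateBackend(dfs)
    recovered.restore(path)
    print(recovered.get("count"))     # 3
```

The trade-off from the note above is visible here: `put` and `get` never leave the process, while every snapshot pays for a full copy of the state.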
External State
External state backends: state access is internally coordinated with an external system such as a database or key/value store. (State lives in an external database, i.e., compute and storage are separated.)
Non-MVCC: Non-MVCC databases can be supported by maintaining, within each task, a write-ahead log (WAL) of pending state changes per epoch. The WAL can be committed to the database once an epoch has been completed. In more detail, each snapshot is one distributed bulk two-phase commit transaction: during a checkpoint, changes are put into the transaction log (WAL) and pre-committed when triggerSnapshot() is invoked. Once the global snapshot is complete, the pre-committed states are fully committed by the JobManager in one atomic transaction. (Each task keeps a WAL of pending state and commits it when the snapshot completes.) This approach is feasible even when the database does not expose the necessary program hooks to log, pre-commit, and fully commit.
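The per-epoch WAL and the two commit phases can be sketched as follows. This is a toy model under stated assumptions: the "database" is a Python dict, `WalTask` and `job_manager_commit` are hypothetical names, and a single `dict.update` stands in for the one atomic transaction the JobManager would issue.

```python
class WalTask:
    """Hypothetical task writing to a non-MVCC database through a per-epoch WAL."""

    def __init__(self, db):
        self.db = db              # shared external database (a dict here)
        self.wal = []             # pending (key, value) changes of the running epoch
        self.precommitted = {}    # epoch -> WAL frozen at trigger_snapshot()

    def update(self, key, value):
        # Nothing reaches the database yet; the change is only logged.
        self.wal.append((key, value))

    def read(self, key):
        # Read-your-writes: newest pending change, then pre-committed epochs, then the DB.
        for k, v in reversed(self.wal):
            if k == key:
                return v
        for epoch in sorted(self.precommitted, reverse=True):
            for k, v in reversed(self.precommitted[epoch]):
                if k == key:
                    return v
        return self.db.get(key)

    def trigger_snapshot(self, epoch):
        # Phase 1 (pre-commit): freeze this epoch's WAL and start a fresh one.
        self.precommitted[epoch] = self.wal
        self.wal = []

    def abort(self, epoch):
        # Failed snapshot: drop the pre-committed changes of that epoch.
        self.precommitted.pop(epoch, None)


def job_manager_commit(db, tasks, epoch):
    """Phase 2: after the global snapshot completes, apply every task's
    pre-committed WAL in one step (dict.update stands in for an atomic DB transaction)."""
    batch = {}
    for task in tasks:
        for key, value in task.precommitted.pop(epoch, []):
            batch[key] = value
    db.update(batch)
```

Until `job_manager_commit` runs, the database sees none of the epoch's writes, which is what lets a failed epoch be discarded without touching the database.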
MVCC: MVCC-enabled databases allow committing state across multiple database versions. This integrates with Flink’s snapshotting mechanism by associating each state update with the ongoing epoch. Once a snapshot is fully committed, the version is atomically incremented. Likewise, a failed snapshot epoch decrements the current version. (The database supports rollback through MVCC, which avoids buffering pre-committed data in a WAL.)
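For comparison, a toy MVCC store in which each write is tagged with the epoch it belongs to. The names are hypothetical, and a failed epoch is modeled here by discarding the versions written under it, so the visible version stays at the last committed epoch; a real MVCC database would provide this natively.

```python
class MvccStore:
    """Hypothetical multi-version store: every write is tagged with its epoch."""

    def __init__(self):
        self.committed = 0    # highest fully committed epoch (the visible version)
        self.data = {}        # key -> {epoch: value}

    def write(self, key, value, epoch):
        self.data.setdefault(key, {})[epoch] = value

    def read(self, key, as_of=None):
        # By default readers see only committed versions.
        as_of = self.committed if as_of is None else as_of
        versions = [e for e in self.data.get(key, {}) if e <= as_of]
        return self.data[key][max(versions)] if versions else None

    def commit(self, epoch):
        # Snapshot fully committed: atomically advance the visible version.
        if epoch != self.committed + 1:
            raise ValueError("epochs must be committed in order")
        self.committed = epoch

    def rollback(self, epoch):
        # Snapshot failed: discard that epoch's versions (no WAL buffering needed).
        for versions in self.data.values():
            versions.pop(epoch, None)
```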
Advantages of External over Local State
A general advantage of external state backends is that rollback recovery does not require any I/O to retrieve and load state from snapshots (contrary to local state backends). This benefit becomes particularly impactful when state is very large, since it avoids any network I/O upon reconfiguration, which makes external backends a suitable choice under low-latency requirements. Finally, another benefit that comes out of the box with all external backends is support for incremental snapshotting, since, by definition, only changes are committed externally. (Separating compute from storage makes state recovery straightforward: unlike local state, there is no need to load a copy of the state from the DFS. Incremental snapshots are also supported natively, because only the modifications to the state are committed; local state achieves incremental snapshots through the LSM tree.)
Additional Notes
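Currently, all of Flink's state backends follow the local state model: for local state you can choose a heap-based state backend or RocksDBStateBackend. Snapshots then back up a copy of the state to the DFS, which makes recovery of large state take a long time.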
Using external state, on the other hand, means higher read/write latency. Alibaba optimized for this problem, using caching to address the read/write latency; see https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf
Asynchronous Incremental Snapshots
Once a task has completed snapshot (barrier) alignment, it calls triggerSnapshot() to obtain a copy of its current state. This copy does not have to be physical: it can be logical and produced only when it is actually needed (it can be a logical snapshot that is lazily materialized by a concurrent thread). This requires copy-on-write data structures.
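A minimal sketch of a logical snapshot with copy-on-write, assuming copy-on-write at the granularity of the whole map (much coarser than Flink's per-entry copying) and immutable values. `trigger_snapshot()` only hands out a reference, a background thread serializes it, and the first write afterwards copies the map so the snapshot never changes underneath the materialization thread. All names are hypothetical.

```python
import json
import threading


class CowState:
    """Hypothetical whole-map copy-on-write state: a snapshot shares the dict until the next write."""

    def __init__(self):
        self._data = {}
        self._shared = False  # True while a handed-out snapshot references _data

    def put(self, key, value):
        if self._shared:
            # First write after a snapshot: copy once (shallow; values assumed immutable).
            self._data = dict(self._data)
            self._shared = False
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

    def trigger_snapshot(self):
        # Synchronous part is O(1): hand out the current map and mark it shared.
        self._shared = True
        return self._data


def materialize_async(snapshot, sink, name):
    """Asynchronous part: a background thread serializes the logical snapshot into `sink`."""
    def run():
        sink[name] = json.dumps(snapshot, sort_keys=True)

    thread = threading.Thread(target=run)
    thread.start()
    return thread
```

The processing thread can keep calling `put` while the materialization thread runs, because the map it serializes is never mutated again.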
Implementation of Flink's Local Backends
- out-of-core state backend based on RocksDB
It is built on an LSM tree: updates are not applied in place immediately but are appended and compacted asynchronously. When a snapshot is taken, the current version is marked synchronously, which prevents compaction from modifying the data belonging to that version. The operator can then continue processing and make modifications to the state. An asynchronous thread iterates over the marked version, materializes it to the snapshot store, and finally releases the snapshot so that future compactions can overwrite that state.
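The mark/materialize/release cycle can be illustrated with a toy multi-version store that uses sequence numbers, loosely modeled on how an LSM store keeps old versions alive for open snapshots. This is not RocksDB's API; the class and method names are hypothetical, and `read_at()` is called directly here although in the described design it runs on an asynchronous thread.

```python
class VersionedStore:
    """Hypothetical stand-in for an LSM store with sequence numbers and snapshots.

    Every put gets a new sequence number; compaction drops old versions
    unless a pinned snapshot still needs them.
    """

    def __init__(self):
        self.seq = 0
        self.versions = {}    # key -> [(seq, value), ...] in ascending seq order
        self.pinned = set()   # sequence numbers held by open snapshots

    def put(self, key, value):
        self.seq += 1
        self.versions.setdefault(key, []).append((self.seq, value))

    def pin_snapshot(self):
        # Synchronous step: just mark the current version.
        self.pinned.add(self.seq)
        return self.seq

    def release_snapshot(self, snap):
        # After materialization, future compactions may drop this version.
        self.pinned.discard(snap)

    def read_at(self, snap):
        # Materialize the state as of `snap`: newest version with seq <= snap.
        out = {}
        for key, vs in self.versions.items():
            visible = [v for s, v in vs if s <= snap]
            if visible:
                out[key] = visible[-1]
        return out

    def compact(self):
        for key, vs in list(self.versions.items()):
            keep = {vs[-1][0]}  # always keep the latest version
            for snap in self.pinned:
                visible = [s for s, _ in vs if s <= snap]
                if visible:
                    keep.add(visible[-1])  # still needed by a pinned snapshot
            self.versions[key] = [(s, v) for s, v in vs if s in keep]
```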
Incremental implementation: the LSM-based data structure also lends itself to incremental snapshots, which write to the snapshot store only the parts that changed since the previous snapshot.
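Because the files an LSM store writes are immutable, an incremental checkpoint only needs to upload files it has not uploaded before and can reference the rest. A sketch of that bookkeeping, assuming the store's current contents are described by a set of file names (the function name and file names are made up):

```python
def incremental_checkpoint(current_files, uploaded):
    """Hypothetical helper: return (files_to_upload, manifest) for one checkpoint.

    current_files: names of the immutable SST files that make up the store now.
    uploaded:      set of names already present in the snapshot store (updated in place).
    Since SST files never change once written, a file uploaded by an earlier
    checkpoint can simply be referenced again instead of being re-uploaded.
    """
    new_files = sorted(set(current_files) - uploaded)
    uploaded.update(new_files)
    manifest = sorted(current_files)  # full list of files this checkpoint needs
    return new_files, manifest
```

A complete implementation would also track which uploaded files are still referenced by retained checkpoints so that old files can be deleted.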
- in-memory state backend
Flink’s in-memory local state backend implementation is based on hash tables that employ chain hashing. During a snapshot, it copies the current table array synchronously and then starts the external materialization of the snapshot in a background thread. (The snapshot copy of the state is taken synchronously and then materialized externally in an asynchronous way; my understanding is that materialization means serializing the state and writing it to external storage.) The operator’s regular stream processing thread lazily copies the state entries and overflow chains upon modification, if the materialization thread still holds onto the snapshot. (While the materialization thread holds the snapshot, lazy copying is used: a state entry is deep-copied only when the stream processing thread modifies it.) Incremental snapshots for the in-memory local backend are possible and conceptually trivial (using delta maps), but not yet implemented at this point.
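A simplified Python sketch of the chained hash table with a copy-on-write snapshot. Assumptions: fixed capacity (no rehashing), at most one outstanding snapshot, and a whole overflow chain is copied the first time it is modified while still shared, which is coarser than copying only the affected entries. `snapshot()` copies just the bucket array; `materialize()` is what would run in the background thread. The names are hypothetical and do not mirror Flink's classes.

```python
class Entry:
    """One node of a bucket's overflow chain."""
    __slots__ = ("key", "value", "next")

    def __init__(self, key, value, next_entry=None):
        self.key = key
        self.value = value
        self.next = next_entry


class CowChainedHashTable:
    """Hypothetical chained hash table with a copy-on-write snapshot
    (fixed capacity, one outstanding snapshot at a time)."""

    def __init__(self, capacity=16):
        self.table = [None] * capacity
        self._snapshot = None  # bucket array still held by the materialization thread

    def _index(self, key):
        return hash(key) % len(self.table)

    def get(self, key):
        e = self.table[self._index(key)]
        while e is not None:
            if e.key == key:
                return e.value
            e = e.next
        return None

    def _make_private(self, i):
        # If the snapshot still shares bucket i's chain, copy the chain before modifying it.
        head = self.table[i]
        if self._snapshot is None or head is None or head is not self._snapshot[i]:
            return
        nodes = []
        while head is not None:
            nodes.append(head)
            head = head.next
        chain_copy = None
        for node in reversed(nodes):  # rebuild the chain in the original order
            chain_copy = Entry(node.key, node.value, chain_copy)
        self.table[i] = chain_copy

    def put(self, key, value):
        i = self._index(key)
        self._make_private(i)
        e = self.table[i]
        while e is not None:
            if e.key == key:
                e.value = value  # safe: this chain is no longer shared
                return
            e = e.next
        self.table[i] = Entry(key, value, self.table[i])

    def snapshot(self):
        # Synchronous part: copy only the bucket array (shallow and cheap).
        self._snapshot = list(self.table)
        return self._snapshot

    def release_snapshot(self):
        self._snapshot = None


def materialize(snapshot_table):
    """Walk the frozen chains and collect the snapshot's contents."""
    out = {}
    for head in snapshot_table:
        while head is not None:
            out[head.key] = head.value
            head = head.next
    return out
```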
Lazy Copy
A lazy copy can be defined as a combination of shallow copy and deep copy. The mechanism is simple: initially, a shallow copy is made, and a counter keeps track of how many objects share the data. When the program wants to modify an object, it checks whether the data is shared. If it is, a deep copy is made first, and the modification is applied to that copy.
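A small Python sketch of the lazy copy mechanism, with a reference counter on the shared data. The class names are hypothetical; for brevity the counter is not decremented when a copy is garbage-collected, which a real implementation would handle.

```python
import copy


class _SharedData:
    def __init__(self, items):
        self.items = items
        self.refcount = 1  # how many LazyList objects share these items


class LazyList:
    """Hypothetical lazy-copy list: copies share data until one of them is modified.
    Note: refcount is not decremented when a sharer is garbage-collected."""

    def __init__(self, items=()):
        self._data = _SharedData(list(items))

    def lazy_copy(self):
        other = LazyList()
        other._data = self._data  # shallow: share the same data
        self._data.refcount += 1
        return other

    def _detach(self):
        # About to modify: if the data is shared, deep-copy it first.
        if self._data.refcount > 1:
            self._data.refcount -= 1
            self._data = _SharedData(copy.deepcopy(self._data.items))

    def __getitem__(self, i):
        return self._data.items[i]

    def __setitem__(self, i, value):
        self._detach()
        self._data.items[i] = value

    def append(self, value):
        self._detach()
        self._data.items.append(value)

    def to_list(self):
        return list(self._data.items)
```

Copying is cheap (a pointer and a counter increment), and the cost of a deep copy is paid only by an object that is actually modified while shared.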
References
1. Flink incremental snapshots: https://www.slideshare.net/FlinkForward/stephan-ewen-scaling-to-large-state
LevelDB introduction: https://www.cnblogs.com/haippy/archive/2011/12/04/2276064.html
2. Attempts at external state:
http://osdir.com/apache-flink-development/msg10919.html
https://files.alicdn.com/tpsservice/1df9ccb8a7b6b2782a558d3c32d40c19.pdf
3. Flink state management: Carbone P, Ewen S, Fóra G, et al. State management in Apache Flink®[J]. Proceedings of the VLDB Endowment, 2017, 10(12): 1718-1729.
