回顧
????在之前的學(xué)習(xí)中我們了解到,flink 作為低延時(shí)的流式數(shù)據(jù)處理框架,本身是有狀態(tài)的。狀態(tài) state 是為了保存一些操作符 operator 的中間結(jié)果,同時(shí),通過(guò)狀態(tài)可以保證精確一致語(yǔ)義。
State 分類
????State 從其實(shí)現(xiàn)方式可分為:Keyed State 和 Operator State,從管理方式可分為:Raw State 和 Managed State。
Keyed State
- Keyed State 通常是與 keys 相關(guān)的,其函數(shù)和操作只能在 KeyedStream 中使用。事實(shí)上,keyed 與 hive 中的分區(qū)極其類似,每一個(gè) key 只能屬于某一個(gè) keyed state。
DataStream<Tuple2<String, Integer>> counts =
text.flatMap(new Tokenizer())
.keyBy(0) // 使用 keyby 方法進(jìn)行劃分,不同的 task 之間不會(huì)出現(xiàn)相同的 key
.sum(1);

keyby.png
- 如上,有3個(gè)并行度的 WordCount 任務(wù),在 keyby 之后,相同的 key 會(huì)被劃分到相同的 task 中進(jìn)行處理。
Operator State
- non-keyed state,每一個(gè) operator state 都僅與一個(gè) operator 的實(shí)例綁定。
- 常見(jiàn)的 operator state 是 source state,例如記錄當(dāng)前 source 的 offset
Managed State
????Managed State 是由 Flink Runtime 中管理的 State ,并將狀態(tài)數(shù)據(jù)轉(zhuǎn)換為 hashtable 或者 RocksDB 的對(duì)象進(jìn)行存儲(chǔ)。

mysu_bj.png
- ValueState:與 key 對(duì)應(yīng)單個(gè)的值,在我們統(tǒng)計(jì)流式數(shù)據(jù)中的單詞個(gè)數(shù)時(shí),事實(shí)上,狀態(tài)就是以 ValueState 存在,每次在狀態(tài)值上進(jìn)行更新。在其內(nèi)部,調(diào)用 update(T value) 方法進(jìn)行狀態(tài)值的更新。
public interface ValueState<T> extends State {
/**
* Returns the current value for the state. When the state is not
* partitioned the returned value is the same for all inputs in a given
* operator instance. If state partitioning is applied, the value returned
* depends on the current operator input, as the operator maintains an
* independent state for each partition.
*
* <p>If you didn't specify a default value when creating the {@link ValueStateDescriptor}
* this will return {@code null} when to value was previously set using {@link #update(Object)}.
*
* @return The state value corresponding to the current input.
*
* @throws IOException Thrown if the system cannot access the state.
*/
T value() throws IOException;
/**
* Updates the operator state accessible by {@link #value()} to the given
* value. The next time {@link #value()} is called (for the same state
* partition) the returned state will represent the updated value. When a
* partitioned state is updated with null, the state for the current key
* will be removed and the default value is returned on the next access.
*
* @param value The new value for the state.
*
* @throws IOException Thrown if the system cannot access the state.
*/
void update(T value) throws IOException;
}
- ListState:與 key 對(duì)應(yīng)的元素的列表的狀態(tài) list,內(nèi)部定義 update(T)和 addAll(T)兩個(gè)方法
public interface ListState<T> extends MergingState<T, Iterable<T>> {
/**
* Updates the operator state accessible by {@link #get()} by updating existing values to
* to the given list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>If null or an empty list is passed in, the state value will be null.
*
* @param values The new values for the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void update(List<T> values) throws Exception;
/**
* Updates the operator state accessible by {@link #get()} by adding the given values
* to existing list of values. The next time {@link #get()} is called (for the same state
* partition) the returned state will represent the updated list.
*
* <p>If null or an empty list is passed in, the state value remains unchanged.
*
* @param values The new values to be added to the state.
*
* @throws Exception The method may forward exception thrown internally (by I/O or functions).
*/
void addAll(List<T> values) throws Exception;
}
- ReducingState:定義與 key 相關(guān)的數(shù)據(jù)元素單個(gè)聚合的狀態(tài)值。
- AggregatingState:定義與 key 相關(guān)的數(shù)據(jù)元素單個(gè)聚合的狀態(tài)值。
Raw State
????Raw State 是由算子本身進(jìn)行管理的 State ,此時(shí)狀態(tài)都是以字節(jié)數(shù)組的形式保存到 Checkpoint 中,F(xiàn)link 并不清楚狀態(tài)數(shù)據(jù)的內(nèi)部結(jié)構(gòu),每次狀態(tài)的寫(xiě)入和讀取都需要算子進(jìn)行序列化和反序列化。
狀態(tài)管理
????Flink 中狀態(tài)管理有三種方案:MemoryStateBackend、FSStateBackend、RocksDBStateBackend。
MemoryStateBackend
- MemoryStateBackend 基于內(nèi)存的狀態(tài)管理器,它通常將狀態(tài)數(shù)據(jù)存儲(chǔ)在 JVM 內(nèi)存中,包括 Key/State 及窗口中緩存的數(shù)據(jù)。但是由于內(nèi)存本身的限制,基于內(nèi)存的狀態(tài)管理會(huì)造成內(nèi)存溢出。因此,這種狀態(tài)管理機(jī)制通常在本地測(cè)試中使用,生產(chǎn)中禁止使用內(nèi)存狀態(tài)管理器。
env.setStateBackend(new MemoryStateBackend());
FSStateBackend
- FSStateBackend 基于文件的狀態(tài)管理,和內(nèi)存管理機(jī)制不同,F(xiàn)SStateBackend 通常把狀態(tài)數(shù)據(jù)保存在本地文件系統(tǒng),或者HDFS文件系統(tǒng)中。在初始化時(shí),需要傳入文件路徑?;谖募臓顟B(tài)管理機(jī)制,適用于狀態(tài)數(shù)據(jù)很大的數(shù)據(jù),此時(shí),如果使用內(nèi)存狀態(tài)管理器,很容易就把內(nèi)存撐爆。通常情況下,為了保證文件狀態(tài)安全性,會(huì)把文件狀態(tài)保存在 HDFS 中,此時(shí),借助 HDFS 的多副本的策略,保證文件狀態(tài)不丟失。
env.setStateBackend(new FsStateBackend(""));
// 源碼
public FsStateBackend(Path checkpointDataUri) {
this(checkpointDataUri.toUri());
}
RocksDBStateBackend
- RocksDBStateBackend 基于內(nèi)存和文件系統(tǒng)的狀態(tài)管理器,這是基于三方的狀態(tài)管理器。通常,先把狀態(tài)放在內(nèi)存中,等到到達(dá)一定的大小時(shí),會(huì)將狀態(tài)數(shù)據(jù)刷到文件中。
env.setStateBackend(new RocksDBStateBackend(""));
總結(jié)
????狀態(tài)是 Flink 容錯(cuò)機(jī)制的基石,了解 State 的機(jī)制,可以更好的管理 Checkpoint,更好的進(jìn)行失敗任務(wù)的恢復(fù)。