一、狀態(tài)分類
相對(duì)于其他流計(jì)算框架,F(xiàn)link 一個(gè)比較重要的特性就是其支持有狀態(tài)計(jì)算。即你可以將中間的計(jì)算結(jié)果進(jìn)行保存,并提供給后續(xù)的計(jì)算使用:
[圖片上傳失敗...(image-a7a290-1596010933687)]
<figcaption></figcaption>
具體而言,F(xiàn)link 又將狀態(tài) (State) 分為 Keyed State 與 Operator State:
2.1 算子狀態(tài)
算子狀態(tài) (Operator State):顧名思義,狀態(tài)是和算子進(jìn)行綁定的,一個(gè)算子的狀態(tài)不能被其他算子所訪問到。官方文檔上對(duì) Operator State 的解釋是:each operator state is bound to one parallel operator instance,所以更為確切的說一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的,即假設(shè)算子的并行度是 2,那么其應(yīng)有兩個(gè)對(duì)應(yīng)的算子狀態(tài):
[圖片上傳失敗...(image-88cb40-1596010933686)]
<figcaption></figcaption>
2.2 鍵控狀態(tài)
鍵控狀態(tài) (Keyed State) :是一種特殊的算子狀態(tài),即狀態(tài)是根據(jù) key 值進(jìn)行區(qū)分的,F(xiàn)link 會(huì)為每類鍵值維護(hù)一個(gè)狀態(tài)實(shí)例。如下圖所示,每個(gè)顏色代表不同 key 值,對(duì)應(yīng)四個(gè)不同的狀態(tài)實(shí)例。需要注意的是鍵控狀態(tài)只能在 KeyedStream 上進(jìn)行使用,我們可以通過 stream.keyBy(...) 來得到 KeyedStream 。
[圖片上傳失敗...(image-2dc7e-1596010933686)]
<figcaption></figcaption>
二、狀態(tài)編程
2.1 鍵控狀態(tài)
Flink 提供了以下數(shù)據(jù)格式來管理和存儲(chǔ)鍵控狀態(tài) (Keyed State):
-
ValueState:存儲(chǔ)單值類型的狀態(tài)??梢允褂?
update(T)進(jìn)行更新,并通過T value()進(jìn)行檢索。 -
ListState:存儲(chǔ)列表類型的狀態(tài)??梢允褂?
add(T)或addAll(List)添加元素;并通過get()獲得整個(gè)列表。 -
ReducingState:用于存儲(chǔ)經(jīng)過 ReduceFunction 計(jì)算后的結(jié)果,使用
add(T)增加元素。 -
AggregatingState:用于存儲(chǔ)經(jīng)過 AggregatingState 計(jì)算后的結(jié)果,使用
add(IN)添加元素。 -
FoldingState:已被標(biāo)識(shí)為廢棄,會(huì)在未來版本中移除,官方推薦使用
AggregatingState代替。 - MapState:維護(hù) Map 類型的狀態(tài)。
以上所有增刪改查方法不必硬記,在使用時(shí)通過語法提示來調(diào)用即可。這里給出一個(gè)具體的使用示例:假設(shè)我們正在開發(fā)一個(gè)監(jiān)控系統(tǒng),當(dāng)監(jiān)控?cái)?shù)據(jù)超過閾值一定次數(shù)后,需要發(fā)出報(bào)警信息。這里之所以要達(dá)到一定次數(shù),是因?yàn)橛捎谂及l(fā)原因,偶爾一次超過閾值并不能代表什么,故需要達(dá)到一定次數(shù)后才觸發(fā)報(bào)警,這就需要使用到 Flink 的狀態(tài)編程。相關(guān)代碼如下:
public class ThresholdWarning extends
RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> {
// 通過ListState來存儲(chǔ)非正常數(shù)據(jù)的狀態(tài)
private transient ListState<Long> abnormalData;
// 需要監(jiān)控的閾值
private Long threshold;
// 觸發(fā)報(bào)警的次數(shù)
private Integer numberOfTimes;
ThresholdWarning(Long threshold, Integer numberOfTimes) {
this.threshold = threshold;
this.numberOfTimes = numberOfTimes;
}
@Override
public void open(Configuration parameters) {
// 通過狀態(tài)名稱(句柄)獲取狀態(tài)實(shí)例,如果不存在則會(huì)自動(dòng)創(chuàng)建
abnormalData = getRuntimeContext().getListState(
new ListStateDescriptor<>("abnormalData", Long.class));
}
@Override
public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out)
throws Exception {
Long inputValue = value.f1;
// 如果輸入值超過閾值,則記錄該次不正常的數(shù)據(jù)信息
if (inputValue >= threshold) {
abnormalData.add(inputValue);
}
ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
// 如果不正常的數(shù)據(jù)出現(xiàn)達(dá)到一定次數(shù),則輸出報(bào)警信息
if (list.size() >= numberOfTimes) {
out.collect(Tuple2.of(value.f0 + " 超過指定閾值 ", list));
// 報(bào)警信息輸出后,清空狀態(tài)
abnormalData.clear();
}
}
}
復(fù)制代碼
調(diào)用自定義的狀態(tài)監(jiān)控,這里我們使用 a,b 來代表不同類型的監(jiān)控?cái)?shù)據(jù),分別對(duì)其數(shù)據(jù)進(jìn)行監(jiān)控:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements(
Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
.keyBy(0)
.flatMap(new ThresholdWarning(100L, 3)) // 超過100的閾值3次后就進(jìn)行報(bào)警
.printToErr();
env.execute("Managed Keyed State");
復(fù)制代碼
輸出如下結(jié)果如下:
[圖片上傳失敗...(image-e0f6a8-1596010933685)]
<figcaption></figcaption>
2.2 狀態(tài)有效期
以上任何類型的 keyed state 都支持配置有效期 (TTL) ,示例如下:
StateTtlConfig ttlConfig = StateTtlConfig
// 設(shè)置有效期為 10 秒
.newBuilder(Time.seconds(10))
// 設(shè)置有效期更新規(guī)則,這里設(shè)置為當(dāng)創(chuàng)建和寫入時(shí),都重置其有效期到規(guī)定的10秒
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
/*設(shè)置只要值過期就不可見,另外一個(gè)可選值是ReturnExpiredIfNotCleanedUp,
代表即使值過期了,但如果還沒有被物理刪除,就是可見的*/
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
descriptor.enableTimeToLive(ttlConfig);
復(fù)制代碼
2.3 算子狀態(tài)
相比于鍵控狀態(tài),算子狀態(tài)目前支持的存儲(chǔ)類型只有以下三種:
- ListState:存儲(chǔ)列表類型的狀態(tài)。
- UnionListState:存儲(chǔ)列表類型的狀態(tài),與 ListState 的區(qū)別在于:如果并行度發(fā)生變化,ListState 會(huì)將該算子的所有并發(fā)的狀態(tài)實(shí)例進(jìn)行匯總,然后均分給新的 Task;而 UnionListState 只是將所有并發(fā)的狀態(tài)實(shí)例匯總起來,具體的劃分行為則由用戶進(jìn)行定義。
- BroadcastState:用于廣播的算子狀態(tài)。
這里我們繼續(xù)沿用上面的例子,假設(shè)此時(shí)我們不需要區(qū)分監(jiān)控?cái)?shù)據(jù)的類型,只要有監(jiān)控?cái)?shù)據(jù)超過閾值并達(dá)到指定的次數(shù)后,就進(jìn)行報(bào)警,代碼如下:
public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>,
Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction {
// 非正常數(shù)據(jù)
private List<Tuple2<String, Long>> bufferedData;
// checkPointedState
private transient ListState<Tuple2<String, Long>> checkPointedState;
// 需要監(jiān)控的閾值
private Long threshold;
// 次數(shù)
private Integer numberOfTimes;
ThresholdWarning(Long threshold, Integer numberOfTimes) {
this.threshold = threshold;
this.numberOfTimes = numberOfTimes;
this.bufferedData = new ArrayList<>();
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
// 注意這里獲取的是OperatorStateStore
checkPointedState = context.getOperatorStateStore().
getListState(new ListStateDescriptor<>("abnormalData",
TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {
})));
// 如果發(fā)生重啟,則需要從快照中將狀態(tài)進(jìn)行恢復(fù)
if (context.isRestored()) {
for (Tuple2<String, Long> element : checkPointedState.get()) {
bufferedData.add(element);
}
}
}
@Override
public void flatMap(Tuple2<String, Long> value,
Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) {
Long inputValue = value.f1;
// 超過閾值則進(jìn)行記錄
if (inputValue >= threshold) {
bufferedData.add(value);
}
// 超過指定次數(shù)則輸出報(bào)警信息
if (bufferedData.size() >= numberOfTimes) {
// 順便輸出狀態(tài)實(shí)例的hashcode
out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值警報(bào)!", bufferedData));
bufferedData.clear();
}
}
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
// 在進(jìn)行快照時(shí),將數(shù)據(jù)存儲(chǔ)到checkPointedState
checkPointedState.clear();
for (Tuple2<String, Long> element : bufferedData) {
checkPointedState.add(element);
}
}
}
復(fù)制代碼
調(diào)用自定義算子狀態(tài),這里需要將并行度設(shè)置為 1:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 開啟檢查點(diǎn)機(jī)制
env.enableCheckpointing(1000);
// 設(shè)置并行度為1
DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.setParallelism(1).fromElements(
Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L),
Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L),
Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L),
Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L));
tuple2DataStreamSource
.flatMap(new ThresholdWarning(100L, 3))
.printToErr();
env.execute("Managed Keyed State");
}
復(fù)制代碼
此時(shí)輸出如下:
[圖片上傳失敗...(image-d9aa0c-1596010933685)]
<figcaption></figcaption>
在上面的調(diào)用代碼中,我們將程序的并行度設(shè)置為 1,可以看到三次輸出中狀態(tài)實(shí)例的 hashcode 全是一致的,證明它們都同一個(gè)狀態(tài)實(shí)例。假設(shè)將并行度設(shè)置為 2,此時(shí)輸出如下:
[圖片上傳失敗...(image-757922-1596010933684)]
<figcaption></figcaption>
可以看到此時(shí)兩次輸出中狀態(tài)實(shí)例的 hashcode 是不一致的,代表它們不是同一個(gè)狀態(tài)實(shí)例,這也就是上文提到的,一個(gè)算子狀態(tài)是與一個(gè)并發(fā)的算子實(shí)例所綁定的。同時(shí)這里只輸出兩次,是因?yàn)樵诓l(fā)處理的情況下,線程 1 可能拿到 5 個(gè)非正常值,線程 2 可能拿到 4 個(gè)非正常值,因?yàn)橐笥?3 次才能輸出,所以在這種情況下就會(huì)出現(xiàn)只輸出兩條記錄的情況,所以需要將程序的并行度設(shè)置為 1。
三、檢查點(diǎn)機(jī)制
3.1 CheckPoints
為了使 Flink 的狀態(tài)具有良好的容錯(cuò)性,F(xiàn)link 提供了檢查點(diǎn)機(jī)制 (CheckPoints) 。通過檢查點(diǎn)機(jī)制,F(xiàn)link 定期在數(shù)據(jù)流上生成 checkpoint barrier ,當(dāng)某個(gè)算子收到 barrier 時(shí),即會(huì)基于當(dāng)前狀態(tài)生成一份快照,然后再將該 barrier 傳遞到下游算子,下游算子接收到該 barrier 后,也基于當(dāng)前狀態(tài)生成一份快照,依次傳遞直至到最后的 Sink 算子上。當(dāng)出現(xiàn)異常后,F(xiàn)link 就可以根據(jù)最近的一次的快照數(shù)據(jù)將所有算子恢復(fù)到先前的狀態(tài)。
[圖片上傳失敗...(image-8b1582-1596010933684)]
<figcaption></figcaption>
3.2 開啟檢查點(diǎn)
默認(rèn)情況下,檢查點(diǎn)機(jī)制是關(guān)閉的,需要在程序中進(jìn)行開啟:
// 開啟檢查點(diǎn)機(jī)制,并指定狀態(tài)檢查點(diǎn)之間的時(shí)間間隔
env.enableCheckpointing(1000);
// 其他可選配置如下:
// 設(shè)置語義
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 設(shè)置兩個(gè)檢查點(diǎn)之間的最小時(shí)間間隔
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// 設(shè)置執(zhí)行Checkpoint操作時(shí)的超時(shí)時(shí)間
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 設(shè)置最大并發(fā)執(zhí)行的檢查點(diǎn)的數(shù)量
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// 將檢查點(diǎn)持久化到外部存儲(chǔ)
env.getCheckpointConfig().enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// 如果有更近的保存點(diǎn)時(shí),是否將作業(yè)回退到該檢查點(diǎn)
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
復(fù)制代碼
3.3 保存點(diǎn)機(jī)制
保存點(diǎn)機(jī)制 (Savepoints) 是檢查點(diǎn)機(jī)制的一種特殊的實(shí)現(xiàn),它允許你通過手工的方式來觸發(fā) Checkpoint,并將結(jié)果持久化存儲(chǔ)到指定路徑中,主要用于避免 Flink 集群在重啟或升級(jí)時(shí)導(dǎo)致狀態(tài)丟失。示例如下:
# 觸發(fā)指定id的作業(yè)的Savepoint,并將結(jié)果存儲(chǔ)到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]
復(fù)制代碼
更多命令和配置可以參考官方文檔:savepoints
四、狀態(tài)后端
4.1 狀態(tài)管理器分類
默認(rèn)情況下,所有的狀態(tài)都存儲(chǔ)在 JVM 的堆內(nèi)存中,在狀態(tài)數(shù)據(jù)過多的情況下,這種方式很有可能導(dǎo)致內(nèi)存溢出,因此 Flink 該提供了其它方式來存儲(chǔ)狀態(tài)數(shù)據(jù),這些存儲(chǔ)方式統(tǒng)一稱為狀態(tài)后端 (或狀態(tài)管理器):
[圖片上傳失敗...(image-84289c-1596010933684)]
<figcaption></figcaption>
主要有以下三種:
1. MemoryStateBackend
默認(rèn)的方式,即基于 JVM 的堆內(nèi)存進(jìn)行存儲(chǔ),主要適用于本地開發(fā)和調(diào)試。
2. FsStateBackend
基于文件系統(tǒng)進(jìn)行存儲(chǔ),可以是本地文件系統(tǒng),也可以是 HDFS 等分布式文件系統(tǒng)。 需要注意而是雖然選擇使用了 FsStateBackend ,但正在進(jìn)行的數(shù)據(jù)仍然是存儲(chǔ)在 TaskManager 的內(nèi)存中的,只有在 checkpoint 時(shí),才會(huì)將狀態(tài)快照寫入到指定文件系統(tǒng)上。
3. RocksDBStateBackend
RocksDBStateBackend 是 Flink 內(nèi)置的第三方狀態(tài)管理器,采用嵌入式的 key-value 型數(shù)據(jù)庫 RocksDB 來存儲(chǔ)正在進(jìn)行的數(shù)據(jù)。等到 checkpoint 時(shí),再將其中的數(shù)據(jù)持久化到指定的文件系統(tǒng)中,所以采用 RocksDBStateBackend 時(shí)也需要配置持久化存儲(chǔ)的文件系統(tǒng)。之所以這樣做是因?yàn)?RocksDB 作為嵌入式數(shù)據(jù)庫安全性比較低,但比起全文件系統(tǒng)的方式,其讀取速率更快;比起全內(nèi)存的方式,其存儲(chǔ)空間更大,因此它是一種比較均衡的方案。
4.2 配置方式
Flink 支持使用兩種方式來配置后端管理器:
第一種方式:基于代碼方式進(jìn)行配置,只對(duì)當(dāng)前作業(yè)生效:
// 配置 FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// 配置 RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
復(fù)制代碼
配置 RocksDBStateBackend 時(shí),需要額外導(dǎo)入下面的依賴:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_2.11</artifactId>
<version>1.9.0</version>
</dependency>
復(fù)制代碼
第二種方式:基于 flink-conf.yaml 配置文件的方式進(jìn)行配置,對(duì)所有部署在該集群上的作業(yè)都生效:
state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
復(fù)制代碼
注:本篇文章所有示例代碼下載地址:flink-state-management
參考資料
- Working with State
- Checkpointing
- Savepoints
- State Backends
- Fabian Hueske , Vasiliki Kalavri . 《Stream Processing with Apache Flink》. O'Reilly Media . 2019-4-30
作者:heibaiying
鏈接:https://juejin.im/post/5dd2661cf265da0bf175d5bb
來源:掘金
著作權(quán)歸作者所有。商業(yè)轉(zhuǎn)載請(qǐng)聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請(qǐng)注明出處。