Overview
有狀態(tài)函數(shù)(Stateful function)和算子(Operator)在處理獨立的數(shù)據(jù)或事件時存儲數(shù)據(jù),使得狀態(tài)(State)成為任何復(fù)雜算子中的關(guān)鍵部分,例如:
- 當(dāng)應(yīng)用檢索特定的事件模式,State 將存儲到接收到的事件的序列
- 當(dāng)按照分鐘/小時/天的唯獨聚合事件時,狀態(tài)將保留待處理的聚合狀態(tài)
- 當(dāng)在流數(shù)據(jù)上進(jìn)行機(jī)器學(xué)習(xí)的模型訓(xùn)練時,狀態(tài)保存模型當(dāng)前版本的參數(shù)
- 當(dāng)需要管理歷史數(shù)據(jù)時,狀態(tài)允許有效地訪問過去發(fā)生的事件
Flink 需要知道 State,以便使程序的 checkpoint 和 savepoint 可用。
有關(guān)狀態(tài)的知識還允許重新縮放Flink應(yīng)用程序,這意味著Flink負(fù)責(zé)在并行實例之間重新分配狀態(tài)。
Work with State
State 分類
Flink 中的狀態(tài)分為兩種:Keyed state 和 Operator state
Operator State
每個 Operator state 都綁定到一個并行的算子的實例上??梢詤⒖?Kafka connector 的例子,Kafka 消費者的每個并行實例都維護(hù)一個 topic 分區(qū)和 offset 的對應(yīng)關(guān)系作為 Operator state。Operator state 支持當(dāng)并行度更改時,在并行的算子實例間重新分配 State,進(jìn)行這種重新分配有多種不同的方案(后面會介紹)。
Operator state 在 Python DataStream API 中還不支持。
Keyed State
Keyed state 與 key 強相關(guān),只能在 KeyedStream 上應(yīng)用的函數(shù)和算子內(nèi)使用。
可以將 Keyed state 認(rèn)為是分區(qū)后的 Operator state,每個 key 有一個 State 的分區(qū)。邏輯上,每個 Keyed state 綁定唯一的 <parallel-operator-instance, key>(算子并行實例和 key 的一對元組),可以將其簡單地視為 <operator, key>(因為每個 key 屬于算子的唯一一個并行實例)。
Keyed state 進(jìn)一步被組織為 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子單位,Key groups 的數(shù)量與最大并行度相同。在程序執(zhí)行期間 keyed operator 的每個并行實例都使用一個或多個 Key groups 的 keys。
Broadcast State
Broadcast State 是一種特殊的 Operator State。Broadcast State 是 Flink 支持的另一種擴(kuò)展方式。用來支持將某一個流的數(shù)據(jù)廣播到所有下游任務(wù),數(shù)據(jù)被存儲在本地,接受到廣播的流在操作時可以使用這些數(shù)據(jù)。
Broadcast state 的特點是:
- 使用 Map 類型的數(shù)據(jù)結(jié)構(gòu)
- 僅適用于同時具有廣播流和非廣播流作為數(shù)據(jù)輸入的特定算子
- 可以具有多個不同名稱的 Broadcast state
Broadcast state 在 Python DataStream API 中還不支持。
Raw State 和 Managed State
State 有兩種狀態(tài):托管狀態(tài)(managed)和原生狀態(tài)(raw)。
托管狀態(tài)(managed state) 由 Flink runtime 管理的數(shù)據(jù)結(jié)構(gòu)表示,例如內(nèi)部哈希表或 RocksDB。Flink runtime 對 State 進(jìn)行編碼并寫入 checkpoint。
原生狀態(tài)(raw state) 是在算子內(nèi)部的數(shù)據(jù)結(jié)構(gòu)中的保存。Checkpoint 只會保存 State 內(nèi)容的字節(jié)序列,State 的真實數(shù)據(jù)結(jié)構(gòu)對 Flink 是透明的。
所有數(shù)據(jù)流函數(shù)都可以使用托管狀態(tài)(managed state),而原生狀態(tài)(raw state)只能在具體實現(xiàn)算子時使用。建議使用托管狀態(tài),因為在托管狀態(tài)下,F(xiàn)link 能夠在并行度改變時自適應(yīng)地重新分配 State,并且在內(nèi)存管理方面可以做的更好。
Keyed DataStream
如果要使用 keyed state,首先需要在 DataStream 上指定 key(可以使用方法 keyBy(KeySelector) ),用于對狀態(tài)以及流中的記錄進(jìn)行分區(qū),這將產(chǎn)生 KeyedStream,可以使用 Keyed state。
KeySelector 函數(shù)將單個記錄作為輸入并返回該記錄的 key 值,可以是任何類型,并且必須從確定性計算中派生。
Flink 的數(shù)據(jù)模型不是基于 key-value 的。因此,不需要將數(shù)據(jù)集類型物理地加入到 key 和 value 中。key 被定義用于指導(dǎo)分組算子。
public class WC {
public String word;
// ...
public String getWord() { return word; }
}
DataStream<WC> words = ...
KeyedStream<WC> keyed = words.keyBy(WC::getWord);
Tuple Keys and Expression Keys
Flink 還有兩種定義 key 的方法:Java/scala api 中的 tuple keys 和 expression keys(python api中仍然不支持)。
可以使用元組(Tuple)字段索引或表達(dá)式來選擇對象的字段來指定 key。現(xiàn)在不推薦使用這些,使用 KeySelector 函數(shù)是絕對優(yōu)越的:使用 lambda,易于使用,在運行時的開銷可能更小。
Keyed State 和 Operator State
Use Keyed State
Keys state 提供多種不同類型 State,作用域都是當(dāng)前輸入數(shù)據(jù)的鍵,只能用于 KeyedStream,可以通過 stream.keyBy(…) 創(chuàng)建。
首先看有哪些不同類型的狀態(tài),以及如何在程序中使用:
-
ValueState<T>:保存了一個值,可以更新和讀?。ㄋ阕硬僮鞯拿總€ key 可能有一個 value)update(T)更新
T value()取值 ListState<T>:保存了一個列表,可以追加元素,可以獲取到一個包含所有當(dāng)前存儲的元素的迭代器
add(T)或addAll(List<T>)添加到列表
Iterable<T> get()獲取迭代器
update(List<T>)使用新的列表覆蓋現(xiàn)有列表ReducingState<T>:保存一個值,表示添加到 State 的所有值的聚合結(jié)果。提供的接口類似于ListState
add(T)函數(shù)會使用指定的函數(shù)(ReduceFunction)對添加的值進(jìn)行聚合AggregatingState<IN, OUT>:保存一個值,表示添加到 State 的所有值的聚合結(jié)果。與ReducingState不同的是,聚合結(jié)果的數(shù)據(jù)類型可以與添加到 State 的元素的數(shù)據(jù)類型不同。接口同樣類似于ListState
add(IN)函數(shù)會使用指定的函數(shù)(AggregateFunction)對添加的值進(jìn)行聚合MapState<UK, UV>:保存一個 Map??梢詫?key/value 存入 State,也可以獲取到一個包含所有當(dāng)前存儲的元素的迭代器
put(UK, UV)或putAll(Map<UK, UV>)添加 key/value 到 Map
get(UK)獲取與指定 key 的 value
entries()、keys()和values()對 Map 的元素/鍵/值遍歷訪問
所有類型的 State 都有 clear() 方法來清除當(dāng)前狀態(tài)。
首先要記住這些 State 對象僅用于有狀態(tài)接口,State 不一定存儲在內(nèi)存,也可能存儲在磁盤或其他位置。其次要記住的是,從 State 獲得的值取決于輸入數(shù)據(jù)的 key,如果處理的 keys 不同,定義的函數(shù)的調(diào)用結(jié)果會不同。
要想操作 State 對象,首先必須創(chuàng)建一個 StateDescriptor,該對象擁有 State 名稱(可以創(chuàng)建多個 State,必須具有唯一的名稱來引用 State),State 所持有的值的類型,可能還有用戶指定的函數(shù)(例如 ReduceFunction)。對應(yīng)不同的 State 類型,有如下類對象:ValueStateDescriptor、ListStateDescriptor、AggregatingStateDescriptor, ReducingStateDescriptor 和 MapStateDescriptor。
然后使用 RuntimeContext 可以才訪問到 State,因此只能在 RichFunction 中使用,在 RichFunction 方法中 RuntimeContext 訪問各種類型 State 的方法:
ValueState<T> getState(ValueStateDescriptor<T>)ReducingState<T> getReducingState(ReducingStateDescriptor<T>)ListState<T> getListState(ListStateDescriptor<T>)AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
下面是在 FlatMapFunction 中的使用示例:
實現(xiàn)了一個簡單的計數(shù)窗口,通過輸入元組的第一個參數(shù)分組,在分組的流中,每接收到兩個元組,返回兩那個元組的第二個參數(shù)的平均值
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
// 使用 ValueState 保存元素求和值
// 元組第一個參數(shù)為求和個數(shù) count,第二個參數(shù)為求和值 sum
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// 訪問 State 的值
val tmpCurrentSum = sum.value
// 初始值 (0, 0)
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// 求和并更新 State
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
sum.update(newSum)
// 如果 state value 達(dá)到 2, 發(fā)送統(tǒng)計的平均值并清空 state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
// 通過 RuntimeContext 和 ValueStateDescriptor 獲取 ValueState
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create keyed stream and call CountWindowAverage
env.fromCollection(List(
(1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)
))
.keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// 輸出 (1,4) (1,5)
env.execute("ExampleManagedState")
}
狀態(tài)生存周期 (TTL)
狀態(tài)生存周期(TTL)可以被指定給任何類型的 Keyed state,如果配置了 TTL 并且 State value 已過期,State value 會被清除。所有集合類型 State(ListState 和 MapState)都支持為每個條目設(shè)置 TTL。
為了啟用 State TTL,首先需要構(gòu)建 StateTtlConfig 對象,然后通過在 ValueStateDescriptor(其他類型同理)構(gòu)造中傳入該對象來啟用 TTL,參考下面的例子:
// Time.seconds(1) 生存時間,必填項
//
// StateTtlConfig.UpdateType 更新類型
// - UpdateType.OnCreateAndWrite - 創(chuàng)建和寫入時更新(默認(rèn))
// - UpdateType.OnReadAndWrite - 讀取和寫入時更新
//
// StateTtlConfig.StateVisibility 狀態(tài)可見性,訪問時是否返回已經(jīng)過期的值
// - StateVisibility.NeverReturnExpired - 永遠(yuǎn)不會返回過期的值(默認(rèn)),對于不能訪問過期數(shù)據(jù)的場景有用
// - StateVisibility.ReturnExpiredIfNotCleanedUp -如果可以讀到會返回
//
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
// 構(gòu)建 ValueStateDescriptor
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// 啟用 TTL
stateDescriptor.enableTimeToLive(ttlConfig)
補充:
- 狀態(tài)上次的修改時間會和數(shù)據(jù)一起保存在 state backend 中,因此開啟 TTL 特性會增加狀態(tài)數(shù)據(jù)的存儲。
- 目前TTL僅支持處理時間(processing time)。
- 如果之前沒有配置TTL,而狀態(tài)恢復(fù)時啟用TTL(相反的情況同樣),會引起兼容性錯誤和 StateMigrationException 異常。
- TTL配置不是 checkpoint 或 savepoint 的一部分,而是在 Flink 運行時做處理。
- 對于 Map 類型,只有序列化方法支持空值時,TTL的設(shè)置才支持空值,否則需要使用
NullableSerializer進(jìn)行封裝(序列化會占用額外的字節(jié))。 - State TTL 當(dāng)前在 PyFlink DataStream API 中還不支持。
過期數(shù)據(jù)清理
默認(rèn)情況下,過期值在讀取時被顯式刪除(如調(diào)用 ValueState.value()),如果配置的 state 后端支持,則定期在后臺進(jìn)行垃圾收集??梢栽?StateTtlConfig 中禁用后臺清理:
某些老版本,如果未讀取過期狀態(tài)的數(shù)據(jù),則不會將其刪除,這可能會導(dǎo)致狀態(tài)不斷增長
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.disableCleanupInBackground()
.build();
為了對后臺的某些特殊清理進(jìn)行更細(xì)粒度的控制,可以按如下所述單獨配置。目前,heap state backend 依賴于增量清理,RocksD backend 使用壓縮過濾器進(jìn)行后臺清理。
全量快照時進(jìn)行清理
可以指定在獲取完整的 State 快照時激活清理方法,以減小快照的大小。本地狀態(tài)在當(dāng)前實現(xiàn)下未被清除,但當(dāng)從上一個快照恢復(fù)時,不會包括已刪除的過期 State。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot()
.build();
注意:
- 這種策略在 RocksDBStateBackend 的增量 checkpoint 模式下無效。
- 這種清理方式可以在任何時候通過 StateTtlConfig 啟用或者關(guān)閉,比如在從 Savepoint 恢復(fù)時。
增量數(shù)據(jù)清理
可以選擇增量方式清理狀態(tài)數(shù)據(jù),在狀態(tài)訪問或處理時進(jìn)行。如果某個狀態(tài)開啟了該清理策略,則會在存儲后端保留一個所有狀態(tài)的惰性全局迭代器。 每次觸發(fā)增量清理時,從迭代器中選擇已經(jīng)過期的數(shù)進(jìn)行清理。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupIncrementally(10, true)
.build();
該策略有兩個參數(shù)。 第一個是每次清理時檢查狀態(tài)的條目數(shù),在每個狀態(tài)訪問時觸發(fā)。第二個參數(shù)表示是否在處理每條記錄時觸發(fā)清理。 Heap backend 默認(rèn)會檢查 5 條狀態(tài),并且關(guān)閉在每條記錄時觸發(fā)清理。
注意:
- 如果沒有 state 訪問,也沒有處理數(shù)據(jù),則不會清理過期數(shù)據(jù)。
- 增量清理會增加數(shù)據(jù)處理的耗時。
- 現(xiàn)在僅 Heap state backend 支持增量清除機(jī)制。在 RocksDB state backend 上啟用該特性無效。
- 如果 Heap state backend 使用同步快照方式,則會保存一份所有 key 的拷貝,從而防止并發(fā)修改問題,因此會增加內(nèi)存的使用。但異步快照則沒有這個問題。
- 對已有的作業(yè),這個清理方式可以在任何時候通過
StateTtlConfig啟用或禁用該特性,比如從 savepoint 重啟后。
在 RocksDB 壓縮時清理
如果使用 RocksDB state backend,則會啟用 Flink 為 RocksDB 定制的壓縮過濾器。RocksDB 會周期性的對數(shù)據(jù)進(jìn)行合并壓縮從而減少存儲空間。 Flink 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經(jīng)過期的狀態(tài)數(shù)據(jù)。
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupInRocksdbCompactFilter(1000)
.build();
Flink 處理一定條數(shù)的狀態(tài)數(shù)據(jù)后,會使用當(dāng)前時間戳來檢測 RocksDB 中的狀態(tài)是否已經(jīng)過期,可以通過 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定處理狀態(tài)的條數(shù)。
時間戳更新的越頻繁,狀態(tài)的清理越及時,但由于壓縮會有調(diào)用 JNI 的開銷,因此會影響整體的壓縮性能。RocksDB backend 的默認(rèn)后臺清理策略會每處理 1000 條數(shù)據(jù)進(jìn)行一次。
可以通過配置開啟 RocksDB 過濾器的 debug 日志查看清理操作: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG
注意:
- 壓縮時調(diào)用 TTL 過濾器會降低速度。TTL 過濾器需要解析上次訪問的時間戳,并對每個將參與壓縮的狀態(tài)進(jìn)行是否過期檢查。對于集合型狀態(tài)類型(比如 list 和 map),會對集合中每個元素進(jìn)行檢查。
- 對于元素序列化后長度不固定的列表狀態(tài),TTL 過濾器需要在每次 JNI 調(diào)用過程中,額外調(diào)用 Flink 的 Java 序列化器,從而確定下一個未過期數(shù)據(jù)的位置。
- 對已有的作業(yè),這個清理方式可以在任何時候通過
StateTtlConfig啟用或禁用該特性,比如從 savepoint 重啟后。
Use Operator State
使用托管的 Operator State,有狀態(tài)函數(shù)需要實現(xiàn) CheckpointedFunction 接口。
CheckpointedFunction
CheckpointedFunction 接口提供了訪問 non-keyed state 的方法,需要實現(xiàn)如下兩個方法:
// 執(zhí)行 Checkpoint 時調(diào)用
void snapshotState(FunctionSnapshotContext context) throws Exception;
// 初始化函數(shù)時調(diào)用
void initializeState(FunctionInitializationContext context) throws Exception;
進(jìn)行 checkpoint 時會調(diào)用 snapshotState()。 用戶自定義函數(shù)初始化時會調(diào)用 initializeState(),初始化包括第一次自定義函數(shù)初始化和從之前的 checkpoint 恢復(fù)。 因此 initializeState() 不僅是定義不同狀態(tài)類型初始化的地方,也需要包括狀態(tài)恢復(fù)的邏輯。
Operator state 的數(shù)據(jù)結(jié)構(gòu)不像 Keyed state 豐富,只支持 List,可以認(rèn)為是可序列化對象的列表,彼此獨立。這些對象在動態(tài)擴(kuò)展時是可以重新分配 non-keyed state 的最小單元。目前支持幾種動態(tài)擴(kuò)展方式:
- Even-split redistribution:算子并發(fā)度發(fā)生改變的時候,并發(fā)的每個實例取出 State 列表,合并到一個新的列表上,形成邏輯上完整的 State。然后根據(jù)列表元素的個數(shù),均勻分配給新的并發(fā)實例(Task)。例如,如果并行度為1,算子的 State checkpoint 包含數(shù)據(jù)元 element1 和 element2,當(dāng)并行度增加到2時,element1 會在 算子實例0中,而element2在算子實例1中。
- Union redistribution:相比于平均分配更加靈活,把完整 State 劃分的方式交給用戶去做。并發(fā)度發(fā)生改變的時候,按同樣的方式取到完整的 State 列表,然后直接交給每個實例。
下面的例子是有狀態(tài)的 SinkFunction,利用 CheckpointedFunction 在將數(shù)據(jù)元發(fā)送之前進(jìn)行緩存,checkpoint 時緩存寫入 State,啟動時判斷是否使用 重發(fā)的 State 恢復(fù)緩存:
class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
// 輸入數(shù)據(jù)緩存
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int)): Unit = {
// 輸入數(shù)據(jù)存入 buffer
bufferedElements += value
// 當(dāng)緩存數(shù)量達(dá)到閾值,發(fā)送數(shù)據(jù),并清理 buffer
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
// 執(zhí)行 Checkpoint 時調(diào)用
// 清除前一個檢查點包含的所有對象
// 緩存數(shù)據(jù)添加到 State
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
// 用于首次初始化或從 checkpoint 恢復(fù)
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
// 使用 ctx 和 ListStateDescriptor 訪問 ListState
// 注意調(diào)用方法 getListState(descriptor)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
// 根據(jù) isRestored() 方法來檢查當(dāng)前是否是 checkpoint 恢復(fù)的情況
if(context.isRestored) {
// 如果 true,表示是恢復(fù)失敗的情況,應(yīng)用恢復(fù)數(shù)據(jù)的邏輯:
// 恢復(fù)數(shù)據(jù)添加到 buffer 中
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
initializeState 方法接收一個 FunctionInitializationContext 參數(shù),會用來初始化 non-keyed state 的 “容器”。這些容器是一個 ListState 用于在 checkpoint 時保存 non-keyed state 對象。
注意這些狀態(tài)是如何初始化的,和 keyed state 類似,StateDescriptor 會包括狀態(tài)名字、以及狀態(tài)類型相關(guān)信息。
ListStateDescriptor<Tuple2<String, Integer>> descriptor =
new ListStateDescriptor<>(
"buffered-elements",
TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
調(diào)用不同的獲取狀態(tài)對象的接口,會使用不同的狀態(tài)分配算法。比如 getUnionListState(descriptor) 會使用 union redistribution 算法, 而 getListState(descriptor) 則簡單的使用 even-split redistribution 算法。
當(dāng)初始化好狀態(tài)對象后,我們通過 isRestored() 方法判斷是否從之前的故障中恢復(fù)回來,如果該方法返回 true 則表示從故障中進(jìn)行恢復(fù),會執(zhí)行接下來的恢復(fù)邏輯。
正如代碼所示,BufferingSink 中初始化時,恢復(fù)回來的 ListState 的所有元素會添加到一個局部變量中,供下次 snapshotState() 時使用。 然后清空 ListState,再把當(dāng)前局部變量中的所有元素寫入到 checkpoint 中。
另外,我們同樣可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。
有狀態(tài)的 Source Functions
帶狀態(tài)的數(shù)據(jù)源比其他的算子需要注意更多東西。為了保證更新狀態(tài)以及輸出的原子性(用于支持 exactly-once 語義),用戶需要在發(fā)送數(shù)據(jù)前獲取數(shù)據(jù)源的全局鎖。
class CounterSource extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long] {
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
// 獲取鎖
val lock = ctx.getCheckpointLock
while (isRunning) {
// 輸出和更新在一個原子操作中
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def restoreState(state: util.List[Long]): Unit =
for (s <- state) {
offset = s
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
Collections.singletonList(offset)
}
Broadcast State Pattern
Broadcast State API
下面是一個例子來展示 Broadcast State API 的使用。在這個示例中,要處理的數(shù)據(jù)流中是有不同顏色(Color)和形狀(Shape)屬性的對象,希望在流中找到一對具有相同顏色的,并且遵循一個特定的形狀模式的對象組(例如,一個紅色長方形后面緊跟著一個紅色三角形的組合)。 同時希望尋找的模式也會隨著時間而改變。
第一個數(shù)據(jù)流是要處理的數(shù)據(jù)源,流中的對象是 Item,具有 Color 和 Shape 屬性
// 使用 Color 作為鍵來分組,保證相同 Color 的數(shù)據(jù)進(jìn)入到同一個子任務(wù)中
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
.keyBy(new KeySelector<Shape, Color>(){...});
第二個數(shù)據(jù)流是要廣播的數(shù)據(jù),流中的對象是匹配規(guī)則(Rules)
// MapState 中保存 (RuleName,Rule) ,在描述類中指定 State name
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// ruleStream 使用 MapStateDescriptor 作為參數(shù)廣播,得到廣播流
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
然后需要做的是,連接兩個流并且指定兩個連接后的處理邏輯(使用廣播的 Rules 解析流中匹配的數(shù)據(jù)組,并返回)
DataStream<Match> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
/**
* 各個參數(shù)含義:
* Color,非廣播流 keyed stream 的鍵的類型
* Item,非廣播流的對象類型
* Rule,廣播流的對象類型
* String,返回結(jié)果的類型
**/
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// matching logic
}
)
如何連接兩個流:非廣播流調(diào)用 connect(BroadcastStream) 方法用來連接廣播流和非廣播流,BroadcastStream 作為參數(shù),返回一個 BroadcastConnectedStream 對象。BroadcastConnectedStream 調(diào)用 process() 方法執(zhí)行處理邏輯,需要指定一個邏輯實現(xiàn)類作為參數(shù),而具體的需要實現(xiàn)的抽象類取決于非廣播流的類型:
- 如果非廣播流是 keyed stream,需要實現(xiàn) KeyedBroadcastProcessFunction
- 如果非廣播流是 non-keyed stream,需要實現(xiàn) BroadcastProcessFunction
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
這兩個抽象函數(shù)有兩個相同的需要實現(xiàn)的接口:
-
processBroadcastElement()處理廣播流數(shù)據(jù) -
processElement()處理非廣播流數(shù)據(jù)
用于處理非廣播流是 non-keyed stream 的情況
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
用于處理非廣播流是 keyed stream 的情況
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
可以看到這兩個接口提供的上下文對象有所不同。處理非廣播流(processElement())使用 ReadOnlyContext,處理廣播流(processBroadcastElement())使用 Context。
這兩個上下文對象(簡稱 ctx)提供的方法接口:
- 訪問 Broadcast state:
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)- 在
getBroadcastState()方法中傳入的stateDescriptor應(yīng)該與調(diào)用.broadcast(ruleStateDescriptor)的參數(shù)相同。
- 在
- 查詢數(shù)據(jù)的時間戳:
ctx.timestamp() - 獲取當(dāng)前水?。?code>ctx.currentWatermark()
- 獲取當(dāng)前處理時間:
ctx.currentProcessingTime() - 向旁路輸出(side-outputs)發(fā)送數(shù)據(jù):
ctx.output(OutputTag<X> outputTag, X value)
這兩個上下文對象不同之處在于對 Broadcast state 的訪問限制:處理廣播流元素,具有讀和寫的權(quán)限(read-write),處理非廣播流元素只有讀的權(quán)限(read-only)。
這么設(shè)計的原因是,保證 Broadcast state 在算子的所有并行實例中是相同的。由于 Flink 中沒有跨 Task 的通信機(jī)制,在一個任務(wù)實例中的修改不能在并行 Task 間傳遞。而廣播端在所有并行任務(wù)中都能看到相同的數(shù)據(jù)元素,只對廣播端提供可寫的權(quán)限。同時要求在廣播端的每個并行任務(wù)中,對接收數(shù)據(jù)的處理是相同的。如果忽略此規(guī)則會破壞 State 的一致性保證,從而導(dǎo)致不一致且難以診斷的結(jié)果。也就是說,
processBroadcast()的實現(xiàn)邏輯必須在所有并行實例中具有相同的確定性行為。
KeyedBroadcastProcessFunction 在 Keyed Stream 上工作,提供了一些 BroadcastProcessFunction 沒有的功能:
-
processElement()的參數(shù)ReadOnlyContext提供了方法能夠訪問 Flink 的定時器服務(wù),可以注冊事件定時器(event-time timer)或者處理時間的定時器(processing-time timer)。當(dāng)定時器觸發(fā)時,會調(diào)用onTimer()方法, 提供了OnTimerContext,具有ReadOnlyContext的全部功能,并且提供:- 查詢當(dāng)前觸發(fā)的是一個事件還是處理時間的定時器
- 查詢定時器關(guān)聯(lián)的key
-
processBroadcastElement()方法中的參數(shù)Context會提供方法applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)。 這個方法使用一個KeyedStateFunction能夠?qū)?stateDescriptor對應(yīng)的 state 中所有 key 的存儲狀態(tài)進(jìn)行某些操作。
注冊一個定時器只能在
KeyedBroadcastProcessFunction的processElement()方法中進(jìn)行。在processBroadcastElement()方法中不能注冊定時器,因為廣播的元素中并沒有關(guān)聯(lián)的 key。
回到前面的例子,KeyedBroadcastProcessFunction 的實現(xiàn)可能看起來如下:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// 存儲部分匹配的結(jié)果,即匹配了一個元素,正在等待第二個元素
// 我們用一個數(shù)組來存儲,因為同時可能有很多第一個元素正在等待
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// 與之前的 ruleStateDescriptor 相同
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry :
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// 不需要額外的 else{} 段來考慮 rule.first == rule.second 的情況
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要考慮因素
使用 Broadcast state 時要注意的是:
- 沒有跨 Task 的通信,這就是為什么只有處理廣播流元素可以修改 Broadcast state 的原因。用戶需要保證所有 Task 對于 Broadcast state 的處理方式是一致的,否則會造成不同 Task 讀取 Broadcast state 時內(nèi)容不一致的情況,最終導(dǎo)致結(jié)果不一致。
- 跨 Task 的 Broadcast state 中的事件順序可能不同,盡管廣播流的元素可以保證都將轉(zhuǎn)到所有下游 Task,但元素可能以不同的順序到達(dá)下游 Task。因此,Broadcast state 更新不能依賴傳入事件的順序。
- 所有 Task 都會把 Broadcast state 存入 checkpoint,而不僅僅是其中一,雖然 checkpoint 發(fā)生時所有任務(wù)在具有相同的 broadcast state。這是為了避免在恢復(fù)期間所有任務(wù)從同一文件中進(jìn)行恢復(fù)(避免熱點),代價是 state checkpoint 的大小增加了并行度數(shù)量的倍數(shù)。Flink 會保證在恢復(fù)狀態(tài)/改變并發(fā)的時候數(shù)據(jù)沒有重復(fù)且沒有缺失。 在作業(yè)恢復(fù)時,如果與之前具有相同或更小的并發(fā)度,所有的 Task 讀取之前已經(jīng)在 checkpoint 中的 state。在增大并發(fā)的情況下,多出來的并發(fā)會使用輪詢調(diào)度算法讀取之前 Task 的 state。
- 不支持 RocksDB state backend,broadcast state 在運行時保存在內(nèi)存中,需要保證內(nèi)存充足。這一特性同樣適用于所有其他 Operator State。
主要引用: