有狀態(tài)函數(Stateful function)和算子(Operator)在處理獨立的數據或事件時存儲數據,使得狀態(tài)(State)成為任何復雜算子中的關鍵部分,例如:
- 當應用檢索特定的事件模式,State 將存儲到接收到的事件的序列。
- 當按照分鐘/小時/天的唯獨聚合事件時,狀態(tài)將保留待處理的聚合狀態(tài)。
- 當在流數據上進行機器學習的模型訓練時,狀態(tài)保存模型當前版本的參數。
- 當需要管理歷史數據時,狀態(tài)允許有效地訪問過去發(fā)生的事件。
使用 Flink 需要了解 State,以便應用的檢查點(checkpoint)容錯和保存點(savepoint)可用。
State 的使用
Keyed State 和 Operator State
Flink 中的狀態(tài)分為兩種:Keyed state 和 Operator state
Operator State
每個 Operator state(non-keyed state) 都綁定到一個并行的算子實例上??梢詤⒖?Kafka connector 的例子,Kafka 消費者的每個并行實例都維護一個 topic 分區(qū)和偏移(offset)的對應關系作為 Operator state。Operator state 支持當并行度更改時,在并行的算子實例間重新分配 State,進行這種重新分配有多種不同的方案(后面會介紹)。
Keyed State
Keyed state 與鍵(key)相關,只能在 KeyedStream 上應用的函數和算子內使用。可以將 Keyed state 認為是分區(qū)后的 Operator state,每個 key 有一個狀態(tài)的分區(qū)(state-partition)。邏輯上,每個 Keyed state 綁定唯一的 <parallel-operator-instatnce, key>(算子并行實例和 key 的一對元組),可以將其簡單地視為 <operator, key>(因為每個 key 屬于算子的唯一一個并行實例)。
Keyed state 進一步被組織稱為 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子單位,Key groups 的數量與最大并行度相同。在程序執(zhí)行期間 keyed operator 的每個并行實例都使用一個或多個 Key groups 的 keys。
Raw State 和 Managed State
Keyed state 和 Operator state 有兩種形式:托管狀態(tài)(managed)和原始狀態(tài)(raw)。
托管狀態(tài)(managed state) 由 Flink runtime 管理的數據結構表示,例如內部哈希表或 RocksDB。Flink runtime 對 State 進行編碼并寫入 checkpoint。
原始狀態(tài)(raw state) 是在算子內部的數據結構中的保存。Checkpoint 只會保存 State 內容的字節(jié)序列,State 的真實數據結構對 Flink 是透明的。
所有數據流函數都可以使用托管狀態(tài)(managed state),而原始狀態(tài)(raw state)只能在具體實現算子時使用。建議使用托管狀態(tài)(而不是原始狀態(tài)),因為在托管狀態(tài)下,Flink 能夠在并行度改變時自適應地重新分配 State,并且在內存管理方面可以做的更好。
如果要為托管狀態(tài)自定義序列化的邏輯,請參考 自定義序列化 以確保將來的兼容性。默認序列化方法不需要特殊處理。
托管的 Keyed State
托管 Keys state 提供多種不同類型 State,作用域都是當前輸入數據的鍵,只能用于 KeyedStream,可以通過 stream.keyBy(…) 創(chuàng)建。
首先看不同類型的狀態(tài),以及它們如何在程序中使用:
ValueState<T>:保存了一個值,可以更新和讀?。ㄋ阕硬僮鞯拿總€ key 可能有一個 value)
update(T)更新
T value()取值ListState<T>:保存了一個列表 List??梢宰芳釉?,也可以獲取到一個包含所有當前存儲的元素的迭代器(Iterable)
add(T)或addAll(List<T>)添加到列表
Iterable<T> get()獲取迭代器
update(List<T>)使用新的列表覆蓋現有列表ReducingState<T>:保存一個值,表示添加到 State 的所有值的聚合結果。提供的接口類似于ListState
add(T)函數會使用指定的函數(ReduceFunction)對添加的值進行聚合AggregatingState<IN, OUT>:保存一個值,表示添加到 State 的所有值的聚合結果。與ReducingState不同的是,聚合結果的數據類型可以與添加到 State 的元素的數據類型不同。接口同樣類似于ListState
add(IN)函數會使用指定的函數(AggregateFunction)對添加的值進行聚合FoldingState<T, ACC>:保存一個值,表示添加到 State 的所有值的聚合結果。與ReducingState不同的事,聚合結果的數據類型可以與添加到 State 的元素的數據類型不同。接口同樣類似于ListState。已經過期的API。
add(IN)函數會使用指定的函數(FoldFunction)對添加的值進行聚合MapState<UK, UV>:保存一個映射表 Map。可以將 key/value 存入 State,也可以獲取到一個包含所有當前存儲的元素的迭代器(Iterable)
put(UK, UV)或putAll(Map<UK, UV>)添加 key/value 到 Map
get(UK)獲取與指定 key 的 value
entries()、keys()和values()對 Map 的元素/鍵/值遍歷訪問
所有類型的都有
clear()方法來清除當前狀態(tài)。FoldingState和FoldingStateDescriptor已在 Flink 1.4 中棄用,未來版本將被完全刪除。可以使用AggregatingState和AggregatingStateDescriptor代替。
首先,這些對象僅用于與 State 交互,State 不一定存儲在內存,也可能存儲在磁盤或其他位置。第二點,從 State 獲得的值取決于輸入數據的的 key,如果處理的 keys 不同,定義的函數的調用結果會不同。
要向操作 State 對象句柄,首先必須創(chuàng)建一個 StateDescriptor,該對象擁有 State 名稱(可以創(chuàng)建多個 State,必須具有唯一的名稱來引用 State),State 所持有的值的類型,可能還有用戶指定的函數(例如 ReduceFunction)。對應不同的 State 類型,有如下類對象:ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptor 和 MapStateDescriptor。然后使用 RuntimeContext 可以才訪問到 State,因此只能在 RichFunction 中使用,在 RichFunction 方法中 RuntimeContext 訪問不通類型 State 的方法:
- ValueState<T> getState(ValueStateDescriptor<T>)
- ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
- ListState<T> getListState(ListStateDescriptor<T>)
- AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)- MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
下面是在 FlatMapFunction 中的使用示例:
實現了一個簡單的計數窗口,通過輸入元組的第一個參數分組,在分組的流中,每接收到兩個元組,返回兩那個元組的第二個參數的平均值
class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {
// 使用 ValueState 保存元素求和值
// 元組第一個參數為求和個數 count,第二個參數為求和值 sum
private var sum: ValueState[(Long, Long)] = _
override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
// 訪問 State 的值
val tmpCurrentSum = sum.value
// 初始值 (0, 0)
val currentSum = if (tmpCurrentSum != null) {
tmpCurrentSum
} else {
(0L, 0L)
}
// 求和并更新 State
val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
sum.update(newSum)
// 如果 state value 達到 2, 發(fā)送統(tǒng)計的平均值并清空 state
if (newSum._1 >= 2) {
out.collect((input._1, newSum._2 / newSum._1))
sum.clear()
}
}
override def open(parameters: Configuration): Unit = {
// 通過 RuntimeContext 和 ValueStateDescriptor 獲取 ValueState
sum = getRuntimeContext.getState(
new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
)
}
}
object ExampleCountWindowAverage extends App {
val env = StreamExecutionEnvironment.getExecutionEnvironment
// create keyed stream and call CountWindowAverage
env.fromCollection(List(
(1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)
))
.keyBy(_._1)
.flatMap(new CountWindowAverage())
.print()
// 輸出 (1,4) (1,5)
env.execute("ExampleManagedState")
}
狀態(tài)生存周期 (TTL)
生存期(TTL)可以被指定給任何類型的 Keyed state,如果配置了TTL并且 State value 已過期,State value 會被清除。所有集合類型 State (ListState 和 MapState)都支持為每個條目設置TTL。為了啟用 State TTL,首先需要構建 StateTtlConfig 對象,然后通過在 ValueStateDescriptor(其他類型同理)構造中傳入該對象來啟用TTL,參考下面的例子:
// Time.seconds(1) 生存時間
//
// StateTtlConfig.UpdateType 更新類型
// - UpdateType.OnCreateAndWrite - 創(chuàng)建和寫入時更新(默認)
// - UpdateType.OnReadAndWrite - 讀取和寫入時更新
//
// StateTtlConfig.StateVisibility 狀態(tài)可見性,訪問時是否返回已經過期的值
// - StateVisibility.NeverReturnExpired - 永遠不會返回過期的值(默認)
// - StateVisibility.ReturnExpiredIfNotCleanedUp -如果可以讀到會返回
//
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build
// 構建 ValueStateDescriptor
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// 啟用TTL
stateDescriptor.enableTimeToLive(ttlConfig)
補充:
- State backends 存儲最新一次修改的時間戳以及更新值,因此啟用TTL會增加存儲的消耗。
- 目前TTL僅支持處理時間(processing time)。
- 如果之前沒有配置TTL,而狀態(tài)恢復時啟用TTL(相反的情況同樣),會引起兼容性錯誤和 StateMigrationException 異常。
- TTL配置不是 checkpoint 或 savepoint 的一部分,而是在 Flink 運行時做處理。
- 對于 Map 類型,只有序列化方法支持空值時,TTL的設置才支持空值,否則需要使用
NullableSerializer進行封裝(序列化會占用額外的字節(jié))。
過期數據清理
目前過期值只有在顯式訪問時才會被刪除(如調用 ValueState.value())。如果未讀取過期狀態(tài)的數據,則不會將其刪除,這可能會導致狀態(tài)不斷增長。將來的版本中可能會發(fā)生變化。
此外,可以指定在獲取完整的 State 快照時激活清理方法,以減小快照的大小。當從上一個快照恢復時,不會包括已刪除的過期 State。StateTtlConfig 可以指定(不適用于 RocksDB 增量 checkpoint 的情況):
val ttlConfig = StateTtlConfig
.newBuilder(Time.seconds(1))
.cleanupFullSnapshot
.build
Scala 中的快捷接口
除了上述接口外,Scala API 可以在 KeyedStream 上的有狀態(tài) map() 或 flatMap() 函數中使用 ValueState。如下獲取 ValueState 當前值,并必須返回用于更新的新值:
val stream: DataStream[(String, Int)] = ...
// 調用 mapWithState
val counts: DataStream[(String, Int)] = stream
.keyBy(_._1)
.mapWithState((in: (String, Int), count: Option[Int]) =>
count match {
case Some(c) => ( (in._1, c), Some(c + in._2) )
case None => ( (in._1, 0), Some(in._2) )
})
托管的 Operator State
Operator state 的數據結構不像 Keyed state 豐富,只支持 List,可以認為是可序列化對象的列表,彼此獨立。這些對象在動態(tài)擴展時是可以重新分配 non-keyed state 的最小單元。目前支持幾種動態(tài)擴展方式:
Even-split redistribution:算子并發(fā)度發(fā)生改變的時候,并發(fā)的每個實例取出 State 列表,合并到一個新的列表上,形成邏輯上完整的 State。然后根據列表元素的個數,均勻分配給新的并發(fā)實例(Task)。
例如,如果并行度為1,算子的 State checkpoint 包含數據元 element1 和 element2,當并行度增加到2時,element1 會在 算子實例0中,而element2在算子實例1中。Union redistribution:相比于平均分配更加靈活,把完整 State 劃分的方式交給用戶去做。并發(fā)度發(fā)生改變的時候,按同樣的方式取到完整的 State 列表,然后直接交給每個實例。
使用托管的 Operator State,有狀態(tài)函數需要實現 CheckpointedFunction 接口(更通用的)或 ListCheckpointed<T extends Serializable>接口。
CheckpointedFunction
CheckpointedFunction 需要實現兩個方法:
// 執(zhí)行 Checkpoint 時調用
void snapshotState(FunctionSnapshotContext context) throws Exception;
// 初始化函數時調用
void initializeState(FunctionInitializationContext context) throws Exception;
initializeState()函數不止在首次初始化函數調用,從 checkpoint 恢復時同樣會調用。因此,不僅要初始化 State 的邏輯,還包括 State 恢復的邏輯。
下面的例子是有狀態(tài)的 SinkFunction,利用 CheckpointedFunction 在將數據元發(fā)送之前進行緩存,checkpoint 時緩存寫入 State,啟動時判斷是否使用 重發(fā)的 State 恢復緩存:
class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
with CheckpointedFunction {
@transient
private var checkpointedState: ListState[(String, Int)] = _
// 輸入數據緩存
private val bufferedElements = ListBuffer[(String, Int)]()
override def invoke(value: (String, Int)): Unit = {
// 輸入數據存入 buffer
bufferedElements += value
// 當緩存數量達到閾值,發(fā)送數據,并清理 buffer
if (bufferedElements.size == threshold) {
for (element <- bufferedElements) {
// send it to the sink
}
bufferedElements.clear()
}
}
// 執(zhí)行 Checkpoint 時調用
// 清除前一個檢查點包含的所有對象
// 緩存數據添加到 State
override def snapshotState(context: FunctionSnapshotContext): Unit = {
checkpointedState.clear()
for (element <- bufferedElements) {
checkpointedState.add(element)
}
}
// 用于首次初始化或從 checkpoint 恢復
override def initializeState(context: FunctionInitializationContext): Unit = {
val descriptor = new ListStateDescriptor[(String, Int)](
"buffered-elements",
TypeInformation.of(new TypeHint[(String, Int)]() {})
)
// 使用 ctx 和 ListStateDescriptor 訪問 ListState
// 注意調用方法 getListState(descriptor)
checkpointedState = context.getOperatorStateStore.getListState(descriptor)
// 根據 isRestored() 方法來檢查當前是否是 checkpoint 恢復的情況
if(context.isRestored) {
// 如果 true,表示是恢復失敗的情況,應用恢復數據的邏輯:
// 恢復數據添加到 buffer 中
for(element <- checkpointedState.get()) {
bufferedElements += element
}
}
}
}
獲取 State 句柄函數的命名中約定包含其重新分發(fā)模式,其次時狀態(tài)結構。上面的例子中調用方法 getListState 表示使用了平均分發(fā)(even-split)的方式。如果要使用 union 的方式,使用 getUnionListState(descriptor) 方法返回的 ListState,會包含完整的 State 列表,由用戶決定處理恢復哪些數據。
ListCheckpointed
ListCheckpointed 是 CheckpointedFunction 的一個有限變體,只支持平均分發(fā)(even-split)的情況,同樣需要實現兩個方法:
// 執(zhí)行 Checkpoint 時調用
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
// 初始化函數時調用
void restoreState(List<T> state) throws Exception;
有狀態(tài)的 Source Functions
在 Source 方法中,更新狀態(tài)和輸出應該在一個原子操作下完成(為了滿足精確一次的語義下的故障恢復),因此用戶需要取一個鎖,可以從ctx中獲得。
class CounterSource extends RichParallelSourceFunction[Long]
with ListCheckpointed[Long] {
@volatile
private var isRunning = true
private var offset = 0L
override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
// 獲取鎖
val lock = ctx.getCheckpointLock
while (isRunning) {
// 輸出和更新在一個原子操作中
lock.synchronized({
ctx.collect(offset)
offset += 1
})
}
}
override def cancel(): Unit = isRunning = false
override def restoreState(state: util.List[Long]): Unit =
for (s <- state) {
offset = s
}
override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
Collections.singletonList(offset)
}
Broadcast State
前面提到了兩種 Operator state 支持的動態(tài)擴展方法:even-split(算子的每個平行任務平均的恢復狀態(tài)的一部分)和 union(每個任務獲得完整的狀態(tài)恢復)。Broadcast State 是 Flink 支持的另一種擴展方式。用來支持將某一個流的數據廣播到所有下游任務,數據被存儲在本地,接受到廣播的流在操作時可以使用這些數據。
Broadcast state 的特點是:
- 使用 Map 類型的數據結構
- 僅適用于同時具有廣播流和非廣播流作為數據輸入的特定算子
- 可以具有多個不同名稱的 Broadcast state
Broadcast State API
下面是一個例子來展示 Broadcast State API 的使用,在這個示例中,要處理的數據流中是有不同顏色(Color)和形狀(Shape)屬性的對象,希望在流中找到一對具有相同顏色的,并且遵循一個特定的形狀模式的對象組(例如,一個紅色長方形后面緊跟著一個紅色三角形的組合)。
第一個數據流是要處理的數據源,流中的對象具有 Color 和 Shape 屬性
// 使用 Color 作為鍵來分組,保證相同 Color 的數據進入到同一個子任務中
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
.keyBy(new KeySelector<Shape, Color>(){...});
第二個數據流是要廣播的數據,流中的對象是匹配規(guī)則(Rules)
// MapState 中保存 (RuleName,Rule) ,在描述類中指定 State name
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
// ruleStream 使用 MapStateDescriptor 作為參數廣播,得到廣播流
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
然后需要做的是,連接兩個流并且指定兩個連接后的處理邏輯(使用廣播的 rules 解析流中匹配的數據組,并返回)
DataStream<Match> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// matching logic
}
)
stream.connect(BroadcastStream) 方法用來連接廣播流和非廣播流,BroadcastStream 作為參數,返回一個 BroadcastConnectedStream 對象。BroadcastConnectedStream 調用 process() 方法執(zhí)行處理邏輯,需要指定一個邏輯實現類作為參數,而具體的需要實現的抽象類取決于非廣播流的類型:
- 如果非廣播流是 keyed stream,需要實現 KeyedBroadcastProcessFunction
- 如果非廣播流是 non-keyed stream,需要實現 BroadcastProcessFunction
這里注意下
KeyedBroadcastProcessFunction<Color, Item, Rule, String>各個參數含義:
Color,非廣播流 keyed stream 的鍵的類型
Item,非廣播流的對象類型
Rule,廣播流的對象類型
String,返回結果的類型
BroadcastProcessFunction 和 KeyedBroadcastProcessFunction
這兩個抽象函數有兩個相同的需要實現的接口:
-
processBroadcastElement()處理廣播流中接收的數據元 -
processElement()處理非廣播流數據的方法
用于處理非廣播流是 non-keyed stream 的情況
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
用于處理非廣播流是 keyed stream 的情況
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;
public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}
可以看到這兩個接口提供的上下文對象有所不同。非廣播方(processElement)使用 ReadOnlyContext,而廣播方(processBroadcastElement)使用 Context。
這兩個上下文對象(簡稱ctx)通用的方法接口:
- 訪問 Broadcast state:
ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor) - 查詢數據元的時間戳:
ctx.timestamp() - 獲取當前水?。?code>ctx.currentWatermark()
- 獲取當前處理時間:
ctx.currentProcessingTime() - 向旁路輸出(side-outputs)發(fā)送數據:
ctx.output(OutputTag<X> outputTag, X value)
這兩者不同之處在于對 Broadcast state 的訪問限制:廣播方對其具有讀和寫的權限(read-write),非廣播方只有讀的權限(read-only)
這么設計的原因是,保證 Broadcast state 在算子的所有并行實例中是相同的。由于 Flink 中沒有跨任務的通信機制,在一個任務實例中的修改不能在并行任務間傳遞。而廣播端在所有并行任務中都能看到相同的數據元,只對廣播端提供可寫的權限。
同時要求在廣播端的每個并行任務中,對接收數據的處理是相同的。如果忽略此規(guī)則會破壞 State 的一致性保證,從而導致不一致且難以診斷的結果。也就是說,
processBroadcast()的實現邏輯必須在所有并行實例中具有相同的確定性行為。
回到上一個例子,KeyedBroadcastProcessFunction 的實現可能看起來如下:
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));
// identical to our ruleStateDescriptor above
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));
@Override
public void processBroadcastElement(Rule value,
Context ctx,
Collector<String> out) throws Exception {
ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
}
@Override
public void processElement(Item value,
ReadOnlyContext ctx,
Collector<String> out) throws Exception {
final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
final Shape shape = value.getShape();
for (Map.Entry<String, Rule> entry:
ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
final String ruleName = entry.getKey();
final Rule rule = entry.getValue();
List<Item> stored = state.get(ruleName);
if (stored == null) {
stored = new ArrayList<>();
}
if (shape == rule.second && !stored.isEmpty()) {
for (Item i : stored) {
out.collect("MATCH: " + i + " - " + value);
}
stored.clear();
}
// there is no else{} to cover if rule.first == rule.second
if (shape.equals(rule.first)) {
stored.add(value);
}
if (stored.isEmpty()) {
state.remove(ruleName);
} else {
state.put(ruleName, stored);
}
}
}
}
重要考慮因素
使用 Broadcast state 時要注意的是:
- 沒有跨任務的通信,這就是為什么只有廣播方可以修改 Broadcast state 的原因。
- 用戶必須確保所有任務以相同的方式為每個傳入的數據元更新 Broadcast state。否則,可能導致不一致的結果。
- 跨任務的 Broadcast state 中的事件順序可能不同,盡管廣播的元素可以保證所有元素都將轉到所有下游任務,但元素可能以不同的順序到達。因此,Broadcast state 更新不能依賴傳入事件的順序。
- 所有任務都會把 Broadcast state 存入 checkpoint,而不僅僅是其中一,雖然 checkpoint 發(fā)生時所有任務在具有相同的 broadcast state。這是為了避免在恢復期間所有任務從同一文件中進行恢復(避免熱點),然而代價是做 state checkpoint 的大小增加了并行度數量的倍數。
- Flink 確保在恢復或改變并行度時不會有重復數據,也不會丟失數據。在具有相同或更小并行度的恢復的情況下,每個任務讀取其狀態(tài)檢查點。在并行度放大時,每個任務都會讀取自己的狀態(tài),多余的任務以循環(huán)方式讀取前面任務的檢查點。
- 不支持 RocksDB state backend,broadcast state 在運行時保存在內存中。
Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
https://flink.xskoo.com/dev/stream/state/state.html