Flink DataStream 狀態(tài)和容錯 一:狀態(tài)的使用

有狀態(tài)函數(Stateful function)和算子(Operator)在處理獨立的數據或事件時存儲數據,使得狀態(tài)(State)成為任何復雜算子中的關鍵部分,例如:

  • 當應用檢索特定的事件模式,State 將存儲到接收到的事件的序列。
  • 當按照分鐘/小時/天的唯獨聚合事件時,狀態(tài)將保留待處理的聚合狀態(tài)。
  • 當在流數據上進行機器學習的模型訓練時,狀態(tài)保存模型當前版本的參數。
  • 當需要管理歷史數據時,狀態(tài)允許有效地訪問過去發(fā)生的事件。

使用 Flink 需要了解 State,以便應用的檢查點(checkpoint)容錯和保存點(savepoint)可用。

State 的使用

Keyed State 和 Operator State

Flink 中的狀態(tài)分為兩種:Keyed state 和 Operator state

Operator State

每個 Operator state(non-keyed state) 都綁定到一個并行的算子實例上??梢詤⒖?Kafka connector 的例子,Kafka 消費者的每個并行實例都維護一個 topic 分區(qū)和偏移(offset)的對應關系作為 Operator state。Operator state 支持當并行度更改時,在并行的算子實例間重新分配 State,進行這種重新分配有多種不同的方案(后面會介紹)。

Keyed State

Keyed state 與鍵(key)相關,只能在 KeyedStream 上應用的函數和算子內使用。可以將 Keyed state 認為是分區(qū)后的 Operator state,每個 key 有一個狀態(tài)的分區(qū)(state-partition)。邏輯上,每個 Keyed state 綁定唯一的 <parallel-operator-instatnce, key>(算子并行實例和 key 的一對元組),可以將其簡單地視為 <operator, key>(因為每個 key 屬于算子的唯一一個并行實例)。

Keyed state 進一步被組織稱為 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子單位,Key groups 的數量與最大并行度相同。在程序執(zhí)行期間 keyed operator 的每個并行實例都使用一個或多個 Key groups 的 keys。

Raw State 和 Managed State

Keyed state 和 Operator state 有兩種形式:托管狀態(tài)(managed)和原始狀態(tài)(raw)。

托管狀態(tài)(managed state) 由 Flink runtime 管理的數據結構表示,例如內部哈希表或 RocksDB。Flink runtime 對 State 進行編碼并寫入 checkpoint。

原始狀態(tài)(raw state) 是在算子內部的數據結構中的保存。Checkpoint 只會保存 State 內容的字節(jié)序列,State 的真實數據結構對 Flink 是透明的。

所有數據流函數都可以使用托管狀態(tài)(managed state),而原始狀態(tài)(raw state)只能在具體實現算子時使用。建議使用托管狀態(tài)(而不是原始狀態(tài)),因為在托管狀態(tài)下,Flink 能夠在并行度改變時自適應地重新分配 State,并且在內存管理方面可以做的更好。

如果要為托管狀態(tài)自定義序列化的邏輯,請參考 自定義序列化 以確保將來的兼容性。默認序列化方法不需要特殊處理。

托管的 Keyed State

托管 Keys state 提供多種不同類型 State,作用域都是當前輸入數據的鍵,只能用于 KeyedStream,可以通過 stream.keyBy(…) 創(chuàng)建。

首先看不同類型的狀態(tài),以及它們如何在程序中使用:

  • ValueState<T>:保存了一個值,可以更新和讀?。ㄋ阕硬僮鞯拿總€ key 可能有一個 value)
    update(T) 更新
    T value() 取值

  • ListState<T>:保存了一個列表 List??梢宰芳釉?,也可以獲取到一個包含所有當前存儲的元素的迭代器(Iterable)
    add(T)addAll(List<T>) 添加到列表
    Iterable<T> get() 獲取迭代器
    update(List<T>) 使用新的列表覆蓋現有列表

  • ReducingState<T>:保存一個值,表示添加到 State 的所有值的聚合結果。提供的接口類似于 ListState
    add(T) 函數會使用指定的函數(ReduceFunction)對添加的值進行聚合

  • AggregatingState<IN, OUT>:保存一個值,表示添加到 State 的所有值的聚合結果。與 ReducingState 不同的是,聚合結果的數據類型可以與添加到 State 的元素的數據類型不同。接口同樣類似于 ListState
    add(IN) 函數會使用指定的函數(AggregateFunction)對添加的值進行聚合

  • FoldingState<T, ACC>:保存一個值,表示添加到 State 的所有值的聚合結果。與 ReducingState 不同的事,聚合結果的數據類型可以與添加到 State 的元素的數據類型不同。接口同樣類似于 ListState已經過期的API。
    add(IN) 函數會使用指定的函數(FoldFunction)對添加的值進行聚合

  • MapState<UK, UV>:保存一個映射表 Map。可以將 key/value 存入 State,也可以獲取到一個包含所有當前存儲的元素的迭代器(Iterable)
    put(UK, UV)putAll(Map<UK, UV>) 添加 key/value 到 Map
    get(UK) 獲取與指定 key 的 value
    entries()、keys()values() 對 Map 的元素/鍵/值遍歷訪問

所有類型的都有 clear() 方法來清除當前狀態(tài)。FoldingStateFoldingStateDescriptor 已在 Flink 1.4 中棄用,未來版本將被完全刪除。可以使用 AggregatingStateAggregatingStateDescriptor 代替。

首先,這些對象僅用于與 State 交互,State 不一定存儲在內存,也可能存儲在磁盤或其他位置。第二點,從 State 獲得的值取決于輸入數據的的 key,如果處理的 keys 不同,定義的函數的調用結果會不同。

要向操作 State 對象句柄,首先必須創(chuàng)建一個 StateDescriptor,該對象擁有 State 名稱(可以創(chuàng)建多個 State,必須具有唯一的名稱來引用 State),State 所持有的值的類型,可能還有用戶指定的函數(例如 ReduceFunction)。對應不同的 State 類型,有如下類對象:ValueStateDescriptor、ListStateDescriptor、ReducingStateDescriptor、FoldingStateDescriptorMapStateDescriptor。然后使用 RuntimeContext 可以才訪問到 State,因此只能在 RichFunction 中使用,在 RichFunction 方法中 RuntimeContext 訪問不通類型 State 的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingState<IN, OUT>)
  • FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

下面是在 FlatMapFunction 中的使用示例:

實現了一個簡單的計數窗口,通過輸入元組的第一個參數分組,在分組的流中,每接收到兩個元組,返回兩那個元組的第二個參數的平均值

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

    // 使用 ValueState 保存元素求和值
    // 元組第一個參數為求和個數 count,第二個參數為求和值 sum
    private var sum: ValueState[(Long, Long)] = _

    override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
        // 訪問 State 的值
        val tmpCurrentSum = sum.value
        
        // 初始值 (0, 0)
        val currentSum = if (tmpCurrentSum != null) {
            tmpCurrentSum
        } else {
            (0L, 0L)
        }

        // 求和并更新 State
        val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
        sum.update(newSum)

        // 如果 state value 達到 2, 發(fā)送統(tǒng)計的平均值并清空 state
        if (newSum._1 >= 2) {
            out.collect((input._1, newSum._2 / newSum._1))
            sum.clear()
        }
    }

    override def open(parameters: Configuration): Unit = {
    
        // 通過 RuntimeContext 和 ValueStateDescriptor 獲取 ValueState
        sum = getRuntimeContext.getState(
            new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
        )
    }
}

object ExampleCountWindowAverage extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create keyed stream and call CountWindowAverage
    env.fromCollection(List(
        (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)
    ))
        .keyBy(_._1)
        .flatMap(new CountWindowAverage())
        .print()
        
    // 輸出 (1,4) (1,5)
    env.execute("ExampleManagedState")
}

狀態(tài)生存周期 (TTL)

生存期(TTL)可以被指定給任何類型的 Keyed state,如果配置了TTL并且 State value 已過期,State value 會被清除。所有集合類型 State (ListState 和 MapState)都支持為每個條目設置TTL。為了啟用 State TTL,首先需要構建 StateTtlConfig 對象,然后通過在 ValueStateDescriptor(其他類型同理)構造中傳入該對象來啟用TTL,參考下面的例子:

// Time.seconds(1) 生存時間 
// 
// StateTtlConfig.UpdateType 更新類型
//   - UpdateType.OnCreateAndWrite - 創(chuàng)建和寫入時更新(默認)
//   - UpdateType.OnReadAndWrite - 讀取和寫入時更新
// 
// StateTtlConfig.StateVisibility 狀態(tài)可見性,訪問時是否返回已經過期的值
//   - StateVisibility.NeverReturnExpired - 永遠不會返回過期的值(默認)
//   - StateVisibility.ReturnExpiredIfNotCleanedUp -如果可以讀到會返回
// 
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
// 構建 ValueStateDescriptor
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// 啟用TTL
stateDescriptor.enableTimeToLive(ttlConfig)

補充:

  • State backends 存儲最新一次修改的時間戳以及更新值,因此啟用TTL會增加存儲的消耗。
  • 目前TTL僅支持處理時間(processing time)。
  • 如果之前沒有配置TTL,而狀態(tài)恢復時啟用TTL(相反的情況同樣),會引起兼容性錯誤和 StateMigrationException 異常。
  • TTL配置不是 checkpoint 或 savepoint 的一部分,而是在 Flink 運行時做處理。
  • 對于 Map 類型,只有序列化方法支持空值時,TTL的設置才支持空值,否則需要使用 NullableSerializer 進行封裝(序列化會占用額外的字節(jié))。

過期數據清理

目前過期值只有在顯式訪問時才會被刪除(如調用 ValueState.value())。如果未讀取過期狀態(tài)的數據,則不會將其刪除,這可能會導致狀態(tài)不斷增長。將來的版本中可能會發(fā)生變化。

此外,可以指定在獲取完整的 State 快照時激活清理方法,以減小快照的大小。當從上一個快照恢復時,不會包括已刪除的過期 State。StateTtlConfig 可以指定(不適用于 RocksDB 增量 checkpoint 的情況):

val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot
    .build

Scala 中的快捷接口

除了上述接口外,Scala API 可以在 KeyedStream 上的有狀態(tài) map()flatMap() 函數中使用 ValueState。如下獲取 ValueState 當前值,并必須返回用于更新的新值:

val stream: DataStream[(String, Int)] = ...

// 調用 mapWithState
val counts: DataStream[(String, Int)] = stream
  .keyBy(_._1)
  .mapWithState((in: (String, Int), count: Option[Int]) =>
    count match {
      case Some(c) => ( (in._1, c), Some(c + in._2) )
      case None => ( (in._1, 0), Some(in._2) )
    })

托管的 Operator State

Operator state 的數據結構不像 Keyed state 豐富,只支持 List,可以認為是可序列化對象的列表,彼此獨立。這些對象在動態(tài)擴展時是可以重新分配 non-keyed state 的最小單元。目前支持幾種動態(tài)擴展方式:

  • Even-split redistribution:算子并發(fā)度發(fā)生改變的時候,并發(fā)的每個實例取出 State 列表,合并到一個新的列表上,形成邏輯上完整的 State。然后根據列表元素的個數,均勻分配給新的并發(fā)實例(Task)。
    例如,如果并行度為1,算子的 State checkpoint 包含數據元 element1 和 element2,當并行度增加到2時,element1 會在 算子實例0中,而element2在算子實例1中。

  • Union redistribution:相比于平均分配更加靈活,把完整 State 劃分的方式交給用戶去做。并發(fā)度發(fā)生改變的時候,按同樣的方式取到完整的 State 列表,然后直接交給每個實例。

使用托管的 Operator State,有狀態(tài)函數需要實現 CheckpointedFunction 接口(更通用的)或 ListCheckpointed<T extends Serializable>接口。

CheckpointedFunction

CheckpointedFunction 需要實現兩個方法:

// 執(zhí)行 Checkpoint 時調用
void snapshotState(FunctionSnapshotContext context) throws Exception;

// 初始化函數時調用
void initializeState(FunctionInitializationContext context) throws Exception;

initializeState() 函數不止在首次初始化函數調用,從 checkpoint 恢復時同樣會調用。因此,不僅要初始化 State 的邏輯,還包括 State 恢復的邏輯。

下面的例子是有狀態(tài)的 SinkFunction,利用 CheckpointedFunction 在將數據元發(fā)送之前進行緩存,checkpoint 時緩存寫入 State,啟動時判斷是否使用 重發(fā)的 State 恢復緩存:

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

    @transient
    private var checkpointedState: ListState[(String, Int)] = _

    // 輸入數據緩存
    private val bufferedElements = ListBuffer[(String, Int)]()

    override def invoke(value: (String, Int)): Unit = {
        // 輸入數據存入 buffer
        bufferedElements += value
        
        // 當緩存數量達到閾值,發(fā)送數據,并清理 buffer
        if (bufferedElements.size == threshold) {
            for (element <- bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear()
        }
    }

    // 執(zhí)行 Checkpoint 時調用
    // 清除前一個檢查點包含的所有對象
    // 緩存數據添加到 State
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        checkpointedState.clear()
        for (element <- bufferedElements) {
            checkpointedState.add(element)
        }
    }

    // 用于首次初始化或從 checkpoint 恢復
    override def initializeState(context: FunctionInitializationContext): Unit = {
        
        val descriptor = new ListStateDescriptor[(String, Int)](
            "buffered-elements",
            TypeInformation.of(new TypeHint[(String, Int)]() {})
        )

        // 使用 ctx 和 ListStateDescriptor 訪問 ListState
        // 注意調用方法 getListState(descriptor)
        checkpointedState = context.getOperatorStateStore.getListState(descriptor)

        // 根據 isRestored() 方法來檢查當前是否是 checkpoint 恢復的情況
        if(context.isRestored) {
            // 如果 true,表示是恢復失敗的情況,應用恢復數據的邏輯:
            //   恢復數據添加到 buffer 中
            for(element <- checkpointedState.get()) {
                bufferedElements += element
            }
        }
    }

}

獲取 State 句柄函數的命名中約定包含其重新分發(fā)模式,其次時狀態(tài)結構。上面的例子中調用方法 getListState 表示使用了平均分發(fā)(even-split)的方式。如果要使用 union 的方式,使用 getUnionListState(descriptor) 方法返回的 ListState,會包含完整的 State 列表,由用戶決定處理恢復哪些數據。

ListCheckpointed

ListCheckpointedCheckpointedFunction 的一個有限變體,只支持平均分發(fā)(even-split)的情況,同樣需要實現兩個方法:

// 執(zhí)行 Checkpoint 時調用
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;

// 初始化函數時調用
void restoreState(List<T> state) throws Exception;

有狀態(tài)的 Source Functions

在 Source 方法中,更新狀態(tài)和輸出應該在一個原子操作下完成(為了滿足精確一次的語義下的故障恢復),因此用戶需要取一個鎖,可以從ctx中獲得。

class CounterSource extends RichParallelSourceFunction[Long]
       with ListCheckpointed[Long] {

    @volatile
    private var isRunning = true

    private var offset = 0L

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        
        // 獲取鎖
        val lock = ctx.getCheckpointLock

        while (isRunning) {
            // 輸出和更新在一個原子操作中
            lock.synchronized({
                ctx.collect(offset)
                offset += 1
            })
        }
    }

    override def cancel(): Unit = isRunning = false

    override def restoreState(state: util.List[Long]): Unit =
        for (s <- state) {
            offset = s
        }

    override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
        Collections.singletonList(offset)
}

Broadcast State

前面提到了兩種 Operator state 支持的動態(tài)擴展方法:even-split(算子的每個平行任務平均的恢復狀態(tài)的一部分)和 union(每個任務獲得完整的狀態(tài)恢復)。Broadcast State 是 Flink 支持的另一種擴展方式。用來支持將某一個流的數據廣播到所有下游任務,數據被存儲在本地,接受到廣播的流在操作時可以使用這些數據。

Broadcast state 的特點是:

  • 使用 Map 類型的數據結構
  • 僅適用于同時具有廣播流和非廣播流作為數據輸入的特定算子
  • 可以具有多個不同名稱的 Broadcast state

Broadcast State API

下面是一個例子來展示 Broadcast State API 的使用,在這個示例中,要處理的數據流中是有不同顏色(Color)和形狀(Shape)屬性的對象,希望在流中找到一對具有相同顏色的,并且遵循一個特定的形狀模式的對象組(例如,一個紅色長方形后面緊跟著一個紅色三角形的組合)。

第一個數據流是要處理的數據源,流中的對象具有 Color 和 Shape 屬性

// 使用 Color 作為鍵來分組,保證相同 Color 的數據進入到同一個子任務中
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

第二個數據流是要廣播的數據,流中的對象是匹配規(guī)則(Rules)

// MapState 中保存 (RuleName,Rule) ,在描述類中指定 State name
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));
        
// ruleStream 使用 MapStateDescriptor 作為參數廣播,得到廣播流
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

然后需要做的是,連接兩個流并且指定兩個連接后的處理邏輯(使用廣播的 rules 解析流中匹配的數據組,并返回)

DataStream<Match> output = colorPartitionedStream
                .connect(ruleBroadcastStream)
                .process(
                    new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                        // matching logic
                    }
                )

stream.connect(BroadcastStream) 方法用來連接廣播流和非廣播流,BroadcastStream 作為參數,返回一個 BroadcastConnectedStream 對象。BroadcastConnectedStream 調用 process() 方法執(zhí)行處理邏輯,需要指定一個邏輯實現類作為參數,而具體的需要實現的抽象類取決于非廣播流的類型:

  • 如果非廣播流是 keyed stream,需要實現 KeyedBroadcastProcessFunction
  • 如果非廣播流是 non-keyed stream,需要實現 BroadcastProcessFunction

這里注意下 KeyedBroadcastProcessFunction<Color, Item, Rule, String> 各個參數含義:
Color,非廣播流 keyed stream 的鍵的類型
Item,非廣播流的對象類型
Rule,廣播流的對象類型
String,返回結果的類型

BroadcastProcessFunction 和 KeyedBroadcastProcessFunction

這兩個抽象函數有兩個相同的需要實現的接口:

  • processBroadcastElement() 處理廣播流中接收的數據元
  • processElement() 處理非廣播流數據的方法

用于處理非廣播流是 non-keyed stream 的情況

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

用于處理非廣播流是 keyed stream 的情況

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

可以看到這兩個接口提供的上下文對象有所不同。非廣播方(processElement)使用 ReadOnlyContext,而廣播方(processBroadcastElement)使用 Context。

這兩個上下文對象(簡稱ctx)通用的方法接口:

  1. 訪問 Broadcast state:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  2. 查詢數據元的時間戳:ctx.timestamp()
  3. 獲取當前水?。?code>ctx.currentWatermark()
  4. 獲取當前處理時間:ctx.currentProcessingTime()
  5. 向旁路輸出(side-outputs)發(fā)送數據:ctx.output(OutputTag<X> outputTag, X value)

這兩者不同之處在于對 Broadcast state 的訪問限制:廣播方對其具有讀和寫的權限(read-write),非廣播方只有讀的權限(read-only)

這么設計的原因是,保證 Broadcast state 在算子的所有并行實例中是相同的。由于 Flink 中沒有跨任務的通信機制,在一個任務實例中的修改不能在并行任務間傳遞。而廣播端在所有并行任務中都能看到相同的數據元,只對廣播端提供可寫的權限。

同時要求在廣播端的每個并行任務中,對接收數據的處理是相同的。如果忽略此規(guī)則會破壞 State 的一致性保證,從而導致不一致且難以診斷的結果。也就是說,processBroadcast() 的實現邏輯必須在所有并行實例中具有相同的確定性行為。

回到上一個例子,KeyedBroadcastProcessFunction 的實現可能看起來如下:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO, 
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value, 
                                        Context ctx, 
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value, 
                               ReadOnlyContext ctx, 
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry: 
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is  no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要考慮因素

使用 Broadcast state 時要注意的是:

  • 沒有跨任務的通信,這就是為什么只有廣播方可以修改 Broadcast state 的原因。
  • 用戶必須確保所有任務以相同的方式為每個傳入的數據元更新 Broadcast state。否則,可能導致不一致的結果。
  • 跨任務的 Broadcast state 中的事件順序可能不同,盡管廣播的元素可以保證所有元素都將轉到所有下游任務,但元素可能以不同的順序到達。因此,Broadcast state 更新不能依賴傳入事件的順序。
  • 所有任務都會把 Broadcast state 存入 checkpoint,而不僅僅是其中一,雖然 checkpoint 發(fā)生時所有任務在具有相同的 broadcast state。這是為了避免在恢復期間所有任務從同一文件中進行恢復(避免熱點),然而代價是做 state checkpoint 的大小增加了并行度數量的倍數。
  • Flink 確保在恢復或改變并行度時不會有重復數據,也不會丟失數據。在具有相同或更小并行度的恢復的情況下,每個任務讀取其狀態(tài)檢查點。在并行度放大時,每個任務都會讀取自己的狀態(tài),多余的任務以循環(huán)方式讀取前面任務的檢查點。
  • 不支持 RocksDB state backend,broadcast state 在運行時保存在內存中。

Reference:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/broadcast_state.html
https://flink.xskoo.com/dev/stream/state/state.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容