Flink State 和 Fault Tolerance(一)

Overview

有狀態(tài)函數(shù)(Stateful function)和算子(Operator)在處理獨立的數(shù)據(jù)或事件時存儲數(shù)據(jù),使得狀態(tài)(State)成為任何復(fù)雜算子中的關(guān)鍵部分,例如:

  • 當(dāng)應(yīng)用檢索特定的事件模式,State 將存儲到接收到的事件的序列
  • 當(dāng)按照分鐘/小時/天的唯獨聚合事件時,狀態(tài)將保留待處理的聚合狀態(tài)
  • 當(dāng)在流數(shù)據(jù)上進(jìn)行機(jī)器學(xué)習(xí)的模型訓(xùn)練時,狀態(tài)保存模型當(dāng)前版本的參數(shù)
  • 當(dāng)需要管理歷史數(shù)據(jù)時,狀態(tài)允許有效地訪問過去發(fā)生的事件

Flink 需要知道 State,以便使程序的 checkpoint 和 savepoint 可用。

有關(guān)狀態(tài)的知識還允許重新縮放Flink應(yīng)用程序,這意味著Flink負(fù)責(zé)在并行實例之間重新分配狀態(tài)。

Work with State

State 分類

Flink 中的狀態(tài)分為兩種:Keyed state 和 Operator state

Operator State

每個 Operator state 都綁定到一個并行的算子的實例上??梢詤⒖?Kafka connector 的例子,Kafka 消費者的每個并行實例都維護(hù)一個 topic 分區(qū)和 offset 的對應(yīng)關(guān)系作為 Operator state。Operator state 支持當(dāng)并行度更改時,在并行的算子實例間重新分配 State,進(jìn)行這種重新分配有多種不同的方案(后面會介紹)。

Operator state 在 Python DataStream API 中還不支持。

Keyed State

Keyed state 與 key 強相關(guān),只能在 KeyedStream 上應(yīng)用的函數(shù)和算子內(nèi)使用。

可以將 Keyed state 認(rèn)為是分區(qū)后的 Operator state,每個 key 有一個 State 的分區(qū)。邏輯上,每個 Keyed state 綁定唯一的 <parallel-operator-instance, key>(算子并行實例和 key 的一對元組),可以將其簡單地視為 <operator, key>(因為每個 key 屬于算子的唯一一個并行實例)。

Keyed state 進(jìn)一步被組織為 Key groups。Key groups 是 Flink 重新分配 Keyed state 的原子單位,Key groups 的數(shù)量與最大并行度相同。在程序執(zhí)行期間 keyed operator 的每個并行實例都使用一個或多個 Key groups 的 keys。

Broadcast State

Broadcast State 是一種特殊的 Operator State。Broadcast State 是 Flink 支持的另一種擴(kuò)展方式。用來支持將某一個流的數(shù)據(jù)廣播到所有下游任務(wù),數(shù)據(jù)被存儲在本地,接受到廣播的流在操作時可以使用這些數(shù)據(jù)。

Broadcast state 的特點是:

  • 使用 Map 類型的數(shù)據(jù)結(jié)構(gòu)
  • 僅適用于同時具有廣播流和非廣播流作為數(shù)據(jù)輸入的特定算子
  • 可以具有多個不同名稱的 Broadcast state

Broadcast state 在 Python DataStream API 中還不支持。

Raw State 和 Managed State

State 有兩種狀態(tài):托管狀態(tài)(managed)和原生狀態(tài)(raw)。

托管狀態(tài)(managed state) 由 Flink runtime 管理的數(shù)據(jù)結(jié)構(gòu)表示,例如內(nèi)部哈希表或 RocksDB。Flink runtime 對 State 進(jìn)行編碼并寫入 checkpoint。

原生狀態(tài)(raw state) 是在算子內(nèi)部的數(shù)據(jù)結(jié)構(gòu)中的保存。Checkpoint 只會保存 State 內(nèi)容的字節(jié)序列,State 的真實數(shù)據(jù)結(jié)構(gòu)對 Flink 是透明的。

所有數(shù)據(jù)流函數(shù)都可以使用托管狀態(tài)(managed state),而原生狀態(tài)(raw state)只能在具體實現(xiàn)算子時使用。建議使用托管狀態(tài),因為在托管狀態(tài)下,F(xiàn)link 能夠在并行度改變時自適應(yīng)地重新分配 State,并且在內(nèi)存管理方面可以做的更好。

Keyed DataStream

如果要使用 keyed state,首先需要在 DataStream 上指定 key(可以使用方法 keyBy(KeySelector) ),用于對狀態(tài)以及流中的記錄進(jìn)行分區(qū),這將產(chǎn)生 KeyedStream,可以使用 Keyed state。

KeySelector 函數(shù)將單個記錄作為輸入并返回該記錄的 key 值,可以是任何類型,并且必須從確定性計算中派生。

Flink 的數(shù)據(jù)模型不是基于 key-value 的。因此,不需要將數(shù)據(jù)集類型物理地加入到 key 和 value 中。key 被定義用于指導(dǎo)分組算子。

public class WC {
  public String word;
  // ...

  public String getWord() { return word; }
}

DataStream<WC> words = ...
KeyedStream<WC> keyed = words.keyBy(WC::getWord);

Tuple Keys and Expression Keys

Flink 還有兩種定義 key 的方法:Java/scala api 中的 tuple keys 和 expression keys(python api中仍然不支持)。

可以使用元組(Tuple)字段索引或表達(dá)式來選擇對象的字段來指定 key。現(xiàn)在不推薦使用這些,使用 KeySelector 函數(shù)是絕對優(yōu)越的:使用 lambda,易于使用,在運行時的開銷可能更小。

Keyed State 和 Operator State

Use Keyed State

Keys state 提供多種不同類型 State,作用域都是當(dāng)前輸入數(shù)據(jù)的鍵,只能用于 KeyedStream,可以通過 stream.keyBy(…) 創(chuàng)建。

首先看有哪些不同類型的狀態(tài),以及如何在程序中使用:

  • ValueState<T>:保存了一個值,可以更新和讀?。ㄋ阕硬僮鞯拿總€ key 可能有一個 value)

    update(T) 更新
    T value() 取值

  • ListState<T>:保存了一個列表,可以追加元素,可以獲取到一個包含所有當(dāng)前存儲的元素的迭代器
    add(T)addAll(List<T>) 添加到列表
    Iterable<T> get() 獲取迭代器
    update(List<T>) 使用新的列表覆蓋現(xiàn)有列表

  • ReducingState<T>:保存一個值,表示添加到 State 的所有值的聚合結(jié)果。提供的接口類似于 ListState
    add(T) 函數(shù)會使用指定的函數(shù)(ReduceFunction)對添加的值進(jìn)行聚合

  • AggregatingState<IN, OUT>:保存一個值,表示添加到 State 的所有值的聚合結(jié)果。與 ReducingState 不同的是,聚合結(jié)果的數(shù)據(jù)類型可以與添加到 State 的元素的數(shù)據(jù)類型不同。接口同樣類似于 ListState
    add(IN) 函數(shù)會使用指定的函數(shù)(AggregateFunction)對添加的值進(jìn)行聚合

  • MapState<UK, UV>:保存一個 Map??梢詫?key/value 存入 State,也可以獲取到一個包含所有當(dāng)前存儲的元素的迭代器
    put(UK, UV)putAll(Map<UK, UV>) 添加 key/value 到 Map
    get(UK) 獲取與指定 key 的 value
    entries()、keys()values() 對 Map 的元素/鍵/值遍歷訪問

所有類型的 State 都有 clear() 方法來清除當(dāng)前狀態(tài)。

首先要記住這些 State 對象僅用于有狀態(tài)接口,State 不一定存儲在內(nèi)存,也可能存儲在磁盤或其他位置。其次要記住的是,從 State 獲得的值取決于輸入數(shù)據(jù)的 key,如果處理的 keys 不同,定義的函數(shù)的調(diào)用結(jié)果會不同。

要想操作 State 對象,首先必須創(chuàng)建一個 StateDescriptor,該對象擁有 State 名稱(可以創(chuàng)建多個 State,必須具有唯一的名稱來引用 State),State 所持有的值的類型,可能還有用戶指定的函數(shù)(例如 ReduceFunction)。對應(yīng)不同的 State 類型,有如下類對象:ValueStateDescriptor、ListStateDescriptorAggregatingStateDescriptor, ReducingStateDescriptorMapStateDescriptor。

然后使用 RuntimeContext 可以才訪問到 State,因此只能在 RichFunction 中使用,在 RichFunction 方法中 RuntimeContext 訪問各種類型 State 的方法:

  • ValueState<T> getState(ValueStateDescriptor<T>)
  • ReducingState<T> getReducingState(ReducingStateDescriptor<T>)
  • ListState<T> getListState(ListStateDescriptor<T>)
  • AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT>)
  • MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

下面是在 FlatMapFunction 中的使用示例:

實現(xiàn)了一個簡單的計數(shù)窗口,通過輸入元組的第一個參數(shù)分組,在分組的流中,每接收到兩個元組,返回兩那個元組的第二個參數(shù)的平均值

class CountWindowAverage extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

    // 使用 ValueState 保存元素求和值
    // 元組第一個參數(shù)為求和個數(shù) count,第二個參數(shù)為求和值 sum
    private var sum: ValueState[(Long, Long)] = _

    override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
        // 訪問 State 的值
        val tmpCurrentSum = sum.value
        
        // 初始值 (0, 0)
        val currentSum = if (tmpCurrentSum != null) {
            tmpCurrentSum
        } else {
            (0L, 0L)
        }

        // 求和并更新 State
        val newSum = (currentSum._1 + 1, currentSum._2 + input._2)
        sum.update(newSum)

        // 如果 state value 達(dá)到 2, 發(fā)送統(tǒng)計的平均值并清空 state
        if (newSum._1 >= 2) {
            out.collect((input._1, newSum._2 / newSum._1))
            sum.clear()
        }
    }

    override def open(parameters: Configuration): Unit = {
    
        // 通過 RuntimeContext 和 ValueStateDescriptor 獲取 ValueState
        sum = getRuntimeContext.getState(
            new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)])
        )
    }
}

object ExampleCountWindowAverage extends App {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // create keyed stream and call CountWindowAverage
    env.fromCollection(List(
        (1L, 3L), (1L, 5L), (1L, 7L), (1L, 4L), (1L, 2L)
    ))
        .keyBy(_._1)
        .flatMap(new CountWindowAverage())
        .print()
        
    // 輸出 (1,4) (1,5)
    env.execute("ExampleManagedState")
}

狀態(tài)生存周期 (TTL)

狀態(tài)生存周期(TTL)可以被指定給任何類型的 Keyed state,如果配置了 TTL 并且 State value 已過期,State value 會被清除。所有集合類型 State(ListState 和 MapState)都支持為每個條目設(shè)置 TTL。

為了啟用 State TTL,首先需要構(gòu)建 StateTtlConfig 對象,然后通過在 ValueStateDescriptor(其他類型同理)構(gòu)造中傳入該對象來啟用 TTL,參考下面的例子:

// Time.seconds(1) 生存時間,必填項
// 
// StateTtlConfig.UpdateType 更新類型
//   - UpdateType.OnCreateAndWrite - 創(chuàng)建和寫入時更新(默認(rèn))
//   - UpdateType.OnReadAndWrite - 讀取和寫入時更新
// 
// StateTtlConfig.StateVisibility 狀態(tài)可見性,訪問時是否返回已經(jīng)過期的值
//   - StateVisibility.NeverReturnExpired - 永遠(yuǎn)不會返回過期的值(默認(rèn)),對于不能訪問過期數(shù)據(jù)的場景有用
//   - StateVisibility.ReturnExpiredIfNotCleanedUp -如果可以讀到會返回
// 
val ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) 
    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
    .build
// 構(gòu)建 ValueStateDescriptor
val stateDescriptor = new ValueStateDescriptor[String]("text state", classOf[String])
// 啟用 TTL
stateDescriptor.enableTimeToLive(ttlConfig)

補充:

  • 狀態(tài)上次的修改時間會和數(shù)據(jù)一起保存在 state backend 中,因此開啟 TTL 特性會增加狀態(tài)數(shù)據(jù)的存儲。
  • 目前TTL僅支持處理時間(processing time)。
  • 如果之前沒有配置TTL,而狀態(tài)恢復(fù)時啟用TTL(相反的情況同樣),會引起兼容性錯誤和 StateMigrationException 異常。
  • TTL配置不是 checkpoint 或 savepoint 的一部分,而是在 Flink 運行時做處理。
  • 對于 Map 類型,只有序列化方法支持空值時,TTL的設(shè)置才支持空值,否則需要使用 NullableSerializer 進(jìn)行封裝(序列化會占用額外的字節(jié))。
  • State TTL 當(dāng)前在 PyFlink DataStream API 中還不支持。

過期數(shù)據(jù)清理

默認(rèn)情況下,過期值在讀取時被顯式刪除(如調(diào)用 ValueState.value()),如果配置的 state 后端支持,則定期在后臺進(jìn)行垃圾收集??梢栽?StateTtlConfig 中禁用后臺清理:

某些老版本,如果未讀取過期狀態(tài)的數(shù)據(jù),則不會將其刪除,這可能會導(dǎo)致狀態(tài)不斷增長

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();

為了對后臺的某些特殊清理進(jìn)行更細(xì)粒度的控制,可以按如下所述單獨配置。目前,heap state backend 依賴于增量清理,RocksD backend 使用壓縮過濾器進(jìn)行后臺清理。

全量快照時進(jìn)行清理

可以指定在獲取完整的 State 快照時激活清理方法,以減小快照的大小。本地狀態(tài)在當(dāng)前實現(xiàn)下未被清除,但當(dāng)從上一個快照恢復(fù)時,不會包括已刪除的過期 State。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

注意:

  • 這種策略在 RocksDBStateBackend 的增量 checkpoint 模式下無效。
  • 這種清理方式可以在任何時候通過 StateTtlConfig 啟用或者關(guān)閉,比如在從 Savepoint 恢復(fù)時。
增量數(shù)據(jù)清理

可以選擇增量方式清理狀態(tài)數(shù)據(jù),在狀態(tài)訪問或處理時進(jìn)行。如果某個狀態(tài)開啟了該清理策略,則會在存儲后端保留一個所有狀態(tài)的惰性全局迭代器。 每次觸發(fā)增量清理時,從迭代器中選擇已經(jīng)過期的數(shù)進(jìn)行清理。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupIncrementally(10, true)
    .build();

該策略有兩個參數(shù)。 第一個是每次清理時檢查狀態(tài)的條目數(shù),在每個狀態(tài)訪問時觸發(fā)。第二個參數(shù)表示是否在處理每條記錄時觸發(fā)清理。 Heap backend 默認(rèn)會檢查 5 條狀態(tài),并且關(guān)閉在每條記錄時觸發(fā)清理。

注意:

  • 如果沒有 state 訪問,也沒有處理數(shù)據(jù),則不會清理過期數(shù)據(jù)。
  • 增量清理會增加數(shù)據(jù)處理的耗時。
  • 現(xiàn)在僅 Heap state backend 支持增量清除機(jī)制。在 RocksDB state backend 上啟用該特性無效。
  • 如果 Heap state backend 使用同步快照方式,則會保存一份所有 key 的拷貝,從而防止并發(fā)修改問題,因此會增加內(nèi)存的使用。但異步快照則沒有這個問題。
  • 對已有的作業(yè),這個清理方式可以在任何時候通過 StateTtlConfig 啟用或禁用該特性,比如從 savepoint 重啟后。
在 RocksDB 壓縮時清理

如果使用 RocksDB state backend,則會啟用 Flink 為 RocksDB 定制的壓縮過濾器。RocksDB 會周期性的對數(shù)據(jù)進(jìn)行合并壓縮從而減少存儲空間。 Flink 提供的 RocksDB 壓縮過濾器會在壓縮時過濾掉已經(jīng)過期的狀態(tài)數(shù)據(jù)。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();

Flink 處理一定條數(shù)的狀態(tài)數(shù)據(jù)后,會使用當(dāng)前時間戳來檢測 RocksDB 中的狀態(tài)是否已經(jīng)過期,可以通過 StateTtlConfig.newBuilder(...).cleanupInRocksdbCompactFilter(long queryTimeAfterNumEntries) 方法指定處理狀態(tài)的條數(shù)。

時間戳更新的越頻繁,狀態(tài)的清理越及時,但由于壓縮會有調(diào)用 JNI 的開銷,因此會影響整體的壓縮性能。RocksDB backend 的默認(rèn)后臺清理策略會每處理 1000 條數(shù)據(jù)進(jìn)行一次。

可以通過配置開啟 RocksDB 過濾器的 debug 日志查看清理操作: log4j.logger.org.rocksdb.FlinkCompactionFilter=DEBUG

注意:

  • 壓縮時調(diào)用 TTL 過濾器會降低速度。TTL 過濾器需要解析上次訪問的時間戳,并對每個將參與壓縮的狀態(tài)進(jìn)行是否過期檢查。對于集合型狀態(tài)類型(比如 list 和 map),會對集合中每個元素進(jìn)行檢查。
  • 對于元素序列化后長度不固定的列表狀態(tài),TTL 過濾器需要在每次 JNI 調(diào)用過程中,額外調(diào)用 Flink 的 Java 序列化器,從而確定下一個未過期數(shù)據(jù)的位置。
  • 對已有的作業(yè),這個清理方式可以在任何時候通過 StateTtlConfig 啟用或禁用該特性,比如從 savepoint 重啟后。

Use Operator State

使用托管的 Operator State,有狀態(tài)函數(shù)需要實現(xiàn) CheckpointedFunction 接口。

CheckpointedFunction

CheckpointedFunction 接口提供了訪問 non-keyed state 的方法,需要實現(xiàn)如下兩個方法:

// 執(zhí)行 Checkpoint 時調(diào)用
void snapshotState(FunctionSnapshotContext context) throws Exception;

// 初始化函數(shù)時調(diào)用
void initializeState(FunctionInitializationContext context) throws Exception;

進(jìn)行 checkpoint 時會調(diào)用 snapshotState()。 用戶自定義函數(shù)初始化時會調(diào)用 initializeState(),初始化包括第一次自定義函數(shù)初始化和從之前的 checkpoint 恢復(fù)。 因此 initializeState() 不僅是定義不同狀態(tài)類型初始化的地方,也需要包括狀態(tài)恢復(fù)的邏輯。

Operator state 的數(shù)據(jù)結(jié)構(gòu)不像 Keyed state 豐富,只支持 List,可以認(rèn)為是可序列化對象的列表,彼此獨立。這些對象在動態(tài)擴(kuò)展時是可以重新分配 non-keyed state 的最小單元。目前支持幾種動態(tài)擴(kuò)展方式:

  • Even-split redistribution:算子并發(fā)度發(fā)生改變的時候,并發(fā)的每個實例取出 State 列表,合并到一個新的列表上,形成邏輯上完整的 State。然后根據(jù)列表元素的個數(shù),均勻分配給新的并發(fā)實例(Task)。例如,如果并行度為1,算子的 State checkpoint 包含數(shù)據(jù)元 element1 和 element2,當(dāng)并行度增加到2時,element1 會在 算子實例0中,而element2在算子實例1中。
  • Union redistribution:相比于平均分配更加靈活,把完整 State 劃分的方式交給用戶去做。并發(fā)度發(fā)生改變的時候,按同樣的方式取到完整的 State 列表,然后直接交給每個實例。

下面的例子是有狀態(tài)的 SinkFunction,利用 CheckpointedFunction 在將數(shù)據(jù)元發(fā)送之前進(jìn)行緩存,checkpoint 時緩存寫入 State,啟動時判斷是否使用 重發(fā)的 State 恢復(fù)緩存:

class BufferingSink(threshold: Int = 0) extends SinkFunction[(String, Int)]
    with CheckpointedFunction {

    @transient
    private var checkpointedState: ListState[(String, Int)] = _

    // 輸入數(shù)據(jù)緩存
    private val bufferedElements = ListBuffer[(String, Int)]()

    override def invoke(value: (String, Int)): Unit = {
        // 輸入數(shù)據(jù)存入 buffer
        bufferedElements += value
        
        // 當(dāng)緩存數(shù)量達(dá)到閾值,發(fā)送數(shù)據(jù),并清理 buffer
        if (bufferedElements.size == threshold) {
            for (element <- bufferedElements) {
                // send it to the sink
            }
            bufferedElements.clear()
        }
    }

    // 執(zhí)行 Checkpoint 時調(diào)用
    // 清除前一個檢查點包含的所有對象
    // 緩存數(shù)據(jù)添加到 State
    override def snapshotState(context: FunctionSnapshotContext): Unit = {
        checkpointedState.clear()
        for (element <- bufferedElements) {
            checkpointedState.add(element)
        }
    }

    // 用于首次初始化或從 checkpoint 恢復(fù)
    override def initializeState(context: FunctionInitializationContext): Unit = {
        
        val descriptor = new ListStateDescriptor[(String, Int)](
            "buffered-elements",
            TypeInformation.of(new TypeHint[(String, Int)]() {})
        )

        // 使用 ctx 和 ListStateDescriptor 訪問 ListState
        // 注意調(diào)用方法 getListState(descriptor)
        checkpointedState = context.getOperatorStateStore.getListState(descriptor)

        // 根據(jù) isRestored() 方法來檢查當(dāng)前是否是 checkpoint 恢復(fù)的情況
        if(context.isRestored) {
            // 如果 true,表示是恢復(fù)失敗的情況,應(yīng)用恢復(fù)數(shù)據(jù)的邏輯:
            //   恢復(fù)數(shù)據(jù)添加到 buffer 中
            for(element <- checkpointedState.get()) {
                bufferedElements += element
            }
        }
    }

}

initializeState 方法接收一個 FunctionInitializationContext 參數(shù),會用來初始化 non-keyed state 的 “容器”。這些容器是一個 ListState 用于在 checkpoint 時保存 non-keyed state 對象。

注意這些狀態(tài)是如何初始化的,和 keyed state 類似,StateDescriptor 會包括狀態(tài)名字、以及狀態(tài)類型相關(guān)信息。

ListStateDescriptor<Tuple2<String, Integer>> descriptor =
    new ListStateDescriptor<>(
        "buffered-elements",
        TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));

checkpointedState = context.getOperatorStateStore().getListState(descriptor);

調(diào)用不同的獲取狀態(tài)對象的接口,會使用不同的狀態(tài)分配算法。比如 getUnionListState(descriptor) 會使用 union redistribution 算法, 而 getListState(descriptor) 則簡單的使用 even-split redistribution 算法。

當(dāng)初始化好狀態(tài)對象后,我們通過 isRestored() 方法判斷是否從之前的故障中恢復(fù)回來,如果該方法返回 true 則表示從故障中進(jìn)行恢復(fù),會執(zhí)行接下來的恢復(fù)邏輯。

正如代碼所示,BufferingSink 中初始化時,恢復(fù)回來的 ListState 的所有元素會添加到一個局部變量中,供下次 snapshotState() 時使用。 然后清空 ListState,再把當(dāng)前局部變量中的所有元素寫入到 checkpoint 中。

另外,我們同樣可以在 initializeState() 方法中使用 FunctionInitializationContext 初始化 keyed state。

有狀態(tài)的 Source Functions

帶狀態(tài)的數(shù)據(jù)源比其他的算子需要注意更多東西。為了保證更新狀態(tài)以及輸出的原子性(用于支持 exactly-once 語義),用戶需要在發(fā)送數(shù)據(jù)前獲取數(shù)據(jù)源的全局鎖。

class CounterSource extends RichParallelSourceFunction[Long]
       with ListCheckpointed[Long] {

    @volatile
    private var isRunning = true

    private var offset = 0L

    override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
        
        // 獲取鎖
        val lock = ctx.getCheckpointLock

        while (isRunning) {
            // 輸出和更新在一個原子操作中
            lock.synchronized({
                ctx.collect(offset)
                offset += 1
            })
        }
    }

    override def cancel(): Unit = isRunning = false

    override def restoreState(state: util.List[Long]): Unit =
        for (s <- state) {
            offset = s
        }

    override def snapshotState(checkpointId: Long, timestamp: Long): util.List[Long] =
        Collections.singletonList(offset)
}

Broadcast State Pattern

Broadcast State API

下面是一個例子來展示 Broadcast State API 的使用。在這個示例中,要處理的數(shù)據(jù)流中是有不同顏色(Color)和形狀(Shape)屬性的對象,希望在流中找到一對具有相同顏色的,并且遵循一個特定的形狀模式的對象組(例如,一個紅色長方形后面緊跟著一個紅色三角形的組合)。 同時希望尋找的模式也會隨著時間而改變。

第一個數(shù)據(jù)流是要處理的數(shù)據(jù)源,流中的對象是 Item,具有 ColorShape 屬性

// 使用 Color 作為鍵來分組,保證相同 Color 的數(shù)據(jù)進(jìn)入到同一個子任務(wù)中
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

第二個數(shù)據(jù)流是要廣播的數(shù)據(jù),流中的對象是匹配規(guī)則(Rules)

// MapState 中保存 (RuleName,Rule) ,在描述類中指定 State name
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
                "RulesBroadcastState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    TypeInformation.of(new TypeHint<Rule>() {}));
        
// ruleStream 使用 MapStateDescriptor 作為參數(shù)廣播,得到廣播流
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

然后需要做的是,連接兩個流并且指定兩個連接后的處理邏輯(使用廣播的 Rules 解析流中匹配的數(shù)據(jù)組,并返回)

DataStream<Match> output = colorPartitionedStream
                .connect(ruleBroadcastStream)
                .process(
                                    /**
                                     * 各個參數(shù)含義:   
                                         * Color,非廣播流 keyed stream 的鍵的類型 
                                         * Item,非廣播流的對象類型
                                         * Rule,廣播流的對象類型  
                                         * String,返回結(jié)果的類型
                                     **/
                    new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                        // matching logic
                    }
                )

如何連接兩個流:非廣播流調(diào)用 connect(BroadcastStream) 方法用來連接廣播流和非廣播流,BroadcastStream 作為參數(shù),返回一個 BroadcastConnectedStream 對象。BroadcastConnectedStream 調(diào)用 process() 方法執(zhí)行處理邏輯,需要指定一個邏輯實現(xiàn)類作為參數(shù),而具體的需要實現(xiàn)的抽象類取決于非廣播流的類型:

  • 如果非廣播流是 keyed stream,需要實現(xiàn) KeyedBroadcastProcessFunction
  • 如果非廣播流是 non-keyed stream,需要實現(xiàn) BroadcastProcessFunction

BroadcastProcessFunction 和 KeyedBroadcastProcessFunction

這兩個抽象函數(shù)有兩個相同的需要實現(xiàn)的接口:

  • processBroadcastElement() 處理廣播流數(shù)據(jù)
  • processElement() 處理非廣播流數(shù)據(jù)

用于處理非廣播流是 non-keyed stream 的情況

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}

用于處理非廣播流是 keyed stream 的情況

public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

可以看到這兩個接口提供的上下文對象有所不同。處理非廣播流(processElement())使用 ReadOnlyContext,處理廣播流(processBroadcastElement())使用 Context。

這兩個上下文對象(簡稱 ctx)提供的方法接口:

  1. 訪問 Broadcast state:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
    1. getBroadcastState() 方法中傳入的 stateDescriptor 應(yīng)該與調(diào)用 .broadcast(ruleStateDescriptor) 的參數(shù)相同。
  2. 查詢數(shù)據(jù)的時間戳:ctx.timestamp()
  3. 獲取當(dāng)前水?。?code>ctx.currentWatermark()
  4. 獲取當(dāng)前處理時間:ctx.currentProcessingTime()
  5. 向旁路輸出(side-outputs)發(fā)送數(shù)據(jù):ctx.output(OutputTag<X> outputTag, X value)

這兩個上下文對象不同之處在于對 Broadcast state 的訪問限制:處理廣播流元素,具有讀和寫的權(quán)限(read-write),處理非廣播流元素只有讀的權(quán)限(read-only)。

這么設(shè)計的原因是,保證 Broadcast state 在算子的所有并行實例中是相同的。由于 Flink 中沒有跨 Task 的通信機(jī)制,在一個任務(wù)實例中的修改不能在并行 Task 間傳遞。而廣播端在所有并行任務(wù)中都能看到相同的數(shù)據(jù)元素,只對廣播端提供可寫的權(quán)限。同時要求在廣播端的每個并行任務(wù)中,對接收數(shù)據(jù)的處理是相同的。如果忽略此規(guī)則會破壞 State 的一致性保證,從而導(dǎo)致不一致且難以診斷的結(jié)果。也就是說,processBroadcast() 的實現(xiàn)邏輯必須在所有并行實例中具有相同的確定性行為。

KeyedBroadcastProcessFunction 在 Keyed Stream 上工作,提供了一些 BroadcastProcessFunction 沒有的功能:

  1. processElement() 的參數(shù) ReadOnlyContext 提供了方法能夠訪問 Flink 的定時器服務(wù),可以注冊事件定時器(event-time timer)或者處理時間的定時器(processing-time timer)。當(dāng)定時器觸發(fā)時,會調(diào)用 onTimer() 方法, 提供了 OnTimerContext,具有 ReadOnlyContext 的全部功能,并且提供:
    • 查詢當(dāng)前觸發(fā)的是一個事件還是處理時間的定時器
    • 查詢定時器關(guān)聯(lián)的key
  2. processBroadcastElement() 方法中的參數(shù) Context 會提供方法 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)。 這個方法使用一個 KeyedStateFunction 能夠?qū)?stateDescriptor 對應(yīng)的 state 中所有 key 的存儲狀態(tài)進(jìn)行某些操作。

注冊一個定時器只能在 KeyedBroadcastProcessFunctionprocessElement() 方法中進(jìn)行。在 processBroadcastElement() 方法中不能注冊定時器,因為廣播的元素中并沒有關(guān)聯(lián)的 key。

回到前面的例子,KeyedBroadcastProcessFunction 的實現(xiàn)可能看起來如下:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // 存儲部分匹配的結(jié)果,即匹配了一個元素,正在等待第二個元素
    // 我們用一個數(shù)組來存儲,因為同時可能有很多第一個元素正在等待
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // 與之前的 ruleStateDescriptor 相同
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // 不需要額外的 else{} 段來考慮 rule.first == rule.second 的情況
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要考慮因素

使用 Broadcast state 時要注意的是:

  • 沒有跨 Task 的通信,這就是為什么只有處理廣播流元素可以修改 Broadcast state 的原因。用戶需要保證所有 Task 對于 Broadcast state 的處理方式是一致的,否則會造成不同 Task 讀取 Broadcast state 時內(nèi)容不一致的情況,最終導(dǎo)致結(jié)果不一致。
  • 跨 Task 的 Broadcast state 中的事件順序可能不同,盡管廣播流的元素可以保證都將轉(zhuǎn)到所有下游 Task,但元素可能以不同的順序到達(dá)下游 Task。因此,Broadcast state 更新不能依賴傳入事件的順序。
  • 所有 Task 都會把 Broadcast state 存入 checkpoint,而不僅僅是其中一,雖然 checkpoint 發(fā)生時所有任務(wù)在具有相同的 broadcast state。這是為了避免在恢復(fù)期間所有任務(wù)從同一文件中進(jìn)行恢復(fù)(避免熱點),代價是 state checkpoint 的大小增加了并行度數(shù)量的倍數(shù)。Flink 會保證在恢復(fù)狀態(tài)/改變并發(fā)的時候數(shù)據(jù)沒有重復(fù)沒有缺失。 在作業(yè)恢復(fù)時,如果與之前具有相同或更小的并發(fā)度,所有的 Task 讀取之前已經(jīng)在 checkpoint 中的 state。在增大并發(fā)的情況下,多出來的并發(fā)會使用輪詢調(diào)度算法讀取之前 Task 的 state。
  • 不支持 RocksDB state backend,broadcast state 在運行時保存在內(nèi)存中,需要保證內(nèi)存充足。這一特性同樣適用于所有其他 Operator State。

主要引用:

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/state/

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/fault-tolerance/broadcast_state/

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容