Flink-Streaming-State & Fault Tolerance-The Broadcast State Pattern

上篇講了operator state在恢復(fù)(重啟/故障恢復(fù))時要么使用evenly distributed策略,要么使用union策略,來重啟operator的并發(fā)實例。
operator state支持的第三種類型是 Broadcast State。引入該類型state是為了支持一個流中的一些數(shù)據(jù)需要廣播到所有流中的場景,這些數(shù)據(jù)會被存儲在本地,并應(yīng)用在另一些流的所有數(shù)據(jù)上以便進(jìn)行處理。例如一個很自然的例子,我們有一個包含一系列規(guī)則的很緩慢的流,我們想要將這些規(guī)則應(yīng)用到其他流的所有數(shù)據(jù)上。把這個例子記在腦子里,broadcast state與其他operator state不同點在于:

  • 它有一個map結(jié)構(gòu)
  • 它僅能在某些特定的operator中使用,operator需要既可以將 broadcasted stream作為輸入,也可以將 non-broadcasted 流作為輸入
  • 這些特定的操作符可以有多個不同的 broadcast state,每個state都有自己的名稱。

Provided APIs


在展示完整功能前,我們先從一個示例開始講解flink提供的api。我們假設(shè)我們的數(shù)據(jù)流數(shù)據(jù)含有color與shape兩個屬性。我們想要找到一些特性模式的數(shù)據(jù)對,如:color屬性相同,且shape符合某些規(guī)則,如先出現(xiàn)矩形再出現(xiàn)三角形。我們假設(shè)這些規(guī)則是隨時間變化的。
在這個示例中,第一個流包含 Item類型的數(shù)據(jù),他有 Color 與 Shape兩個屬性。另一個流包含Rule類型數(shù)據(jù)。
我們從包含Item的流入手,我們需要將其根據(jù)Color進(jìn)行key操作,因為我們要找的數(shù)據(jù)對的color都是一樣的。這樣做之后,會保證具有相同color的數(shù)據(jù)會分配到相同的物理機(jī)器上。

// key the shapes by color
KeyedStream<Item, Color> colorPartitionedStream = shapeStream
                        .keyBy(new KeySelector<Shape, Color>(){...});

再來看 Rule 數(shù)據(jù)流,包含Rule的數(shù)據(jù)流需要被廣播broadcast到所有的下游任務(wù)中去,并且這些任務(wù)需要將其存儲到本地,這樣就可以使用本地數(shù)據(jù)與Item數(shù)據(jù)做計算
下面的小段程序會:1)廣播rule數(shù)據(jù)流 2)使用提供的MapStateDescriptor 來創(chuàng)建rule需要存儲到的broadcast state對象。

// a map descriptor to store the name of the rule (string) and the rule itself.
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));
        
// broadcast the rules and create the broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
                        .broadcast(ruleStateDescriptor);

最后,為了在Item流上的每個Item中都應(yīng)用Rule進(jìn)行計算,我們需要:
1.連接這兩個流
2.定義我們模式匹配的邏輯

可以使用 connect() 方法來連接一個(keyed/non-keyed)流與 Broadcast 流。在非 broadcast流上調(diào)用connect() 方法,broadcast流作為參數(shù)傳入。這個方法會返回 BroadcastConnectedStream 類,在這個類上,我們可以調(diào)用 process() 方法傳入 CoProcessFunction的實現(xiàn)類。我們可以在這個函數(shù)內(nèi)實現(xiàn)我們的邏輯。函數(shù)的類型取決于non-broadcast 流的情況:

  • 如果它是 keyed 流,那么函數(shù)類型就是 KeyedBroadcastProcessFunction
  • 如果它是non-keyed 流,那么函數(shù)類型是 BroadcastProcessFunction

假設(shè),我們的non-broadcast流是 non-keyed 類型。
注意:connect方法需要由 non-broadcast 流來調(diào)用,broadcast流作為參數(shù)

DataStream<String> output = colorPartitionedStream
                 .connect(ruleBroadcastStream)
                 .process(
                     
                     // type arguments in our KeyedBroadcastProcessFunction represent: 
                     //   1. the key of the keyed stream
                     //   2. the type of elements in the non-broadcast side
                     //   3. the type of elements in the broadcast side
                     //   4. the type of the result, here a string
                     
                     new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
                         // my matching logic
                     }
                 );
BroadcastProcessFunction 與 KeyedBroadcastProcessFunction

在 CoProcessFunction 接口中,有兩個處理方法需要實現(xiàn):processBroadcastElement() 方法,處理broadcast 流中的數(shù)據(jù);processElement() 方法處理 non-broadcast流的數(shù)據(jù)。兩個方法的詳細(xì)方法簽名如下:

public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
}
public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {

    public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception;

    public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception;

    public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception;
}

首先需要注意的是:兩個接口都需要實現(xiàn) processBroadcastElement() 方法來處理 broadcast流中的數(shù)據(jù),以及 processElement() 方法來處理 non-broadcast流中的數(shù)據(jù)。
兩個方法的區(qū)別在于他們?nèi)雲(yún)⒌腸ontext 的不同。處理non-broadcast 的方法的入?yún)?ReadOnlyContext,處理 broadcast 的方法的入?yún)?Context。
兩個context都有如下特點

  • 獲取 broadcast.state 的能力:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)
  • 允許查詢元素的時間戳:ctx.timestamp(
  • 獲取當(dāng)前的watermark:ctx.currentWatermark()
  • 獲取當(dāng)前processing time:ctx.currentProcessingTime()
  • 發(fā)射數(shù)據(jù)到 side-output:ctx.output(OutputTag<X> outputTag, X value)

getBroadcastState()方法中的 stateDescriptor 需要與 .broadcast(ruleStateDescriptor) 中的相同。
兩個context的不同點在于兩者對 broadcast state 的訪問級別。 處理broadcast 流的方法的context對broadcast state 是讀寫權(quán)限,而處理 non-broadcast流的方法的context對broadcast state是只讀權(quán)限。這樣做的原因是:在Flink中,是沒有跨任務(wù)數(shù)據(jù)交換(no cross-task communication)的。因此,為了保證在操作符的所有并發(fā)實例中的broadcast state都是相同的,我們僅給broadcast流讀寫權(quán)限,這樣所有任務(wù)中的broadcast的數(shù)據(jù)都會相同,我們也能保證應(yīng)用于non-broadcast數(shù)據(jù)上的計算在所有task中都是相同的。不這么做,就無法達(dá)到一致性的保證,導(dǎo)致不一致且很難調(diào)試的結(jié)果。
注意:processBroadcast() 方法中實現(xiàn)的邏輯也需要在所有并發(fā)操作實例中保持一致性。

最終,由于 KeyedBroadcastProcessFunction 是在 keyed 流上進(jìn)行操作,它提供了 BroadcastProcessFunction 沒有的一些功能:

  1. processElement()中的 ReadOnlyContext可以訪問Flink提供的 timer server服務(wù),它允許注冊一個 event time/processing time的時間回調(diào)函數(shù)。當(dāng)觸發(fā)回調(diào)時,會調(diào)用onTime()方法,該方法的 OnTimeContext 參數(shù)包含了 ReadOnlyContext的所有功能,再加上:
  • 判斷該timer是基于event time還是processing time的
  • 查詢timer關(guān)聯(lián)的key

2.processBroadcastElement()方法中的 Context 有 applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function) 方法。該方法可以注冊一個
KeyedStateFunction 應(yīng)用于stateDescriptor所指代的state,所有key上的符合要求的state都會應(yīng)用此functino。

注意:只可以在 KeyedBroadcastProcessFunction 中的 processElement() 中注冊timer。不可以在 processBroadcastElement()中注冊,因為這個方法中處理的是broadcast element,沒有關(guān)聯(lián)的key。

回到我們一開始的例子,我們的 KeyedBroadcastProcessFunction 可以這樣寫:

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

    // store partial matches, i.e. first elements of the pair waiting for their second element
    // we keep a list as we may have many first elements waiting
    private final MapStateDescriptor<String, List<Item>> mapStateDesc =
        new MapStateDescriptor<>(
            "items",
            BasicTypeInfo.STRING_TYPE_INFO,
            new ListTypeInfo<>(Item.class));

    // identical to our ruleStateDescriptor above
    private final MapStateDescriptor<String, Rule> ruleStateDescriptor = 
        new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<Rule>() {}));

    @Override
    public void processBroadcastElement(Rule value,
                                        Context ctx,
                                        Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleStateDescriptor).put(value.name, value);
    }

    @Override
    public void processElement(Item value,
                               ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {

        final MapState<String, List<Item>> state = getRuntimeContext().getMapState(mapStateDesc);
        final Shape shape = value.getShape();
    
        for (Map.Entry<String, Rule> entry :
                ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
            final String ruleName = entry.getKey();
            final Rule rule = entry.getValue();
    
            List<Item> stored = state.get(ruleName);
            if (stored == null) {
                stored = new ArrayList<>();
            }
    
            if (shape == rule.second && !stored.isEmpty()) {
                for (Item i : stored) {
                    out.collect("MATCH: " + i + " - " + value);
                }
                stored.clear();
            }
    
            // there is no else{} to cover if rule.first == rule.second
            if (shape.equals(rule.first)) {
                stored.add(value);
            }
    
            if (stored.isEmpty()) {
                state.remove(ruleName);
            } else {
                state.put(ruleName, stored);
            }
        }
    }
}

重要考量


介紹完提供的API后,接下來的這部分用于提醒你,在使用broadcast state時,需要記住這些重要的事情:

  • 沒有跨任務(wù)(實例)間的數(shù)據(jù)交換(no cross-task communication):之前已經(jīng)提過沒什么在 (Keyed)-BroadcastProcessFunction 中僅有處理 broadcast數(shù)據(jù)的方法可以修改 broadcast state 的內(nèi)容。除此之外,使用者需要保證所有并發(fā)實例中修改broadcast state時的邏輯都是一樣的。否則,不同的實例就會有不同的state內(nèi)容,導(dǎo)致計算結(jié)果的不一致。
  • 不同任務(wù)(實例)中broadcast state的內(nèi)容的順序可能會不一致:盡管廣播一個流的數(shù)據(jù)會保證所有數(shù)據(jù)將會(最終)到達(dá)所有下游任務(wù),但是數(shù)據(jù)到達(dá)每一個任務(wù)的順序可能不一樣。因此,state更新時,絕對不可以根據(jù)數(shù)據(jù)的順序來更新(而是根據(jù)數(shù)據(jù)的屬性)
  • 所有的任務(wù)都會checkpoint它們的 broadcast state:盡管所有的任務(wù)(實例)的broadcast state都保存著相同的數(shù)據(jù),但是在checkpoint時,所有的任務(wù)(實例)都會對它們所維持的broadcast state進(jìn)行備份,而不是只選擇其中的一個實例的broadcast state進(jìn)行備份。這樣的設(shè)計是為了避免在故障恢復(fù)時,所有的任務(wù)都讀取同一個文件,盡管這樣做會帶來增加state快照的消耗問題,這個消耗與并發(fā)度p相關(guān)。Flink會保證,在重啟/伸縮 應(yīng)用時,沒有副本并且不會丟失數(shù)據(jù)。為了以相同的并發(fā)度或更少的并發(fā)度來重啟,每個task都會讀取它自己狀態(tài)快照。當(dāng)以更多的并發(fā)度來重啟時,每一個task讀取自己的狀態(tài)快照,剩下的任務(wù)(p_new - p_old)以輪詢的方式依次讀取舊任務(wù)的checkpoint。
  • No RocksDB state backend:broadcast state會保存在內(nèi)存中,需要完成相應(yīng)的內(nèi)存配置。這適用于所有的 operator state。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容