Flink消費Kafka如何保證相同標(biāo)識消息的有序性

1.需求

在某些情況下,我們需要保證flink在消費kafka時,對于某些具有相同標(biāo)識的消息,要保證其順序性。
比如說具有相同uuid的用戶行為消息,要保證其消息的順序性,這樣才能有效分析其用戶行為。
問題:
kafka只能保證同一個partition內(nèi)的消息是順序性的,但是整個topic下并不能保證是順序的,那么該如何解決呢?

2.解決方案

<1> 在生產(chǎn)消息時,就將具有相同uuid的消息分配到同一個分區(qū)中。
擴展:kafka topic消息分配partition規(guī)則
源碼:

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
         List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
         int numPartitions = partitions.size();
         if (keyBytes == null) {
             int nextValue = counter.getAndIncrement();
             List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
             if (availablePartitions.size() > 0) {
                 int part = Utils.toPositive(nextValue) % availablePartitions.size();
                  return availablePartitions.get(part).partition();
             } else {
                 // no partitions are available, give a non-available partition
                 return Utils.toPositive(nextValue) % numPartitions;
             }
         } else {
             // hash the keyBytes to choose a partition
             return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
     }

通過源碼,分區(qū)器就會根據(jù)消息里面的分區(qū)參數(shù)key值將消息分到對應(yīng)的partition。
1)如果沒有指定key值并且可用分區(qū)個數(shù)大于0時,在就可用分區(qū)中做輪詢決定改消息分配到哪個partition
2)如果沒有指定key值并且沒有可用分區(qū)時,在所有分區(qū)中輪詢決定改消息分配到哪個partition
3)如果指定key值,對key做hash分配到指定的partition

相關(guān)實現(xiàn)方案鏈接:https://blog.csdn.net/justclimbing/article/details/79613900
實現(xiàn)方案1:自定義在flume攔截器中使用 kafka producer
實現(xiàn)方案2:使用kafka-sink

<2> 如果只是某一窗口內(nèi)的統(tǒng)計,可以針對窗口進行時間戳排序
繼承抽象類ProcessWindowFunction,
ProcessWindowFunction<IN, OUT, KEY, W extends Window>

該類一次性迭代整個窗口里的所有元素,比較重要的一個對象是Context,可以獲取到事件和狀態(tài)信息,這樣我們就可以實現(xiàn)更加靈活的控制,這實際上是process的主要特點吧。該算子會浪費很多性能吧,主要原因是不增量計算,要緩存整個窗口然后再去處理,所以要設(shè)計好內(nèi)存。

WindowedStream調(diào)用process方法,傳入ProcessWindowFunction參數(shù)。

public class EventSessionProcessFunction extends ProcessWindowFunction<EventBase, EventBase, String, TimeWindow> {

    @Override
    public void process(String key, Context context, Iterable<EventBase> elements, Collector<EventBase> out) throws Exception {
        List<EventBase> sortedEvents = Ordering.from((Comparator<EventBase>) (x, y) -> {
            if (x.getTimestamp() - y.getTimestamp() > 0) {
                return 1;
            } else if (x.getTimestamp() - y.getTimestamp() < 0) {
                return -1;
            } else {
                return 0;
            }
        }).sortedCopy(elements);

        for (EventBase event : sortedEvents) {
            out.collect(event);
        }
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容