kafka之重復(fù)消費數(shù)據(jù)

在進(jìn)入主題之前,我們先思考一個問題。
問題

kafka消費者使用自動提交的模式,提交間隔為2s,消費者在獲取數(shù)據(jù)的時候處理0.5s,從kafka拉取過來的數(shù)據(jù)只夠處理1秒。那么消費者下次拉取過來的數(shù)據(jù)是否是已經(jīng)消費完的數(shù)據(jù)?或者說由于數(shù)據(jù)已經(jīng)消費,但是偏移量沒有被提交,是否會造成下次獲取的數(shù)據(jù)是從舊的偏移量開始拉???


答案

不會是舊數(shù)據(jù),kafka的消費者也有自己偏移量,這個偏移量是從kafka中讀取的量,和kafka提交的偏移量不一樣。假設(shè)變成自動提交偏移量,而且沒有寫提交的邏輯,同一個消費者,除了第一次或者rebalance會根據(jù)已提交的offset來獲取數(shù)據(jù),剩下的時候都是根據(jù)自己本地的偏移量來獲取的。這個模式有點類似于用桶取水,用瓢來喝水。消費者就是桶的角色,poll就是瓢的角色。

重復(fù)消費的情況

我們把重復(fù)消費的情況分為2種,一種是想避免的,一種是故意如此的。

想避免的場景

  1. 消費者使用了自動提交模式,當(dāng)還沒有提交的時候,有新的消費者加入或者移除,發(fā)生了rebalance。再次消費的時候,消費者會根據(jù)提交的偏移量來,于是重復(fù)消費了數(shù)據(jù)。
  2. 使用異步提交,并且在callback里寫了失敗重試,但是沒有注意順序。例如提交5的時候,發(fā)送網(wǎng)絡(luò)故障,由于是異步,程序繼續(xù)運行,再次提交10的時候,提交成功,此時正好運行到5的重試,并且成功。當(dāng)發(fā)生了rebalance,又會重復(fù)消費了數(shù)據(jù)。

注:這里不討論那個消費者提交的offset的作用。

故意的場景

  1. 使用不同的組消費同一個topic。改個 group.id屬性即可。
  2. 自己手動提交偏移量。
    這里的麻煩的地方就是需要理解開頭的問題,并不是說你提交完就可以了。你得想個辦法去讀取那個偏移量再次消費。下面提供一個暴力的手段,關(guān)閉消費者,然后再次開啟新的。
 public static void consumer(Properties properties,String info) {
        System.out.println(info);
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer(properties);
        kafkaConsumer.subscribe(Arrays.asList(new String[]{"hello"}));
        boolean flag = false;
        while (true) {
            ConsumerRecords<String, String> poll = kafkaConsumer.poll(100);
            if (!poll.isEmpty()) {
                for (ConsumerRecord<String, String> o : poll) {
                    System.out.println(o.value() + o.offset());
                    //假設(shè)場景為重復(fù)消費3,這里需要根據(jù)業(yè)務(wù)來提交便宜量
                    if (o.offset() == 3) {
                        //手動提交偏移量
                        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<TopicPartition, OffsetAndMetadata>();
                        //提交的偏移量,這個偏移量就是下次消費的第一條數(shù)據(jù)
                        currentOffset.put(new TopicPartition(o.topic(), o.partition()), new OffsetAndMetadata(o.offset()+1, ""));
                        kafkaConsumer.commitSync(currentOffset);
                        flag = true;
                        break;
                    }
                }
            }
            if(flag){
                kafkaConsumer.close();
                break;
            }
        }
    }

這里也必須注意,kafka并不是數(shù)據(jù)庫,他保存的數(shù)據(jù)有持久化的時間和大小的限制,可能你提交的偏移量的數(shù)據(jù)已經(jīng)被kafka清理掉了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,911評論 13 425
  • Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標(biāo)如下: 以時間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,154評論 0 43
  • 消費者網(wǎng)絡(luò)客戶端輪詢:ConsumerNetworkClient。ConsumerNetworkClient是對N...
    紹圣閱讀 842評論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,680評論 19 139
  • 上一章說了600516的技術(shù)分析,這一章就接著說。而且因為大的走勢圖上沒有形成背馳段,所以現(xiàn)在還是來說五分鐘走勢圖...
    磨劍斬蒼穹閱讀 131評論 0 1

友情鏈接更多精彩內(nèi)容