kafka消費(fèi)延遲或者重復(fù)消費(fèi)原因

簡(jiǎn)介

由于項(xiàng)目中需要使用kafka作為消息隊(duì)列,并且項(xiàng)目是基于spring-boot來(lái)進(jìn)行構(gòu)建的,所以項(xiàng)目采用了spring-kafka作為原生kafka的一個(gè)擴(kuò)展庫(kù)進(jìn)行使用。先說(shuō)明一下版本:

  • spring-boot 的版本是1.4.0.RELEASE
  • kafka 的版本是0.9.0.x 版本
  • spring-kafka 的版本是1.0.3.RELEASE

用過(guò)kafka的人都知道,對(duì)于使用kafka來(lái)說(shuō),producer的使用相對(duì)簡(jiǎn)單一些,只需要把數(shù)據(jù)按照指定的格式發(fā)送給kafka中某一個(gè)topic就可以了。本文主要是針對(duì)spring-kafka的consumer端上的使用進(jìn)行簡(jiǎn)單一些分析和總結(jié)。

kafka的速度是很快,所以一般來(lái)說(shuō)producer的生產(chǎn)消息的邏輯速度都會(huì)比consumer的消費(fèi)消息的邏輯速度快。

具體案例

之前在項(xiàng)目中遇到了一個(gè)案例是,consumer消費(fèi)一條數(shù)據(jù)平均需要200ms的時(shí)間,并且在某個(gè)時(shí)刻,producer會(huì)在短時(shí)間內(nèi)產(chǎn)生大量的數(shù)據(jù)丟進(jìn)kafka的broker里面(假設(shè)平均1s中內(nèi)丟入了5w條需要消費(fèi)的消息,這個(gè)情況會(huì)持續(xù)幾分鐘)。

對(duì)于這種情況,kafka的consumer的行為會(huì)是:

  • kafka的consumer會(huì)從broker里面取出一批數(shù)據(jù),?給消費(fèi)線程進(jìn)行消費(fèi)。
  • 由于取出的一批消息數(shù)量太大,consumer在session.timeout.ms時(shí)間之內(nèi)沒(méi)有消費(fèi)完成
  • consumer coordinator 會(huì)由于沒(méi)有接受到心跳而掛掉,并且出現(xiàn)一些日志
    日志的意思大概是coordinator掛掉了,然后自動(dòng)提交offset失敗,然后重新分配partition給客戶端
  • 由于自動(dòng)提交offset失敗,導(dǎo)致重新分配了partition的客戶端又重新消費(fèi)之前的一批數(shù)據(jù)
  • 接著consumer重新消費(fèi),又出現(xiàn)了消費(fèi)超時(shí),無(wú)限循環(huán)下去。

解決方案

遇到了這個(gè)問(wèn)題之后, 我們做了一些步驟:

  • 提高了partition的數(shù)量,從而提高了consumer的并行能力,從而提高數(shù)據(jù)的消費(fèi)能力
  • 對(duì)于單partition的消費(fèi)線程,增加了一個(gè)固定長(zhǎng)度的阻塞隊(duì)列和工作線程池進(jìn)一步提高并行消費(fèi)的能力
  • 由于使用了spring-kafka,則把kafka-client的enable.auto.commit設(shè)置成了false,表示禁止kafka-client自動(dòng)提交offset,因?yàn)榫褪侵暗淖詣?dòng)提交失敗,導(dǎo)致offset永遠(yuǎn)沒(méi)更新,從而轉(zhuǎn)向使用spring-kafka的offset提交機(jī)制。并且spring-kafka提供了多種提交策略:
    這些策略保證了在一批消息沒(méi)有完成消費(fèi)的情況下,也能提交offset,從而避免了完全提交不上而導(dǎo)致永遠(yuǎn)重復(fù)消費(fèi)的問(wèn)題。

分析

那么問(wèn)題來(lái)了,為什么spring-kafka的提交offset的策略能夠解決spring-kafka的auto-commit的帶來(lái)的重復(fù)消費(fèi)的問(wèn)題呢?下面通過(guò)分析spring-kafka的關(guān)鍵源碼來(lái)解析這個(gè)問(wèn)題。

首先來(lái)看看spring-kafka的消費(fèi)線程邏輯

if (isRunning() && this.definedPartitions != null) { 
      initPartitionsIfNeeded();      
 // we start the invoker here as there will be no rebalance calls to       
// trigger it, but only if the container is not set to autocommit       
// otherwise we will process records on a separate thread      
     if (!this.autoCommit) {        
            startInvoker();     
     }
 }
  • 上面可以看到,如果auto.commit關(guān)掉的話,spring-kafka會(huì)啟動(dòng)一個(gè)invoker,這個(gè)invoker的目的就是啟動(dòng)一個(gè)線程去消費(fèi)數(shù)據(jù),他消費(fèi)的數(shù)據(jù)不是直接從kafka里面直接取的,那么他消費(fèi)的數(shù)據(jù)從哪里來(lái)呢?他是從一個(gè)spring-kafka自己創(chuàng)建的阻塞隊(duì)列里面取的。

  • 然后會(huì)進(jìn)入一個(gè)循環(huán),從源代碼中可以看到如果auto.commit被關(guān)掉的話, 他會(huì)先把之前處理過(guò)的數(shù)據(jù)先進(jìn)行提交offset,然后再去從kafka里面取數(shù)據(jù)。

  • 然后把取到的數(shù)據(jù)丟給上面提到的阻塞列隊(duì),由上面創(chuàng)建的線程去消費(fèi),并且如果阻塞隊(duì)列滿了導(dǎo)致取到的數(shù)據(jù)塞不進(jìn)去的話,spring-kafka會(huì)調(diào)用kafka的pause方法,則consumer會(huì)停止從kafka里面繼續(xù)再拿數(shù)據(jù)。

  • 接著spring-kafka還會(huì)處理一些異常的情況,比如失敗之后是不是需要commit offset這樣的邏輯。

方法二

  • 可以根據(jù)消費(fèi)者的消費(fèi)速度對(duì)session.timeout.ms的時(shí)間進(jìn)行設(shè)置,適當(dāng)延長(zhǎng)
  • 或者減少每次從partition里面撈取的數(shù)據(jù)分片的大小,提高消費(fèi)者的消費(fèi)速度。

參考鏈接:http://www.itdecent.cn/p/4e00dff97f39

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,694評(píng)論 19 139
  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,915評(píng)論 13 425
  • Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,155評(píng)論 0 43
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,592評(píng)論 0 34
  • 對(duì)于對(duì)面樓上那位領(lǐng)導(dǎo)長(zhǎng)期把狗放在樓下公共區(qū)域、狗叫吵人的事,葉子經(jīng)過(guò)了長(zhǎng)期的憤怒之后,終于有了一點(diǎn)轉(zhuǎn)折性的進(jìn)步。 ...
    自由心空閱讀 379評(píng)論 0 2

友情鏈接更多精彩內(nèi)容