簡(jiǎn)介
由于項(xiàng)目中需要使用kafka作為消息隊(duì)列,并且項(xiàng)目是基于spring-boot來(lái)進(jìn)行構(gòu)建的,所以項(xiàng)目采用了spring-kafka作為原生kafka的一個(gè)擴(kuò)展庫(kù)進(jìn)行使用。先說(shuō)明一下版本:
- spring-boot 的版本是1.4.0.RELEASE
- kafka 的版本是0.9.0.x 版本
- spring-kafka 的版本是1.0.3.RELEASE
用過(guò)kafka的人都知道,對(duì)于使用kafka來(lái)說(shuō),producer的使用相對(duì)簡(jiǎn)單一些,只需要把數(shù)據(jù)按照指定的格式發(fā)送給kafka中某一個(gè)topic就可以了。本文主要是針對(duì)spring-kafka的consumer端上的使用進(jìn)行簡(jiǎn)單一些分析和總結(jié)。
kafka的速度是很快,所以一般來(lái)說(shuō)producer的生產(chǎn)消息的邏輯速度都會(huì)比consumer的消費(fèi)消息的邏輯速度快。
具體案例
之前在項(xiàng)目中遇到了一個(gè)案例是,consumer消費(fèi)一條數(shù)據(jù)平均需要200ms的時(shí)間,并且在某個(gè)時(shí)刻,producer會(huì)在短時(shí)間內(nèi)產(chǎn)生大量的數(shù)據(jù)丟進(jìn)kafka的broker里面(假設(shè)平均1s中內(nèi)丟入了5w條需要消費(fèi)的消息,這個(gè)情況會(huì)持續(xù)幾分鐘)。
對(duì)于這種情況,kafka的consumer的行為會(huì)是:
- kafka的consumer會(huì)從broker里面取出一批數(shù)據(jù),?給消費(fèi)線程進(jìn)行消費(fèi)。
- 由于取出的一批消息數(shù)量太大,consumer在session.timeout.ms時(shí)間之內(nèi)沒(méi)有消費(fèi)完成
- consumer coordinator 會(huì)由于沒(méi)有接受到心跳而掛掉,并且出現(xiàn)一些日志
日志的意思大概是coordinator掛掉了,然后自動(dòng)提交offset失敗,然后重新分配partition給客戶端 - 由于自動(dòng)提交offset失敗,導(dǎo)致重新分配了partition的客戶端又重新消費(fèi)之前的一批數(shù)據(jù)
- 接著consumer重新消費(fèi),又出現(xiàn)了消費(fèi)超時(shí),無(wú)限循環(huán)下去。
解決方案
遇到了這個(gè)問(wèn)題之后, 我們做了一些步驟:
- 提高了partition的數(shù)量,從而提高了consumer的并行能力,從而提高數(shù)據(jù)的消費(fèi)能力
- 對(duì)于單partition的消費(fèi)線程,增加了一個(gè)固定長(zhǎng)度的阻塞隊(duì)列和工作線程池進(jìn)一步提高并行消費(fèi)的能力
- 由于使用了spring-kafka,則把kafka-client的enable.auto.commit設(shè)置成了false,表示禁止kafka-client自動(dòng)提交offset,因?yàn)榫褪侵暗淖詣?dòng)提交失敗,導(dǎo)致offset永遠(yuǎn)沒(méi)更新,從而轉(zhuǎn)向使用spring-kafka的offset提交機(jī)制。并且spring-kafka提供了多種提交策略:
這些策略保證了在一批消息沒(méi)有完成消費(fèi)的情況下,也能提交offset,從而避免了完全提交不上而導(dǎo)致永遠(yuǎn)重復(fù)消費(fèi)的問(wèn)題。
分析
那么問(wèn)題來(lái)了,為什么spring-kafka的提交offset的策略能夠解決spring-kafka的auto-commit的帶來(lái)的重復(fù)消費(fèi)的問(wèn)題呢?下面通過(guò)分析spring-kafka的關(guān)鍵源碼來(lái)解析這個(gè)問(wèn)題。
首先來(lái)看看spring-kafka的消費(fèi)線程邏輯
if (isRunning() && this.definedPartitions != null) {
initPartitionsIfNeeded();
// we start the invoker here as there will be no rebalance calls to
// trigger it, but only if the container is not set to autocommit
// otherwise we will process records on a separate thread
if (!this.autoCommit) {
startInvoker();
}
}
上面可以看到,如果auto.commit關(guān)掉的話,spring-kafka會(huì)啟動(dòng)一個(gè)invoker,這個(gè)invoker的目的就是啟動(dòng)一個(gè)線程去消費(fèi)數(shù)據(jù),他消費(fèi)的數(shù)據(jù)不是直接從kafka里面直接取的,那么他消費(fèi)的數(shù)據(jù)從哪里來(lái)呢?他是從一個(gè)spring-kafka自己創(chuàng)建的阻塞隊(duì)列里面取的。
然后會(huì)進(jìn)入一個(gè)循環(huán),從源代碼中可以看到如果auto.commit被關(guān)掉的話, 他會(huì)先把之前處理過(guò)的數(shù)據(jù)先進(jìn)行提交offset,然后再去從kafka里面取數(shù)據(jù)。
然后把取到的數(shù)據(jù)丟給上面提到的阻塞列隊(duì),由上面創(chuàng)建的線程去消費(fèi),并且如果阻塞隊(duì)列滿了導(dǎo)致取到的數(shù)據(jù)塞不進(jìn)去的話,spring-kafka會(huì)調(diào)用kafka的pause方法,則consumer會(huì)停止從kafka里面繼續(xù)再拿數(shù)據(jù)。
接著spring-kafka還會(huì)處理一些異常的情況,比如失敗之后是不是需要commit offset這樣的邏輯。
方法二
- 可以根據(jù)消費(fèi)者的消費(fèi)速度對(duì)session.timeout.ms的時(shí)間進(jìn)行設(shè)置,適當(dāng)延長(zhǎng)
- 或者減少每次從partition里面撈取的數(shù)據(jù)分片的大小,提高消費(fèi)者的消費(fèi)速度。