kafka consumer offset機制

kafka消息在分區(qū)中是按序一條一條存儲的,假如分區(qū)中有10條消息,位移就是0-9,

consumer消費了5條消息,那么offset就是5,指向了下一條要消費的記錄,consumer

需要向kafka匯報自己的位移數(shù)據(jù),因為consumer是能夠消費多個分區(qū)的,所以offset

的粒度是分區(qū),consumer需要為分配給他的各分區(qū)分別提交offset信息。

從用戶的角度來說,位移提交分為自動提交和手動提交,在consumer的角度來說,位移

分為同步提交和異步提交。

offset提交原理

kafka內(nèi)部有個topic叫 ‘__consumer_offsets’,offset提交就是往這個topic發(fā)送一條消息,

消息格式是key value形式,key是由 groupId、主題名、分區(qū)號組成,消息體是位移值

及用戶自定義數(shù)據(jù)和時間戳等。還有2種特殊的格式,一種是用于保存 Consumer Group?

信息的消息,用于注冊group,另一種是 用于刪除 Group 過期位移和刪除 Group 的消息。

當kafka集群種第一臺consumer啟動時,便會創(chuàng)建__consumer_offsets主題,默認50個

分區(qū)和3個副本。

當提交方式是自動提交時,就算是當前consumer的offset已經(jīng)不更新,kafka還是會自動

定期的往__consumer_offsets發(fā)送位移消息,所以得對位移主題的消息做定期刪除,

假如對于同一個key有2條A和B,A早于B發(fā)送,那么A就是屬于過期消息。

Compact 策略

compact有點類似jvm gc的標記-整理,把過期消息刪掉,把剩下的消息排列在一起


Kafka 提供了專門的后臺線程定期地巡檢待Compact 的主題,看看是否存在滿足條件的

可刪除數(shù)據(jù),這個線程叫Log Cleaner,當我們發(fā)現(xiàn)位移主題日志過多的時候,可以

檢查一下是否是這個線程掛了導致的

自動提交導致重復消費

enable.auto.commit 默認即是true,

auto.commit.interval.ms 默認是5秒,表示kafka每5秒自動提交一次位移信息。

自動提交會有消息重復消費的問題,因為他是間隔時間提交一次,假如在間隔期間,

發(fā)生了Rebalance ,在Rebalance 之后所有的消費者必須從當前最新的offset開始

繼續(xù)消費,那么上一次自動提交到Rebalance 的這段時間消費的數(shù)據(jù)的位移并沒有

提交,所以會重復消費,即時我們通過減少 auto.commit.interval.ms 的值來提高提交頻率,

那也僅僅是縮小了重復消費的時間窗口,所以我們看看能不能通過手動提交來避免重復消費。

同步提交及異步提交

commitSync()是consumer的同步api,手動提交的好處自然是我們可以控制提交的時機

和頻率,由于是同步api,是會阻塞至broker返回結果才會結束這個阻塞狀態(tài),對于系統(tǒng)

而言,自然不想發(fā)生這種不是由于資源的限制導致的阻塞。

commitAsync()是consumer的異步api,commitAsync()不會阻塞,因此不會影響consumer的

tps,但是他的問題在于他無法重試,因為是異步提交,當因為網(wǎng)絡或者系統(tǒng)資源阻塞

導致提交失敗,那么他重試的時候,在這期間,consumer可能已經(jīng)消費好多條消息

并且提交了,所以此時的重試提交的offset已然不是最新值了并沒有意義,我們可以通過

異步和同步提交相結合,我們使用同步提交來規(guī)避因為網(wǎng)絡問題或者broker端的gc導致的

這種瞬時的提交失敗,進而通過重試機制而提交offset,使用異步提交來規(guī)避提交時的阻塞

分段式提交

前面的commitSync()和commitAsync(),都是consumer poll消息,把這些消息消費完,

再提交最新的offset,如果poll的消息很多呢?消費時間較長,假如中間系統(tǒng)宕機,豈不是

得從頭再來一遍,所以kafka提供分段提交的api

commitSync(Map<TopicPartition, OffsetAndMetadata>)?

?commitAsync(Map<TopicPartition, OffsetAndMetadata>)

假設我們poll了一秒鐘的數(shù)據(jù),有5000條,我們可以通過計數(shù)器,累計到100條,

便通過分段提交api向kafka提交一次offset。

?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容