kafka消息在分區(qū)中是按序一條一條存儲的,假如分區(qū)中有10條消息,位移就是0-9,
consumer消費了5條消息,那么offset就是5,指向了下一條要消費的記錄,consumer
需要向kafka匯報自己的位移數(shù)據(jù),因為consumer是能夠消費多個分區(qū)的,所以offset
的粒度是分區(qū),consumer需要為分配給他的各分區(qū)分別提交offset信息。
從用戶的角度來說,位移提交分為自動提交和手動提交,在consumer的角度來說,位移
分為同步提交和異步提交。
offset提交原理
kafka內(nèi)部有個topic叫 ‘__consumer_offsets’,offset提交就是往這個topic發(fā)送一條消息,
消息格式是key value形式,key是由 groupId、主題名、分區(qū)號組成,消息體是位移值
及用戶自定義數(shù)據(jù)和時間戳等。還有2種特殊的格式,一種是用于保存 Consumer Group?
信息的消息,用于注冊group,另一種是 用于刪除 Group 過期位移和刪除 Group 的消息。
當kafka集群種第一臺consumer啟動時,便會創(chuàng)建__consumer_offsets主題,默認50個
分區(qū)和3個副本。
當提交方式是自動提交時,就算是當前consumer的offset已經(jīng)不更新,kafka還是會自動
定期的往__consumer_offsets發(fā)送位移消息,所以得對位移主題的消息做定期刪除,
假如對于同一個key有2條A和B,A早于B發(fā)送,那么A就是屬于過期消息。
Compact 策略
compact有點類似jvm gc的標記-整理,把過期消息刪掉,把剩下的消息排列在一起

Kafka 提供了專門的后臺線程定期地巡檢待Compact 的主題,看看是否存在滿足條件的
可刪除數(shù)據(jù),這個線程叫Log Cleaner,當我們發(fā)現(xiàn)位移主題日志過多的時候,可以
檢查一下是否是這個線程掛了導致的
自動提交導致重復消費
enable.auto.commit 默認即是true,
auto.commit.interval.ms 默認是5秒,表示kafka每5秒自動提交一次位移信息。
自動提交會有消息重復消費的問題,因為他是間隔時間提交一次,假如在間隔期間,
發(fā)生了Rebalance ,在Rebalance 之后所有的消費者必須從當前最新的offset開始
繼續(xù)消費,那么上一次自動提交到Rebalance 的這段時間消費的數(shù)據(jù)的位移并沒有
提交,所以會重復消費,即時我們通過減少 auto.commit.interval.ms 的值來提高提交頻率,
那也僅僅是縮小了重復消費的時間窗口,所以我們看看能不能通過手動提交來避免重復消費。
同步提交及異步提交
commitSync()是consumer的同步api,手動提交的好處自然是我們可以控制提交的時機
和頻率,由于是同步api,是會阻塞至broker返回結果才會結束這個阻塞狀態(tài),對于系統(tǒng)
而言,自然不想發(fā)生這種不是由于資源的限制導致的阻塞。
commitAsync()是consumer的異步api,commitAsync()不會阻塞,因此不會影響consumer的
tps,但是他的問題在于他無法重試,因為是異步提交,當因為網(wǎng)絡或者系統(tǒng)資源阻塞
導致提交失敗,那么他重試的時候,在這期間,consumer可能已經(jīng)消費好多條消息
并且提交了,所以此時的重試提交的offset已然不是最新值了并沒有意義,我們可以通過
異步和同步提交相結合,我們使用同步提交來規(guī)避因為網(wǎng)絡問題或者broker端的gc導致的
這種瞬時的提交失敗,進而通過重試機制而提交offset,使用異步提交來規(guī)避提交時的阻塞
分段式提交
前面的commitSync()和commitAsync(),都是consumer poll消息,把這些消息消費完,
再提交最新的offset,如果poll的消息很多呢?消費時間較長,假如中間系統(tǒng)宕機,豈不是
得從頭再來一遍,所以kafka提供分段提交的api
commitSync(Map<TopicPartition, OffsetAndMetadata>)?
?commitAsync(Map<TopicPartition, OffsetAndMetadata>)
假設我們poll了一秒鐘的數(shù)據(jù),有5000條,我們可以通過計數(shù)器,累計到100條,
便通過分段提交api向kafka提交一次offset。