06. KafkaConsumer是如何處理過期offset(越界offset)請求
該篇文章是基于以下的條件編寫完成的
1. Kafka cluster集群版本0.10.0
2. 提前創(chuàng)建好了一個單partition,單副本的topic:kafka.log.retention
3. Kafka log清理策略配置log.retention.minutes=10,該配置的作用是清理掉當前時間減去日志段中最新時間的差值,如果大于10分鐘,就將刪除過期的日志段
接下來我們基于以下的步驟演示:
1. 編寫程序向topic:kafka.log.retention中發(fā)送10條數(shù)據(jù)

輸出結果

我們可以看到發(fā)送數(shù)據(jù)后最新時間是21:45,大概將在21:55的時候生成新的日志段文件

2. 我們消費其中的兩條數(shù)據(jù),并且commit offset:1
以下是KafkaConsumer的相關參數(shù)配置


消費的兩條數(shù)據(jù)打印格式為:offset:key:value

通過以上的程序我們看到,并且同步提交了offset到kafka server端
3. 我們等10分鐘時間,日志段將被清理

我們看到準時在21:55生成了新的日志段
4. 接下來我們再使用上面的消費程序接著消費數(shù)據(jù)
由于我們之前消費到了offset=1的地方,接下來要開始消費offset=2的數(shù)據(jù),其實我們從第三步可以看到offset從2到9的數(shù)據(jù)已經(jīng)被刪除了,接下來我們看kafka是如何處理的
我們看到ApiKeys.FETCH請求offset=1,服務器會從>1的offset返回數(shù)據(jù)也就是offset=2

請求的響應如下圖:

我們看到服務器返回了Errors.OFFSET_OUT_OF_RANGE(offset越界)的錯誤,由于我們設置了默認的offset重置策略,接下來會根據(jù)重置策略重新設置消費組獲取數(shù)據(jù)的起始offset

會通過該方法重新獲取起始offset

由于我們設置的策略是properties.put("auto.offset.reset", "earliest");所以從最早的offset開始,目前最早的offset=10
5. 我們將重置策略設置成:none
properties.put("auto.offset.reset", " none")
接著執(zhí)行上面第4步的消費操作,kafka的client會怎樣處理呢?

我們看到傳遞參數(shù)中的offset還是1

我們看到此時的fetch響應走到了else的分支
會在接下來走到該代碼

會在該方法中拋出異常OffsetOutOfRangeException

堆棧異常

總結
通過我們上面的試驗,我們發(fā)現(xiàn)不同的offset重置策略對kafka-client對過期請求的響應方式是不同的
Offse重置策略一共有三種:"latest", "earliest", "none"
"latest", "earliest"不影響程序的正常運行,會打印相關的offset越界日志
"none"會直接拋出OffsetOutOfRangeException異常