06. KafkaConsumer是如何處理過期offset(越界offset)請求

06. KafkaConsumer是如何處理過期offset(越界offset)請求

該篇文章是基于以下的條件編寫完成的

1. Kafka cluster集群版本0.10.0

2. 提前創(chuàng)建好了一個單partition,單副本的topic:kafka.log.retention

3. Kafka log清理策略配置log.retention.minutes=10,該配置的作用是清理掉當前時間減去日志段中最新時間的差值,如果大于10分鐘,就將刪除過期的日志段

接下來我們基于以下的步驟演示:

1. 編寫程序向topic:kafka.log.retention中發(fā)送10條數(shù)據(jù)

01

輸出結果

02

我們可以看到發(fā)送數(shù)據(jù)后最新時間是21:45,大概將在21:55的時候生成新的日志段文件

03

2. 我們消費其中的兩條數(shù)據(jù),并且commit offset:1

以下是KafkaConsumer的相關參數(shù)配置

04
05

消費的兩條數(shù)據(jù)打印格式為:offset:key:value

06

通過以上的程序我們看到,并且同步提交了offset到kafka server端

3. 我們等10分鐘時間,日志段將被清理

07

我們看到準時在21:55生成了新的日志段

4. 接下來我們再使用上面的消費程序接著消費數(shù)據(jù)

由于我們之前消費到了offset=1的地方,接下來要開始消費offset=2的數(shù)據(jù),其實我們從第三步可以看到offset從2到9的數(shù)據(jù)已經(jīng)被刪除了,接下來我們看kafka是如何處理的

我們看到ApiKeys.FETCH請求offset=1,服務器會從>1的offset返回數(shù)據(jù)也就是offset=2

08

請求的響應如下圖:

09

我們看到服務器返回了Errors.OFFSET_OUT_OF_RANGE(offset越界)的錯誤,由于我們設置了默認的offset重置策略,接下來會根據(jù)重置策略重新設置消費組獲取數(shù)據(jù)的起始offset

10

會通過該方法重新獲取起始offset

11

由于我們設置的策略是properties.put("auto.offset.reset", "earliest");所以從最早的offset開始,目前最早的offset=10

5. 我們將重置策略設置成:none

properties.put("auto.offset.reset", " none")

接著執(zhí)行上面第4步的消費操作,kafka的client會怎樣處理呢?

12

我們看到傳遞參數(shù)中的offset還是1

13

我們看到此時的fetch響應走到了else的分支

會在接下來走到該代碼

14

會在該方法中拋出異常OffsetOutOfRangeException

15

堆棧異常

16

總結

通過我們上面的試驗,我們發(fā)現(xiàn)不同的offset重置策略對kafka-client對過期請求的響應方式是不同的

Offse重置策略一共有三種:"latest", "earliest", "none"

"latest", "earliest"不影響程序的正常運行,會打印相關的offset越界日志

"none"會直接拋出OffsetOutOfRangeException異常

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容