在 [Kafka 101 - 5] 圖文并茂地介紹 offset 概念 中我們介紹了消息的 offset 和消費者的 offset,并且提到消費者的 offset 的維護(hù)方式是消費者自己提交到一個特殊 topic,聽起來似乎很簡單,但實際這個提交 offset 的過程也有點內(nèi)容,所以本文來學(xué)習(xí) Kafka 中消費者提交 offset 的五種方式。
需要明確的一點是,即使 Kafka 中維護(hù)了消費者的 offset,消費者仍然有可能重復(fù)消費或者少消費數(shù)據(jù)的,如果想要保證消費數(shù)據(jù)的完全準(zhǔn)確,不丟不重,即所謂的“Exactly Once”,需要使用別的機制來保證,比如 Flink 的 checkpoint 機制,但這些機制也需要能夠正確的理解 offset 的提交。
內(nèi)容提要:
- 環(huán)境說明
- 自動提交
- 同步提交
- 異步提交
- 同步&異步結(jié)合
- 提交指定 offset
- 總結(jié)
1. 環(huán)境說明
操作系統(tǒng):MacOS/Linux
Kafka:本地安裝了社區(qū)版的 2.3.1 版本,運行在 9092 端口
Kafka 的安裝使用可以參考:[Kafka 101-1] Kafka安裝使用
消費者 Java API 的基本使用可以參考:[Kafka 101-3] 使用Java API消費數(shù)據(jù)實戰(zhàn)
2. 自動提交
這是 offset 提交的默認(rèn)方式。
消費者有一個參數(shù)叫做 enable.auto.commit ,表示是否啟用 offset 的自動提交,默認(rèn)值為 true,并且還有一個配套的參數(shù)叫做 auto.commit.interval.ms,表示自動提交 offset 的時間間隔,默認(rèn)值是 5000,即 5s 提交自動提交一次 offset,還記得消費者的 poll 循環(huán)嗎?長這個樣子:
while (true) {
// consumer 是一個 KafkaConsumer對象
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 用 println 模擬數(shù)據(jù)數(shù)據(jù)過程
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
}
每當(dāng)調(diào)用 KafkaConsumer 對象的 poll 方法時,它會去檢查距離上一次提交 offset 是否已經(jīng)過去 5s,如果夠 5s 的話,則會幫助你把這次 poll 操作消費的最大 offset 提交給 Kafka broker。
注意:如果在自動提交了 offset 之后 3s 的時候程序因為某種原因掛掉了,并且在這 3s 期間消費了 10000 條數(shù)據(jù),那么當(dāng)程序重啟后,這 10000 條數(shù)據(jù)會被再次消費,因為 Kafka 中記錄的是最后一次提交的 offset,程序重啟后會從這個 offset 開始繼續(xù)消費,所以,自動提交有重復(fù)消費數(shù)據(jù)的風(fēng)險,而且我們完全無法控制,看起來不是很棒,所以在重視數(shù)據(jù)準(zhǔn)確性的場景中,都不會采用自動提交的方式。
下文中的方法都需要在構(gòu)造 KafkaConsumer 對象的時候傳入
auto.commit.offset參數(shù),并且設(shè)置為false。
3. 同步提交
直接上代碼:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 用 println 模擬數(shù)據(jù)處理過程
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 處理完畢后提交 offset
try {
consumer.commitSync();
} catch (CommitFailedException e) {
// commitSync 會一直嘗試提交直到成功或者遇到不可恢復(fù)的錯誤
log.error("commit faile", e)
}
}
可以看到,同步提交 offset 的方法叫做 commitSync(),這個方法會提交最近一次調(diào)用 poll 所消費到的最大 offset,這上面的例子中,我們是把 commitSync() 放在了數(shù)據(jù)處理之后,如果程序在數(shù)據(jù)處理過程中掛掉,這時已經(jīng)處理了一部分?jǐn)?shù)據(jù)(比如寫到了MySQL 中),那么重啟后會從上一次提交的 offset 繼續(xù)消費,之前已經(jīng)寫入 MySQL 的數(shù)據(jù)會被再處理一次,因此MySQL 中的數(shù)據(jù)會重復(fù)。
那么如果我們把 commitSync 放在數(shù)據(jù)處理的代碼之前呢,答案是數(shù)據(jù)有可能丟失,因為這種情況下的問題是,可能已經(jīng)提交了 offset,但是數(shù)據(jù)處理過程沒有完成,故障重啟之后只能消費到新數(shù)據(jù),重啟前沒有來得及處理的數(shù)據(jù)也處理不到了。
所以,同步提交的方式,有可能造成數(shù)據(jù)重復(fù),也有可能造成數(shù)據(jù)丟失,取決于 commitSync() 方法的位置。
4. 異步提交
有個同步,就有個異步,同步相比異步的不足是,提交 offset 的時候程序會阻塞住,限制了消費吞吐量,因此就有了異步提交,上代碼:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 用 println 模擬數(shù)據(jù)處理過程
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 處理完畢后提交 offset
consumer.commitAsync();
}
調(diào)用了 commitAsync 方法后,程序會繼續(xù)執(zhí)行,確實可以提高吞吐量,但是沒有只有好處沒有壞處的事,上面提到,提交 offset 是有可能失敗的,同步提交方法會一直嘗試提交,要么成功,要么遇到不可恢復(fù)的錯誤拋異常,但是異步提交方法不會重試,為什么不重試呢?因為如果不成功就一直重試,有可能更新一次的 offset 提交都已經(jīng)完成了,如果重試成功了,反而會把更新的 offset 給覆蓋掉,造成重復(fù),所以干脆不重試。
異步的方法一般都一個回調(diào)函數(shù),這個也有:
consumer.commitAsyn(new OffsetCommitCallback() {
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (e != null)
log.error("Commit failed for offsets {}", offsets, e);
}
})
如果想要在異步提交失敗的時候重試,《Kafka 權(quán)威指南》給了一種方法,維護(hù)一個單調(diào)遞增的序列號,每提交一次,遞增一下這個序列號,并把序列號傳給回調(diào)函數(shù),當(dāng)在回調(diào)函數(shù)中發(fā)現(xiàn)失敗時,比較一下回調(diào)函數(shù)中的序列號和當(dāng)前序列號的大小,如果當(dāng)前的序列號已經(jīng)比回調(diào)函數(shù)的序列號大,那就不用重新提交 offset 了,因為已經(jīng)有一個更新的 offset 在提交了。
我覺得按照這個說法,代碼可能長這個樣子(聲明:沒有驗證過):
// 維護(hù)全局序列號
int global_seq = 0;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 異步提交 offset 時傳給回調(diào)函數(shù)
consumer.commitAsync(new MyOffsetCommitCallback(global_seq++));
}
// 私有類實現(xiàn) OffsetCommitCallback 接口
private class MyOffsetCommitCallback implements OffsetCommitCallback {
private int seq;
// 回調(diào)函數(shù)初始化時記錄當(dāng)前序列號
MyOffsetCommitCallback(int global_seq) {
this.seq = global_seq;
}
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
if (exception != nul) {
// 兩者相等表示還沒有更新的 offset 在提交,可以重試
if (this.seq == global_seq) {
// 在這里重試,怎么重試你來想吧
}
}
}
}
5. 同步&異步結(jié)合
其實偶爾有一兩次提交 offset 失敗問題不大,因為只要后續(xù)有 offset 提交成功了,之前的失敗可以忽略的。但是如果知道這是最后一次提交了,那么還是有必要確保這次提交能成功的。所以一種常用的提交 offset 的方式,是同時使用同步提交和異步提交,它可以兼顧效率和可靠性(數(shù)據(jù)準(zhǔn)確性),上代碼:
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("消息內(nèi)容為:%s\n", record.value());
}
// 即使失敗也不要緊,要么有下一次異步的提交,要么有關(guān)閉前的最后一次提交
consumer.commitAsync();
}
// 捕獲處理不了的異常
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
try {
// 關(guān)閉 consumer 前最后一次提交使用同步的方式,最大程度的確保成功
consumer.commitSync();
} finally {
consumer.close();
}
}
簡單的解釋一下代碼:poll 循環(huán)里用異步提交,效率高,整個 poll 循環(huán)捕獲到異常之后在關(guān)閉前進(jìn)行一次同步提交,穩(wěn)妥,保證最新的 offset能被提交(當(dāng)然,如果是不可恢復(fù)的異常,比如 Kafka 宕機,發(fā)生這種異常是無法提交成功的)。
6. 提交指定 offset
上面的這些提交方式,都是 poll 一次,提交一次,其實還可以 poll 一次,提交多次,比如每處理一條數(shù)據(jù),提交一次,上代碼:
// 引入由 TopPartition 和 OffsetAndMetadata 組成的 Map 類型
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.printf("消息內(nèi)容為:%s\n", record.value());
// 更新當(dāng)前分區(qū)的 offset
currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
// 這里用了異步提交,也可以用同步提交
// consumer.commitSync(currentOffsets);
consumer.commitAsync(currentOffsets, null);
}
}
解釋一下:同步提交的方法和異步提交的方法都可以接受Map<TopicPartition, OffsetAndMetadata> 類型的參數(shù),提交這個參數(shù)指定的 offset,而不是通過 poll 方法消費的最大 offset。
7. 總結(jié)
各種 offset 的提交方式和優(yōu)缺點總結(jié)如下:
| 提交方式 | 優(yōu)點&缺點 |
|---|---|
| 自動提交 | 可能有大量重復(fù)消費,不受控制 |
| 同步提交 | 效率低,也有可能重復(fù)消費, 但比自動提交少 |
| 異步提交 | 調(diào)用效率高 |
| 同步&異步結(jié)合 | 高吞吐量,且可靠,可能有少量的重復(fù)消費(推薦) |
| 提交指定 offset | 可以比其它方式更加頻繁的提交,仍然有可能重復(fù) |
歡迎交流討論,吐槽建議。
勤學(xué)似春起之苗,不見其增,日有所長
輟學(xué)如磨刀之石,不見其損,日有所虧
關(guān)注【大數(shù)據(jù)學(xué)徒】,用技術(shù)干貨助你日有所長
