[Kafka 101-6] “茴字有五種寫法”之 offset 的提交

[Kafka 101 - 5] 圖文并茂地介紹 offset 概念 中我們介紹了消息的 offset 和消費者的 offset,并且提到消費者的 offset 的維護(hù)方式是消費者自己提交到一個特殊 topic,聽起來似乎很簡單,但實際這個提交 offset 的過程也有點內(nèi)容,所以本文來學(xué)習(xí) Kafka 中消費者提交 offset 的五種方式

需要明確的一點是,即使 Kafka 中維護(hù)了消費者的 offset,消費者仍然有可能重復(fù)消費或者少消費數(shù)據(jù)的,如果想要保證消費數(shù)據(jù)的完全準(zhǔn)確,不丟不重,即所謂的“Exactly Once”,需要使用別的機制來保證,比如 Flink 的 checkpoint 機制,但這些機制也需要能夠正確的理解 offset 的提交。

內(nèi)容提要:

  1. 環(huán)境說明
  2. 自動提交
  3. 同步提交
  4. 異步提交
  5. 同步&異步結(jié)合
  6. 提交指定 offset
  7. 總結(jié)

1. 環(huán)境說明

操作系統(tǒng):MacOS/Linux
Kafka:本地安裝了社區(qū)版的 2.3.1 版本,運行在 9092 端口

Kafka 的安裝使用可以參考:[Kafka 101-1] Kafka安裝使用
消費者 Java API 的基本使用可以參考:[Kafka 101-3] 使用Java API消費數(shù)據(jù)實戰(zhàn)

2. 自動提交

這是 offset 提交的默認(rèn)方式。

消費者有一個參數(shù)叫做 enable.auto.commit ,表示是否啟用 offset 的自動提交,默認(rèn)值為 true,并且還有一個配套的參數(shù)叫做 auto.commit.interval.ms,表示自動提交 offset 的時間間隔,默認(rèn)值是 5000,即 5s 提交自動提交一次 offset,還記得消費者的 poll 循環(huán)嗎?長這個樣子:

while (true) {
  // consumer 是一個 KafkaConsumer對象
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // 用 println 模擬數(shù)據(jù)數(shù)據(jù)過程
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
}

每當(dāng)調(diào)用 KafkaConsumer 對象的 poll 方法時,它會去檢查距離上一次提交 offset 是否已經(jīng)過去 5s,如果夠 5s 的話,則會幫助你把這次 poll 操作消費的最大 offset 提交給 Kafka broker。

注意:如果在自動提交了 offset 之后 3s 的時候程序因為某種原因掛掉了,并且在這 3s 期間消費了 10000 條數(shù)據(jù),那么當(dāng)程序重啟后,這 10000 條數(shù)據(jù)會被再次消費,因為 Kafka 中記錄的是最后一次提交的 offset,程序重啟后會從這個 offset 開始繼續(xù)消費,所以,自動提交有重復(fù)消費數(shù)據(jù)的風(fēng)險,而且我們完全無法控制,看起來不是很棒,所以在重視數(shù)據(jù)準(zhǔn)確性的場景中,都不會采用自動提交的方式。

下文中的方法都需要在構(gòu)造 KafkaConsumer 對象的時候傳入 auto.commit.offset 參數(shù),并且設(shè)置為 false。

3. 同步提交

直接上代碼:

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // 用 println 模擬數(shù)據(jù)處理過程
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
  // 處理完畢后提交 offset
  try {
    consumer.commitSync();
  } catch (CommitFailedException e) {
    // commitSync 會一直嘗試提交直到成功或者遇到不可恢復(fù)的錯誤
    log.error("commit faile", e)
  }
}

可以看到,同步提交 offset 的方法叫做 commitSync(),這個方法會提交最近一次調(diào)用 poll 所消費到的最大 offset,這上面的例子中,我們是把 commitSync() 放在了數(shù)據(jù)處理之后,如果程序在數(shù)據(jù)處理過程中掛掉,這時已經(jīng)處理了一部分?jǐn)?shù)據(jù)(比如寫到了MySQL 中),那么重啟后會從上一次提交的 offset 繼續(xù)消費,之前已經(jīng)寫入 MySQL 的數(shù)據(jù)會被再處理一次,因此MySQL 中的數(shù)據(jù)會重復(fù)。

那么如果我們把 commitSync 放在數(shù)據(jù)處理的代碼之前呢,答案是數(shù)據(jù)有可能丟失,因為這種情況下的問題是,可能已經(jīng)提交了 offset,但是數(shù)據(jù)處理過程沒有完成,故障重啟之后只能消費到新數(shù)據(jù),重啟前沒有來得及處理的數(shù)據(jù)也處理不到了。

所以,同步提交的方式,有可能造成數(shù)據(jù)重復(fù),也有可能造成數(shù)據(jù)丟失,取決于 commitSync() 方法的位置。

4. 異步提交

有個同步,就有個異步,同步相比異步的不足是,提交 offset 的時候程序會阻塞住,限制了消費吞吐量,因此就有了異步提交,上代碼:

while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    // 用 println 模擬數(shù)據(jù)處理過程
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
  // 處理完畢后提交 offset
  consumer.commitAsync();
}

調(diào)用了 commitAsync 方法后,程序會繼續(xù)執(zhí)行,確實可以提高吞吐量,但是沒有只有好處沒有壞處的事,上面提到,提交 offset 是有可能失敗的,同步提交方法會一直嘗試提交,要么成功,要么遇到不可恢復(fù)的錯誤拋異常,但是異步提交方法不會重試,為什么不重試呢?因為如果不成功就一直重試,有可能更新一次的 offset 提交都已經(jīng)完成了,如果重試成功了,反而會把更新的 offset 給覆蓋掉,造成重復(fù),所以干脆不重試。

異步的方法一般都一個回調(diào)函數(shù),這個也有:

consumer.commitAsyn(new OffsetCommitCallback() {
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
        if (e != null)
            log.error("Commit failed for offsets {}", offsets, e);
    }
})

如果想要在異步提交失敗的時候重試,《Kafka 權(quán)威指南》給了一種方法,維護(hù)一個單調(diào)遞增的序列號,每提交一次,遞增一下這個序列號,并把序列號傳給回調(diào)函數(shù),當(dāng)在回調(diào)函數(shù)中發(fā)現(xiàn)失敗時,比較一下回調(diào)函數(shù)中的序列號和當(dāng)前序列號的大小,如果當(dāng)前的序列號已經(jīng)比回調(diào)函數(shù)的序列號大,那就不用重新提交 offset 了,因為已經(jīng)有一個更新的 offset 在提交了。

我覺得按照這個說法,代碼可能長這個樣子(聲明:沒有驗證過):

// 維護(hù)全局序列號
int global_seq = 0;
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records) {
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
  }
  // 異步提交 offset 時傳給回調(diào)函數(shù)
  consumer.commitAsync(new MyOffsetCommitCallback(global_seq++));
}
// 私有類實現(xiàn) OffsetCommitCallback 接口
private class MyOffsetCommitCallback implements OffsetCommitCallback {
  private int seq;
  // 回調(diào)函數(shù)初始化時記錄當(dāng)前序列號
  MyOffsetCommitCallback(int global_seq) {
    this.seq = global_seq;
  }
  public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
        if (exception != nul) {
      // 兩者相等表示還沒有更新的 offset 在提交,可以重試
      if (this.seq == global_seq) {
        // 在這里重試,怎么重試你來想吧
      }
    }
  }
}

5. 同步&異步結(jié)合

其實偶爾有一兩次提交 offset 失敗問題不大,因為只要后續(xù)有 offset 提交成功了,之前的失敗可以忽略的。但是如果知道這是最后一次提交了,那么還是有必要確保這次提交能成功的。所以一種常用的提交 offset 的方式,是同時使用同步提交和異步提交,它可以兼顧效率和可靠性(數(shù)據(jù)準(zhǔn)確性),上代碼:

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
      System.out.printf("消息內(nèi)容為:%s\n", record.value());
        }
    // 即使失敗也不要緊,要么有下一次異步的提交,要么有關(guān)閉前的最后一次提交
    consumer.commitAsync();
  }
// 捕獲處理不了的異常
} catch (Exception e) {
  log.error("Unexpected error", e);
} finally {
  try {
    // 關(guān)閉 consumer 前最后一次提交使用同步的方式,最大程度的確保成功
    consumer.commitSync();
  } finally {
    consumer.close();
  }
}

簡單的解釋一下代碼:poll 循環(huán)里用異步提交,效率高,整個 poll 循環(huán)捕獲到異常之后在關(guān)閉前進(jìn)行一次同步提交,穩(wěn)妥,保證最新的 offset能被提交(當(dāng)然,如果是不可恢復(fù)的異常,比如 Kafka 宕機,發(fā)生這種異常是無法提交成功的)。

6. 提交指定 offset

上面的這些提交方式,都是 poll 一次,提交一次,其實還可以 poll 一次,提交多次,比如每處理一條數(shù)據(jù),提交一次,上代碼:

// 引入由 TopPartition 和 OffsetAndMetadata 組成的 Map 類型
private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
while (true) {
  ConsumerRecords<String, String> records = consumer.poll(100);
  for (ConsumerRecord<String, String> record : records)
  {
    System.out.printf("消息內(nèi)容為:%s\n", record.value());
    // 更新當(dāng)前分區(qū)的 offset
    currentOffsets.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset()+1, "no metadata"));
    // 這里用了異步提交,也可以用同步提交
    // consumer.commitSync(currentOffsets);
    consumer.commitAsync(currentOffsets, null);
  }
}

解釋一下:同步提交的方法和異步提交的方法都可以接受Map<TopicPartition, OffsetAndMetadata> 類型的參數(shù),提交這個參數(shù)指定的 offset,而不是通過 poll 方法消費的最大 offset。

7. 總結(jié)

各種 offset 的提交方式和優(yōu)缺點總結(jié)如下:

提交方式 優(yōu)點&缺點
自動提交 可能有大量重復(fù)消費,不受控制
同步提交 效率低,也有可能重復(fù)消費, 但比自動提交少
異步提交 調(diào)用效率高
同步&異步結(jié)合 高吞吐量,且可靠,可能有少量的重復(fù)消費(推薦)
提交指定 offset 可以比其它方式更加頻繁的提交,仍然有可能重復(fù)

歡迎交流討論,吐槽建議。

勤學(xué)似春起之苗,不見其增,日有所長
輟學(xué)如磨刀之石,不見其損,日有所虧
關(guān)注【大數(shù)據(jù)學(xué)徒】,用技術(shù)干貨助你日有所長

大數(shù)據(jù)學(xué)徒

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容