聊聊 Kafka:Kafka 消息丟失的場景以及最佳實踐

一、前言

大家好,我是老周,有快二十多天沒有更新文章了,很多小伙伴一直在催更。先說明下最近的情況,最近項目上線很忙,沒有時間寫,并且組里有個同事使用 Kafka 不當,導致線上消息丟失,在修復一些線上的數(shù)據(jù),人都麻了。事情是這樣,有個 Kafka 消費者實例,部署到線上去,消費到了線上的數(shù)據(jù),而新版本做了新的邏輯,新版本的業(yè)務邏輯與老版本的業(yè)務邏輯不兼容,直接導致消費失敗,沒有進行重試操作,關鍵還提交了 offset。直接這部分數(shù)據(jù)沒有被業(yè)務處理,導致消息丟失,然后緊急修復線上數(shù)據(jù)。

剛好這些天忙完了有空,所以記錄一下,同時看是否對大家能起到避免踩坑的作用,能有一些作用,那我寫的也就值了。

我們下面會從以下三個方面來說一下 Kafka 消息丟失的場景以及最佳實踐。

  • 生產者丟失消息
  • Kafka Broker 服務端丟失消息
  • 消費者丟失消息

二、Kafka 的三種消息語義

先說 Kafka 消息丟失的場景之前,我們先來說下 Kafka 的三種消息語義,不會還有人不知道吧?這個不應該了,消息系統(tǒng)基本上抽象成這以下三種消息語義了:

  • 最多傳遞一次
  • 最少傳遞一次
  • 僅有一次傳遞

[圖片上傳失敗...(image-5cecfb-1651760752586)]

類型 消息是否會重復 消息是否會丟失 優(yōu)勢 劣勢 適用場景
最多一次 生產端發(fā)送消息后不用等待和處理服務端響應,消息發(fā)送速度會很快。 網絡或服務端有問題會造成消息的丟失 消息系統(tǒng)吞吐量大且對消息的丟失不敏感。例如:日志收集、用戶行為等場景。
最少一次 生產端發(fā)送消息后需要等待和處理服務端響應,如果失敗會重試。 吞吐量較低,有重復發(fā)送的消息。 消息系統(tǒng)吞吐量一般,但是絕不能丟消息,對于重復消息不敏感。
有且僅有一次 消息不重復,消息不丟失,消息可靠性很好。 吞吐量較低 對消息的可靠性要求很高,同時可以容忍較小的吞吐量。

三、Kafka 消息丟失的場景

3.1 生產者丟失消息

  • 目前 Kafka Producer 是異步發(fā)送消息的,如果你的 Producer 客戶端使用了 producer.send(msg) 方法來發(fā)送消息,方法會立即返回,但此時并不能代表消息已經發(fā)送成功了。
  • 如果消息再發(fā)送的過程中發(fā)生了網絡抖動,那么消息可能沒有傳遞到 Broker,那么消息可能會丟失。
  • 如果發(fā)送的消息本身不符合,如大小超過了 Broker 的承受能力等。

3.2 Kafka Broker 服務端丟失消息

  • Leader Broker 宕機了,觸發(fā)選舉過程,集群選舉了一個落后 Leader 太多的 Broker 作為 Leader,那么落后的那些消息就會丟失了。
  • Kafka 為了提升性能,使用頁緩存機制,將消息寫入頁緩存而非直接持久化至磁盤,采用了異步批量刷盤機制,也就是說,按照一定的消息量和時間間隔去刷盤,刷盤的動作由操作系統(tǒng)來調度的,如果刷盤之前,Broker 宕機了,重啟后在頁緩存的這部分消息則會丟失。

3.3 消費者丟失消息

  • 消費者拉取了消息,并處理了消息,但處理消息異常了導致失敗,并且提交了偏移量,消費者重啟后,會從之前已提交的位移的下一個位置重新開始消費,消費失敗的那些消息不會再次處理,即相當于消費者丟失了消息。
  • 消費者拉取了消息,并提交了消費位移,但是在消息處理結束之前突然發(fā)生了宕機等故障,消費者重啟后,會從之前已提交的位移的下一個位置重新開始消費,之前未處理完成的消息不會再次處理,即相當于消費者丟失了消息。

四、最佳實踐

4.1 生產端

  • 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。帶有回調通知的 send 方法可以針對發(fā)送失敗的消息進行重試處理。

  • 設置 acks = all。代表了你對“已提交”消息的定義。如果設置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級的“已提交”定義。
    [圖片上傳失敗...(image-ce92e-1651760752586)]

  • 設置 retries = 3,當出現(xiàn)網絡的瞬時抖動時,消息發(fā)送可能會失敗,此時配置了 retries > 0 的 Producer 能夠自動重試消息發(fā)送,避免消息丟失。
    [圖片上傳失敗...(image-55e592-1651760752586)]
    如果重試達到設定的次數(shù),那么生產者就會放棄重試并返回異常。不過并不是所有的異常都是可以通過重試來解決的,比如消息太大,超過max.request.size參數(shù)配置的值時,這種方式就不可行了。

  • 設置 retry.backoff.ms = 300,合理估算重試的時間間隔,可以避免無效的頻繁重試。
    [圖片上傳失敗...(image-1185de-1651760752586)]
    它用來設定兩次重試之間的時間間隔,避免無效的頻繁重試。在配置 retriesretry.backoff.ms之前,最好先估算一下可能的異常恢復時間,這樣可以設定總的重試時間大于這個異?;謴蜁r間,以此來避免生產者過早地放棄重試。

4.2 Broker 端

  • 設置 unclean.leader.election.enable = false。它控制的是哪些 Broker 有資格競選分區(qū)的 Leader。如果一個 Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會造成消息的丟失。故一般都要將該參數(shù)設置成 false,即不允許這種情況的發(fā)生。
    [圖片上傳失敗...(image-e4c99f-1651760752586)]
  • 設置 replication.factor >= 3。其實這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機制就是冗余。
    [圖片上傳失敗...(image-832664-1651760752586)]
  • 設置 min.insync.replicas > 1。這控制的是消息至少要被寫入到多少個副本才算是“已提交”。設置成大于 1 可以提升消息持久性。在實際環(huán)境中千萬不要使用默認值 1。
    [圖片上傳失敗...(image-671e34-1651760752586)]
  • 確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個副本掛機,整個分區(qū)就無法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎上完成。推薦設置成 replication.factor = min.insync.replicas + 1。

4.3 消費端

  • 確保消息消費完成再提交。最好把它設置成 enable.auto.commit = false,并采用手動提交位移的方式。這對于單 Consumer 多線程處理的場景而言是至關重要的。
    [圖片上傳失敗...(image-2018bd-1651760752586)]
    雖然采用手動提交位移的方式可以解決消費端消息丟失的場景,但同時會存在重復消費問題,關于重復消費問題我們下一篇再講。

  • 像我們上面說的那個線上問題,即使你設置了手動提交,消費異常了同時也提交了位移,還是會存在消息丟失。

    Kafka 沒有重試機制不支持消息重試,也沒有死信隊列,因此使用 Kafka 做消息隊列時,需要自己
    實現(xiàn)消息重試的功能。這里我先說下大致的思路,后續(xù)有時間再分享代碼出來:

    • 創(chuàng)建一個 Topic 作為重試 Topic,用于接收等待重試的消息。
    • 普通 Topic 消費者設置待重試消息的下一個重試 Topic。
    • 從重試 Topic 獲取待重試消息存儲到 Redis 的 ZSet 中,并以下一次消費時間排序。
    • 定時任務從 Redis 獲取到達消費時間的消息,并把消息發(fā)送到對應的 Topic。
    • 同一個消息重試次數(shù)過多則不再重試。
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容