09 | Kafka_無(wú)消息丟失配置怎么實(shí)現(xiàn)?

一直以來(lái),很多人對(duì)于 Kafka 丟失消息這件事情都有著自己的理解,因而也就有著自己的解決之道。在討論具體的應(yīng)對(duì)方法之前,我覺(jué)得我們首先要明確,在 Kafka 的世界里什么才算是消息丟失,或者說(shuō) Kafka 在什么情況下能保證消息不丟失。這點(diǎn)非常關(guān)鍵,因?yàn)楹芏鄷r(shí)候我們?nèi)菀谆煜?zé)任的邊界,如果搞不清楚事情由誰(shuí)負(fù)責(zé),自然也就不知道由誰(shuí)來(lái)出解決方案了。

那 Kafka 到底在什么情況下才能保證消息不丟失呢?

一句話概括,Kafka 只對(duì)“已提交”的消息(committed message)做有限度的持久化保證。

這句話里面有兩個(gè)核心要素,我們一一來(lái)看。

第一個(gè)核心要素是“已提交的消息”。什么是已提交的消息?當(dāng) Kafka 的若干個(gè) Broker 成功地接收到一條消息并寫(xiě)入到日志文件后,它們會(huì)告訴生產(chǎn)者程序這條消息已成功提交。此時(shí),這條消息在 Kafka 看來(lái)就正式變?yōu)椤耙烟峤弧毕⒘恕?/p>

那為什么是若干個(gè) Broker 呢?這取決于你對(duì)“已提交”的定義。你可以選擇只要有一個(gè) Broker 成功保存該消息就算是已提交,也可以是令所有 Broker 都成功保存該消息才算是已提交。不論哪種情況,Kafka 只對(duì)已提交的消息做持久化保證這件事情是不變的。

第二個(gè)核心要素就是“有限度的持久化保證”,也就是說(shuō) Kafka 不可能保證在任何情況下都做到不丟失消息。舉個(gè)極端點(diǎn)的例子,如果地球都不存在了,Kafka 還能保存任何消息嗎?顯然不能!倘若這種情況下你依然還想要 Kafka 不丟消息,那么只能在別的星球部署 Kafka Broker 服務(wù)器了。

現(xiàn)在你應(yīng)該能夠稍微體會(huì)出這里的“有限度”的含義了吧,其實(shí)就是說(shuō) Kafka 不丟消息是有前提條件的。假如你的消息保存在 N 個(gè) Kafka Broker 上,那么這個(gè)前提條件就是這 N 個(gè) Broker 中至少有 1 個(gè)存活。只要這個(gè)條件成立,Kafka 就能保證你的這條消息永遠(yuǎn)不會(huì)丟失。

總結(jié)一下,Kafka 是能做到不丟失消息的,只不過(guò)這些消息必須是已提交的消息,而且還要滿足一定的條件。當(dāng)然,說(shuō)明這件事并不是要為 Kafka 推卸責(zé)任,而是為了在出現(xiàn)該類(lèi)問(wèn)題時(shí)我們能夠明確責(zé)任邊界。

“消息丟失”案例

好了,理解了 Kafka 是怎樣做到不丟失消息的,那接下來(lái)我?guī)銖?fù)盤(pán)一下那些常見(jiàn)的“Kafka 消息丟失”案例。注意,這里可是帶引號(hào)的消息丟失哦,其實(shí)有些時(shí)候我們只是冤枉了 Kafka 而已。

案例 1:生產(chǎn)者程序丟失數(shù)據(jù)

Producer 程序丟失消息,這應(yīng)該算是被抱怨最多的數(shù)據(jù)丟失場(chǎng)景了。我來(lái)描述一個(gè)場(chǎng)景:你寫(xiě)了一個(gè) Producer 應(yīng)用向 Kafka 發(fā)送消息,最后發(fā)現(xiàn) Kafka 沒(méi)有保存,于是大罵:“Kafka 真爛,消息發(fā)送居然都能丟失,而且還不告訴我?!”如果你有過(guò)這樣的經(jīng)歷,那么請(qǐng)先消消氣,我們來(lái)分析下可能的原因。

目前 Kafka Producer 是異步發(fā)送消息的,也就是說(shuō)如果你調(diào)用的是 producer.send(msg) 這個(gè) API,那么它通常會(huì)立即返回,但此時(shí)你不能認(rèn)為消息發(fā)送已成功完成。

這種發(fā)送方式有個(gè)有趣的名字,叫“fire and forget”,翻譯一下就是“發(fā)射后不管”。這個(gè)術(shù)語(yǔ)原本屬于導(dǎo)彈制導(dǎo)領(lǐng)域,后來(lái)被借鑒到計(jì)算機(jī)領(lǐng)域中,它的意思是,執(zhí)行完一個(gè)操作后不去管它的結(jié)果是否成功。調(diào)用 producer.send(msg) 就屬于典型的“fire and forget”,因此如果出現(xiàn)消息丟失,我們是無(wú)法知曉的。這個(gè)發(fā)送方式挺不靠譜吧,不過(guò)有些公司真的就是在使用這個(gè) API 發(fā)送消息。

如果用這個(gè)方式,可能會(huì)有哪些因素導(dǎo)致消息沒(méi)有發(fā)送成功呢?其實(shí)原因有很多,例如網(wǎng)絡(luò)抖動(dòng),導(dǎo)致消息壓根就沒(méi)有發(fā)送到 Broker 端;或者消息本身不合格導(dǎo)致 Broker 拒絕接收(比如消息太大了,超過(guò)了 Broker 的承受能力)等。這么來(lái)看,讓 Kafka“背鍋”就有點(diǎn)冤枉它了。就像前面說(shuō)過(guò)的,Kafka 不認(rèn)為消息是已提交的,因此也就沒(méi)有 Kafka 丟失消息這一說(shuō)了。

不過(guò),就算不是 Kafka 的“鍋”,我們也要解決這個(gè)問(wèn)題吧。實(shí)際上,解決此問(wèn)題的方法非常簡(jiǎn)單:Producer 永遠(yuǎn)要使用帶有回調(diào)通知的發(fā)送 API,也就是說(shuō)不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。不要小瞧這里的 callback(回調(diào)),它能準(zhǔn)確地告訴你消息是否真的提交成功了。一旦出現(xiàn)消息提交失敗的情況,你就可以有針對(duì)性地進(jìn)行處理。

舉例來(lái)說(shuō),如果是因?yàn)槟切┧矔r(shí)錯(cuò)誤,那么僅僅讓 Producer 重試就可以了;如果是消息不合格造成的,那么可以調(diào)整消息格式后再次發(fā)送。總之,處理發(fā)送失敗的責(zé)任在 Producer 端而非 Broker 端。

你可能會(huì)問(wèn),發(fā)送失敗真的沒(méi)可能是由 Broker 端的問(wèn)題造成的嗎?當(dāng)然可能!如果你所有的 Broker 都宕機(jī)了,那么無(wú)論 Producer 端怎么重試都會(huì)失敗的,此時(shí)你要做的是趕快處理 Broker 端的問(wèn)題。但之前說(shuō)的核心論據(jù)在這里依然是成立的:Kafka 依然不認(rèn)為這條消息屬于已提交消息,故對(duì)它不做任何持久化保證。

案例 2:消費(fèi)者程序丟失數(shù)據(jù)

Consumer 端丟失數(shù)據(jù)主要體現(xiàn)在 Consumer 端要消費(fèi)的消息不見(jiàn)了。Consumer 程序有個(gè)“位移”的概念,表示的是這個(gè) Consumer 當(dāng)前消費(fèi)到的 Topic 分區(qū)的位置。下面這張圖來(lái)自于官網(wǎng),它清晰地展示了 Consumer 端的位移數(shù)據(jù)。

比如對(duì)于 Consumer A 而言,它當(dāng)前的位移值就是 9;Consumer B 的位移值是 11。

這里的“位移”類(lèi)似于我們看書(shū)時(shí)使用的書(shū)簽,它會(huì)標(biāo)記我們當(dāng)前閱讀了多少頁(yè),下次翻書(shū)的時(shí)候我們能直接跳到書(shū)簽頁(yè)繼續(xù)閱讀。

正確使用書(shū)簽有兩個(gè)步驟:第一步是讀書(shū),第二步是更新書(shū)簽頁(yè)。如果這兩步的順序顛倒了,就可能出現(xiàn)這樣的場(chǎng)景:當(dāng)前的書(shū)簽頁(yè)是第 90 頁(yè),我先將書(shū)簽放到第 100 頁(yè)上,之后開(kāi)始讀書(shū)。當(dāng)閱讀到第 95 頁(yè)時(shí),我臨時(shí)有事中止了閱讀。那么問(wèn)題來(lái)了,當(dāng)我下次直接跳到書(shū)簽頁(yè)閱讀時(shí),我就丟失了第 96~99 頁(yè)的內(nèi)容,即這些消息就丟失了。

同理,Kafka 中 Consumer 端的消息丟失就是這么一回事。要對(duì)抗這種消息丟失,辦法很簡(jiǎn)單:維持先消費(fèi)消息(閱讀),再更新位移(書(shū)簽)的順序即可。這樣就能最大限度地保證消息不丟失。

當(dāng)然,這種處理方式可能帶來(lái)的問(wèn)題是消息的重復(fù)處理,類(lèi)似于同一頁(yè)書(shū)被讀了很多遍,但這不屬于消息丟失的情形。在專(zhuān)欄后面的內(nèi)容中,我會(huì)跟你分享如何應(yīng)對(duì)重復(fù)消費(fèi)的問(wèn)題。

除了上面所說(shuō)的場(chǎng)景,其實(shí)還存在一種比較隱蔽的消息丟失場(chǎng)景。

我們依然以看書(shū)為例。假設(shè)你花錢(qián)從網(wǎng)上租借了一本共有 10 章內(nèi)容的電子書(shū),該電子書(shū)的有效閱讀時(shí)間是 1 天,過(guò)期后該電子書(shū)就無(wú)法打開(kāi),但如果在 1 天之內(nèi)你完成閱讀就退還租金。

為了加快閱讀速度,你把書(shū)中的 10 個(gè)章節(jié)分別委托給你的 10 個(gè)朋友,請(qǐng)他們幫你閱讀,并拜托他們告訴你主旨大意。當(dāng)電子書(shū)臨近過(guò)期時(shí),這 10 個(gè)人告訴你說(shuō)他們讀完了自己所負(fù)責(zé)的那個(gè)章節(jié)的內(nèi)容,于是你放心地把該書(shū)還了回去。不料,在這 10 個(gè)人向你描述主旨大意時(shí),你突然發(fā)現(xiàn)有一個(gè)人對(duì)你撒了謊,他并沒(méi)有看完他負(fù)責(zé)的那個(gè)章節(jié)。那么很顯然,你無(wú)法知道那一章的內(nèi)容了。

對(duì)于 Kafka 而言,這就好比 Consumer 程序從 Kafka 獲取到消息后開(kāi)啟了多個(gè)線程異步處理消息,而 Consumer 程序自動(dòng)地向前更新位移。假如其中某個(gè)線程運(yùn)行失敗了,它負(fù)責(zé)的消息沒(méi)有被成功處理,但位移已經(jīng)被更新了,因此這條消息對(duì)于 Consumer 而言實(shí)際上是丟失了。

這里的關(guān)鍵在于 Consumer 自動(dòng)提交位移,與你沒(méi)有確認(rèn)書(shū)籍內(nèi)容被全部讀完就將書(shū)歸還類(lèi)似,你沒(méi)有真正地確認(rèn)消息是否真的被消費(fèi)就“盲目”地更新了位移。

這個(gè)問(wèn)題的解決方案也很簡(jiǎn)單:如果是多線程異步處理消費(fèi)消息,Consumer 程序不要開(kāi)啟自動(dòng)提交位移,而是要應(yīng)用程序手動(dòng)提交位移。在這里我要提醒你一下,單個(gè) Consumer 程序使用多線程來(lái)消費(fèi)消息說(shuō)起來(lái)容易,寫(xiě)成代碼卻異常困難,因?yàn)槟愫茈y正確地處理位移的更新,也就是說(shuō)避免無(wú)消費(fèi)消息丟失很簡(jiǎn)單,但極易出現(xiàn)消息被消費(fèi)了多次的情況。

最佳實(shí)踐

看完這兩個(gè)案例之后,我來(lái)分享一下 Kafka 無(wú)消息丟失的配置,每一個(gè)其實(shí)都能對(duì)應(yīng)上面提到的問(wèn)題。

  1. 不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。記住,一定要使用帶有回調(diào)通知的 send 方法。
  2. 設(shè)置 acks = all。acks 是 Producer 的一個(gè)參數(shù),代表了你對(duì)“已提交”消息的定義。如果設(shè)置成 all,則表明所有副本 Broker 都要接收到消息,該消息才算是“已提交”。這是最高等級(jí)的“已提交”定義。
  3. 設(shè)置 retries 為一個(gè)較大的值。這里的 retries 同樣是 Producer 的參數(shù),對(duì)應(yīng)前面提到的 Producer 自動(dòng)重試。當(dāng)出現(xiàn)網(wǎng)絡(luò)的瞬時(shí)抖動(dòng)時(shí),消息發(fā)送可能會(huì)失敗,此時(shí)配置了 retries > 0 的 Producer 能夠自動(dòng)重試消息發(fā)送,避免消息丟失。
  4. 設(shè)置 unclean.leader.election.enable = false。這是 Broker 端的參數(shù),它控制的是哪些 Broker 有資格競(jìng)選分區(qū)的 Leader。如果一個(gè) Broker 落后原先的 Leader 太多,那么它一旦成為新的 Leader,必然會(huì)造成消息的丟失。故一般都要將該參數(shù)設(shè)置成 false,即不允許這種情況的發(fā)生。
  5. 設(shè)置 replication.factor >= 3。這也是 Broker 端的參數(shù)。其實(shí)這里想表述的是,最好將消息多保存幾份,畢竟目前防止消息丟失的主要機(jī)制就是冗余。
  6. 設(shè)置 min.insync.replicas > 1。這依然是 Broker 端參數(shù),控制的是消息至少要被寫(xiě)入到多少個(gè)副本才算是“已提交”。設(shè)置成大于 1 可以提升消息持久性。在實(shí)際環(huán)境中千萬(wàn)不要使用默認(rèn)值 1。
  7. 確保 replication.factor > min.insync.replicas。如果兩者相等,那么只要有一個(gè)副本掛機(jī),整個(gè)分區(qū)就無(wú)法正常工作了。我們不僅要改善消息的持久性,防止數(shù)據(jù)丟失,還要在不降低可用性的基礎(chǔ)上完成。推薦設(shè)置成 replication.factor = min.insync.replicas + 1。
  8. 確保消息消費(fèi)完成再提交。Consumer 端有個(gè)參數(shù) enable.auto.commit,最好把它設(shè)置成 false,并采用手動(dòng)提交位移的方式。就像前面說(shuō)的,這對(duì)于單 Consumer 多線程處理的場(chǎng)景而言是至關(guān)重要的。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Design 1. Motivation 我們?cè)O(shè)計(jì)Kafka用來(lái)作為統(tǒng)一的平臺(tái)來(lái)處理大公司可能擁有的所有實(shí)時(shí)數(shù)據(jù)源...
    BlackManba_24閱讀 1,645評(píng)論 0 8
  • Kafka無(wú)消息丟失配置 Kafka到底會(huì)不會(huì)丟數(shù)據(jù)(data loss)? 網(wǎng)上各種說(shuō)法都有,在回答這個(gè)問(wèn)題之前...
    生活的探路者閱讀 336評(píng)論 0 0
  • MQ(消息隊(duì)列)是跨進(jìn)程通信的方式之一,可理解為異步rpc,上游系統(tǒng)對(duì)調(diào)用結(jié)果的態(tài)度往往是重要不緊急。使用消息隊(duì)列...
    allin8116閱讀 634評(píng)論 0 0
  • 一、入門(mén)1、簡(jiǎn)介Kafka is a distributed,partitioned,replicated com...
    HxLiang閱讀 3,682評(píng)論 0 9
  • 做少年時(shí),讀名人的說(shuō)話,多半要思考,有汲取精華的渴望。現(xiàn)在讀這些,但有判別心而已。大概這就是成人后的事情吧,有偏見(jiàn)...
    彭有倦閱讀 183評(píng)論 0 4

友情鏈接更多精彩內(nèi)容