kafka消息冪等性

生產(chǎn)端冪等性

  • 什么是冪等性,為什么要實(shí)現(xiàn)冪等性?
    分布式系統(tǒng)中,一些不可控因素有很多,比如網(wǎng)絡(luò)、OOM、FullGC等。在Kafka Broker確認(rèn)Ack前,有可能出現(xiàn)網(wǎng)絡(luò)異常、FullGC、OOM等問題時(shí)導(dǎo)致Ack超時(shí),Producer會(huì)進(jìn)行重復(fù)發(fā)送。注,在未達(dá)到最大重試次數(shù)前,會(huì)自動(dòng)重試(非應(yīng)用程序代碼寫的重試)

  • 消息重試對(duì)順序消息的影響
    我們知道正常情況下,單個(gè)分區(qū)內(nèi)的消息是按發(fā)送時(shí)間有序的,但發(fā)生消息重試時(shí)候(例如前一條消息發(fā)送失敗,后一條消息發(fā)送成功,前一條消息重試后成功,造成數(shù)據(jù)亂序),kafka如果保證分區(qū)內(nèi)順序有序?


    圖片.png

,這個(gè)找到最舊的非常關(guān)鍵:【Sender 線程發(fā)送時(shí),在遍歷 queue 中的 batch 時(shí),會(huì)檢查這個(gè) batch 是否是重試的 batch,如果是的話,只有這個(gè) batch 是最舊的那個(gè)需要重試的 batch,才允許發(fā)送,否則本次發(fā)送跳過這個(gè) Topic-Partition 數(shù)據(jù)的發(fā)送等待下次發(fā)】。


冪等性的開啟使用

需要在生產(chǎn)端配置參數(shù)enable.idempotence = true,當(dāng)冪等性開啟的時(shí)候acks即為all。如果顯性的將acks設(shè)置為0或-1,那么將會(huì)報(bào)錯(cuò) Must set acks to all in order to use the idempotent producer. Otherwise we cannot guarantee idempotence 。

Properties props = new Properties(); 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true"); 
props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.RETRIES_CONFIG, "3"); 
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); 
kafkaProducer.send(new ProducerRecord<String, String>("truman_kafka_center", "1", "hello world")).get(); kafkaProducer.close();

冪等性實(shí)現(xiàn)原理

Broker端也會(huì)為每個(gè)<PID, Topic, Partition>維護(hù)一個(gè)序號(hào),并且每次Commit一條消息時(shí)將其對(duì)應(yīng)序號(hào)遞增。對(duì)于接收的每條消息,如果其消息序號(hào)比Broker維護(hù)的序號(hào)(即最后一次Commit的消息的序號(hào))大一,則Broker會(huì)接受它,否則將其丟棄:
a. 如果消息序號(hào)比Broker維護(hù)的序號(hào)大于1以上,說明中間有數(shù)據(jù)尚未寫入,也即亂序,此時(shí)Broker拒絕該消息,Producer拋出InvalidSequenceNumber
b. 如果消息序號(hào)小于等于Broker維護(hù)的序號(hào),說明該消息已被保存,即為重復(fù)消息,Broker直接丟棄該消息,Producer拋出DuplicateSequenceNumber
這種設(shè)計(jì)解決的問題:
1: Broker 保存消息后,發(fā)送 ACK 前宕機(jī),Producer 認(rèn)為消息未發(fā)送成功并重試,造成數(shù)據(jù)重復(fù)
2: 前一條消息發(fā)送失敗,后一條消息發(fā)送成功,前一條消息重試后成功,造成數(shù)據(jù)亂序


如前面所述,冪等性要解決的問題是,在設(shè)置了kafka的at least once 時(shí),由于觸發(fā)重試機(jī)制導(dǎo)致的重復(fù)問題簡單來說 at least once + 冪等 = exactly once
kafka producer實(shí)現(xiàn)冪等性 的兩個(gè)重要機(jī)制
1:producerID,用來表示每個(gè)producer Client
2:sequence number client發(fā)送的每條消息都會(huì)帶有seqnum 。server端根據(jù)這個(gè)num來判斷數(shù)據(jù)是否重復(fù)

  • producerID 在哪兒產(chǎn)生?
    在server端產(chǎn)生,Client 通過向 Server 發(fā)送一個(gè) InitProducerIdRequest 請(qǐng)求獲取 PID(冪等性時(shí),是選擇一臺(tái)連接數(shù)最少的 Broker 發(fā)送這個(gè)請(qǐng)求)。
    【凡是開啟冪等性都是需要生成PID,只不過未開啟事務(wù)的PID可以在任意broker生成,而開啟事務(wù)只能在TransactionCoordinator節(jié)點(diǎn)生成】。

  • sequence number:
    Kafka發(fā)送消息都是以batch的格式發(fā)送,batch包含了多條消息。所以Producer發(fā)送消息batch的時(shí)候,只會(huì)設(shè)置該batch的第一個(gè)消息的序列號(hào),后面消息的序列號(hào)可以根據(jù)第一個(gè)消息的序列號(hào)計(jì)算出來

每一個(gè)topic-partition 都有一套自己的sequence number
producer初始化的時(shí)候回分配一個(gè)producerID。對(duì)于一個(gè)給定的pid他的sequence number會(huì)從0開始自增。每一個(gè)topic-partition都有自己的一套sequence number,client發(fā)送的每條消息都有seq num Server就是根據(jù)這個(gè)seqnum判斷是否重復(fù), 是一個(gè)從0開始單調(diào)遞增的值。
但是這里的PID是全局唯一的,如果client掛掉重啟優(yōu)惠重寫分配一個(gè)PID,這也是冪等性無法做到跨會(huì)話的原因。


下面代碼摘要自網(wǎng)絡(luò),producerId的創(chuàng)建過程:


圖片.png

調(diào)用了 TransactionCoordinator (Broker 在啟動(dòng) server 服務(wù)時(shí)都會(huì)初始化這個(gè)實(shí)例)的 handleInitProducerId() 方法做了相應(yīng)的處理,其實(shí)現(xiàn)如下(這里只關(guān)注冪等性的處理):


圖片.png

冪等性時(shí)發(fā)送流程

正常流程:



異常流程:


圖片.png

當(dāng)Producer發(fā)送消息(x2,y2)給Broker時(shí),Broker接收到消息并將其追加到消息流中。此時(shí),Broker返回Ack信號(hào)給Producer時(shí),發(fā)生異常導(dǎo)致Producer接收Ack信號(hào)失敗。對(duì)于Producer來說,會(huì)觸發(fā)重試機(jī)制,將消息(x2,y2)再次發(fā)送,但是,由于引入了冪等性,在每條消息中附帶了PID(ProducerID)和SequenceNumber。相同的PID和SequenceNumber發(fā)送給Broker,而之前Broker緩存過之前發(fā)送的相同的消息,那么在消息流中的消息就只有一條(x2,y2),不會(huì)出現(xiàn)重復(fù)發(fā)送的情況。

冪等性處理細(xì)節(jié)

producer端處理細(xì)節(jié):

  1. 應(yīng)用通過 KafkaProducer 的 send() 方法將數(shù)據(jù)添加到 RecordAccumulator 中,添加時(shí)會(huì)判斷是否需要新建一個(gè) ProducerBatch,這時(shí)這個(gè) ProducerBatch(消息集,批量發(fā)送) 還是沒有 PID 和 sequence number 信息的

  2. Producer 后臺(tái)發(fā)送線程 Sender,在 run() 方法中,會(huì)先根據(jù) TransactionManager 的 判斷當(dāng)前的 PID 是否需要重置,重置的原因是因?yàn)椋喝绻?topic-partition 的 batch 重試多次失敗最后因?yàn)槌瑫r(shí)而被移除,這時(shí) sequence number 將無法做到連續(xù),因?yàn)?sequence number 有部分已經(jīng)分配出去,這時(shí)系統(tǒng)依賴自身的機(jī)制無法繼續(xù)進(jìn)行下去(因?yàn)閮绲刃允且WC不丟不重的),相當(dāng)于程序遇到了一個(gè) fatal 異常,PID 會(huì)進(jìn)行重置,TransactionManager 相關(guān)的緩存信息被清空(Producer 不會(huì)重啟),只是保存狀態(tài)信息的 TransactionManager 做了 clear+new 操作,遇到這個(gè)問題時(shí)是無法保證 exactly once 的(有數(shù)據(jù)已經(jīng)發(fā)送失敗了,并且超過了重試次數(shù));

  3. Sender 線程通過 maybeWaitForProducerId() 方法判斷是否需要申請(qǐng) PID,如果需要的話,這里會(huì)阻塞直 到獲取到相應(yīng)的 PID 信息;

  4. Sender 線程通過 sendProducerData() 方法發(fā)送數(shù)據(jù),整體流程與之前的 Producer 流程相似,不同的地方是在 RecordAccumulator 的 drain() 方法中,在加了冪等性之后, drain() 方法多了如下幾步判斷:

  • 常規(guī)的判斷:判斷這個(gè) topic-partition 是否可以繼續(xù)發(fā)送(如果出現(xiàn)前面2中的情況是不允許發(fā)送的)、判斷 PID 是否有效、如果這個(gè) batch 是重試的 batch,那么需要判斷這個(gè) batch 之前是否還有 batch 沒有發(fā)送完成,如果有,這里會(huì)先跳過這個(gè) Topic-Partition 的發(fā)送,直到前面的 batch 發(fā)送完成。
  • 如果這個(gè) ProducerBatch 還沒有這個(gè)相應(yīng)的 PID 和 sequence number 信息,會(huì)在這里進(jìn)行相應(yīng)的設(shè)置;
  1. 最后 Sender 線程再調(diào)用 sendProduceRequests() 方法發(fā)送 ProduceRequest 請(qǐng)求,后面的就跟之前正常。

冪等性時(shí) Server 端如何處理 ProduceRequest 請(qǐng)求

冪等性服務(wù)端增加的校驗(yàn),有了 PID 信息,并且不是重復(fù) batch 時(shí),在更新 producer 信息時(shí),會(huì)做以下校驗(yàn):

server 端會(huì)緩存 PID 對(duì)應(yīng)這個(gè) Topic-Partition 的最近5個(gè) batch 信息

  1. 檢查該 PID 是否已經(jīng)緩存中存在(主要是在 ProducerStateManager 對(duì)象中檢查);
  2. 如果不存在,那么判斷 sequence number 是否 從0 開始,是的話,在緩存中記錄 PID 的 meta(PID,epoch, sequence number),并執(zhí)行寫入操作。
    否則返回 UnknownProducerIdException(PID 在 server 端已經(jīng)過期或者這個(gè) PID 寫的數(shù)據(jù)都已經(jīng)過期了,但是 Client 還在接著上次的 sequence number 發(fā)送數(shù)據(jù));
  3. 如果該 PID 存在,先檢查 PID epoch 與 server 端記錄的是否相同;
  4. 如果不同并且 sequence number 不從 0 開始,那么返回 OutOfOrderSequenceException 異常;
  5. 如果不同并且 sequence number 從 0 開始,那么正常寫入;
  6. 如果相同,那么根據(jù)緩存中記錄的最近一次 sequence number(currentLastSeq)檢查是否為連續(xù)(會(huì)區(qū)分為 0、Int.MaxValue 等情況),不連續(xù)的情況下返回 OutOfOrderSequenceException 異常。

考兩個(gè)問題:

  1. Producer 在設(shè)置冪等性時(shí),為什么要求 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 小于等于 5,如果設(shè)置大于 5(不考慮 Producer 端參數(shù)校驗(yàn)的報(bào)錯(cuò)),會(huì)帶來什么后果?

答案: Server 端的 ProducerStateManager 實(shí)例會(huì)緩存每個(gè) PID 在每個(gè) Topic-Partition 上發(fā)送的最近 5 個(gè)batch 數(shù)據(jù).
先來分析一下,在什么情況下 Producer 會(huì)出現(xiàn)亂序的問題?沒有冪等性時(shí),亂序的問題是在重試時(shí)出現(xiàn)的,舉個(gè)例子:client 依然發(fā)送了 6 個(gè)請(qǐng)求 1、2、3、4、5、6(它們分別對(duì)應(yīng)了一個(gè) batch),這 6 個(gè)請(qǐng)求只有 2-6 成功 ack 了,1 失敗了,這時(shí)候需要重試,重試時(shí)就會(huì)把 batch 1 的數(shù)據(jù)添加到待發(fā)送的數(shù)據(jù)列隊(duì)中),那么下次再發(fā)送時(shí),batch 1 的數(shù)據(jù)將會(huì)被發(fā)送,這時(shí)候數(shù)據(jù)就已經(jīng)出現(xiàn)了亂序,因?yàn)?batch 1 的數(shù)據(jù)已經(jīng)晚于了 batch 2-6。

  1. Producer 在設(shè)置冪等性時(shí),如果我們?cè)O(shè)置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 大于 1,那么是否可以保證有序,如果可以,是怎么做到的?

當(dāng) MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設(shè)置為 1 時(shí),是可以解決這個(gè)為題,因?yàn)橥瑫r(shí)只允許一個(gè)請(qǐng)求正在發(fā)送,只有當(dāng)前的請(qǐng)求發(fā)送完成(成功 ack 后),才能繼續(xù)下一條請(qǐng)求的發(fā)送,類似單線程處理這種模式,每次請(qǐng)求發(fā)送時(shí)都會(huì)等待上次的完成,效率非常差,但是可以解決亂序的問題(當(dāng)然這里有序只是針對(duì)單 client 情況,多 client 并發(fā)寫是無法做到的)


Kafka 2.0.0的優(yōu)化:

簡單來說,其實(shí)現(xiàn)機(jī)制概括為:
1:Server 端驗(yàn)證 batch 的 sequence number 值,不連續(xù)時(shí),直接返回異常;
2:Client 端請(qǐng)求重試時(shí),batch 在 reenqueue 時(shí)會(huì)根據(jù) sequence number 值放到合適的位置(有序保證之一);
3:Sender 線程發(fā)送時(shí),在遍歷 queue 中的 batch 時(shí),會(huì)檢查這個(gè) batch 是否是重試的 batch,如果是的話,只有這個(gè) batch 是最舊的那個(gè)需要重試的 batch,才允許發(fā)送,否則本次發(fā)送跳過這個(gè) Topic-Partition 數(shù)據(jù)的發(fā)送等待下次發(fā)送。

參考:
https://blog.csdn.net/qq_37923600/article/details/88583170

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容