Kafka消息送達語義詳解

消息送達語義是消息系統(tǒng)中一個常見的問題,主要包含三種語義:

  • At most once:消息發(fā)送或消費至多一次
  • At least once:消息發(fā)送或消費至少一次
  • Exactly once:消息恰好只發(fā)送一次或只消費一次

下面我們分別從發(fā)送者和消費者的角度來闡述這三種消息送達語義。

Producer

從Producer的角度來看,At most once意味著Producer發(fā)送完一條消息后,不會確認消息是否成功送達。這樣從Producer的角度來看,消息僅僅被發(fā)送一次,也就存在者丟失的可能性。

從Producer的角度來看,At least once意味著Producer發(fā)送完一條消息后,會確認消息是否發(fā)送成功。如果Producer沒有收到Broker的ack確認消息,那么會不斷重試發(fā)送消息。這樣就意味著消息可能被發(fā)送不止一次,也就存在這消息重復的可能性。

從Producer的角度來看,Exactly once意味著Producer消息的發(fā)送是冪等的。這意味著不論消息重發(fā)多少遍,最終Broker上記錄的只有一條不重復的數(shù)據(jù)。

Producer At least once配置

Kafka默認的Producer消息送達語義就是At least once,這意味著我們不用做任何配置就能夠?qū)崿F(xiàn)At least once消息語義。

原因是Kafka中默認acks=1并且retries=2147483647。

Producer At most once配置

我們可以通過配置Producer的以下配置項來實現(xiàn)At most once語義:

  • acks=0acks配置項表示Producer期望的Broker的確認數(shù)。默認值為1??蛇x項:[0,1,all]。如果設置為0,表示Producer發(fā)送完消息后不會等待任何Broker的確認;設置為1表示Producer會等待Broker集群中的leader的確認寫入消息;設置為all表示Producer需要等待Broker集群中l(wèi)eader和其所有follower的確認寫入消息。
  • retries=0。retires配置項表示當消息發(fā)送失敗時,Producer重發(fā)消息的次數(shù)。默認值為2147483647。當配置了acks=0時,retries配置項就失去了作用,因此這兒可以不用配置。

當配置了retires的值后,如果沒有將max.in.flight.requests.per.connection配置的值設置為1,有可能造成消息亂序的結(jié)果。max.in.flight.requests.per.connection配置代表著一個Producer同時可以發(fā)送的未收到確認的消息數(shù)量。如果max.in.flight.requests.per.connection數(shù)量大于1,那么可能發(fā)送了message1后,在沒有收到確認前就發(fā)送了message2,此時message1發(fā)送失敗后觸發(fā)重試,而message2直接發(fā)送成功,就造成了Broker上消息的亂序。max.in.flight.requests.per.connection的默認值為5。

Producer Exactly once配置

Exactly once是Kafka從版本0.11之后提供的高級特性。我們可以通過配置Producer的以下配置項來實現(xiàn)Exactly once語義:

  • enable.idempotence=true。enable.idempotence配置項表示是否使用冪等性。當enable.idempotence配置為true時,acks必須配置為all。并且建議max.in.flight.requests.per.connection的值小于5。
  • acks=all

Kafka如何實現(xiàn)消息發(fā)送冪等

Kafka本身支持At least once消息送達語義,因此實現(xiàn)消息發(fā)送的冪等關鍵是要實現(xiàn)Broker端消息的去重。為了實現(xiàn)消息發(fā)送的冪等性,Kafka引入了兩個新的概念:

  • PID。每個新的Producer在初始化的時候會被分配一個唯一的PID,這個PID對用戶是不可見的。
  • Sequence Numbler。對于每個PID,該Producer發(fā)送數(shù)據(jù)的每個<Topic, Partition>都對應一個從0開始單調(diào)遞增的Sequence Number

Broker端在緩存中保存了這Sequence Numbler,對于接收的每條消息,如果其序號比Broker緩存中序號大于1則接受它,否則將其丟棄。這樣就可以實現(xiàn)了消息重復提交了。但是,只能保證單個Producer對于同一個<Topic, Partition>的Exactly Once語義。不能保證同一個Producer一個topic不同的partion冪等。

Kafka冪等性配置時要求 max.in.flight.requests.per.connection 小于等于 5 的主要原因是:Server 端的 ProducerStateManager 實例會緩存每個 PID 在每個 Topic-Partition 上發(fā)送的最近 5 個batch 數(shù)據(jù)(這個 5 是寫死的,至于為什么是 5,可能跟經(jīng)驗有關,當不設置冪等性時,當這個設置為 5 時,性能相對來說較高,社區(qū)是有一個相關測試文檔,忘記在哪了),如果超過 5,ProducerStateManager 就會將最舊的 batch 數(shù)據(jù)清除。假設應用將 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 設置為 6,假設發(fā)送的請求順序是 1、2、3、4、5、6,這時候 server 端只能緩存 2、3、4、5、6 請求對應的 batch 數(shù)據(jù),這時候假設請求 1 發(fā)送失敗,需要重試,當重試的請求發(fā)送過來后,首先先檢查是否為重復的 batch,這時候檢查的結(jié)果是否,之后會開始 check 其 sequence number 值,這時候只會返回一個 OutOfOrderSequenceException 異常,client 在收到這個異常后,會再次進行重試,直到超過最大重試次數(shù)或者超時,這樣不但會影響 Producer 性能,還可能給 Server 帶來壓力(相當于client 狂發(fā)錯誤請求)。

Consumer

從Consumer的角度來看,At most once意味著Consumer對一條消息最多消費一次,因此有可能存在消息消費失敗依舊提交offset的情況??紤]下面的情況:Consumer首先讀取消息,然后提交offset,最后處理這條消息。在處理消息時,Consumer宕機了,此時offset已經(jīng)提交,下一次讀取消息時讀到的是下一條消息了,這就是At most once消費。

從Consumer的角度來看,At least once意味著Consumer對一條消息可能消費多次??紤]下面的情況:Consumer首先讀取消息,然后處理這條消息,最后提交offset。在處理消息時成功后,Consumer宕機了,此時offset還未提交,下一次讀取消息時依舊是這條消息,那么處理消息的邏輯又將被執(zhí)行一遍,這就是At least once消費。

從Consumer的角度來看,Exactly once意味著消息的消費處理邏輯和offset的提交是原子性的,即消息消費成功后offset改變,消息消費失敗offset也能回滾。

Consumer At least once配置

  • enable.auto.commit=false。禁止后臺自動提交offset。
  • 手動調(diào)用consumer.commitSync()來提交offset。手動調(diào)用保證了offset即時更新。

通過手動提交offset,就可以實現(xiàn)Consumer At least once語義。

Consumer At most once配置

  • enable.auto.commit=true。后臺定時提交offset。
  • auto.commit.interval.ms配置為一個很小的數(shù)值。auto.commit.interval.ms表示后臺提交offset的時間間隔。

通過自動提交offset,并且將定時提交時間間隔設置的很小,就可以實現(xiàn)Consumer At most once語義。

Consumer Exactly once配置

  • isolation.level=read_committed。isolation.level表示何種類型的message對Consumer可見。下面會更詳細的介紹這個參數(shù)。

一個常見的Exactly once的的使用場景是:當我們訂閱了一個topic,然后往另一個topic里寫入數(shù)據(jù)時,我們希望這兩個操作是原子性的,即如果寫入消息失敗,那么我們希望讀取消息的offset可以回滾。

此時可以通過Kafka的Transaction特性來實現(xiàn)。Kafka是在版本0.11之后開始提供事務特性的。我們可以將Consumer讀取數(shù)據(jù)和Producer寫入數(shù)據(jù)放進一個同一個事務中,在事務沒有成功結(jié)束前,所有的這個事務中包含的消息都被標記為uncommitted。只有事務執(zhí)行成功后,所有的消息才會被標記為committed。

我們知道,offset信息是以消息的方式存儲在Broker的__consumer_offsets topic中的。因此在事務開始后,Consumer讀取消息后,所有的offset消息都是uncommitted狀態(tài)。所有的Producer寫入的消息也都是uncommitted狀態(tài)。

而Consumer可以通過配置isolation.level來決定uncommitted狀態(tài)的message是否對Consumer可見。isolation.level擁有兩個可選值:read_committedread_uncommitted。默認值為read_uncommitted。

當我們將isolation.level配置為read_committed后,那么所有事務未提交的數(shù)據(jù)就都對Consumer不可見了,也就實現(xiàn)了Kafka的事務語義。

參考文章

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容