原文地址:https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
筆者做了翻譯和整理,有誤之處請指正。
很高興地告訴大家,具備新的里程碑意義的功能的Kafka 0.11.x版本(對應(yīng) Confluent Platform 3.3)已經(jīng)release,該版本引入了exactly-once語義,本文闡述的內(nèi)容包括:
- Apache Kafka的exactly-once語義;
- 為什么exactly-once是一個很難解決的分布式問題;
- 使用Kafka Stream API來進(jìn)行正確的exactly-once流式處理;
Exactly-once 是真正意義上的難題
從理論上來說,Exactly-once delivery是不可能的,它的代價太高無法實際應(yīng)用到生產(chǎn)環(huán)境,包括業(yè)內(nèi)的大牛Mathias Verroaes也這么認(rèn)為,它是分布式系統(tǒng)中最難解決的唯二問題:

甚至有很多人說這是無法實現(xiàn)的:

但現(xiàn)在,我并不認(rèn)為引入Exactly-once delivery并且支持流處理是一個真正難以解決的問題。首先,讓我們來概述下消息的精確提交語義。
消息語義概述
在分布式系統(tǒng)中,構(gòu)成系統(tǒng)的任何節(jié)點都是被定義為可以彼此獨立失敗的。比如在 Kafka中,broker可能會crash,在producer推送數(shù)據(jù)至topic的過程中也可能會遇到網(wǎng)絡(luò)問題。根據(jù)producer處理此類故障所采取的提交策略類型,我們可以獲得不同的語義:
- at-least-once:如果producer收到來自Kafka broker的確認(rèn)(ack)或者acks = all,則表示該消息已經(jīng)寫入到Kafka。但如果producer ack超時或收到錯誤,則可能會重試發(fā)送消息,客戶端會認(rèn)為該消息未寫入Kafka。如果broker在發(fā)送Ack之前失敗,但在消息成功寫入Kafka之后,此重試將導(dǎo)致該消息被寫入兩次,因此消息會被不止一次地傳遞給最終consumer,這種策略可能導(dǎo)致重復(fù)的工作和不正確的結(jié)果。
- at-most-once:如果在ack超時或返回錯誤時producer不重試,則該消息可能最終不會寫入Kafka,因此不會傳遞給consumer。在大多數(shù)情況下,這樣做是為了避免重復(fù)的可能性,業(yè)務(wù)上必須接收數(shù)據(jù)傳遞可能的丟失。
- exactly-once:即使producer重試發(fā)送消息,消息也會保證最多一次地傳遞給最終consumer。該語義是最理想的,但也難以實現(xiàn),這是因為它需要消息系統(tǒng)本身與生產(chǎn)和消費消息的應(yīng)用程序進(jìn)行協(xié)作。例如如果在消費消息成功后,將Kafka consumer的偏移量rollback,我們將會再次從該偏移量開始接收消息。這表明消息傳遞系統(tǒng)和客戶端應(yīng)用程序必須配合調(diào)整才能實現(xiàn)excactly-once。
必須處理的常見災(zāi)難場景
為了清楚描述實現(xiàn) exactly-once delivery語義的挑戰(zhàn),我們來看一個簡單的例子。
假設(shè)有某個單進(jìn)程producer應(yīng)用在發(fā)送"Hello Kafka"到某個單partition topic(topic_name=EoS),有一個運行在其他節(jié)點的單實例consumer從topic里拉數(shù)據(jù)并進(jìn)行打印。理想情況下如果沒有任何災(zāi)難發(fā)生的話,"Hello Kafka"將會被exactly-once傳遞,consumer獲取消息進(jìn)行消費并提交commit到Kafka去完成這一次消息處理。即使在這之后consumer掛了或者被重啟,也不會再收到這條消息。
然而生產(chǎn)環(huán)境錯綜復(fù)雜,災(zāi)難場景是無法避免的:
- Broker失敗:Kafka,作為一個高可用、持久化系統(tǒng),保證每條消息被持久化并且冗余多份(假設(shè)是n份),所以理論上Kafka可以容忍n-1臺broker宕機(jī)。Kafka的備份機(jī)制保證了一旦消息被成功寫入leader replica,將會把數(shù)據(jù)同步到其他所有replica。
- Producer到Broker的RPC失敗:Kafka的durability特性是基于producer從broker收到的ack的,而沒有收到ack并不代表請求肯定失敗。Broker可能會在消息被寫入之后返回ack之前宕機(jī),同時也可能會在消息被寫入topic之前宕機(jī)。因為producer沒有任何途徑可以得知失敗的真實原因,而只會嘗試重試。在一些場景下,下游consumer會收到若干的重復(fù)數(shù)據(jù)。
- 客戶端也可能會失敗:Exactly-once delivery也必須考慮客戶端失敗的情況。但是我們?nèi)绾稳^(qū)分客戶端是真的掛了(永久性宕機(jī))還是說只是暫時丟失心跳?追求正確性的話,broker應(yīng)該丟棄由zombie producer發(fā)送的消息。 consumer也是如此,一旦新的客戶端實例已經(jīng)啟動,它必須能夠從失敗實例的任何狀態(tài)中恢復(fù),并從安全點(safe checkpoint)開始處理,這意味著消費的偏移量必須始終與生成的輸出保持同步。
Apache Kafka的exactly-once語義
在0.11.x版本之前,Apache Kafka支持at-least-once delivery語義以及partition內(nèi)部的順序delivery,如前所述這在某些場景下可能會導(dǎo)致數(shù)據(jù)重復(fù)消費。而Kafka 0.11.x支持exactly-once語義,不會導(dǎo)致該情況發(fā)生,其中主要包括三個內(nèi)部邏輯的改造:
冪等:partition內(nèi)部的exactly-once順序語義
冪等操作,是指可以執(zhí)行多次,而不會產(chǎn)生與僅執(zhí)行一次不同結(jié)果的操作,Producer的send操作現(xiàn)在是冪等的。在任何導(dǎo)致producer重試的情況下,相同的消息,如果被producer發(fā)送多次,也只會被寫入Kafka一次。要打開此功能,并讓所有partition獲得exactly-once delivery、無數(shù)據(jù)丟失和in-order語義,需要修改broker的配置:enable.idempotence = true。
這個功能如何工作?它的工作方式類似于TCP:發(fā)送到Kafka的每批消息將包含一個序列號,該序列號用于重復(fù)數(shù)據(jù)的刪除。與TCP不同,TCP只能在transient in-memory中提供保證。序列號將被持久化存儲topic中,因此即使leader replica失敗,接管的任何其他broker也將能感知到消息是否重復(fù)。這種機(jī)制的開銷相當(dāng)?shù)停核皇窃诿颗⒅刑砑恿藥讉€額外字段。正如本文稍后將會看到的,該功能僅僅在非冪等producer上增加了可忽略的性能開銷。
事務(wù):跨partition的原子性寫操作
第二點,Kafka現(xiàn)在支持使用新事務(wù)API原子性的對跨partition進(jìn)行寫操作,該API允許producer發(fā)送批量消息到多個partition。該功能同樣支持在同一個事務(wù)中提交消費者offsets,因此真正意義上實現(xiàn)了end-to-end的exactly-once delivery語義。以下是一段示例代碼:
producer.initTransactions();
try {
producer.beginTransaction();
producer.send(record1);
producer.send(record2);
producer.commitTransaction();
} catch(ProducerFencedException e) {
producer.close();
} catch(KafkaException e) {
producer.abortTransaction();
}
該代碼片段描述了如何使用新的producer事務(wù)API原子性的發(fā)送消息至多個partition。值得注意的是,某個Kafka topic partition內(nèi)部的消息可能是事務(wù)完整提交后的消息,也可能是事務(wù)執(zhí)行過程中的部分消息。
而從consumer的角度來看,有兩種策略去讀取事務(wù)寫入的消息,通過"isolation.level"來進(jìn)行配置:
-
read_committed:可以同時讀取事務(wù)執(zhí)行過程中的部分寫入數(shù)據(jù)和已經(jīng)完整提交的事務(wù)寫入數(shù)據(jù); -
read_uncommitted:完全不等待事務(wù)提交,按照offsets order去讀取消息,也就是兼容0.11.x版本前Kafka的語義;
我們必須通過配置consumer端的配置isolation.level,來正確使用事務(wù)API,通過使用 new Producer API并且對一些unique ID設(shè)置transaction.id(該配置屬于producer端),該unique ID用于提供事務(wù)狀態(tài)的連續(xù)性。
Exactly-once 流處理
基于冪等和原子性,通過Streams API實現(xiàn)exactly-once流處理成為可能。如果要在流應(yīng)用中實現(xiàn)相關(guān)語義,只需要配置 processing.guarantee=exactly_once,這會影響所有的流處理環(huán)境中的語義,包括將處理作業(yè)和由加工作業(yè)創(chuàng)建的所有物理狀態(tài)同時寫回到Kafka的操作。
這就是為什么Kafka Streams API提供的exactly-once保證是迄今為止任何流處理系統(tǒng)中的最強(qiáng)實現(xiàn)的原因。 它為以Kafka作為數(shù)據(jù)源的流處理應(yīng)用程序提供端對端的exactly-once保證,Streams應(yīng)用程序?qū)⑷魏蜬afka的物化狀態(tài)在最終環(huán)節(jié)寫回到Kafka。 僅依靠外部數(shù)據(jù)系統(tǒng)實現(xiàn)物化狀態(tài)的流處理系統(tǒng)僅支持對exactly-once的較弱保證。 即使他們使用Kafka作為流處理來源,在需要從故障中恢復(fù)的情況下,也只能rollback他們的Kafka消費者offset以重新消費并處理消息,而不能回滾關(guān)聯(lián)狀態(tài),當(dāng)更新不是冪等的時候會導(dǎo)致結(jié)果不正確。
我來解釋下這段話的細(xì)節(jié)。 流處理系統(tǒng)的關(guān)鍵問題是我的流處理應(yīng)用程序是否獲得正確的答案,即使其中一個實例在處理過程中崩潰,恢復(fù)失敗實例時的關(guān)鍵是把狀態(tài)恢復(fù)到與崩潰前相同。
流處理可以看成是一個關(guān)于Kafka topic的讀寫操作集合, 消費者從Kafka topic讀取消息,其他一些處理邏輯轉(zhuǎn)換消息或修改cpu維護(hù)的狀態(tài),同時生產(chǎn)者將消息寫入另一個Kafka topic。 Exactly-once流處理就是保證讀寫數(shù)據(jù)有且只有一次的一種能力。,在這種情況下,獲得正確結(jié)果意味著不丟失任何輸入消息或產(chǎn)生任何重復(fù)的輸出,而這就是用戶所期望的。
除了我們迄今為止討論的簡單災(zāi)難場景之外,還有許多其他故障情況需要考慮:
- 流處理器可能會從多個source topic獲取輸入,并且跨多個source topic的數(shù)據(jù)排序不是確定的,因此多次運行可能會產(chǎn)生不同的結(jié)果;
- 同樣,流處理器可能產(chǎn)生多個dest topic的輸出。如果生產(chǎn)者無法跨多個topic執(zhí)行原子寫入,如果對某些(但不是全部)分區(qū)的寫入失敗,則producer的輸出可能不正確;
- 流處理器可以使用Streams API提供的managed state facilities去聚合或join多個輸入的數(shù)據(jù)。如果流處理器的一個實例失敗,那么需要能夠回滾該流處理器實例的物化狀態(tài)。在重新啟動實例時,還需要能夠恢復(fù)處理并重新創(chuàng)建其狀態(tài)。
- 流處理器可以查找外部數(shù)據(jù)庫或者調(diào)通服務(wù)來豐富信息?;谕獠糠?wù)的流處理器基本上來說是非確定性的:如果外部服務(wù)在流處理器的兩次運行之間改變其內(nèi)部狀態(tài),則會導(dǎo)致下游的結(jié)果出錯。但是,如果處理正確,則不會導(dǎo)致完全不正確的結(jié)果,而僅僅會導(dǎo)致流處理器的輸出是期望輸出的子集。
特別是當(dāng)與非確定性操作和應(yīng)用程序計算的持久狀態(tài)的更改相結(jié)合時,如果實例失敗或者重新啟動,可能導(dǎo)致數(shù)據(jù)重復(fù)甚至是計算結(jié)果錯誤。
"流處理保證確定性操作exactly-once的正確方法是:保證讀取寫入操作的輸出在任何非災(zāi)難場景下一致。"
針對非確定性操作的exactly-one流處理
Exactly-once流處理對確定性操作是有意義的,但是當(dāng)處理邏輯本身存在不確定的邏輯時呢?假設(shè)有這樣一個場景,流處理器用于計算滿足條件的流入的事件數(shù)量,條件由外部服務(wù)動態(tài)決定。從根本上來說這種操作本質(zhì)上是非決定性的,因為外部服務(wù)指定的條件是不確定的,這可能會導(dǎo)致下游數(shù)據(jù)流得到不同的結(jié)果。那么,對這樣的非確定性操作來說,正確的策略又是什么呢?
"對于非確定性操作來說,正確的處理方式是確保讀取寫入流處理操作的輸出屬于預(yù)期輸出的子集,該集合應(yīng)該可以由非確定性輸入得到的預(yù)期值組合得到。"
因此,對于我們的示例流處理器,假設(shè)當(dāng)前計數(shù)為31,輸入事件值為2,故障時正確輸出只能是31或者33其中一個:如果輸入事件被外部條件指定需要丟棄那么就是31 ,反之則為33。
Kafka的exactly-once保證真的起作用了嗎?
為了回答這個關(guān)于Kafka exactly-once保證的問題,讓我們來看看正確性(也就是我們?nèi)绾卧O(shè)計,構(gòu)建和測試這個功能)和性能。
精妙的設(shè)計和review過程
正確性和性能都從堅實的設(shè)計開始。 大約三年前,我們開始在LinkedIn上進(jìn)行設(shè)計和原型開發(fā)工作。 我們在Confluent上尋求一個優(yōu)雅的方式來將冪等和事務(wù)的功能性要求融合成一個整體的封裝。 我們寫了一個60+頁的設(shè)計文檔,概述了設(shè)計的各個方面:從高級消息流到每個數(shù)據(jù)結(jié)構(gòu)和RPC的細(xì)節(jié)實現(xiàn)細(xì)節(jié)。 經(jīng)過9個月的廣泛公眾監(jiān)督,設(shè)計也從社區(qū)的不斷反饋中大大得到改善。 例如,基于開源討論,我們用更智能的服務(wù)器端過濾替代消費者端緩存以進(jìn)行事務(wù)讀取,從而避免了潛在的性能開銷。 同時,我們也改進(jìn)了事務(wù)與compacted topic,并增加了相應(yīng)的安全機(jī)制。
最終我們機(jī)智地得到了一個極簡設(shè)計,在很大程度上也依賴于強(qiáng)大的Kafka原型:
- 事務(wù)日志是一個Kafka topic,享受到了與生俱來的durability;
- Broker內(nèi)部新增了事務(wù)協(xié)調(diào)線程(用于管理每個生產(chǎn)者的事務(wù)狀態(tài)),自然地利用了Kafka自有的選舉算法來處理failover;
- 對于使用了Kafka Streams API構(gòu)建的流處理應(yīng)用程序,我們會將數(shù)據(jù)透明地fold起來合并成原子性操作以事務(wù)的形式寫入多個分區(qū),從而為讀取寫入操作提供exactly-once保證;
這種足夠簡單、專注于細(xì)節(jié)的設(shè)計,實施效果非常好。
迭代的開發(fā)過程
我們在開發(fā)該功能時,會確保每一個pull request經(jīng)過廣泛的審查。這意味著在幾個月的時間內(nèi)一些pull request經(jīng)歷過幾十次迭代,審查過程中發(fā)現(xiàn)了之前設(shè)計上沒有考慮到的無數(shù)邊界問題。
我們編寫了超過15,000個測試用例,包括分布式測試,運行時的故障測試。該流程揭示了各個方面的問題,從測試工具中的基本編碼錯誤到深奧的NTP同步問題。其中的一個子集是分布式混沌測試,我們?yōu)槎鄠€事務(wù)客戶端提供了一個完整的Kafka集群,通過事務(wù)產(chǎn)生消息,同時讀取這些消息,并在過程中強(qiáng)行終止客戶端或服務(wù)器,以確保數(shù)據(jù)既不丟失也不重復(fù)。
因此經(jīng)過良好測試,高質(zhì)量代碼庫的簡單而堅固的設(shè)計構(gòu)成了我們解決方案的基石。
好消息:Kafka 還是非???!
在設(shè)計此功能時,一個重點是性能的保證:由于exactly-once設(shè)計帶來的性能開銷,我們淘汰了許多更簡單的設(shè)計選型。經(jīng)過多番思考,我們采用的設(shè)計盡可能地使每個事務(wù)的開銷最小(每個分區(qū)約1次寫入,盡可能少的寫入記錄至中心事務(wù)日志)。對于耗時100ms的1KB消息和事務(wù)寫入,與配置為at-least-once并且保序交付(acks = all,max.in.flight.requests.per.connection = 1)的生產(chǎn)者的吞吐量相比吞吐量僅下降3%;與at-most-once并且無排序保證(acks = 1,max.in.flight.requests.per.connection = 5)的生產(chǎn)者的吞吐量相比下降20%。
具體的測試benchmark可以看這里。
除了確保新功能的低性能開銷之外,我們也不希望在沒有使用exactly-once功能的應(yīng)用程序中看到性能有意外損耗。為了確保這一點,我們不僅在Kafka消息頭中添加了一些新的字段來實現(xiàn)exactly-once功能,而且還重新設(shè)計了Kafka消息格式,在網(wǎng)絡(luò)傳輸和磁盤存儲時,更有效地壓縮消息。特別是,我們將一大堆常見的元數(shù)據(jù)轉(zhuǎn)移到批量頭文件中,并將可變長度編碼引入批次中的每個記錄。通過這種批量優(yōu)化,整體信息的size顯著減小。例如,一批7條記錄、每條10個字節(jié)的批量消息,使用新的格式將減少35%的體量,這使得生產(chǎn)者吞吐量提高了20%,處理小消息時提高了50%的消費者吞吐量。任何Kafka 0.11用戶都可以使用此性能提升,即使沒有使用任何exactly-once功能。
我們還著眼于優(yōu)化Streams API中的exactly-once流處理的開銷。 以100ms作為提交間隔的情況下(保證端到端延遲較低的一個值),我們看到吞吐量下降了15%至30%(損耗百分比取決于消息大小,前者為1KB的消息大小,后者為100字節(jié))。 但是,對于>=1KB的消息,30秒的提交間隔是沒有任何吞吐性能損耗的。 在下一個版本中,我們計劃引入推測性執(zhí)行機(jī)制:即使我們使用較大的提交間隔,我們也可以保持端到端的延遲較低,最終我們期望將事務(wù)的開銷降至零。
總而言之,通過從根本上重新調(diào)整我們的一些核心數(shù)據(jù)結(jié)構(gòu),我們在較小的性能損耗下實現(xiàn)了冪等和事務(wù)功能使得Kafka在大部分場景下依然很快。
這個魔法小精靈粉塵可以灑在我的應(yīng)用程序上嗎?
Exacrtly-once處理是一種端到端的保證,在灑上去之前應(yīng)用程序必須保證自身設(shè)計不違反該原則。 如果您使用的是消費者API,則必須保證你提交的應(yīng)用程序狀態(tài)變更和你的偏移量是一致的。
對于流處理應(yīng)用,情況會更好一些。 因為流處理是一個封閉的系統(tǒng),其中輸入、輸出和狀態(tài)修改都在相同的操作中建模,它實際上已經(jīng)類似于exactly-once中的事務(wù),具備原子性了。 配置更改就直接可以為您提供端到端的保證。 但是,您仍然需要從Kafka獲取數(shù)據(jù),當(dāng)與exactly-once的connector組合時,將直接擁有該特性。