《Kafka官方文檔》——設(shè)計(jì)

<h3>Design</h3>

<h4>1. Motivation</h4>

我們設(shè)計(jì)Kafka用來作為統(tǒng)一的平臺來處理大公司可能擁有的所有實(shí)時數(shù)據(jù)源。為了做到這點(diǎn),我們必須思考大量的使用場景。

它必須有高吞吐去支持大數(shù)據(jù)流,例如實(shí)時日志聚合。

它必須優(yōu)雅的處理數(shù)據(jù)積壓,以支持定期從離線系統(tǒng)加載數(shù)據(jù)。

這也以為這系統(tǒng)必須支持低延遲的分發(fā)來處理傳統(tǒng)消息系統(tǒng)的場景。

我們想支持分區(qū)的、分布式的、實(shí)時的處理數(shù)據(jù)源并創(chuàng)建新的數(shù)據(jù)源,這推動了我們的分區(qū)和消費(fèi)模型。

最后,將流反饋到其他系統(tǒng)進(jìn)行服務(wù)的情況下,我們知道系統(tǒng)必須能夠保證容錯性,在部分機(jī)器故障的時候提供服務(wù)。

支持這些使用推動我們做了一些列特殊的元素設(shè)計(jì),比起傳統(tǒng)的消息系統(tǒng)更像是數(shù)據(jù)庫日志。我們將在以下章節(jié)介紹一些設(shè)計(jì)要素。

<h4>2. Persistence</h4>

Don't fear the filesystem!

Kafka強(qiáng)依賴文件系統(tǒng)來存儲和緩存消息。“磁盤是緩慢的”是一個通常的認(rèn)知,這是人們懷疑持久化的結(jié)構(gòu)能否提供強(qiáng)大的性能。事實(shí)上,磁盤比人們想象的更慢也更快,這基于如何去使用它們;一個合適的設(shè)計(jì)可以使磁盤和網(wǎng)絡(luò)一樣的快速。

影響磁盤性能的核心因素是磁盤驅(qū)動的吞吐和過去十年磁盤的查找方式不同了。使用六個7200rpm的SATA RAID-5陣列的JBOD配置線性寫入能力為600MB/sec,而隨機(jī)寫的性能僅僅是100k/sec,相差了6000倍。線性寫入和讀取是最可預(yù)測的,并且被操作系統(tǒng)大量的優(yōu)化。現(xiàn)代操作系統(tǒng)提供read-ahead和write-behind技術(shù),他們大塊預(yù)讀數(shù)據(jù),并將較小的羅機(jī)械合并成較大的物理寫入。在ACM Queue的文章中可以找到此問題相關(guān)的進(jìn)一步討論;他們實(shí)際上發(fā)現(xiàn)順序訪問磁盤在某些情況下比隨機(jī)訪問內(nèi)存還快。

為了彌補(bǔ)性能的差異,現(xiàn)代操作系統(tǒng)在使用主內(nèi)存來做磁盤緩存時變的越來越激進(jìn)。當(dāng)內(nèi)存被回收時,現(xiàn)代操作系統(tǒng)將樂意將所有可用內(nèi)存轉(zhuǎn)移到磁盤緩存,而且性能會降低很多。所有的磁盤讀寫都需要通過這層緩存。這個功能不會被輕易關(guān)閉,除非使用Direct IO,因此盡管在進(jìn)程內(nèi)緩存了數(shù)據(jù),這些數(shù)據(jù)也有可能在操作系統(tǒng)的pagecache中緩存,從而被緩存了兩次。

此外,我們建立在JVM之上,任何在Java內(nèi)存上花費(fèi)過時間的人都知道兩件事情:

對象的內(nèi)存開銷非常大,通常將存儲的數(shù)據(jù)大小翻倍(或更多)。

Java的內(nèi)存回收隨著堆內(nèi)數(shù)據(jù)的增多變的越來越緩慢。

由于這些因素,使用文件系統(tǒng)并依賴于pagecache要優(yōu)于維護(hù)內(nèi)存中緩存或其他結(jié)構(gòu)——我們至少可以通過直接訪問內(nèi)存來是可用內(nèi)存增加一倍,并通過存儲字節(jié)碼而不是對象的方式來節(jié)約更多的內(nèi)存。這樣做將可以在32G的機(jī)器上使用28-30GB的內(nèi)存,而不需要承受GC的問題。此外,及時重啟服務(wù),內(nèi)存會保持有效,而進(jìn)程內(nèi)緩存將需要重建(對于10G的數(shù)據(jù)可能需要10分鐘),否則需要從冷數(shù)據(jù)加載(可怕的初始化性能)。這也大大簡化了代碼,因?yàn)楸3志彺婧臀募g的一致性是由操作系統(tǒng)負(fù)責(zé)的,這比進(jìn)程中操作更不容易出錯。

這是一個簡單的設(shè)計(jì):在進(jìn)程內(nèi)盡量緩沖數(shù)據(jù),空間不足時將所有數(shù)據(jù)刷寫到磁盤,我們采用了相反的方式。數(shù)據(jù)并盡快寫入一個持久化的日志而不需要立即刷到磁盤。實(shí)際上這只是意味著數(shù)據(jù)被轉(zhuǎn)移到了內(nèi)核的pagecache。

(以pagecache為中心的設(shè)計(jì)風(fēng)格)

Constant Time Suffices

在消息系統(tǒng)中使用持久化數(shù)據(jù)通常是具有關(guān)聯(lián)的BTree或其他隨機(jī)訪問的數(shù)據(jù)結(jié)構(gòu),以維護(hù)消息的元數(shù)據(jù)。BTree是最通用的數(shù)據(jù)結(jié)構(gòu),可以在消息系統(tǒng)中支持各種各樣的語義。BTree的操作時間復(fù)雜度是O(log N)。通常O(log N)被認(rèn)為是固定時間的,但是在磁盤操作中卻不是。每個磁盤一次只能執(zhí)行一個seek,所以并行度受到限制。因此即使少量的磁盤搜索也會導(dǎo)致非常高的開銷。由于操作系統(tǒng)將快速的緩存操作和非常慢的磁盤操作相結(jié)合,所以觀察到樹結(jié)構(gòu)的操作通常是超線性的,因?yàn)閿?shù)據(jù)隨固定緩存增加。

直觀的,持久化隊(duì)列可以像日志的解決方案一樣,簡單的讀取和追加數(shù)據(jù)到文件的結(jié)尾。這個結(jié)構(gòu)的優(yōu)勢是所有的操作都是O(1)的,并且讀取是可以并行不會阻塞的。這具有明顯的性能優(yōu)勢,因?yàn)樾阅芘c數(shù)據(jù)大小完全分離,可以使用低速的TB級SATA驅(qū)動器。雖然這些驅(qū)動器的搜索性能不佳,但是對于大量讀寫而言,他們的性能是可以接受的,并且價(jià)格是三分之一容量是原來的三倍。

無需任何的性能代價(jià)就可以訪問幾乎無限的磁盤空間,這意味著我們可以提供一些在消息系統(tǒng)中非尋常的功能。例如,在Kafka中,我們可以將消息保留較長的時間(如一周),而不是在消費(fèi)后就盡快刪除。這位消費(fèi)者帶來了很大的靈活性。

<h4>3. Efficiency</h4>

我們在效率上付出了很大的努力。主要的用例是處理web的數(shù)據(jù),這個數(shù)據(jù)量非常大:每個頁面可能會生成十幾個寫入。此外我們假設(shè)每個發(fā)布的消息至少被一個Consumer消費(fèi),因此我們盡可能使消費(fèi)的開銷小一些。

從構(gòu)建和運(yùn)行一些類似的系統(tǒng)的經(jīng)驗(yàn)發(fā)現(xiàn),效率是多租戶操作的關(guān)鍵。如果下游基礎(chǔ)服務(wù)成為瓶頸,那么應(yīng)用程序的抖動將會引起問題。我們確保應(yīng)用程序不會引起基礎(chǔ)服務(wù)的Load問題,這個非常重要的,當(dāng)一個集群服務(wù)上百個應(yīng)用程序的時候,因?yàn)閼?yīng)用的使用模式的變化時非常頻繁的。

我們在之前的章節(jié)中討論過磁盤的效率。一旦不良的磁盤訪問模式被消除,這種類型的系統(tǒng)有兩個低效的原因:太多太小的IO操作和過多的數(shù)據(jù)拷貝。

太小的IO操作問題存在于客戶端和服務(wù)端之間,也存在于服務(wù)端自身的持久化當(dāng)中。

為了避免這個問題,我們的協(xié)議圍繞“message set”抽象,通常是將消息聚合到一起。這允許網(wǎng)絡(luò)請求將消息聚合到一起,并分?jǐn)偩W(wǎng)絡(luò)往返的開銷,而不是一次發(fā)送單個消息。服務(wù)端依次將大塊消息追加到日志中,消費(fèi)者一次線性獲取一批數(shù)據(jù)。

這種簡單的優(yōu)化產(chǎn)生了一個數(shù)量級的加速。分批帶來了更大的網(wǎng)絡(luò)包,連續(xù)的磁盤操作,連續(xù)的內(nèi)存塊等等,這些都使得Kafka將隨機(jī)消息寫入轉(zhuǎn)化為線性的寫入并流向Consumer。

其他低效的地方是字符復(fù)制。在消息少時不是問題,但是對負(fù)載的影響是顯而易見的。為了避免這種情況,我們采用被producer、broker、Consumer共享的標(biāo)準(zhǔn)的二進(jìn)制消息格式(所以數(shù)據(jù)可以在傳輸時不需要進(jìn)行修改)。

由Broker維護(hù)的消息日志本身只是一批文件,每個文件由一系列以相同格式寫入的消息構(gòu)成。保持相同的格式保證了最重要的優(yōu)化:網(wǎng)絡(luò)傳輸和持久化日志塊?,F(xiàn)在UNIX操作系統(tǒng)提供了高度優(yōu)化的代碼路徑用于將pagecache的數(shù)據(jù)傳輸?shù)骄W(wǎng)絡(luò);在Linux中,這有sendfile實(shí)現(xiàn)。

要劉姐sendfile的影響,了解從文件到網(wǎng)絡(luò)傳輸數(shù)據(jù)的data path非常重要:

  1. 操作系統(tǒng)從磁盤讀取文件數(shù)據(jù)到pagecache,在內(nèi)核空間
  2. 用戶從內(nèi)核空間將數(shù)據(jù)讀到用戶空間的buffer
  3. 操作系統(tǒng)重新將用戶buffer數(shù)據(jù)讀取到內(nèi)核空間寫入到socket中
  4. 操作系統(tǒng)拷貝socket buffer數(shù)據(jù)到NIC buffer并發(fā)送到網(wǎng)絡(luò)

這顯然是低效的,有四個副本和兩個系統(tǒng)調(diào)用。使用sendfile,允許操作系統(tǒng)直接將數(shù)據(jù)從pagecache寫入到網(wǎng)絡(luò),而避免不必要的拷貝。在這個過程中,只有最終將數(shù)據(jù)拷貝到NIC buffer是必要的。

我們期望一個共同的場景是多個Consumer消費(fèi)一個Topic數(shù)據(jù),使用zero-copy優(yōu)化,數(shù)據(jù)被拷貝到pagecache并且被多次使用,而不是每次讀取的時候拷貝到內(nèi)存。這允許以接近網(wǎng)絡(luò)連接的速度消費(fèi)消息。

pagecache和sendfile的組合意味著在消費(fèi)者追上寫入的情況下,將看不到磁盤上的任何讀取活動,因?yàn)樗麄兌紝木彺孀x取數(shù)據(jù)。

sendfile和更多的zero-copy背景知識見zero-copy

End-to-end Batch Compression

在一些場景下,CPU核磁盤并不是性能瓶頸,而是網(wǎng)絡(luò)帶寬。在數(shù)據(jù)中心和廣域網(wǎng)上傳輸數(shù)據(jù)尤其如此。當(dāng)然,用戶可以壓縮它的消息而不需要Kafka的支持,但是這可能導(dǎo)致非常差的壓縮比,因?yàn)槿哂嗟拇蟛糠质怯捎谙嗤愋偷南⒅g的重復(fù)(例如JSON的字段名)。多個消息進(jìn)行壓縮比單獨(dú)壓縮每條消息效率更高。

Kafka通過允許遞歸消息來支持這一點(diǎn)。一批消息可以一起壓縮并以此方式發(fā)送到服務(wù)端。這批消息將以壓縮的形式被寫入日志,只能在消費(fèi)端解壓縮。

Kafka支持GZIP,Snappy和LZ4壓縮協(xié)議。更多的壓縮相關(guān)的細(xì)節(jié)在這里。

<h4>4. The Producer</h4>

Load balancing

Producer直接向Leader Partition所在的Broker發(fā)送數(shù)據(jù)而不需要經(jīng)過任何路由的干預(yù)。為了支持Producer直接向Leader Partition寫數(shù)據(jù),所有的Kafka服務(wù)節(jié)點(diǎn)都支持Topic Metadata的請求,返回哪些Server節(jié)點(diǎn)存活的、Partition的Leader節(jié)點(diǎn)的分布情況。

由客戶端控制將數(shù)據(jù)寫到哪個Partition。這可以通過隨機(jī)或者一些負(fù)載均衡的策略來實(shí)現(xiàn)(即客戶端去實(shí)現(xiàn)Partition的選擇策略)。Kafka暴露了一個接口用于用戶去指定一個Key,通過Key hash到一個具體的Partition。例如,如果Key是User id,那么同一個User的數(shù)據(jù)將被發(fā)送到同一個分區(qū)。這樣就允許消費(fèi)者在消費(fèi)時能夠?qū)οM(fèi)的數(shù)據(jù)做一些特定的處理。這樣的設(shè)計(jì)被用于處理“局部敏感”的數(shù)據(jù)(結(jié)合上面的場景,Partition內(nèi)的數(shù)據(jù)是可以保持順序消費(fèi)的,那么同一個用戶的數(shù)據(jù)在一個分區(qū),那么就可以保證對任何一個用戶的處理都是順序的)。

Asynchronous send

批處理是提升效率的主要方式一致,為了支持批處理,Kafka允許Producer在內(nèi)存聚合數(shù)據(jù)并在一個請求中發(fā)出。批處理的大小可以是通過消息數(shù)量指定的,也可以是通過等待的時間決定的(例如64K或者10ms)。這樣允許聚合更多的數(shù)據(jù)后發(fā)送,減少了IO操作。緩沖的數(shù)據(jù)大小是可以配置了,這樣能適當(dāng)增加延遲來提升吞吐。

更多的細(xì)節(jié)可以在Producer的配合和API文檔中找到。

<h4>5 The Consumer</h4>

Kafka Consumer通過給Leader Partition所在的Broker發(fā)送“fetch”請求來進(jìn)行消費(fèi)。Consumer在請求中指定Offset,并獲取從指定的Offset開始的一段數(shù)據(jù)。因此Consumer對消費(fèi)的位置有絕對的控制權(quán),通過重新設(shè)置Offset就可以重新消費(fèi)數(shù)據(jù)。

Push vs Pull

我們考慮的一個初步問題是Consumer應(yīng)該從Broker拉取數(shù)據(jù)還是Broker將數(shù)據(jù)推送給Consumer。在這方面,Kafka和大多數(shù)消息系統(tǒng)一樣,采用傳統(tǒng)的設(shè)計(jì)方式,由Producer想Broker推送數(shù)據(jù),Consumer從Broker上拉取數(shù)據(jù)。一些日志中心系統(tǒng),如Scribe和Apache Flume,遵循數(shù)據(jù)向下游推送的方式。兩種方式各有利弊。基于推送的方式,由于是由Broker控制速率,不能很好對不同的Consumer做處理。Consumer的目標(biāo)通常是以最大的速率消費(fèi)消息,不幸的是,在一個基于推送的系統(tǒng)中,當(dāng)Consumer消費(fèi)速度跟不上生產(chǎn)速度
時,推送的方式將使Consumer“過載”?;诶〉南到y(tǒng)在這方面做的更好,Consumer只是消費(fèi)落后并在允許時可以追上進(jìn)度。消費(fèi)者通過某種協(xié)議來緩解這種情況,消費(fèi)者可以通過這種方式來表明它的負(fù)載,這讓消費(fèi)者獲得充分的利用但不會“過載”。以上原因最終使我們使用更為傳統(tǒng)的Pull的方式。

Pull模型的另一個優(yōu)勢是可以聚合數(shù)據(jù)批量發(fā)送給Consumer。Push模型必須考慮是立即推送數(shù)據(jù)給Consumer還是等待聚合一批數(shù)據(jù)之后發(fā)送。如果調(diào)整為低延遲,這將導(dǎo)致每次只發(fā)送一條消息(增加了網(wǎng)絡(luò)交互)?;赑ull的模式,Consumer每次都會盡可能多的獲取消息(受限于可消費(fèi)的消息數(shù)和配置的每一批數(shù)據(jù)最大的消息數(shù)),所以可以優(yōu)化批處理而不增加不必要的延遲。

基于Pull模式的一個缺陷是如果Broker沒有數(shù)據(jù),Consumer可能需要busy-waiting的輪訓(xùn)方式來保證高效的數(shù)據(jù)獲?。ㄔ跀?shù)據(jù)到達(dá)后快速的響應(yīng))。為了避免這種情況,我們在Pull請求中可以通過參數(shù)配置“l(fā)ong poll”的等待時間,可以在服務(wù)端等待數(shù)據(jù)的到達(dá)(可選的等待數(shù)據(jù)量的大小以保證每次傳輸?shù)臄?shù)據(jù)量,減少網(wǎng)絡(luò)交互)。

你可以想象其他一些從端到端,采用Pull的可能的設(shè)計(jì)。Producer把數(shù)據(jù)寫到本地日志,Broker拉取這些Consumer需要的數(shù)據(jù)。一個相似的被稱為“store-and-forward”的Producer經(jīng)常被提及。這是有趣的,但是我們覺得不太適合我們可能會有成千上萬個Producer的目標(biāo)場景。我們維護(hù)持久化數(shù)據(jù)系統(tǒng)的經(jīng)驗(yàn)告訴我們,在系統(tǒng)中使多應(yīng)用涉及到上千塊磁盤將會使事情變得不可靠并且會使操作它們變成噩夢。最后再實(shí)踐中,我們發(fā)現(xiàn)可以大規(guī)模的運(yùn)行強(qiáng)大的SLAs通道,而不需要生產(chǎn)者持久化。

Consumer Position

記錄哪些消息被消費(fèi)過是消息系統(tǒng)的關(guān)鍵性能點(diǎn)。

大多數(shù)消息系統(tǒng)在Broker上保存哪些消息已經(jīng)被消費(fèi)的元數(shù)據(jù)。也就是說,Broker可以在消費(fèi)傳遞給Consumer后立即記錄或等待消費(fèi)者確認(rèn)之后記錄。這是一個直觀的選擇,并且對于單個服務(wù)器而言并沒有更好的方式可以存儲這個狀態(tài)。大多數(shù)消息系統(tǒng)中的存儲設(shè)備并不能很好的伸縮,所以這也是務(wù)實(shí)的選擇——當(dāng)Broker確認(rèn)消息被消費(fèi)后就立即刪除,以保證存儲較少的數(shù)據(jù)。

讓Broker和Consumer關(guān)于那些消息已經(jīng)被消費(fèi)了達(dá)成一致并不是一個簡單的問題。如果Broker在將消息寫到網(wǎng)絡(luò)之后就立即認(rèn)為消息已經(jīng)被消費(fèi),那么如果Consumer消費(fèi)失?。–onsumer在消費(fèi)消息之前Crash或者網(wǎng)絡(luò)問題等)消息將丟失。為了解決這個問題,一些消息系統(tǒng)增加了ACK機(jī)制,消息被標(biāo)記為只是發(fā)送出去而不是已經(jīng)被消費(fèi),Broker需要等待Consumer發(fā)送的ACK請求之后標(biāo)記具體哪些消息已經(jīng)被消費(fèi)了。這個策略修復(fù)了消息丟失的問題,但是引起了新的問題。第一,如果Consumer處理了消息,但是在發(fā)送ACK給Broker之前出現(xiàn)問題,那么消息會被重復(fù)消息。第二,Broker需要維護(hù)每一條消息的多個狀態(tài)(是否被發(fā)送、是否被消費(fèi))。棘手的問題是要處理被發(fā)送出去但是沒有被ACK的消息。

Kafka采用不同的方式處理。Topic被劃分為多個內(nèi)部有序的分區(qū),每個分區(qū)任何時刻只會被一個group內(nèi)的一個Consumer消費(fèi)。這意味著一個Partition的Position信息只是一個數(shù)字,標(biāo)識下一條要消費(fèi)的消息的偏移量。這使得哪些消息已經(jīng)被消費(fèi)的狀態(tài)變成了一個簡單的數(shù)據(jù)。這個位置可以定期做CheckPoint。這使得消息的ACK的代價(jià)非常小。

這個方案還有其他的好處。消費(fèi)者可以優(yōu)雅的指定一個舊的偏移量并重新消費(fèi)這些數(shù)據(jù)。這和通常的消息系統(tǒng)的觀念相違背,但對很多消費(fèi)者來說是一個很重要的特性。比如,如果Consumer程序存在BUG,在發(fā)現(xiàn)并修復(fù)后,可以通過重新消費(fèi)來保證數(shù)據(jù)都正確的處理。

Offline Data Load

可擴(kuò)展的持久化存儲的能力,是消費(fèi)者可以定期的將數(shù)據(jù)導(dǎo)入到像Hadoop這樣的離線系統(tǒng)或關(guān)系型數(shù)據(jù)倉庫中。

在Hadoop的場景中,我們通過把數(shù)據(jù)分發(fā)到獨(dú)立的任務(wù)中進(jìn)行并行處理,按照node/topic/partition組合,充分使用另行能力加載數(shù)據(jù)。Hadoop提供任務(wù)管理,失敗的任務(wù)可以重新啟動,而不需要擔(dān)心重復(fù)數(shù)據(jù)的危險(xiǎn)——任務(wù)會從原始位置重新啟動。

<h4>6. Message Delivery Semantics</h4>

現(xiàn)在我們對Producer和Consumer已經(jīng)有了一定的了解,接著我們來討論Kafka在Producer和Consumer上提供的語義。顯然的,在分發(fā)消息時是可以有多種語義的:

  • At most once:消息可能丟失,但不會重復(fù)投遞
  • At least once:消息不會丟失,但可能會重復(fù)投遞
  • Exactly once:消息不丟失、不重復(fù),會且只會被分發(fā)一次(真正想要的)

值得注意的是這分為兩個問題:發(fā)布消息的可用性和消費(fèi)消息的可用性。

許多系統(tǒng)都聲稱提供“exactly once”語義,仔細(xì)閱讀會發(fā)現(xiàn),這些聲明是誤導(dǎo)的(他們沒有考慮Producer和Consumer可能Crash的場景,或是數(shù)據(jù)寫入磁盤后丟失的情況)。

Kafka提供的語義是直接了當(dāng)?shù)摹0l(fā)送消息的時候我們有一個消息被Commit到Log的概念。一旦消息已經(jīng)被Commit,它將不會丟失,只要還有一個復(fù)制了消息所在Partition的Broker存活著?!按婊睢钡亩x以及我們覆蓋的失敗的情況將在下一節(jié)描述?,F(xiàn)在假設(shè)一個完美的Broker,并且不會丟失,來理解對Producer和Consumer提供的語義保證。如果Producer發(fā)送一條消息,并且發(fā)生了網(wǎng)絡(luò)錯誤,我們是不能確認(rèn)錯誤發(fā)生在消息Commit之前還是消息Commit之后的。類似于使用自增主鍵插入數(shù)據(jù)庫,是不能確認(rèn)寫入之后的主鍵值的。

Producer沒有使用的強(qiáng)制可能的語義。我們無法確認(rèn)網(wǎng)絡(luò)是否會發(fā)生異常,可以使Producer創(chuàng)建有序的主鍵使重試發(fā)送成為冪等的行為。這個特性對一個復(fù)制系統(tǒng)來說不是無價(jià)值的,因?yàn)榉?wù)器在發(fā)生故障的情況下依舊需要提供服務(wù)。使用這個功能,Producer可以重試,直到收到消息成功commit的響應(yīng),在這個點(diǎn)上保證消息發(fā)送的exactly once。我們希望把這個特性加到后續(xù)的Kafka版本中。

不是所有的場景都需要這樣的保證。對應(yīng)延遲敏感的場景,我們允許Producer指定其期望的可用性級別。如果Producer期望等待消息Commit,那么這可能消耗10ms。Producer也可以指定以異步的方式發(fā)送消息或只等Leader節(jié)點(diǎn)寫入消息(不能Follower)。

接著我們從消費(fèi)者的視角來描述語義。所有的副本都擁有偏移量相同的日志。Consumer控制它在日志中的偏移量。如果Consumer一直正常運(yùn)行,它可以只把偏移量存儲在內(nèi)存中,但是如果Consumer crash且我們期望另一個新的Consumer接管消費(fèi),那么需要選擇一個位置來開始消費(fèi)。假設(shè)Consumer讀取了一些消息——它有集中處理消息和位置的方式。

它可以讀取消息,然后保存位置信息,然后處理消息。在這個場景中,Consumer可能在保存位置信息后消費(fèi)消息失敗,那么下一次消費(fèi)可能從保存的位點(diǎn)開始,盡管之前部分消息被處理失敗。這是消費(fèi)處理過程中失敗的at-most-once(只被處理了一次,但是可能處理失?。?。

它可以讀取消息,之后處理消息,最后保存位置信息。這個場景中,Consumer可能在處理完消息,但是保存位點(diǎn)之前Crash,那么下一次會重新消費(fèi)這些消息,盡管已經(jīng)被消費(fèi)過。這是Consumer Crash引起的at-least-once(消息可能會被處理多次)。

在很多場景沖,消息可以有一個逐漸,這樣可以保證處理的冪等性(多次處理不會有影響)。

那么什么是exactly once語義?這里的限制實(shí)際上不是消息系統(tǒng)的特性,而是消息處理和位置信息的保存。經(jīng)典的解決方案是采用兩階段提交的方式來處理。但是這也可以用一個更簡單的方式來處理:通過將消息處理結(jié)果和位置信息保存在同一位置上。這是更好的,因?yàn)楹芏郈onsumer期望寫入的系統(tǒng)并不支持兩階段提交。例如, 我們的hadoop ETL工具從保存數(shù)據(jù)到dhfs上的同時也把位移位置也保存到hdfs中了, 這樣可以保證數(shù)據(jù)和位移位置同時被更新或者都沒更新.我們在很多系統(tǒng)上使用類似的模式, 用于解決那些需要這種強(qiáng)語義但是卻沒有主鍵用于區(qū)分重復(fù)的儲存系統(tǒng)中.

默認(rèn)Kafka提供at-least-once語義的消息分發(fā),允許用戶通過在處理消息之前保存位置信息的方式來提供at-most-once語義。exactly-once語義需要和輸出系統(tǒng)像結(jié)合,Kafka提供的offset可以使這個實(shí)現(xiàn)變的“直接了當(dāng)?shù)摹保ㄗ兊帽容^簡單)。

<h4>7. Replication</h4>

Kafka為Topic的每個Partition日志進(jìn)行備份,備份數(shù)量可以由用戶進(jìn)行配置。這保證了系統(tǒng)的自動容錯,如果有服務(wù)器宕機(jī),消息可以從剩余的服務(wù)器中讀取。

其他消息系統(tǒng)提供了備份相關(guān)的功能,但在我們看來,這是一個附加的功能,不能被大量使用,并且伴隨著大量的缺點(diǎn):Slave是不活躍的(這里應(yīng)該是指Slave只提供了備份,并不可以被消費(fèi)等等)、吞吐受到很大的影響、需要手動配置等等。在Kafka中,我們默認(rèn)就提供備份,實(shí)際上我們認(rèn)為沒有備份的Topic是一種特殊的備份,只是備份數(shù)為1。

備份的單位是Topic的分區(qū)。在沒有發(fā)生異常的情況下,Kafka中每個分區(qū)都會有一個Leader和0或多個Follower。備份包含Leader在內(nèi)(也就是說如果備份數(shù)為3,那么有一個Leader Partition和兩個Follower Partition)。所有的讀寫請求都落在Leader Partition上。通常情況下分區(qū)要比Broker多,Leader分區(qū)分布在Broker上。Follower上的日志和Leader上的日志相同,擁有相同的偏移量和消息順序(當(dāng)然,在特定時間內(nèi),Leader上日志會有一部分?jǐn)?shù)據(jù)還沒復(fù)制到Follower上)。

Follower作為普通的Consumer消費(fèi)Leader上的日志,并應(yīng)用到自己的日志中。Leader允許Follower自然的,成批的從服務(wù)端獲取日志并應(yīng)用到自己的日志中。

大部分分布式系統(tǒng)都需要自動處理故障,需要對節(jié)點(diǎn)“alive”進(jìn)行精確的定義。例如在Kafka中,節(jié)點(diǎn)存活需要滿足兩個條件:

  1. 節(jié)點(diǎn)需要保持它和ZooKeeper之間的Session(通過ZK的心跳機(jī)制)
  2. 如果是Follower,需要復(fù)制Leader上的寫事件,并且復(fù)制進(jìn)度沒有“落后太多”

我們稱滿足這兩個條件的節(jié)點(diǎn)為“同步的”來避免使用“alive”或“failed”這樣模糊的概念。Leader節(jié)點(diǎn)保存同步中的Follower節(jié)點(diǎn)。如果一個Follower宕機(jī)或復(fù)制落后太多,Leader將從同步的Follower List中將其移除。通過replica.lag.time.max.ms配置來定義“落后太多”。

在分布式系統(tǒng)的術(shù)語中,我們只嘗試處理“失敗/恢復(fù)”模型——節(jié)點(diǎn)突然停止工作之后恢復(fù)的情況。Kafka不處理“拜占庭”問題。

一條消息在被應(yīng)用到所有的備份上之后被認(rèn)為是“已經(jīng)提交的”。只有提交了的消息會被Consumer消費(fèi)。這意味著Consumer不需要擔(dān)心Leader節(jié)點(diǎn)宕機(jī)后消息會丟失。另一方面,Producer可以配置是否等待消息被提交,這取決于他們在延遲和可用性上的權(quán)衡。這個可以通過Producer的配置項(xiàng)中設(shè)置。

Kafka提供了一條消息被提交之后,只要還有一個備份可用,消息就不會丟失的保證。

Kafka保證在節(jié)點(diǎn)故障后依舊可用,但是無法再網(wǎng)絡(luò)分區(qū)的情況下保持可用。

Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)

Kafka分區(qū)機(jī)制的核心是日志復(fù)制。日志復(fù)制是分布式系統(tǒng)中最基礎(chǔ)的東西,有很多方式可以實(shí)現(xiàn)。日志復(fù)制可以作為基于狀態(tài)機(jī)的分布式系統(tǒng)的基礎(chǔ)設(shè)置。

日志復(fù)制模型用于處理連續(xù)、有序的輸入(例如給log entry添加0、1、2這樣的編號)。有很多方式實(shí)現(xiàn)日志復(fù)制,最簡單的方式是Leader選擇和提供這個順序之。只要Leader節(jié)點(diǎn)存活,F(xiàn)ollower只需要按照Leader選擇的值和順序來復(fù)制即可。

當(dāng)然,如果Leader不會宕機(jī),那我們也不需要Follower了!在Leader宕機(jī)之后,我們需要在Follower中選擇一個節(jié)點(diǎn)成為新的Leader。Follower可能會宕機(jī)或者日志落后較多,所以我們必須確保選擇一個“及時同步”(復(fù)制進(jìn)度和Leader最近的節(jié)點(diǎn))成為新的Leader。復(fù)制算法必須提供這樣的保證:如果Client收到一條消息已經(jīng)被Commit了,如果Leader宕機(jī),新Leader必須包含這條已經(jīng)被Commit的消息。這是一個權(quán)衡:Leader在確認(rèn)消息Commit之前需要等待更多的Follower來確認(rèn)復(fù)制了消息來保證在Leader宕機(jī)后有更多可以成為Leader的Follower節(jié)點(diǎn)。

如果你選擇了所需要的ACK的數(shù)量以及選擇Leader時需要比較的日志數(shù)以確保能重合,這個叫做Quorum。

一個通用的來權(quán)衡的方式是提交日志和選擇Leader時都采用大多數(shù)投票的原則。這不是Kafka使用的方式,但是無所謂,讓我們?nèi)ダ斫膺@種方式來了解實(shí)現(xiàn)原理。假設(shè)一共有2f+1個備份,那么f+1的副本必須在Leader提交commit之前接收到消息,這樣就可以從f+1個節(jié)點(diǎn)中選擇出新節(jié)點(diǎn)作為Leader。因?yàn)槿魏蝔+1個節(jié)點(diǎn),必然有一個節(jié)點(diǎn)包含最全的日志。還有很多關(guān)于這個算法的細(xì)節(jié)需要處理(如何定義日志更全、在Leader節(jié)點(diǎn)宕機(jī)時保持日志一致性等)在這里先忽略。

大多數(shù)選票的方法有非常好的特性:延遲取決于同步最快的Server節(jié)點(diǎn)。這說明,如果備份數(shù)為3,那么延遲取決于兩個備份節(jié)點(diǎn)中較快的節(jié)點(diǎn)。

有很多類似的算法變體,例如ZooKeeper的Zab,Raft,Viewstamped Replication等。和Kafka最相似的學(xué)術(shù)刊物是微軟的PacificA。

大多數(shù)選票方式的取消是它不能容忍很多的故障,導(dǎo)致你沒有可以被選為新Leader的節(jié)點(diǎn)。為了容忍一個節(jié)點(diǎn)故障,需要3分?jǐn)?shù)據(jù)備份,容忍兩個節(jié)點(diǎn)故障則需要5個節(jié)點(diǎn)。在我們的經(jīng)驗(yàn)中,只有足夠的冗余才能容忍單一的故障在實(shí)際系統(tǒng)中是不夠的,每次寫5次副本,使用5倍的存儲空間,和1/5的帶寬,在大體量的數(shù)據(jù)存儲上不是很可行。這就是為什么quorum算法多多應(yīng)用在像ZK這樣存儲配置的集群中,而不是數(shù)據(jù)存儲系統(tǒng)中。例如HDFS的namenode的高可用建立在大多數(shù)選票的機(jī)制上,但是數(shù)據(jù)存儲缺不是。

Kafka使用一個明顯不同的方式來選擇quorum集合。代替大多數(shù)選票,Kafka動態(tài)的維護(hù)一個“同步的備份(in-sync replicas ISR)”的集合。只有這個集合中的成員能被選舉為Leader。一個寫入請求需要同步到所有的同步中的備份才能認(rèn)為是提交的。ISR集合在變更時會被持久化到ZK。因此,任何ISR中的備份都可以被選舉為新的Leader。這對于Kafka這種擁有多分區(qū)并且需要保證這節(jié)點(diǎn)負(fù)載均衡的模型來說非常重要。使用ISR模型和f+1個副本,Kafka可以容忍f個備份不可用的情況。

對于大多數(shù)的場景,我們認(rèn)為這樣的妥協(xié)是合理的。在實(shí)踐中,為了容忍f個節(jié)點(diǎn)故障,大多數(shù)選票原則和ISR方式都需要等待相同的備份在提交消息前進(jìn)行確認(rèn)(如需要容忍一個節(jié)點(diǎn)故障,大多數(shù)選票的選擇需要3個節(jié)點(diǎn),并且提交消息需要至少一個備份的確認(rèn);ISR只需要兩個節(jié)點(diǎn),需要確認(rèn)的副本數(shù)一樣是一個)。相對于大多數(shù)選票的原則,ISR方式不需要等待最慢的服務(wù)器確認(rèn)消息是一個優(yōu)勢。盡管如此,我們進(jìn)行改善,讓客戶端決定是否等待消息提交,使用較小的副本數(shù),這樣帶來的吞吐和更小的磁盤空間要求是有價(jià)值的。

另一個重要的設(shè)計(jì)是Kafka不需要故障的節(jié)點(diǎn)恢復(fù)所有的數(shù)據(jù)。這是不常見的,復(fù)制算法依賴于存儲介質(zhì)在任何故障的情況下都不丟失數(shù)據(jù)并且不違反一致性原則。這個假設(shè)有兩個主要的問題。第一,磁盤故障是持久化數(shù)據(jù)系統(tǒng)中最常見的問題,并且它通常導(dǎo)致數(shù)據(jù)不完整。第二,即使這不是一個問題,我們也不希望在每一次寫入之后都使用fsync來保證一致性,這會使性能下降兩三個數(shù)量級。我們的協(xié)議中允許一個副本重新加入到ISR集合中,在重新加入之前,它需要從新同步在故障時丟失的數(shù)據(jù)。

<h4>Unclean leader election: What if they all die?</h4>

Kafka保證的數(shù)據(jù)不丟失,在至少有一個備份保持同步的情況下。如果一個分區(qū)所有的備份的節(jié)點(diǎn)都故障,那么就不能提供這個保障了。

但是實(shí)踐系統(tǒng)中需要一些合理的事情,在所有備份故障時。如果不巧遇上這個問題,去考慮哪些情況會發(fā)生是非常重要的。有兩種方式去做:

  1. 等待一個ISR中的副本恢復(fù)并將其選舉為新的Leader(期望它擁有所有的數(shù)據(jù))。
  2. 選擇第一個副本(無需在ISR中)作為Leader。

這是在可用性和一致性之間的權(quán)衡。如果我們等待ISR中的備份恢復(fù),那么會在這個期間一直不可用。如果這樣的副本被損壞,那么我們將永久性的失效。另一方便,如果使用不在ISR中的備份成為Leader,盡管它可能不包含所有的日志。默認(rèn)情況下,Kafka使用第二種策略,當(dāng)所有ISR中的備份不可用時,傾向于選擇可能不一致的備份。這個方式可以通過unclean.leader.election.enable配置禁用,在哪些停機(jī)時間優(yōu)于不一致的場景。

這種困境不是kafka特有的, 這存在于任何基于quorum方式的結(jié)構(gòu)中. 例如, 多數(shù)投票算法, 如果大多數(shù)的服務(wù)器都永久性失效了, 你必須選擇丟失全部的數(shù)據(jù)或者接受某一臺可能數(shù)據(jù)不一致的服務(wù)器上的數(shù)據(jù).

Availability and Durability Guarantees

在向Kafka寫入數(shù)據(jù)時,Producer可以選擇是否等待0,1或(-1)個備份響應(yīng)。注意,這里說的“被所有備份響應(yīng)”不是說被所有分配的備份響應(yīng),默認(rèn)情況下只的時所有ISR集合中的備份響應(yīng)。例如,如果一個Topic配置成只需要兩個備份,并且一個備份故障了,那么寫入一個備份即認(rèn)為收到了所有響應(yīng)。但是,如果這個備份也故障了,那么數(shù)據(jù)會丟失。這樣保證了分區(qū)的最大可用,但是可能不是那些相對于可用性更需要可靠性的用戶的需求。因此,我們提供兩種Topic級別的配置,相對于可用性,優(yōu)先保證可靠性:

  1. 禁用unclean leader election;如果所有備份不可用,那么分區(qū)保持不可用,直到最近的Leader重新恢復(fù)可用。這可能導(dǎo)致不可用,但是不會丟失數(shù)據(jù)。

  2. 配置一個最小的ISR大?。环謪^(qū)只會在滿足最小ISR的情況下接受請求,這樣可以避免數(shù)據(jù)只寫入一個備份,而這個備份后續(xù)故障導(dǎo)致數(shù)據(jù)丟失。這個配置只在Producer使用acks=all的配置時有效。這個配置在一致性和可用性上做了權(quán)衡。更大的ISR提供了更好的一致性,但是降低了可用性,如果同步備份數(shù)小于最小ISR配置時將不可用。

Replica Management

以上的討論都是基于一個日志,即一個Topic的分區(qū)考慮的。但是Kafka集群擁有成百上千這樣的分區(qū)。我們嘗試使用輪訓(xùn)的方式來平衡分區(qū),避免高數(shù)量的Topic的分區(qū)集中在一部分少量的節(jié)點(diǎn)上。同樣我們要平衡所有Leader分區(qū),這樣每個節(jié)點(diǎn)上承載的主分區(qū)都有一定的比例。

優(yōu)化Leader的選舉過程也是非常重要的,因?yàn)檫@是系統(tǒng)不可用的窗口期。一個直觀的實(shí)現(xiàn)是,如果一個節(jié)點(diǎn)故障了,為這個節(jié)點(diǎn)上所有的分區(qū)都獨(dú)立的執(zhí)行一次選舉。代替這種方式,我們選擇一個Broker作為Controller,Controller負(fù)責(zé)一個故障節(jié)點(diǎn)影響的所有分區(qū)的Leader變更。這樣的好處是我們可以批量處理,減少獨(dú)立選舉時大量的通知,這使得大量分區(qū)需要選舉時變得更快,代價(jià)更小。如果Controller故障了,剩余的Broker中會有一個節(jié)點(diǎn)成為新的Controller。

<h4>8 Log Compaction</h4>

日志壓縮確保Kafka會為一個Topic分區(qū)數(shù)據(jù)日志中保留至少message key的最后一個值。它解決了應(yīng)用crash或系統(tǒng)故障或應(yīng)用在操作期間重啟來重新加載緩存的場景。讓我們深入到細(xì)節(jié)中解釋日志壓縮是如何工作的。

到屋面位置,我們只說明了在一斷時間或達(dá)到特定大小的時候丟棄就日志的簡單方法。這適用于想日志這樣每一條數(shù)據(jù)都是獨(dú)立數(shù)據(jù)的情況。但是重要類別的數(shù)據(jù)是根據(jù)key處理的數(shù)據(jù)(例如DB中表的變更數(shù)據(jù))。

讓我們來討論這樣一個具體的流的例子。一個Topic包含了用戶email address信息;每一次用戶變更郵箱地址,我們都像這個topic發(fā)送一條消息,使用用戶ID作為primay key。現(xiàn)在我們已經(jīng)為用戶ID為123的用戶發(fā)送了一些消息,每條消息包含了email address的變更:

123 => bill@microsoft.com

123 => bill@gatesfoundation.org

123 => bill@gmail.com

日志壓縮為我們提供了更精細(xì)的保留機(jī)制,至少保存每個key最后一個變更(如123 => bill@gmail.com)。這樣做我們確保了這個日志包含了所有key最后一個值的快照。這樣Consumer可以重建狀態(tài)而不需要保留完成的變更日志。

讓我們列一些日志壓縮有用的場景,然后看他是如果被使用的。

  1. DB變更訂閱。這是很常見的,一個數(shù)據(jù)在多個數(shù)據(jù)系統(tǒng)中,而且其中一個系統(tǒng)是數(shù)據(jù)庫類型的(如RDBMS或KV系統(tǒng))。例如可能有一個數(shù)據(jù)庫,一個戶緩存系統(tǒng),一個搜索集群,一個Hadoop集群。DB的任何一個變更需要反映到緩存、搜索集群,最終保存到Hadoop中。在這個場景中,你只需要實(shí)時系統(tǒng)最新的更新日志。但是如果需要重新加載緩存或恢復(fù)宕機(jī)的檢索節(jié)點(diǎn),就需要完整的數(shù)據(jù)。

  2. 事件源。這是一種應(yīng)用設(shè)計(jì)風(fēng)格,它將查詢處理和應(yīng)用程序設(shè)計(jì)結(jié)合到一起,并使用日志作為程序的主要存儲。

  3. 高可用日志。一個本地集成程序可以通過變更日志來做到容錯,這樣另一個程序能夠在當(dāng)前程序故障時繼續(xù)處理。例如, 像流數(shù)據(jù)查詢例子, 如計(jì)數(shù), 匯總或其他的分組操作. 實(shí)時系統(tǒng)框架如Samza, 就是為了達(dá)到這個目的使用這個特性的。

在這些場景中,主要處理實(shí)時的變更,但有時需要重新加載或重新處理時,需要加載所有數(shù)據(jù)。日志壓縮允許使用相同的Topic來支持這些場景,這種日志使用風(fēng)格在后續(xù)的內(nèi)容中會更詳細(xì)的描述。

想法很簡單,我們有有無限的日志,以上每種情況記錄變更日志,我們從一開始就捕獲每一次變更。使用這個完整的日志,我們可以通過回放日志來恢復(fù)到任何一個時間點(diǎn)的狀態(tài)。這種假設(shè)的情況下,完整的日志是不實(shí)際的,對于那些每一行記錄會變更多次的系統(tǒng),即使數(shù)據(jù)集很小,日志也會無限的增長下去。丟棄舊日志的簡單操作可以限制空間的增長,但是無法重建狀態(tài)——因?yàn)榕f的日志被丟棄,可能一部分記錄的狀態(tài)會無法重建(這寫記錄所有的狀態(tài)變更都在就日志中)。

日志壓縮機(jī)制是更細(xì)粒度的,每個記錄都保留的機(jī)制,而不是基于時間的粗粒度。這個想法是選擇性的刪除哪些有更新的變更的記錄的日志。這樣最終日志至少包含每個key的記錄的最后一個狀態(tài)。

這個策略可以為每個Topic設(shè)置,這樣一個集群中,可以一部分Topic通過時間和大小保留日志,另外一些可以通過壓縮保留。

這個功能的靈感來自于LinkedIn的最古老且最成功的基礎(chǔ)設(shè)置——一個稱為Databus的數(shù)據(jù)庫變更日志緩存系統(tǒng)。不想大多數(shù)的日志存儲系統(tǒng),Kafka為了訂閱而量身打造,用于線性的快速讀寫。和Databus不同,Kafka作為真實(shí)的存儲,壓縮日志是非常有用的,在上有數(shù)據(jù)源不能重放的情況下。

Log Compaction Basics

這里是一個展示Kafka日志的邏輯結(jié)構(gòu)的圖(每條消息包含了一個offset):

log_cleaner_anatomy
log_cleaner_anatomy

Log head中包含傳統(tǒng)的Kafka日志。它包含了連續(xù)的連續(xù)的offset和所有的消息。日志壓縮增加了處理tail Log的選項(xiàng)。上圖展示了日志壓縮的的Log tail的情況。tail中的消息保存了初次寫入時的offset。即使該offset的消息被壓縮,所有offset仍然在日志中是有效的。在這個場景中,無法區(qū)分和下一個出現(xiàn)的更高offset的位置。如上面的例子中,36、37、38是屬于相同位置的,從他們開始讀取日志都將從38開始。

壓縮允許刪除。一條消息伴隨著空的值被認(rèn)為從日志中刪除。這個刪除標(biāo)記將會引起所有之前擁有相同key的消息被移除(包括擁有key相同的新消息),但是刪除標(biāo)記比較特殊,它將在一定周期后被從日志中刪除來示范空間。這個時間點(diǎn)被稱為“delete retention point”。

壓縮操作通過在后臺周期性的拷貝日志段來完成。清除操作不會阻塞讀取,并且可以被配置不超過一定IO吞吐來避免影響Producer和Consumer。實(shí)際的日志段壓縮過程有點(diǎn)像如下:

log_compaction
log_compaction

What guarantees does log compaction provide?

日志壓縮提供了如下的保證:

  1. 所有跟上消費(fèi)的Consumer能消費(fèi)到所有寫入的消息;這些消息有連續(xù)的序列號。Topic的min.compaction.lag.ms可以用于保證消息寫入多久后才會被壓縮。這限制了一條消息在Log Head中的最短存在時間。

  2. 消息的順序會被保留。壓縮不會重排序消息,只是移除其中一部分。

  3. 消息的Offset不會變更。這是消息在日志中的永久標(biāo)志。

  4. 任何從頭開始處理日志的Consumer至少會拿到每個key的最終狀態(tài)。另外,只要Consumer在小于Topic的delete.retention.ms設(shè)置(默認(rèn)24小時)的時間段內(nèi)到達(dá)Log head,將會看到所有刪除記錄的所有刪除標(biāo)記。換句話說,因?yàn)橐瞥齽h除標(biāo)記和讀取是同事發(fā)生的,Consumer可能會因?yàn)槁浜蟪^delete.retention.ms而導(dǎo)致錯過刪除標(biāo)記。

Log Compaction Details

日志壓縮由Log Cleaner執(zhí)行,后臺線程池重新拷貝日志段,移除那些key存在于Log Head中的記錄。每個壓縮線程如下工作:

  1. 選擇Log Head相對于Log Head在日志中占更高比例的日志
  2. 創(chuàng)建Log Head中每個Key最后一個offset的摘要
  3. 從頭到尾的拷貝日志,并刪除之后日志終于到相同key的記錄。新的、干凈的日志將會立即被交到到日志中,所以只需要一個額外的日志段空間
  4. Log Head的摘要實(shí)際上是一個空間緊湊的哈希表。每個條目使用24個字節(jié)。所以如果有8G的整理緩沖區(qū), 則能迭代處理大約366G的日志頭部(假設(shè)消息大小為1k)。

Configuring The Log Cleaner

Log Cleaner默認(rèn)啟用。這會啟動清理的線程池。如果要開始特定Topic的清理功能,可以開啟特定的屬性:

log.cleanup.policy=compact

這個可以通過創(chuàng)建Topic時配置或者之后使用Topic命令實(shí)現(xiàn)。

Log Cleaner可以配置保留最小的不壓縮的日志頭??梢酝ㄟ^配置壓縮的延遲時間:

log.cleaner.min.compaction.lag.ms

這可以用于保證消息比在被壓縮的消息大一段時間。如果沒有設(shè)置,除了最后一個日志外,所有的日志都會被壓縮。當(dāng)前寫入的自如端不會被壓縮,即使所有的消息都落后于比配置的最小壓縮時間。

更多的配置在這里

<h4>9 Quotas</h4>

從0.9版本開始,Kafka可以對生產(chǎn)和消費(fèi)請求進(jìn)行限額配置?;谧止?jié)速率來限制,每個group中所有的客戶端共享一個限額。

Why are quotas necessary?

Producer和Consumer可能生產(chǎn)或消費(fèi)大量的數(shù)據(jù)而耗盡Broker的資源,導(dǎo)致網(wǎng)絡(luò)飽和。進(jìn)行限額可以避免這些問題,特別是在多租戶的集群中,一小部分低質(zhì)量的客戶端會降低整個集群的體驗(yàn)。實(shí)際上,當(dāng)運(yùn)行Kafka作為服務(wù)時,這還可以對API的使用進(jìn)行限制。

Client groups

Kafka客戶端的身份代表了用于鑒權(quán)。 在無鑒權(quán)機(jī)制的集群中, 用戶身份是由服務(wù)器使用可配置的PrincipalBuilder進(jìn)行選擇的, Client-id作為客戶端邏輯分組, 是由客戶端應(yīng)用選擇的一個有意義的名稱. 標(biāo)量(user, client-id)定義共享這個用戶身份和客戶端ID的邏輯客戶端分組.

配額可以用于(user, client-id)組合, 或user, client-id分組。

對一個給定的連接, 最符合這個連接的配額被使用到, 一個限額組的所有連接共享這個限額配置, 例如: 如果(user=”test-user”, client-id=”test-client”) 10MB/s的配額, 這個配置會被所有的具有”test-user”用戶和客戶端ID是 “test-client”的所有生產(chǎn)者所共享.

Quota Configuration

配額可以按照(user, client-id)或者, user或client-id進(jìn)行分組, 如果需要更高或更低的配額, 可以覆蓋默配額, 這個機(jī)制類似于對日志主題配置的覆蓋, user 或者 (user, client-id)配額可以覆蓋寫入到zookeeper下的 /config/users ,client-id配置, 可以寫入到 /config/clients。這些覆蓋寫入會被服務(wù)器很快的讀取到, 這讓我們修改配置不需要重新啟動服務(wù)器. 每個分組的默認(rèn)配置也可以同樣的方式動態(tài)修改。

限額的配置順序如下:

  1. /config/users/<user>/clients/<client-id>
  2. /config/users/<user>/clients/<default>
  3. /config/users/<user>
  4. /config/users/<default>/clients/<client-id>
  5. /config/users/<default>/clients/<default>
  6. /config/users/<default>
  7. /config/clients/<client-id>
  8. /config/clients/<default>

Broker的quota.producer.default,quota.consumer.default也可以用來配置默認(rèn)的client-id分組的默認(rèn)值。這可屬性已經(jīng)不鼓勵使用,后續(xù)將會刪除。默認(rèn)client-id限額配置可以和其它默認(rèn)配置一樣, 在Zookeeper直接設(shè)置。

Enforcement

默認(rèn)情況下,每個唯一的客戶端group會收到一個集群配置的固定的限額。這個限額是基于每個Broker的。每個客戶端能發(fā)布或獲取在每臺服務(wù)器都的最大速率, 我們按服務(wù)器定義配置, 而不是按整個集群定義,是因?yàn)槿绻羌悍秶男枰~外的機(jī)制來共享配額的使用情況, 這會導(dǎo)致配額機(jī)制的實(shí)現(xiàn)比較難。

Broker檢測到限額違規(guī)時時如何處理的?在我們的解決方案中,Broker不會返回錯誤給客戶端,而是降低客戶端的速率。Broker計(jì)算使客戶端回到合理限額的需要的響應(yīng)延遲。這種方法的處理對客戶端是透明,使他們不必執(zhí)行任何棘手的,特殊的操作。實(shí)際上,錯誤的客戶端還可能加劇正在解決的限額問題。

客戶端字節(jié)率在多個小窗口(例如每個1秒的30個窗口)上進(jìn)行測量,以便快速檢測和糾正配額違規(guī)。 通常,具有大的測量窗口(例如,每個30秒的10個窗口)導(dǎo)致大量的流量脈沖,隨后是長時間的延遲,這在用戶體驗(yàn)方面不是很好。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容