前言
在上文中我們介紹了 Kafka 的網(wǎng)絡(luò)通信,本文打算詳細(xì)分析 Kafka 的核心 — 隊(duì)列的設(shè)計(jì)和實(shí)現(xiàn),來(lái)對(duì) Kafka 進(jìn)行更深一步的了解。
如何設(shè)計(jì)隊(duì)列
隊(duì)列是一種先進(jìn)先出(FIFO)的數(shù)據(jù)結(jié)構(gòu),它是 Kafka 中最重要的部分,負(fù)責(zé)收集生產(chǎn)者生產(chǎn)的消息,并將這些消息傳遞給消費(fèi)者。要實(shí)現(xiàn)一個(gè)隊(duì)列有多種方式,Kafka 作為一個(gè)消息隊(duì)列中間件,在設(shè)計(jì)隊(duì)列時(shí)主要要考慮兩個(gè)問(wèn)題:
1. 隊(duì)列數(shù)據(jù)是寫(xiě)到內(nèi)存還是寫(xiě)到磁盤(pán)
乍一看到這個(gè)問(wèn)題,我們會(huì)想,內(nèi)存的讀取速度遠(yuǎn)快于磁盤(pán),如果追求性能,內(nèi)存也充足的話(huà),當(dāng)然是將生產(chǎn)者產(chǎn)生的消息數(shù)據(jù)寫(xiě)到內(nèi)存(比如用一個(gè)數(shù)組或者鏈表來(lái)存儲(chǔ)隊(duì)列數(shù)據(jù)),供消費(fèi)者消費(fèi)。真的是這樣嗎?
下面我們依次分析下寫(xiě)內(nèi)存和寫(xiě)磁盤(pán)文件的優(yōu)缺點(diǎn),首先,內(nèi)存的優(yōu)點(diǎn)是讀寫(xiě)速度非???,但是,如果我們的目標(biāo)是設(shè)計(jì)「大數(shù)據(jù)量」下的「高吞吐量」的消息隊(duì)列,會(huì)有以下幾個(gè)問(wèn)題:
- GC 的時(shí)間消耗:如果隊(duì)列數(shù)據(jù)都在內(nèi)存,數(shù)據(jù)會(huì)非常大(幾十 G), 因?yàn)橄㈥?duì)列需要不斷地接受新產(chǎn)生的消息和刪除已經(jīng)被消費(fèi)的消息(不然內(nèi)存很快會(huì)被撐爆),Java GC 的消耗不容忽視。
- Java 內(nèi)存存儲(chǔ)效率:如我們所知,一個(gè) Java 對(duì)象的內(nèi)存開(kāi)銷(xiāo)會(huì)大于其對(duì)象數(shù)據(jù)本身,通常對(duì)象的內(nèi)存開(kāi)銷(xiāo)是數(shù)據(jù)本身的一倍(甚至更多)。
- 設(shè)計(jì)復(fù)雜度增加:為保證數(shù)據(jù)不丟失,需要一個(gè)預(yù)寫(xiě)日志(WAL),如果程序異常掛掉,重啟時(shí)可以從 WAL 恢復(fù)數(shù)據(jù)。
- 數(shù)據(jù)初始化的時(shí)間消耗:程序重啟時(shí)需要從文件將數(shù)據(jù) load 到內(nèi)存,如果數(shù)據(jù)對(duì)象大小是 10G 級(jí)別的,也會(huì)消耗大量的時(shí)間(10 分鐘左右)。
接下來(lái)我們來(lái)分析一下磁盤(pán),寫(xiě)磁盤(pán)文件方式存儲(chǔ)隊(duì)列數(shù)據(jù)的優(yōu)點(diǎn)就是能規(guī)避上述內(nèi)存的缺點(diǎn),但其有很?chē)?yán)重的缺點(diǎn),就是讀寫(xiě)速度慢,如果純依靠磁盤(pán),那消息隊(duì)列肯定做不到「高吞吐量」這個(gè)目標(biāo)。
分析了內(nèi)存跟磁盤(pán)的優(yōu)缺點(diǎn),好像我們還是只能選寫(xiě)內(nèi)存,但我們忽視了磁盤(pán)的兩個(gè)情況:一是磁盤(pán)慢是慢在隨機(jī)讀寫(xiě),如果是順序讀寫(xiě),他的速度能達(dá)到 600MB/sec(RAID-5 磁盤(pán)陣列),并不慢,如果我們盡可能地將數(shù)據(jù)的讀寫(xiě)設(shè)計(jì)成順序的,可以大大提升性能。二是現(xiàn)代的操作系統(tǒng)會(huì)(盡可能地)將磁盤(pán)里的文件進(jìn)行緩存。
有了操作系統(tǒng)級(jí)別的文件緩存,那用磁盤(pán)存儲(chǔ)隊(duì)列數(shù)據(jù)的方式就變得有優(yōu)勢(shì)了。首先,磁盤(pán)文件的數(shù)據(jù)會(huì)有文件緩存,所以不必?fù)?dān)心隨機(jī)讀寫(xiě)的性能;其次,同樣是使用內(nèi)存,磁盤(pán)文件使用的是操作系統(tǒng)級(jí)別的內(nèi)存,相比于在 Java 內(nèi)存堆中存儲(chǔ)隊(duì)列,它沒(méi)有 GC 問(wèn)題,也沒(méi)有 Java 對(duì)象的額外內(nèi)存開(kāi)銷(xiāo),更可以規(guī)避應(yīng)用重啟后的內(nèi)存 load 數(shù)據(jù)耗時(shí)的問(wèn)題,而且,文件緩存是操作系統(tǒng)提供的,因?yàn)槲覀冎灰?jiǎn)單的寫(xiě)磁盤(pán)文件,系統(tǒng)復(fù)雜性大大降低。
因此,Kafka 直接使用磁盤(pán)來(lái)存儲(chǔ)消息隊(duì)列的數(shù)據(jù)。
2. 以何種數(shù)據(jù)結(jié)構(gòu)存儲(chǔ)隊(duì)列數(shù)據(jù)
剛才我們已經(jīng)決定用磁盤(pán)文件來(lái)存儲(chǔ)隊(duì)列數(shù)據(jù),那么要如何選擇數(shù)據(jù)結(jié)構(gòu)呢?一般情況下,如果需要查找數(shù)據(jù)并隨機(jī)訪(fǎng)問(wèn),我們會(huì)用 B+ 樹(shù)來(lái)存儲(chǔ)數(shù)據(jù),但其時(shí)間復(fù)雜度是 O(log N),由于我們?cè)O(shè)計(jì)的是消息隊(duì)列,我們可以完全順序的寫(xiě)收到的生產(chǎn)者消息,消費(fèi)者消費(fèi)時(shí),只要記錄下消費(fèi)者當(dāng)前消費(fèi)的位置,往后消費(fèi)就可以了,這樣可以對(duì)文件盡可能的進(jìn)行順序讀寫(xiě),同時(shí),時(shí)間復(fù)雜度是O(1)。其實(shí),這跟我們寫(xiě)日志的方式很像,每條日志順序 append 到日志文件。
隊(duì)列實(shí)現(xiàn)
之前我們已經(jīng)確定采用直接順序?qū)懘疟P(pán)文件的方式來(lái)存儲(chǔ)隊(duì)列數(shù)據(jù),下面我們來(lái)剖析下具體的實(shí)現(xiàn)細(xì)節(jié)。
在 Kafka 中,用一個(gè)文件夾存儲(chǔ)一條消息隊(duì)列,成為一個(gè) Log,每條消息隊(duì)列由多個(gè)文件組成,每個(gè)文件稱(chēng)為一個(gè) LogSegment,每當(dāng)一個(gè) LogSegment 的大小到達(dá)閾值,系統(tǒng)就會(huì)重新生成一個(gè) LogSegment;當(dāng)舊的 LogSegment 過(guò)期需要清理時(shí)(雖然磁盤(pán)空間相對(duì)于內(nèi)存會(huì)寬裕很多,我們可以保存更長(zhǎng)時(shí)間的消息數(shù)據(jù),比如一周,以供消費(fèi)者更靈活的使用,但還是需要定期清理太老的數(shù)據(jù)),系統(tǒng)會(huì)根據(jù)清理策略刪除這些文件。
現(xiàn)在我們知道一個(gè)隊(duì)列(Log)是由多個(gè)隊(duì)列段文件(LogSegment)組成的,那么 Kafka 是如何將這些文件邏輯上連接從而組成一條有序隊(duì)列的呢?在生成每個(gè)隊(duì)列段文件時(shí),Kafka 用該段的初始位移來(lái)對(duì)其命名,如在新建一個(gè)隊(duì)列時(shí),會(huì)初始化第一個(gè)隊(duì)列段文件,那么其文件名就是0,假設(shè)每個(gè)段的大小是固定值 L,那么第二個(gè)段文件名就是 L,第 N 個(gè)就是 (N - 1)* L。這樣,我們就可以根據(jù)文件名對(duì)段文件進(jìn)行排序,排序后的順序就是整個(gè)隊(duì)列的邏輯順序。
隊(duì)列讀寫(xiě)
了解了隊(duì)列的基本實(shí)現(xiàn),下面我們就來(lái)分析下隊(duì)列的核心操作—讀和寫(xiě)。
寫(xiě)
寫(xiě)操作發(fā)生在生產(chǎn)者向隊(duì)列生產(chǎn)消息時(shí),在上篇文章講網(wǎng)絡(luò)通信時(shí)我們已經(jīng)說(shuō)到,所有的客戶(hù)端請(qǐng)求會(huì)根據(jù)協(xié)議轉(zhuǎn)到一個(gè) Handler 來(lái)具體處理,負(fù)責(zé)寫(xiě)操作的 Handler 叫 ProducerHandler,整個(gè)寫(xiě)請(qǐng)求的流程如下:

- 根據(jù)消息的 topic、partition 信息(跟其分布式有關(guān),以后文章會(huì)專(zhuān)門(mén)說(shuō)到),定位到具體的隊(duì)列,即上面我們說(shuō)到的 Log。
- 檢驗(yàn)生產(chǎn)者消息是否合法
- 我們知道一個(gè) Log 由多個(gè) LogSegment 組成,在任意時(shí)刻,只有最新一個(gè) LogSegment 是可寫(xiě)的,其他的都是只讀的,所以,接下來(lái)我們通過(guò)排序后的 LogSegment 獲取到最新一個(gè)可寫(xiě)的 LogSegment。
- 用 NIO,將消息(一個(gè) ByteBuffer)寫(xiě)到 LogSegment 所屬的文件
- 上一步中說(shuō)的寫(xiě)文件,考慮到效率問(wèn)題,并沒(méi)有直接將消息 flush 到磁盤(pán),所以,這里其實(shí)存在一個(gè)丟消息的風(fēng)險(xiǎn)。在本步驟中,主要檢查待 flush 的消息大小是否到達(dá)指定閾值,如果到了,就 flush 到磁盤(pán)
- 檢查最新的 LogSegment 大小是否到達(dá)閾值,如果是,則保存關(guān)閉當(dāng)前文件,新建一個(gè) LogSegment 文件。
之前我們說(shuō)過(guò),如果是順序?qū)?,由于省掉了磁頭尋址的時(shí)間,磁盤(pán)的性能還是很高的,我們看到 Kakfa 隊(duì)列是以順序方式寫(xiě)的,所以性能很高。但是,如果一臺(tái) Kafka 服務(wù)器有很多個(gè)隊(duì)列,而硬盤(pán)的磁頭是有限的,所以還是得在不同的隊(duì)列直接來(lái)回切換尋址,性能會(huì)有所下降。
讀
隊(duì)列的讀操作發(fā)送在消費(fèi)者消費(fèi)隊(duì)列數(shù)據(jù)時(shí),由于隊(duì)列是線(xiàn)性的,只需要記錄消費(fèi)者上次消費(fèi)到了哪里(offset),接下去消費(fèi)就好了。那么首先會(huì)有一個(gè)問(wèn)題,由誰(shuí)來(lái)記消費(fèi)者到底消費(fèi)到哪里了?
一般情況下,我們會(huì)想到讓服務(wù)端來(lái)記錄各個(gè)消費(fèi)者當(dāng)前的消費(fèi)位置,當(dāng)消費(fèi)者來(lái)拉數(shù)據(jù),根據(jù)記錄的消費(fèi)位置和隊(duì)列的當(dāng)前位置,要么返回新的待消費(fèi)數(shù)據(jù),要么返回空。讓服務(wù)端記錄消費(fèi)位置,當(dāng)遇到網(wǎng)絡(luò)異常時(shí)會(huì)有一些問(wèn)題,比如服務(wù)端將消息發(fā)給消費(fèi)者后,如果網(wǎng)絡(luò)異常消費(fèi)者沒(méi)有收到消息,那么這條消息就被「跳過(guò)」了,當(dāng)然我們可以借鑒二階段提交的思想,服務(wù)端將消息發(fā)送給消費(fèi)者后,標(biāo)記狀態(tài)為「已發(fā)送」,等消費(fèi)者消費(fèi)成功后,返回一個(gè) ack 給服務(wù)端,服務(wù)端再將其標(biāo)記為「成功消費(fèi)」。不過(guò)這樣設(shè)計(jì)還是會(huì)有一個(gè)問(wèn)題,如果消費(fèi)者沒(méi)有返回 ack 給服務(wù)端,此時(shí)這條消息可能在已經(jīng)被消費(fèi)也可能還沒(méi)被消費(fèi),服務(wù)端無(wú)從得知,只能根據(jù)人為策略跳過(guò)(可能會(huì)漏消息)或者重發(fā)(可能存在重復(fù)數(shù)據(jù))。另一個(gè)問(wèn)題是,如果有很多消費(fèi)者,服務(wù)端需要記錄每條消息的每個(gè)消費(fèi)者的消費(fèi)狀態(tài),這在大數(shù)據(jù)的場(chǎng)景下,非常消耗性能和內(nèi)存。
Kafka 將每個(gè)消費(fèi)者的消費(fèi)狀態(tài)記錄在消費(fèi)者本身(隔一段時(shí)間將最新消費(fèi)狀態(tài)同步到 zookeeper),每次消費(fèi)者要拉數(shù)據(jù),就給服務(wù)端傳遞一個(gè) offset,告訴服務(wù)端從隊(duì)列的哪個(gè)位置開(kāi)始給我數(shù)據(jù),以及一個(gè)參數(shù) length,告訴服務(wù)端最多給我多大的數(shù)據(jù)(批量順序讀數(shù)據(jù),更高性能),這樣就能使服務(wù)端的設(shè)計(jì)復(fù)雜度大大降低。當(dāng)然這解決不了一致性的問(wèn)題,不過(guò)消費(fèi)者可以根據(jù)自己程序特點(diǎn),更靈活地處理事務(wù)。
下面就來(lái)分析整個(gè)讀的流程:

- 在收到消費(fèi)者讀的請(qǐng)求后,根據(jù)請(qǐng)求中的 topic、partition 信息,定位到具體的隊(duì)列(Log)。
- 根據(jù) offset(我們要開(kāi)始消費(fèi)的隊(duì)列位置),因?yàn)槊總€(gè) LogSegment 文件都是以隊(duì)列的位置命名的,所以可以利用 offset 進(jìn)行二分查找尋找具體的 LogSegment。
- 在找到具體的 LogSegment 后,就可以讀數(shù)據(jù)了,不過(guò),在這里并不真正讀數(shù)據(jù),而是生成一個(gè)引用,記錄該文件的 channel,這次要讀取數(shù)據(jù)在文件中的起始位置以及結(jié)束位置,在真正進(jìn)行網(wǎng)絡(luò)傳輸時(shí),我們利用零拷貝(zero-copy)將數(shù)據(jù)傳輸,即從文件的 channel 直接向 socket channel 傳輸數(shù)據(jù)(Java 中是 channel.transferTo() 方法)。
- 消費(fèi)者收到返回的數(shù)據(jù)后,解碼成真正的 message 列表。
一致性問(wèn)題
分布式系統(tǒng)中不可避免的會(huì)遇到一致性問(wèn)題,主要是兩塊:生產(chǎn)者與隊(duì)列服務(wù)端之間的一致性問(wèn)題、消費(fèi)者與隊(duì)列服務(wù)端之間的一致性問(wèn)題,下面依次展開(kāi)。
生產(chǎn)者與隊(duì)列服務(wù)端之間的一致性
當(dāng)生產(chǎn)者向服務(wù)端投遞消息時(shí),可能會(huì)由于網(wǎng)絡(luò)或者其他問(wèn)題失敗,如果要保證一致性,需要生產(chǎn)者在失敗后重試,不過(guò)重試又會(huì)導(dǎo)致消息重復(fù)的問(wèn)題,一個(gè)解決方案是每個(gè)消息給一個(gè)唯一的 id,通過(guò)服務(wù)端的主動(dòng)去重來(lái)避免重復(fù)消息的問(wèn)題,不過(guò)這一機(jī)制目前 Kafka 還未實(shí)現(xiàn)。目前 Kafka 提供配置,供用戶(hù)不同場(chǎng)景下選擇允許漏消息(失敗后不重試)還是允許重復(fù)消息(失敗后重試)。
消費(fèi)者與隊(duì)列服務(wù)端之間的一致性
由于在消費(fèi)者里我們可以自己控制消費(fèi)位置,就可以更靈活的進(jìn)行個(gè)性化設(shè)計(jì)。如果我們?cè)诶〉较⒑?,先增?offset,然后再進(jìn)行消息的后續(xù)處理,如果在消息還未處理完消費(fèi)者就掛掉,就會(huì)存在消息遺漏的問(wèn)題;如果我們?cè)诶〉较⒑?,先進(jìn)行消息處理,處理成功后再增加 offset,那么如果消息處理一半消費(fèi)者掛掉,會(huì)存在重復(fù)消息的問(wèn)題。要做到完全一致,最好的辦法是將 offset 的存儲(chǔ)與消費(fèi)者放一起,每消費(fèi)一條數(shù)據(jù)就將 offset+1。
總結(jié)
本文介紹了 Kafka 的隊(duì)列實(shí)現(xiàn)以及其讀寫(xiě)過(guò)程。Kafka 認(rèn)為操作系統(tǒng)級(jí)別的文件緩存比 Java 的堆內(nèi)存更省空間和高效,如果生產(chǎn)者消費(fèi)者之間比較「和諧」的話(huà),大部分的讀寫(xiě)操作都會(huì)落在文件緩存,且在順序讀寫(xiě)的情況下,硬盤(pán)的速度并不慢,因此選擇直接寫(xiě)磁盤(pán)文件的方式存儲(chǔ)隊(duì)列。在隊(duì)列的讀寫(xiě)過(guò)程中,Kafka 盡可能地使用順序讀寫(xiě),并使用零拷貝來(lái)優(yōu)化性能。最后,Kafka 讓消費(fèi)者自己控制消費(fèi)位置,提供了更加靈活的數(shù)據(jù)消費(fèi)方式。