前言
Kafka 是 LinkedIn 開發(fā)的一個(gè)分布式的消息中間件。由于其高吞吐量、可水平擴(kuò)展等特性,目前被廣泛使用,已經(jīng)是目前大數(shù)據(jù)生態(tài)系統(tǒng)中不可或缺的一環(huán),有關(guān)其詳細(xì)介紹可以查看官方的文檔。Kafka 的流行源于他優(yōu)秀的設(shè)計(jì),如依靠磁盤(以及操作系統(tǒng)的 Page Cache)而不是內(nèi)存來存儲(chǔ)隊(duì)列數(shù)據(jù)、充分使用零拷貝(zero-copy)以減少數(shù)據(jù)在不同內(nèi)存空間間的拷貝、數(shù)據(jù)盡可能的使用順序讀寫等。今天準(zhǔn)備深度解析 kafka 的網(wǎng)絡(luò)通信模塊,來學(xué)習(xí)下實(shí)現(xiàn)一個(gè)高吞吐量的系統(tǒng)要設(shè)計(jì)一個(gè)怎么樣的網(wǎng)絡(luò)通信機(jī)制。
網(wǎng)絡(luò)通訊協(xié)議
作為一個(gè)消息隊(duì)列,涉及的網(wǎng)絡(luò)通信主要有兩塊:
- 消息生產(chǎn)者與消息隊(duì)列服務(wù)器之間(Kafka 中是生產(chǎn)者向隊(duì)列「推」消息)
- 消息消費(fèi)者與消息隊(duì)列服務(wù)器之間(Kafka 中是消費(fèi)者向隊(duì)列「拉」消息)
要實(shí)現(xiàn)上述的網(wǎng)絡(luò)通信,我們可以使用 HTTP 協(xié)議,比如服務(wù)端內(nèi)嵌一個(gè) jetty 容器,通過 servlet 來實(shí)現(xiàn)客戶端與服務(wù)端之間的交互,但是其性能存在問題,無法滿足高吞吐量這個(gè)需求。要實(shí)現(xiàn)高性能的網(wǎng)絡(luò)通信,我們可以使用更底層的 TCP 或者 UDP 來實(shí)現(xiàn)自己的私有協(xié)議,而 UDP 協(xié)議是不可靠的傳輸協(xié)議,畢竟我們不希望一條消息在投遞或者消費(fèi)途中丟失了,所以 Kafka 選擇 TCP 作為服務(wù)間通訊的協(xié)議。
網(wǎng)絡(luò) IO 模型
談到網(wǎng)絡(luò)通信,繞不過 IO 模型,IO 模型主要是同步與異步,阻塞與非阻塞之間進(jìn)行選擇。
Kafka 的生產(chǎn)者同時(shí)實(shí)現(xiàn)了同步和異步兩種類型的客戶端(即:向服務(wù)端發(fā)完請(qǐng)求后可以一直等待響應(yīng)也可以繼續(xù)干后面的事),其異步客戶端實(shí)現(xiàn)方式是通過線程池加回調(diào)函數(shù)。
Kafka 的服務(wù)端使用了 NIO 的 IO 多路復(fù)用技術(shù),是非阻塞的 IO, kafka 的早期版本中,服務(wù)端是通過同步的方式處理客戶端請(qǐng)求,最新版本是通過異步的方式進(jìn)行的。
Kafka 自帶的消費(fèi)者是通過同步阻塞的方式進(jìn)行數(shù)據(jù)拉取的,當(dāng)然如果需要異步處理,可以自己另外寫一個(gè)異步消費(fèi)者。
Reactor 線程模型
Kafka 采用的是 Reactor 多線程模型,即通過一個(gè) Acceptor 線程處理所有的新連接,通過多個(gè) Processor 線程對(duì)請(qǐng)求進(jìn)行處理(解析協(xié)議、封裝請(qǐng)求并轉(zhuǎn)發(fā))。在早期版本中,對(duì)請(qǐng)求的處理在 Processor 線程中同步進(jìn)行,也就是說,有多少個(gè) Processor 線程就有多少個(gè)處理請(qǐng)求的線程。在新版本中,kafka 新增了一個(gè) Handler 模塊,通過指定的線程數(shù)對(duì)請(qǐng)求進(jìn)行專門處理,Handler 與 Processor 之間通過一個(gè) block queue 進(jìn)行連接。線程模型如圖:

網(wǎng)絡(luò)通信流程剖析
Kafka 的整個(gè)網(wǎng)絡(luò)通信框架并非一成不變,從早期版本到現(xiàn)在經(jīng)歷了一些變化,下面我們通過分析早期的版本與最新版本的網(wǎng)絡(luò)通信流程,了解其演變過程,以供自己在設(shè)計(jì)系統(tǒng)的網(wǎng)絡(luò)通信時(shí)的一些參考。
早期版本(0.7)
Kafka 以 NIO 作為網(wǎng)絡(luò)通信的基礎(chǔ),其通過將許多 socket 連接注冊(cè)到一個(gè) Selector 監(jiān)聽,可以只用一個(gè)線程就能管理很多的連接,減少了大量線程的系統(tǒng)開銷。
早期版本的 kafka 的網(wǎng)絡(luò)通信實(shí)現(xiàn)是一個(gè)簡(jiǎn)單的 Reactor 多線程模型,如圖:

- 客戶端向服務(wù)端發(fā)起請(qǐng)求時(shí),Accept 負(fù)責(zé)接受這個(gè) TCP 連接,連接成功后傳遞給其中一個(gè) Processor 線程(先添加到 Processor 線程中的內(nèi)部新連接隊(duì)列)。
- Processor 線程收到該新連接后(從新連接隊(duì)列中 poll),將其注冊(cè)到自身的 Selector 中,監(jiān)聽其 READ 事件。
- 每當(dāng) Client 在這個(gè)連接上寫入數(shù)據(jù),就會(huì)觸發(fā) Processor 線程中 Selector 監(jiān)聽的 READ 事件,這時(shí)該線程會(huì)讀出連接中的元數(shù)據(jù),根據(jù)協(xié)議(Handler Mapping)調(diào)用相應(yīng)的 Handler 進(jìn)行處理
- Handler 處理完成后,可能會(huì)有返回值需要返回給客戶端(如 Fetch 請(qǐng)求就需要返回具體內(nèi)容給客戶端),這時(shí)將 Handler 返回的 Response 綁定到連接上(SelectionKey.attach 方法),同時(shí)將這個(gè)連接的監(jiān)聽事件從 READ 轉(zhuǎn)為 WRITE。
- Selector 監(jiān)聽到剛才注冊(cè)的 WRITE 事件,將連接中綁定的 Response 發(fā)送。
個(gè)人理解 4、5 兩步可以合并,即如果 Handler 有返回值,就直接返回,個(gè)人猜測(cè) kafka 這樣設(shè)計(jì)可能是出于整個(gè)架構(gòu)上更加清晰優(yōu)美的目的。
新版本
新版 Kafka 也是以 NIO 作為網(wǎng)絡(luò)通信的基礎(chǔ),也是用 Reactor 多線程模型,所不同的是新版把具體業(yè)務(wù)處理模塊(Handler 模塊)獨(dú)立出去,用單獨(dú)的線程池進(jìn)行控制。具體如下圖:

新版本分離出 Handler 模塊,我理解的好處有以下幾個(gè):
- 可以單獨(dú)指定 Handler 的線程數(shù)量,便于調(diào)優(yōu)和管理
- 可以避免一個(gè)超大請(qǐng)求堵住整一個(gè) Processor 線程的情況
- 因?yàn)?Request 與 Handler、Handler 與 Response 之間都是通過隊(duì)列進(jìn)行連接,所以彼此是解耦的,可以讓請(qǐng)求變?yōu)楫惒?,?duì)系統(tǒng)的性能會(huì)有提升
總結(jié)
本文通過分析 kafka 的網(wǎng)絡(luò)通信設(shè)計(jì)對(duì)網(wǎng)絡(luò)編程進(jìn)行了一次學(xué)習(xí),筆者之后又對(duì) netty 的網(wǎng)絡(luò)通信進(jìn)行了了解,發(fā)現(xiàn)大部分也類似,可見目前的高性能的網(wǎng)絡(luò)通信可能存在「最佳實(shí)踐」,不過真正在設(shè)計(jì)一個(gè)系統(tǒng)的網(wǎng)絡(luò)通信時(shí),還有很多工程上的問題需要解決,有許多的「坑」,很容易為系統(tǒng)埋下定時(shí)炸彈,因此,我看很多大牛都建議不要自己去實(shí)現(xiàn)網(wǎng)絡(luò)通信模塊,因?yàn)?netty 已經(jīng)足夠優(yōu)秀了。
最后,如果讀者有興趣看 kafka 的源碼,又對(duì) scala 不是很熟悉,可以先看 jafka 的代碼,它是早期 kafka 版本的 java 克隆版。