前言
大家好,我是 yes。
這是Kafka源碼分析第四篇文章,今天來說說 Kafka控制器,即 Kafka Controller。
源碼類的文章在手機(jī)上看其實效果很差,這篇文章我分為兩部分,第一部分就是直接圖文來說清整個 Kafka 控制器事件處理全流程,然后再通過Controller選舉流程進(jìn)行一波源碼分析,再來走一遍處理全流程。
?一些在手機(jī)上看的同學(xué)可以直接看前半部分,沒有一堆代碼比較舒適,也能看明白整個流程,后面源碼部分看個人了。
不過建議電腦端看效果更佳。
正文
在深入源碼之前我們得先搞明白 Controller是什么?它有什么用?這樣在看源碼的時候才能有的放矢。
Controller是核心組件,它的作用是管理和協(xié)調(diào)整個Kafka集群。
具體管理和協(xié)調(diào)什么呢?
- 主題的管理,創(chuàng)建和刪除主題;
- 分區(qū)管理,增加或重分配分區(qū);
- 分區(qū)
Leader選舉; - 監(jiān)聽
Broker相關(guān)變化,即Broker新增、關(guān)閉等; - 元數(shù)據(jù)管理,向其他
Broker提供元數(shù)據(jù)服務(wù);
為什么需要Controller??
我個人理解:凡是管理或者協(xié)調(diào)某樣?xùn)|西,都需要有個Leader,由他來把控全局,管理內(nèi)部,對接外部,咱們就跟著Leader干就完事了。這其實對外也是好的,外部不需要和我們整體溝通,他只要和一個決策者交流,效率更高。
再來看看朱大是怎么說的,以下內(nèi)容來自《深入理解Kafka:核心設(shè)計與實踐原理》。
在Kafka的早期版本中,并沒有采用 Kafka Controller 這樣一概念來對分區(qū)和副本的狀態(tài)進(jìn)行管理,而是依賴于 ZooKeeper,每個 broker都會在 ZooKeeper 上為分區(qū)和副本注冊大量的監(jiān)聽器(Watcher)。
當(dāng)分區(qū)或副本狀態(tài)變化時,會喚醒很多不必要的監(jiān)聽器,這種嚴(yán)重依賴 ZooKeeper 的設(shè)計會有腦裂、羊群效應(yīng),以及造成 ZooKeeper 過載的隱患。在目前的新版本的設(shè)計中,只有 Kafka Controller 在 ZooKeeper 上注冊相應(yīng)的監(jiān)聽器,其他的 broker 極少需要再監(jiān)聽 ZooKeeper 中的數(shù)據(jù)變化,這樣省去了很多不必要的麻煩。
簡單說下ZooKeeper
了解了 Controller的作用之后我們還需要在簡單的了解下ZooKeeper,因為Controller是極度依賴ZooKeeper的。(不過社區(qū)準(zhǔn)備移除ZooKeeper,文末再提一下)
ZooKeeper是一個開源的分布式協(xié)調(diào)服務(wù)框架,最常用來作為注冊中心等。ZooKeeper的數(shù)據(jù)模型就像文件系統(tǒng)一樣,以根目錄 "/" 開始,結(jié)構(gòu)上的每個節(jié)點稱為znode,可以存儲一些信息。節(jié)點分為持久節(jié)點和臨時節(jié)點,臨時節(jié)點會隨著會話結(jié)束而自動被刪除。
并且有Watcher功能,節(jié)點自身數(shù)據(jù)變更、節(jié)點新增、節(jié)點刪除、子節(jié)點數(shù)量變更都可以通過變更監(jiān)聽器通知客戶端。

Controller是如何依賴ZooKeeper的
每個Broker在啟動時會嘗試向ZooKeeper注冊/controller節(jié)點來競選控制器,第一個創(chuàng)建/controller節(jié)點的Broker會被指定為控制器。這就是是控制器的選舉。
/controller節(jié)點是個臨時節(jié)點,其他Broker會監(jiān)聽著此節(jié)點,當(dāng)/controller節(jié)點所在的Broker宕機(jī)之后,會話就結(jié)束了,此節(jié)點就被移除。其他Broker伺機(jī)而動,都來爭當(dāng)控制器,還是第一個創(chuàng)建/controller節(jié)點的Broker被指定為控制器。這就是控制器故障轉(zhuǎn)移,即Failover。
當(dāng)然還包括各種節(jié)點的監(jiān)聽,例如主題的增減等,都通過Watcher功能,來實現(xiàn)相關(guān)的監(jiān)聽,進(jìn)行對應(yīng)的處理。
Controller在初始化的時候會從ZooKeeper拉取集群元數(shù)據(jù)信息,保存在自己的緩存中,然后通過向集群其他Broker發(fā)送請求的方式將數(shù)據(jù)同步給對方。
Controller 底層事件模型
不管是監(jiān)聽Watcher的ZooKeeperWatcher線程,還是定時任務(wù)線程亦或是其他線程都需要訪問或更新Controller從集群拉取的元數(shù)據(jù)。多線程 + 數(shù)據(jù)競爭 = 線程不安全。因此需要加鎖來保證線程安全。
一開始Kafka就是用大量的鎖來保證線程間的同步,各種加鎖使得性能下降,并且多線程加鎖的方式使得代碼復(fù)雜度急劇上升,一不小心就會出各種問題,bug難修復(fù)。
因此在0.11版本之后將多線程并發(fā)訪問改成了單線程事件隊列模式。將涉及到共享數(shù)據(jù)競爭相關(guān)方面的訪問抽象成事件,將事件塞入阻塞隊列中,然后單線程處理。
也就是說其它線程還是在的,只是把涉及共享數(shù)據(jù)的操作封裝成事件由專屬線程處理。

先小結(jié)一下
到這我們已經(jīng)清楚了Controller主要用來管理和協(xié)調(diào)集群,具體是通過ZooKeeper臨時節(jié)點和Watcher機(jī)制來監(jiān)控集群的變化(當(dāng)然還有來自定時任務(wù)或其他線程的事件驅(qū)動),更新集群的元數(shù)據(jù),并且通知集群中的其他Broker進(jìn)行相關(guān)的操作(這部分下文會講)。
而由于集群元數(shù)據(jù)會有并發(fā)修改問題,因此將操作抽象成事件,由阻塞隊列和單線程處理來替換之前的多線程處理,降低代碼的復(fù)雜度,提升代碼的可維護(hù)性和性能。
接下來我們再講講Controller通知集群中的其他Broker的相關(guān)操作。
Controller的請求發(fā)送
Controller從ZooKeeper那兒得到變更通知之后,需要告知集群中的Broker(包括它自身)做相應(yīng)的處理。
Controller只會給集群的Broker發(fā)送三種請求:分別是 LeaderAndIsrRequest、StopReplicaRequest和 UpdateMetadataRequest
LeaderAndIsrRequest
告知Broker主題相關(guān)分區(qū)Leader和ISR副本都在哪些 Broker上。
StopReplicaRequest
告知Broker停止相關(guān)副本操作,用于刪除主題場景或分區(qū)副本遷移場景。
UpdateMetadataRequest
更新Broker上的元數(shù)據(jù)。
Controller事件處理線程會把事件封裝成對應(yīng)的請求,然后將請求寫入對應(yīng)的Broker的請求阻塞隊列,然后RequestSendThread不斷從阻塞隊列中獲取待發(fā)送的請求。

先解釋下controllerBrokerStateInfo,它就是個 POJO類,可以理解為集群每個broker對應(yīng)一個controllerBrokerStateInfo.

然后再看下ControllerChannelManager,從名字可以看出它管理Controller和集群Broker之間的連接,并為每個Broker創(chuàng)建一個RequestSendThread線程。

再小結(jié)一下
接著上個小結(jié),事件處理線程將事件隊列里面的事件處理之后再進(jìn)行對應(yīng)的請求封裝,塞入需要通知的集群Broker對應(yīng)的阻塞隊列中,然后由每個Broker專屬的requestSendThread發(fā)送請求至對應(yīng)的Broker。
總的步驟如下圖:

現(xiàn)在應(yīng)該已經(jīng)清楚Controller大概是如何運作的,整體看起來還是生產(chǎn)者-消費者模型。
接下來就進(jìn)入源碼環(huán)節(jié)。
Controller選舉流程源碼分析
事件處理的流程都是一樣的,只是具體處理的事件邏輯不同,我們從Controller選舉入手,來走一遍處理流程。
ControllerChangeHandler
選舉會觸發(fā)此handler,可以看到直接往ControllerEventManager的事件隊列里塞。

這個QueueEvent和ControllerEventManager,我們先來看看是啥。不過在此之前先了解下ControllerEvent和ControllerEventProcessor。
ControllerEvent:事件

ControllerEventProcessor : 事件處理接口
此接口的唯一實現(xiàn)類是 KafkaController。

ControllerEventManager:事件處理器
此類主要用來管理事件處理線程和事件隊列。

QueuedEvent:封裝了ControllerEvent的類
主要是記錄了下入隊時間,并且提供了事件需要調(diào)用的方法。

ControllerEventThread:事件處理線程
整體而言還是很簡單的,從隊列拿事件,然后處理。

KafkaController#process
就是個switch,根據(jù)事件調(diào)用對應(yīng)的processXXXX方法。

來關(guān)注下controller 重選事件

然后在onControllerFailover里面會調(diào)用sendUpdateMetadataRequest方法

中間省略調(diào)用,內(nèi)容太多了,不是重點,到后來調(diào)用ControllerBrokerRequestBatch#sendRequest

最后還是調(diào)用了controllerChannelManager#sendRequest.

然后 RequestSendThread#doWork,不斷從請求隊列里拿請求,發(fā)送請求。

一個環(huán)節(jié)完成了!我們來看下整體流程圖

最后我們來看下元數(shù)據(jù)到底有啥和KafkaController的一些字段。
ControllerContext:元數(shù)據(jù)
主要有運行中的Broker、所有主題信息、主題分區(qū)副本信息等。

KafkaController
基本上關(guān)鍵的字段都解釋了,關(guān)于狀態(tài)機(jī)那一塊篇幅有限,之后再說。

最后
整體的流程就是將Controller相關(guān)操作都封裝成一個個事件,然后將事件入隊,由一個事件處理線程來處理,保證數(shù)據(jù)的安全(從這也可以看出,不是多線程就是好,有利有弊最終還是看場景)。
最后在通知集群中Broker的過程是每個Broker配備一個發(fā)送線程,因為發(fā)送是同步的,因此每個Broker線程隔離可以防止某個Broker阻塞而導(dǎo)致整體都阻塞的情況。
前面有說到Kafka Controller 強(qiáng)依賴 ZooKeeper。但是現(xiàn)在社區(qū)打算移除 ZooKeeper,因為ZooKeeper不適合頻繁寫,并且是CP的。而且用Kafka還需要維護(hù)ZooKeeper集群,提升了系統(tǒng)的復(fù)雜度和運維難度,降低了系統(tǒng)的穩(wěn)定性。
像位移信息,已經(jīng)通過內(nèi)部主題的方式保存,繞開了ZooKeeper。
社區(qū)打算通過類 Raft 共識算法來選舉Controller,并且把元數(shù)據(jù)存儲在 Log 中的方式來做。
我是 yes,從一點點到億點點,我們下篇見。