[TOC]
控制器
我們已經(jīng)知道Kafka的集群由n個的broker所組成,每個broker就是一個kafka的實例或者稱之為kafka的服務(wù)。其實控制器也是一個broker,控制器也叫l(wèi)eader broker。
他除了具有一般broker的功能外,還負責(zé)分區(qū)leader的選取,也就是負責(zé)選舉partition的leader replica。
控制器選舉
kafka每個broker啟動的時候,都會實例化一個KafkaController,并將broker的id注冊到zookeeper,集群在啟動過程中,通過選舉機制選舉出其中一個broker作為leader,也就是前面所說的控制器。
包括集群啟動在內(nèi),有三種情況觸發(fā)控制器選舉:
1、集群啟動
2、控制器所在代理發(fā)生故障
3、zookeeper心跳感知,控制器與自己的session過期
按照慣例,先看圖。我們根據(jù)下圖來講解集群啟動時,控制器選舉過程。

假設(shè)此集群有三個broker,同時啟動。
(一)3個broker從zookeeper獲取/controller臨時節(jié)點信息。/controller存儲的是選舉出來的leader信息。此舉是為了確認是否已經(jīng)存在leader。
(二)如果還沒有選舉出leader,那么此節(jié)點是不存在的,返回-1。如果返回的不是-1,而是leader的json數(shù)據(jù),那么說明已經(jīng)有l(wèi)eader存在,選舉結(jié)束。
(三)三個broker發(fā)現(xiàn)返回-1,了解到目前沒有l(wèi)eader,于是均會觸發(fā)向臨時節(jié)點/controller寫入自己的信息。最先寫入的就會成為leader。
(四)假設(shè)broker 0的速度最快,他先寫入了/controller節(jié)點,那么他就成為了leader。而broker1、broker2很不幸,因為晚了一步,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是zk告訴他們,此節(jié)點已經(jīng)存在了。
經(jīng)過以上四步,broker 0成功寫入/controller節(jié)點,其它broker寫入失敗了,所以broker 0成功當選leader。
此外zk中還有controller_epoch節(jié)點,存儲了leader的變更次數(shù),初始值為0,以后leader每變一次,該值+1。所有向控制器發(fā)起的請求,都會攜帶此值。如果控制器和自己內(nèi)存中比較,請求值小,說明kafka集群已經(jīng)發(fā)生了新的選舉,此請求過期,此請求無效。如果請求值大于控制器內(nèi)存的值,說明已經(jīng)有新的控制器當選了,自己已經(jīng)退位,請求無效。kafka通過controller_epoch保證集群控制器的唯一性及操作的一致性。
由此可見,Kafka控制器選舉就是看誰先爭搶到/controller節(jié)點寫入自身信息。
控制器初始化
控制器的初始化,其實是初始化控制器所用到的組件及監(jiān)聽器,準備元數(shù)據(jù)。
前面提到過每個broker都會實例化并啟動一個KafkaController。KafkaController和他的組件關(guān)系,以及各個組件的介紹如下圖:

圖中箭頭為組件層級關(guān)系,組件下面還會再初始化其他組件??梢娍刂破鲀?nèi)部還是有些復(fù)雜的,主要有以下組件:
1、ControllerContext,此對象存儲了控制器工作需要的所有上下文信息,包括存活的代理、所有主題及分區(qū)分配方案、每個分區(qū)的AR、leader、ISR等信息。

2、一系列的listener,通過對zookeeper的監(jiān)聽,觸發(fā)相應(yīng)的操作,黃色的框的均為listener

3、分區(qū)和副本狀態(tài)機,管理分區(qū)和副本。

4、當前代理選舉器ZookeeperLeaderElector,此選舉器有上位和退位的相關(guān)回調(diào)方法。
5、分區(qū)leader選舉器,PartitionLeaderSelector
6、主題刪除管理器,TopicDeletetionManager
7、leader向broker批量通信的ControllerBrokerRequestBatch。緩存狀態(tài)機處理后產(chǎn)生的request,然后統(tǒng)一發(fā)送出去。
8、控制器平衡操作的KafkaScheduler,僅在broker作為leader時有效。
Kafka集群的一些重要信息都記錄在ZK中,比如集群的所有代理節(jié)點、主題的所有分區(qū)、分區(qū)的副本信息(副本集、主副本、同步的副本集)。每個broker都有一個控制器,為了管理整個集群Kafka選利用zk選舉模式,為整個集群選舉一個“中央控制器”或”主控制器“,控制器其實就是一個broker節(jié)點,除了一般broker功能外,還具有分區(qū)首領(lǐng)選舉功能。中央控制器管理所有節(jié)點的信息,并通過向ZK注冊各種監(jiān)聽事件來管理整個集群節(jié)點、分區(qū)的leader的選舉、再平衡等問題。外部事件會更新ZK的數(shù)據(jù),ZK中的數(shù)據(jù)一旦發(fā)生變化,控制器都要做不同的響應(yīng)處理。
故障轉(zhuǎn)移
故障轉(zhuǎn)移其實就是leader所在broker發(fā)生故障,leader轉(zhuǎn)移為其他的broker。轉(zhuǎn)移的過程就是重新選舉leader的過程。
重新選舉leader后,需要為該broker注冊相應(yīng)權(quán)限,調(diào)用的是ZookeeperLeaderElector的onControllerFailover()方法。在這個方法中初始化和啟動了一系列的組件來完成leader的各種操作。具體如下,其實和控制器初始化有很大的相似度。
1、注冊分區(qū)管理的相關(guān)監(jiān)聽器
2、注冊主題管理的相關(guān)監(jiān)聽
3、注冊代理變化監(jiān)聽器
4、重新初始化ControllerContext,
5、啟動控制器和其他代理之間通信的ControllerChannelManager
6、創(chuàng)建用于刪除主題的TopicDeletionManager對象,并啟動。
7、啟動分區(qū)狀態(tài)機和副本狀態(tài)機
8、輪詢每個主題,添加監(jiān)聽分區(qū)變化的PartitionModificationsListener
9、如果設(shè)置了分區(qū)平衡定時操作,那么創(chuàng)建分區(qū)平衡的定時任務(wù),默認300秒檢查并執(zhí)行。
除了這些組件的啟動外,onControllerFailover方法中還做了如下操作:
1、/controller_epoch值+1,并且更新到ControllerContext
2、檢查是否出發(fā)分區(qū)重分配,并做相關(guān)操作
3、檢查需要將優(yōu)先副本選為leader,并做相關(guān)操作
4、向kafka集群所有代理發(fā)送更新元數(shù)據(jù)的請求。
下面來看leader權(quán)限被取消時,調(diào)用的方法onControllerResignation
1、該方法中注銷了控制器的權(quán)限。取消在zookeeper中對于分區(qū)、副本感知的相應(yīng)監(jiān)聽器的監(jiān)聽。
2、關(guān)閉啟動的各個組件
3、最后把ControllerContext中記錄控制器版本的數(shù)值清零,并設(shè)置當前broker為RunnignAsBroker,變?yōu)槠胀ǖ腷roker。
通過對控制器啟動過程的學(xué)習(xí),我們應(yīng)該已經(jīng)對kafka工作的原理有了了解,核心是監(jiān)聽zookeeper的相關(guān)節(jié)點,節(jié)點變化時觸發(fā)相應(yīng)的操作。
代理上下線
有新的broker加入集群時,稱為代理上線。反之,當broker關(guān)閉,推出集群時,稱為代理下線。
代理上線:
1、新代理啟動時向/brokers/ids寫數(shù)據(jù)
2、BrokerChangeListener監(jiān)聽到變化。對新上線節(jié)點調(diào)用controllerChannelManager.addBroker(),完成新上線代理網(wǎng)絡(luò)層初始化
3、調(diào)用KafkaController.onBrokerStartup()處理
3.1通過向所有代理發(fā)送UpdateMetadataRequest,告訴所有代理有新代理加入
3.2根據(jù)分配給新上線節(jié)點的副本集合,對副本狀態(tài)做變遷。對分區(qū)也進行處理。
3.3觸發(fā)一次leader選舉,確認新加入的是否為分區(qū)leader
3.4輪詢分配給新broker的副本,調(diào)用KafkaController.onPartitionReassignment(),執(zhí)行分區(qū)副本分配
3.5恢復(fù)因新代理上線暫停的刪除主題操作線程
代理下線:
1、查找下線節(jié)點集合
2、輪詢下線節(jié)點,調(diào)用controllerChannelManager.removeBroker(),關(guān)閉每個下線節(jié)點網(wǎng)絡(luò)連接。清空下線節(jié)點消息隊列,關(guān)閉下線節(jié)點request請求
3、輪詢下線節(jié)點,調(diào)用KafkaController.onBrokerFailure處理
3.1處理leader副本在下線節(jié)點上上的分區(qū),重新選出leader副本,發(fā)送updateMetadataRequest請求。
3.2處理下線節(jié)點上的副本集合,做下線處理,從ISR集合中刪除,不再同步,發(fā)送updateMetadataRequest請求。
4、向集群全部存活代理發(fā)送updateMetadataRequest請求
協(xié)調(diào)器
顧名思義,協(xié)調(diào)器負責(zé)協(xié)調(diào)工作。本節(jié)所講的協(xié)調(diào)器,是用來協(xié)調(diào)消費者工作分配的。簡單點說,就是消費者啟動后,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉(zhuǎn)起來,全有賴于協(xié)調(diào)器。
主要的協(xié)調(diào)器有如下兩個:
1、消費者協(xié)調(diào)器(ConsumerCoordinator)
2、組協(xié)調(diào)器(GroupCoordinator)
kafka引入?yún)f(xié)調(diào)器有其歷史過程,原來consumer信息依賴于zookeeper存儲,當代理或消費者發(fā)生變化時,引發(fā)消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊群效應(yīng)和腦裂問題。
為了解決這些問題,kafka引入了協(xié)調(diào)器。服務(wù)端引入組協(xié)調(diào)器(GroupCoordinator),消費者端引入消費者協(xié)調(diào)器(ConsumerCoordinator)。每個broker啟動的時候,都會創(chuàng)建GroupCoordinator實例,管理部分消費組(集群負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責(zé)同一個消費組下各個消費者和服務(wù)端組協(xié)調(diào)器之前的通信。如下圖:

2.1 消費者協(xié)調(diào)器
消費者協(xié)調(diào)器,可以看作是消費者做操作的代理類(其實并不是),消費者很多操作通過消費者協(xié)調(diào)器進行處理。
消費者協(xié)調(diào)器主要負責(zé)如下工作:
1、更新消費者緩存的MetaData
2、向組協(xié)調(diào)器申請加入組
3、消費者加入組后的相應(yīng)處理
4、請求離開消費組
5、向組協(xié)調(diào)器提交偏移量
6、通過心跳,保持組協(xié)調(diào)器的連接感知。
7、被組協(xié)調(diào)器選為leader的消費者的協(xié)調(diào)器,負責(zé)消費者分區(qū)分配。分配結(jié)果發(fā)送給組協(xié)調(diào)器。
8、非leader的消費者,通過消費者協(xié)調(diào)器和組協(xié)調(diào)器同步分配結(jié)果。
消費者協(xié)調(diào)器主要依賴的組件和說明見下圖:

可以看到這些組件和消費者協(xié)調(diào)器擔負的工作是可以對照上的。
2.2 組協(xié)調(diào)器
組協(xié)調(diào)器負責(zé)處理消費者協(xié)調(diào)器發(fā)過來的各種請求。它主要提供如下功能:
- 在與之連接的消費者中選舉出消費者leader
- 下發(fā)leader消費者返回的消費者分區(qū)分配結(jié)果給所有的消費者
- 管理消費者的消費偏移量提交,保存在kafka的內(nèi)部主題中
- 和消費者心跳保持,知道哪些消費者已經(jīng)死掉,組中存活的消費者是哪些。
組協(xié)調(diào)器在broker啟動的時候?qū)嵗?,每個組協(xié)調(diào)器負責(zé)一部分消費組的管理。它主要依賴的組件見下圖:

這些組件也是和組協(xié)調(diào)器的功能能夠?qū)?yīng)上的。具體內(nèi)容不在詳述。
2.3 消費者入組過程
下圖展示了消費者啟動選取leader、入組的過程。

消費者入組的過程,很好的展示了消費者協(xié)調(diào)器和組協(xié)調(diào)器之間是如何配合工作的。leader consumer會承擔分區(qū)分配的工作,這樣kafka集群的壓力會小很多。同組的consumer通過組協(xié)調(diào)器保持同步。消費者和分區(qū)的對應(yīng)關(guān)系持久化在kafka內(nèi)部主題。
2.4 消費偏移量管理
消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪里開始消費。如果整個環(huán)境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區(qū)變化后,消費者不再對應(yīng)原來的分區(qū),而每個消費者的offset也沒有同步到服務(wù)器,這樣就無法接著前任的工作繼續(xù)進行了。
因此只有把消費偏移量定期發(fā)送到服務(wù)器,由GroupCoordinator集中式管理,分區(qū)重分配后,各個消費者從GroupCoordinator讀取自己對應(yīng)分區(qū)的offset,在新的分區(qū)上繼續(xù)前任的工作。
下圖展示了不提交offset到服務(wù)端的問題:

開始時,consumer 0消費partition 0 和1,后來由于新的consumer 2入組,分區(qū)重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由于consumer之間是不能通訊的,所有consumer2并不知道從哪里開始自己的消費。
因此consumer需要定期提交自己消費的offset到服務(wù)端,這樣在重分區(qū)操作后,每個consumer都能在服務(wù)端查到分配給自己的partition所消費到的offset,繼續(xù)消費。
由于kafka有高可用和橫向擴展的特性,當有新的分區(qū)出現(xiàn)或者新的消費入組后,需要重新分配消費者對應(yīng)的分區(qū),所以如果偏移量提交的有問題,會重復(fù)消費或者丟消息。偏移量提交的時機和方式要格外注意??!
2.4.1 偏移量有兩種提交方式
1、自動提交偏移量
設(shè)置 enable.auto.commit為true,設(shè)定好周期,默認5s。消費者每次調(diào)用輪詢消息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。
這樣做很方便,但是會帶來重復(fù)消費的問題。假如最近一次偏移量提交3s后,觸發(fā)了再均衡,服務(wù)器端存儲的還是上次提交的偏移量,那么再均衡結(jié)束后,新的消費者會從最后一次提交的偏移量開始拉取消息,此3s內(nèi)消費的消息會被重復(fù)消費。
2、手動提交偏移量
設(shè)置 enable.auto.commit為false。程序中手動調(diào)用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。
commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功后才往下運行。這樣會限制程序的吞吐量。如果降低提交頻次,又很容易發(fā)生重復(fù)消費。
這里我們可以使用commitAsync()異步提交偏移量。只管提交,而不會等待broker返回提交結(jié)果
commitSync只要沒有發(fā)生不可恢復(fù)錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因為重試提交時,可能已經(jīng)有其它更大偏移量已經(jīng)提交成功了,如果此時重試提交成功,那么更小的偏移量會覆蓋大的偏移量。那么如果此時發(fā)生再均衡,新的消費者將會重復(fù)消費消息。