Kafka核心組件之控制器和協(xié)調(diào)器

[TOC]

控制器

我們已經(jīng)知道Kafka的集群由n個的broker所組成,每個broker就是一個kafka的實例或者稱之為kafka的服務(wù)。其實控制器也是一個broker,控制器也叫l(wèi)eader broker。

他除了具有一般broker的功能外,還負責(zé)分區(qū)leader的選取,也就是負責(zé)選舉partition的leader replica。

控制器選舉

kafka每個broker啟動的時候,都會實例化一個KafkaController,并將broker的id注冊到zookeeper,集群在啟動過程中,通過選舉機制選舉出其中一個broker作為leader,也就是前面所說的控制器。

包括集群啟動在內(nèi),有三種情況觸發(fā)控制器選舉:

1、集群啟動

2、控制器所在代理發(fā)生故障

3、zookeeper心跳感知,控制器與自己的session過期

按照慣例,先看圖。我們根據(jù)下圖來講解集群啟動時,控制器選舉過程。

image.png

假設(shè)此集群有三個broker,同時啟動。

(一)3個broker從zookeeper獲取/controller臨時節(jié)點信息。/controller存儲的是選舉出來的leader信息。此舉是為了確認是否已經(jīng)存在leader。

(二)如果還沒有選舉出leader,那么此節(jié)點是不存在的,返回-1。如果返回的不是-1,而是leader的json數(shù)據(jù),那么說明已經(jīng)有l(wèi)eader存在,選舉結(jié)束。

(三)三個broker發(fā)現(xiàn)返回-1,了解到目前沒有l(wèi)eader,于是均會觸發(fā)向臨時節(jié)點/controller寫入自己的信息。最先寫入的就會成為leader。

(四)假設(shè)broker 0的速度最快,他先寫入了/controller節(jié)點,那么他就成為了leader。而broker1、broker2很不幸,因為晚了一步,他們在寫/controller的過程中會拋出ZkNodeExistsException,也就是zk告訴他們,此節(jié)點已經(jīng)存在了。

經(jīng)過以上四步,broker 0成功寫入/controller節(jié)點,其它broker寫入失敗了,所以broker 0成功當選leader。

此外zk中還有controller_epoch節(jié)點,存儲了leader的變更次數(shù),初始值為0,以后leader每變一次,該值+1。所有向控制器發(fā)起的請求,都會攜帶此值。如果控制器和自己內(nèi)存中比較,請求值小,說明kafka集群已經(jīng)發(fā)生了新的選舉,此請求過期,此請求無效。如果請求值大于控制器內(nèi)存的值,說明已經(jīng)有新的控制器當選了,自己已經(jīng)退位,請求無效。kafka通過controller_epoch保證集群控制器的唯一性及操作的一致性。

由此可見,Kafka控制器選舉就是看誰先爭搶到/controller節(jié)點寫入自身信息。

控制器初始化

控制器的初始化,其實是初始化控制器所用到的組件及監(jiān)聽器,準備元數(shù)據(jù)。

前面提到過每個broker都會實例化并啟動一個KafkaController。KafkaController和他的組件關(guān)系,以及各個組件的介紹如下圖:

image.png

圖中箭頭為組件層級關(guān)系,組件下面還會再初始化其他組件??梢娍刂破鲀?nèi)部還是有些復(fù)雜的,主要有以下組件:

1、ControllerContext,此對象存儲了控制器工作需要的所有上下文信息,包括存活的代理、所有主題及分區(qū)分配方案、每個分區(qū)的AR、leader、ISR等信息。

image.png

2、一系列的listener,通過對zookeeper的監(jiān)聽,觸發(fā)相應(yīng)的操作,黃色的框的均為listener


image.png

3、分區(qū)和副本狀態(tài)機,管理分區(qū)和副本。


image.png

4、當前代理選舉器ZookeeperLeaderElector,此選舉器有上位和退位的相關(guān)回調(diào)方法。

5、分區(qū)leader選舉器,PartitionLeaderSelector

6、主題刪除管理器,TopicDeletetionManager

7、leader向broker批量通信的ControllerBrokerRequestBatch。緩存狀態(tài)機處理后產(chǎn)生的request,然后統(tǒng)一發(fā)送出去。

8、控制器平衡操作的KafkaScheduler,僅在broker作為leader時有效。

Kafka集群的一些重要信息都記錄在ZK中,比如集群的所有代理節(jié)點、主題的所有分區(qū)、分區(qū)的副本信息(副本集、主副本、同步的副本集)。每個broker都有一個控制器,為了管理整個集群Kafka選利用zk選舉模式,為整個集群選舉一個“中央控制器”或”主控制器“,控制器其實就是一個broker節(jié)點,除了一般broker功能外,還具有分區(qū)首領(lǐng)選舉功能。中央控制器管理所有節(jié)點的信息,并通過向ZK注冊各種監(jiān)聽事件來管理整個集群節(jié)點、分區(qū)的leader的選舉、再平衡等問題。外部事件會更新ZK的數(shù)據(jù),ZK中的數(shù)據(jù)一旦發(fā)生變化,控制器都要做不同的響應(yīng)處理。

故障轉(zhuǎn)移

故障轉(zhuǎn)移其實就是leader所在broker發(fā)生故障,leader轉(zhuǎn)移為其他的broker。轉(zhuǎn)移的過程就是重新選舉leader的過程。

重新選舉leader后,需要為該broker注冊相應(yīng)權(quán)限,調(diào)用的是ZookeeperLeaderElector的onControllerFailover()方法。在這個方法中初始化和啟動了一系列的組件來完成leader的各種操作。具體如下,其實和控制器初始化有很大的相似度。

1、注冊分區(qū)管理的相關(guān)監(jiān)聽器

2、注冊主題管理的相關(guān)監(jiān)聽

3、注冊代理變化監(jiān)聽器

4、重新初始化ControllerContext,

5、啟動控制器和其他代理之間通信的ControllerChannelManager

6、創(chuàng)建用于刪除主題的TopicDeletionManager對象,并啟動。

7、啟動分區(qū)狀態(tài)機和副本狀態(tài)機

8、輪詢每個主題,添加監(jiān)聽分區(qū)變化的PartitionModificationsListener

9、如果設(shè)置了分區(qū)平衡定時操作,那么創(chuàng)建分區(qū)平衡的定時任務(wù),默認300秒檢查并執(zhí)行。

除了這些組件的啟動外,onControllerFailover方法中還做了如下操作:

1、/controller_epoch值+1,并且更新到ControllerContext

2、檢查是否出發(fā)分區(qū)重分配,并做相關(guān)操作

3、檢查需要將優(yōu)先副本選為leader,并做相關(guān)操作

4、向kafka集群所有代理發(fā)送更新元數(shù)據(jù)的請求。

下面來看leader權(quán)限被取消時,調(diào)用的方法onControllerResignation

1、該方法中注銷了控制器的權(quán)限。取消在zookeeper中對于分區(qū)、副本感知的相應(yīng)監(jiān)聽器的監(jiān)聽。

2、關(guān)閉啟動的各個組件

3、最后把ControllerContext中記錄控制器版本的數(shù)值清零,并設(shè)置當前broker為RunnignAsBroker,變?yōu)槠胀ǖ腷roker。

通過對控制器啟動過程的學(xué)習(xí),我們應(yīng)該已經(jīng)對kafka工作的原理有了了解,核心是監(jiān)聽zookeeper的相關(guān)節(jié)點,節(jié)點變化時觸發(fā)相應(yīng)的操作。

代理上下線

有新的broker加入集群時,稱為代理上線。反之,當broker關(guān)閉,推出集群時,稱為代理下線。

代理上線:

1、新代理啟動時向/brokers/ids寫數(shù)據(jù)

2、BrokerChangeListener監(jiān)聽到變化。對新上線節(jié)點調(diào)用controllerChannelManager.addBroker(),完成新上線代理網(wǎng)絡(luò)層初始化

3、調(diào)用KafkaController.onBrokerStartup()處理

3.1通過向所有代理發(fā)送UpdateMetadataRequest,告訴所有代理有新代理加入

3.2根據(jù)分配給新上線節(jié)點的副本集合,對副本狀態(tài)做變遷。對分區(qū)也進行處理。

3.3觸發(fā)一次leader選舉,確認新加入的是否為分區(qū)leader

3.4輪詢分配給新broker的副本,調(diào)用KafkaController.onPartitionReassignment(),執(zhí)行分區(qū)副本分配

3.5恢復(fù)因新代理上線暫停的刪除主題操作線程

代理下線:

1、查找下線節(jié)點集合

2、輪詢下線節(jié)點,調(diào)用controllerChannelManager.removeBroker(),關(guān)閉每個下線節(jié)點網(wǎng)絡(luò)連接。清空下線節(jié)點消息隊列,關(guān)閉下線節(jié)點request請求

3、輪詢下線節(jié)點,調(diào)用KafkaController.onBrokerFailure處理

3.1處理leader副本在下線節(jié)點上上的分區(qū),重新選出leader副本,發(fā)送updateMetadataRequest請求。

3.2處理下線節(jié)點上的副本集合,做下線處理,從ISR集合中刪除,不再同步,發(fā)送updateMetadataRequest請求。

4、向集群全部存活代理發(fā)送updateMetadataRequest請求

協(xié)調(diào)器

顧名思義,協(xié)調(diào)器負責(zé)協(xié)調(diào)工作。本節(jié)所講的協(xié)調(diào)器,是用來協(xié)調(diào)消費者工作分配的。簡單點說,就是消費者啟動后,到可以正常消費前,這個階段的初始化工作。消費者能夠正常運轉(zhuǎn)起來,全有賴于協(xié)調(diào)器。

主要的協(xié)調(diào)器有如下兩個:

1、消費者協(xié)調(diào)器(ConsumerCoordinator)

2、組協(xié)調(diào)器(GroupCoordinator)

kafka引入?yún)f(xié)調(diào)器有其歷史過程,原來consumer信息依賴于zookeeper存儲,當代理或消費者發(fā)生變化時,引發(fā)消費者平衡,此時消費者之間是互不透明的,每個消費者和zookeeper單獨通信,容易造成羊群效應(yīng)和腦裂問題。

為了解決這些問題,kafka引入了協(xié)調(diào)器。服務(wù)端引入組協(xié)調(diào)器(GroupCoordinator),消費者端引入消費者協(xié)調(diào)器(ConsumerCoordinator)。每個broker啟動的時候,都會創(chuàng)建GroupCoordinator實例,管理部分消費組(集群負載均衡)和組下每個消費者消費的偏移量(offset)。每個consumer實例化時,同時實例化一個ConsumerCoordinator對象,負責(zé)同一個消費組下各個消費者和服務(wù)端組協(xié)調(diào)器之前的通信。如下圖:

image.png

2.1 消費者協(xié)調(diào)器

消費者協(xié)調(diào)器,可以看作是消費者做操作的代理類(其實并不是),消費者很多操作通過消費者協(xié)調(diào)器進行處理。

消費者協(xié)調(diào)器主要負責(zé)如下工作:

1、更新消費者緩存的MetaData

2、向組協(xié)調(diào)器申請加入組

3、消費者加入組后的相應(yīng)處理

4、請求離開消費組

5、向組協(xié)調(diào)器提交偏移量

6、通過心跳,保持組協(xié)調(diào)器的連接感知。

7、被組協(xié)調(diào)器選為leader的消費者的協(xié)調(diào)器,負責(zé)消費者分區(qū)分配。分配結(jié)果發(fā)送給組協(xié)調(diào)器。

8、非leader的消費者,通過消費者協(xié)調(diào)器和組協(xié)調(diào)器同步分配結(jié)果。

消費者協(xié)調(diào)器主要依賴的組件和說明見下圖:

image.png

可以看到這些組件和消費者協(xié)調(diào)器擔負的工作是可以對照上的。

2.2 組協(xié)調(diào)器

組協(xié)調(diào)器負責(zé)處理消費者協(xié)調(diào)器發(fā)過來的各種請求。它主要提供如下功能:

  1. 在與之連接的消費者中選舉出消費者leader
  2. 下發(fā)leader消費者返回的消費者分區(qū)分配結(jié)果給所有的消費者
  3. 管理消費者的消費偏移量提交,保存在kafka的內(nèi)部主題中
  4. 和消費者心跳保持,知道哪些消費者已經(jīng)死掉,組中存活的消費者是哪些。

組協(xié)調(diào)器在broker啟動的時候?qū)嵗?,每個組協(xié)調(diào)器負責(zé)一部分消費組的管理。它主要依賴的組件見下圖:

image.png

這些組件也是和組協(xié)調(diào)器的功能能夠?qū)?yīng)上的。具體內(nèi)容不在詳述。

2.3 消費者入組過程

下圖展示了消費者啟動選取leader、入組的過程。

image.png

消費者入組的過程,很好的展示了消費者協(xié)調(diào)器和組協(xié)調(diào)器之間是如何配合工作的。leader consumer會承擔分區(qū)分配的工作,這樣kafka集群的壓力會小很多。同組的consumer通過組協(xié)調(diào)器保持同步。消費者和分區(qū)的對應(yīng)關(guān)系持久化在kafka內(nèi)部主題。

2.4 消費偏移量管理

消費者消費時,會在本地維護消費到的位置(offset),就是偏移量,這樣下次消費才知道從哪里開始消費。如果整個環(huán)境沒有變化,這樣做就足夠了。但一旦消費者平衡操作或者分區(qū)變化后,消費者不再對應(yīng)原來的分區(qū),而每個消費者的offset也沒有同步到服務(wù)器,這樣就無法接著前任的工作繼續(xù)進行了。

因此只有把消費偏移量定期發(fā)送到服務(wù)器,由GroupCoordinator集中式管理,分區(qū)重分配后,各個消費者從GroupCoordinator讀取自己對應(yīng)分區(qū)的offset,在新的分區(qū)上繼續(xù)前任的工作。

下圖展示了不提交offset到服務(wù)端的問題:

image.png

開始時,consumer 0消費partition 0 和1,后來由于新的consumer 2入組,分區(qū)重新進行了分配。consumer 0不再消費partition2,而由consumer 2來消費partition 2,但由于consumer之間是不能通訊的,所有consumer2并不知道從哪里開始自己的消費。

因此consumer需要定期提交自己消費的offset到服務(wù)端,這樣在重分區(qū)操作后,每個consumer都能在服務(wù)端查到分配給自己的partition所消費到的offset,繼續(xù)消費。

由于kafka有高可用和橫向擴展的特性,當有新的分區(qū)出現(xiàn)或者新的消費入組后,需要重新分配消費者對應(yīng)的分區(qū),所以如果偏移量提交的有問題,會重復(fù)消費或者丟消息。偏移量提交的時機和方式要格外注意??!

2.4.1 偏移量有兩種提交方式

1、自動提交偏移量

設(shè)置 enable.auto.commit為true,設(shè)定好周期,默認5s。消費者每次調(diào)用輪詢消息的poll() 方法時,會檢查是否超過了5s沒有提交偏移量,如果是,提交上一次輪詢返回的偏移量。

這樣做很方便,但是會帶來重復(fù)消費的問題。假如最近一次偏移量提交3s后,觸發(fā)了再均衡,服務(wù)器端存儲的還是上次提交的偏移量,那么再均衡結(jié)束后,新的消費者會從最后一次提交的偏移量開始拉取消息,此3s內(nèi)消費的消息會被重復(fù)消費。

2、手動提交偏移量

設(shè)置 enable.auto.commit為false。程序中手動調(diào)用commitSync()提交偏移量,此時提交的是poll方法返回的最新的偏移量。

commitSync()是同步提交偏移量,主程序會一直阻塞,偏移量提交成功后才往下運行。這樣會限制程序的吞吐量。如果降低提交頻次,又很容易發(fā)生重復(fù)消費。

這里我們可以使用commitAsync()異步提交偏移量。只管提交,而不會等待broker返回提交結(jié)果

commitSync只要沒有發(fā)生不可恢復(fù)錯誤,會進行重試,直到成功。而commitAsync不會進行重試,失敗就是失敗了。commitAsync不重試,是因為重試提交時,可能已經(jīng)有其它更大偏移量已經(jīng)提交成功了,如果此時重試提交成功,那么更小的偏移量會覆蓋大的偏移量。那么如果此時發(fā)生再均衡,新的消費者將會重復(fù)消費消息。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容