KafkaController分析1-選主和Failover

  • Controller這個(gè)角色是在kafka 0.8以后添加的,它負(fù)責(zé)的功能很多;
  • Topic的創(chuàng)始, Partition leader的選取, Partition的增加, PartitionReassigned, PreferredReplicaElection, Topic的刪除等;

選主

Kafkak中有多處涉及到選主和failover, 比如Controller, 比如Partition leader. 我們先來(lái)看下和選主有關(guān)的類(lèi);

LeaderElector

  • 所在文件: core/src/main/scala/kafka/server/LeaderElector.scala
  • 是個(gè)trait, 源碼中的注釋:

This trait defines a leader elector If the existing leader is dead, this class will handle automatic re-election and if it succeeds, it invokes the leader state change callback

  • 接口:
trait LeaderElector extends Logging {
       def startup // 啟動(dòng)
       def amILeader : Boolean //標(biāo)識(shí)是否為主
       def elect: Boolean //選主
       def close  //關(guān)閉
}

ZookeeperLeaderElector

  • 所在文件: core/src/main/scala/kafka/server/ZookeeperLeaderElector.scala
  • 實(shí)現(xiàn)了 trait LeaderElector
  • 基于zookeeper臨時(shí)節(jié)點(diǎn)的搶占式選主策略, 多個(gè)備選者都去zk上注冊(cè)同一個(gè)臨時(shí)節(jié)點(diǎn), 但zk保證同時(shí)只有一個(gè)備選者注冊(cè)成功, 此備選者即成為leader, 然后大家都watch這個(gè)臨時(shí)節(jié)點(diǎn), 一旦此臨時(shí)節(jié)點(diǎn)消失, watcher被觸發(fā), 各備選者又一次開(kāi)始搶占選主;
  • startup方法: 先watch這個(gè)zk節(jié)點(diǎn), 然后調(diào)用elect;
def startup {
    inLock(controllerContext.controllerLock) {
      controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
      elect
    }
  }
  • elect方法:
zookeeper_leader_elect.png
  • controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener) 這個(gè)leaderChangeListener被觸發(fā)時(shí):
1. 臨時(shí)節(jié)點(diǎn)數(shù)據(jù)發(fā)生變化handleDataChange: 如果改變前是leader, 改變后不是leader, 則回調(diào)onResigningAsLeader();
2. 臨時(shí)節(jié)點(diǎn)被刪除handleDataDeleted: 如果當(dāng)前是leader, 則回調(diào)onResigningAsLeader()并同次調(diào)用elect開(kāi)始搶占式選主;

KafkaController的選主與Failover

  • 使用ZookeeperLeaderElector作選主和Failover
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
    onControllerResignation, config.brokerId)
  • 在zk上的臨時(shí)節(jié)點(diǎn): ZkUtils.ControllerPath = /controller
  • KafkaController::startup:
def startup() = {
    inLock(controllerContext.controllerLock) {
      info("Controller starting up")
      registerSessionExpirationListener()
      isRunning = true
      controllerElector.startup
      info("Controller startup complete")
    }
  }

其中
registerSessionExpirationListener() 注冊(cè)zk連接的狀態(tài)回調(diào),處理SessionExpiration;
controllerElector.startup 開(kāi)始選主和Failover;

  • onControllerFailover: 變?yōu)閘eader時(shí)被回調(diào),
    設(shè)置當(dāng)前broker的狀態(tài)為RunningAsController 作下面的事情:

This callback is invoked by the zookeeper leader elector on electing the current broker as the new controller.
It does the following things on the become-controller state change -
1. Register controller epoch changed listener
2. Increments the controller epoch
3. Initializes the controller's context object that holds cache objects for current topics, live brokers and leaders for all existing partitions.
4. Starts the controller's channel manager
5. Starts the replica state machine
6. Starts the partition state machine

KafkaController分析2-NetworkClient分析
Kafka源碼分析-匯總
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 我們都知道, Kafka的每個(gè)Topic的存儲(chǔ)在邏輯上分成若干個(gè)Partition,每個(gè)Partition又可以設(shè)...
    掃帚的影子閱讀 2,503評(píng)論 0 0
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,581評(píng)論 0 34
  • 從本章開(kāi)始我們來(lái)介紹一個(gè)kafka集群逐步建立的過(guò)程; 集群中只有一臺(tái)broker; topic的創(chuàng)建; 增加多臺(tái)...
    掃帚的影子閱讀 2,243評(píng)論 2 5
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,502評(píng)論 19 139
  • 一個(gè)真正的寫(xiě)數(shù)據(jù)流程是怎么樣的?一個(gè)真正的讀數(shù)據(jù)流程是怎么樣的?一個(gè)真正的同步數(shù)據(jù)流程是怎么樣的?從哪里到哪里?什...
    時(shí)待吾閱讀 4,308評(píng)論 0 14

友情鏈接更多精彩內(nèi)容