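The previous posts analyzed the components used by KafkaController; starting with this one, we cover KafkaController's individual functions. Earlier posts in this series:
KafkaController Analysis 1 - Leader Election and Failover
KafkaController Analysis 2 - NetworkClient
KafkaController Analysis 3 - ControllerChannelManager
KafkaController Analysis 4 - Partition Leader Election
KafkaController Analysis 5 - Partition State Machine
KafkaController Analysis 6 - Replica State Machine
KafkaController Startup Flow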
- Register the ZK SessionExpiration event listener:
registerSessionExpirationListener. When the session expires and a new session has been established, the controller is re-elected;
def handleNewSession() {
  info("ZK expired; shut down all controller components and try to re-elect")
  inLock(controllerContext.controllerLock) {
    onControllerResignation()
    controllerElector.elect
  }
}
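The resign-then-elect pattern in handleNewSession can be sketched in isolation. This is a minimal toy model, not Kafka's code: `ToyController`, `tryAcquireLeadership`, and the `events` log are hypothetical names introduced for illustration, and the ZooKeeper election is replaced by a plain function.

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the controller; not Kafka's real class.
// tryAcquireLeadership is an assumed callback that replaces the ZK election.
class ToyController(brokerId: Int, tryAcquireLeadership: Int => Boolean) {
  private val controllerLock = new ReentrantLock()
  val events = ArrayBuffer.empty[String]
  @volatile var isActive = false

  // Plays the role of an inLock helper: run body while holding the lock.
  private def inLock[T](lock: ReentrantLock)(body: => T): T = {
    lock.lock()
    try body finally lock.unlock()
  }

  // Drop controller duties before taking part in a new election.
  private def onControllerResignation(): Unit = {
    isActive = false
    events += "resigned"
  }

  // Try to become controller again; the outcome depends on the callback.
  private def elect(): Unit = {
    isActive = tryAcquireLeadership(brokerId)
    events += (if (isActive) "elected" else "lost-election")
  }

  // Called once the old session has expired and a new one is established.
  def handleNewSession(): Unit = inLock(controllerLock) {
    onControllerResignation()
    elect()
  }
}
```

Holding the lock across both calls means no other controller operation can observe the state between resignation and the new election result.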
- Start the ZookeeperLeaderElector:
controllerElector.startup. If the current broker is successfully elected as Controller, the onControllerFailover callback is triggered.
readControllerEpochFromZookeeper()
incrementControllerEpoch(zkUtils.zkClient)
registerReassignedPartitionsListener()
registerIsrChangeNotificationListener()
registerPreferredReplicaElectionListener()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
replicaStateMachine.startup()
partitionStateMachine.startup()
brokerState.newState(RunningAsController)
maybeTriggerPartitionReassignment()
maybeTriggerPreferredReplicaElection()
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
if (config.autoLeaderRebalanceEnable) {
  autoRebalanceScheduler.startup()
  autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
    5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS)
}
deleteTopicManager.start()
- Update the controller epoch stored in ZK;
- Register ZK listeners for changes to the broker/topic nodes;
- Initialize the ControllerContext, mainly by fetching broker, topic, partition, ISR, partition leader, and replica information from ZK;
- Start the ReplicaStateMachine;
- Start the PartitionStateMachine;
- Send all partition information (leader, ISR, replicas, epoch, etc.) to all live brokers;
- If automatic leader rebalance is enabled, start the AutoRebalanceScheduler;
- Start the TopicDeletionManager;
- Diagram of the KafkaController startup:

KafkaController.png
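
The order of the startup steps listed above can be sketched as a small toy model. This is only an illustration under stated assumptions: `ToyConfig`, `ToyFailover`, `storedEpoch`, and the step names are hypothetical and do not exist in Kafka; each real action is replaced by a log entry so that the ordering is visible.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified model of the failover sequence; not Kafka's API.
final case class ToyConfig(autoLeaderRebalanceEnable: Boolean)

// storedEpoch stands in for the controller epoch that would be read from ZK.
class ToyFailover(config: ToyConfig, storedEpoch: Int) {
  val steps = ArrayBuffer.empty[String]
  var epoch: Int = 0

  // Record a step instead of performing the real action.
  private def step(name: String): Unit = steps += name

  def onControllerFailover(): Unit = {
    epoch = storedEpoch + 1 // read the stored epoch, then increment it
    step("epoch-updated")
    step("listeners-registered")
    step("context-initialized")
    step("replica-state-machine-started")
    step("partition-state-machine-started")
    step("metadata-sent-to-live-brokers")
    // The rebalance scheduler is only started when the config enables it.
    if (config.autoLeaderRebalanceEnable) step("auto-rebalance-scheduled")
    step("topic-deletion-manager-started")
  }
}
```

In this sketch, as in the snippet from the post, the listeners are registered before the context is initialized, and the replica state machine is started before the partition state machine.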