[TOC]
The previous post covered the startup flow of KafkaController but did not cover how the partition state machine and the replica state machine are initialized; this post covers both in depth. The partition state machine records the state of every Partition in the cluster and defines how each Partition state transition is handled. Likewise, the replica state machine records the state of every Replica in the cluster and defines how each Replica state transition is handled.
ReplicaStateMachine
ReplicaStateMachine records the state of every Replica in the cluster. It decides which state a replica is in and which states it may move to from there. Kafka defines the following seven replica states:
- NewReplica: in this state the Controller can create the Replica, and the Replica can only act as a follower. It can also be a transient state after the Replica has been deleted. Its valid previous state is NonExistentReplica;
- OnlineReplica: once the Replica has been assigned to a Partition and has been created, it is put into this state. In this state the Replica can act as either leader or follower. Its valid previous states are NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible;
- OfflineReplica: if a Replica dies (its broker goes down, or for some other reason), it moves to this state. Its valid previous states are NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible;
- ReplicaDeletionStarted: the state a Replica enters when its deletion starts. Its valid previous state is OfflineReplica;
- ReplicaDeletionSuccessful: if the Replica is deleted without any error, it moves to this state, meaning its data has been removed from the broker. Its valid previous state is ReplicaDeletionStarted;
- ReplicaDeletionIneligible: if deleting the Replica fails, it moves to this state, meaning the Replica is ineligible for deletion, i.e. the deletion cannot succeed for now. Its valid previous state is ReplicaDeletionStarted;
- NonExistentReplica: once the Replica has been deleted successfully, it moves to this state. Its valid previous state is ReplicaDeletionSuccessful.
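To make the valid-previous-state rules above easy to check, here is a small Scala sketch that encodes them as a lookup. The case objects only mirror the state names; `ReplicaTransitions`, `validPrevious` and `isValid` are hypothetical helpers, not Kafka's API. The OnlineReplica and OfflineReplica entries follow the `assertValidPreviousStates` calls in the code excerpts later in this article, which also accept ReplicaDeletionIneligible.

```scala
object ReplicaTransitions {
  // Hypothetical, simplified model of the seven replica states (not Kafka's classes).
  sealed trait ReplicaState extends Product with Serializable
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  // For each target state, the states a replica may be in before moving to it.
  def validPrevious(target: ReplicaState): Set[ReplicaState] = target match {
    case NewReplica => Set(NonExistentReplica)
    case OnlineReplica | OfflineReplica =>
      Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
    case ReplicaDeletionStarted => Set(OfflineReplica)
    case ReplicaDeletionSuccessful | ReplicaDeletionIneligible => Set(ReplicaDeletionStarted)
    case NonExistentReplica => Set(ReplicaDeletionSuccessful)
  }

  def isValid(from: ReplicaState, to: ReplicaState): Boolean =
    validPrevious(to).contains(from)
}
```

For example, `isValid(OnlineReplica, ReplicaDeletionStarted)` is false: a replica has to go offline before its deletion can start.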
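Of the states above, the last four (ReplicaDeletionStarted, ReplicaDeletionSuccessful, ReplicaDeletionIneligible and NonExistentReplica) exist specifically to support Replica deletion. The replica state transition diagram is shown below: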

This diagram is the core of the replica state machine and is explained in detail below. First, let's look at what happens when KafkaController calls ReplicaStateMachine's startup() method during startup.
ReplicaStateMachine initialization
The replica state machine is initialized as follows:
```scala
//note: triggered after the Controller is (re-)elected
def startup() {
  // initialize replica state
  //note: initialize the state of every Replica found in zk (Online if the replica is alive, ReplicaDeletionIneligible otherwise)
  initializeReplicaState()
  // set started flag
  hasStarted.set(true)
  // move all Online replicas to Online
  //note: move the live replicas to OnlineReplica
  handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
  info("Started replica state machine with initial state -> " + replicaState.toString())
}
```
In this method, ReplicaStateMachine first calls initializeReplicaState() to initialize the state of every Replica in the cluster: if the broker hosting the Replica is alive, its state is set to OnlineReplica, otherwise to ReplicaDeletionIneligible. This step only writes the states into the state machine's cache replicaState; no actual state transition is performed.
```scala
//note: initialize the state of every replica
private def initializeReplicaState() {
  for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      if (controllerContext.liveBrokerIds.contains(replicaId)) //note: if the replica is alive, set its state to OnlineReplica
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
        // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
        // This is required during controller failover since during controller failover a broker can go down,
        // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
        //note: replicas that are not alive are set to ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}
```
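A minimal, self-contained Scala sketch of what initializeReplicaState() computes, assuming the replica assignment is a plain `Map` keyed by (topic, partition). `ReplicaInitSketch` and its nested `PartitionAndReplica` are simplified stand-ins for illustration, not Kafka classes. Like the original, it only builds the cached states and sends nothing to any broker.

```scala
object ReplicaInitSketch {
  sealed trait ReplicaState extends Product with Serializable
  case object OnlineReplica extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // Hypothetical stand-in for Kafka's PartitionAndReplica.
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Replicas on live brokers are cached as OnlineReplica, all others as
  // ReplicaDeletionIneligible; no request is sent, only the cache is built.
  def initialize(assignment: Map[(String, Int), Seq[Int]],
                 liveBrokerIds: Set[Int]): Map[PartitionAndReplica, ReplicaState] =
    (for {
      ((topic, partition), replicas) <- assignment.toSeq
      replicaId <- replicas
    } yield {
      val state: ReplicaState =
        if (liveBrokerIds.contains(replicaId)) OnlineReplica else ReplicaDeletionIneligible
      PartitionAndReplica(topic, partition, replicaId) -> state
    }).toMap
}
```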
The second step calls handleStateChanges() to move all live replicas to OnlineReplica; this is where the state transitions actually take place. It is implemented as follows:
```scala
//note: handles Replica state changes
def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if(replicas.nonEmpty) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      //note: perform the state transition for each replica
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      //note: send the queued requests to the brokers
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    } catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}
```
This is the last step of the replica state machine's startup(). Its goal is to move every live Replica to OnlineReplica. Since the previous step already set these live replicas to OnlineReplica, the transition here is OnlineReplica -> OnlineReplica. The method does two things:
- state transitions (described in detail below);
- sending the corresponding requests.
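The batching pattern in handleStateChanges() (open a batch, let each per-replica transition queue its requests, then send everything at once) can be sketched as follows. `RequestBatch` is a hypothetical, heavily simplified stand-in for Kafka's request batch: requests are plain strings grouped by broker id, and the original's try/catch error logging is left out.

```scala
import scala.collection.mutable

object RequestBatchSketch {
  // Hypothetical request batch: buffers requests per broker id and flushes them together.
  final class RequestBatch {
    private val pending = mutable.LinkedHashMap.empty[Int, mutable.ArrayBuffer[String]]
    val sent = mutable.ArrayBuffer.empty[(Int, List[String])]

    // Sanity check assumed by this sketch: a new batch must start empty.
    def newBatch(): Unit =
      require(pending.isEmpty, "previous batch was not sent")

    def add(brokerId: Int, request: String): Unit =
      pending.getOrElseUpdate(brokerId, mutable.ArrayBuffer.empty[String]) += request

    // Flush everything queued so far: one entry per broker, in first-seen order.
    def sendRequestsToBrokers(): Unit = {
      pending.foreach { case (broker, reqs) => sent += ((broker, reqs.toList)) }
      pending.clear()
    }
  }

  // Same shape as handleStateChanges: open a batch, let each transition queue
  // at most one request for its broker, then send once.
  def handleStateChanges(brokerIds: Seq[Int], batch: RequestBatch)(
      handleOne: Int => Option[String]): Unit = {
    batch.newBatch()
    brokerIds.foreach(id => handleOne(id).foreach(req => batch.add(id, req)))
    batch.sendRequestsToBrokers()
  }
}
```

Grouping by broker means each broker receives its requests in one round, which is the point of collecting them before sending.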
Replica state transitions
Below, the transitions are grouped by TargetState: what the replica state machine does when the TargetState is NewReplica, ReplicaDeletionStarted, ReplicaDeletionIneligible, ReplicaDeletionSuccessful, NonExistentReplica, OnlineReplica or OfflineReplica.
TargetState: NewReplica
NewReplica is the state of a Replica that is about to be created. The logic is as follows:
```scala
val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica) //note: if the Replica has no state yet, it starts as NonExistentReplica
assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState) //note: validate the previous state
// start replica as a follower to the current leader for its partition
//note: read the Partition's leaderAndIsr information from zk
val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
leaderIsrAndControllerEpochOpt match {
  case Some(leaderIsrAndControllerEpoch) =>
    if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) //note: a Replica in this state must not be the leader
      throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
        .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
    //note: queue a LeaderAndIsr request for this replicaId; this also queues UpdateMetadata requests for all brokers
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
      topic, partition, leaderIsrAndControllerEpoch,
      replicaAssignment)
  case None => // new leader request will be sent to this replica when one gets elected
}
replicaState.put(partitionAndReplica, NewReplica) //note: cache this replica's state
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
    targetState))
```
When a Replica is to be moved to NewReplica, the replica state machine does the following:
- Validate the Replica's previous state: only a replica in NonExistentReplica can move to NewReplica;
- Read the Topic-Partition's LeaderIsrAndControllerEpoch from zk;
- If it is not found, simply move the Replica to NewReplica and stop (a newly created Partition has no LeaderAndIsr information at this point);
- If it is found and the Partition's leader is this replica, throw a StateChangeFailedException, because a Replica in this state cannot be elected leader;
- If it is found and the Partition's leader is not this replica, queue a LeaderAndIsr request for this replica (adding a LeaderAndIsr request also adds an UpdateMetadata request for every broker);
- Finally move the Replica to NewReplica and stop.
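The decision logic of the NewReplica branch can be isolated into a pure function, sketched below under simplifying assumptions: the leader information is passed in as an `Option` instead of being read from ZooKeeper, and instead of queuing a request the function returns the broker ids that would receive a LeaderAndIsr request. `NewReplicaSketch`, `LeaderAndIsr` and `toNewReplica` are hypothetical names.

```scala
object NewReplicaSketch {
  sealed trait ReplicaState extends Product with Serializable
  case object NonExistentReplica extends ReplicaState
  case object NewReplica extends ReplicaState

  // Hypothetical simplified types, not Kafka's classes.
  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  final class StateChangeFailedException(msg: String) extends RuntimeException(msg)

  // Returns the new state and the replica ids that would get a LeaderAndIsr request.
  def toNewReplica(replicaId: Int, current: ReplicaState,
                   leaderInfo: Option[LeaderAndIsr]): (ReplicaState, List[Int]) = {
    // Only NonExistentReplica may move to NewReplica.
    if (current != NonExistentReplica)
      throw new IllegalStateException(s"invalid previous state $current")
    leaderInfo match {
      // New partition without leader info yet: nothing to send.
      case None => (NewReplica, Nil)
      // A NewReplica must never be the leader.
      case Some(l) if l.leader == replicaId =>
        throw new StateChangeFailedException(s"replica $replicaId is the leader, cannot become NewReplica")
      // Otherwise tell this replica who its leader is.
      case Some(_) => (NewReplica, List(replicaId))
    }
  }
}
```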
TargetState: ReplicaDeletionStarted
This is the state a Replica enters when its deletion starts. The transition is implemented as follows:
```scala
assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
// send stop replica command
//note: send a StopReplica request to this replica with deletePartition=true
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
  callbacks.stopReplicaResponseCallback)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
```
The logic:
- Validate the previous state: a Replica can only move to this state from OfflineReplica;
- Update the Replica's state to ReplicaDeletionStarted;
- Queue a StopReplica request (deletePartition = true) for this replica; on receiving it, the broker deletes the Replica's data from disk;
- When the request returns, its callback is invoked (covered in the topic deletion part).
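The StopReplica request sent here differs from the one sent on the way to OfflineReplica only in its `deletePartition` flag. The hypothetical Scala model below shows that contrast; the `StopReplica` case class and both helpers are illustrative, not Kafka's request classes, and states are plain strings for brevity.

```scala
object StopReplicaSketch {
  // Hypothetical request model; deletePartition mirrors the flag passed to
  // addStopReplicaRequestForBrokers in the article's excerpts.
  final case class StopReplica(brokerId: Int, topic: String, partition: Int, deletePartition: Boolean)

  // OfflineReplica -> ReplicaDeletionStarted: stop the replica AND delete its data.
  def forDeletionStarted(previous: String, brokerId: Int, topic: String, partition: Int): StopReplica = {
    require(previous == "OfflineReplica", s"invalid previous state $previous")
    StopReplica(brokerId, topic, partition, deletePartition = true)
  }

  // Going to OfflineReplica only stops fetching from the leader; the data is kept.
  def forOffline(brokerId: Int, topic: String, partition: Int): StopReplica =
    StopReplica(brokerId, topic, partition, deletePartition = false)
}
```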
TargetState: ReplicaDeletionIneligible
ReplicaDeletionIneligible is the state of a replica whose deletion failed. The transition is implemented as follows:
```scala
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
```
The logic:
- Validate the previous state: a Replica can only move to this state from ReplicaDeletionStarted;
- Update the Replica's state to ReplicaDeletionIneligible.
TargetState: ReplicaDeletionSuccessful
ReplicaDeletionSuccessful is the state of a replica that has been deleted successfully. The transition is implemented as follows:
```scala
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
```
The logic:
- Validate the previous state: a Replica can only move to this state from ReplicaDeletionStarted;
- Update the Replica's state to ReplicaDeletionSuccessful.
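Both deletion outcomes can only be entered from ReplicaDeletionStarted; which one applies depends on whether the StopReplica request succeeded. A minimal sketch, assuming the response error is modelled as an `Option[String]` (the real callback and its wiring into topic deletion are not shown here); `DeletionOutcomeSketch` and `onStopReplicaResponse` are hypothetical names.

```scala
object DeletionOutcomeSketch {
  sealed trait ReplicaState extends Product with Serializable
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // Next state once the StopReplica(deletePartition = true) response arrives:
  // no error -> ReplicaDeletionSuccessful, any error -> ReplicaDeletionIneligible.
  def onStopReplicaResponse(current: ReplicaState, error: Option[String]): ReplicaState = {
    require(current == ReplicaDeletionStarted, s"invalid previous state $current")
    if (error.isEmpty) ReplicaDeletionSuccessful else ReplicaDeletionIneligible
  }
}
```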
TargetState: NonExistentReplica
NonExistentReplica is the state of a replica that has been fully deleted and no longer exists. The transition is implemented as follows:
```scala
assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
// remove this replica from the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
//note: remove this Replica's record from the controller's and the replica state machine's caches
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
replicaState.remove(partitionAndReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
```
The logic:
- Validate the previous state: a Replica can only move to this state from ReplicaDeletionSuccessful;
- Remove the replica from this Partition's entry in the controller's partitionReplicaAssignment;
- Remove the replica's entry from the replica state machine's cache replicaState.
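The bookkeeping for NonExistentReplica amounts to two removals. The Scala sketch below uses immutable maps as stand-ins for the controller's `partitionReplicaAssignment` and the state machine's `replicaState` caches; the tuple-based map shapes, the string-valued states and `toNonExistent` are assumptions for illustration.

```scala
object RemoveReplicaSketch {
  // Tuple aliases for this sketch only, not Kafka's TopicAndPartition/PartitionAndReplica.
  type TopicPartitionKey = (String, Int)
  type ReplicaKey = (String, Int, Int)

  // Drops replicaId from the partition's AR and forgets its cached state.
  def toNonExistent(assignment: Map[TopicPartitionKey, Seq[Int]],
                    replicaState: Map[ReplicaKey, String],
                    topic: String, partition: Int, replicaId: Int)
      : (Map[TopicPartitionKey, Seq[Int]], Map[ReplicaKey, String]) = {
    val key = (topic, partition, replicaId)
    // Same precondition as the assertValidPreviousStates call in the excerpt.
    require(replicaState.get(key).contains("ReplicaDeletionSuccessful"), s"invalid previous state for $key")
    val tp = (topic, partition)
    val shrunk = assignment.updated(tp, assignment(tp).filterNot(_ == replicaId))
    (shrunk, replicaState - key)
  }
}
```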
TargetState: OnlineReplica
OnlineReplica is the state of a normally working replica, which can act as either leader or follower. The transition is implemented as follows:
```scala
assertValidPreviousStates(partitionAndReplica,
  List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
replicaState(partitionAndReplica) match {
  case NewReplica => //note: NewReplica --> OnlineReplica
    // add this replica to the assigned replicas list for its partition
    //note: add this replica to the assigned replicas list (normally it is already in the list)
    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    if(!currentAssignedReplicas.contains(replicaId))
      controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
      .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
        targetState))
  case _ => //note: OnlineReplica/OfflineReplica/ReplicaDeletionIneligible --> OnlineReplica
    // check if the leader for this partition ever existed
    //note: if the Partition's LeaderIsrAndControllerEpoch exists, queue the request and update the replica's state
    controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
      case Some(leaderIsrAndControllerEpoch) =>
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
          replicaAssignment)
        replicaState.put(partitionAndReplica, OnlineReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case None => // that means the partition was never in OnlinePartition state, this means the broker never
        // started a log for that partition and does not have a high watermark value for this partition
    }
}
replicaState.put(partitionAndReplica, OnlineReplica)
```
As the transition diagram shows, a Replica in NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible can move to OnlineReplica. There are two cases:
NewReplica -> OnlineReplica:
- Get the Partition's AR (assigned replicas) from the Controller's partitionReplicaAssignment;
- If the Replica is not in the AR, append it to the Partition's AR;
- Finally set the Replica's state to OnlineReplica.
OnlineReplica/OfflineReplica/ReplicaDeletionIneligible -> OnlineReplica:
- Get the Partition's LeaderAndIsr information from the Controller's partitionLeadershipInfo;
- If it exists, queue a LeaderAndIsr request for this Partition to the broker hosting the Replica, and set the Replica's state to OnlineReplica;
- Otherwise, no request is sent;
- Finally, the Replica's state is set to OnlineReplica in either case.
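The two cases above can be condensed into one function. In this hypothetical sketch, `hasLeaderInfo` stands in for the lookup in `partitionLeadershipInfo`, and instead of queuing requests the function returns which replica ids would receive a LeaderAndIsr request; `OnlineReplicaSketch` and `Result` are illustrative names, not Kafka's.

```scala
object OnlineReplicaSketch {
  sealed trait ReplicaState extends Product with Serializable
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // New AR, replicas that would get a LeaderAndIsr request, and the resulting state.
  final case class Result(assignedReplicas: Seq[Int], leaderAndIsrTo: List[Int], newState: ReplicaState)

  def toOnline(current: ReplicaState, replicaId: Int, assigned: Seq[Int], hasLeaderInfo: Boolean): Result =
    current match {
      case NewReplica =>
        // Only make sure the replica is part of the AR; no request is queued here.
        val ar = if (assigned.contains(replicaId)) assigned else assigned :+ replicaId
        Result(ar, Nil, OnlineReplica)
      case OnlineReplica | OfflineReplica | ReplicaDeletionIneligible =>
        // Send LeaderAndIsr only if the partition has leader information.
        Result(assigned, if (hasLeaderInfo) List(replicaId) else Nil, OnlineReplica)
    }
}
```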
TargetState: OfflineReplica
OfflineReplica is the state of a Replica whose broker has gone offline. The transition is handled as follows:
```scala
assertValidPreviousStates(partitionAndReplica,
  List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// send stop replica command to the replica so that it stops fetching from the leader
//note: send a StopReplica request to this replica so that it stops replicating first
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean =
  controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
    case Some(_) =>
      controller.removeReplicaFromIsr(topic, partition, replicaId) match { //note: remove this replica from the ISR (provided the ISR has other valid replicas)
        case Some(updatedLeaderIsrAndControllerEpoch) =>
          // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
          //note: the ISR changed, so send a LeaderAndIsr request to the remaining replicas
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
              topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
          }
          replicaState.put(partitionAndReplica, OfflineReplica)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
          false
        case None =>
          true
      }
    case None =>
      true
  }
if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
  throw new StateChangeFailedException(
    "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
      .format(replicaId, topicAndPartition))
```
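The logic:
- Validate the previous state: only a Replica in NewReplica, OnlineReplica, OfflineReplica or ReplicaDeletionIneligible can move to this state;
- Queue a StopReplica request (deletePartition = false) for the broker hosting the Replica, so that it stops fetching from the leader while keeping its data;
- If the Partition has LeaderAndIsr information, call the Controller's removeReplicaFromIsr() to remove this replica from the Partition's ISR (provided the ISR still has other valid replicas), then, unless the Partition is queued for deletion, send a LeaderAndIsr request to the Partition's other replicas, and update the Replica's state to OfflineReplica;
- If no LeaderAndIsr information is available and the Partition is not being deleted, throw a StateChangeFailedException.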
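The ISR bookkeeping in this transition can be approximated as below. This is a simplified model that assumes only the rule stated above (the replica is dropped only when other ISR members remain); the real removeReplicaFromIsr() also performs conditional ZooKeeper updates and handles the case where the removed replica was the leader, both of which this sketch omits. All names here are hypothetical.

```scala
object OfflineReplicaSketch {
  // Hypothetical leader/ISR snapshot, not Kafka's LeaderAndIsr class.
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Drop the replica from the ISR only if other members remain; leader changes
  // and the ZooKeeper update done by the real removeReplicaFromIsr() are omitted.
  def shrinkIsr(current: LeaderAndIsr, replicaId: Int): LeaderAndIsr = {
    val remaining = current.isr.filterNot(_ == replicaId)
    if (remaining.isEmpty) current else current.copy(isr = remaining)
  }

  // Replicas that get the new LeaderAndIsr: every assigned replica except the
  // one going offline, and nobody if the partition is queued for deletion.
  def leaderAndIsrTargets(assigned: Seq[Int], replicaId: Int, toBeDeleted: Boolean): Seq[Int] =
    if (toBeDeleted) Seq.empty else assigned.filterNot(_ == replicaId)
}
```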
Triggers for state transitions
This section looks at what triggers each of the Replica transitions above. The results are summarized in the table below; some of them will be covered in later articles.


PartitionStateMachine
PartitionStateMachine records the state of every Partition in the cluster. It decides which state a Partition is in and which states it may move to from there. Kafka defines the following four Partition states:
- NonExistentPartition: the Partition has never been created, or was created and has since been deleted. Its valid previous state is OfflinePartition;
- NewPartition: a Partition is in this state right after it is created; it has no leader or ISR yet. Its valid previous state is NonExistentPartition;
- OnlinePartition: once a leader has been elected for the Partition, it enters this state. Its valid previous states are NewPartition, OnlinePartition and OfflinePartition;
- OfflinePartition: if the Partition's leader goes offline, the Partition moves to this state. Its valid previous states are NewPartition, OnlinePartition and OfflinePartition.
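As with replicas, the valid previous states above can be encoded as a small lookup. The names below are hypothetical and only mirror the four states; they are not Kafka's classes.

```scala
object PartitionTransitions {
  // Hypothetical model of the four partition states.
  sealed trait PartitionState extends Product with Serializable
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // For each target state, the states a partition may be in before moving to it.
  def validPrevious(target: PartitionState): Set[PartitionState] = target match {
    case NewPartition => Set(NonExistentPartition)
    case OnlinePartition | OfflinePartition => Set(NewPartition, OnlinePartition, OfflinePartition)
    case NonExistentPartition => Set(OfflinePartition)
  }

  def isValid(from: PartitionState, to: PartitionState): Boolean =
    validPrevious(to).contains(from)
}
```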
The partition state transition diagram is shown below:

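This diagram is the core of the partition state machine and is explained in detail below. First, let's look at what happens when KafkaController calls PartitionStateMachine's startup() method during startup.
PartitionStateMachine initialization
The initialization method of PartitionStateMachine is shown below: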
```scala
//note: triggered when the Controller starts
//note: initialize the state of every Partition (read from zk), then trigger leader election for New/Offline Partitions (on success they become OnlinePartition)
def startup() {
  // initialize partition state
  //note: initialize partition states: OnlinePartition if the leader's broker is alive, OfflinePartition otherwise (NewPartition if there is no leader information)
  initializePartitionState()
  // set started flag
  hasStarted.set(true)
  // try to move partitions to online state
  //note: elect a leader for every Partition in NewPartition or OfflinePartition state
  triggerOnlinePartitionStateChange()
  info("Started partition state machine with initial state -> " + partitionState.toString())
}
```
In this method, PartitionStateMachine first calls initializePartitionState() to initialize the state of every Partition in the cluster:
- If the Partition has LeaderAndIsr information, its state is set to OnlinePartition if the broker hosting the leader is alive, and to OfflinePartition otherwise;
- If the Partition has no LeaderAndIsr information, its state is set to NewPartition.
This only writes the states into the partition state machine's cache partitionState; no actual state transition happens.
```scala
//note: initialize the state of every Partition read from zk
private def initializePartitionState() {
  for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
    // check if leader and isr path exists for partition. If not, then it is in NEW state
    controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      case Some(currentLeaderIsrAndEpoch) =>
        // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
        if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
          // leader is alive
          //note: has LeaderAndIsr information and the leader is alive: OnlinePartition
          partitionState.put(topicPartition, OnlinePartition)
        else
          //note: has LeaderAndIsr information but the leader is not alive: OfflinePartition
          partitionState.put(topicPartition, OfflinePartition)
      case None =>
        //note: no LeaderAndIsr information: NewPartition (the Partition has no leader yet)
        partitionState.put(topicPartition, NewPartition)
    }
  }
}
```
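A self-contained sketch of the same classification, assuming the LeaderAndIsr information is reduced to a map from (topic, partition) to the current leader's broker id. `PartitionInitSketch` is an illustrative name, not a Kafka class.

```scala
object PartitionInitSketch {
  sealed trait PartitionState extends Product with Serializable
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // leaderOf holds the current leader of every partition that has LeaderAndIsr info.
  def initialize(partitions: Set[(String, Int)], leaderOf: Map[(String, Int), Int],
                 liveBrokerIds: Set[Int]): Map[(String, Int), PartitionState] =
    partitions.iterator.map { tp =>
      val state: PartitionState = leaderOf.get(tp) match {
        case Some(leader) if liveBrokerIds.contains(leader) => OnlinePartition  // leader alive
        case Some(_)                                        => OfflinePartition // leader dead
        case None                                           => NewPartition     // no leader yet
      }
      tp -> state
    }.toMap
}
```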
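In the second step of initialization, triggerOnlinePartitionStateChange() is called to elect a leader for every Partition in NewPartition or OfflinePartition state (partitions of topics queued for deletion are skipped). Once election succeeds, the Partition's state is set to OnlinePartition. The leader selector passed in is OfflinePartitionLeaderSelector (see the link for its implementation).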
```scala
//note: triggered after a controller election, or when a broker comes online or goes offline
def triggerOnlinePartitionStateChange() {
  try {
    brokerRequestBatch.newBatch()
    // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
    // that belong to topics to be deleted
    //note: process every partition in NewPartition or OfflinePartition state (except those of topics to be deleted)
    for((topicAndPartition, partitionState) <- partitionState
        if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
      if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
        //note: try to elect a leader for a Partition in OfflinePartition or NewPartition state; on success it becomes OnlinePartition
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
          (new CallbackBuilder).build)
    }
    //note: send requests to all brokers, including LeaderAndIsr and UpdateMetadata requests (this only puts them into each Broker's RequestQueue; a background thread sends them)
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  } catch {
    case e: Throwable => error("Error while moving some partitions to the online state", e)
    // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
  }
}
```
The method above tries to move every Partition in NewPartition or OfflinePartition state to OnlinePartition. It does two things:
- state transitions (described in detail below);
- sending the corresponding requests.
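The selection step of triggerOnlinePartitionStateChange() can be pulled out into a pure filter. This sketch assumes partition states are held in a `Map` keyed by (topic, partition) and that topics queued for deletion are given as a set of names; `TriggerOnlineSketch.candidates` is a hypothetical helper, not part of Kafka.

```scala
object TriggerOnlineSketch {
  sealed trait PartitionState extends Product with Serializable
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Partitions that would be pushed towards OnlinePartition: New or Offline,
  // skipping every topic queued for deletion.
  def candidates(states: Map[(String, Int), PartitionState],
                 topicsToDelete: Set[String]): Set[(String, Int)] =
    states.collect {
      case (tp @ (topic, _), state)
          if !topicsToDelete.contains(topic) && (state == NewPartition || state == OfflinePartition) => tp
    }.toSet
}
```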
Partition state transitions
Below, the transitions are grouped by TargetState: what the partition state machine does when the TargetState is NewPartition, OfflinePartition, NonExistentPartition or OnlinePartition.
TargetState: NewPartition
NewPartition is the state of a newly created Partition. Its handling logic is as follows:
```scala
//note: if the Partition has no state yet, it defaults to NonExistentPartition
val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
// pre: partition did not exist before this
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
partitionState.put(topicAndPartition, NewPartition) //note: cache the partition's state
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
    assignedReplicas))
```
The logic:
- Validate the previous state; the only valid previous state is NonExistentPartition;
- Move the Partition to NewPartition and update the cache.
TargetState: OnlinePartition
OnlinePartition is the state of a normally working Partition, whose leader and ISR have been elected successfully. The logic is as follows:
```scala
//note: check whether the Partition's previous state may move to the target state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
partitionState(topicAndPartition) match {
  case NewPartition => //note: newly created Partition
    //note: choose leader and isr and write them to zk and the controller cache; throw if no replica is alive
    // initialize leader and isr path for new partition
    initializeLeaderAndIsrForPartition(topicAndPartition)
  case OfflinePartition => //note: Partition whose leader is dead
    //note: elect a leader and write it to zk and the controller cache; throw on failure
    electLeaderForPartition(topic, partition, leaderSelector)
  case OnlinePartition => // invoked when the leader needs to be re-elected
    //note: only reached when the leader needs to be re-elected
    electLeaderForPartition(topic, partition, leaderSelector)
  case _ => // should never come here since illegal previous states are checked above
}
partitionState.put(topicAndPartition, OnlinePartition)
val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))
```
The logic:
- Validate the Partition's previous state; valid previous states are NewPartition, OnlinePartition or OfflinePartition;
- If the previous state is NewPartition, choose the leader and ISR for the Partition and write them to zk and the controller's cache; if none of the Partition's replicas is alive, throw an exception;
- If the previous state is OnlinePartition, only a leader election is triggered: for an OnlinePartition -> OnlinePartition transition the caller must pass in a leader selector, which is used to re-elect the Partition's leader;
- If the previous state is OfflinePartition, the same applies: a leader election is triggered;
- Update the Partition's state to OnlinePartition.
Whatever the previous state, each of these cases ends with a leader election for the Partition, and once the leader has been chosen successfully, a LeaderAndIsr request is sent to the Partition's replicas.
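The dispatch on the previous state can be sketched as below. For NewPartition it assumes the rule used by initializeLeaderAndIsrForPartition() in the Kafka versions this article follows (the first live replica in the AR becomes leader and all live AR members form the ISR); double-check that against your Kafka version. For Online/OfflinePartition it simply delegates to a pluggable selector, modelled by the hypothetical `LeaderSelector` function type. All names are illustrative.

```scala
object OnlinePartitionSketch {
  sealed trait PartitionState extends Product with Serializable
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  final case class LeaderAndIsr(leader: Int, isr: List[Int])
  final class StateChangeFailedException(msg: String) extends RuntimeException(msg)

  // Hypothetical selector: (assigned replicas, live brokers, current leader/ISR) => new leader/ISR.
  type LeaderSelector = (Seq[Int], Set[Int], Option[LeaderAndIsr]) => LeaderAndIsr

  def toOnline(prev: PartitionState, assigned: Seq[Int], live: Set[Int],
               current: Option[LeaderAndIsr], selector: LeaderSelector): LeaderAndIsr = prev match {
    case NewPartition =>
      // Assumed rule: first live AR member leads, all live AR members form the ISR.
      val liveAssigned = assigned.filter(live.contains).toList
      if (liveAssigned.isEmpty)
        throw new StateChangeFailedException("no live replica for new partition")
      LeaderAndIsr(liveAssigned.head, liveAssigned)
    case OfflinePartition | OnlinePartition =>
      // Leader (re-)election is delegated to the selector passed in by the caller.
      selector(assigned, live, current)
    case NonExistentPartition =>
      throw new IllegalStateException(s"invalid previous state $prev")
  }
}
```

Keeping the selector as a parameter mirrors the excerpt above, where the same transition is reused with different leader selectors depending on why the leader must change.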
TargetState: OfflinePartition
OfflinePartition is the state a Partition moves to when its leader dies; a Partition in this state has no available leader. The logic is as follows:
```scala
// pre: partition should be in New or Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// should be called when the leader for a partition is no longer alive
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
partitionState.put(topicAndPartition, OfflinePartition)
// post: partition has no alive leader
```
The logic:
- Validate the previous state; valid previous states are NewPartition, OnlinePartition or OfflinePartition;
- Move the Partition to OfflinePartition and update the cache.
TargetState: NonExistentPartition
NonExistentPartition is the state a Partition enters once it has been in OfflinePartition and has been deleted from the metadata and from zk. The logic is as follows:
```scala
// pre: partition should be in Offline state
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
  .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
partitionState.put(topicAndPartition, NonExistentPartition)
// post: partition state is deleted from all brokers and zookeeper
```
The logic:
- Validate the previous state; the only valid previous state is OfflinePartition;
- Move the Partition to NonExistentPartition and update the cache.
Triggers for state transitions
This section looks at what triggers each of the Partition transitions above. The results are summarized in the table below; some of them will be covered in later articles.


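That covers the replica state machine and the partition state machine. This article has only described the two state machines themselves; later articles will cover other parts of the Controller, including Partition reassignment, Topic creation and Topic deletion, all of which build on what is described here.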