Kafka 源碼解析之副本狀態(tài)機(jī)與分區(qū)狀態(tài)機(jī)

[TOC]
上篇講述了 KafkaController 的啟動(dòng)流程,但是關(guān)于分區(qū)狀態(tài)機(jī)和副本狀態(tài)機(jī)的初始化并沒(méi)有觸及,分區(qū)狀態(tài)機(jī)和副本狀態(tài)機(jī)的內(nèi)容將在本篇文章深入講述。分區(qū)狀態(tài)機(jī)記錄著當(dāng)前集群所有 Partition 的狀態(tài)信息以及如何對(duì) Partition 狀態(tài)轉(zhuǎn)移進(jìn)行相應(yīng)的處理;副本狀態(tài)機(jī)則是記錄著當(dāng)前集群所有 Replica 的狀態(tài)信息以及如何對(duì) Replica 狀態(tài)轉(zhuǎn)變進(jìn)行相應(yīng)的處理。

ReplicaStateMachine

ReplicaStateMachine 記錄著集群所有 Replica 的狀態(tài)信息,它決定著一個(gè) replica 處在什么狀態(tài)以及它在什么狀態(tài)下可以轉(zhuǎn)變?yōu)槭裁礌顟B(tài),Kafka 中副本的狀態(tài)總共有以下七種類型:

  1. NewReplica:這種狀態(tài)下 Controller 可以創(chuàng)建這個(gè) Replica,這種狀態(tài)下該 Replica 只能作為 follower,它可以是 Replica 刪除后的一個(gè)臨時(shí)狀態(tài),它有效的前置狀態(tài)是 NonExistentReplica;
  2. OnlineReplica:一旦這個(gè) Replica 被分配到指定的 Partition 上,并且 Replica 創(chuàng)建完成,那么它將會(huì)被置為這個(gè)狀態(tài),在這個(gè)狀態(tài)下,這個(gè) Replica 既可以作為 leader 也可以作為 follower,它有效的前置狀態(tài)是 NewReplica、OnlineReplica 或 OfflineReplica;
  3. OfflineReplica:如果一個(gè) Replica 掛掉(所在的節(jié)點(diǎn)宕機(jī)或者其他情況),該 Replica 將會(huì)被轉(zhuǎn)換到這個(gè)狀態(tài),它有的效前置狀態(tài)是 NewReplica、OfflineReplica 或者 OnlineReplica;
  4. ReplicaDeletionStarted:Replica 開(kāi)始刪除時(shí)被置為的狀態(tài),它有效的前置狀態(tài)是 OfflineReplica;
  5. ReplicaDeletionSuccessful:如果 Replica 在刪除時(shí)沒(méi)有遇到任何錯(cuò)誤信息,它將被置為這個(gè)狀態(tài),這個(gè)狀態(tài)代表該 Replica 的數(shù)據(jù)已經(jīng)從節(jié)點(diǎn)上清除了,它有效的前置狀態(tài)是 ReplicaDeletionStarted;
  6. ReplicaDeletionIneligible:如果 Replica 刪除失敗,它將會(huì)轉(zhuǎn)移到這個(gè)狀態(tài),這個(gè)狀態(tài)意思是非法刪除,也就是刪除是無(wú)法成功的,它有效的前置狀態(tài)是 ReplicaDeletionStarted;
  7. NonExistentReplica:如果 Replica 刪除成功,它將被轉(zhuǎn)移到這個(gè)狀態(tài),它有效的前置狀態(tài)是:ReplicaDeletionSuccessful。

上面的狀態(tài)中其中后面4是專門(mén)為 Replica 刪除而服務(wù)的,副本狀態(tài)機(jī)轉(zhuǎn)移圖如下所示:

image.png

這張圖是副本狀態(tài)機(jī)的核心,在下面會(huì)詳細(xì)講述,接下來(lái)先看下 KafkaController 在啟動(dòng)時(shí),調(diào)用 ReplicaStateMachine 的 startup() 方法初始化的處理過(guò)程。

ReplicaStateMachine 初始化

副本狀態(tài)機(jī)初始化的過(guò)程如下:

//note: Controller 重新選舉后觸發(fā)的操作
def startup() {
  // initialize replica state
  //note: 初始化 zk 上所有的 Replica 狀態(tài)信息(replica 存活的話設(shè)置為 Online,不存活的設(shè)置為 ReplicaDeletionIneligible)
  initializeReplicaState()
  // set started flag
  hasStarted.set(true)
  // move all Online replicas to Online
  //note: 將存活的副本狀態(tài)轉(zhuǎn)變?yōu)?OnlineReplica
  handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)

  info("Started replica state machine with initial state -> " + replicaState.toString())
}

在這個(gè)方法中,ReplicaStateMachine 先調(diào)用 initializeReplicaState() 方法初始化集群中所有 Replica 的狀態(tài)信息,如果 Replica 所在機(jī)器是 alive 的,那么將其狀態(tài)設(shè)置為 OnlineReplica,否則設(shè)置為 ReplicaDeletionIneligible 狀態(tài),這里只是將 Replica 的狀態(tài)信息更新副本狀態(tài)機(jī)的緩存 replicaState 中,并沒(méi)有真正進(jìn)行狀態(tài)轉(zhuǎn)移的操作。

//note: 初始化所有副本的狀態(tài)信息
private def initializeReplicaState() {
  for((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
    val topic = topicPartition.topic
    val partition = topicPartition.partition
    assignedReplicas.foreach { replicaId =>
      val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
      if (controllerContext.liveBrokerIds.contains(replicaId)) //note: 如果副本是存活,那么將狀態(tài)都設(shè)置為 OnlineReplica
        replicaState.put(partitionAndReplica, OnlineReplica)
      else
        // mark replicas on dead brokers as failed for topic deletion, if they belong to a topic to be deleted.
        // This is required during controller failover since during controller failover a broker can go down,
        // so the replicas on that broker should be moved to ReplicaDeletionIneligible to be on the safer side.
        //note: 將不存活的副本狀態(tài)設(shè)置為 ReplicaDeletionIneligible
        replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
    }
  }
}

接著第二步調(diào)用 handleStateChanges() 將所有存活的副本狀態(tài)轉(zhuǎn)移為 OnlineReplica 狀態(tài),這里才是真正進(jìn)行狀態(tài)轉(zhuǎn)移的地方,其具體實(shí)現(xiàn)如下:

//note: 用于處理 Replica 狀態(tài)的變化
def handleStateChanges(replicas: Set[PartitionAndReplica], targetState: ReplicaState,
                       callbacks: Callbacks = (new CallbackBuilder).build) {
  if(replicas.nonEmpty) {
    info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
    try {
      brokerRequestBatch.newBatch()
      //note: 狀態(tài)轉(zhuǎn)變
      replicas.foreach(r => handleStateChange(r, targetState, callbacks))
      //note: 向 broker 發(fā)送相應(yīng)請(qǐng)求
      brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
    }catch {
      case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
    }
  }
}

這里是副本狀態(tài)機(jī) startup() 方法的最后一步,它的目的是將所有 alive 的 Replica 狀態(tài)轉(zhuǎn)移到 OnlineReplica 狀態(tài),由于前面已經(jīng)這些 alive replica 的狀態(tài)設(shè)置成了 OnlineReplica,所以這里 Replica 的狀態(tài)轉(zhuǎn)移情況是:OnlineReplica –> OnlineReplica,這個(gè)方法主要是做了兩件事:

  1. 狀態(tài)轉(zhuǎn)移(這個(gè)在下面詳細(xì)講述);
  2. 發(fā)送相應(yīng)的請(qǐng)求。

副本的狀態(tài)轉(zhuǎn)移

這里以要轉(zhuǎn)移的 TargetState 區(qū)分做詳細(xì)詳細(xì)講解,當(dāng) TargetState 分別是 NewReplica、ReplicaDeletionStarted、ReplicaDeletionIneligible、ReplicaDeletionSuccessful、NonExistentReplica、OnlineReplica 或者 OfflineReplica 時(shí),副本狀態(tài)機(jī)所做的事情。

TargetState: NewReplica

NewReplica 這個(gè)狀態(tài)是 Replica 準(zhǔn)備開(kāi)始創(chuàng)建是的一個(gè)狀態(tài),其實(shí)現(xiàn)邏輯如下:

val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)//note: Replica 不存在的話,狀態(tài)初始化為 NonExistentReplica
assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)//note: 驗(yàn)證
// start replica as a follower to the current leader for its partition
//note: 從 zk 獲取 Partition 的 leaderAndIsr 信息
val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
leaderIsrAndControllerEpochOpt match {
  case Some(leaderIsrAndControllerEpoch) =>
    if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)//note: 這個(gè)狀態(tài)的 Replica 不能作為 leader
      throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
        .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
    //note: 向該 replicaId 發(fā)送 LeaderAndIsr 請(qǐng)求,這個(gè)方法同時(shí)也會(huì)向所有的 broker 發(fā)送 updateMeta 請(qǐng)求
    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
                                                        topic, partition, leaderIsrAndControllerEpoch,
                                                        replicaAssignment)
  case None => // new leader request will be sent to this replica when one gets elected
}
replicaState.put(partitionAndReplica, NewReplica)//note: 緩存這個(gè) replica 對(duì)象的狀態(tài)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                  targetState))

當(dāng)想要把 Replica 的狀態(tài)轉(zhuǎn)移為 NewReplica 時(shí),副本狀態(tài)機(jī)的處理邏輯如下:

  1. 校驗(yàn) Replica 的前置狀態(tài),只有處于 NonExistentReplica 狀態(tài)的副本才能轉(zhuǎn)移到 NewReplica 狀態(tài);
  2. 從 zk 中獲取該 Topic-Partition 的 LeaderIsrAndControllerEpoch 信息;
  3. 如果獲取不到上述信息,直接將該 Replica 的狀態(tài)轉(zhuǎn)移成 NewReplica,然后結(jié)束流程(對(duì)與新建的 Partition,處于這個(gè)狀態(tài)時(shí),該 Partition 是沒(méi)有相應(yīng)的 LeaderAndIsr 信息的);
  4. 獲取到 Partition 的 LeaderIsrAndControllerEpoch 信息,如果發(fā)現(xiàn)該 Partition 的 leader 是當(dāng)前副本,那么就拋出 StateChangeFailedException 異常,因?yàn)樘幵谶@個(gè)狀態(tài)的 Replica 是不能被選舉為 leader 的;
  5. 獲取到了 Partition 的 LeaderIsrAndControllerEpoch 信息,并且該 Partition 的 leader 不是當(dāng)前 replica,那么向該 Partition 的所有 Replica 添加一個(gè) LeaderAndIsr 請(qǐng)求(添加 LeaderAndIsr 請(qǐng)求時(shí),實(shí)際上也會(huì)向所有的 Broker 都添加一個(gè) Update-Metadata 請(qǐng)求);
  6. 最后將該 Replica 的狀態(tài)轉(zhuǎn)移成 NewReplica,然后結(jié)束流程。

TargetState: ReplicaDeletionStarted
這是 Replica 開(kāi)始刪除時(shí)的狀態(tài),Replica 轉(zhuǎn)移到這種狀態(tài)的處理實(shí)現(xiàn)如下:

assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
// send stop replica command
//note: 發(fā)送 StopReplica 請(qǐng)求給該副本,并設(shè)置 deletePartition=true
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = true,
  callbacks.stopReplicaResponseCallback)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))

這部分的實(shí)現(xiàn)邏輯:

  1. 校驗(yàn)其前置狀態(tài),Replica 只能是在 OfflineReplica 的情況下才能轉(zhuǎn)移到這種狀態(tài);
  2. 更新向該 Replica 的狀態(tài)為 ReplicaDeletionStarted;
  3. 向該 replica 發(fā)送 StopReplica 請(qǐng)求(deletePartition = true),收到這請(qǐng)求后,broker 會(huì)從物理存儲(chǔ)上刪除這個(gè) Replica 的數(shù)據(jù)內(nèi)容;
  4. 如果請(qǐng)求返回的話會(huì)觸發(fā)其回調(diào)函數(shù)(這部分會(huì)在 topic 刪除部分講解)。

TargetState: ReplicaDeletionIneligible
ReplicaDeletionIneligible 是副本刪除失敗時(shí)的狀態(tài),Replica 轉(zhuǎn)移到這種狀態(tài)的處理實(shí)現(xiàn)如下:

assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))

實(shí)現(xiàn)邏輯:

  1. 校驗(yàn)其前置狀態(tài),Replica 只能是在 ReplicaDeletionStarted 下才能轉(zhuǎn)移這種狀態(tài);
  2. 更新該 Replica 的狀態(tài)為 ReplicaDeletionIneligible。

TargetState: ReplicaDeletionSuccessful
ReplicaDeletionSuccessful 是副本刪除成功時(shí)的狀態(tài),Replica 轉(zhuǎn)移到這種狀態(tài)的處理實(shí)現(xiàn)如下:

assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))

實(shí)現(xiàn)邏輯:

  1. 檢驗(yàn)其前置狀態(tài),Replica 只能是在 ReplicaDeletionStarted 下才能轉(zhuǎn)移這種狀態(tài);
  2. 更新該 Replica 的狀態(tài)為 ReplicaDeletionSuccessful。

TargetState: NonExistentReplica
NonExistentReplica 是副本完全刪除、不存在這個(gè)副本的狀態(tài),Replica 轉(zhuǎn)移到這種狀態(tài)的處理實(shí)現(xiàn)如下:

assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
// remove this replica from the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
//note: 從 controller 和副本狀態(tài)機(jī)的緩存中清除這個(gè) Replica 的記錄西溪
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
replicaState.remove(partitionAndReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))

實(shí)現(xiàn)邏輯:

  1. 檢驗(yàn)其前置狀態(tài),Replica 只能是在 ReplicaDeletionSuccessful 下才能轉(zhuǎn)移這種狀態(tài);
  2. 在 controller 的 partitionReplicaAssignment 刪除這個(gè) Partition 對(duì)應(yīng)的 replica 信息;
  3. 從 Controller 和副本狀態(tài)機(jī)中將這個(gè) Topic 從緩存中刪除。

TargetState: OnlineReplica
OnlineReplica 是副本正常工作時(shí)的狀態(tài),此時(shí)的 Replica 既可以作為 leader 也可以作為 follower,Replica 轉(zhuǎn)移到這種狀態(tài)的處理實(shí)現(xiàn)如下:

assertValidPreviousStates(partitionAndReplica,
  List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
replicaState(partitionAndReplica) match {
  case NewReplica => //note: NewReplica --> OnlineReplica
    // add this replica to the assigned replicas list for its partition
    //note: 向 the assigned replicas list 添加這個(gè) replica(正常情況下這些 replicas 已經(jīng)更新到 list 中了)
    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
    if(!currentAssignedReplicas.contains(replicaId))
      controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
    stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
                              .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState,
                                      targetState))
  case _ => //note: OnlineReplica/OfflineReplica/ReplicaDeletionIneligible --> OnlineReplica
    // check if the leader for this partition ever existed
    //note: 如果該 Partition 的 LeaderIsrAndControllerEpoch 信息存在,那么就更新副本的狀態(tài),并發(fā)送相應(yīng)的請(qǐng)求
    controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
      case Some(leaderIsrAndControllerEpoch) =>
        brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
          replicaAssignment)
        replicaState.put(partitionAndReplica, OnlineReplica)
        stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
          .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
      case None => // that means the partition was never in OnlinePartition state, this means the broker never
        // started a log for that partition and does not have a high watermark value for this partition
    }
}
replicaState.put(partitionAndReplica, OnlineReplica)

從前面的狀態(tài)轉(zhuǎn)移圖中可以看出,當(dāng) Replica 處在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 狀態(tài)時(shí),Replica 是可以轉(zhuǎn)移到 OnlineReplica 狀態(tài)的,下面分兩種情況講述:

NewReplica –> OnlineReplica 的處理邏輯如下:

  1. 從 Controller 的 partitionReplicaAssignment 中獲取這個(gè) Partition 的 AR;
  2. 如果 Replica 不在 AR 中的話,那么就將其添加到 Partition 的 AR 中;
  3. 最后將 Replica 的狀態(tài)設(shè)置為 OnlineReplica 狀態(tài)。

OnlineReplica/OfflineReplica/ReplicaDeletionIneligible –> OnlineReplica 的處理邏輯如下:

  1. 從 Controller 的 partitionLeadershipInfo 中獲取 Partition 的 LeaderAndIsr 信息;
  2. 如果該信息存在,那么就向這個(gè) Replica 所在 broker 添加這個(gè) Partition 的 LeaderAndIsr 請(qǐng)求,并將 Replica 的狀態(tài)設(shè)置為 OnlineReplica 狀態(tài);
  3. 否則不做任務(wù)處理;
  4. 最后更新R Replica 的狀態(tài)為 OnlineReplica。

TargetState: OfflineReplica
OfflineReplica 是 Replica 所在 Broker 掉線時(shí) Replica 的狀態(tài),轉(zhuǎn)移到這種狀態(tài)的處理邏輯如下:

assertValidPreviousStates(partitionAndReplica,
  List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// send stop replica command to the replica so that it stops fetching from the leader
//note: 發(fā)送 StopReplica 請(qǐng)求給該副本,先停止副本同步
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean =
  controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
    case Some(_) =>
      controller.removeReplicaFromIsr(topic, partition, replicaId) match { //note: 從 isr 中移除這個(gè)副本(前提是 ISR 有其他有效副本)
        case Some(updatedLeaderIsrAndControllerEpoch) =>
          // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
          //note: 發(fā)送 LeaderAndIsr 請(qǐng)求給剩余的其他副本,因?yàn)?ISR 變動(dòng)了
          val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
          if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
            brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
              topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
          }
          replicaState.put(partitionAndReplica, OfflineReplica)
          stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
            .format(controllerId, controller.epoch, replicaId, topicAndPartition, currState, targetState))
          false
        case None =>
          true
      }
    case None =>
      true
  }
if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
  throw new StateChangeFailedException(
    "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
    .format(replicaId, topicAndPartition))

處理邏輯如下:

  1. 校驗(yàn)其前置狀態(tài),只有 Replica 在 NewReplica、OnlineReplica、OfflineReplica 或者 ReplicaDeletionIneligible 狀態(tài)時(shí),才能轉(zhuǎn)移到這種狀態(tài);
  2. 向該 Replica 所在節(jié)點(diǎn)發(fā)送 StopReplica 請(qǐng)求(deletePartition = false);
  3. 調(diào)用 Controller 的 removeReplicaFromIsr() 方法將該 replica 從 Partition 的 isr 移除這個(gè) replica(前提 isr 中還有其他有效副本),然后向該 Partition 的其他副本發(fā)送 LeaderAndIsr 請(qǐng)求;
  4. 更新這個(gè) Replica 的狀態(tài)為 OfflineReplica。

狀態(tài)轉(zhuǎn)移觸發(fā)的條件

這里主要是看一下上面 Replica 各種轉(zhuǎn)移的觸發(fā)的條件,整理的結(jié)果如下表所示,部分內(nèi)容會(huì)在后續(xù)文章講解。

image.png
image.png

PartitionStateMachine

PartitionStateMachine 記錄著集群所有 Partition 的狀態(tài)信息,它決定著一個(gè) Partition 處在什么狀態(tài)以及它在什么狀態(tài)下可以轉(zhuǎn)變?yōu)槭裁礌顟B(tài),Kafka 中 Partition 的狀態(tài)總共有以下四種類型:

  1. NonExistentPartition:這個(gè)代表著這個(gè) Partition 之前沒(méi)有被創(chuàng)建過(guò)或者之前創(chuàng)建了現(xiàn)在又被刪除了,它有效的前置狀態(tài)是 OfflinePartition;
  2. NewPartition:Partition 創(chuàng)建后,它將處于這個(gè)狀態(tài),這個(gè)狀態(tài)的 Partition 還沒(méi)有 leader 和 isr,它有效的前置狀態(tài)是 NonExistentPartition;
  3. OnlinePartition:一旦這個(gè) Partition 的 leader 被選舉出來(lái)了,它將處于這個(gè)狀態(tài),它有效的前置狀態(tài)是 NewPartition、OnlinePartition、OfflinePartition;
  4. OfflinePartition:如果這個(gè) Partition 的 leader 掉線,這個(gè) Partition 將被轉(zhuǎn)移到這個(gè)狀態(tài),它有效的前置狀態(tài)是 NewPartition、OnlinePartition、OfflinePartition。

分區(qū)狀態(tài)機(jī)轉(zhuǎn)移圖如下所示:

image.png

這張圖是分區(qū)狀態(tài)機(jī)的核心,在下面會(huì)詳細(xì)講述,接下來(lái)先看下 KafkaController 在啟動(dòng)時(shí),調(diào)用 PartitionStateMachine 的 startup() 方法初始化的處理過(guò)程。

PartitionStateMachine 初始化

PartitionStateMachine 的初始化方法如下所示:

//note: Controller 啟動(dòng)時(shí)觸發(fā)
//note: 初始化所有 Partition 的狀態(tài)(從 zk 獲?。? 然后對(duì)于 new/offline Partition 觸發(fā)選主(選主成功的話,變?yōu)?OnlinePartition)
def startup() {
  // initialize partition state
  //note: 初始化 partition 的狀態(tài),如果 leader 所在 broker 是 alive 的,那么狀態(tài)為 OnlinePartition,否則為 OfflinePartition
  initializePartitionState()
  // set started flag
  hasStarted.set(true)
  // try to move partitions to online state
  //note: 為所有處理 NewPartition 或 OnlinePartition 狀態(tài) Partition 選舉 leader
  triggerOnlinePartitionStateChange()

  info("Started partition state machine with initial state -> " + partitionState.toString())
}

在這個(gè)方法中,PartitionStateMachine 先調(diào)用 initializePartitionState() 方法初始化集群中所有 Partition 的狀態(tài)信息:

  1. 如果該 Partition 有 LeaderAndIsr 信息,那么如果 Partition leader 所在的機(jī)器是 alive 的,那么將其狀態(tài)設(shè)置為 OnlinePartition,否則設(shè)置為 OfflinePartition 狀態(tài);
  2. 如果該 Partition 沒(méi)有 LeaderAndIsr 信息,那么將其狀態(tài)設(shè)置為 NewPartition。

這里只是將 Partition 的狀態(tài)信息更新分區(qū)狀態(tài)機(jī)的緩存 partitionState 中,并沒(méi)有真正進(jìn)行狀態(tài)的轉(zhuǎn)移。

//note: 根據(jù)從 zk 獲取的所有 Partition,進(jìn)行狀態(tài)初始化
private def initializePartitionState() {
  for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
    // check if leader and isr path exists for partition. If not, then it is in NEW state
    controllerContext.partitionLeadershipInfo.get(topicPartition) match {
      case Some(currentLeaderIsrAndEpoch) =>
        // else, check if the leader for partition is alive. If yes, it is in Online state, else it is in Offline state
        if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
          // leader is alive
          //note: 有 LeaderAndIsr 信息,并且 leader 存活,設(shè)置為 OnlinePartition 狀態(tài)
          partitionState.put(topicPartition, OnlinePartition)
        else
          //note: 有 LeaderAndIsr 信息,但是 leader 不存活,設(shè)置為 OfflinePartition 狀態(tài)
          partitionState.put(topicPartition, OfflinePartition)
      case None =>
        //note: 沒(méi)有 LeaderAndIsr 信息,設(shè)置為 NewPartition 狀態(tài)(這個(gè) Partition 還沒(méi)有)
        partitionState.put(topicPartition, NewPartition)
    }
  }
}

在初始化的第二步,將會(huì)調(diào)用 triggerOnlinePartitionStateChange() 方法,為所有的狀態(tài)為 NewPartition/OnlinePartition 的 Partition 進(jìn)行 leader 選舉,選舉成功后的話,其狀態(tài)將會(huì)設(shè)置為 OnlinePartition,調(diào)用的 Leader 選舉方法是 OfflinePartitionLeaderSelector(具體實(shí)現(xiàn)參考鏈接)。

//note: 這個(gè)方法是在 controller 選舉后或 broker 上線或下線時(shí)時(shí)觸發(fā)的
def triggerOnlinePartitionStateChange() {
  try {
    brokerRequestBatch.newBatch()
    // try to move all partitions in NewPartition or OfflinePartition state to OnlinePartition state except partitions
    // that belong to topics to be deleted
    //note: 開(kāi)始為所有狀態(tài)在 NewPartition or OfflinePartition 狀態(tài)的 partition 更新?tīng)顟B(tài)(除去將要被刪除的 topic)
    for((topicAndPartition, partitionState) <- partitionState
        if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
      if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
        //note: 嘗試為處在 OfflinePartition 或 NewPartition 狀態(tài)的 Partition 選主,成功后轉(zhuǎn)換為 OnlinePartition
        handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector,
                          (new CallbackBuilder).build)
    }
    //note: 發(fā)送請(qǐng)求給所有的 broker,包括 LeaderAndIsr 請(qǐng)求和 UpdateMetadata 請(qǐng)求(這里只是添加到 Broker 對(duì)應(yīng)的 RequestQueue 中,后臺(tái)有線程去發(fā)送)
    brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
  } catch {
    case e: Throwable => error("Error while moving some partitions to the online state", e)
    // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions
  }
}

上面方法的目的是為嘗試將所有的狀態(tài)為 NewPartition/OnlinePartition 的 Partition 狀態(tài)轉(zhuǎn)移到 OnlinePartition,這個(gè)方法主要是做了兩件事:

  1. 狀態(tài)轉(zhuǎn)移(這個(gè)在下面詳細(xì)講述);
  2. 發(fā)送相應(yīng)的請(qǐng)求。

分區(qū)的狀態(tài)轉(zhuǎn)移
這里以要轉(zhuǎn)移的 TargetState 區(qū)分做詳細(xì)詳細(xì)講解,當(dāng) TargetState 分別是 NewPartition、OfflinePartition、NonExistentPartition 或者 OnlinePartition 時(shí),副本狀態(tài)機(jī)所做的事情。

TargetState: NewPartition
NewPartition 是 Partition 剛創(chuàng)建時(shí)的一個(gè)狀態(tài),其處理邏輯如下:

//note: 如果該 Partition 的狀態(tài)不存在,默認(rèn)為 NonExistentPartition
val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
// pre: partition did not exist before this
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
partitionState.put(topicAndPartition, NewPartition) //note: 緩存 partition 的狀態(tài)
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s with assigned replicas %s"
                          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState,
                                  assignedReplicas))

實(shí)現(xiàn)邏輯:

  1. 校驗(yàn)其前置狀態(tài),它有效的前置狀態(tài)為 NonExistentPartition;
  2. 將該 Partition 的狀態(tài)轉(zhuǎn)移為 NewPartition 狀態(tài),并且更新到緩存中。

TargetState: OnlinePartition
OnlinePartition 是一個(gè) Partition 正常工作時(shí)的狀態(tài),這個(gè)狀態(tài)下的 Partition 已經(jīng)成功選舉出了 leader 和 isr 信息,其實(shí)現(xiàn)邏輯如下:

//note: 判斷 Partition 之前的狀態(tài)是否可以轉(zhuǎn)換為目的狀態(tài)
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
partitionState(topicAndPartition) match {
  case NewPartition => //note: 新建的 Partition
    //note: 選舉 leader 和 isr,更新到 zk 和 controller 中,如果沒(méi)有存活的 replica,拋出異常
    // initialize leader and isr path for new partition
    initializeLeaderAndIsrForPartition(topicAndPartition)
  case OfflinePartition => //note: leader 掛掉的 Partition
    //note: 進(jìn)行 leader 選舉,更新到 zk 及 controller 緩存中,失敗的拋出異常
    electLeaderForPartition(topic, partition, leaderSelector)
  case OnlinePartition => // invoked when the leader needs to be re-elected
    //note:這種只有在 leader 需要重新選舉時(shí)才會(huì)觸發(fā)
    electLeaderForPartition(topic, partition, leaderSelector)
  case _ => // should never come here since illegal previous states are checked above
}
partitionState.put(topicAndPartition, OnlinePartition)
val leader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
stateChangeLogger.trace("Controller %d epoch %d changed partition %s from %s to %s with leader %d"
                          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState, leader))

實(shí)現(xiàn)邏輯:

  1. 校驗(yàn)這個(gè) Partition 的前置狀態(tài),有效的前置狀態(tài)是:NewPartition、OnlinePartition 或者 OfflinePartition;
  2. 如果前置狀態(tài)是 NewPartition,那么為該 Partition 選舉 leader 和 isr,更新到 zk 和 controller 的緩存中,如果副本沒(méi)有處于 alive 狀態(tài)的話,就拋出異常;
  3. 如果前置狀態(tài)是 OnlinePartition,那么只是觸發(fā) leader 選舉,在 OnlinePartition –> OnlinePartition 這種狀態(tài)轉(zhuǎn)移時(shí),需要傳入 leader 選舉的方法,觸發(fā)該 Partition 的 leader 選舉;
  4. 如果前置狀態(tài)是 OfflinePartition,同上,也是觸發(fā) leader 選舉。
  5. 更新 Partition 的狀態(tài)為 OnlinePartition。

對(duì)于以上這幾種情況,無(wú)論前置狀態(tài)是什么,最后都會(huì)觸發(fā)這個(gè) Partition 的 leader 選舉,leader 成功后,都會(huì)觸發(fā)向這個(gè) Partition 的所有 replica 發(fā)送 LeaderAndIsr 請(qǐng)求。

TargetState: OfflinePartition
OfflinePartition 是這個(gè) Partition 的 leader 掛掉時(shí)轉(zhuǎn)移的一個(gè)狀態(tài),如果 Partition 轉(zhuǎn)移到這個(gè)狀態(tài),那么就意味著這個(gè) Partition 沒(méi)有了可用 leader。

// pre: partition should be in New or Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// should be called when the leader for a partition is no longer alive
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
partitionState.put(topicAndPartition, OfflinePartition)
// post: partition has no alive leader

實(shí)現(xiàn)邏輯:

  1. 校驗(yàn)其前置狀態(tài),它有效的前置狀態(tài)為 NewPartition、OnlinePartition 或者 OfflinePartition;
  2. 將該 Partition 的狀態(tài)轉(zhuǎn)移為 OfflinePartition 狀態(tài),并且更新到緩存中。

TargetState: NonExistentPartition
NonExistentPartition 代表了已經(jīng)處于 OfflinePartition 狀態(tài)的 Partition 已經(jīng)從 metadata 和 zk 中刪除后進(jìn)入的狀態(tài)。

 // pre: partition should be in Offline state
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
stateChangeLogger.trace("Controller %d epoch %d changed partition %s state from %s to %s"
                          .format(controllerId, controller.epoch, topicAndPartition, currState, targetState))
partitionState.put(topicAndPartition, NonExistentPartition)
// post: partition state is deleted from all brokers and zookeeper

實(shí)現(xiàn)邏輯:

  1. 校驗(yàn)其前置狀態(tài),它有效的前置狀態(tài)為 OfflinePartition;
  2. 將該 Partition 的狀態(tài)轉(zhuǎn)移為 NonExistentPartition 狀態(tài),并且更新到緩存中。
狀態(tài)轉(zhuǎn)移觸發(fā)的條件

這里主要是看一下上面 Partition 各種轉(zhuǎn)移的觸發(fā)的條件,整理的結(jié)果如下表所示,部分內(nèi)容會(huì)在后續(xù)文章講解。

image.png
image.png

上面就是副本狀態(tài)機(jī)與分區(qū)狀態(tài)機(jī)的所有內(nèi)容,這里只是單純地講述了一下這兩種狀態(tài)機(jī),后續(xù)文章會(huì)開(kāi)始介紹 Controller 一些其他內(nèi)容,包括 Partition 遷移、Topic 新建、Topic 下線等,這些內(nèi)容都會(huì)用到這篇文章講述的內(nèi)容。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容