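Kafka Source Code Walkthrough (6): GroupCoordinator Module Source Analysis

1. Overview of GroupCoordinator and related components

The GroupCoordinator module is the broker-side manager for consumers. Each GroupCoordinator manages one or more consumer groups, and also tracks the partitions assigned to each consumer together with their consumed offsets.

The GroupCoordinator responsible for a given consumer group is determined by the following formula: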

Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)

groupMetadataTopicPartitionCount is set by offsets.topic.num.partitions and defaults to 50 partitions. The broker hosting the leader replica of the resulting partition acts as the GroupCoordinator for that group.
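The mapping from group id to coordinator partition can be sketched as a small function. This is a minimal illustration of the formula as written in this article, not the broker's own code: `CoordinatorLookupSketch` and its `partitionFor` are names made up for this example, and the default of 50 is the value quoted above.

```scala
object CoordinatorLookupSketch {
  // Default of offsets.topic.num.partitions, as quoted in the text.
  val DefaultOffsetsTopicPartitions = 50

  // Applies the article's formula: |hash % partitionCount|.
  // The remainder lies strictly between -partitionCount and partitionCount,
  // so the result is always in the range [0, partitionCount).
  def partitionFor(groupId: String,
                   partitionCount: Int = DefaultOffsetsTopicPartitions): Int =
    math.abs(groupId.hashCode % partitionCount)
}
```

The same group id always yields the same partition number, so every broker can work out independently which __consumer_offsets partition, and therefore which coordinator, owns a group.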

1.1 Offset management

After a consumer has consumed messages from a partition, it must commit its progress to the cluster. Kafka stores this progress in the __consumer_offsets topic: the GroupCoordinator writes the records to the offsets topic and also caches the latest committed offset of each partition so that it can be looked up quickly. The GroupCoordinator uses the GroupMetadataManager component to manage consumer groups and their committed offsets.

1.2 Consumer rebalance management

Within a consumer group, each partition of a subscribed topic is consumed by exactly one consumer, while a single consumer may own several partitions. The mapping between consumers and partitions is managed through the GroupCoordinator. When the membership of a group changes, the GroupCoordinator runs the rebalance procedure to redistribute partitions among the consumers.
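To illustrate what a rebalance has to produce, here is a minimal sketch of one possible assignment strategy (plain round-robin). It is not one of Kafka's built-in assignors; `AssignmentSketch` and `roundRobin` are hypothetical names used only for this example.

```scala
object AssignmentSketch {
  // Spreads partitions over consumers in round-robin order.
  // Every partition gets exactly one owner; a consumer may own several
  // partitions, or none when there are more consumers than partitions.
  def roundRobin(consumers: Seq[String], partitions: Seq[Int]): Map[String, Seq[Int]] = {
    require(consumers.nonEmpty, "at least one consumer is needed")
    val sorted = consumers.sorted
    val empty: Map[String, Seq[Int]] = sorted.map(c => c -> Seq.empty[Int]).toMap
    partitions.sorted.zipWithIndex.foldLeft(empty) { case (acc, (partition, i)) =>
      val owner = sorted(i % sorted.size)
      acc.updated(owner, acc(owner) :+ partition)
    }
  }
}
```

For example, `AssignmentSketch.roundRobin(Seq("c1", "c2"), Seq(0, 1, 2))` gives partitions 0 and 2 to c1 and partition 1 to c2. When a consumer joins or leaves, the assignment is simply recomputed over the new member list.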

2. Group state transitions

2.1 Group state transition diagram

[Figure: group state transition diagram]

2.2 Notes on group state transitions

PreparingRebalance state:

While a group is in the PreparingRebalance state, the GroupCoordinator handles OffsetFetchRequest, ListGroupRequest and OffsetCommitRequest normally. For HeartbeatRequest and SyncGroupRequest, however, the response carries the REBALANCE_IN_PROGRESS error code. When a JoinGroupRequest arrives, the coordinator first creates the corresponding DelayedJoin and responds once its completion condition is met.

  • PreparingRebalance -> AwaitingSync: a DelayedJoin times out, or all previous members of the group have rejoined;

  • PreparingRebalance -> Empty: all consumers have left the group;

  • PreparingRebalance -> Dead: the group is removed during partition migration;

AwaitingSync state (named CompletingRebalance in the source excerpts below):

The group is waiting for the SyncGroupRequest from the group leader. In this state the GroupCoordinator answers OffsetCommitRequest and HeartbeatRequest with the REBALANCE_IN_PROGRESS error code. A SyncGroupRequest from a follower is not answered right away: its callback is parked until the leader's SyncGroupRequest arrives.

  • AwaitingSync -> Stable: the GroupCoordinator receives the SyncGroupRequest from the group leader;
  • AwaitingSync -> PreparingRebalance: a consumer joins or leaves the group; a member's heartbeat times out; a known member updates its metadata;
  • AwaitingSync -> Dead: the group is removed during partition migration;

Stable state:

In this state the GroupCoordinator can handle all requests, for example OffsetFetchRequest, HeartbeatRequest, OffsetCommitRequest, and JoinGroupRequest from followers.

  • Stable -> PreparingRebalance: a member's heartbeat times out; a member leaves voluntarily; the current group leader sends a JoinGroupRequest; a new consumer asks to join the group;
  • Stable -> Dead: the group is removed during partition migration;

Dead state:

A group in this state has no members, and its GroupMetadata is about to be removed. Apart from OffsetCommitRequest, responses to all requests carry UNKNOWN_MEMBER_ID.

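Empty state:

The group has no members left, but it is not removed until all of its offsets have expired. This state is also used for groups that only commit offsets and do not use group membership.

  • Empty -> Dead: the last offset is removed; the group is removed because it expired; the group is removed during partition migration;
  • Empty -> PreparingRebalance: a new member joins by sending a JoinGroupRequest;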
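The transition lists above can be collected into one lookup table. The following is a minimal sketch transcribed from this section only; `GroupStateSketch` and `canTransition` are illustrative names, and the state called AwaitingSync in the text appears under its code name CompletingRebalance.

```scala
object GroupStateSketch {
  sealed trait GroupState
  case object Empty extends GroupState
  case object PreparingRebalance extends GroupState
  case object CompletingRebalance extends GroupState // "AwaitingSync" in the text
  case object Stable extends GroupState
  case object Dead extends GroupState

  // Allowed target states per source state, as listed in section 2.2.
  val transitions: Map[GroupState, Set[GroupState]] = Map(
    PreparingRebalance  -> Set[GroupState](CompletingRebalance, Empty, Dead),
    CompletingRebalance -> Set[GroupState](Stable, PreparingRebalance, Dead),
    Stable              -> Set[GroupState](PreparingRebalance, Dead),
    Empty               -> Set[GroupState](PreparingRebalance, Dead),
    Dead                -> Set.empty[GroupState]
  )

  def canTransition(from: GroupState, to: GroupState): Boolean =
    transitions(from).contains(to)
}
```

Reading the table row by row shows two properties of the state machine: Dead is terminal, and Stable can only be reached through CompletingRebalance, i.e. after the leader's SyncGroupRequest.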

3. Main processing flows

The GroupCoordinator handles three kinds of requests: offset-related requests, group-related requests, and heartbeats. Specifically: ApiKeys.OFFSET_COMMIT, ApiKeys.OFFSET_FETCH, ApiKeys.JOIN_GROUP, ApiKeys.LEAVE_GROUP, ApiKeys.SYNC_GROUP, ApiKeys.DESCRIBE_GROUPS, ApiKeys.LIST_GROUPS, and ApiKeys.HEARTBEAT.

3.1 JoinGroup handling

As the name suggests, JoinGroup is the request a consumer sends to join a group. It registers the consumer's basic information with the group, and the GroupCoordinator elects one consumer as the leader. Once all members have joined, the leader assigns partitions to consumers according to the assignment strategy, and the result is distributed during the sync phase.

handleJoinGroup() processing flow:

[Figure: handleJoinGroup() processing flow]

Key source of handleJoinGroup(), annotated:

def handleJoinGroup(groupId: String,
                    memberId: String,
                    groupInstanceId: Option[String],
                    requireKnownMemberId: Boolean,
                    clientId: String,
                    clientHost: String,
                    rebalanceTimeoutMs: Int,
                    sessionTimeoutMs: Int,
                    protocolType: String,
                    protocols: List[(String, Array[Byte])],
                    responseCallback: JoinCallback): Unit = {
    val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
    groupManager.getGroup(groupId) match {
      case None =>
        // Neither the group nor the memberId exists: this is the first consumer of the group to join, so the group must be created and its state initialized
        if (isUnknownMember) {
          // Register the new group with the GroupMetadataManager
          val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
          // Initialize the group and schedule the InitialDelayedJoin
          doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        } else {
          responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
        }
      case Some(group) =>
        group.inLock {
          // The group exceeds its configured maximum size: remove the member and return an error
          if ((groupIsOverCapacity(group)
                && group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
              || (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
            group.remove(memberId)
            group.removeStaticMember(groupInstanceId)
            responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
          } else if (isUnknownMember) {
            // The group exists but the member has no memberId yet
            doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
          } else {
            // Both the group and the memberId exist
            doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
          }

          // attempt to complete JoinGroup
          // If the group is in PreparingRebalance, try to complete the pending delayed join.
          // Completing it invokes awaitingJoinCallback for every member, i.e. sends the JoinGroup responses.
          // Each response names the group leader, and only the leader's response carries the full member list.
          if (group.is(PreparingRebalance)) {
            joinPurgatory.checkAndComplete(GroupKey(group.groupId))
          }
        }
      }
    }

doUnknownJoinGroup() processing flow:

[Figure: doUnknownJoinGroup() processing flow]

Key source of doUnknownJoinGroup(), annotated:

private def doUnknownJoinGroup(group: GroupMetadata,
                               groupInstanceId: Option[String],
                               requireKnownMemberId: Boolean,
                               clientId: String,
                               clientHost: String,
                               rebalanceTimeoutMs: Int,
                               sessionTimeoutMs: Int,
                               protocolType: String,
                               protocols: List[(String, Array[Byte])],
                               responseCallback: JoinCallback): Unit = {
      // Generate a new memberId
      val newMemberId = group.generateMemberId(clientId, groupInstanceId)

      // The member is a registered static member (groupInstanceId.isDefined && staticMembers.contains(groupInstanceId.get)); a static member is a consumer configured with a groupInstanceId
      if (group.hasStaticMember(groupInstanceId)) {
        // Update the static member's information and handle the rebalance
        updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
      }
      // A known memberId is required (joinGroupRequest.version >= 4 && groupInstanceId.isEmpty)
      else if (requireKnownMemberId) {
          // If member id required (dynamic membership), register the member in the pending member list
          // and send back a response to call for another join group request with allocated member id.
        debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
            s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
        // Put the new memberId into the pendingMembers list and return MEMBER_ID_REQUIRED so that the consumer rejoins with the new memberId
        group.addPendingMember(newMemberId)
        addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
        responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
      } else {
        info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
          s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
        // Add the member and trigger the rebalance
        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
          clientId, clientHost, protocolType, protocols, group, responseCallback)
      }
}
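The requireKnownMemberId branch above implies a two-step handshake: the first JoinGroup carries an empty memberId and is answered with MEMBER_ID_REQUIRED plus a freshly generated id, and the second JoinGroup carries that id and is accepted. The sketch below models only this handshake. `MemberIdHandshakeSketch`, `Group` and the counter-based id format are assumptions made for illustration and do not reproduce how the broker generates ids.

```scala
import scala.collection.mutable

object MemberIdHandshakeSketch {
  sealed trait JoinOutcome
  final case class MemberIdRequired(assignedId: String) extends JoinOutcome
  final case class Accepted(memberId: String) extends JoinOutcome
  case object UnknownMemberId extends JoinOutcome

  // Hypothetical stand-in for the coordinator's per-group bookkeeping.
  final class Group {
    private val pending = mutable.Set.empty[String]
    private val members = mutable.Set.empty[String]
    private var counter = 0

    def join(memberId: String, clientId: String): JoinOutcome =
      if (memberId.isEmpty) {
        // First contact: hand out an id, park it as pending, ask the client to rejoin.
        counter += 1
        val newId = s"$clientId-$counter"
        pending += newId
        MemberIdRequired(newId)
      } else if (pending.remove(memberId) || members.contains(memberId)) {
        // Rejoin with a known id: promote it to (or keep it as) a full member.
        members += memberId
        Accepted(memberId)
      } else UnknownMemberId

    def size: Int = members.size
  }
}
```

The point of the extra round trip is that a client which gives up before rejoining only leaves a pending id behind, not a full member that the rebalance would have to wait for.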

doJoinGroup() processing flow:

[Figure: doJoinGroup() processing flow]

Key source of doJoinGroup(), annotated:

private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        groupInstanceId: Option[String],
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback): Unit = {
  group.inLock {
    if (group.is(Dead)) {
      responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
    } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
      responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
    }
    // The memberId is in the pending list, i.e. the consumer has received its memberId and is now rejoining with it
    else if (group.isPendingMember(memberId)) {
      // A rejoining pending member will be accepted. Note that pending member will never be a static member.
      // A pending member is always a dynamic member, so it must not carry a groupInstanceId; only static members set one
      if (groupInstanceId.isDefined) {
        throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
          s"into pending member bucket with member id $memberId")
      } else {
        debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
          s"${group.currentState} state. Adding to the group now.")
        // Add the member and trigger the rebalance
        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
          clientId, clientHost, protocolType, protocols, group, responseCallback)
      }
    } else {
      // True when a groupInstanceId is supplied but is not registered in this group
      val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)

      // Static member whose memberId differs from the one stored for its groupInstanceId
      if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
        // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
        responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
      }
      // The group does not contain this memberId, or the static member's groupInstanceId is unknown
      else if (!group.has(memberId) || groupInstanceIdNotFound) {
          // If the dynamic member trying to register with an unrecognized id, or
          // the static member joins with unknown group instance id, send the response to let
          // it reset its member id and retry.
        responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
      } else {
        val member = group.get(memberId)
        // Dispatch on the current group state
        group.currentState match {
          // PreparingRebalance: the members are currently rejoining
          case PreparingRebalance =>
            updateMemberAndRebalance(group, member, protocols, responseCallback)

          // CompletingRebalance: all members have joined and the group is waiting for SyncGroup
          case CompletingRebalance =>
            if (member.matches(protocols)) {
              // member is joining with the same metadata (which could be because it failed to
              // receive the initial JoinGroup response), so just return current group information
              // for the current generation.
              // Return the JoinGroupResult for the current generation
              responseCallback(JoinGroupResult(
                members = if (group.isLeader(memberId)) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            } else {
              // member has changed metadata, so force a rebalance
              // Update the member's metadata and rebalance
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            }
          // Stable: all members have completed SyncGroup, so the rebalance is finished and consumers can consume normally
          case Stable =>
            val member = group.get(memberId)
            // If the member is the leader or its metadata has changed, update it and rebalance; otherwise return the current group information
            if (group.isLeader(memberId) || !member.matches(protocols)) {
              // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
              // The latter allows the leader to trigger rebalances for changes affecting assignment
              // which do not affect the member metadata (such as topic metadata changes for the consumer)
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            } else {
              // for followers with no actual change to their metadata, just return group information
              // for the current generation which will allow them to issue SyncGroup
              responseCallback(JoinGroupResult(
                members = List.empty,
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            }

          case Empty | Dead =>
            // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
            warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
              s"unexpected group state ${group.currentState}")
            responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
        }
      }
    }
  }
}

addMemberAndRebalance() processing flow:

[Figure: addMemberAndRebalance() processing flow]

Key source of addMemberAndRebalance(), annotated:

private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                  sessionTimeoutMs: Int,
                                  memberId: String,
                                  groupInstanceId: Option[String],
                                  clientId: String,
                                  clientHost: String,
                                  protocolType: String,
                                  protocols: List[(String, Array[Byte])],
                                  group: GroupMetadata,
                                  callback: JoinCallback): Unit = {
  // Create the new MemberMetadata
  val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)

  member.isNew = true

  // update the newMemberAdded flag to indicate that the join group can be further delayed
  // If the group is in PreparingRebalance and has not completed its first generation yet
  if (group.is(PreparingRebalance) && group.generationId == 0)
    group.newMemberAdded = true

  // Add the new member
  group.add(member, callback)

  // The session timeout does not affect new members since they do not have their memberId and
  // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
  // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
  // timeout during a long rebalance), they may simply retry which will lead to a lot of defunct
  // members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests
  // for new members. If the new member is still there, we expect it to retry.
  completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)

  // Static member
  if (member.isStaticMember) {
    info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
    // Add to the static member list
    group.addStaticMember(groupInstanceId, memberId)
  } else {
    // Remove from the pending member list: the member has been added successfully
    group.removePendingMember(memberId)
  }
  // Prepare the rebalance, which schedules the delayed join
  maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}

prepareRebalance() processing flow:

[Figure: prepareRebalance() processing flow]

Key source of prepareRebalance(), annotated:

private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  // if any members are awaiting sync, cancel their request and have them rejoin
  // If all members have joined but SyncGroup has not completed, answer the waiting members with REBALANCE_IN_PROGRESS so that they rejoin
  if (group.is(CompletingRebalance))
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

  // If the group is Empty, create an InitialDelayedJoin; otherwise create a DelayedJoin
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this,
      joinPurgatory,
      group,
      groupConfig.groupInitialRebalanceDelayMs,
      groupConfig.groupInitialRebalanceDelayMs,
      max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)

  // Move the group to PreparingRebalance
  group.transitionTo(PreparingRebalance)

  info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
    s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")

  // Put the delayed operation into joinPurgatory, where it waits for a timeout or for all members to join
  val groupKey = GroupKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
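The three numeric arguments passed to InitialDelayedJoin above are the configured initial delay, the delay of the current wait, and the remaining budget (the rebalance timeout minus the first delay). The following sketch shows how such a budget could be consumed. It rests on an assumption that is not shown in the excerpt: each time a wait expires and new members arrived in the meantime, another wait of min(configured delay, remaining) is scheduled and the remaining budget shrinks by the delay just spent. `InitialDelaySketch` and `waits` are illustrative names.

```scala
object InitialDelaySketch {
  // Returns the successive waits in ms, assuming (worst case) that a new
  // member arrives during every wait, so the join is extended each time.
  def waits(initialDelayMs: Int, rebalanceTimeoutMs: Int): List[Int] = {
    require(initialDelayMs > 0, "a zero delay would never consume the budget")

    @annotation.tailrec
    def loop(delayMs: Int, remainingMs: Int, acc: List[Int]): List[Int] =
      if (remainingMs == 0) (delayMs :: acc).reverse
      else {
        // Assumed follow-up rule: next wait is capped by the remaining budget.
        val nextDelay = math.min(initialDelayMs, remainingMs)
        val nextRemaining = math.max(remainingMs - delayMs, 0)
        loop(nextDelay, nextRemaining, delayMs :: acc)
      }

    // Same initial arguments as in prepareRebalance() above.
    loop(initialDelayMs, math.max(rebalanceTimeoutMs - initialDelayMs, 0), Nil)
  }
}
```

With an example delay of 3000 ms and a rebalance timeout of 10000 ms, `InitialDelaySketch.waits(3000, 10000)` yields waits of 3000, 3000, 3000 and 1000 ms, so under this assumption the first rebalance of a new group is postponed by at most the rebalance timeout.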

InitialDelayedJoin and DelayedJoin processing flow:

[Figure: InitialDelayedJoin and DelayedJoin processing flow]

Key source, annotated:

tryCompleteJoin():

def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  group.inLock {
    // Complete once every member is awaiting its join response and pendingMembers is empty
    if (group.hasAllMembersJoined)
      forceComplete()
    else false
  }
}

onCompleteJoin():

def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // Remove dynamic members that have not rejoined
    group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember =>
      removeHeartbeatForLeavingMember(group, failedMember)
      group.remove(failedMember.memberId)
      group.removeStaticMember(failedMember.groupInstanceId)
      // TODO: cut the socket connection to the client
    }

    if (group.is(Dead)) {
      info(s"Group ${group.groupId} is dead, skipping rebalance stage")
    }
    // No leader could be elected among the rejoined members while the group still has members: keep waiting with another DelayedJoin of rebalanceTimeoutMs
    else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) {
      // If all members are not rejoining, we will postpone the completion
      // of rebalance preparing stage, and send out another delayed operation
      // until session timeout removes all the non-responsive members.
      error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
      joinPurgatory.tryCompleteElseWatch(
        new DelayedJoin(this, group, group.rebalanceTimeoutMs),
        Seq(GroupKey(group.groupId)))
    } else {
      // Advance to the next generation; the group moves to CompletingRebalance, or to Empty if no members remain
      group.initNextGeneration()
      if (group.is(Empty)) {
        info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
        // Persist the group metadata and update the cache
        groupManager.storeGroup(group, Map.empty, error => {
          if (error != Errors.NONE) {
            // we failed to write the empty group metadata. If the broker fails before another rebalance,
            // the previous generation written to the log will become active again (and most likely timeout).
            // This should be safe since there are no active members in an empty generation, so we just warn.
            warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
          }
        })
      } else {
        info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

        // trigger the awaiting join group response callback for all the members after rebalancing
        // Iterate over the members and return a JoinGroupResult to each, completing the join for the whole group
        for (member <- group.allMemberMetadata) {
          val joinResult = JoinGroupResult(
            members = if (group.isLeader(member.memberId)) {
              group.currentMemberMetadata
            } else {
              List.empty
            },
            memberId = member.memberId,
            generationId = group.generationId,
            protocolType = group.protocolType,
            protocolName = group.protocolName,
            leaderId = group.leaderOrNull,
            error = Errors.NONE)

          group.maybeInvokeJoinCallback(member, joinResult)
          completeAndScheduleNextHeartbeatExpiration(group, member)
          member.isNew = false
        }
      }
    }
  }
}
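The response loop at the end of onCompleteJoin() treats the leader differently from the followers: every member learns who the leader is, but only the leader receives the member list it needs for computing the assignment. A minimal sketch of that rule, using simplified stand-in types (`JoinResult` here is not Kafka's JoinGroupResult):

```scala
object JoinResponseSketch {
  // Simplified stand-in for the fields of a join response.
  final case class JoinResult(memberId: String,
                              leaderId: String,
                              generationId: Int,
                              members: List[String])

  // Builds one response per member; only the leader sees the full membership.
  def responses(memberIds: List[String], leaderId: String, generationId: Int): List[JoinResult] =
    memberIds.map { id =>
      val visible = if (id == leaderId) memberIds else List.empty[String]
      JoinResult(id, leaderId, generationId, visible)
    }
}
```

Followers therefore cannot compute an assignment themselves; they obtain their share later through SyncGroup.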

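3.2 SyncGroup handling

SyncGroup is the step in which consumers obtain the assignment result from the group. The leader includes the assignment for every consumer in its SyncGroup request, and the GroupCoordinator then distributes the result to all members according to the leader's assignment.

handleSyncGroup() main processing flow:

[Figure: handleSyncGroup() main processing flow]

Source of doSyncGroup(), which handleSyncGroup() delegates to, annotated: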

private def doSyncGroup(group: GroupMetadata,
                        generationId: Int,
                        memberId: String,
                        protocolType: Option[String],
                        protocolName: Option[String],
                        groupInstanceId: Option[String],
                        groupAssignment: Map[String, Array[Byte]],
                        responseCallback: SyncCallback): Unit = {
  group.inLock {
      group.currentState match {
        case Empty =>
          responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))

        case PreparingRebalance =>
          responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))

        // In this state the partition assignment is still in progress
        case CompletingRebalance =>
          // Park this member's sync callback
          group.get(memberId).awaitingSyncCallback = responseCallback

          // if this is the leader, then we can attempt to persist state and transition to stable
          // If this member is the leader, take the assignment it sent; otherwise nothing more is done here
          if (group.isLeader(memberId)) {
            info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

            // fill any missing members with an empty assignment
            // Handle members that received no assignment from the leader
            val missing = group.allMembers -- groupAssignment.keySet
            val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

            if (missing.nonEmpty) {
              warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
            }

            // Persist the assignment and update the cache
            groupManager.storeGroup(group, assignment, (error: Errors) => {
              group.inLock {
                // another member may have joined the group while we were awaiting this callback,
                // so we must ensure we are still in the CompletingRebalance state and the same generation
                // when it gets invoked. if we have transitioned to another state, then do nothing
                if (group.is(CompletingRebalance) && generationId == group.generationId) {
                  // Storing the group failed
                  if (error != Errors.NONE) {
                    // Reset every member's assignment to empty and return the error to the consumers
                    resetAndPropagateAssignmentError(group, error)
                    // Schedule a DelayedJoin and wait for the consumers to rejoin
                    maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
                  } else {
                    // Set every member's assignment and return it to the corresponding consumer
                    setAndPropagateAssignment(group, assignment)
                    // Move the group to Stable
                    group.transitionTo(Stable)
                  }
                }
              }
            })
            groupCompletedRebalanceSensor.record()
          }

        // In this state the leader's assignment has already been stored
        case Stable =>
          // if the group is stable, we just return the current assignment
          // Look up the member and return its assignment
          val memberMetadata = group.get(memberId)
          responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
          completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))

        case Dead =>
          throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
      }
    }
  }
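The "fill any missing members" step in the leader branch above can be isolated into a small pure function. This is a sketch of those two lines with plain String member ids; `SyncAssignmentSketch` and `fillMissing` are names chosen for this example.

```scala
object SyncAssignmentSketch {
  // Completes the leader's assignment so that every member has an entry.
  // Members the leader did not mention get an empty byte array.
  def fillMissing(allMembers: Set[String],
                  leaderAssignment: Map[String, Array[Byte]]): Map[String, Array[Byte]] = {
    val missing = allMembers -- leaderAssignment.keySet
    leaderAssignment ++ missing.map(m => m -> Array.empty[Byte])
  }
}
```

Giving every member an entry means each parked sync callback can be answered, even for a member the leader's assignor skipped.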

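3.3 CommitOffsets handling

CommitOffsets is how a consumer commits its consumed offsets. The GroupCoordinator accepts offset commits in two states, Stable and PreparingRebalance. In other words, a consumer can commit while the JoinGroup phase is in progress and again after SyncGroup has completed.

handleCommitOffsets(), annotated: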

def handleCommitOffsets(groupId: String,
                        memberId: String,
                        groupInstanceId: Option[String],
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
  validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
    case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
    case None =>
      // Look up the group
      groupManager.getGroup(groupId) match {
        case None =>
          // Group not found and generationId < 0: the consumer does not use Kafka group management
          if (generationId < 0) {
            // the group is not relying on Kafka for group management, so allow the commit
            // Create the group and commit the offsets
            val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
            doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
          } else {
            // or this is a request coming from an older generation. either way, reject the commit
            responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
          }
        // The group exists: commit the offsets
        case Some(group) =>
          doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
      }
  }
}

doCommitOffsets(), annotated:

private def doCommitOffsets(group: GroupMetadata,
                            memberId: String,
                            groupInstanceId: Option[String],
                            generationId: Int,
                            offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                            responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
  group.inLock {
      group.currentState match {
        // In Stable or PreparingRebalance, call groupManager.storeOffsets() to write the offsets to the topic and to the cache
        case Stable | PreparingRebalance =>
          // During PreparingRebalance phase, we still allow a commit request since we rely
          // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
          val member = group.get(memberId)
          completeAndScheduleNextHeartbeatExpiration(group, member)
          groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
        case CompletingRebalance =>
          responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })

        case _ =>
          throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
      }
    }
  }
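Taken together, the two excerpts above decide whether a commit is stored based on whether the group exists, the generationId, and the group state. The sketch below condenses only the branches visible in these excerpts into one function. The types and names are made up for illustration, and states that the excerpts do not handle explicitly are reported as `NotCoveredByExcerpt` rather than guessed.

```scala
object CommitGateSketch {
  sealed trait State
  case object Empty extends State
  case object PreparingRebalance extends State
  case object CompletingRebalance extends State
  case object Stable extends State
  case object Dead extends State

  sealed trait Decision
  case object Store extends Decision
  case object RejectIllegalGeneration extends Decision
  case object RejectRebalanceInProgress extends Decision
  case object NotCoveredByExcerpt extends Decision

  // groupState is None when the coordinator has no record of the group.
  def decide(groupState: Option[State], generationId: Int): Decision =
    groupState match {
      case None if generationId < 0                 => Store // no group management in use
      case None                                     => RejectIllegalGeneration
      case Some(Stable) | Some(PreparingRebalance)  => Store
      case Some(CompletingRebalance)                => RejectRebalanceInProgress
      case Some(_)                                  => NotCoveredByExcerpt
    }
}
```

For instance, `decide(None, -1)` stores the offsets, while `decide(Some(CompletingRebalance), 5)` rejects the commit with a rebalance-in-progress outcome.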

3.4 FetchOffsets handling

FetchOffsets lets a consumer retrieve the committed offsets of the partitions assigned to it.

Key source, annotated:

def handleFetchOffsets(groupId: String, requireStable: Boolean, partitions: Option[Seq[TopicPartition]] = None):
(Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {

  validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
    case Some(error) => error -> Map.empty
    case None =>
      // return offsets blindly regardless the current group state since the group may be using
      // Kafka commit storage without automatic group management
      // Read the cached offsets and partition information from the GroupMetadataManager
      (Errors.NONE, groupManager.getOffsets(groupId, requireStable, partitions))
  }
}
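Because fetches are served from the in-memory cache, the commit and fetch paths can be pictured as writes to and reads from one map keyed by group and partition. The following is a minimal sketch of that idea, not the GroupMetadataManager itself: `OffsetCacheSketch`, its `TopicPartition` case class and the `InvalidOffset` value of -1 are assumptions of this example.

```scala
import scala.collection.mutable

object OffsetCacheSketch {
  final case class TopicPartition(topic: String, partition: Int)

  // Placeholder meaning "no committed offset" in this sketch.
  val InvalidOffset: Long = -1L

  final class Cache {
    private val offsets = mutable.Map.empty[(String, TopicPartition), Long]

    // Records the latest committed offset per (group, partition).
    def commit(groupId: String, committed: Map[TopicPartition, Long]): Unit =
      committed.foreach { case (tp, offset) => offsets.update((groupId, tp), offset) }

    // With a partition list, answers for exactly those partitions;
    // without one, returns everything known for the group.
    def fetch(groupId: String,
              partitions: Option[Seq[TopicPartition]] = None): Map[TopicPartition, Long] =
      partitions match {
        case Some(tps) =>
          tps.map(tp => tp -> offsets.getOrElse((groupId, tp), InvalidOffset)).toMap
        case None =>
          offsets.collect { case ((`groupId`, tp), offset) => tp -> offset }.toMap
      }
  }
}
```

A consumer that fetches a partition it never committed gets the placeholder back and then has to fall back to its own reset policy.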

4. How groups and offsets are stored

When consumers perform SyncGroup and CommitOffsets, the GroupCoordinator saves the assignment of each consumer in the group and the committed offsets. Both are written to the __consumer_offsets topic and to a local in-memory cache.

4.1 Group storage

After JoinGroup completes, every consumer sends a SyncGroup request, and the leader's SyncGroup request carries the assignment for all members. At that point the GroupCoordinator writes the group information and the assignment to the __consumer_offsets topic as a key-value record. Once the write succeeds, the data is also cached in local memory to make later lookups fast.

The GroupMetadataKey consists of the group name. The GroupMetadataValue holds the schema version, the consumer protocol type, the generationId, the protocolName, the leader, a timestamp, and per-member data such as memberId, clientId, clientHost, sessionTimeout, rebalanceTimeout, groupInstanceId, the subscription metadata, and the assigned partitions.

4.2 Offsets storage

Offsets are stored in the same way as groups: in the __consumer_offsets topic and in the cache.

The OffsetCommitKey consists of the group, topic and partition. The OffsetCommitValue holds the schema version, the committed offset, the leaderEpoch, the offset metadata, the commit timestamp, and so on.
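The two key shapes described above can be written down as plain case classes. This is a sketch of the record layout as described in this section, not the binary schema used on disk; the field types and the name `OffsetsTopicRecordsSketch` are assumptions, and `partitionFor` reuses the formula from section 1.

```scala
object OffsetsTopicRecordsSketch {
  sealed trait Key { def group: String }
  final case class GroupMetadataKey(group: String) extends Key
  final case class OffsetCommitKey(group: String, topic: String, partition: Int) extends Key

  // Fields as listed in section 4.2; the types are assumptions of this sketch.
  final case class OffsetCommitValue(version: Short,
                                     offset: Long,
                                     leaderEpoch: Option[Int],
                                     metadata: String,
                                     commitTimestamp: Long)

  // Both record kinds are routed by group id, so all records of one group
  // land in the same __consumer_offsets partition.
  def partitionFor(key: Key, partitionCount: Int): Int =
    math.abs(key.group.hashCode % partitionCount)
}
```

Routing both record kinds by group id is what allows a single coordinator to rebuild a group's membership and its offsets by reading just one partition.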
