1. GroupCoordinator and Related Components
The GroupCoordinator module is the broker-side manager for consumers: each GroupCoordinator manages one or more consumer groups, together with the partitions assigned to each consumer and their committed offsets.
The GroupCoordinator responsible for a given consumer group is determined by the formula:
Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount)
groupMetadataTopicPartitionCount is set by offsets.topic.num.partitions (50 partitions by default); the broker hosting the leader replica of the resulting __consumer_offsets partition acts as that group's GroupCoordinator.
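As a quick check of the formula, a minimal Java sketch (the class name is made up for illustration; 50 stands for the default value of offsets.topic.num.partitions):

```java
// Minimal sketch of locating the GroupCoordinator partition for a group id.
public class CoordinatorPartition {
    static int partitionFor(String groupId, int groupMetadataTopicPartitionCount) {
        // Same formula as above: abs of (hashCode modulo the partition count).
        return Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount);
    }

    public static void main(String[] args) {
        // The leader replica of this __consumer_offsets partition is the coordinator.
        System.out.println(partitionFor("my-consumer-group", 50));
    }
}
```

Because the result is always in [0, 50), every group deterministically maps to one of the 50 __consumer_offsets partitions.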
1.1 Offset Management
After a consumer consumes messages from its partitions, it commits its progress to the cluster. Kafka stores this progress in the __consumer_offsets topic: the GroupCoordinator writes the commits to that topic and also caches the latest committed offset of each partition, so that offset fetches can be answered quickly. The GroupCoordinator delegates the management of consumer groups and their offsets to the GroupMetadataManager component.
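A rough sketch of that cache (all names here are invented for illustration; the real GroupMetadataManager keeps richer metadata and is backed by the __consumer_offsets log):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative cache of the latest committed offset per (group, topic, partition),
// so that OffsetFetch can be answered without scanning the __consumer_offsets log.
public class OffsetCacheSketch {
    private final Map<String, Long> latestOffsets = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Called after an offset commit is successfully appended to __consumer_offsets.
    public void onCommit(String group, String topic, int partition, long offset) {
        latestOffsets.put(key(group, topic, partition), offset);
    }

    // Returns the cached offset, or -1 when nothing has been committed yet.
    public long fetch(String group, String topic, int partition) {
        return latestOffsets.getOrDefault(key(group, topic, partition), -1L);
    }

    public static void main(String[] args) {
        OffsetCacheSketch cache = new OffsetCacheSketch();
        cache.onCommit("g1", "orders", 0, 42L);
        System.out.println(cache.fetch("g1", "orders", 0)); // prints 42
    }
}
```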
1.2 Consumer Rebalance Management
Within a consumer group, each partition is assigned to exactly one consumer (a single consumer may, however, own several partitions of a topic). How this consumer-to-partition mapping is established is managed by the GroupCoordinator: whenever the group membership changes, the GroupCoordinator runs the rebalance protocol to redistribute partitions among the consumers.
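To make the mapping concrete, here is a hypothetical round-robin assignment sketch. It is not one of Kafka's real assignors (those run client-side in the elected leader), but it shows the invariant that each partition ends up with exactly one consumer of the group:

```java
import java.util.*;

// Toy rebalance: distribute partitionCount partitions round-robin over consumers.
public class AssignmentSketch {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String c : consumers) assignment.put(c, new ArrayList<>());
        for (int p = 0; p < partitionCount; p++) {
            // Each partition goes to exactly one consumer of the group.
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList("c1", "c2"), 5));
        // prints {c1=[0, 2, 4], c2=[1, 3]}
    }
}
```

Adding or removing a consumer changes the result of assign(), which is exactly why a membership change forces a rebalance.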
2. Group State Transitions
2.1 State Transition Diagram

2.2 State Transition Details
PreparingRebalance state:
While a group is in PreparingRebalance, the GroupCoordinator still handles OffsetFetchRequest, ListGroupsRequest and OffsetCommitRequest normally, but responds to HeartbeatRequest and SyncGroupRequest with the REBALANCE_IN_PROGRESS error code. When a JoinGroupRequest arrives, a DelayedJoin is created first and the response is deferred until its completion condition is met.
PreparingRebalance -> AwaitingSync: when the DelayedJoin times out, or when all previous members of the group have rejoined;
PreparingRebalance -> Empty: when all consumers have left the group;
PreparingRebalance -> Dead: when the group is deleted during partition migration;
AwaitingSync state (renamed CompletingRebalance in later Kafka versions, as in the code excerpts below):
The group is waiting for the SyncGroupRequest from the group leader. In this state the GroupCoordinator responds to OffsetCommitRequest and HeartbeatRequest with the REBALANCE_IN_PROGRESS error code, and discards SyncGroupRequests from followers until the leader's SyncGroupRequest arrives.
- AwaitingSync -> Stable: when the GroupCoordinator receives the SyncGroupRequest from the group leader;
- AwaitingSync -> PreparingRebalance: a consumer joins or leaves the group; a member's heartbeat times out; a known member updates its metadata;
- AwaitingSync -> Dead: the group is deleted during partition migration;
Stable state:
In this state the GroupCoordinator can handle all requests, e.g. OffsetFetchRequest, HeartbeatRequest, OffsetCommitRequest, JoinGroupRequest from followers, and so on.
- Stable -> PreparingRebalance: a member's heartbeat times out; a consumer leaves voluntarily; the current group leader sends a JoinGroupRequest; a new consumer asks to join the group;
- Stable -> Dead: the group is deleted during partition migration;
Dead state:
A group in this state has no members left and its GroupMetadata is about to be deleted; apart from OffsetCommitRequest, responses to all other requests carry the UNKNOWN_MEMBER_ID error code.
Empty state:
The group has no members left, but it is not deleted until all of its offsets have expired; this state also represents groups that are used only for offset commits.
- Empty -> Dead: the last offset is removed; the group is deleted on expiration; the group is deleted during partition migration;
- Empty -> PreparingRebalance: a new member sends a JoinGroupRequest to join;
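The transitions listed above can be encoded as a small table and checked mechanically. The sketch below uses the newer name CompletingRebalance for AwaitingSync and follows this article's description rather than being a verbatim copy of Kafka's GroupState code:

```java
import java.util.*;

// Sketch of the group state machine described in this section.
public class GroupStateMachine {
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    // Valid successor states, per the transition list above.
    static final Map<State, Set<State>> VALID = new EnumMap<>(State.class);
    static {
        VALID.put(State.EMPTY, EnumSet.of(State.PREPARING_REBALANCE, State.DEAD));
        VALID.put(State.PREPARING_REBALANCE,
            EnumSet.of(State.COMPLETING_REBALANCE, State.EMPTY, State.DEAD));
        VALID.put(State.COMPLETING_REBALANCE,
            EnumSet.of(State.STABLE, State.PREPARING_REBALANCE, State.DEAD));
        VALID.put(State.STABLE, EnumSet.of(State.PREPARING_REBALANCE, State.DEAD));
        VALID.put(State.DEAD, EnumSet.noneOf(State.class)); // Dead is terminal
    }

    static boolean canTransition(State from, State to) {
        return VALID.get(from).contains(to);
    }

    public static void main(String[] args) {
        System.out.println(canTransition(State.COMPLETING_REBALANCE, State.STABLE)); // true
        System.out.println(canTransition(State.STABLE, State.EMPTY));                // false
    }
}
```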
3. Main Processing Flows
The GroupCoordinator handles three categories of requests: offset-related, group-related and heartbeat. Concretely: ApiKeys.OFFSET_COMMIT, ApiKeys.OFFSET_FETCH, ApiKeys.JOIN_GROUP, ApiKeys.LEAVE_GROUP, ApiKeys.SYNC_GROUP, ApiKeys.DESCRIBE_GROUPS, ApiKeys.LIST_GROUPS and ApiKeys.HEARTBEAT.
3.1 JoinGroup Handling
As the name suggests, JoinGroup is the request by which a consumer joins a group: it registers the consumer's basic information with the group, and the GroupCoordinator elects one of the consumers as the group leader. Once all members have joined, the leader computes the partition assignment for every consumer according to the configured assignment strategy, and the result is distributed during the subsequent sync phase.
handleJoinGroup() processing flow:

Key points of the handleJoinGroup() source:
def handleJoinGroup(groupId: String,
                    memberId: String,
                    groupInstanceId: Option[String],
                    requireKnownMemberId: Boolean,
                    clientId: String,
                    clientHost: String,
                    rebalanceTimeoutMs: Int,
                    sessionTimeoutMs: Int,
                    protocolType: String,
                    protocols: List[(String, Array[Byte])],
                    responseCallback: JoinCallback): Unit = {
  val isUnknownMember = memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID
  groupManager.getGroup(groupId) match {
    case None =>
      // Neither the group nor the memberId exists: this is the first consumer
      // joining the group, so the group must be created and initialized first
      if (isUnknownMember) {
        // register the new group with the GroupMetadataManager
        val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
        // initialize the group state, schedule the InitialDelayedJoin, etc.
        doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
      } else {
        responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
      }
    case Some(group) =>
      group.inLock {
        // does the group already hold more members than the configured capacity?
        // then remove the memberId and answer with an error
        if ((groupIsOverCapacity(group)
          && group.has(memberId) && !group.get(memberId).isAwaitingJoin) // oversized group, need to shed members that haven't joined yet
          || (isUnknownMember && group.size >= groupConfig.groupMaxSize)) {
          group.remove(memberId)
          group.removeStaticMember(groupInstanceId)
          responseCallback(JoinGroupResult(JoinGroupRequest.UNKNOWN_MEMBER_ID, Errors.GROUP_MAX_SIZE_REACHED))
        } else if (isUnknownMember) {
          // the group exists, but the memberId does not
          doUnknownJoinGroup(group, groupInstanceId, requireKnownMemberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        } else {
          // both the group and the memberId exist
          doJoinGroup(group, memberId, groupInstanceId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
        }
        // attempt to complete JoinGroup
        // If the group is in PreparingRebalance, try to complete the pending
        // InitialDelayedJoin/DelayedJoin task. On completion it invokes the
        // awaitingJoinCallback of every member, i.e. sends the JoinGroup response.
        // The response carries the group leader, and the leader additionally
        // receives the metadata of all group members.
        if (group.is(PreparingRebalance)) {
          joinPurgatory.checkAndComplete(GroupKey(group.groupId))
        }
      }
  }
}
doUnknownJoinGroup() processing flow:

Key points of the doUnknownJoinGroup() source:
private def doUnknownJoinGroup(group: GroupMetadata,
                               groupInstanceId: Option[String],
                               requireKnownMemberId: Boolean,
                               clientId: String,
                               clientHost: String,
                               rebalanceTimeoutMs: Int,
                               sessionTimeoutMs: Int,
                               protocolType: String,
                               protocols: List[(String, Array[Byte])],
                               responseCallback: JoinCallback): Unit = {
  group.inLock {
    // generate a new memberId
    val newMemberId = group.generateMemberId(clientId, groupInstanceId)
    // the member is static, i.e. (groupInstanceId.isDefined && staticMembers.contains(groupInstanceId.get));
    // a static member is a consumer configured with a group instance id
    if (group.hasStaticMember(groupInstanceId)) {
      // update the static member's metadata and handle the rebalance
      updateStaticMemberAndRebalance(group, newMemberId, groupInstanceId, protocols, responseCallback)
    }
    // a known memberId is required, i.e. (joinGroupRequest.version >= 4 && groupInstanceId.isEmpty)
    else if (requireKnownMemberId) {
      // If member id required (dynamic membership), register the member in the pending member list
      // and send back a response to call for another join group request with allocated member id.
      debug(s"Dynamic member with unknown member id joins group ${group.groupId} in " +
        s"${group.currentState} state. Created a new member id $newMemberId and request the member to rejoin with this id.")
      // put the new memberId into the pendingMembers list and answer with MEMBER_ID_REQUIRED,
      // so that the consumer rejoins with the newly allocated memberId
      group.addPendingMember(newMemberId)
      addPendingMemberExpiration(group, newMemberId, sessionTimeoutMs)
      responseCallback(JoinGroupResult(newMemberId, Errors.MEMBER_ID_REQUIRED))
    } else {
      info(s"${if (groupInstanceId.isDefined) "Static" else "Dynamic"} Member with unknown member id joins group ${group.groupId} in " +
        s"${group.currentState} state. Created a new member id $newMemberId for this member and add to the group.")
      // add the member and handle the rebalance
      addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, newMemberId, groupInstanceId,
        clientId, clientHost, protocolType, protocols, group, responseCallback)
    }
  }
}
doJoinGroup() processing flow:

Key points of the doJoinGroup() source:
private def doJoinGroup(group: GroupMetadata,
                        memberId: String,
                        groupInstanceId: Option[String],
                        clientId: String,
                        clientHost: String,
                        rebalanceTimeoutMs: Int,
                        sessionTimeoutMs: Int,
                        protocolType: String,
                        protocols: List[(String, Array[Byte])],
                        responseCallback: JoinCallback): Unit = {
  group.inLock {
    if (group.is(Dead)) {
      responseCallback(JoinGroupResult(memberId, Errors.COORDINATOR_NOT_AVAILABLE))
    } else if (!group.supportsProtocols(protocolType, MemberMetadata.plainProtocolSet(protocols))) {
      responseCallback(JoinGroupResult(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL))
    }
    // the memberId is in pendingMembers, i.e. this consumer received its allocated memberId and is rejoining
    else if (group.isPendingMember(memberId)) {
      // A rejoining pending member will be accepted. Note that pending member will never be a static member.
      // A pending member is by definition dynamic; only static members may carry
      // a groupInstanceId, so a pending member must not have one set.
      if (groupInstanceId.isDefined) {
        throw new IllegalStateException(s"the static member $groupInstanceId was not expected to be assigned " +
          s"into pending member bucket with member id $memberId")
      } else {
        debug(s"Dynamic Member with specific member id $memberId joins group ${group.groupId} in " +
          s"${group.currentState} state. Adding to the group now.")
        // add the member and handle the rebalance
        addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, memberId, groupInstanceId,
          clientId, clientHost, protocolType, protocols, group, responseCallback)
      }
    } else {
      val groupInstanceIdNotFound = groupInstanceId.isDefined && !group.hasStaticMember(groupInstanceId)
      // a static member whose memberId differs from the stored memberId
      if (group.isStaticMemberFenced(memberId, groupInstanceId, "join-group")) {
        // given member id doesn't match with the groupInstanceId. Inform duplicate instance to shut down immediately.
        responseCallback(JoinGroupResult(memberId, Errors.FENCED_INSTANCE_ID))
      }
      // the group does not contain this member, or the static member has no stored memberId
      else if (!group.has(memberId) || groupInstanceIdNotFound) {
        // If the dynamic member trying to register with an unrecognized id, or
        // the static member joins with unknown group instance id, send the response to let
        // it reset its member id and retry.
        responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
      } else {
        val member = group.get(memberId)
        // handle according to the group state
        group.currentState match {
          // PreparingRebalance: all consumers are currently (re)joining
          case PreparingRebalance =>
            updateMemberAndRebalance(group, member, protocols, responseCallback)
          // CompletingRebalance: all consumers have joined and the group is about to sync
          case CompletingRebalance =>
            if (member.matches(protocols)) {
              // member is joining with the same metadata (which could be because it failed to
              // receive the initial JoinGroup response), so just return current group information
              // for the current generation.
              responseCallback(JoinGroupResult(
                members = if (group.isLeader(memberId)) {
                  group.currentMemberMetadata
                } else {
                  List.empty
                },
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            } else {
              // member has changed metadata, so force a rebalance
              // update the member's metadata
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            }
          // Stable: all consumers have synced, the rebalance is complete and
          // every consumer can consume messages normally
          case Stable =>
            val member = group.get(memberId)
            // if the member is the leader or its metadata has changed, update it and
            // rebalance; otherwise just return the current member's information
            if (group.isLeader(memberId) || !member.matches(protocols)) {
              // force a rebalance if a member has changed metadata or if the leader sends JoinGroup.
              // The latter allows the leader to trigger rebalances for changes affecting assignment
              // which do not affect the member metadata (such as topic metadata changes for the consumer)
              updateMemberAndRebalance(group, member, protocols, responseCallback)
            } else {
              // for followers with no actual change to their metadata, just return group information
              // for the current generation which will allow them to issue SyncGroup
              responseCallback(JoinGroupResult(
                members = List.empty,
                memberId = memberId,
                generationId = group.generationId,
                protocolType = group.protocolType,
                protocolName = group.protocolName,
                leaderId = group.leaderOrNull,
                error = Errors.NONE))
            }
          case Empty | Dead =>
            // Group reaches unexpected state. Let the joining member reset their generation and rejoin.
            warn(s"Attempt to add rejoining member $memberId of group ${group.groupId} in " +
              s"unexpected group state ${group.currentState}")
            responseCallback(JoinGroupResult(memberId, Errors.UNKNOWN_MEMBER_ID))
        }
      }
    }
  }
}
addMemberAndRebalance() processing flow:

Key points of the addMemberAndRebalance() source:
private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                  sessionTimeoutMs: Int,
                                  memberId: String,
                                  groupInstanceId: Option[String],
                                  clientId: String,
                                  clientHost: String,
                                  protocolType: String,
                                  protocols: List[(String, Array[Byte])],
                                  group: GroupMetadata,
                                  callback: JoinCallback): Unit = {
  // create the new MemberMetadata
  val member = new MemberMetadata(memberId, group.groupId, groupInstanceId,
    clientId, clientHost, rebalanceTimeoutMs,
    sessionTimeoutMs, protocolType, protocols)
  member.isNew = true
  // update the newMemberAdded flag to indicate that the join group can be further delayed
  // i.e. the group is in PreparingRebalance and its initial rebalance has not completed yet
  if (group.is(PreparingRebalance) && group.generationId == 0)
    group.newMemberAdded = true
  // add the new member
  group.add(member, callback)
  // The session timeout does not affect new members since they do not have their memberId and
  // cannot send heartbeats. Furthermore, we cannot detect disconnects because sockets are muted
  // while the JoinGroup is in purgatory. If the client does disconnect (e.g. because of a request
  // timeout during a long rebalance), they may simply retry which will lead to a lot of defunct
  // members in the rebalance. To prevent this going on indefinitely, we timeout JoinGroup requests
  // for new members. If the new member is still there, we expect it to retry.
  completeAndScheduleNextExpiration(group, member, NewMemberJoinTimeoutMs)
  // static member
  if (member.isStaticMember) {
    info(s"Adding new static member $groupInstanceId to group ${group.groupId} with member id $memberId.")
    // add it to the static member list
    group.addStaticMember(groupInstanceId, memberId)
  } else {
    // remove it from the pendingMembers list, i.e. the member has been added successfully
    group.removePendingMember(memberId)
  }
  // schedule the delayed rebalance task
  maybePrepareRebalance(group, s"Adding new member $memberId with group instance id $groupInstanceId")
}
prepareRebalance() processing flow:

Key points of the prepareRebalance() source:
private def prepareRebalance(group: GroupMetadata, reason: String): Unit = {
  // if any members are awaiting sync, cancel their request and have them rejoin
  // i.e. all members have joined but the sync has not finished yet: every consumer
  // gets REBALANCE_IN_PROGRESS back and is expected to rejoin
  if (group.is(CompletingRebalance))
    resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)
  // if the group is empty, create an InitialDelayedJoin task, otherwise a DelayedJoin task
  val delayedRebalance = if (group.is(Empty))
    new InitialDelayedJoin(this,
      joinPurgatory,
      group,
      groupConfig.groupInitialRebalanceDelayMs,
      groupConfig.groupInitialRebalanceDelayMs,
      max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
  else
    new DelayedJoin(this, group, group.rebalanceTimeoutMs)
  // switch the group state to PreparingRebalance
  group.transitionTo(PreparingRebalance)
  info(s"Preparing to rebalance group ${group.groupId} in state ${group.currentState} with old generation " +
    s"${group.generationId} (${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)}) (reason: $reason)")
  // add the delayed task to the joinPurgatory; it completes on timeout or once all members have joined
  val groupKey = GroupKey(group.groupId)
  joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
}
InitialDelayedJoin and DelayedJoin delayed-task processing flow:

Key points of the source:
tryCompleteJoin() handling:
def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
  group.inLock {
    // complete if every member of the group is awaiting join and pendingMembers is empty
    if (group.hasAllMembersJoined)
      forceComplete()
    else false
  }
}
onCompleteJoin() handling:
def onCompleteJoin(group: GroupMetadata): Unit = {
  group.inLock {
    // remove dynamic members that have not rejoined yet
    group.notYetRejoinedMembers.filterNot(_.isStaticMember) foreach { failedMember =>
      removeHeartbeatForLeavingMember(group, failedMember)
      group.remove(failedMember.memberId)
      group.removeStaticMember(failedMember.groupInstanceId)
      // TODO: cut the socket connection to the client
    }
    if (group.is(Dead)) {
      info(s"Group ${group.groupId} is dead, skipping rebalance stage")
    }
    // no leader could be elected although members exist? keep waiting, up to rebalanceTimeoutMs
    else if (!group.maybeElectNewJoinedLeader() && group.allMembers.nonEmpty) {
      // If all members are not rejoining, we will postpone the completion
      // of rebalance preparing stage, and send out another delayed operation
      // until session timeout removes all the non-responsive members.
      error(s"Group ${group.groupId} could not complete rebalance because no members rejoined")
      joinPurgatory.tryCompleteElseWatch(
        new DelayedJoin(this, group, group.rebalanceTimeoutMs),
        Seq(GroupKey(group.groupId)))
    } else {
      // generate a new generationId and transition the group to CompletingRebalance
      group.initNextGeneration()
      if (group.is(Empty)) {
        info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
        // persist and cache the group metadata
        groupManager.storeGroup(group, Map.empty, error => {
          if (error != Errors.NONE) {
            // we failed to write the empty group metadata. If the broker fails before another rebalance,
            // the previous generation written to the log will become active again (and most likely timeout).
            // This should be safe since there are no active members in an empty generation, so we just warn.
            warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
          }
        })
      } else {
        info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
          s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
        // trigger the awaiting join group response callback for all the members after rebalancing
        // iterate over the members and return a JoinGroupResult to each one,
        // completing the join of every member
        for (member <- group.allMemberMetadata) {
          val joinResult = JoinGroupResult(
            members = if (group.isLeader(member.memberId)) {
              group.currentMemberMetadata
            } else {
              List.empty
            },
            memberId = member.memberId,
            generationId = group.generationId,
            protocolType = group.protocolType,
            protocolName = group.protocolName,
            leaderId = group.leaderOrNull,
            error = Errors.NONE)
          group.maybeInvokeJoinCallback(member, joinResult)
          completeAndScheduleNextHeartbeatExpiration(group, member)
          member.isNew = false
        }
      }
    }
  }
}
3.2 SyncGroup Handling
Through SyncGroup each consumer synchronizes with the group's assignment result. The leader includes the assignment for all consumers in its SyncGroupRequest, and the GroupCoordinator distributes that assignment to every group member.
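One detail worth isolating: members that the leader did not include in its assignment receive an empty assignment from the GroupCoordinator. A hypothetical Java sketch of that step (class and method names invented for illustration):

```java
import java.util.*;

// Sketch of "fill any missing members with an empty assignment":
// members absent from the leader's assignment get an empty byte array.
public class FillMissingAssignments {
    static Map<String, byte[]> complete(Set<String> allMembers, Map<String, byte[]> leaderAssignment) {
        Map<String, byte[]> full = new HashMap<>(leaderAssignment);
        for (String member : allMembers) {
            full.putIfAbsent(member, new byte[0]); // missing member -> empty assignment
        }
        return full;
    }

    public static void main(String[] args) {
        Map<String, byte[]> fromLeader = new HashMap<>();
        fromLeader.put("m1", new byte[] {1, 2});
        Map<String, byte[]> full = complete(new HashSet<>(Arrays.asList("m1", "m2")), fromLeader);
        System.out.println(full.get("m2").length); // prints 0
    }
}
```

This guarantees every member receives a SyncGroup response, even when the leader's assignor skipped it.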
handleSyncGroup() main processing flow:

Key points of doSyncGroup(), which handleSyncGroup() delegates to:
private def doSyncGroup(group: GroupMetadata,
                        generationId: Int,
                        memberId: String,
                        protocolType: Option[String],
                        protocolName: Option[String],
                        groupInstanceId: Option[String],
                        groupAssignment: Map[String, Array[Byte]],
                        responseCallback: SyncCallback): Unit = {
  group.inLock {
    group.currentState match {
      case Empty =>
        responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))
      case PreparingRebalance =>
        responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))
      // this state means the partition assignment is in progress
      case CompletingRebalance =>
        // register the member's sync callback
        group.get(memberId).awaitingSyncCallback = responseCallback
        // if this is the leader, then we can attempt to persist state and transition to stable
        // only the leader's request carries the assignment; requests from other members do nothing here
        if (group.isLeader(memberId)) {
          info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")
          // fill any missing members with an empty assignment
          // i.e. handle members the leader did not assign any partitions to
          val missing = group.allMembers -- groupAssignment.keySet
          val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap
          if (missing.nonEmpty) {
            warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
          }
          // persist and cache the assignment
          groupManager.storeGroup(group, assignment, (error: Errors) => {
            group.inLock {
              // another member may have joined the group while we were awaiting this callback,
              // so we must ensure we are still in the CompletingRebalance state and the same generation
              // when it gets invoked. if we have transitioned to another state, then do nothing
              if (group.is(CompletingRebalance) && generationId == group.generationId) {
                // storing the assignment failed
                if (error != Errors.NONE) {
                  // reset every member's assignment to empty and propagate the error to the consumers
                  resetAndPropagateAssignmentError(group, error)
                  // schedule a DelayedJoin task and wait for the consumers to rejoin
                  maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
                } else {
                  // set every member's assignment and return each consumer its share
                  setAndPropagateAssignment(group, assignment)
                  // transition the group state to Stable
                  group.transitionTo(Stable)
                }
              }
            }
          })
          groupCompletedRebalanceSensor.record()
        }
      // this state means the leader's assignment has already completed
      case Stable =>
        // if the group is stable, we just return the current assignment
        // fetch the member's metadata and return its assignment
        val memberMetadata = group.get(memberId)
        responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
        completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
      case Dead =>
        throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
    }
  }
}
3.3 CommitOffsets Handling
Through CommitOffsets a consumer commits its consumed offsets. The GroupCoordinator accepts commits in two states, Stable and PreparingRebalance; in other words, a consumer can commit both while a JoinGroup is in progress and after SyncGroup has completed.
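The state gate can be sketched as a simple switch (hypothetical Java; the default branch stands in for the RuntimeException thrown in doCommitOffsets()):

```java
// Sketch of the state check performed before storing an offset commit:
// commits are stored in Stable and PreparingRebalance, rejected with
// REBALANCE_IN_PROGRESS while the group is completing a rebalance.
public class CommitGate {
    enum State { EMPTY, PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE, DEAD }

    static String handleCommit(State state) {
        switch (state) {
            case STABLE:
            case PREPARING_REBALANCE:
                return "STORE_OFFSETS";
            case COMPLETING_REBALANCE:
                return "REBALANCE_IN_PROGRESS";
            default:
                return "ILLEGAL_STATE";
        }
    }

    public static void main(String[] args) {
        System.out.println(handleCommit(State.PREPARING_REBALANCE));  // STORE_OFFSETS
        System.out.println(handleCommit(State.COMPLETING_REBALANCE)); // REBALANCE_IN_PROGRESS
    }
}
```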
Key points of handleCommitOffsets():
def handleCommitOffsets(groupId: String,
                        memberId: String,
                        groupInstanceId: Option[String],
                        generationId: Int,
                        offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                        responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
  validateGroupStatus(groupId, ApiKeys.OFFSET_COMMIT) match {
    case Some(error) => responseCallback(offsetMetadata.map { case (k, _) => k -> error })
    case None =>
      // look up the group
      groupManager.getGroup(groupId) match {
        case None =>
          // the group does not exist and generationId < 0: the group is not using
          // Kafka's group management and only relies on Kafka for offset storage
          if (generationId < 0) {
            // the group is not relying on Kafka for group management, so allow the commit
            // create the group and commit the offsets
            val group = groupManager.addGroup(new GroupMetadata(groupId, Empty, time))
            doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
          } else {
            // or this is a request coming from an older generation. either way, reject the commit
            responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.ILLEGAL_GENERATION })
          }
        // the group exists: commit the offsets
        case Some(group) =>
          doCommitOffsets(group, memberId, groupInstanceId, generationId, offsetMetadata, responseCallback)
      }
  }
}
Key points of doCommitOffsets():
private def doCommitOffsets(group: GroupMetadata,
                            memberId: String,
                            groupInstanceId: Option[String],
                            generationId: Int,
                            offsetMetadata: immutable.Map[TopicPartition, OffsetAndMetadata],
                            responseCallback: immutable.Map[TopicPartition, Errors] => Unit): Unit = {
  group.inLock {
    group.currentState match {
      // in Stable or PreparingRebalance, call groupManager.storeOffsets() to
      // write the offsets to the topic and to the cache
      case Stable | PreparingRebalance =>
        // During PreparingRebalance phase, we still allow a commit request since we rely
        // on heartbeat response to eventually notify the rebalance in progress signal to the consumer
        val member = group.get(memberId)
        completeAndScheduleNextHeartbeatExpiration(group, member)
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
      case CompletingRebalance =>
        responseCallback(offsetMetadata.map { case (k, _) => k -> Errors.REBALANCE_IN_PROGRESS })
      case _ =>
        throw new RuntimeException(s"Logic error: unexpected group state ${group.currentState}")
    }
  }
}
3.4 FetchOffsets Handling
Through FetchOffsets a consumer retrieves the committed offsets of its assigned partitions.
Key points of the source:
def handleFetchOffsets(groupId: String, requireStable: Boolean, partitions: Option[Seq[TopicPartition]] = None):
    (Errors, Map[TopicPartition, OffsetFetchResponse.PartitionData]) = {
  validateGroupStatus(groupId, ApiKeys.OFFSET_FETCH) match {
    case Some(error) => error -> Map.empty
    case None =>
      // return offsets blindly regardless the current group state since the group may be using
      // Kafka commit storage without automatic group management
      // read the cached partition offsets and metadata from the GroupMetadataManager
      (Errors.NONE, groupManager.getOffsets(groupId, requireStable, partitions))
  }
}
4. Group and Offset Storage
When consumers perform SyncGroup and CommitOffsets, the GroupCoordinator persists the group's assignment results and committed offsets, i.e. it writes them to the __consumer_offsets topic and caches them in local memory.
4.1 Group Storage
After all consumers have completed JoinGroup, each of them performs SyncGroup, and the leader's SyncGroupRequest carries the assignment for all members. At this point the GroupCoordinator writes the group information together with the assignment to the __consumer_offsets topic as a key-value record; once the write succeeds, the data is also cached in local memory for fast lookups.
The groupMetadataKey is the group name. The GroupMetadataValue contains the schema version, the consumers' protocol type, the generationId, the protocolName, the leader, a timestamp, and the per-member data such as memberId, clientId, clientHost, sessionTimeout, rebalanceTimeout, groupInstanceId, the subscribed protocols and the assigned partitions.
4.2 Offset Storage
Offsets are stored in the same way as group metadata: in the __consumer_offsets topic and in the in-memory cache.
The offsetCommitKey consists of the group, topic and partition; the offsetCommitValue contains the schema version, the committed offset, the leaderEpoch, the offset metadata, the commit timestamp, and so on.
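For illustration only, the key and value fields can be pictured as follows (the real records use versioned binary schemas, not strings; all names here are invented for the sketch):

```java
// Illustrative shape of a record in __consumer_offsets: the key identifies
// (group, topic, partition), the value carries the committed offset plus metadata.
public class OffsetRecordSketch {
    static String key(String group, String topic, int partition) {
        return String.join(",", group, topic, Integer.toString(partition));
    }

    static String value(long offset, int leaderEpoch, String metadata, long commitTimestamp) {
        return offset + "@" + leaderEpoch + ":" + metadata + ":" + commitTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(key("g1", "orders", 3));            // g1,orders,3
        System.out.println(value(42L, 0, "", 1700000000000L)); // 42@0::1700000000000
    }
}
```

Because the key is (group, topic, partition), the latest commit for a partition compacts away older commits in the log, which is why __consumer_offsets is a compacted topic.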