前言
本文來(lái)自 極客時(shí)間 Kafka核心技術(shù)與實(shí)戰(zhàn)
這段時(shí)間有看 極客時(shí)間的這個(gè)課程,
這里僅以分享的角度來(lái)做個(gè)筆記。
那么本文將涉及到以下幾個(gè)知識(shí)點(diǎn):
- 重平衡是什么?為什么要了解他?
- 發(fā)生重平衡的時(shí)機(jī)
- Kafka的心跳機(jī)制 與 Rebalance
- 消費(fèi)者組狀態(tài)切換
- 重平衡全流程解析
重平衡是什么?為什么要了解他?
-
重平衡是什么
Rebalance(重平衡 )本質(zhì)上是一種協(xié)議,
規(guī)定了一個(gè)Consumer Group下的所有 Consumer 如何達(dá)成一致,
來(lái)分配訂閱Topic的每個(gè)分區(qū)。說(shuō)簡(jiǎn)單點(diǎn)就是 給消費(fèi)組每個(gè)消費(fèi)者分配消費(fèi)任務(wù)的過(guò)程。
為什么要了解他?
Rebalance是啟動(dòng)一個(gè)消費(fèi)者組必經(jīng)的過(guò)程,
當(dāng)然這不是最主要的,最主要的是,在消費(fèi)的過(guò)程中,
在某些情況下會(huì)導(dǎo)致這個(gè)過(guò)程再次發(fā)生,
帶來(lái)的后果就是整個(gè)集群暫時(shí)性的癱瘓,
嚴(yán)重影響到Kafka的高可用
發(fā)生重平衡的時(shí)機(jī)
那么 Rebalance 會(huì)在什么時(shí)候發(fā)生呢?
訂閱主題數(shù)發(fā)生變化
這種情況一般不會(huì)發(fā)生,
如果發(fā)生,那也是因?yàn)槲覀兊臉I(yè)務(wù)調(diào)整才會(huì),
所以這種基本要么不發(fā)生要么就是不可避免。主題分區(qū)發(fā)生變化
這種情況發(fā)生會(huì)相對(duì)多一點(diǎn),但是也有限,
在部署Kafka集群前,
我們就需要考慮到該集群的容量,
以便來(lái)確定好分區(qū)數(shù)。
雖然不一定一步到位,
但是調(diào)整的次數(shù)應(yīng)該是極其有限的,
一般也可以選擇在半夜低峰的時(shí)候進(jìn)行調(diào)整,影響不大。-
消費(fèi)端的消費(fèi)者組成員變化
基本上影響最大的就是這個(gè)原因了,
為什么這么說(shuō)呢?
我們考慮下什么時(shí)候消費(fèi)者組的成員會(huì)發(fā)生變化就能大概了解了。- 消費(fèi)者處理消息超時(shí),
即如果消費(fèi)者處理消費(fèi)的消息的時(shí)間超過(guò)了
Kafka集群配置的max.poll.interval.ms的值,
那么該消費(fèi)者將會(huì)自動(dòng)離組 - 心跳超時(shí),
如果消費(fèi)者在指定的session.timeout.ms時(shí)間內(nèi)沒(méi)有匯報(bào)心跳,
那么Kafka就會(huì)認(rèn)為該消費(fèi)已經(jīng)dead了
- 消費(fèi)者處理消息超時(shí),
可以看出,消費(fèi)端的消費(fèi)者組成員變化一般都是由于異常引起的,
所以其產(chǎn)生的 Rebalance 也是最難控制的。
Kafka的心跳機(jī)制 與 Rebalance
Kafka的心跳機(jī)制 與 Rebalance 有什么關(guān)系呢?
事實(shí)上,重平衡過(guò)程是靠消費(fèi)者端的心跳線程(Heartbeat Thread)通知到其他消費(fèi)者實(shí)例的
每當(dāng)消費(fèi)者向其 coordinator 匯報(bào)心跳的時(shí)候,
如果這個(gè)時(shí)候 coordinator 決定開啟 Rebalance ,
那么 coordinator 會(huì)將REBALANCE_IN_PROGRESS封裝到心跳的響應(yīng)中,
當(dāng)消費(fèi)者接受到這個(gè)REBALANCE_IN_PROGRESS,
他就知道需要開啟新的一輪 Rebalance 了,
所以heartbeat.interval.ms除了是設(shè)置心跳的間隔時(shí)間,
其實(shí)也意味著 Rebalance 感知速度,
心跳越快,那么 Rebalance 就能更快的被各個(gè)消費(fèi)者感知。
在 Kafka 0.10.1.0 版本之前,
發(fā)送心跳請(qǐng)求是在消費(fèi)者主線程完成的,
也就是你寫代碼調(diào)用KafkaConsumer.poll方法的那個(gè)線程。
這樣做有諸多弊病,最大的問(wèn)題在于,消息處理邏輯也是在這個(gè)線程中完成的。
因此,一旦消息處理消耗了過(guò)長(zhǎng)的時(shí)間,
心跳請(qǐng)求將無(wú)法及時(shí)發(fā)到協(xié)調(diào)者那里,
導(dǎo)致協(xié)調(diào)者“錯(cuò)誤地”認(rèn)為該消費(fèi)者已“死”。
自 0.10.1.0 版本開始,
社區(qū)引入了一個(gè)單獨(dú)的心跳線程來(lái)專門執(zhí)行心跳請(qǐng)求發(fā)送,避免了這個(gè)問(wèn)題。
消費(fèi)者組狀態(tài)切換
為什么要了解 消費(fèi)者組狀態(tài) 呢?
這里主要是為了方便講解 Rebalance 流程,
所以你需要大概了解一下消費(fèi)者組的狀態(tài)切換,
如下圖

其流轉(zhuǎn)過(guò)程大概如下:

一個(gè)消費(fèi)者組最開始是Empty狀態(tài),
當(dāng)重平衡過(guò)程開啟后,
它會(huì)被置于PreparingRebalance狀態(tài) 等待成員加入,
成員都加入之后變更到CompletingRebalance狀態(tài)等待分配方案,
當(dāng)coordinator分配完個(gè)消費(fèi)者消費(fèi)的分區(qū)后,
最后就流轉(zhuǎn)到Stable狀態(tài)完成重平衡。
當(dāng)有新成員加入或已有成員退出時(shí),
消費(fèi)者組的狀態(tài) 從Stable直接跳到PreparingRebalance狀態(tài),
此時(shí),所有現(xiàn)存成員就必須重新申請(qǐng)加入組。
當(dāng)所有成員都退出組后,消費(fèi)者組狀態(tài)變更為Empty。
Kafka定期自動(dòng)刪除過(guò)期位移的條件就是,組要處于Empty狀態(tài)。
因此,如果你的消費(fèi)者組停掉了很長(zhǎng)時(shí)間(超過(guò)7天),
那么Kafka很可能就把該組的位移數(shù)據(jù)刪除了。
消費(fèi)者端重平衡流程
在消費(fèi)者端,重平衡分為兩個(gè)步驟:
-
加入組。
當(dāng)組內(nèi)成員加入組時(shí),它會(huì)向 coordinator 發(fā)送JoinGroup請(qǐng)求。
在該請(qǐng)求中,每個(gè)成員都要將自己訂閱的主題上報(bào),
這樣協(xié)調(diào)者就能收集到所有成員的訂閱信息。
一旦收集了全部成員的JoinGroup請(qǐng)求后,
Coordinator 會(huì)從這些成員中選擇一個(gè)擔(dān)任這個(gè)消費(fèi)者組的領(lǐng)導(dǎo)者。通常情況下,第一個(gè)發(fā)送JoinGroup請(qǐng)求的成員自動(dòng)成為領(lǐng)導(dǎo)者。
領(lǐng)導(dǎo)者消費(fèi)者的任務(wù)是收集所有成員的訂閱信息,
然后根據(jù)這些信息,制定具體的分區(qū)消費(fèi)分配方案。
特別注意的是:這里說(shuō)的是消費(fèi)者領(lǐng)導(dǎo)者。選出領(lǐng)導(dǎo)者之后,
Coordinator 會(huì)把消費(fèi)者組訂閱信息封裝進(jìn)JoinGroup請(qǐng)求的 響應(yīng)體中,
然后發(fā)給領(lǐng)導(dǎo)者,由領(lǐng)導(dǎo)者統(tǒng)一做出分配方案后,
進(jìn)入到下一步:發(fā)送SyncGroup請(qǐng)求。
如下圖就是 JoinGroup 的全過(guò)程[圖片上傳中...(25-消費(fèi)者組重平衡全流程解析.jpg-d67470-1567669412412-0)]JoinGroup 流程解析.jpg -
領(lǐng)導(dǎo)者消費(fèi)者(Leader Consumer)分配方案。
領(lǐng)導(dǎo)者向 Coordinator 發(fā)送SyncGroup請(qǐng)求,
將剛剛做出的分配方案發(fā)給協(xié)調(diào)者。
值得注意的是,其他成員也會(huì)向 Coordinator 發(fā)送SyncGroup請(qǐng)求,
只不過(guò)請(qǐng)求體中并沒(méi)有實(shí)際的內(nèi)容。
這一步的主要目的是讓 Coordinator 接收分配方案,
然后統(tǒng)一以 SyncGroup 響應(yīng)的方式分發(fā)給所有成員,
這樣組內(nèi)所有成員就都知道自己該消費(fèi)哪些分區(qū)了。如下圖:
SyncGroup全流程解析.jpg
消費(fèi)者端重平衡流程 大概就這樣了,下面我們?cè)賮?lái)看看:Broker端重平衡
Broker端重平衡
要剖析協(xié)調(diào)者端處理重平衡的全流程,
我們必須要分幾個(gè)場(chǎng)景來(lái)討論。
這幾個(gè)場(chǎng)景分別是
- 新成員加入組
- 組成員主動(dòng)離組
- 組成員崩潰離組
- 組成員提交位移。
接下來(lái),我們一個(gè)一個(gè)來(lái)討論。
-
新成員入組。
新成員入組是指組處于Stable狀態(tài)后,有新成員加入。
如果是全新啟動(dòng)一個(gè)消費(fèi)者組,Kafka是有一些自己的小優(yōu)化的,流程上會(huì)有些許的不同。
我們這里討論的是,組穩(wěn)定了之后有新成員加入的情形。
當(dāng)協(xié)調(diào)者收到新的JoinGroup請(qǐng)求后,
它會(huì)通過(guò)心跳請(qǐng)求響應(yīng)的方式通知組內(nèi)現(xiàn)有的所有成員,
強(qiáng)制它們開啟新一輪的重平衡。
具體的過(guò)程和之前的客戶端重平衡流程是一樣的。
現(xiàn)在,我用一張時(shí)序圖來(lái)說(shuō)明協(xié)調(diào)者一端是如何處理新成員入組的。新成員入組場(chǎng)景.jpg -
組成員主動(dòng)離組。
何謂主動(dòng)離組?就是指消費(fèi)者實(shí)例所在線程或進(jìn)程調(diào)用close()方法主動(dòng)通知協(xié)調(diào)者它要退出。
這個(gè)場(chǎng)景就涉及到了第三類請(qǐng)求:LeaveGroup請(qǐng)求。
協(xié)調(diào)者收到LeaveGroup請(qǐng)求后,依然會(huì)以心跳響應(yīng)的方式通知其他成員,
因此我就不再贅述了,還是直接用一張圖來(lái)說(shuō)明。
組成員主動(dòng)離組場(chǎng)景.jpg -
組成員崩潰離組。
崩潰離組是指消費(fèi)者實(shí)例出現(xiàn)嚴(yán)重故障,突然宕機(jī)導(dǎo)致的離組。
它和主動(dòng)離組是有區(qū)別的,
因?yàn)楹笳呤侵鲃?dòng)發(fā)起的離組,協(xié)調(diào)者能馬上感知并處理。
但崩潰離組是被動(dòng)的,協(xié)調(diào)者通常需要等待一段時(shí)間才能感知到,
這段時(shí)間一般是由消費(fèi)者端參數(shù)session.timeout.ms控制的。
也就是說(shuō),Kafka一般不會(huì)超過(guò)session.timeout.ms就能感知到這個(gè)崩潰。
當(dāng)然,后面處理崩潰離組的流程與之前是一樣的,我們來(lái)看看下面這張圖。
組成員崩潰離組場(chǎng)景.jpg -
重平衡時(shí)協(xié)調(diào)者對(duì)組內(nèi)成員提交位移的處理。
正常情況下,每個(gè)組內(nèi)成員都會(huì)定期匯報(bào)位移給協(xié)調(diào)者。
當(dāng)重平衡開啟時(shí),協(xié)調(diào)者會(huì)給予成員一段緩沖時(shí)間,
要求每個(gè)成員必須在這段時(shí)間內(nèi)快速地上報(bào)自己的位移信息,
然后再開啟正常的JoinGroup/SyncGroup請(qǐng)求發(fā)送。
還是老辦法,我們使用一張圖來(lái)說(shuō)明。
組內(nèi)成員提交位移場(chǎng)景.jpg
總結(jié):
其實(shí)不論哪種方式,都是差不多的流程,這里放開舉例,最主要的還是為了更加清晰,如果發(fā)生類似的問(wèn)題,可以很快的從上面這些可能入手。
基本流程就是 Coordinator 感知到 消費(fèi)者組的變化,
然后在心跳的過(guò)程中發(fā)送重平衡信號(hào)通知各個(gè)消費(fèi)者離組,
然后消費(fèi)者重新以 JoinGroup 方式加入 Coordinator,并選出Consumer Leader。
當(dāng)所有消費(fèi)者加入 Coordinator,
Consumer Leader會(huì)根據(jù) Coordinator給予的分區(qū)信息給出分區(qū)方案。
Coordinator 將該方案以 SyncGroup 的方式將該方案執(zhí)行下去,通知各個(gè)消費(fèi)者,
這樣就完成了一輪 重平衡了。





