Kafka的也存在Leader和Follow節(jié)點(diǎn),這樣就會(huì)有一致性問(wèn)題。
概念
1. ?位標(biāo)記
?位或?印(watermark)?詞,表示位置信息,即位移(offset)。Kafka源碼中使?的名字是??位,HW(high watermark)。
2. 副本??
Kafka分區(qū)使?多個(gè)副本(replica)提供?可?。
3. LEO和HW
每個(gè)分區(qū)副本對(duì)象都有兩個(gè)重要的屬性:LEO和HW。
LEO:即?志末端位移(log end offset),記錄了該副本?志中下?條消息的位移值。如果LEO=10,那么表示該副本保存了10條消息,位移值范圍是[0, 9]。另外,Leader LEO和Follower LEO的更新是有區(qū)別的。
HW:即上?提到的?位值。對(duì)于同?個(gè)副本對(duì)象??,其HW值不會(huì)?于LEO值。?于等于HW值的所有消息都被認(rèn)為是“已備份”的(replicated)。Leader副本和Follower副本的HW更新不同。

上圖中,HW值是7,表示位移是 0~7 的所有消息都已經(jīng)處于“已提交狀態(tài)”(committed),?LEO值是14,8~13的消息就是未完全備份(fully replicated)——為什么沒(méi)有14?LEO指向的是下?條消息到來(lái)時(shí)的位移。
消費(fèi)者?法消費(fèi)分區(qū)下Leader副本中位移?于分區(qū)HW的消息。
Follower副本何時(shí)更新LEO
Follower副本不停地向Leader副本所在的broker發(fā)送FETCH請(qǐng)求,?旦獲取消息后寫(xiě)???的?志中進(jìn)?備份。
那么Follower副本的LEO是何時(shí)更新的呢??先我必須?明,Kafka有兩套Follower副本LEO:
- ?套LEO保存在Follower副本所在Broker的副本管理機(jī)中;
- 另?套LEO保存在Leader副本所在Broker的副本管理機(jī)中。Leader副本機(jī)器上保存了所有的follower副本的LEO。
Kafka使?前者幫助Follower副本更新其HW值;利?后者幫助Leader副本更新其HW。 - Follower副本的本地LEO何時(shí)更新?
Follower副本的LEO值就是?志的LEO值,每當(dāng)新寫(xiě)??條消息,LEO值就會(huì)被更新。當(dāng)Follower發(fā)送FETCH請(qǐng)求后,Leader將數(shù)據(jù)返回給Follower,此時(shí)Follower開(kāi)始Log寫(xiě)數(shù)據(jù),從??動(dòng)更新LEO值。 - Leader端Follower的LEO何時(shí)更新?
Leader端的Follower的LEO更新發(fā)?在Leader在處理Follower FETCH請(qǐng)求時(shí)。?旦Leader接收到Follower發(fā)送的FETCH請(qǐng)求,它先從Log中讀取相應(yīng)的數(shù)據(jù),給Follower返回?cái)?shù)據(jù)前,先更新Follower的LEO。
Follower副本何時(shí)更新HW
Follower更新HW發(fā)?在其更新LEO之后,?旦Follower向Log寫(xiě)完數(shù)據(jù),嘗試更新??的HW值。
?較當(dāng)前LEO值與FETCH響應(yīng)中Leader的HW值,取兩者的?者作為新的HW值。
即:如果Follower的LEO?于Leader的HW,F(xiàn)ollower HW值不會(huì)?于Leader的HW值。
Leader副本何時(shí)更新LEO
和Follower更新LEO相同,Leader寫(xiě)Log時(shí)?動(dòng)更新??的LEO值。
Leader副本何時(shí)更新HW值
Leader的HW值就是分區(qū)HW值,直接影響分區(qū)數(shù)據(jù)對(duì)消費(fèi)者的可?性 。
Leader會(huì)嘗試去更新分區(qū)HW的四種情況:
- Follower副本成為L(zhǎng)eader副本時(shí):Kafka會(huì)嘗試去更新分區(qū)HW。
- Broker崩潰導(dǎo)致副本被踢出ISR時(shí):檢查下分區(qū)HW值是否需要更新是有必要的。
- ?產(chǎn)者向Leader副本寫(xiě)消息時(shí):因?yàn)閷?xiě)?消息會(huì)更新Leader的LEO,有必要檢查HW值是否需要更新
- Leader處理Follower FETCH請(qǐng)求時(shí):?先從Log讀取數(shù)據(jù),之后嘗試更新分區(qū)HW值
結(jié)論:
當(dāng)Kafka broker都正常?作時(shí),分區(qū)HW值的更新時(shí)機(jī)有兩個(gè):
- Leader處理PRODUCE請(qǐng)求時(shí)
- Leader處理FETCH請(qǐng)求時(shí)。
Leader如何更新??的HW值?Leader broker上保存了?套Follower副本的LEO以及??的LEO。當(dāng)嘗試確定分
區(qū)HW時(shí),它會(huì)選出所有滿?條件的副本,?較它們的LEO(包括Leader的LEO),并選擇最?的LEO值作為HW值。
需要滿?的條件,(?選?): - 處于ISR中
- 副本LEO落后于Leader LEO的時(shí)?不?于 replica.lag.time.max.ms 參數(shù)值(默認(rèn)是10s)
如果Kafka只判斷第?個(gè)條件的話,確定分區(qū)HW值時(shí)就不會(huì)考慮這些未在ISR中的副本,但這些副本已經(jīng)具備了“?刻進(jìn)?ISR”的資格,因此就可能出現(xiàn)分區(qū)HW值越過(guò)ISR中副本LEO的情況——不允許。因?yàn)榉謪^(qū)HW定義就是ISR中所有副本LEO的最?值。
HW和LEO正常更新案例
我們假設(shè)有?個(gè)topic,單分區(qū),副本因子是2,即?個(gè)Leader副本和?個(gè)Follower副本。我們看下當(dāng)producer發(fā)送一條消息時(shí),broker端的副本到底會(huì)發(fā)生什么事情以及分區(qū)HW是如何被更新的。
初始狀態(tài)
初始時(shí)Leader和Follower的HW和LEO都是0(嚴(yán)格來(lái)說(shuō)源代碼會(huì)初始化LEO為-1,不過(guò)這不影響之后的討論)。
Leader中的Remote LEO指的就是Leader端保存的Follower LEO,也被初始化成0。此時(shí),生產(chǎn)者沒(méi)有發(fā)送任何消息給Leader,而Follower已經(jīng)開(kāi)始不斷地給Leader發(fā)送FETCH請(qǐng)求了,但因?yàn)闆](méi)有數(shù)據(jù)因此什么都不會(huì)發(fā)生。
值得?提的是,F(xiàn)ollower發(fā)送過(guò)來(lái)的FETCH請(qǐng)求因?yàn)闊o(wú)數(shù)據(jù)而暫時(shí)會(huì)被寄存到Leader端的purgatory中,待500ms( replica.fetch.wait.max.ms 參數(shù))超時(shí)后會(huì)強(qiáng)制完成。倘若在寄存期間生產(chǎn)者發(fā)來(lái)數(shù)據(jù),則Kafka會(huì)自動(dòng)喚醒該FETCH請(qǐng)求,讓Leader繼續(xù)處理。

Follower發(fā)送FETCH請(qǐng)求在Leader處理完P(guān)RODUCE請(qǐng)求之后
producer給該topic分區(qū)發(fā)送了?條消息,此時(shí)的狀態(tài)如下圖所示:

如上圖所示,Leader接收到PRODUCE請(qǐng)求主要做兩件事情:
- 把消息寫(xiě)入Log,同時(shí)自動(dòng)更新Leader自己的LEO。
-
嘗試更新Leader HW值。假設(shè)此時(shí)Follower尚未發(fā)送FETCH請(qǐng)求,Leader端保存的Remote LEO依然是0,因此Leader會(huì)比較它自己的LEO值和Remote LEO值,發(fā)現(xiàn)最小值是0,與當(dāng)前HW值相同,故不會(huì)更新分區(qū)HW值(仍為0)。
PRODUCE請(qǐng)求處理完成后各值如下,Leader端的HW值依然是0,而LEO是1,Remote LEO也是0。
image.png
假設(shè)此時(shí)follower發(fā)送了FETCH請(qǐng)求,則狀態(tài)變更如下:
image.png
本例中當(dāng)follower發(fā)送FETCH請(qǐng)求時(shí),Leader端的處理依次是:
1)讀取Log數(shù)據(jù)。
2) 更新remote LEO = 0(為什么是0? 因?yàn)榇藭r(shí)Follower還沒(méi)有寫(xiě)?這條消息。Leader如何確認(rèn)Follower還未寫(xiě)?呢?這是通過(guò)Follower發(fā)來(lái)的FETCH請(qǐng)求中的Fetch offset來(lái)確定的)。
3)嘗試更新分區(qū)HW:此時(shí)Leader LEO = 1,Remote LEO = 0,故分區(qū)HW值= min(Leader LEO, Follower Remote LEO) = 0。
4)把數(shù)據(jù)和當(dāng)前分區(qū)HW值(依然是0)發(fā)送給Follower副本。
而Follower副本接收到FETCH Response后依次執(zhí)?下列操作:
1)寫(xiě)?本地Log,同時(shí)更新Follower??管理的 LEO為1。
2)更新Follower HW:比較本地LEO和 FETCH Response 中的當(dāng)前Leader HW值,取較?者,F(xiàn)ollower HW = 0。
此時(shí),第?輪FETCH RPC結(jié)束,我們會(huì)發(fā)現(xiàn)雖然Leader和Follower都已經(jīng)在Log中保存了這條消息,但分區(qū)HW值尚未被更新,仍為0。

Follower第?輪FETCH
分區(qū)HW是在第?輪FETCH RPC中被更新的,如下圖所示:

Follower發(fā)來(lái)了第?輪FETCH請(qǐng)求,Leader端接收到后仍然會(huì)依次執(zhí)?下列操作:
- 讀取Log數(shù)據(jù)
- 更新Remote LEO = 1(這次為什么是1了? 因?yàn)檫@輪FETCH RPC攜帶的fetch offset是1,那么為什么這輪攜帶的就是1了呢,因?yàn)樯?輪結(jié)束后Follower LEO被更新為1了)
- 嘗試更新分區(qū)HW:此時(shí)leader LEO = 1,Remote LEO = 1,故分區(qū)HW值= min(Leader LEO, Follower Remote LEO) = 1。
- 把數(shù)據(jù)(實(shí)際上沒(méi)有數(shù)據(jù))和當(dāng)前分區(qū)HW值(已更新為1)發(fā)送給Follower副本作為Response
同樣地,F(xiàn)ollower副本接收到FETCH response后依次執(zhí)行下列操作: - 寫(xiě)?本地Log,當(dāng)然沒(méi)東?可寫(xiě),F(xiàn)ollower LEO也不會(huì)變化,依然是1。
-
更新Follower HW:比較本地LEO和當(dāng)前Leader LEO取小者。由于都是1,故更新follower HW = 1 。
image.png
此時(shí)消息已經(jīng)成功地被復(fù)制到Leader和Follower的Log中且分區(qū)HW是1,表明消費(fèi)者能夠消費(fèi)offset = 0的消息。
FETCH請(qǐng)求保存在purgatory中,PRODUCE請(qǐng)求到來(lái)。
當(dāng)Leader無(wú)法立即滿足FECTH返回要求的時(shí)候(比如沒(méi)有數(shù)據(jù)),那么該FETCH請(qǐng)求被暫存到Leader端的purgatory
中(煉獄),待時(shí)機(jī)成熟嘗試再次處理。Kafka不會(huì)?限期緩存,默認(rèn)有個(gè)超時(shí)時(shí)間(500ms),?旦超時(shí)時(shí)間已過(guò),
則這個(gè)請(qǐng)求會(huì)被強(qiáng)制完成。當(dāng)寄存期間還沒(méi)超時(shí),?產(chǎn)者發(fā)送PRODUCE請(qǐng)求從而使之滿足了條件以致被喚醒。此時(shí),
Leader端處理流程如下:
- Leader寫(xiě)Log(?動(dòng)更新Leader LEO)
- 嘗試喚醒在purgatory中寄存的FETCH請(qǐng)求
- 嘗試更新分區(qū)HW
HW和LEO異常案例
Kafka使?HW值來(lái)決定副本備份的進(jìn)度,而HW值的更新通常需要額外?輪FETCH RPC才能完成。但這種設(shè)計(jì)是有問(wèn)題的,可能引起的問(wèn)題包括:
- 備份數(shù)據(jù)丟失
- 備份數(shù)據(jù)不?致
數(shù)據(jù)丟失
使?HW值來(lái)確定備份進(jìn)度時(shí)其值的更新是在下?輪RPC中完成的。如果Follower副本在標(biāo)記上方的第?步與第二步之間發(fā)生崩潰,那么就有可能造成數(shù)據(jù)的丟失。

上圖中有兩個(gè)副本:A和B。開(kāi)始狀態(tài)是A是Leader。
假設(shè)?產(chǎn)者 min.insync.replicas 為1,那么當(dāng)?產(chǎn)者發(fā)送兩條消息給A后,A寫(xiě)?Log,此時(shí)Kafka會(huì)通知生產(chǎn)者這兩條消息寫(xiě)入成功。

但是在broker端,Leader和Follower的Log雖都寫(xiě)?了2條消息且分區(qū)HW已經(jīng)被更新到2,但Follower HW尚未被更新還是1,也就是上?標(biāo)記的第二步尚未執(zhí)行,表中最后?條未執(zhí)行。
倘若此時(shí)副本B所在的broker宕機(jī),那么重啟后B會(huì)?動(dòng)把LEO調(diào)整到之前的HW值1,故副本B會(huì)做?志截?cái)?log truncation),將offset = 1的那條消息從log中刪除,并調(diào)整LEO = 1。此時(shí)follower副本底層log中就只有?條消息,即offset = 0的消息!
B重啟之后需要給A發(fā)FETCH請(qǐng)求,但若A所在broker機(jī)器在此時(shí)宕機(jī),那么Kafka會(huì)令B成為新的Leader,而當(dāng)A重啟回來(lái)后也會(huì)執(zhí)行日志截?cái)啵瑢W調(diào)整回1。這樣,offset=1的消息就從兩個(gè)副本的log中被刪除,也就是說(shuō)這條已經(jīng)被生產(chǎn)者認(rèn)為發(fā)送成功的數(shù)據(jù)丟失。
丟失數(shù)據(jù)的前提是 min.insync.replicas=1 時(shí),?旦消息被寫(xiě)?Leader端Log即被認(rèn)為是 committed 。延遲?輪 FETCH RPC 更新HW值的設(shè)計(jì)使follower HW值是異步延遲更新,若在這個(gè)過(guò)程中Leader發(fā)生變更,那么成為新Leader的Follower的HW值就有可能是過(guò)期的,導(dǎo)致生產(chǎn)者本是成功提交的消息被刪除。
Leader和Follower數(shù)據(jù)離散
除了可能造成的數(shù)據(jù)丟失以外,該設(shè)計(jì)還會(huì)造成Leader的Log和Follower的Log數(shù)據(jù)不?致。
如Leader端記錄序列:m1,m2,m3,m4,m5,…;Follower端序列可能是m1,m3,m4,m5,…。
看圖:

假設(shè):A是Leader,A的Log寫(xiě)入了2條消息,但B的Log只寫(xiě)了1條消息。分區(qū)HW更新到2,但B的HW還是1,同時(shí)生產(chǎn)者 min.insync.replicas 仍然為1。
假設(shè)A和B所在Broker同時(shí)宕機(jī),B先重啟回來(lái),因此B成為L(zhǎng)eader,分區(qū)HW = 1。假設(shè)此時(shí)生產(chǎn)者發(fā)送了第3條消息(紅色表示)給B,于是B的log中offset = 1的消息變成了紅框表示的消息,同時(shí)分區(qū)HW更新到2(A還沒(méi)有回來(lái),就B?個(gè)副本,故可以直接更新HW而不用理會(huì)A)之后A重啟回來(lái),需要執(zhí)行日志截?cái)啵l(fā)現(xiàn)此時(shí)分區(qū)HW=2而A之前的HW值也是2,故不做任何調(diào)整。此后A和B將以這種狀態(tài)繼續(xù)正常工作。
顯然,這種場(chǎng)景下,A和B的Log中保存在offset = 1的消息是不同的記錄,從而引發(fā)不?致的情形出現(xiàn)。
Leader Epoch使?
Kafka解決?案
造成上述兩個(gè)問(wèn)題的根本原因在于
- HW值被?于衡量副本備份的成功與否。
- 在出現(xiàn)失敗重啟時(shí)作為?志截?cái)嗟囊罁?jù)。
但HW值的更新是異步延遲的,特別是需要額外的FETCH請(qǐng)求處理流程才能更新,故這中間發(fā)?的任何崩潰都可能導(dǎo)致HW值的過(guò)期。
Kafka從0.11引?了 leader epoch 來(lái)取代HW值。Leader端使?內(nèi)存保存Leader的epoch信息,即使出現(xiàn)上?的兩個(gè)場(chǎng)景也能規(guī)避這些問(wèn)題。
所謂Leader epoch實(shí)際上是?對(duì)值:<epoch, offset>: - epoch表示Leader的版本號(hào),從0開(kāi)始,Leader變更過(guò)1次,epoch+1
- offset對(duì)應(yīng)于該epoch版本的Leader寫(xiě)?第?條消息的offset。因此假設(shè)有兩對(duì)值:
<0, 0>
<1, 120>
則表示第?個(gè)Leader從位移0開(kāi)始寫(xiě)?消息;共寫(xiě)了120條[0, 119];而第?個(gè)Leader版本號(hào)是1,從位移120處開(kāi)始寫(xiě)入消息。
- Leader broker中會(huì)保存這樣的?個(gè)緩存,并定期地寫(xiě)入到一個(gè) checkpoint 文件中。
- 當(dāng)Leader寫(xiě)Log時(shí)它會(huì)嘗試更新整個(gè)緩存:如果這個(gè)Leader首次寫(xiě)消息,則會(huì)在緩存中增加?個(gè)條目;否則就不做更新。
- 每次副本變?yōu)長(zhǎng)eader時(shí)會(huì)查詢這部分緩存,獲取出對(duì)應(yīng)Leader版本的位移,則不會(huì)發(fā)生數(shù)據(jù)不?致和丟失的情況。
規(guī)避數(shù)據(jù)丟失

只需要知道每個(gè)副本都引入了新的狀態(tài)來(lái)保存自己當(dāng)leader時(shí)開(kāi)始寫(xiě)?的第?條消息的offset以及l(fā)eader版本。這樣在恢復(fù)的時(shí)候完全使?這些信息而非HW來(lái)判斷是否需要截?cái)?志。
規(guī)避數(shù)據(jù)不一致

依靠Leader epoch的信息可以有效地規(guī)避數(shù)據(jù)不?致的問(wèn)題。


