1.復(fù)制
kafka使用多副本來確保數(shù)據(jù)的持久性,是典型的主備復(fù)制模型。
每個(gè)topic分區(qū)包含一個(gè)leader副本和若干個(gè)follower副本,leader副本對(duì)外提供讀寫服務(wù),follower副本只同步數(shù)據(jù),不對(duì)外提供服務(wù)。

1.1 日志+Pull復(fù)制模型
leader維護(hù)本地最新日志偏移leo,和已提交日志偏移hw(所有副本均寫入成功,則認(rèn)為可提交)。
follwer基于pull方式從leader同步數(shù)據(jù),數(shù)據(jù)寫入本地日志文件(內(nèi)存pagecache)后,向leader確認(rèn)offset。
RPC:ReplicaFetchReqeust,設(shè)置offset字段指示待消費(fèi)的日志偏移,也是對(duì)offset之前消息的確認(rèn)。
這種簡(jiǎn)單的日志+Pull復(fù)制模型,在編程實(shí)現(xiàn)上非常簡(jiǎn)單,并且,
1)當(dāng)某個(gè)follower發(fā)生故障,或者網(wǎng)絡(luò)中斷后,可以丟掉不完整的日志,從指定offset重新開始同步。
2)leader和follower副本上的消息順序是完全一致的。
1.2 Isr和慢副本檢測(cè)
在kafka中,每個(gè)分區(qū)的多個(gè)副本組成了一個(gè)Isr(同步副本)隊(duì)列。當(dāng)某個(gè)follower的復(fù)制速度落后于leader太多的時(shí)候,leader從Isr隊(duì)列中刪除這個(gè)follower。這個(gè)follower可以繼續(xù)復(fù)制,直到追趕上leader的日志進(jìn)度后,再把它重新加入Isr隊(duì)列。
leader通過比較hw和follower的offset,當(dāng)兩者的差距超過replica.lag.time.max.ms的時(shí)候,就判定為慢副本,從Isr隊(duì)列中刪除該副本。
如果當(dāng)前分區(qū)的leader掛了,隊(duì)列里的每個(gè)副本都有機(jī)會(huì)成為新的leader(由controller來選擇新的leader,下面會(huì)講到)。
1.3 leader切換
當(dāng)發(fā)生leader切換時(shí),controller從Isr中選擇一個(gè)副本作為新的leader,
1)新的leader:設(shè)置hw=leo。
2)follower:從hw截?cái)嗳罩疚募㈤_始從新的leader同步。
從以上過程可以看出,已commit消息保證不會(huì)丟失,未commit的消息可能會(huì)丟失。
例如,Isr中有副本a和b, hw是6, a的leo是10,b的leo是9,假如b被選擇為新的leader,則新的hw=9,則消息10就會(huì)丟失。
1.4 持久性
kafka可以提供的保證是commit消息不會(huì)丟失,未commit消息對(duì)consumer不可見。
持久性保證的前提是topic分區(qū)Isr隊(duì)列至少有一個(gè)副本是可用的,
1)如果Isr副本全部宕掉,這時(shí)有一個(gè)不在Isr中的副本,如果它被選為leader,可能會(huì)導(dǎo)致提交的消息丟失。應(yīng)用如果追求高可用性,必須容忍部分commit數(shù)據(jù)丟失的風(fēng)險(xiǎn);如果追求持久性,則可以設(shè)置unclean.leader.election.enable=false,禁止未同步副本當(dāng)選leader。
2)設(shè)置min.insync.replicas,來避免因同步副本數(shù)較少,而帶來的數(shù)據(jù)丟失風(fēng)險(xiǎn)。
producer可以通過指定request.required.acks=N(總副本數(shù)),來和leader約定持久性級(jí)別。
0:producer不等待來自borker的ack,繼續(xù)發(fā)送下一條(批)消息。消息是否寫入成功沒有保證,甚至broker可能都沒收到消息。
1:producer等待leader寫入本地日志完成的ack,再繼續(xù)發(fā)送下一條(批)消息。假如在消息commit之前l(fā)eader宕機(jī),則消息可能丟失。
-1:producer等待當(dāng)前Isr中的所有副本寫入成功的ack,再繼續(xù)發(fā)送下一條(批)消息。除非Isr終端所有副本都出現(xiàn)數(shù)據(jù)丟失的故障,否則數(shù)據(jù)的持久性是有保證。
這里有一個(gè)隱藏問題:如果在寫消息的過程中,leader宕機(jī),或者發(fā)給producer的ack消息丟失,producer沒有辦法判斷消息是否已經(jīng)寫入成功。producer只能選擇重試,這就導(dǎo)致消息可能會(huì)被重復(fù)寫入,消息亂序。針對(duì)這個(gè)問題,kafka引入了冪等消息發(fā)送機(jī)制,確保在producer的一次會(huì)話期間,多次重試不會(huì)導(dǎo)致消息重復(fù)和亂序。
2.集群管理
kafka使用一個(gè)全局的controller管理集群分區(qū)和副本的狀態(tài),選擇分區(qū)的leader副本,創(chuàng)建刪除主題,執(zhí)行分區(qū)遷移等管理操作。
集群中的每個(gè)broker都可以充當(dāng)controller,通過zk來選舉controller。當(dāng)contoller所在的borker故障時(shí),會(huì)有新的broker充當(dāng)controller的角色。
controller在zk保存集群的分區(qū)和副本狀態(tài),當(dāng)發(fā)生故障恢復(fù)時(shí),controller可以從zk加載數(shù)據(jù)。
controller通過RPC:LeaderAndIsr 、UpdateMetadata 控制分區(qū)中所有副本所在的broker更新分區(qū)和副本狀態(tài)。直接的RPC交互降低了通信成本,也降低了zk的訪問壓力(想象一下每個(gè)broker都通過監(jiān)控zk,來更新各個(gè)分區(qū)和副本的狀態(tài)(P*N),會(huì)是一個(gè)什么情形),使得集群可以容納更多的的分區(qū)和副本。

當(dāng)leader副本所在的broker宕機(jī),或者發(fā)生分區(qū)自動(dòng)再平衡,或者執(zhí)行分區(qū)遷移命令時(shí),controller在分區(qū)isr中重新選擇一個(gè)副本作為分區(qū)leader(leader選擇算法,例如OfflinePartitionLeaderSelector,PreferredReplicaPartitionLeaderSelector)。

【參考】:
[0]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Replication
[1]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+replication+detailed+design+V2#KafkareplicationdetaileddesignV2-Messagereplication
[2]https://cwiki.apache.org/confluence/display/KAFKA/kafka+Detailed+Replication+Design+V3
[3]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Internals
[4]https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Controller+Redesign