Zookeeper-leader初始化
在選舉完成后,集群每個節(jié)點的角色狀態(tài)就會確定,回到QuorumPeer#start中,每個節(jié)點會根據(jù)自身的狀態(tài)完成相應的處理,如下:
- LEADING:創(chuàng)建Leader,等待follower 跟隨自身
- FOLLOWING:創(chuàng)建follower,跟隨leader并從leader同步數(shù)據(jù)
- OBSERVING:創(chuàng)建OBSERVING,跟隨leader節(jié)點
本節(jié)我們著重分析leader節(jié)點相關(guān)的邏輯。
Leader 相關(guān)的類
- Leader:leader的控制邏輯類
- LeaderZookeeperServer:leader 處理請求的類
Leader節(jié)點初始化,主要是根據(jù)節(jié)點配置初始化上面兩個類,在leader 初始化完成之后會新建用于leader-follower之間通信的連接(接受來自follower的連接)。
Leader 控制邏輯
當leader初始化完成之后通過leader#lead實現(xiàn)leader節(jié)點的功能,方法出現(xiàn)異常時,將重置當前節(jié)點的狀態(tài)(LOOKING),進入下一輪選舉,下面看下lead的執(zhí)行流程。
1、載入數(shù)據(jù)
首先從zkDb中恢復數(shù)據(jù)(在啟動階段已經(jīng)載入數(shù)據(jù),理論上這里不會再次載入),解析出zxid
- zkDb中恢復數(shù)據(jù)(啟動階段已經(jīng)讀入數(shù)據(jù),理論上這里不會再次讀入)
- 解析zxid
- clean up dead session
- 做 snapshot
2、啟動 LearnerCnxAcceptor
LearnerCnxAcceptor 主要作用是接受來自follower的請求,并為每一個follower 連接新建一個LearnerHandler 用于處理、同步follower,可參考6集群數(shù)據(jù)同步。
3、leader 獲取選舉周期
這一步稍微有點繞,通過Leader#getEpochToPropos實現(xiàn),步驟如下:
- leader 會調(diào)用 Leader#getEpochToPropose 將自身加入到connectingFollowers 這個列表,在沒有收到大多數(shù)的follower連接上之前,它進入等待狀態(tài)
- 當一個follower連上leader之后,通過LearnerHandler 中調(diào)用Leader#getEpochToPropose將follower加入connectingFollowers 列表,并檢查是否大多數(shù)follower已經(jīng)連上leader,如大多數(shù)已經(jīng)連上,則喚醒所有在這個節(jié)點等待的follower 和leader,否則它自己也在這里進入等待狀態(tài)
- 等待的節(jié)點會定時去查看大多數(shù)follower都連接到leader
實現(xiàn)核心代碼如下,可以看出這個通過wait、notify 實現(xiàn)的等待喚醒策略:
long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
connectingFollowers.wait(end - cur);
cur = Time.currentElapsedTime();
}
等待大部分follower都連接到leader之后,根據(jù)每個follower發(fā)送epoch 計算出一個新的epoch(選出最大的一個epoch并自增)。leader 會根據(jù)這個選舉周期初始化這一輪zxid。
4、發(fā)送 LEADERINFO 數(shù)據(jù)包給Follower
當大部分的follower連接上leader后,阻塞等待的follower會被喚醒,接下來leader會發(fā)送 LEADERINFO(包括newZxid、version等信息)數(shù)據(jù)包給follower,然后等待大部分 follower 回復 ack 消息。等待策略和上面差不多,通過Leader#waitForEpochAck 實現(xiàn)。
注:如果follower 的版本小于 0x10000 那么就不會發(fā)送LEADERINFO 消息給follower,直接阻塞等待follower 回復的ack消息。在ack消息中也包含follower最新的zxid?,F(xiàn)在follower除此回復的消息都是0x10000
這一步主要是將上一步選出來的epoch 廣播給集群中的其他節(jié)點,其他節(jié)點后續(xù)會根據(jù)這個epoch判斷收到的數(shù)據(jù)包是不是新的leader發(fā)送的,避免舊的leader復活之后廣播之前的提議而造成狀態(tài)不一致的問題。
5、Leader通過LearnerHandler異步同步數(shù)據(jù)給Follower
leader 在收到大多數(shù)follower回復ack消息之后,開始和follower 同步數(shù)據(jù),在集群對外服務之前確保各個節(jié)點中的數(shù)據(jù)一致。通過LearnerHandler#syncFollower數(shù)據(jù)同步,同步方式有四種:全量同步(snap)、差異化(diff)、回滾(trunc)、差異化+回滾(diff+trunc) ,具體采用哪種方式還要看follower 和 leader 之間的數(shù)據(jù)差異情況。
peerLastZxid:該Learner最后處理的ZXID
minCommittedLog:LeadercommittedLog中的最小ZXID
maxCommittedLog:LeadercommittedLog中的最大ZXID
lastProcessedZxid:leader中最后處理的ZXID
強制發(fā)送snapshot(測試目的)
-
follower 和 leader 已經(jīng)同步,則發(fā)送空的 diff 消息
if (lastProcessedZxid == peerLastZxid) { // 如果 follower 的peerLastZxid 和 leader lastProcessedZxid相等,說明兩個節(jié)點的數(shù)據(jù)一致,不需要進行同步,這時只需要給follower 發(fā)送一個diff的包 queueOpPacket(Leader.DIFF, peerLastZxid); needOpPacket = false; needSnap = false; } -
follower 的 txn 比leader 要多,那么則發(fā)送 TRUNC ,回滾follower多余的 txn數(shù)據(jù)。
if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) { // follower的數(shù)據(jù)比leader 超前,則回滾follower的數(shù)據(jù)到leader#maxCommittedLog queueOpPacket(Leader.TRUNC, maxCommittedLog); currentZxid = maxCommittedLog; needOpPacket = false; needSnap = false; } -
follower 在committedLog同步的范圍內(nèi),那么根據(jù)follower 的zxid來決定發(fā)送 TRUNC 還是DIFF,如果follower 在同步中的話就發(fā)送空的DIFF
if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) { // follower 數(shù)據(jù)在處于leader 的 最大最小事務之間,則增量同步 Iterator<Proposal> itr = db.getCommittedLog().iterator(); // 這里有兩種情形: // 1、發(fā)送 DIFF 給 follower : 對應 follower zxid 在leader的history中,這時只需要將follower中缺失的數(shù)據(jù)(propose+commit)發(fā)過去,然后讓follower重做這些提案即可 // 2、發(fā)送 TRUN + DIFF 給 follower : 對應 follower zxid 滿足上述條件,但是follower中存在leader中不存在的數(shù)據(jù),那么會先發(fā)送回滾消息,再增量同步。這種場景在leader 收到propose消息之后還沒來得及廣播給其他follower就掛掉,新選出的leader沒有這個消息,所以需要將此消息回滾 currentZxid = queueCommittedProposals(itr, peerLastZxid, null, maxCommittedLog); needSnap = false; } -
follower 和leader之間的數(shù)據(jù)差異巨大,follower的最大事務id小于leader 的minCommittedLog。會將leader 磁盤上的txnLog和committedLog同步給follower,如果失敗了,會發(fā)送snapshot
txnLogSyncEnabled :是否開啟事務日志同步 if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) { // follower 最大的zxid 小于 leader 最小 minCommittedLog,并且允許從txnLog中同步數(shù)據(jù) // 計算事務日志允許同步的大小 long sizeLimit = db.calculateTxnLogSizeLimit(); // 如果follower的zxid 在 leader的事務日志中,那么只需要同步事務日志中差異的部分,不需要同步整個snapshot,否則就需要同步snapshot Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog( peerLastZxid, sizeLimit); if (txnLogItr.hasNext()) { LOG.info("Use txnlog and committedLog for peer sid: " + getSid()); currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid, minCommittedLog, maxCommittedLog); Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator(); currentZxid = queueCommittedProposals(committedLogItr, currentZxid, null, maxCommittedLog); needSnap = false; } // closing the resources if (txnLogItr instanceof TxnLogProposalIterator) { TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr; txnProposalItr.close(); } }注:如果需要同步整個鏡像,同步過程就不會通過發(fā)送每個propose+commit數(shù)據(jù)給follower,而是leader將它自身的snapshot文件(SNAP)通過網(wǎng)絡直接發(fā)給follower,follower收到后直接將本地數(shù)據(jù)覆蓋即可。
經(jīng)過上面的步驟,leader 和follower的差異數(shù)據(jù)同步完成之后,再次檢查leader 和follower 在同步數(shù)據(jù)期間是否有其他的差異數(shù)據(jù),所以最后一步就是同步這部分差異數(shù)據(jù)。
// 對這個方法的理解是,follower在故障恢復時,如果和leader的數(shù)據(jù)同步完成,還要再次檢查同步數(shù)據(jù)這段時間是不是leader有了新的提交,需要再次同步
leaderLastZxid = leader.startForwarding(this, currentZxid);
在leader 和follower數(shù)據(jù)同步完成之后,leader 通過LearnerHandler發(fā)送 Leader.NEWLEADER 數(shù)據(jù)包給所有follower,然后leader通過 leader#waitForNewLeaderAck 阻塞等待大多數(shù)follower的ack數(shù)據(jù)。
6、啟動LeaderZooKeeperServer
在大多數(shù)的follower都回復了newLeaderAck 數(shù)據(jù)包之后,這是的leader就成為真正的leader了,它啟動LeaderZooKeeperServer ,履行l(wèi)eader 的職責,如下:
- 創(chuàng)建session追蹤器(SessionTrackerImpl)
- 啟動SessionTracker(它的主要作用是定期清理過期的session)
- 設(shè)置請求處理器(構(gòu)造Leader的請求處理鏈)
- 注冊JMX,然后將當前的服務的狀態(tài)設(shè)置為RUNNING(運行)
所有的LearnerHandler 會阻塞等待LeaderZooKeeperServer 啟動完成,之后它開始給所有follower發(fā)送Leader.UPTODATE 數(shù)據(jù)包,之后便處理來自follower數(shù)據(jù)。它可以收到如下四類消息:
-
Leader.ACK:follower在同步 proposal 之后發(fā)給leader的消息;leader收到此消息后直接通過Leader#processAck 處理這條消息。
1、校驗消息
if (outstandingProposals.size() == 0) { return; } if (lastCommitted >= zxid) { // The proposal has already been committed return; } Proposal p = outstandingProposals.get(zxid); if (p == null) { return; }2、將收到的消息加入到ack桶中
3、嘗試提交消息(通過Leader#tryToCommit,如果沒有收到大多數(shù)follower的回復時,提交失?。?/p>
注:通過tryToCommit可知,在在提交消息時通過在outstandingProposals 中找當前消息的前一條消息,如果找到,說明上一條消息還沒提交,則先不提交當前消息。否則提交當前消息,并且通知所有的Observer,最后通過將當前的請求轉(zhuǎn)發(fā)給CommitProcessor 繼續(xù)處理
Leader.PING:leader 和 follower之間的心跳數(shù)據(jù)。直接通過sessionTracker 更新當前session及超時時間
Leader.REVALIDATE:session校驗是否有效
Leader.REQUEST:follower將客戶端讀以外的操作轉(zhuǎn)發(fā)給leader,這一步Leader會通過LeaderZooKeeperServer#submitLearnerRequest將請求首先轉(zhuǎn)發(fā)給PrepRequestProcessor。
7、和Follower建立心跳
在Leader 啟動完成之后,leader變周期性的發(fā)送心跳數(shù)據(jù)(LearnerHandler#ping)給follower。如果出現(xiàn)follower 掉線或宕機就會將此follower從 learners 中移出,接下來就不會發(fā)送propose或者心跳數(shù)據(jù)給此follower。
當leader上沒有大多數(shù)的follower時,會將自己關(guān)閉,狀態(tài)重置為LOOKING,進行下一輪的選舉。
1、超時時間判斷
public synchronized boolean check(long time) { if (currentTime == 0) { return true; } else { long msDelay = (time - currentTime) / 1000000; return (msDelay < (leader.self.tickTime * leader.self.syncLimit)); } } leader.self.tickTime * leader.self.syncLimit:超時時間
至此,Leader的啟動流程就已經(jīng)完成了。