Zookeeper-leader初始化

Zookeeper-leader初始化

在選舉完成后,集群每個節(jié)點的角色狀態(tài)就會確定,回到QuorumPeer#start中,每個節(jié)點會根據(jù)自身的狀態(tài)完成相應的處理,如下:

  • LEADING:創(chuàng)建Leader,等待follower 跟隨自身
  • FOLLOWING:創(chuàng)建follower,跟隨leader并從leader同步數(shù)據(jù)
  • OBSERVING:創(chuàng)建OBSERVING,跟隨leader節(jié)點

本節(jié)我們著重分析leader節(jié)點相關(guān)的邏輯。

Leader 相關(guān)的類

  • Leader:leader的控制邏輯類
  • LeaderZookeeperServer:leader 處理請求的類

Leader節(jié)點初始化,主要是根據(jù)節(jié)點配置初始化上面兩個類,在leader 初始化完成之后會新建用于leader-follower之間通信的連接(接受來自follower的連接)。

Leader 控制邏輯

當leader初始化完成之后通過leader#lead實現(xiàn)leader節(jié)點的功能,方法出現(xiàn)異常時,將重置當前節(jié)點的狀態(tài)(LOOKING),進入下一輪選舉,下面看下lead的執(zhí)行流程。

1、載入數(shù)據(jù)

首先從zkDb中恢復數(shù)據(jù)(在啟動階段已經(jīng)載入數(shù)據(jù),理論上這里不會再次載入),解析出zxid

  1. zkDb中恢復數(shù)據(jù)(啟動階段已經(jīng)讀入數(shù)據(jù),理論上這里不會再次讀入)
  2. 解析zxid
  3. clean up dead session
  4. 做 snapshot

2、啟動 LearnerCnxAcceptor

LearnerCnxAcceptor 主要作用是接受來自follower的請求,并為每一個follower 連接新建一個LearnerHandler 用于處理、同步follower,可參考6集群數(shù)據(jù)同步。

3、leader 獲取選舉周期

這一步稍微有點繞,通過Leader#getEpochToPropos實現(xiàn),步驟如下:

  1. leader 會調(diào)用 Leader#getEpochToPropose 將自身加入到connectingFollowers 這個列表,在沒有收到大多數(shù)的follower連接上之前,它進入等待狀態(tài)
  2. 當一個follower連上leader之后,通過LearnerHandler 中調(diào)用Leader#getEpochToPropose將follower加入connectingFollowers 列表,并檢查是否大多數(shù)follower已經(jīng)連上leader,如大多數(shù)已經(jīng)連上,則喚醒所有在這個節(jié)點等待的follower 和leader,否則它自己也在這里進入等待狀態(tài)
  3. 等待的節(jié)點會定時去查看大多數(shù)follower都連接到leader

實現(xiàn)核心代碼如下,可以看出這個通過wait、notify 實現(xiàn)的等待喚醒策略:

long start = Time.currentElapsedTime();
long cur = start;
long end = start + self.getInitLimit()*self.getTickTime();
while(waitingForNewEpoch && cur < end) {
    connectingFollowers.wait(end - cur);
    cur = Time.currentElapsedTime();
}

等待大部分follower都連接到leader之后,根據(jù)每個follower發(fā)送epoch 計算出一個新的epoch(選出最大的一個epoch并自增)。leader 會根據(jù)這個選舉周期初始化這一輪zxid。

4、發(fā)送 LEADERINFO 數(shù)據(jù)包給Follower

當大部分的follower連接上leader后,阻塞等待的follower會被喚醒,接下來leader會發(fā)送 LEADERINFO(包括newZxid、version等信息)數(shù)據(jù)包給follower,然后等待大部分 follower 回復 ack 消息。等待策略和上面差不多,通過Leader#waitForEpochAck 實現(xiàn)。

注:如果follower 的版本小于 0x10000 那么就不會發(fā)送LEADERINFO 消息給follower,直接阻塞等待follower 回復的ack消息。在ack消息中也包含follower最新的zxid?,F(xiàn)在follower除此回復的消息都是0x10000

這一步主要是將上一步選出來的epoch 廣播給集群中的其他節(jié)點,其他節(jié)點后續(xù)會根據(jù)這個epoch判斷收到的數(shù)據(jù)包是不是新的leader發(fā)送的,避免舊的leader復活之后廣播之前的提議而造成狀態(tài)不一致的問題。

5、Leader通過LearnerHandler異步同步數(shù)據(jù)給Follower

leader 在收到大多數(shù)follower回復ack消息之后,開始和follower 同步數(shù)據(jù),在集群對外服務之前確保各個節(jié)點中的數(shù)據(jù)一致。通過LearnerHandler#syncFollower數(shù)據(jù)同步,同步方式有四種全量同步(snap)、差異化(diff)、回滾(trunc)、差異化+回滾(diff+trunc) ,具體采用哪種方式還要看follower 和 leader 之間的數(shù)據(jù)差異情況。

peerLastZxid:該Learner最后處理的ZXID
minCommittedLog:LeadercommittedLog中的最小ZXID
maxCommittedLog:LeadercommittedLog中的最大ZXID
lastProcessedZxid:leader中最后處理的ZXID
  • 強制發(fā)送snapshot(測試目的)

  • follower 和 leader 已經(jīng)同步,則發(fā)送空的 diff 消息

    if (lastProcessedZxid == peerLastZxid) {       
     // 如果 follower 的peerLastZxid 和 leader lastProcessedZxid相等,說明兩個節(jié)點的數(shù)據(jù)一致,不需要進行同步,這時只需要給follower 發(fā)送一個diff的包
     queueOpPacket(Leader.DIFF, peerLastZxid);
     needOpPacket = false;
     needSnap = false;
    } 
    
  • follower 的 txn 比leader 要多,那么則發(fā)送 TRUNC ,回滾follower多余的 txn數(shù)據(jù)。

    if (peerLastZxid > maxCommittedLog && !isPeerNewEpochZxid) {
             
     // follower的數(shù)據(jù)比leader 超前,則回滾follower的數(shù)據(jù)到leader#maxCommittedLog
     queueOpPacket(Leader.TRUNC, maxCommittedLog);
     currentZxid = maxCommittedLog;
     needOpPacket = false;
     needSnap = false;
    }
    
  • follower 在committedLog同步的范圍內(nèi),那么根據(jù)follower 的zxid來決定發(fā)送 TRUNC 還是DIFF,如果follower 在同步中的話就發(fā)送空的DIFF

    if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {
     // follower 數(shù)據(jù)在處于leader 的 最大最小事務之間,則增量同步 
     Iterator<Proposal> itr = db.getCommittedLog().iterator();
     
     // 這里有兩種情形:
     // 1、發(fā)送 DIFF 給 follower : 對應 follower zxid 在leader的history中,這時只需要將follower中缺失的數(shù)據(jù)(propose+commit)發(fā)過去,然后讓follower重做這些提案即可
     // 2、發(fā)送 TRUN + DIFF 給 follower : 對應 follower zxid 滿足上述條件,但是follower中存在leader中不存在的數(shù)據(jù),那么會先發(fā)送回滾消息,再增量同步。這種場景在leader 收到propose消息之后還沒來得及廣播給其他follower就掛掉,新選出的leader沒有這個消息,所以需要將此消息回滾
     currentZxid = queueCommittedProposals(itr, peerLastZxid,
                                          null, maxCommittedLog);
     needSnap = false;
    }
    
    
  • follower 和leader之間的數(shù)據(jù)差異巨大,follower的最大事務id小于leader 的minCommittedLog。會將leader 磁盤上的txnLog和committedLog同步給follower,如果失敗了,會發(fā)送snapshot

    txnLogSyncEnabled :是否開啟事務日志同步
    if (peerLastZxid < minCommittedLog && txnLogSyncEnabled) {
     // follower 最大的zxid 小于 leader 最小 minCommittedLog,并且允許從txnLog中同步數(shù)據(jù)
    
     // 計算事務日志允許同步的大小
     long sizeLimit = db.calculateTxnLogSizeLimit();
     
     // 如果follower的zxid 在 leader的事務日志中,那么只需要同步事務日志中差異的部分,不需要同步整個snapshot,否則就需要同步snapshot
     Iterator<Proposal> txnLogItr = db.getProposalsFromTxnLog(
             peerLastZxid, sizeLimit);
     if (txnLogItr.hasNext()) {
         LOG.info("Use txnlog and committedLog for peer sid: " +  getSid());
         currentZxid = queueCommittedProposals(txnLogItr, peerLastZxid,
                                              minCommittedLog, maxCommittedLog);
    
         Iterator<Proposal> committedLogItr = db.getCommittedLog().iterator();
         currentZxid = queueCommittedProposals(committedLogItr, currentZxid,
                                              null, maxCommittedLog);
         needSnap = false;
     }
     // closing the resources
     if (txnLogItr instanceof TxnLogProposalIterator) {
         TxnLogProposalIterator txnProposalItr = (TxnLogProposalIterator) txnLogItr;
         txnProposalItr.close();
     }
    }
    

    注:如果需要同步整個鏡像,同步過程就不會通過發(fā)送每個propose+commit數(shù)據(jù)給follower,而是leader將它自身的snapshot文件(SNAP)通過網(wǎng)絡直接發(fā)給follower,follower收到后直接將本地數(shù)據(jù)覆蓋即可。

經(jīng)過上面的步驟,leader 和follower的差異數(shù)據(jù)同步完成之后,再次檢查leader 和follower 在同步數(shù)據(jù)期間是否有其他的差異數(shù)據(jù),所以最后一步就是同步這部分差異數(shù)據(jù)。

// 對這個方法的理解是,follower在故障恢復時,如果和leader的數(shù)據(jù)同步完成,還要再次檢查同步數(shù)據(jù)這段時間是不是leader有了新的提交,需要再次同步
leaderLastZxid = leader.startForwarding(this, currentZxid);

在leader 和follower數(shù)據(jù)同步完成之后,leader 通過LearnerHandler發(fā)送 Leader.NEWLEADER 數(shù)據(jù)包給所有follower,然后leader通過 leader#waitForNewLeaderAck 阻塞等待大多數(shù)follower的ack數(shù)據(jù)。

6、啟動LeaderZooKeeperServer

在大多數(shù)的follower都回復了newLeaderAck 數(shù)據(jù)包之后,這是的leader就成為真正的leader了,它啟動LeaderZooKeeperServer ,履行l(wèi)eader 的職責,如下:

  1. 創(chuàng)建session追蹤器(SessionTrackerImpl)
  2. 啟動SessionTracker(它的主要作用是定期清理過期的session)
  3. 設(shè)置請求處理器(構(gòu)造Leader的請求處理鏈)
  4. 注冊JMX,然后將當前的服務的狀態(tài)設(shè)置為RUNNING(運行)

所有的LearnerHandler 會阻塞等待LeaderZooKeeperServer 啟動完成,之后它開始給所有follower發(fā)送Leader.UPTODATE 數(shù)據(jù)包,之后便處理來自follower數(shù)據(jù)。它可以收到如下四類消息:

  • Leader.ACK:follower在同步 proposal 之后發(fā)給leader的消息;leader收到此消息后直接通過Leader#processAck 處理這條消息。

    1、校驗消息

    if (outstandingProposals.size() == 0) {
        return;
    }
    if (lastCommitted >= zxid) {
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);
    if (p == null) {
        return;
    }
    

    2、將收到的消息加入到ack桶中

    3、嘗試提交消息(通過Leader#tryToCommit,如果沒有收到大多數(shù)follower的回復時,提交失?。?/p>

    注:通過tryToCommit可知,在在提交消息時通過在outstandingProposals 中找當前消息的前一條消息,如果找到,說明上一條消息還沒提交,則先不提交當前消息。否則提交當前消息,并且通知所有的Observer,最后通過將當前的請求轉(zhuǎn)發(fā)給CommitProcessor 繼續(xù)處理

  • Leader.PING:leader 和 follower之間的心跳數(shù)據(jù)。直接通過sessionTracker 更新當前session及超時時間

  • Leader.REVALIDATE:session校驗是否有效

  • Leader.REQUEST:follower將客戶端讀以外的操作轉(zhuǎn)發(fā)給leader,這一步Leader會通過LeaderZooKeeperServer#submitLearnerRequest將請求首先轉(zhuǎn)發(fā)給PrepRequestProcessor。

7、和Follower建立心跳

在Leader 啟動完成之后,leader變周期性的發(fā)送心跳數(shù)據(jù)(LearnerHandler#ping)給follower。如果出現(xiàn)follower 掉線或宕機就會將此follower從 learners 中移出,接下來就不會發(fā)送propose或者心跳數(shù)據(jù)給此follower。

當leader上沒有大多數(shù)的follower時,會將自己關(guān)閉,狀態(tài)重置為LOOKING,進行下一輪的選舉。

1、超時時間判斷

public synchronized boolean check(long time) {
    if (currentTime == 0) {
        return true;
    } else {
        long msDelay = (time - currentTime) / 1000000;
        return (msDelay < (leader.self.tickTime * leader.self.syncLimit));
    }
}
leader.self.tickTime * leader.self.syncLimit:超時時間

至此,Leader的啟動流程就已經(jīng)完成了。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

  • 什么是Zab協(xié)議 Zab 協(xié)議的作用 Zab 協(xié)議原理 Zab 協(xié)議核心 Zab 協(xié)議內(nèi)容 原子廣播 崩潰恢復 如...
    廟人閱讀 614評論 0 1
  • 聲明:本文寫的時候,當時就是完全不懂zk,邊看網(wǎng)上的文章邊學習歸納和整理,這不是我的產(chǎn)出,不用點贊打賞。大家理智友...
    _Zy閱讀 76,626評論 38 129
  • Apache Zookeeper是由Apache Hadoop的子項目發(fā)展而來,于2010年11月正式成為Apac...
    壹點零閱讀 571評論 0 0
  • Apache Zookeeper是由Apache Hadoop的子項目發(fā)展而來,于2010年11月正式成為Apac...
    olostin閱讀 6,238評論 2 9
  • Zookeeper依賴ZAB協(xié)議來實現(xiàn)分布式數(shù)據(jù)的一致性,其中ZAB協(xié)議包括原子廣播和崩潰恢復兩個階段。 基于該協(xié)...
    liuzx32閱讀 612評論 0 0

友情鏈接更多精彩內(nèi)容