Zookeeper的leader選舉

Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在。當Zookeeper集群中的一臺服務(wù)器出現(xiàn)以下兩種情況之一時,需要進入Leader選舉。

  • 服務(wù)器初始化啟動。
  • 服務(wù)器運行期間無法和Leader保持連接。

服務(wù)器啟動時期的Leader選舉
若進行Leader選舉,則至少需要兩臺機器,這里選取3臺機器組成的服務(wù)器集群為例。在集群初始化階段,當有一臺服務(wù)器Server1啟動時,其單獨無法進行和完成Leader選舉,當?shù)诙_服務(wù)器Server2啟動時,此時兩臺機器可以相互通信,每臺機器都試圖找到Leader,于是進入Leader選舉過程。選舉過程如下

  1. 每個Server發(fā)出一個投票: 由于是初始情況,Server1和Server2都會將自己作為Leader服務(wù)器來進行投票,每次投票會包含所推舉的服務(wù)器的myid和ZXID,使用(myid, ZXID)來表示,此時Server1的投票為(1, 0),Server2的投票為(2, 0),然后各自將這個投票發(fā)給集群中其他機器。

  2. 接收來自各個服務(wù)器的投票: 集群的每個服務(wù)器收到投票后,首先判斷該投票的有效性,如檢查是否是本輪投票、是否來自LOOKING狀態(tài)的服務(wù)器。

  3. 處理投票 針對每一個投票,服務(wù)器都需要將別人的投票和自己的投票進行PK,PK規(guī)則如下:

    • 優(yōu)先檢查ZXID。ZXID比較大的服務(wù)器優(yōu)先作為Leader。
    • 如果ZXID相同,那么就比較myid。myid較大的服務(wù)器作為Leader服務(wù)器。

對于Server1而言,它的投票是(1, 0),接收Server2的投票為(2, 0),首先會比較兩者的ZXID,均為0,再比較myid,此時Server2的myid最大,于是更新自己的投票為(2, 0),然后重新投票,對于Server2而言,其無須更新自己的投票,只是再次向集群中所有機器發(fā)出上一次投票信息即可。

  1. 統(tǒng)計投票:每次投票后,服務(wù)器都會統(tǒng)計投票信息,判斷是否已經(jīng)有過半機器接受到相同的投票信息,對于Server1、Server2而言,都統(tǒng)計出集群中已經(jīng)有兩臺機器接受了(2, 0)的投票信息,此時便認為已經(jīng)選出了Leader。
  2. 改變服務(wù)器狀態(tài): 一旦確定了Leader,每個服務(wù)器就會更新自己的狀態(tài),如果是Follower,那么就變更為FOLLOWING,如果是Leader,就變更為LEADING。

還有一種情形是服務(wù)器運行時期的Leader選舉

在Zookeeper運行期間,Leader與非Leader服務(wù)器各司其職,即便當有非Leader服務(wù)器宕機或新加入,此時也不會影響Leader,但是一旦Leader服務(wù)器掛了,那么整個集群將暫停對外服務(wù),進入新一輪Leader選舉,其過程和啟動時期的Leader選舉過程基本一致。假設(shè)正在運行的有Server1、Server2、Server3三臺服務(wù)器,當前Leader是Server2,若某一時刻Leader掛了,此時便開始Leader選舉。選舉過程如下

  1. 變更狀態(tài)。Leader掛后,余下的非Observer服務(wù)器都會講自己的服務(wù)器狀態(tài)變更為LOOKING,然后開始進入Leader選舉過程。
  2. 每個Server會發(fā)出一個投票。在運行期間,每個服務(wù)器上的ZXID可能不同,此時假定Server1的ZXID為123,Server3的ZXID為122;在第一輪投票中,Server1和Server3都會投自己,產(chǎn)生投票(1, 123),(3, 122),然后各自將投票發(fā)送給集群中所有機器。
  3. 接收來自各個服務(wù)器的投票。與啟動時過程相同。
  4. 處理投票。與啟動時過程相同,此時,Server1將會成為Leader。
  5. 統(tǒng)計投票。與啟動時過程相同。
  6. 改變服務(wù)器的狀態(tài)。與啟動時過程相同。

Leader選舉算法分析

在3.4.0后的Zookeeper的版本只保留了TCP版本的FastLeaderElection選舉算法。當一臺機器進入Leader選舉時,當前集群可能會處于以下兩種狀態(tài)

  • 集群中已經(jīng)存在Leader。
  • 集群中不存在Leader。

對于集群中已經(jīng)存在Leader而言,此種情況一般都是某臺機器啟動得較晚,在其啟動之前,集群已經(jīng)在正常工作,對這種情況,該機器試圖去選舉Leader時,會被告知當前服務(wù)器的Leader信息,對于該機器而言,僅僅需要和Leader機器建立起連接,并進行狀態(tài)同步即可。而在集群中不存在Leader情況下則會相對復(fù)雜,其步驟如下

  1. 第一次投票。無論哪種導致進行Leader選舉,集群的所有機器都處于試圖選舉出一個Leader的狀態(tài),即LOOKING狀態(tài),LOOKING機器會向所有其他機器發(fā)送消息,該消息稱為投票。投票中包含了SID(服務(wù)器的唯一標識)和ZXID(事務(wù)ID),(SID, ZXID)形式來標識一次投票信息。假定Zookeeper由5臺機器組成,SID分別為1、2、3、4、5,ZXID分別為9、9、9、8、8,并且此時SID為2的機器是Leader機器,某一時刻,1、2所在機器出現(xiàn)故障,因此集群開始進行Leader選舉。在第一次投票時,每臺機器都會將自己作為投票對象,于是SID為3、4、5的機器投票情況分別為(3, 9),(4, 8), (5, 8)。

  2. 變更投票。每臺機器發(fā)出投票后,也會收到其他機器的投票,每臺機器會根據(jù)一定規(guī)則來處理收到的其他機器的投票,并以此來決定是否需要變更自己的投票,這個規(guī)則也是整個Leader選舉算法的核心所在,其中術(shù)語描述如下

vote_sid:接收到的投票中所推舉Leader服務(wù)器的SID。

vote_zxid:接收到的投票中所推舉Leader服務(wù)器的ZXID。

 self_sid:當前服務(wù)器自己的SID。

self_zxid:當前服務(wù)器自己的ZXID。

每次對收到的投票的處理,都是對(vote_sid, vote_zxid)和(self_sid, self_zxid)對比的過程。

 規(guī)則一:如果vote_zxid大于self_zxid,就認可當前收到的投票,并再次將該投票發(fā)送出去。

 規(guī)則二:如果vote_zxid小于self_zxid,那么堅持自己的投票,不做任何變更。

  規(guī)則三:如果vote_zxid等于self_zxid,那么就對比兩者的SID,如果vote_sid大于self_sid,那么就認可當前收到的投票,并再次將該投票發(fā)送出去。

規(guī)則四:如果vote_zxid等于self_zxid,并且vote_sid小于self_sid,那么堅持自己的投票,不做任何變更。

結(jié)合上面規(guī)則,給出下面的集群變更過程。


image.png
  1. 確定Leader。經(jīng)過第二輪投票后,集群中的每臺機器都會再次接收到其他機器的投票,然后開始統(tǒng)計投票,如果一臺機器收到了超過半數(shù)的相同投票,那么這個投票對應(yīng)的SID機器即為Leader。此時Server3將成為Leader。

由上面規(guī)則可知,通常那臺服務(wù)器上的數(shù)據(jù)越新(ZXID會越大),其成為Leader的可能性越大,也就越能夠保證數(shù)據(jù)的恢復(fù)。如果ZXID相同,則SID越大機會越大。

Leader選舉實現(xiàn)細節(jié)

  1. 服務(wù)器狀態(tài): 服務(wù)器具有四種狀態(tài),分別是LOOKING、FOLLOWING、LEADING、OBSERVING。
  LOOKING:尋找Leader狀態(tài)。當服務(wù)器處于該狀態(tài)時,它會認為當前集群中沒有Leader,因此需要進入Leader選舉狀態(tài)。
  FOLLOWING:跟隨者狀態(tài)。表明當前服務(wù)器角色是Follower。
  LEADING:領(lǐng)導者狀態(tài)。表明當前服務(wù)器角色是Leader。
  OBSERVING:觀察者狀態(tài)。表明當前服務(wù)器角色是Observer。
  1. 投票數(shù)據(jù)結(jié)構(gòu)
      每個投票中包含了兩個最基本的信息,所推舉服務(wù)器的SID和ZXID,投票(Vote)在Zookeeper中包含字段如下
   id:被推舉的Leader的SID。

  zxid:被推舉的Leader事務(wù)ID。

  electionEpoch:邏輯時鐘,用來判斷多個投票是否在同一輪選舉周期中,該值在服務(wù)端是一個自增序列,每次進入新一輪的投票后,都會對該值進行加1操作。

  peerEpoch:被推舉的Leader的epoch。

  state:當前服務(wù)器的狀態(tài)。
  1. QuorumCnxManager:網(wǎng)絡(luò)I/O
      每臺服務(wù)器在啟動的過程中,會啟動一個QuorumPeerManager,負責各臺服務(wù)器之間的底層Leader選舉過程中的網(wǎng)絡(luò)通信。
  • 消息隊列。QuorumCnxManager內(nèi)部維護了一系列的隊列,用來保存接收到的、待發(fā)送的消息以及消息的發(fā)送器,除接收隊列以外,其他隊列都按照SID分組形成隊列集合,如一個集群中除了自身還有3臺機器,那么就會為這3臺機器分別創(chuàng)建一個發(fā)送隊列,互不干擾。
recvQueue:消息接收隊列,用于存放那些從其他服務(wù)器接收到的消息。

queueSendMap:消息發(fā)送隊列,用于保存那些待發(fā)送的消息,按照SID進行分組。

senderWorkerMap:發(fā)送器集合,每個SenderWorker消息發(fā)送器,都對應(yīng)一臺遠程Zookeeper服務(wù)器,負責消息的發(fā)送,也按照SID進行分組。

lastMessageSent:最近發(fā)送過的消息,為每個SID保留最近發(fā)送過的一個消息。
  • 建立連接。為了能夠相互投票,Zookeeper集群中的所有機器都需要兩兩建立起網(wǎng)絡(luò)連接。QuorumCnxManager在啟動時會創(chuàng)建一個ServerSocket來監(jiān)聽Leader選舉的通信端口(默認為3888)。開啟監(jiān)聽后,Zookeeper能夠不斷地接收到來自其他服務(wù)器的創(chuàng)建連接請求,在接收到其他服務(wù)器的TCP連接請求時,會進行處理。為了避免兩臺機器之間重復(fù)地創(chuàng)建TCP連接,Zookeeper只允許SID大的服務(wù)器主動和其他機器建立連接,否則斷開連接。在接收到創(chuàng)建連接請求后,服務(wù)器通過對比自己和遠程服務(wù)器的SID值來判斷是否接收連接請求,如果當前服務(wù)器發(fā)現(xiàn)自己的SID更大,那么會斷開當前連接,然后自己主動和遠程服務(wù)器建立連接。一旦連接建立,就會根據(jù)遠程服務(wù)器的SID來創(chuàng)建相應(yīng)的消息發(fā)送器SendWorker和消息接收器RecvWorker,并啟動。

  • 消息接收與發(fā)送。消息接收:由消息接收器RecvWorker負責,由于Zookeeper為每個遠程服務(wù)器都分配一個單獨的RecvWorker,因此,每個RecvWorker只需要不斷地從這個TCP連接中讀取消息,并將其保存到recvQueue隊列中。消息發(fā)送:由于Zookeeper為每個遠程服務(wù)器都分配一個單獨的SendWorker,因此,每個SendWorker只需要不斷地從對應(yīng)的消息發(fā)送隊列中獲取出一個消息發(fā)送即可,同時將這個消息放入lastMessageSent中。在SendWorker中,一旦Zookeeper發(fā)現(xiàn)針對當前服務(wù)器的消息發(fā)送隊列為空,那么此時需要從lastMessageSent中取出一個最近發(fā)送過的消息來進行再次發(fā)送,這是為了解決接收方在消息接收前或者接收到消息后服務(wù)器掛了,導致消息尚未被正確處理。同時,Zookeeper能夠保證接收方在處理消息時,會對重復(fù)消息進行正確的處理。

  1. FastLeaderElection:選舉算法核心
外部投票:特指其他服務(wù)器發(fā)來的投票。

內(nèi)部投票:服務(wù)器自身當前的投票。

選舉輪次:Zookeeper服務(wù)器Leader選舉的輪次,即logicalclock。

PK:對內(nèi)部投票和外部投票進行對比來確定是否需要變更內(nèi)部投票。
  • 選票管理
sendqueue:選票發(fā)送隊列,用于保存待發(fā)送的選票。

recvqueue:選票接收隊列,用于保存接收到的外部投票。

WorkerReceiver:選票接收器。其會不斷地從QuorumCnxManager中獲取其他服務(wù)器發(fā)來的選舉消息,并將其轉(zhuǎn)換成一個選票,然后保存到recvqueue中,在選票接收過程中,如果發(fā)現(xiàn)該外部選票的選舉輪次小于當前服務(wù)器的,那么忽略該外部投票,同時立即發(fā)送自己的內(nèi)部投票。

WorkerSender:選票發(fā)送器,不斷地從sendqueue中獲取待發(fā)送的選票,并將其傳遞到底層QuorumCnxManager中。

算法核心

image.png

上圖展示了FastLeaderElection模塊是如何與底層網(wǎng)絡(luò)I/O進行交互的。Leader選舉的基本流程如下

  • 自增選舉輪次。Zookeeper規(guī)定所有有效的投票都必須在同一輪次中,在開始新一輪投票時,會首先對logicalclock進行自增操作。

  • 初始化選票。在開始進行新一輪投票之前,每個服務(wù)器都會初始化自身的選票,并且在初始化階段,每臺服務(wù)器都會將自己推舉為Leader。

  • 發(fā)送初始化選票。完成選票的初始化后,服務(wù)器就會發(fā)起第一次投票。Zookeeper會將剛剛初始化好的選票放入sendqueue中,由發(fā)送器WorkerSender負責發(fā)送出去。

  • 接收外部投票。每臺服務(wù)器會不斷地從recvqueue隊列中獲取外部選票。如果服務(wù)器發(fā)現(xiàn)無法獲取到任何外部投票,那么就會立即確認自己是否和集群中其他服務(wù)器保持著有效的連接,如果沒有連接,則馬上建立連接,如果已經(jīng)建立了連接,則再次發(fā)送自己當前的內(nèi)部投票。

  • 判斷選舉輪次。在發(fā)送完初始化選票之后,接著開始處理外部投票。在處理外部投票時,會根據(jù)選舉輪次來進行不同的處理。

    · 外部投票的選舉輪次大于內(nèi)部投票。若服務(wù)器自身的選舉輪次落后于該外部投票對應(yīng)服務(wù)器的選舉輪次,那么就會立即更新自己的選舉輪次(logicalclock),并且清空所有已經(jīng)收到的投票,然后使用初始化的投票來進行PK以確定是否變更內(nèi)部投票。最終再將內(nèi)部投票發(fā)送出去。

    · 外部投票的選舉輪次小于內(nèi)部投票。若服務(wù)器接收的外選票的選舉輪次落后于自身的選舉輪次,那么Zookeeper就會直接忽略該外部投票,不做任何處理,并返回步驟4。

    · 外部投票的選舉輪次等于內(nèi)部投票。此時可以開始進行選票PK。
  • 選票PK。在進行選票PK時,符合任意一個條件就需要變更投票。
    · 若外部投票中推舉的Leader服務(wù)器的選舉輪次大于內(nèi)部投票,那么需要變更投票。

    · 若選舉輪次一致,那么就對比兩者的ZXID,若外部投票的ZXID大,那么需要變更投票。

    · 若兩者的ZXID一致,那么就對比兩者的SID,若外部投票的SID大,那么就需要變更投票。
  • 變更投票。經(jīng)過PK后,若確定了外部投票優(yōu)于內(nèi)部投票,那么就變更投票,即使用外部投票的選票信息來覆蓋內(nèi)部投票,變更完成后,再次將這個變更后的內(nèi)部投票發(fā)送出去。

  • 選票歸檔。無論是否變更了投票,都會將剛剛收到的那份外部投票放入選票集合recvset中進行歸檔。recvset用于記錄當前服務(wù)器在本輪次的Leader選舉中收到的所有外部投票(按照服務(wù)隊的SID區(qū)別,如{(1, vote1), (2, vote2)...})。

  • 統(tǒng)計投票。完成選票歸檔后,就可以開始統(tǒng)計投票,統(tǒng)計投票是為了統(tǒng)計集群中是否已經(jīng)有過半的服務(wù)器認可了當前的內(nèi)部投票,如果確定已經(jīng)有過半服務(wù)器認可了該投票,則終止投票。否則重新選舉。

  • 更新服務(wù)器狀態(tài)。若已經(jīng)確定可以終止投票,那么就開始更新服務(wù)器狀態(tài),服務(wù)器首選判斷當前被過半服務(wù)器認可的投票所對應(yīng)的Leader服務(wù)器是否是自己,若是自己,則將自己的服務(wù)器狀態(tài)更新為LEADING,若不是,則根據(jù)具體情況來確定自己是FOLLOWING或是OBSERVING。
      以上10個步驟就是FastLeaderElection的核心,選舉過程可能會經(jīng)過幾輪循環(huán),直到有Leader選舉產(chǎn)生。

源碼分析

每臺服務(wù)器在啟動的過程中,會啟動一個 QuorumPeerManager,負責各臺服務(wù)器之間的底層 Leader 選舉過程中的網(wǎng)絡(luò)通信。

  1. 初始化 QuorumCnxManager
 protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;
                
        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = createCnxnManager();
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                le = new FastLeaderElection(this, qcm);
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

在 createElectionAlgorithm 會啟動 QuorumCnxManager

public QuorumCnxManager(QuorumPeer self) {
    this.recvQueue = new ArrayBlockingQueue<Message>(RECV_CAPACITY);
    this.queueSendMap = new ConcurrentHashMap<Long, ArrayBlockingQueue<ByteBuffer>>();
    this.senderWorkerMap = new ConcurrentHashMap<Long, SendWorker>();
    this.lastMessageSent = new ConcurrentHashMap<Long, ByteBuffer>();
    
    String cnxToValue = System.getProperty("zookeeper.cnxTimeout");
    if(cnxToValue != null){
        this.cnxTO = Integer.parseInt(cnxToValue);
    }
    
    this.self = self;

    // Starts listener thread that waits for connection requests 
    listener = new Listener();
    listener.setName("QuorumPeerListener");
}

可以看到QuorumCnxManager 內(nèi)部維護了一系列的隊列,用來保存接收到的、待發(fā)送的消息以及消息的發(fā)送器,除接收隊列以外,其他隊列都按照 SID 分組形成隊列集合。


recvQueue 消息接收隊列,用于存放那些從其他服務(wù)器接收到的消息。
queueSendMap 消息發(fā)送隊列,用于保存那些待發(fā)送的消息,按照 SID 進行分組。
senderWorkerMap 發(fā)送器集合,每個 SenderWorker 消息發(fā)送器,都對應(yīng)一臺遠程 Zookeeper 服務(wù)器,負責消息的發(fā)送,也按照 SID 進行分組。
lastMessageSent 最近發(fā)送過的消息,為每個 SID 保留最近發(fā)送過的一個消息。

Listener: 可以看到 Listener 初始化了一個 ServerSocket,默認端口為 3888 進行底層 Leader 選舉通信。

/**
     * Thread to listen on some port
     */
    public class Listener extends ZooKeeperThread {

        volatile ServerSocket ss = null;

        public Listener() {
            // During startup of thread, thread name will be overridden to
            // specific election address
            super("ListenerThread");
        }

        /**
         * Sleeps on accept().
         */
        @Override
        public void run() {
            int numRetries = 0;
            InetSocketAddress addr;
            while((!shutdown) && (numRetries < 3)){
                try {
                    ss = new ServerSocket();
                    ss.setReuseAddress(true);
                    if (listenOnAllIPs) {
                        int port = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.getPort();
                        addr = new InetSocketAddress(port);
                    } else {
                        addr = view.get(QuorumCnxManager.this.mySid)
                            .electionAddr;
                    }
                    LOG.info("My election bind port: " + addr.toString());
                    setName(view.get(QuorumCnxManager.this.mySid)
                            .electionAddr.toString());
                    ss.bind(addr);
                    while (!shutdown) {
                        Socket client = ss.accept();
                        setSockOpts(client);
                        LOG.info("Received connection request "
                                + client.getRemoteSocketAddress());

                        // Receive and handle the connection request
                        // asynchronously if the quorum sasl authentication is
                        // enabled. This is required because sasl server
                        // authentication process may take few seconds to finish,
                        // this may delay next peer connection requests.
                        if (quorumSaslAuthEnabled) {
                            receiveConnectionAsync(client);
                        } else {
                            receiveConnection(client);
                        }

                        numRetries = 0;
                    }
                } catch (IOException e) {
                    LOG.error("Exception while listening", e);
                    numRetries++;
                    try {
                        ss.close();
                        Thread.sleep(1000);
                    } catch (IOException ie) {
                        LOG.error("Error closing server socket", ie);
                    } catch (InterruptedException ie) {
                        LOG.error("Interrupted while sleeping. " +
                                  "Ignoring exception", ie);
                    }
                }
            }
            LOG.info("Leaving listener");
            if (!shutdown) {
                LOG.error("As I'm leaving the listener thread, "
                        + "I won't be able to participate in leader "
                        + "election any longer: "
                        + view.get(QuorumCnxManager.this.mySid).electionAddr);
            }
        }
        

為了避免兩臺機器之間重復(fù)地創(chuàng)建 TCP 連接,Zookeeper 只允許 SID 大的服務(wù)器主動和其他機器建立連接,否則斷開連接。在接收到創(chuàng)建連接請求后,服務(wù)器通過對比自己和遠程服務(wù)器的 SID 值來判斷是否接收連接請求,如果當前服務(wù)器發(fā)現(xiàn)自己的 SID 更大,那么會斷開當前連接,然后自己主動和遠程服務(wù)器建立連接。一旦連接建立,就會根據(jù)遠程服務(wù)器的 SID 來創(chuàng)建相應(yīng)的消息發(fā)送器 SendWorker 和消息接收器 RecvWorker,并啟動。每個 RecvWorker 只需要不斷地從這個 TCP 連接中讀取消息,并將其保存到 recvQueue 隊列中。每個 SendWorker 只需要不斷地從對應(yīng)的消息發(fā)送隊列中獲取出一個消息發(fā)送即可,同時將這個消息放入 lastMessageSent 中。

 public void receiveConnection(final Socket sock) {
        DataInputStream din = null;
        try {
            din = new DataInputStream(
                    new BufferedInputStream(sock.getInputStream()));

            handleConnection(sock, din);
        } catch (IOException e) {
            LOG.error("Exception handling connection, addr: {}, closing server connection",
                     sock.getRemoteSocketAddress());
            closeSocket(sock);
        }
    }


    private void handleConnection(Socket sock, DataInputStream din)
            throws IOException {
        Long sid = null;
        try {
            // Read server id
            sid = din.readLong();
            if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
                sid = din.readLong();

                // next comes the #bytes in the remainder of the message
                // note that 0 bytes is fine (old servers)
                int num_remaining_bytes = din.readInt();
                if (num_remaining_bytes < 0 || num_remaining_bytes > maxBuffer) {
                    LOG.error("Unreasonable buffer length: {}", num_remaining_bytes);
                    closeSocket(sock);
                    return;
                }
                byte[] b = new byte[num_remaining_bytes];

                // remove the remainder of the message from din
                int num_read = din.read(b);
                if (num_read != num_remaining_bytes) {
                    LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
                }
            }
            if (sid == QuorumPeer.OBSERVER_ID) {
                /*
                 * Choose identifier at random. We need a value to identify
                 * the connection.
                 */
                sid = observerCounter.getAndDecrement();
                LOG.info("Setting arbitrary identifier to observer: " + sid);
            }
        } catch (IOException e) {
            closeSocket(sock);
            LOG.warn("Exception reading or writing challenge: " + e.toString());
            return;
        }

        // do authenticating learner
        LOG.debug("Authenticating learner server.id: {}", sid);
        authServer.authenticate(sock, din);

        //If wins the challenge, then close the new connection.
        if (sid < this.mySid) {
            /*
             * This replica might still believe that the connection to sid is
             * up, so we have to shut down the workers before trying to open a
             * new connection.
             */
            SendWorker sw = senderWorkerMap.get(sid);
            if (sw != null) {
                sw.finish();
            }

            /*
             * Now we start a new connection
             */
            LOG.debug("Create new connection to server: " + sid);
            closeSocket(sock);
            connectOne(sid);

            // Otherwise start worker threads to receive data.
        } else {
            SendWorker sw = new SendWorker(sock, sid);
            RecvWorker rw = new RecvWorker(sock, din, sid, sw);
            sw.setRecv(rw);

            SendWorker vsw = senderWorkerMap.get(sid);
            
            if(vsw != null)
                vsw.finish();
            
            senderWorkerMap.put(sid, sw);
            
            if (!queueSendMap.containsKey(sid)) {
                queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                        SEND_CAPACITY));
            }
            
            sw.start();
            rw.start();
            
            return;
        }
    }

FastLeaderElection選舉算法

    public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager){
        this.stop = false;
        this.manager = manager;
        starter(self, manager);
    }
 private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        sendqueue = new LinkedBlockingQueue<ToSend>();
        recvqueue = new LinkedBlockingQueue<Notification>();
        this.messenger = new Messenger(manager);
    }
 /**
         * Constructor of class Messenger.
         *
         * @param manager   Connection manager
         */
        Messenger(QuorumCnxManager manager) {
    // 1. WorkerSender 選票接收器,負責從 QuorumCnxManager 接收選票后保存到 recvqueue 中
            this.ws = new WorkerSender(manager);

            Thread t = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
 // 2. WorkerReceiver 選票發(fā)送器,負責從 sendqueue 中獲取待發(fā)送的選票并傳遞給 QuorumCnxManager
            this.wr = new WorkerReceiver(manager);

            t = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            t.setDaemon(true);
            t.start();
        }

在 FastLeaderElection 中有幾個屬性需要我們重點關(guān)注一下:

sendqueue 選票發(fā)送隊列,用于保存待發(fā)送的選票。

recvqueue 選票接收隊列,用于保存接收到的外部投票。

WorkerReceiver 選票接收器。其會不斷地從 QuorumCnxManager 中獲取其他服務(wù)器發(fā)來的選舉消息,并將其轉(zhuǎn)換成一個選票,然后保存到 recvqueue 中,在選票接收過程中,如果發(fā)現(xiàn)該外部選票的選舉輪次小于當前服務(wù)器的,那么忽略該外部投票,同時立即發(fā)送自己的內(nèi)部投票。

WorkerSender 選票發(fā)送器,不斷地從 sendqueue 中獲取待發(fā)送的選票,并將其傳遞到底層 QuorumCnxManager 中。

lookForLeader

    ...
  try {
                            roZkMgr.start();
                            setBCVote(null);
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        }
 ...
public Vote lookForLeader() throws InterruptedException {
    // 省略...
    if (self.start_fle == 0) {
       self.start_fle = Time.currentElapsedTime();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();
        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();
        int notTimeout = finalizeWait;

        // 1. 啟動時先投自己一票并廣播給其它服務(wù)器
        synchronized(this){
            logicalclock.incrementAndGet();
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }
        sendNotifications();

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
            // 2. 獲取其它服務(wù)器發(fā)送過來的選票
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

            // 3. 如果沒有選票,則先判斷是否存在連接,如存在則是先投自己一票,如沒則立即連接
            if(n == null){
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();
                }
                // 省略...
            } 
            // 4. 收到投票信息,根據(jù) LOOKING、OBSERVING、FOLLOWING、LEADING 分別處理
            else if (self.getCurrentAndNextConfigVoters().contains(n.sid)) {
                switch (n.state) {
                // 5. LOOKING 時才會進行選舉
                case LOOKING:
                    // 5.1 判斷投票是否過時,如果自己過時就清除之前已經(jīng)接收到的信息
                    if (n.electionEpoch > logicalclock.get()) {
                        logicalclock.set(n.electionEpoch);
                        recvset.clear();
                        // 重新發(fā)起投票,PK 一下:如果收到的票據(jù)大則更新票據(jù),否則仍投自己一票
                        if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                            // 更新票據(jù)
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            // 仍投自己一票
                            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        }
                        sendNotifications();
                    // 5.2 收到的票據(jù)過時則直接忽略
                    } else if (n.electionEpoch < logicalclock.get()) {
                        break;
                    // 5.3 epoch 相等則要 PK
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);
                        sendNotifications();
                    }

                    recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                    // 5.4 統(tǒng)計誰的投票超過半數(shù),就成為 Leader
                    if (termPredicate(recvset, new Vote(proposedLeader, proposedZxid,
                                    logicalclock.get(), proposedEpoch))) {

                        // 5.5 再等一會兒(200ms),看是否有新的投票
                        while((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch)) {
                                recvqueue.put(n);
                                break;
                            }
                        }

                        // 5.6 如果沒有發(fā)生新的投票,則結(jié)束選舉過程則結(jié)束選舉,修改狀態(tài)為 LEADING
                        if (n == null) {
                            self.setPeerState((proposedLeader == self.getId()) ? ServerState.LEADING: learningState());
                            Vote endVote = new Vote(proposedLeader, proposedZxid, proposedEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }
                    break;
                // 6. OBSERVING 不能與投票
                case OBSERVING:
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                // 7. FOLLOWING、LEADING 說明已存在 Leader。
                //    可能在同一輪選舉中,也可能是之前就存在的 Leader ,則不在同一輪選舉中
                case FOLLOWING:
                case LEADING:
                    // 7.1 在同一輪選舉中,則收集所有的選票放到 recvset 中
                    //     如有半數(shù)支持則更新狀態(tài)退出選舉
                    if(n.electionEpoch == logicalclock.get()) {
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                        if(termPredicate(recvset, new Vote(n.leader,
                                        n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                        && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());

                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    // 7.2 如果收到的 logicalclock 與當前不相等,那說明在另一個選舉中已經(jīng)有了結(jié)果(Leader 已存在)
                    //     收集所有的選票到 outofelection 中,如有半數(shù)支持則更新狀態(tài)退出選舉
                    outofelection.put(n.sid, new Vote(n.leader, IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                    if (termPredicate(outofelection, new Vote(n.leader,
                            IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                            && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                        synchronized(this){
                            logicalclock.set(n.electionEpoch);
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        // 省略...
    }
}

Leader 選舉有兩個函數(shù)需要重點關(guān)注一下,totalOrderPredicate() 對兩張選票進行 PK,termPredicate() 判斷投票是否可以結(jié)束了。

totalOrderPredicate(PK 選票)

// id(sid) zxid(事務(wù)id) epoch(選舉輪數(shù),每更新一次 Leader 自增 1)
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, 
        long curId, long curZxid, long curEpoch) {
    /*
     * 1- New epoch is higher
     * 2- New epoch is the same as current epoch, but new zxid is higher
     * 3- New epoch is the same as current epoch, new zxid is the same
     *  as current zxid, but server id is higher.
     */
    return ((newEpoch > curEpoch) ||
            ((newEpoch == curEpoch) &&
            ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}

termPredicate(結(jié)束投票)

/ 票據(jù)占多數(shù)則結(jié)束選舉
private boolean termPredicate(HashMap<Long, Vote> votes, Vote vote) {
    SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
    voteSet.addQuorumVerifier(self.getQuorumVerifier());
    if (self.getLastSeenQuorumVerifier() != null
            && self.getLastSeenQuorumVerifier().getVersion() > self
                    .getQuorumVerifier().getVersion()) {
        voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
    }

    // 將支持 vote 的票據(jù)放到 set 集合中(Set 可去重)
    for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
        if (vote.equals(entry.getValue())) {
            voteSet.addAck(entry.getKey());
        }
    }
    
    // self.getQuorumVerifier().containsQuorum(set)
    return voteSet.hasAllQuorums();
}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 【轉(zhuǎn)自】http://www.cnblogs.com/leesf456/p/6107600.html 一、前言 前...
    lxqfirst閱讀 897評論 0 0
  • 一、Leader選舉過程 Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵所在。當Zookeeper集群中的一臺服務(wù)器...
    yannhuang閱讀 1,295評論 0 2
  • 一、前言 前面學習了Zookeeper服務(wù)端的相關(guān)細節(jié),其中對于集群啟動而言,很重要的一部分就是Leader選舉,...
    阿斯蒂芬2閱讀 17,733評論 4 19
  • zookeeper集群中往往需要在集群服務(wù)器中選舉出一個Leader,Leader選舉是保證分布式數(shù)據(jù)一致性的關(guān)鍵...
    探索者_逗你玩兒閱讀 520評論 0 0
  • 若進行Leader選舉,則至少需要兩臺機器,這里選取3臺機器組成的服務(wù)器集群為例。在集群初始化階段,當有一臺服務(wù)器...
    白紙糊閱讀 1,023評論 0 1

友情鏈接更多精彩內(nèi)容