ZooKeeper Source Code Analysis: Leader/Follower Startup, Leader Election, and Leader/Follower Establishment (based on 3.4.6)

1. Overview: Leader/Follower Startup, Leader Election, Leader/Follower Establishment

First, take a look at the following diagram:

zookeeper啟動.png

We will now walk through the flow in the diagram step by step.

2. QuorumPeerMain Parses the Config File and Builds a QuorumPeer

The code below reads the settings from the parsed config file and wires up a QuorumPeer:

// Start a QuorumPeer from the parsed QuorumPeerConfig
public void runFromConfig(QuorumPeerConfig config) throws IOException {
    LOG.info("QuorumPeerConfig : " + config);
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    try {                                                                       // 1. In a ZooKeeper ensemble, each QuorumPeer represents one server
        ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
        cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns());

        quorumPeer = new QuorumPeer();
        quorumPeer.setClientPortAddress(config.getClientPortAddress());
        quorumPeer.setTxnFactory(new FileTxnSnapLog(                            // 2. Set the FileTxnSnapLog (wraps TxnLog and SnapShot)
                new File(config.getDataLogDir()),
                new File(config.getDataDir())));
        quorumPeer.setQuorumPeers(config.getServers());                         // 3. All servers in the ensemble
        quorumPeer.setElectionType(config.getElectionAlg());                    // 4. Leader election algorithm (default 3, i.e. FastLeaderElection)
        quorumPeer.setMyid(config.getServerId());                               // 5. Each QuorumPeer gets a myid to distinguish the nodes in the ensemble
        quorumPeer.setTickTime(config.getTickTime());
        quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());         // 6. Minimum client sessionTimeout (defaults to tickTime * 2)
        quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());         // 7. Maximum client sessionTimeout (defaults to tickTime * 20)
        quorumPeer.setInitLimit(config.getInitLimit());                         // 8. Usually used as initLimit * tickTime: it bounds getEpochToPropose (waiting for the epoch of every node), waitForEpochAck (during Leader establishment the Leader sends LEADERINFO and Followers reply with ACKEPOCH) and waitForNewLeaderAck (the Leader sends NEWLEADER and waits for every Follower's ACK)
        quorumPeer.setSyncLimit(config.getSyncLimit());                         // 9. Usually used as self.tickTime * self.syncLimit: the soTimeout of the sockets connecting the ensemble nodes
        quorumPeer.setQuorumVerifier(config.getQuorumVerifier());               // 10. Vote-counting rule; by default more than half wins (QuorumMaj)
        quorumPeer.setCnxnFactory(cnxnFactory);                                 // 11. Connection factory for client connections (plain Java NIO or Netty; note the plain-NIO implementation does not work around the well-known Java NIO 100% CPU bug)
        quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));   // 12. Set the ZKDatabase
        quorumPeer.setLearnerType(config.getPeerType());                        // 13. Node type (participant / observer)
        quorumPeer.setSyncEnabled(config.getSyncEnabled());                     // 14. Enables/disables the sync request processor; enabled by default and only meaningful for Observers
        quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());

        quorumPeer.start();                                                     // 15. Start the server
        LOG.info("quorumPeer.join begin");
        quorumPeer.join();                                                      // 16. Waits for the quorumPeer thread to die before the program continues
        LOG.info("quorumPeer.join end");
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
}
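
The QuorumVerifier set in step 10 decides when a set of votes or acks wins. The default rule (QuorumMaj) simply requires strictly more than half of the voting members. A minimal sketch of that rule (a simplified stand-in, not the real class):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Sketch of the default majority rule (QuorumMaj): a proposal is accepted
// once strictly more than half of the voting servers have acked it.
public class MajoritySketch {
    private final int half; // n / 2, integer division

    public MajoritySketch(int votingMembers) {
        this.half = votingMembers / 2;
    }

    // true once the ack set is a strict majority
    public boolean containsQuorum(Set<Long> ackedServerIds) {
        return ackedServerIds.size() > half;
    }

    public static void main(String[] args) {
        MajoritySketch q = new MajoritySketch(5);                    // 5-node ensemble
        Set<Long> acks = new HashSet<>(Arrays.asList(1L, 2L));
        System.out.println(q.containsQuorum(acks));                  // 2 of 5: not a quorum
        acks.add(3L);
        System.out.println(q.containsQuorum(acks));                  // 3 of 5: quorum
    }
}
```

This is why a 5-node ensemble tolerates 2 failures but a 6-node ensemble still only tolerates 2: the strict majority is 3 and 4 respectively.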
3. QuorumPeer Startup

start() loads data into the DataTree, opens the client-port listener, and sets up Leader election; after that the thread lives inside the while loop of QuorumPeer.run().

public synchronized void start() {
    loadDataBase();           // load data from snapshots and txn logs into the DataTree
    cnxnFactory.start();      // start listening on the client port
    startLeaderElection();    // set up the Leader election machinery
    super.start();            // start the thread, i.e. enter QuorumPeer.run()
}
4. QuorumPeer.loadDataBase

Loads data from the snapshot and txn log files into the DataTree.

// After this method runs, the currentEpoch and acceptedEpoch files exist
// and the DataTree has been loaded
private void loadDataBase() {
    File updating = new File(getTxnFactory().getSnapDir(),                // 1. The updatingEpoch marker file lives in the snapshot directory
                             UPDATING_EPOCH_FILENAME);
    try {
        zkDb.loadDataBase();                                              // 2. Load the dataTree and sessionsWithTimeouts from snapshots and txn logs

        // load the epochs
        long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;    // 3. The latest zxid zkDb has processed
        long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid); // 4. The high 32 bits of a zxid are the epoch; the low 32 bits are the counter
        try {
            currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);      // 5. Read the epoch from the currentEpoch file (if it does not exist, the catch block below runs instead, which is the common case on a fresh install)
            if (epochOfZxid > currentEpoch && updating.exists()) {        // 6. This means the process died right after taking a snapshot, before updating currentEpoch
                LOG.info("{} found. The server was terminated after " +
                         "taking a snapshot but before updating current " +
                         "epoch. Setting current epoch to {}.",
                         UPDATING_EPOCH_FILENAME, epochOfZxid);
                setCurrentEpoch(epochOfZxid);
                if (!updating.delete()) {
                    throw new IOException("Failed to delete " +
                                          updating.toString());
                }
            }
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            currentEpoch = epochOfZxid;                                   // 7. The currentEpoch file does not exist, so fall back to the epoch derived from the zxid
            LOG.info(CURRENT_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    currentEpoch);
            writeLongToFile(CURRENT_EPOCH_FILENAME, currentEpoch);
        }
        if (epochOfZxid > currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + ", is older than the last zxid, " + lastProcessedZxid);
        }
        try {
            acceptedEpoch = readLongFromFile(ACCEPTED_EPOCH_FILENAME);    // 8. Read the most recently accepted epoch from the acceptedEpoch file
        } catch(FileNotFoundException e) {
            // pick a reasonable epoch number
            // this should only happen once when moving to a
            // new code version
            acceptedEpoch = epochOfZxid;                                  // 9. The acceptedEpoch file could not be read, so fall back to the epoch derived from the zxid
            LOG.info(ACCEPTED_EPOCH_FILENAME
                    + " not found! Creating with a reasonable default of {}. This should only happen when you are upgrading your installation",
                    acceptedEpoch);
            writeLongToFile(ACCEPTED_EPOCH_FILENAME, acceptedEpoch);      // 10. Persist the acceptedEpoch value back to its file
        }
        if (acceptedEpoch < currentEpoch) {
            throw new IOException("The current epoch, " + ZxidUtils.zxidToString(currentEpoch) + " is less than the accepted epoch, " + ZxidUtils.zxidToString(acceptedEpoch));
        }
    } catch(IOException ie) {
        LOG.error("Unable to load database on disk", ie);
        throw new RuntimeException("Unable to run quorum server ", ie);
    }
}
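
Step 4 above relies on the zxid layout: the high 32 bits carry the epoch, the low 32 bits a per-epoch transaction counter. The bit arithmetic ZxidUtils performs can be sketched as:

```java
// Sketch of the zxid layout used by ZxidUtils: epoch in the high 32 bits,
// per-epoch transaction counter in the low 32 bits.
public class ZxidSketch {
    static long epochOf(long zxid)   { return zxid >> 32; }          // high 32 bits
    static long counterOf(long zxid) { return zxid & 0xffffffffL; }  // low 32 bits
    static long makeZxid(long epoch, long counter) { return (epoch << 32) | counter; }

    public static void main(String[] args) {
        long zxid = makeZxid(3, 42);                 // 3rd epoch, 42nd txn in it
        System.out.println(epochOf(zxid));           // 3
        System.out.println(counterOf(zxid));         // 42
        System.out.println(Long.toHexString(zxid));  // 30000002a
    }
}
```

Because a new Leader bumps the epoch and resets the counter, comparing two zxids as plain longs also orders them first by epoch, then by counter.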
5. QuorumPeer.startLeaderElection

Creates the Leader election algorithm and starts the QuorumCnxManager.Listener that accepts connections from the other nodes in the ensemble.

// Set up Leader election
synchronized public void startLeaderElection() {
    try {
        currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());  // 1. The initial vote is for ourselves
    } catch(IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }
    for (QuorumServer p : getView().values()) {                                // 2. Find our own address among all servers in the ensemble
        if (p.id == myid) {
            myQuorumAddr = p.addr;
            break;
        }
    }
    if (myQuorumAddr == null) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    if (electionType == 0) {                                                   // 3. Only the legacy UDP-based algorithm (type 0) needs this; the default type is 3, FastLeaderElection
        try {
            udpSocket = new DatagramSocket(myQuorumAddr.getPort());
            responder = new ResponderThread();
            responder.start();
        } catch (SocketException e) {
            throw new RuntimeException(e);
        }
    }
    this.electionAlg = createElectionAlgorithm(electionType);                  // 4. Create the Election implementation
}

protected Election createElectionAlgorithm(int electionAlgorithm){
    Election le=null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 0:
        le = new LeaderElection(this);
        break;
    case 1:
        le = new AuthFastLeaderElection(this);
        break;
    case 2:
        le = new AuthFastLeaderElection(this, true);
        break;
    case 3:                                                                 // 1. The default Leader election algorithm
        qcm = new QuorumCnxManager(this);
        QuorumCnxManager.Listener listener = qcm.listener;                  // 2. The Listener waits for the other servers in the ensemble to connect
        if(listener != null){
            listener.start();
            le = new FastLeaderElection(this, qcm);
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

When the Listener accepts a connection from another node, it hands it off for processing.

6. QuorumCnxManager.Listener

/**
 * Sleeps on accept().
 */
@Override
public void run() {
    int numRetries = 0;
    InetSocketAddress addr;
    while((!shutdown) && (numRetries < 3)){       // 1. A worry: if numRetries ever reaches 3, the loop exits and this node can no longer accept election connections
        try {
            ss = new ServerSocket();
            ss.setReuseAddress(true);
            if (self.getQuorumListenOnAllIPs()) { // 2. quorumListenOnAllIPs defaults to false
                int port = self.quorumPeers.get(self.getId()).electionAddr.getPort();
                addr = new InetSocketAddress(port);
            } else {
                addr = self.quorumPeers.get(self.getId()).electionAddr;
            }
            LOG.info("My election bind port: " + addr.toString());
            setName(self.quorumPeers.get(self.getId()).electionAddr
                    .toString());
            ss.bind(addr);
            while (!shutdown) {
                Socket client = ss.accept();     // 3. Blocks until a connection request arrives
                setSockOpts(client);             // 4. Set options on the accepted socket
                LOG.info("Received connection request " + client.getRemoteSocketAddress());
                receiveConnection(client);
                numRetries = 0;
            }
        } catch (IOException e) {
            LOG.error("Exception while listening", e);
            numRetries++;
            try {
                ss.close();
                Thread.sleep(1000);
            } catch (IOException ie) {
                LOG.error("Error closing server socket", ie);
            } catch (InterruptedException ie) {
                LOG.error("Interrupted while sleeping. " +
                          "Ignoring exception", ie);
            }
        }
    }
    LOG.info("Leaving listener");
    if (!shutdown) {
        LOG.error("As I'm leaving the listener thread, "
                + "I won't be able to participate in leader "
                + "election any longer: "
                + self.quorumPeers.get(self.getId()).electionAddr);
    }
}
7. QuorumCnxManager.Listener.receiveConnection

To avoid duplicate connections, between any two nodes only the one with the larger myid may connect to the one with the smaller myid. Once a connection is established, a SendWorker and a RecvWorker handle message sending and receiving.

/**
 * If this server receives a connection request, then it gives up on the new
 * connection if it wins. Notice that it checks whether it has a connection
 * to this server already or not. If it does, then it sends the smallest
 * possible long value to lose the challenge.
 * 
 */
public boolean receiveConnection(Socket sock) {                 // 1. Accept a connection from another node in the ensemble
    Long sid = null;
    LOG.info("sock:"+sock);
    try {
        // Read server id
        DataInputStream din = new DataInputStream(sock.getInputStream());
        sid = din.readLong();                                   // 2. Read the peer's myid (the first long may actually be a protocol version)
        LOG.info("sid:"+sid);
        if (sid < 0) { // this is not a server id but a protocol version (see ZOOKEEPER-1633)
            sid = din.readLong();
            LOG.info("sid:"+sid);
            // next comes the #bytes in the remainder of the message
            int num_remaining_bytes = din.readInt();            // 3. Read the length of the rest of the message
            byte[] b = new byte[num_remaining_bytes];           // 4. Allocate a buffer of that length
            // remove the remainder of the message from din
            int num_read = din.read(b);                         // 5. Read the message body (note: a single read() may legally return fewer bytes than requested)
            if (num_read != num_remaining_bytes) {              // 6. Short read: log it
                LOG.error("Read only " + num_read + " bytes out of " + num_remaining_bytes + " sent by server " + sid);
            }
        }
        if (sid == QuorumPeer.OBSERVER_ID) {                    // 7. The connecting node is an observer
            /*
             * Choose identifier at random. We need a value to identify
             * the connection.
             */
            
            sid = observerCounter--;
            LOG.info("Setting arbitrary identifier to observer: " + sid);
        }
    } catch (IOException e) {                                   // 8. Typically an EOFException: the peer has disconnected and there is nothing left to read, so just close the socket
        closeSocket(sock);
        LOG.warn("Exception reading or writing challenge: " + e.toString() + ", sock:"+sock);
        return false;
    }
    
    //If wins the challenge, then close the new connection.
    if (sid < self.getId()) {                                   // 9. To avoid duplicate connections, only the larger myid may connect to the smaller one
        /*
         * This replica might still believe that the connection to sid is
         * up, so we have to shut down the workers before trying to open a
         * new connection.
         */
        SendWorker sw = senderWorkerMap.get(sid);               // 10. If a SendWorker for this sid already exists, shut it down
        if (sw != null) {
            sw.finish();
        }

        /*
         * Now we start a new connection
         */
        LOG.debug("Create new connection to server: " + sid);
        closeSocket(sock);                                      // 11. Close the incoming socket
        connectOne(sid);                                        // 12. Our myid is larger than the peer's, so we initiate the connection ourselves

        // Otherwise start worker threads to receive data.
    } else {                                                    // 13. Our myid is smaller than the peer's
        SendWorker sw = new SendWorker(sock, sid);              // 14. Create the SendWorker
        RecvWorker rw = new RecvWorker(sock, sid, sw);          // 15. Create the RecvWorker
        sw.setRecv(rw); 

        SendWorker vsw = senderWorkerMap.get(sid);              // 16. If an old SendWorker exists, shut it down
        
        if(vsw != null)
            vsw.finish();
        
        senderWorkerMap.put(sid, sw);
        
        if (!queueSendMap.containsKey(sid)) {                   // 17. Create the outgoing message queue for this sid if it does not exist yet
            queueSendMap.put(sid, new ArrayBlockingQueue<ByteBuffer>(
                    SEND_CAPACITY));
        }
        
        sw.start();                                             // 18. Start the send and receive threads
        rw.start();
        
        return true;    
    }
    return false;
}
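
The dedup rule above (only the larger myid keeps a connection toward a smaller myid) can be isolated into a tiny predicate. A sketch with hypothetical names, mirroring the `sid < self.getId()` branch:

```java
// Sketch: ZooKeeper's pairwise connection dedup rule. For any pair of
// servers, only the one with the larger id may be the initiator, so at
// most one socket survives between any two peers.
public class ConnectionRuleSketch {
    // Should the receiver keep an incoming connection from remoteSid?
    static boolean keepIncoming(long mySid, long remoteSid) {
        // drop the socket and dial back (connectOne) if the remote id is smaller
        return remoteSid >= mySid;
    }

    public static void main(String[] args) {
        System.out.println(keepIncoming(1, 3)); // true:  peer 3 > me (1), keep the socket
        System.out.println(keepIncoming(3, 1)); // false: I (3) > peer 1, close and connectOne(1)
    }
}
```

Without this rule, two peers dialing each other simultaneously would end up with two sockets and split message streams; the deterministic tie-break guarantees exactly one survives.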
8. QuorumPeer.run

The thread spends its life inside QuorumPeer.run(), cycling through the states LOOKING -> LEADING/FOLLOWING -> LOOKING -> ...

@Override
public void run() {
    setName("QuorumPeer" + "[myid=" + getId() + "]" +                    // 1. Set the thread name
            cnxnFactory.getLocalAddress());

    LOG.debug("Starting quorum peer");
    try {
        jmxQuorumBean = new QuorumBean(this);
        MBeanRegistry.getInstance().register(jmxQuorumBean, null);       // 2. Wrap the QuorumPeer in a QuorumBean and register it with JMX
        for(QuorumServer s: getView().values()){                         // 3. Iterate over every server in the ensemble
            ZKMBeanInfo p;
            if (getId() == s.id) {
                p = jmxLocalPeerBean = new LocalPeerBean(this);
                try {                                                    // 4. Register the LocalPeerBean with JMX
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                    jmxLocalPeerBean = null;
                }
            } else {                                                     // 5. Remote peers are registered with JMX as RemotePeerBean
                p = new RemotePeerBean(s);
                try {
                    MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                } catch (Exception e) {
                    LOG.warn("Failed to register with JMX", e);
                }
            }
        }
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        jmxQuorumBean = null;
    }

    try {
        /*
         * Main loop
         */
        while (running) {                                               // 6. QuorumPeer stays in this loop (usually LOOKING first, then LEADING/FOLLOWING)
            switch (getPeerState()) {
            case LOOKING:                                               // 7. This QuorumPeer is LOOKING, i.e. searching for a Leader
                LOG.info("LOOKING, and myid is " + myid);

                if (Boolean.getBoolean("readonlymode.enabled")) {       // 8. Check whether the server runs in read-only mode
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(
                            logFactory, this,
                            new ZooKeeperServer.BasicDataTreeBuilder(),
                            this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();                                 // 9. Two steps are involved: (a) the election machinery was built in QuorumPeer.start() -> startLeaderElection() -> createElectionAlgorithm()
                        setBCVote(null);                                 //    (b) Election.lookForLeader() runs the election until it succeeds or an exception occurs
                        setCurrentVote(makeLEStrategy().lookForLeader());// 10. Run the Leader election algorithm; this can take a while
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {                                                // 11. Same two steps: (a) QuorumPeer.start() -> startLeaderElection() -> createElectionAlgorithm()
                        setBCVote(null);                                 //     (b) Election.lookForLeader() runs the election until it succeeds or an exception occurs
                        setCurrentVote(makeLEStrategy().lookForLeader());// 12. Run the election algorithm; this can take a while
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                try {
                    LOG.info("OBSERVING, and myid is " + myid);
                    setObserver(makeObserver(logFactory));
                    observer.observeLeader();
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e );                        
                } finally {
                    observer.shutdown();
                    setObserver(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case FOLLOWING:
                try {
                    LOG.info("FOLLOWING, and myid is " + myid);         // 13. The top-level object is still the QuorumPeer
                    setFollower(makeFollower(logFactory));              // 14. Create the Follower, which references a FollowerZooKeeperServer
                    follower.followLeader();                            // 15. Call follower.followLeader(); the thread blocks here
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    follower.shutdown();
                    setFollower(null);
                    setPeerState(ServerState.LOOKING);
                }
                break;
            case LEADING:
                LOG.info("LEADING, and myid is " + myid);
                try {
                    setLeader(makeLeader(logFactory));                  // 16. Create the Leader object
                    leader.lead();                                      // 17. The Leader thread blocks here
                    setLeader(null);
                } catch (Exception e) {
                    LOG.warn("Unexpected exception",e);
                } finally {
                    if (leader != null) {
                        leader.shutdown("Forcing shutdown");
                        setLeader(null);
                    }
                    setPeerState(ServerState.LOOKING);
                }
                break;
            }
        }
    } finally {
        LOG.warn("QuorumPeer main thread exited");
        try {
            MBeanRegistry.getInstance().unregisterAll();
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        jmxQuorumBean = null;
        jmxLocalPeerBean = null;
    }
}
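
The main loop above reduces to a small state machine: each role handler blocks until it throws or returns, and the finally block always drops the peer back to LOOKING for a fresh election. A minimal sketch with a hypothetical `next` helper (not part of ZooKeeper):

```java
// Sketch of QuorumPeer.run()'s state cycle: every role handler runs until
// failure, after which the peer falls back to LOOKING and re-elects.
public class StateLoopSketch {
    enum State { LOOKING, FOLLOWING, LEADING }

    // One turn of the main loop: LOOKING runs the election and becomes
    // LEADING or FOLLOWING; any other state eventually falls back to LOOKING.
    static State next(State s, boolean wonElection) {
        if (s == State.LOOKING) {
            return wonElection ? State.LEADING : State.FOLLOWING;
        }
        return State.LOOKING; // lead()/followLeader() returned: re-elect
    }

    public static void main(String[] args) {
        State s = State.LOOKING;
        for (int i = 0; i < 4; i++) {
            System.out.println(s);       // LOOKING, LEADING, LOOKING, LEADING
            s = next(s, true);           // pretend we win every election
        }
    }
}
```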
9. FastLeaderElection.lookForLeader

ZooKeeper uses FastLeaderElection as its default Leader election algorithm; let's go straight to the code.
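
The heart of the algorithm is totalOrderPredicate, which decides whether an incoming vote beats the local proposal by comparing (epoch, zxid, myid), in that order. The 3.4.x comparison is essentially the following (this sketch omits the QuorumHierarchical weight check present in the real method):

```java
// Sketch of FastLeaderElection.totalOrderPredicate (3.4.x, minus the
// server-weight check): compare peer epoch, then zxid, then server id;
// the incoming vote wins only on a strict improvement.
public class VoteCompareSketch {
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        return (newEpoch > curEpoch)
            || (newEpoch == curEpoch
                && (newZxid > curZxid
                    || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // a higher zxid wins even against a larger server id
        System.out.println(totalOrderPredicate(1, 0x200000005L, 2,
                                               3, 0x200000004L, 2)); // true
        // a full tie: the incoming vote does not win
        System.out.println(totalOrderPredicate(2, 5, 1, 2, 5, 1));   // false
    }
}
```

Because every peer applies the same total order, all LOOKING nodes converge on the vote with the highest (epoch, zxid, myid), which is exactly what guarantees the new Leader has the most up-to-date history.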

/**
 * Starts a new round of leader election. Whenever our QuorumPeer
 * changes its state to LOOKING, this method is invoked, and it
 * sends notifications to all other peers.
 */
// Every QuorumPeer invokes this method when it enters LOOKING; the whole Leader election runs through it
public Vote lookForLeader() throws InterruptedException {
    LOG.info("QuorumPeer {" + self  + "} is LOOKING !");
    try {
        self.jmxLeaderElectionBean = new LeaderElectionBean();
        MBeanRegistry.getInstance().register(                               // 1. Register jmxLeaderElectionBean with JMX
                self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
    } catch (Exception e) {
        LOG.warn("Failed to register with JMX", e);
        self.jmxLeaderElectionBean = null;
    }
    if (self.start_fle == 0) {
       self.start_fle = System.currentTimeMillis();
    }
    try {
        HashMap<Long, Vote> recvset = new HashMap<Long, Vote>();                    // 2. Votes received from other peers

        HashMap<Long, Vote> outofelection = new HashMap<Long, Vote>();

        int notTimeout = finalizeWait;

        synchronized(this){
            LOG.info("logicalclock :" + logicalclock);
            logicalclock++;                                               // 3. Bump the local election round, then propose our own myid, zxid and epoch
            updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
        }

        LOG.info("New election. My id =  " + self.getId() + ", proposed zxid=0x " + Long.toHexString(proposedZxid));
        LOG.info("sendNotifications to QuorumPeers ");
        sendNotifications();                                              // 4. Broadcast our election notification to every node in the ensemble (including ourselves)

        /*
         * Loop in which we exchange notifications until we find a leader
         */

        while ((self.getPeerState() == ServerState.LOOKING) && (!stop)){ // 5. Keep looping while this QuorumPeer is LOOKING, until a Leader is elected
            /*
             * Remove next notification from queue, times out after 2 times
             * the termination time
             */                                                          // 6. Take the next notification (a vote sent to us by another peer)
            Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
            LOG.info("Notification:"+n);

            /*
             * Sends more notifications if haven't received enough.
             * Otherwise processes new notification.
             */
            if(n == null){                                               // 7. n == null probably means the ensemble nodes are not fully connected yet
                if(manager.haveDelivered()){
                    sendNotifications();
                } else {
                    manager.connectAll();                                // 8. Connect to every server in the ensemble; each successful connection gets a SendWorker/RecvWorker pair
                }

                /*
                 * Exponential backoff
                 */
                int tmpTimeOut = notTimeout*2;
                notTimeout = (tmpTimeOut < maxNotificationInterval?
                        tmpTimeOut : maxNotificationInterval);
                LOG.info("Notification time out: " + notTimeout);
            }
            else if(self.getVotingView().containsKey(n.sid)) {           // 9. Process an election notification sent by a voting member of the ensemble
                /*
                 * Only proceed if the vote comes from a replica in the
                 * voting view.
                 */
                switch (n.state) {
                case LOOKING:                                            // 10. The sender is also LOOKING for a Leader
                    // If notification > current, replace and send messages out
                    if (n.electionEpoch > logicalclock) {                // 11. The notification's electionEpoch (round) is newer than ours
                        logicalclock = n.electionEpoch;
                        recvset.clear();                                 // 12. totalOrderPredicate compares the received vote with our own (in order: epoch, zxid, myid)
                        boolean totalOrderPredicate = totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                        LOG.info("n.leader:" + n.leader + ", n.zxid:"+ n.zxid +", n.peerEpoch:"+n.peerEpoch +", getInitId():"+getInitId() +", getInitLastLoggedZxid():"+getInitLastLoggedZxid() + ", getPeerEpoch():"+getPeerEpoch());
                        LOG.info("totalOrderPredicate:"+totalOrderPredicate);
                        if(totalOrderPredicate) {                        // 13. The received vote wins, so it replaces our local proposal
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                        } else {
                            updateProposal(getInitId(),
                                    getInitLastLoggedZxid(),
                                    getPeerEpoch());
                        }
                        sendNotifications();                             // 14. Our proposal has changed, so broadcast the updated election notification
                    } else if (n.electionEpoch < logicalclock) {         // 15. The notification's electionEpoch (round) is older than ours: simply drop it
                        LOG.info("Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                + Long.toHexString(n.electionEpoch)
                                + ", logicalclock=0x" + Long.toHexString(logicalclock));
                        break;                                           // 16. Same round as ours: totalOrderPredicate compares the received vote with our proposal (in order: epoch, zxid, myid)
                    } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                            proposedLeader, proposedZxid, proposedEpoch)) {
                        updateProposal(n.leader, n.zxid, n.peerEpoch);   // 17. The received Notification wins, so its contents replace our local proposal
                        sendNotifications();                             // 18. Our proposal has changed, so broadcast the updated election notification
                    }                                                    // 19. Record the received vote in recvset; it feeds the final "majority rule" check
                    Vote vote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                    LOG.info("Receive Notification: " + n);
                    LOG.info("Adding vote: " + vote);
                    recvset.put(n.sid, vote);

                                                                         // 20. Build our current Vote and let termPredicate decide whether the election is over
                    Vote selfVote = new Vote(proposedLeader, proposedZxid, logicalclock, proposedEpoch);
                    boolean termPredicate = termPredicate(recvset,selfVote );    // 21. Check whether the election can end (by default, the majority rule)
                    LOG.info("recvset:"+recvset +", || selfVote: " + selfVote);
                    LOG.info("termPredicate:"+termPredicate);
                    if (termPredicate) {                                         // 22. A majority agrees: Leader election has succeeded

                        // Verify if there is any change in the proposed leader
                        while((n = recvqueue.poll(finalizeWait,
                                TimeUnit.MILLISECONDS)) != null){                // 23. Drain any remaining Notifications from recvqueue
                            boolean totalOrderPredicate2 = totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    proposedLeader, proposedZxid, proposedEpoch);// 24. Check whether any of them would change this QuorumPeer's proposal
                            LOG.info("totalOrderPredicate2:"+totalOrderPredicate2);
                            if(totalOrderPredicate2){                            // 25. A better vote arrived after all
                                recvqueue.put(n);                                // 26. Put the Notification back into recvqueue and wait for notifications again
                                break;
                            }
                        }

                        /*
                         * This predicate is true once we don't read any new
                         * relevant message from the reception queue
                         */
                        if (n == null) {                                        // 27. n == null: the elected Leader is settled, so update our state
                            self.setPeerState((proposedLeader == self.getId()) ?
                                    ServerState.LEADING: learningState());      // 28. If the elected Leader is this node, set our state to LEADING; otherwise FOLLOWING/OBSERVING

                            Vote endVote = new Vote(proposedLeader,             // 29.組裝生成這次 Leader 選舉最終的投票的結果
                                                    proposedZxid,
                                                    logicalclock,
                                                    proposedEpoch);
                            leaveInstance(endVote);                             // 30.Leader選舉結束, 清空 recvqueue
                            return endVote;                                     // 31.這時會退回到程序的上層, 進行 follower.followLeader() / leader.lead()
                        }
                    }
                    break;
                case OBSERVING:                                                // 32. A QuorumPeer in the OBSERVING role does not take part in Leader election
                    LOG.debug("Notification from observer: " + n.sid);
                    break;
                case FOLLOWING:
                case LEADING:
                    /*
                     * Consider all notifications from the same epoch
                     * together.
                     */
                    if(n.electionEpoch == logicalclock){
                        recvset.put(n.sid, new Vote(n.leader,                  // 33. This vote too must be added to the received set
                                                      n.zxid,
                                                      n.electionEpoch,
                                                      n.peerEpoch));
                       
                        if(ooePredicate(recvset, outofelection, n)) {          // 34. Check whether the election has ended and the Leader has been confirmed
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());     // 35. Update the QuorumPeer's state here (LEADING / FOLLOWING)

                            Vote endVote = new Vote(n.leader, 
                                    n.zxid, 
                                    n.electionEpoch, 
                                    n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                    }

                    /*
                     * Before joining an established ensemble, verify
                     * a majority is following the same leader.
                     */
                    outofelection.put(n.sid, new Vote(n.version,
                                                        n.leader,
                                                        n.zxid,
                                                        n.electionEpoch,
                                                        n.peerEpoch,
                                                        n.state));
       
                    if(ooePredicate(outofelection, outofelection, n)) {
                        synchronized(this){
                            logicalclock = n.electionEpoch;
                            self.setPeerState((n.leader == self.getId()) ?
                                    ServerState.LEADING: learningState());
                        }
                        Vote endVote = new Vote(n.leader,
                                                n.zxid,
                                                n.electionEpoch,
                                                n.peerEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }
                    break;
                default:
                    LOG.warn("Notification state unrecognized: {} (n.state), {} (n.sid)",
                            n.state, n.sid);
                    break;
                }
            } else {
                LOG.warn("Ignoring notification from non-cluster member " + n.sid);
            }
        }
        return null;
    } finally {
        try {
            if(self.jmxLeaderElectionBean != null){
                MBeanRegistry.getInstance().unregister(
                        self.jmxLeaderElectionBean);
            }
        } catch (Exception e) {
            LOG.warn("Failed to unregister with JMX", e);
        }
        self.jmxLeaderElectionBean = null;
    }
}
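The comparisons above hinge on totalOrderPredicate, whose ordering rule is: a notification beats the current proposal if it carries a higher peerEpoch, or the same epoch with a higher zxid, or the same epoch and zxid with a higher server id. Below is a minimal sketch of that rule (my simplification: the real 3.4.6 method also consults the QuorumVerifier's voting weights):

```java
// Sketch of the vote-ordering rule behind totalOrderPredicate (simplified
// from FastLeaderElection in 3.4.6; the real method also checks quorum weights).
public class VoteOrder {
    static boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                                       long curId, long curZxid, long curEpoch) {
        // Epoch first, then zxid, then server id as the final tie-breaker.
        return newEpoch > curEpoch
            || (newEpoch == curEpoch
                && (newZxid > curZxid
                    || (newZxid == curZxid && newId > curId)));
    }

    public static void main(String[] args) {
        // A higher epoch wins even against a larger zxid.
        System.out.println(totalOrderPredicate(1, 0x9L, 2, 3, 0xffL, 1)); // true
        // Same epoch and zxid: the larger myid wins the tie.
        System.out.println(totalOrderPredicate(3, 0x9L, 1, 1, 0x9L, 1)); // true
    }
}
```

This ordering is why, all else being equal, the server with the largest myid tends to win a fresh election.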

Once FastLeaderElection has settled on a Leader, the Leader and Follower paths diverge; the two sides are covered separately below


10. Leader.lead(), part 1

On the Leader side, lead() drives the interaction with the Followers (the process involves Follower, Learner, LearnerHandler and Leader, and blocks at several points)

self.tick = 0;
zk.loadData();                                                      // 2. Recover the zxid from the snapshot and txn log
                                                                    // 3. Build the Leader's state summary
leaderStateSummary = new StateSummary(self.getCurrentEpoch(), zk.getLastProcessedZxid());
LOG.info("leaderStateSummary:" + leaderStateSummary);
// Start thread that waits for connection requests from 
// new followers.
cnxAcceptor = new LearnerCnxAcceptor();                             // 4. LearnerCnxAcceptor listens on its port; as soon as a follower connects, it starts a LearnerHandler to handle that connection
LOG.info("cnxAcceptor start");
cnxAcceptor.start();

readyToStart = true;                                                // 5. getAcceptedEpoch was just restored straight from disk; it refers to the proposals processed so far
LOG.info("self.getId() :" + self.getId() + ",  self.getAcceptedEpoch():" +  self.getAcceptedEpoch());
                                                                    // 6. Wait for enough Followers to connect, proving this peer really is the leader; the lead thread may block in the while loop here
                                                                    // 7. Each corresponding LearnerHandler receives a FOLLOWERINFO packet carrying that follower's acceptedEpoch
long epoch = getEpochToPropose(self.getId(), self.getAcceptedEpoch());

On the Leader side a LearnerCnxAcceptor accepts connections from the other nodes in the ensemble; each accepted connection gets a LearnerHandler that maintains the dialogue with that peer. Meanwhile the Leader blocks inside getEpochToPropose until a majority of Followers have reported in; the block is released once enough LearnerHandlers have called Leader.getEpochToPropose
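The quorum barrier inside getEpochToPropose can be pictured as a plain wait/notify loop: each arriving server registers its sid and proposed epoch, and every caller blocks until more than half the ensemble has checked in. The class below is my own simplified sketch (names like EpochQuorum are mine; the real Leader.getEpochToPropose also enforces an initLimit-based timeout and uses the QuorumVerifier rather than a raw majority count):

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of the quorum barrier in Leader.getEpochToPropose.
public class EpochQuorum {
    private final int ensembleSize;
    private final Set<Long> connectingFollowers = new HashSet<>();
    private long epoch;

    public EpochQuorum(int ensembleSize, long initialEpoch) {
        this.ensembleSize = ensembleSize;
        this.epoch = initialEpoch;
    }

    public synchronized long getEpochToPropose(long sid, long lastAcceptedEpoch)
            throws InterruptedException {
        // The new epoch ends up one greater than the largest acceptedEpoch seen.
        if (lastAcceptedEpoch >= epoch) {
            epoch = lastAcceptedEpoch + 1;
        }
        connectingFollowers.add(sid);
        // Block until strictly more than half of the ensemble has reported in.
        while (connectingFollowers.size() * 2 <= ensembleSize) {
            wait(200);
        }
        notifyAll();
        return epoch;
    }
}
```

With a three-node ensemble, the leader's own call plus one follower's call (made from its LearnerHandler thread) is enough to release every thread blocked here.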

11. The Follower connects to the Leader

The Follower first locates the Leader via findLeader()

/**
 * Returns the address of the node we think is the leader.
 */
// Returns the Leader's network address
protected InetSocketAddress findLeader() {
    InetSocketAddress addr = null;
    // Find the leader by id
    Vote current = self.getCurrentVote();            // The QuorumPeer's current vote, i.e. whom this peer elected as Leader
    for (QuorumServer s : self.getView().values()) {
        if (s.id == current.getId()) {
            addr = s.addr;                          // The Leader's addr
            break;
        }
    }
    if (addr == null) {
        LOG.warn("Couldn't find the leader with id = "
                + current.getId());
    }
    return addr;
}   

With the Leader's address in hand, the Follower can open a connection to it

/**
 * Establish a connection with the Leader found by findLeader. Retries
 * 5 times before giving up. 
 * @param addr - the address of the Leader to connect to.
 * @throws IOException - if the socket connection fails on the 5th attempt
 * @throws ConnectException
 * @throws InterruptedException
 */
// Connect to the leader; once the connection is established, a LearnerHandler on the Leader side handles the traffic
protected void connectToLeader(InetSocketAddress addr) throws IOException, ConnectException, InterruptedException {

    sock = new Socket();        
    sock.setSoTimeout(self.tickTime * self.initLimit);          // 1. This SoTimeout matters: if an InputStream.read blocks longer than this, a SocketTimeoutException is thrown
    for (int tries = 0; tries < 5; tries++) {                   // 2. Try to connect to the Leader up to 5 times; after the 5th failure the exception propagates all the way up to QuorumPeer.run(), which falls back into Leader election
        try {
            sock.connect(addr, self.tickTime * self.syncLimit); // 3. Connect to the leader
            sock.setTcpNoDelay(nodelay);                        // 4. TCP_NODELAY disables Nagle's algorithm, so small packets are sent immediately instead of being batched by the kernel
            break;
        } catch (IOException e) {
            if (tries == 4) {
                LOG.error("Unexpected exception",e);
                throw e;
            } else {
                LOG.warn("Unexpected exception, tries="+tries+
                        ", connecting to " + addr,e);
                sock = new Socket();
                sock.setSoTimeout(self.tickTime * self.initLimit);
            }
        }
        Thread.sleep(1000);
    }                                                           // 5. Wrap the socket streams for record I/O
    leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
    bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
    leaderOs = BinaryOutputArchive.getArchive(bufferedOutput); //  6. Wrap the output stream
} 

After the connection is established, the Follower calls registerWithLeader() to negotiate the epoch value with the Leader

12. Learner.registerWithLeader, part 1

registerWithLeader(Leader.FOLLOWERINFO) packages the Follower's zxid, myid and related info and sends it to the Leader

LOG.info("registerWithLeader:" + pktType);
/*
 * Send follower info, including last zxid and sid
 */
long lastLoggedZxid = self.getLastLoggedZxid();                     // The last zxid this Follower has logged
QuorumPacket qp = new QuorumPacket();                
qp.setType(pktType);                                                // For a Follower the type is Leader.FOLLOWERINFO
qp.setZxid(ZxidUtils.makeZxid(self.getAcceptedEpoch(), 0));         // A zxid built from the Follower's acceptedEpoch

/*
 * Add sid to payload
 */
LearnerInfo li = new LearnerInfo(self.getId(), 0x10000);            // Package the Follower's info into a LearnerInfo
LOG.info("li:" + li);

ByteArrayOutputStream bsid = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(bsid);
boa.writeRecord(li, "LearnerInfo");
qp.setData(bsid.toByteArray());                                     // Attach the Follower's info to the QuorumPacket
LOG.info("qp:" + qp);

writePacket(qp, true);                                              // Send the QuorumPacket: Leader.FOLLOWERINFO plus the LearnerInfo, with the zxid built from self.getAcceptedEpoch()
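The ZxidUtils.makeZxid call used here (and again in the LEADERINFO and NEWLEADER packets later) is plain bit-packing: the high 32 bits of a zxid hold the epoch, the low 32 bits a per-epoch counter. The helpers below mirror that layout:

```java
// The zxid layout used throughout the handshake: high 32 bits = epoch,
// low 32 bits = counter (mirrors ZooKeeper's ZxidUtils).
public class Zxid {
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    static long getEpochFromZxid(long zxid) {
        return zxid >> 32L;
    }

    static long getCounterFromZxid(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long z = makeZxid(3, 0); // what registerWithLeader sends: counter reset to 0
        System.out.println(Long.toHexString(z));      // 300000000
        System.out.println(getEpochFromZxid(z));      // 3
    }
}
```

This is why a new epoch implicitly resets the transaction counter: the counter half of the zxid starts over at 0.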

13. LearnerHandler.run, part 1

The LearnerHandler processes the Leader.FOLLOWERINFO packet sent by the Follower

tickOfNextAckDeadline = leader.self.tick + leader.self.initLimit + leader.self.syncLimit;
LOG.info("tickOfNextAckDeadline : " + tickOfNextAckDeadline);
                                                                            // 1. Wrap the socket to the Follower into data streams
ia = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
oa = BinaryOutputArchive.getArchive(bufferedOutput);
                                                                            // 2. Wait for the Follower to send a packet
QuorumPacket qp = new QuorumPacket();
long a1 = System.currentTimeMillis();
ia.readRecord(qp, "packet");                                                // 3. 讀取 Follower 發(fā)過來的 FOLLOWERINFO 數據包
LOG.info("System.currentTimeMillis() - a1 : " + (System.currentTimeMillis() - a1));
LOG.info("qp:" + qp);
                                                                            // 4. No other packet type should arrive here
if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
    LOG.error("First packet " + qp.toString() + " is not FOLLOWERINFO or OBSERVERINFO!");
    return;
}
byte learnerInfoData[] = qp.getData();                                      // 5. Read the data the learner sent

LOG.info("learnerInfoData :" + Arrays.toString(learnerInfoData));           // 6. 這里的 learnerInfo 就是 Follower/Observer 的信息
if (learnerInfoData != null) {
    if (learnerInfoData.length == 8) {
        ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
        this.sid = bbsid.getLong();
    } else {
        LearnerInfo li = new LearnerInfo();                                 // 7. Deserialize the LearnerInfo
        ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
        LOG.info("li :" + li);
        this.sid = li.getServerid();                                        // 8. The Follower's myid
        this.version = li.getProtocolVersion();                             // 9. The protocol version
    }
} else {
    this.sid = leader.followerCounter.getAndDecrement();
}

LOG.info("Follower sid: " + sid + " : info : " + leader.self.quorumPeers.get(sid));
            
if (qp.getType() == Leader.OBSERVERINFO) {
      learnerType = LearnerType.OBSERVER;
}            

long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());          // 10. Extract the Follower's election epoch from the zxid

LOG.info("qp : " + qp + ", lastAcceptedEpoch : " + lastAcceptedEpoch);

long peerLastZxid;
StateSummary ss = null;
long zxid = qp.getZxid();
long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch); // 11. Record the Follower's election epoch in connectingFollowers, and check whether a majority of voters have reached getEpochToPropose

The method ends by calling leader.getEpochToPropose(): once more than half of the ensemble has reported in, the block there is released.
After unblocking, the Leader sends a Leader.LEADERINFO packet to the Follower

byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);                                   // 14. Build the packet describing the Leader
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
LOG.info("newEpochPacket:" + newEpochPacket);
oa.writeRecord(newEpochPacket, "packet");                               // 15. Send the Leader's info to the corresponding Follower / Observer
bufferedOutput.flush();

On receiving the Leader.LEADERINFO, the Follower replies with a Leader.ACKEPOCH packet

// we are connected to a 1.0 server so accept the new epoch and read the next packet
leaderProtocolVersion = ByteBuffer.wrap(qp.getData()).getInt();
LOG.info("leaderProtocolVersion:" + leaderProtocolVersion);
byte epochBytes[] = new byte[4];
final ByteBuffer wrappedEpochBytes = ByteBuffer.wrap(epochBytes);

LOG.info("newEpoch:" + newEpoch + ", self.getAcceptedEpoch():" + self.getAcceptedEpoch());
if (newEpoch > self.getAcceptedEpoch()) {                       // The Follower's accepted epoch is smaller than the Leader's, so adopt the Leader's
    wrappedEpochBytes.putInt((int)self.getCurrentEpoch());
    self.setAcceptedEpoch(newEpoch);
} else if (newEpoch == self.getAcceptedEpoch()) {
    // since we have already acked an epoch equal to the leaders, we cannot ack
    // again, but we still need to send our lastZxid to the leader so that we can
    // sync with it if it does assume leadership of the epoch.
    // the -1 indicates that this reply should not count as an ack for the new epoch
    wrappedEpochBytes.putInt(-1);
} else {                                                         // Follower.epoch > Leader.epoch means the earlier Leader election went wrong
    throw new IOException("Leaders epoch, " + newEpoch + " is less than accepted epoch, " + self.getAcceptedEpoch());
}                                                                // Reply to Leader.LEADERINFO with a Leader.ACKEPOCH packet carrying the lastLoggedZxid
QuorumPacket ackNewEpoch = new QuorumPacket(Leader.ACKEPOCH, lastLoggedZxid, epochBytes, null);

LOG.info("ackNewEpoch:" + ackNewEpoch);
writePacket(ackNewEpoch, true);                                  // Send the ACKEPOCH in reply to the Leader's LEADERINFO
return ZxidUtils.makeZxid(newEpoch, 0);

Next comes LearnerHandler's handling of the Leader.ACKEPOCH

byte ver[] = new byte[4];
ByteBuffer.wrap(ver).putInt(0x10000);                                   // 14. Build the packet describing the Leader
QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
LOG.info("newEpochPacket:" + newEpochPacket);
oa.writeRecord(newEpochPacket, "packet");                               // 15. Send the Leader's info to the corresponding Follower / Observer
bufferedOutput.flush();
QuorumPacket ackEpochPacket = new QuorumPacket();
ia.readRecord(ackEpochPacket, "packet");                                // 16. The Leader reads the ACKEPOCH sent by the Follower

LOG.info("ackEpochPacket:" +ackEpochPacket);                            // 17. The leader's info has just been sent; now read the acknowledging ACK

if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
    LOG.error(ackEpochPacket.toString()
            + " is not ACKEPOCH");
    return;
}
ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
LOG.info("ss : " + ss);
leader.waitForEpochAck(this.getSid(), ss);                              // 18. Wait here until the Followers reply with ACKEPOCH (again, a majority suffices)

What follows is the data synchronization between Leader and Follower

14. The Leader syncs data to the Follower

This stage involves the following rules

1. committedLog holds the 500 most recent Proposals the Leader has committed
2. If the Follower has processed Proposals beyond maxCommittedLog, it must TRUNC its own Proposals back to maxCommittedLog
3. If the Follower's last Proposal is below maxCommittedLog but at or above minCommittedLog, the Leader sends the Follower the Proposals it is missing, for it to apply
4. If the Follower's last Proposal is below minCommittedLog, the Leader sends Leader.SNAP and streams a serialized snapshot of its own data to the Follower
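The rules above reduce to a pure decision over peerLastZxid and the committedLog window; the sketch below is my condensed restatement (the real LearnerHandler logic, shown next, also handles the empty-committedLog and exact-match cases):

```java
// Condensed restatement of the sync-mode decision in LearnerHandler:
// given the follower's last zxid and the leader's committedLog window
// [minCommittedLog, maxCommittedLog], pick DIFF, TRUNC, or SNAP.
public class SyncChooser {
    enum Mode { DIFF, TRUNC, SNAP }

    static Mode choose(long peerLastZxid, long minCommittedLog, long maxCommittedLog) {
        if (peerLastZxid > maxCommittedLog) {
            return Mode.TRUNC;          // follower is ahead: roll it back to maxCommittedLog
        }
        if (peerLastZxid >= minCommittedLog) {
            return Mode.DIFF;           // gap fits in the cached window: replay missing proposals
        }
        return Mode.SNAP;               // too far behind: ship a full snapshot
    }

    public static void main(String[] args) {
        System.out.println(choose(0x115L, 0x110L, 0x120L)); // DIFF
        System.out.println(choose(0x125L, 0x110L, 0x120L)); // TRUNC
        System.out.println(choose(0x105L, 0x110L, 0x120L)); // SNAP
    }
}
```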

On to the code

/* we are sending the diff check if we have proposals in memory to be able to 
 * send a diff to the 
 */ 
ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
ReadLock rl = lock.readLock();
try {
    rl.lock();                                                             // 20. The Leader caches its most recently committed Requests in ZKDatabase.committedLog (done in FinalRequestProcessor.processRequest); their zxids lie between minCommittedLog and maxCommittedLog
    final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
    final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();

    LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                            + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                            + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
    );


    /**
     * http://www.itdecent.cn/p/4cc1040b6a14
     * How the Leader's already-committed Requests reach this peer
     * 1) If peerLastZxid lies between min and max:
     *      iterate over the proposals
     *      a) while a proposal's zxid <= peerLastZxid, the peer has already committed it, so skip it
     *      b) once a proposal's zxid exceeds peerLastZxid, everything at or below peerLastZxid is already on the peer;
     *          if the peer turns out to hold extra entries, first send a TRUNC to drop them, then queue each remaining proposal together with its COMMIT
     * 2) If peerLastZxid is greater than max, TRUNC everything beyond max
     * 3) Any other case is likely a freshly joined node, so the packet type is SNAP and the data is synced straight from a snapshot
     */

    LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();           // 21. The Requests most recently committed on the Leader; check whether any still need to be shipped
    LOG.info("proposals:"+proposals);
    if (proposals.size() != 0) {                                                            // 22. proposals holds the already-committed Proposals
        LOG.debug("proposal size is {}", proposals.size());

        if ((maxCommittedLog >= peerLastZxid) && (minCommittedLog <= peerLastZxid)) {       // 23. If this holds, the Follower is fewer than 500 entries behind the Leader
            LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                    + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                    + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
            );
            LOG.debug("Sending proposals to follower");

            // as we look through proposals, this variable keeps track of previous
            // proposal Id.
            long prevProposalZxid = minCommittedLog;

            // Keep track of whether we are about to send the first packet.
            // Before sending the first packet, we have to tell the learner
            // whether to expect a trunc or a diff
            boolean firstPacket=true;

            // If we are here, we can use committedLog to sync with
            // follower. Then we only need to decide whether to
            // send trunc or not
            packetToSend = Leader.DIFF;
            zxidToSend = maxCommittedLog;

            for (Proposal propose: proposals) {
                // skip the proposals the peer already has                                 // 24. The peer already has this Proposal: continue
                if (propose.packet.getZxid() <= peerLastZxid) {
                    prevProposalZxid = propose.packet.getZxid();                           // 25. The follower has already processed it, so record prevProposalZxid and move to the next Proposal
                    continue;
                } else {
                    // If we are sending the first packet, figure out whether to trunc
                    // in case the follower has some proposals that the leader doesn't
                    if (firstPacket) {                                                     // 26. Before sending any Proposal, check whether the follower is ahead of the Leader
                        firstPacket = false;
                        // Does the peer have some proposals that the leader hasn't seen yet
                        if (prevProposalZxid < peerLastZxid) {                             // 27. The follower holds transactions the Leader's log skips over, so send a TRUNC back to prevProposalZxid before the diff
                            // send a trunc message before sending the diff
                            packetToSend = Leader.TRUNC;                                        
                            zxidToSend = prevProposalZxid;
                            updates = zxidToSend;
                        }
                    }
                    queuePacket(propose.packet);                                           // 28. Queue the proposal for sending
                    QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                            null, null);
                    queuePacket(qcommit);                                                  // 29. Immediately follow it with a COMMIT so the Follower commits the request
                }
            }
        } else if (peerLastZxid > maxCommittedLog) {                                       // 30. The follower has processed more transactions than the leader, so send a TRUNC to resynchronize
            LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                    Long.toHexString(maxCommittedLog),
                    Long.toHexString(updates));

            LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                    + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                    + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
                    + ", updates : " + Long.toHexString(updates)
            );

            packetToSend = Leader.TRUNC;                                                   // 31. On TRUNC, the Follower deletes the Requests it has beyond the Leader's
            zxidToSend = maxCommittedLog;                                                  // 32. maxCommittedLog is the zxid of the newest Request the Leader has committed
            updates = zxidToSend;
        } else {
            LOG.warn("Unhandled proposal scenario");
        }                                                                                  // 33. If the Follower's lastZxid equals the Leader's, send a DIFF
    } else if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
        // The leader may recently take a snapshot, so the committedLog
        // is empty. We don't need to send snapshot if the follow
        // is already sync with in-memory db.
        LOG.info("committedLog is empty but leader and follower "
                        + "are in sync, zxid=0x{}",
                Long.toHexString(peerLastZxid));

        LOG.info("sid:" + sid + ", maxCommittedLog:" + Long.toHexString(maxCommittedLog)
                + ", minCommittedLog:" +Long.toHexString(minCommittedLog)
                + " peerLastZxid=0x"+Long.toHexString(peerLastZxid)
        );

        packetToSend = Leader.DIFF;
        zxidToSend = peerLastZxid;
    } else {
        // just let the state transfer happen
        LOG.debug("proposals is empty");
    }               


    LOG.info("Sending " + Leader.getPacketType(packetToSend));
    leaderLastZxid = leader.startForwarding(this, updates);                               // 34. The leader forwards to the Learner the Proposals that already have majority ACKs but are not yet persisted (this is the subtle part)
    LOG.info("leaderLastZxid : " + leaderLastZxid);

} finally {
    rl.unlock();
} 

對于消息的發(fā)送 Leader 最后發(fā)送一個 Leader.NEWLEADER

 QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER, ZxidUtils.makeZxid(newEpoch, 0), null, null);
 LOG.info("newLeaderQP:" + newLeaderQP);
 if (getVersion() < 0x10000) {
    oa.writeRecord(newLeaderQP, "packet");
} else {
    queuedPackets.add(newLeaderQP);                                                       // 36. Add the Leader.NEWLEADER packet to the send queue (note the sender thread for this queue has not started yet)
}
bufferedOutput.flush();

15. The Follower handles data synchronization with the Leader

synchronized (zk) {
    if (qp.getType() == Leader.DIFF) {                              // DIFF packet (indicates the two nodes' lastZxid are in sync, or that the gap will be replayed from the queued proposals)
        LOG.info("Getting a diff from the leader 0x" + Long.toHexString(qp.getZxid()));                
    }
    else if (qp.getType() == Leader.SNAP) {                         // SNAP: copy a snapshot of the data from the leader (the Leader is at least 500 Proposals ahead of the Follower)
        LOG.info("Getting a snapshot from leader");
        // The leader is going to dump the database
        // clear our own database and read
        zk.getZKDatabase().clear();
        zk.getZKDatabase().deserializeSnapshot(leaderIs);           // Deserialize the DataTree from the InputStream
        String signature = leaderIs.readString("signature");        // Read the String stored under the tag "signature"
        if (!signature.equals("BenWasHere")) {
            LOG.error("Missing signature. Got " + signature);
            throw new IOException("Missing signature");                   
        }
    } else if (qp.getType() == Leader.TRUNC) {                     // TRUNC: roll the transaction log back to qp.getZxid() (the Follower has processed more transactions than the Leader)
        //we need to truncate the log to the lastzxid of the leader
        LOG.warn("Truncating log to get in sync with the leader 0x" + Long.toHexString(qp.getZxid()));
        boolean truncated=zk.getZKDatabase().truncateLog(qp.getZxid());
        LOG.info("truncated:" + truncated + ", qp.getZxid():" + qp.getZxid());
        if (!truncated) {
            // not able to truncate the log
            LOG.error("Not able to truncate the log "
                    + Long.toHexString(qp.getZxid()));
            System.exit(13);
        }

    }
    else {
        LOG.error("Got unexpected packet from leader "
                + qp.getType() + " exiting ... " );
        System.exit(13);

    }
    zk.getZKDatabase().setlastProcessedZxid(qp.getZxid());          // The ZKDataTree may have come from the Leader's snapshot stream, so set lastProcessedZxid explicitly here
    zk.createSessionTracker();                                      // The Learner (Follower/Observer) creates its SessionTracker (a LearnerSessionTracker)
    
    long lastQueued = 0;

    // in V1.0 we take a snapshot when we get the NEWLEADER message, but in pre V1.0
    // we take the snapshot at the UPDATE, since V1.0 also gets the UPDATE (after the NEWLEADER)
    // we need to make sure that we don't take the snapshot twice.
    boolean snapshotTaken = false;
    // we are now going to start getting transactions to apply followed by an UPTODATE
    outerLoop:
    while (self.isRunning()) {                                     // self.isRunning() is true by default
        readPacket(qp);

        LOG.info("qp:" + qp);

        switch(qp.getType()) {
        case Leader.PROPOSAL:                                     // Add the proposal to the pending list
            PacketInFlight pif = new PacketInFlight();
            pif.hdr = new TxnHeader();
            pif.rec = SerializeUtils.deserializeTxn(qp.getData(), pif.hdr);         // Deserialize the transaction body
            LOG.info("pif:" + pif);
            if (pif.hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(pif.hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = pif.hdr.getZxid();
            packetsNotCommitted.add(pif);
            break;
        case Leader.COMMIT:                                        // COMMIT: hand the transaction to the server for processing
            LOG.info("snapshotTaken :" + snapshotTaken);
            if (!snapshotTaken) {
                pif = packetsNotCommitted.peekFirst();
                if (pif.hdr.getZxid() != qp.getZxid()) {
                    LOG.warn("Committing " + qp.getZxid() + ", but next proposal is " + pif.hdr.getZxid());
                } else {
                    zk.processTxn(pif.hdr, pif.rec);               // Apply the transaction
                    packetsNotCommitted.remove();
                }
            } else {
                packetsCommitted.add(qp.getZxid());
            }
            break;
        case Leader.INFORM:                                                         // Only Observers receive INFORM
            /*
             * Only observer get this type of packet. We treat this
             * as receiving PROPOSAL and COMMMIT.
             */
            PacketInFlight packet = new PacketInFlight();
            packet.hdr = new TxnHeader();
            packet.rec = SerializeUtils.deserializeTxn(qp.getData(), packet.hdr);
            LOG.info("packet:" + packet);
            // Log warning message if txn comes out-of-order
            if (packet.hdr.getZxid() != lastQueued + 1) {
                LOG.warn("Got zxid 0x"
                        + Long.toHexString(packet.hdr.getZxid())
                        + " expected 0x"
                        + Long.toHexString(lastQueued + 1));
            }
            lastQueued = packet.hdr.getZxid();
            LOG.info("snapshotTaken : " + snapshotTaken);
            if (!snapshotTaken) {
                // Apply to db directly if we haven't taken the snapshot
                zk.processTxn(packet.hdr, packet.rec);
            } else {
                packetsNotCommitted.add(packet);
                packetsCommitted.add(qp.getZxid());
            }
            break;
        case Leader.UPTODATE:                                               // UPTODATE means the data sync succeeded, so leave the loop
            LOG.info("snapshotTaken : " + snapshotTaken + ", newEpoch:" + newEpoch);
            if (!snapshotTaken) { // true for the pre v1.0 case
                zk.takeSnapshot();
                self.setCurrentEpoch(newEpoch);
            }
            self.cnxnFactory.setZooKeeperServer(zk);                
            break outerLoop;                                                // Exit the while loop once UPTODATE arrives
        case Leader.NEWLEADER: // it will be NEWLEADER in v1.0              // The leftover proposals have been handled: write the in-memory data to disk and send an ACK
            LOG.info("newEpoch:" + newEpoch);
            // Create updatingEpoch file and remove it after current
            // epoch is set. QuorumPeer.loadDataBase() uses this file to
            // detect the case where the server was terminated after
            // taking a snapshot but before setting the current epoch.
            File updating = new File(self.getTxnFactory().getSnapDir(),
                                QuorumPeer.UPDATING_EPOCH_FILENAME);
            if (!updating.exists() && !updating.createNewFile()) {
                throw new IOException("Failed to create " +
                                      updating.toString());
            }
            zk.takeSnapshot();
            self.setCurrentEpoch(newEpoch);
            if (!updating.delete()) {
                throw new IOException("Failed to delete " +
                                      updating.toString());
            }
            snapshotTaken = true;
            writePacket(new QuorumPacket(Leader.ACK, newLeaderZxid, null, null), true);
            break;
        }
    }
}

The Follower then sends the ACK corresponding to NEWLEADER and processes the Proposal messages the Leader delivered during the sync; after that the Follower sits in a while loop, reading and handling packets. Its counterpart, the LearnerHandler, likewise ends up in a while loop handling the messages exchanged with that Follower

16. Summary

The whole flow of Leader/Follower startup, Leader election, and Leader-Follower data synchronization takes quite a few steps and is heavy on detail, but the overall path is clear; for orientation, start with the flowchart at the top of the article!
