[Zookeeper] Zookeeper Request Flow, Part 2 (LearnerHandler)

1. The Leader's minCommittedLog and maxCommittedLog

The relevant fields are defined in the ZKDatabase class:

    protected long minCommittedLog, maxCommittedLog;
    public static final int commitLogCount = 500;
    protected static int commitLogBuffer = 700;
    protected LinkedList<Proposal> committedLog = new LinkedList<Proposal>();

The committedLog list serves as a cache of recently committed proposals that lets the leader and a follower complete data synchronization quickly.

The addCommittedProposal method:

    public void addCommittedProposal(Request request) {
        WriteLock wl = logLock.writeLock();
        try {
            wl.lock();
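            // The list has already grown past the cache size (commitLogCount, 500 by default)
            if (committedLog.size() > commitLogCount) {
                // If so, remove the oldest commit log entry at the head of the list
                committedLog.removeFirst();
                // minCommittedLog is the zxid of the first element in committedLog
                minCommittedLog = committedLog.getFirst().packet.getZxid();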
            }
            if (committedLog.size() == 0) {
                minCommittedLog = request.zxid;
                maxCommittedLog = request.zxid;
            }

            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            try {
                request.hdr.serialize(boa, "hdr");
                if (request.txn != null) {
                    request.txn.serialize(boa, "txn");
                }
                baos.close();
            } catch (IOException e) {
                LOG.error("This really should be impossible", e);
            }
            QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid,
                    baos.toByteArray(), null);
            Proposal p = new Proposal();
            p.packet = pp;
            p.request = request;
            committedLog.add(p);
            // maxCommittedLog is the zxid of the last element in committedLog
            maxCommittedLog = p.packet.getZxid();
        } finally {
            wl.unlock();
        }
    }

Every time a proposal is committed, addCommittedProposal is called to append it to the committedLog list.
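
The bookkeeping above can be isolated in a minimal sketch, assuming proposals are reduced to bare zxids. CommittedLogSketch and its limit parameter are hypothetical names used for illustration, not ZooKeeper classes. The eviction test mirrors the `size() > commitLogCount` check in the listing, so the list can hold limit + 1 entries.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the committedLog bookkeeping in ZKDatabase;
// proposals are reduced to bare zxids for illustration.
final class CommittedLogSketch {
    private final int limit;                        // plays the role of commitLogCount
    private final Deque<Long> log = new ArrayDeque<>();
    private long minCommittedLog;
    private long maxCommittedLog;

    CommittedLogSketch(int limit) {
        if (limit < 1) {
            throw new IllegalArgumentException("limit must be at least 1");
        }
        this.limit = limit;
    }

    synchronized void addCommitted(long zxid) {
        // Same test as the listing: evict only once the size exceeds the limit.
        if (log.size() > limit) {
            log.removeFirst();
            minCommittedLog = log.getFirst();
        }
        // First entry ever: both bounds start at this zxid.
        if (log.isEmpty()) {
            minCommittedLog = zxid;
            maxCommittedLog = zxid;
        }
        log.addLast(zxid);
        maxCommittedLog = zxid;
    }

    synchronized long min() { return minCommittedLog; }
    synchronized long max() { return maxCommittedLog; }
    synchronized int size() { return log.size(); }
}
```

With a limit of 3, adding zxids 1 through 6 leaves four entries in the list and a window of [3, 6].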

2. The Leader's lead flow

In QuorumPeer, when the server state is LEADING:

    setLeader(makeLeader(logFactory));
    leader.lead();
    setLeader(null);

2.1 loadData

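Inside lead(), one of the more important calls is zk.loadData(), i.e. loadData() on the LeaderZooKeeperServer. This method restores the sessions and the data.

When a new Leader has been elected and starts executing lead(), it calls loadData(). The server's transaction database was in fact already initialized before leader election ran, so that the server could pick its zxid for its initial vote. That zxid is obtained through QuorumPeer#getLastLoggedZxid.

Consequently there is no need to initialize the database again, which spares the cost of loading it a second time. Not reloading it matters most for applications that host large databases.

In loadData():

        // Check whether zkDb has already been initialized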
        if(zkDb.isInitialized()){
            setZxid(zkDb.getDataTreeLastProcessedZxid());
        }
        else {
            setZxid(zkDb.loadDataBase());
        }

The loadDataBase method:

It loads the on-disk database into memory and adds the transactions to the in-memory committedLog.

It does this by calling restore on snapLog, which is of type FileTxnSnapLog:

    public long loadDataBase() throws IOException {
        long zxid = snapLog.restore(dataTree, sessionsWithTimeouts, commitProposalPlaybackListener);
        initialized = true;
        return zxid;
    }

Note the third argument to restore, commitProposalPlaybackListener: this listener is where addCommittedProposal() gets called.
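
The interplay between restore and the listener can be sketched as follows. This is a simplified model, not the FileTxnSnapLog API: the PlaybackListener interface, the restore signature, and the representation of the transaction log as an ascending list of zxids are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class RestoreSketch {
    // Hypothetical callback, modeled on the role of commitProposalPlaybackListener.
    interface PlaybackListener {
        void onTxnLoaded(long zxid);
    }

    // Replays every logged txn newer than the snapshot, notifies the listener
    // for each one, and returns the highest zxid seen.
    // Assumes txnLog is sorted in ascending zxid order.
    static long restore(long snapshotZxid, List<Long> txnLog, PlaybackListener listener) {
        long highest = snapshotZxid;
        for (long zxid : txnLog) {
            if (zxid <= snapshotZxid) {
                continue; // already reflected in the snapshot
            }
            listener.onTxnLoaded(zxid);
            highest = zxid;
        }
        return highest;
    }

    public static void main(String[] args) {
        // Stand-in for committedLog: the listener appends each replayed txn.
        List<Long> committedLog = new ArrayList<>();
        long last = restore(2L, Arrays.asList(1L, 2L, 3L, 4L),
                zxid -> committedLog.add(zxid));
        System.out.println("last zxid = " + last + ", committedLog = " + committedLog);
    }
}
```

The point of the callback design is that the database load and the cache fill happen in a single pass over the log, so a freshly started Leader already has a populated committedLog when the first Learner connects.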

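2.2 Starting the LearnerCnxAcceptor thread to accept Follower connections

LearnerCnxAcceptor is an inner class of Leader; the Leader uses it to establish connections with Learners.

The run method of LearnerCnxAcceptor:

  1. When the Leader instance is created, it initializes a ServerSocket for communicating with Followers and Observers. The port is the one in quorumAddress, i.e. the quorum port (not the leader-election port).
  2. Wait for a Learner to connect.
  3. Once a Learner has connected to the Leader, set the socket timeout to initLimit*tickTime, then use the leader.nodelay setting to decide whether to set TCP_NODELAY on the socket. The setting defaults to true, so Nagle's algorithm is disabled by default.
  4. Create a LearnerHandler instance to handle the message exchange with that Learner.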
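
Step 3 can be sketched with plain java.net sockets. This is a minimal illustration under stated assumptions, not the LearnerCnxAcceptor code: the class and method names are hypothetical, and tickTime, initLimit and nodelay are passed in as plain parameters.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

final class AcceptorSketch {
    // Handshake read timeout: initLimit ticks of tickTime milliseconds each.
    static int handshakeTimeoutMs(int tickTime, int initLimit) {
        return Math.multiplyExact(tickTime, initLimit);
    }

    // Applies the two socket options described in step 3.
    static void configure(Socket s, int tickTime, int initLimit, boolean nodelay)
            throws IOException {
        s.setSoTimeout(handshakeTimeoutMs(tickTime, initLimit));
        s.setTcpNoDelay(nodelay); // true disables Nagle's algorithm
    }

    // Accepts one learner connection and returns the configured socket.
    static Socket acceptOne(ServerSocket ss, int tickTime, int initLimit, boolean nodelay)
            throws IOException {
        Socket s = ss.accept(); // blocks until a learner connects
        configure(s, tickTime, initLimit, nodelay);
        return s;
    }
}
```

With tickTime=2000 and initLimit=10, for example, a Learner that stays silent for 20000 ms during the handshake causes the read to time out.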

The run method of LearnerHandler:

    public void run() {
        try {
            leader.addLearnerHandler(this);
            tickOfNextAckDeadline = leader.self.tick.get()
                    + leader.self.initLimit + leader.self.syncLimit;

            ia = BinaryInputArchive.getArchive(bufferedInput);
            bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
            oa = BinaryOutputArchive.getArchive(bufferedOutput);

            QuorumPacket qp = new QuorumPacket();
            // Read the first packet
            ia.readRecord(qp, "packet");
            // If its type is neither FOLLOWERINFO nor OBSERVERINFO, log an error and return
            if(qp.getType() != Leader.FOLLOWERINFO && qp.getType() != Leader.OBSERVERINFO){
                LOG.error("First packet " + qp.toString()
                        + " is not FOLLOWERINFO or OBSERVERINFO!");
                return;
            }
            byte learnerInfoData[] = qp.getData();
            if (learnerInfoData != null) {
                if (learnerInfoData.length == 8) {
                    ByteBuffer bbsid = ByteBuffer.wrap(learnerInfoData);
                    this.sid = bbsid.getLong();
                } else {
                    LearnerInfo li = new LearnerInfo();
                    ByteBufferInputStream.byteBuffer2Record(ByteBuffer.wrap(learnerInfoData), li);
                    this.sid = li.getServerid();
                    this.version = li.getProtocolVersion();
                }
            } else {
                this.sid = leader.followerCounter.getAndDecrement();
            }

            LOG.info("Follower sid: " + sid + " : info : "
                    + leader.self.quorumPeers.get(sid));
                        
            if (qp.getType() == Leader.OBSERVERINFO) {
                  learnerType = LearnerType.OBSERVER;
            }            
            
            long lastAcceptedEpoch = ZxidUtils.getEpochFromZxid(qp.getZxid());
            
            long peerLastZxid;
            StateSummary ss = null;
            long zxid = qp.getZxid();
            long newEpoch = leader.getEpochToPropose(this.getSid(), lastAcceptedEpoch);
            
            if (this.getVersion() < 0x10000) {
                // we are going to have to extrapolate the epoch information
                long epoch = ZxidUtils.getEpochFromZxid(zxid);
                ss = new StateSummary(epoch, zxid);
                // fake the message
                leader.waitForEpochAck(this.getSid(), ss);
            } else {
                byte ver[] = new byte[4];
                ByteBuffer.wrap(ver).putInt(0x10000);
                QuorumPacket newEpochPacket = new QuorumPacket(Leader.LEADERINFO, ZxidUtils.makeZxid(newEpoch, 0), ver, null);
                oa.writeRecord(newEpochPacket, "packet");
                bufferedOutput.flush();
                QuorumPacket ackEpochPacket = new QuorumPacket();
                ia.readRecord(ackEpochPacket, "packet");
                if (ackEpochPacket.getType() != Leader.ACKEPOCH) {
                    LOG.error(ackEpochPacket.toString()
                            + " is not ACKEPOCH");
                    return;
                }
                ByteBuffer bbepoch = ByteBuffer.wrap(ackEpochPacket.getData());
                ss = new StateSummary(bbepoch.getInt(), ackEpochPacket.getZxid());
                leader.waitForEpochAck(this.getSid(), ss);
            }
            peerLastZxid = ss.getLastZxid();
            
            /* the default to send to the follower */
            int packetToSend = Leader.SNAP;
            long zxidToSend = 0;
            long leaderLastZxid = 0;
            /** the packets that the follower needs to get updates from **/
            long updates = peerLastZxid;
            
            /* we are sending the diff check if we have proposals in memory to be able to 
             * send a diff to the 
             */ 
            ReentrantReadWriteLock lock = leader.zk.getZKDatabase().getLogLock();
            ReadLock rl = lock.readLock();
            try {
                rl.lock();        
                final long maxCommittedLog = leader.zk.getZKDatabase().getmaxCommittedLog();
                final long minCommittedLog = leader.zk.getZKDatabase().getminCommittedLog();
                LOG.info("Synchronizing with Follower sid: " + sid
                        +" maxCommittedLog=0x"+Long.toHexString(maxCommittedLog)
                        +" minCommittedLog=0x"+Long.toHexString(minCommittedLog)
                        +" peerLastZxid=0x"+Long.toHexString(peerLastZxid));

                LinkedList<Proposal> proposals = leader.zk.getZKDatabase().getCommittedLog();

                if (peerLastZxid == leader.zk.getZKDatabase().getDataTreeLastProcessedZxid()) {
                    // Follower is already sync with us, send empty diff
                    LOG.info("leader and follower are in sync, zxid=0x{}",
                            Long.toHexString(peerLastZxid));
                    packetToSend = Leader.DIFF;
                    zxidToSend = peerLastZxid;
                } else if (proposals.size() != 0) {
                    LOG.debug("proposal size is {}", proposals.size());
                    if ((maxCommittedLog >= peerLastZxid)
                            && (minCommittedLog <= peerLastZxid)) {
                        LOG.debug("Sending proposals to follower");

                        // as we look through proposals, this variable keeps track of previous
                        // proposal Id.
                        long prevProposalZxid = minCommittedLog;

                        // Keep track of whether we are about to send the first packet.
                        // Before sending the first packet, we have to tell the learner
                        // whether to expect a trunc or a diff
                        boolean firstPacket=true;

                        // If we are here, we can use committedLog to sync with
                        // follower. Then we only need to decide whether to
                        // send trunc or not
                        packetToSend = Leader.DIFF;
                        zxidToSend = maxCommittedLog;

                        for (Proposal propose: proposals) {
                            // skip the proposals the peer already has
                            if (propose.packet.getZxid() <= peerLastZxid) {
                                prevProposalZxid = propose.packet.getZxid();
                                continue;
                            } else {
                                // If we are sending the first packet, figure out whether to trunc
                                // in case the follower has some proposals that the leader doesn't
                                if (firstPacket) {
                                    firstPacket = false;
                                    // Does the peer have some proposals that the leader hasn't seen yet
                                    if (prevProposalZxid < peerLastZxid) {
                                        // send a trunc message before sending the diff
                                        packetToSend = Leader.TRUNC;                                        
                                        zxidToSend = prevProposalZxid;
                                        updates = zxidToSend;
                                    }
                                }
                                queuePacket(propose.packet);
                                QuorumPacket qcommit = new QuorumPacket(Leader.COMMIT, propose.packet.getZxid(),
                                        null, null);
                                queuePacket(qcommit);
                            }
                        }
                    } else if (peerLastZxid > maxCommittedLog) {
                        LOG.debug("Sending TRUNC to follower zxidToSend=0x{} updates=0x{}",
                                Long.toHexString(maxCommittedLog),
                                Long.toHexString(updates));

                        packetToSend = Leader.TRUNC;
                        zxidToSend = maxCommittedLog;
                        updates = zxidToSend;
                    } else {
                        LOG.warn("Unhandled proposal scenario");
                    }
                } else {
                    // just let the state transfer happen
                    LOG.debug("proposals is empty");
                }               

                LOG.info("Sending " + Leader.getPacketType(packetToSend));
                leaderLastZxid = leader.startForwarding(this, updates);

            } finally {
                rl.unlock();
            }

             QuorumPacket newLeaderQP = new QuorumPacket(Leader.NEWLEADER,
                    ZxidUtils.makeZxid(newEpoch, 0), null, null);
             if (getVersion() < 0x10000) {
                oa.writeRecord(newLeaderQP, "packet");
            } else {
                queuedPackets.add(newLeaderQP);
            }
            bufferedOutput.flush();
            //Need to set the zxidToSend to the latest zxid
            if (packetToSend == Leader.SNAP) {
                zxidToSend = leader.zk.getZKDatabase().getDataTreeLastProcessedZxid();
            }
            oa.writeRecord(new QuorumPacket(packetToSend, zxidToSend, null, null), "packet");
            bufferedOutput.flush();
            
            /* if we are not truncating or sending a diff just send a snapshot */
            if (packetToSend == Leader.SNAP) {
                LOG.info("Sending snapshot last zxid of peer is 0x"
                        + Long.toHexString(peerLastZxid) + " " 
                        + " zxid of leader is 0x"
                        + Long.toHexString(leaderLastZxid)
                        + "sent zxid of db as 0x" 
                        + Long.toHexString(zxidToSend));
                // Dump data to peer
                leader.zk.getZKDatabase().serializeSnapshot(oa);
                oa.writeString("BenWasHere", "signature");
            }
            bufferedOutput.flush();
            
            // Start sending packets
            new Thread() {
                public void run() {
                    Thread.currentThread().setName(
                            "Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption",e);
                    }
                }
            }.start();
            
            /*
             * Have to wait for the first ACK, wait until 
             * the leader is ready, and only then we can
             * start processing messages.
             */
            qp = new QuorumPacket();
            ia.readRecord(qp, "packet");
            if(qp.getType() != Leader.ACK){
                LOG.error("Next packet was supposed to be an ACK");
                return;
            }
            LOG.info("Received NEWLEADER-ACK message from " + getSid());
            leader.waitForNewLeaderAck(getSid(), qp.getZxid(), getLearnerType());

            syncLimitCheck.start();
            
            // now that the ack has been processed expect the syncLimit
            sock.setSoTimeout(leader.self.tickTime * leader.self.syncLimit);

            /*
             * Wait until leader starts up
             */
            synchronized(leader.zk){
                while(!leader.zk.isRunning() && !this.isInterrupted()){
                    leader.zk.wait(20);
                }
            }
            // Mutation packets will be queued during the serialize,
            // so we need to mark when the peer can actually start
            // using the data
            //
            queuedPackets.add(new QuorumPacket(Leader.UPTODATE, -1, null, null));

            while (true) {
                qp = new QuorumPacket();
                ia.readRecord(qp, "packet");

                long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
                if (qp.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
                }
                tickOfNextAckDeadline = leader.self.tick.get() + leader.self.syncLimit;


                ByteBuffer bb;
                long sessionId;
                int cxid;
                int type;

                switch (qp.getType()) {
                case Leader.ACK:
                    if (this.learnerType == LearnerType.OBSERVER) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Received ACK from Observer  " + this.sid);
                        }
                    }
                    syncLimitCheck.updateAck(qp.getZxid());
                    leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());
                    break;
                case Leader.PING:
                    // Process the touches
                    ByteArrayInputStream bis = new ByteArrayInputStream(qp
                            .getData());
                    DataInputStream dis = new DataInputStream(bis);
                    while (dis.available() > 0) {
                        long sess = dis.readLong();
                        int to = dis.readInt();
                        leader.zk.touch(sess, to);
                    }
                    break;
                case Leader.REVALIDATE:
                    bis = new ByteArrayInputStream(qp.getData());
                    dis = new DataInputStream(bis);
                    long id = dis.readLong();
                    int to = dis.readInt();
                    ByteArrayOutputStream bos = new ByteArrayOutputStream();
                    DataOutputStream dos = new DataOutputStream(bos);
                    dos.writeLong(id);
                    boolean valid = leader.zk.touch(id, to);
                    if (valid) {
                        try {
                            //set the session owner
                            // as the follower that
                            // owns the session
                            leader.zk.setOwner(id, this);
                        } catch (SessionExpiredException e) {
                            LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
                        }
                    }
                    if (LOG.isTraceEnabled()) {
                        ZooTrace.logTraceMessage(LOG,
                                                 ZooTrace.SESSION_TRACE_MASK,
                                                 "Session 0x" + Long.toHexString(id)
                                                 + " is valid: "+ valid);
                    }
                    dos.writeBoolean(valid);
                    qp.setData(bos.toByteArray());
                    queuedPackets.add(qp);
                    break;
                case Leader.REQUEST:                    
                    bb = ByteBuffer.wrap(qp.getData());
                    sessionId = bb.getLong();
                    cxid = bb.getInt();
                    type = bb.getInt();
                    bb = bb.slice();
                    Request si;
                    if(type == OpCode.sync){
                        si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
                    } else {
                        si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
                    }
                    si.setOwner(this);
                    leader.zk.submitRequest(si);
                    break;
                default:
                    LOG.warn("unexpected quorum packet, type: {}", packetToString(qp));
                    break;
                }
            }
        } catch (IOException e) {
            if (sock != null && !sock.isClosed()) {
                LOG.error("Unexpected exception causing shutdown while sock "
                        + "still open", e);
                //close the socket to make sure the 
                //other side can see it being close
                try {
                    sock.close();
                } catch(IOException ie) {
                    // do nothing
                }
            }
        } catch (InterruptedException e) {
            LOG.error("Unexpected exception causing shutdown", e);
        } finally {
            LOG.warn("******* GOODBYE " 
                    + (sock != null ? sock.getRemoteSocketAddress() : "<null>")
                    + " ********");
            shutdown();
        }
    }
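
Several calls in run() (ZxidUtils.getEpochFromZxid, ZxidUtils.makeZxid) rely on the zxid layout: a 64-bit value with the epoch in the high 32 bits and a per-epoch counter in the low 32 bits. A minimal sketch of that arithmetic, with ZxidSketch as a hypothetical stand-in for ZxidUtils:

```java
// Hypothetical stand-in for ZxidUtils, showing the zxid bit layout.
final class ZxidSketch {
    // High 32 bits: the leader epoch.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Low 32 bits: the transaction counter within that epoch.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    // Combines an epoch and a counter into a single zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    public static void main(String[] args) {
        long zxid = makeZxid(5, 7);
        System.out.println("zxid=0x" + Long.toHexString(zxid)
                + " epoch=" + epochOf(zxid) + " counter=" + counterOf(zxid));
    }
}
```

In the listing, both LEADERINFO and NEWLEADER carry makeZxid(newEpoch, 0), that is, the new epoch with a counter of zero.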

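Walkthrough:

  1. Add this LearnerHandler instance to learners, the Leader's list of LearnerHandlers.
  2. Compute the next ACK deadline as current tick + initLimit + syncLimit. (Within one tickTime period the Leader exchanges PINGs with each Learner twice, after which tick is incremented by 1.)
  3. Obtain the BinaryInputArchive and BinaryOutputArchive of the custom jute protocol for reading and writing data.
  4. Read the packet sent by the Learner and check whether it is FOLLOWERINFO or OBSERVERINFO. If it is neither, return from run() immediately, because the first packet a Learner sends after connecting to the Leader must be FOLLOWERINFO or OBSERVERINFO.
  5. If the packet has a data field, parse it to obtain the serverId, the protocol version and the quorum verifier version configVersion, then compare the quorum verifier versions of the Leader and the Learner. If there is no data field, assign a serverId that starts at -1 and decrements. (The listing above reads only the serverId and the protocol version: an 8-byte payload is the bare sid, anything else is deserialized as a LearnerInfo.)
  6. Obtain the followerInfo and determine the learnerType, then register the LearnerHandler with JMX.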
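  7. ZxidUtils.getEpochFromZxid(qp.getZxid()) extracts lastAcceptedEpoch, the epoch of the Learner's most recently accepted transaction, from the zxid carried in the packet. Next, getEpochToPropose(long sid, long lastAcceptedEpoch) is called to obtain the new leader epoch for the ensemble, from which the Leader's zxid is computed. (getEpochToPropose has only two kinds of callers, the LearnerMaster and the LearnerHandlers. Every thread that calls it blocks in wait() for up to initLimit * tickTime, until the quorum verifier confirms that a quorum has taken part in choosing the epoch; the method then returns and the rest of the flow continues.)
    getEpochToPropose is covered in detail later.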
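  8. Check whether the protocol version is at least 0x10000 (current versions use 0x10000). If so, send a LEADERINFO packet to the Learner and wait for its ACKEPOCH packet, then call waitForEpochAck(long sid, StateSummary ss), which waits up to initLimit * tickTime for the epoch ACKs. For older versions the epoch is extrapolated from the zxid and no LEADERINFO/ACKEPOCH exchange takes place.
  9. Next comes data synchronization, which distinguishes three cases (DIFF, TRUNC and SNAP); the details are covered later.
  10. For current protocol versions the NEWLEADER packet is placed on the blocking queue queuedPackets to be sent later; for old versions it is written to the Learner directly.
    A thread for sending queued packets is then started; it loops, taking packets from queuedPackets and sending them to the corresponding Learner.
    The handler then waits for the Learner's ACK of the NEWLEADER message and, once it arrives, calls waitForNewLeaderAck, which waits up to initLimit * tickTime for the NEWLEADER ACKs.
  11. Start the sync checker syncLimitCheck and set the socket timeout to syncLimit * tickTime. Then wait for the ZooKeeper server of the learnerMaster (the Leader or an ObserverMaster) to start, and finally place an UPTODATE packet on queuedPackets to be sent later.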
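  12. Loop over the packets sent by the Learner: read a packet, set the new ACK deadline tickOfNextAckDeadline, and count the packet as received. Handling then depends on the packet type:
  • ACK: the response to a transaction proposal, meaning the Follower accepts that proposal. The handler first updates the proposal timing in syncLimitCheck, then hands the ACK to the Leader for processing.
  • PING: the Learner's reply to a PING that the LearnerMaster sent on its own initiative. The data field carries session information, which the LearnerMaster uses to extend session expiry times.
  • REVALIDATE: used to revalidate and reactivate a session.
  • REQUEST: a transaction request or sync request forwarded by the Learner. Learners are not allowed to process transaction requests themselves, so these are handed to the Leader.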
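
The three synchronization cases from step 9 can be condensed into a pure function. This is a sketch of the decision logic as it appears in the run() listing, under stated assumptions: SyncDecisionSketch and its Mode enum are hypothetical names, the committed log is reduced to an ascending list of zxids, and the queuing of the actual PROPOSAL and COMMIT packets is omitted.

```java
import java.util.List;

final class SyncDecisionSketch {
    enum Mode { DIFF, TRUNC, SNAP }

    // committed must be sorted ascending; its first and last elements play
    // the roles of minCommittedLog and maxCommittedLog.
    static Mode choose(long peerLastZxid, long leaderLastZxid, List<Long> committed) {
        if (peerLastZxid == leaderLastZxid) {
            return Mode.DIFF;               // already in sync: empty diff
        }
        if (committed.isEmpty()) {
            return Mode.SNAP;               // nothing cached: full state transfer
        }
        long min = committed.get(0);
        long max = committed.get(committed.size() - 1);
        if (peerLastZxid > max) {
            return Mode.TRUNC;              // peer is ahead of the cache: roll it back
        }
        if (peerLastZxid < min) {
            return Mode.SNAP;               // peer is behind the cache: default SNAP
        }
        // Peer lies within [min, max]: walk to the first proposal it lacks.
        long prev = min;
        for (long zxid : committed) {
            if (zxid <= peerLastZxid) {
                prev = zxid;
                continue;
            }
            // If the peer's last zxid is not one the leader has, truncate the
            // peer back to prev first; the missing proposals follow either way.
            return prev < peerLastZxid ? Mode.TRUNC : Mode.DIFF;
        }
        return Mode.DIFF;                   // peer already has everything cached
    }
}
```

For a cache holding zxids 10, 11, 12 and 14, a peer at 11 gets a DIFF, a peer at 13 (a zxid the Leader never committed) gets a TRUNC back to 12, and a peer at 5 falls through to SNAP.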

2.3 The packet sender thread

            // Start sending packets
            new Thread() {
                public void run() {
                    Thread.currentThread().setName(
                            "Sender-" + sock.getRemoteSocketAddress());
                    try {
                        sendPackets();
                    } catch (InterruptedException e) {
                        LOG.warn("Unexpected interruption",e);
                    }
                }
            }.start();

    private void sendPackets() throws InterruptedException {
        long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
        while (true) {
            try {
                QuorumPacket p;
                p = queuedPackets.poll();
                if (p == null) {
                    bufferedOutput.flush();
                    p = queuedPackets.take();
                }

                if (p == proposalOfDeath) {
                    // Packet of death!
                    break;
                }
                if (p.getType() == Leader.PING) {
                    traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
                }
                if (p.getType() == Leader.PROPOSAL) {
                    syncLimitCheck.updateProposal(p.getZxid(), System.nanoTime());
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logQuorumPacket(LOG, traceMask, 'o', p);
                }
                oa.writeRecord(p, "packet");
            } catch (IOException e) {
                if (!sock.isClosed()) {
                    LOG.warn("Unexpected exception at " + this, e);
                    try {
                        // this will cause everything to shutdown on
                        // this learner handler and will help notify
                        // the learner/observer instantaneously
                        sock.close();
                    } catch(IOException ie) {
                        LOG.warn("Error closing socket for handler " + this, ie);
                    }
                }
                break;
            }
        }
    }
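
  1. Start a dedicated thread for sending the packets queued in queuedPackets.
  2. Loop, taking packets from queuedPackets. A non-blocking poll is tried first; if the queue is empty, the buffer is flushed so that pending data goes out, and the thread then blocks until a new packet arrives.
  3. Record metrics; if the packet is a PROPOSAL, also update the sync checker syncLimitCheck.
  4. Update lastZxid and write the packet to the buffer, then count the packet as sent.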
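
The poll, flush, take sequence from step 2 is worth isolating: output is flushed only when the queue runs dry, so a burst of packets is batched into fewer writes. A minimal sketch, assuming packets are plain strings. SenderSketch and its DEATH sentinel are hypothetical names (the sentinel plays the role of proposalOfDeath), and unlike the listing the sketch swallows the interrupt and restores the flag instead of propagating InterruptedException.

```java
import java.io.PrintWriter;
import java.util.concurrent.BlockingQueue;

final class SenderSketch {
    // Sentinel compared by identity, like proposalOfDeath in the listing.
    static final String DEATH = new String("death");

    // Drains the queue into out until DEATH is seen; returns the number of
    // packets written. out is expected to be buffered by the caller.
    static int sendPackets(BlockingQueue<String> queue, PrintWriter out) {
        int sent = 0;
        try {
            while (true) {
                String p = queue.poll();     // non-blocking attempt first
                if (p == null) {
                    out.flush();             // queue is empty: push buffered data out
                    p = queue.take();        // then block until the next packet
                }
                if (p == DEATH) {
                    break;                   // packet of death: stop sending
                }
                out.print(p);
                out.print('\n');
                sent++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return sent;
    }
}
```

If the queue already holds two packets followed by the sentinel, both are written to the buffer and the loop exits without a single flush, because the queue never ran empty. That is the same behavior as the listing, where the final flush is left to whoever closes the stream.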

2.4 The sync checker syncLimitCheck

    private class SyncLimitCheck {
        private boolean started = false;
        private long currentZxid = 0;
        private long currentTime = 0;
        private long nextZxid = 0;
        private long nextTime = 0;

        public synchronized void start() {
            started = true;
        }

        public synchronized void updateProposal(long zxid, long time) {
            if (!started) {
                return;
            }
            if (currentTime == 0) {
                currentTime = time;
                currentZxid = zxid;
            } else {
                nextTime = time;
                nextZxid = zxid;
            }
        }

        public synchronized void updateAck(long zxid) {
             if (currentZxid == zxid) {
                 currentTime = nextTime;
                 currentZxid = nextZxid;
                 nextTime = 0;
                 nextZxid = 0;
             } else if (nextZxid == zxid) {
                 LOG.warn("ACK for " + zxid + " received before ACK for " + currentZxid + "!!!!");
                 nextTime = 0;
                 nextZxid = 0;
             }
        }

        public synchronized boolean check(long time) {
            if (currentTime == 0) {
                return true;
            } else {
                long msDelay = (time - currentTime) / 1000000;
                return (msDelay < learnerMaster.syncTimeout());
            }
        }
    }
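
  • updateProposal: called when the LearnerHandler sends a proposal. It checks whether the checker has been started and looks at currentTime, the send time of the proposal currently awaiting an ACK. If currentTime is 0, this proposal becomes the current proposal; otherwise it is recorded as the next proposal.
  • updateAck: called when the LearnerHandler receives an ACK from the Learner. If the ACK is for the current proposal, the current slot is replaced with the nextTime and nextZxid recorded by updateProposal, and nextTime and nextZxid are reset to 0. If the ACK is instead for the next proposal, the ACKs arrived out of order, and nextTime and nextZxid are likewise reset to 0.
  • check: called before the LearnerHandler sends a PING to the Learner. It returns true if currentTime is 0, or if the current time minus currentTime is less than the sync timeout syncLimit * tickTime. If a proposal stays unacknowledged until a PING finds that it has timed out, the current LearnerHandler is shut down. (Note: two PINGs are sent per tickTime period.)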
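
The timing rule in check can be exercised in isolation. The sketch below keeps the same two-slot state as the listing but takes the timeout as a constructor argument. SyncLimitSketch and timeoutMs are hypothetical names, and times are passed in explicitly (in nanoseconds, as System.nanoTime() would supply them) so that the behavior is deterministic.

```java
// Hypothetical, self-contained variant of SyncLimitCheck with an explicit timeout.
final class SyncLimitSketch {
    private final long timeoutMs;   // stands in for syncLimit * tickTime
    private boolean started;
    private long currentZxid;
    private long currentTime;       // 0 means "no proposal awaiting an ACK"
    private long nextZxid;
    private long nextTime;

    SyncLimitSketch(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    synchronized void start() {
        started = true;
    }

    synchronized void updateProposal(long zxid, long timeNanos) {
        if (!started) {
            return;                 // proposals sent before start() are not tracked
        }
        if (currentTime == 0) {
            currentTime = timeNanos;
            currentZxid = zxid;
        } else {
            nextTime = timeNanos;
            nextZxid = zxid;
        }
    }

    synchronized void updateAck(long zxid) {
        if (currentZxid == zxid) {
            // Promote the next slot (possibly empty) to current.
            currentTime = nextTime;
            currentZxid = nextZxid;
            nextTime = 0;
            nextZxid = 0;
        } else if (nextZxid == zxid) {
            // ACK arrived out of order: just clear the next slot.
            nextTime = 0;
            nextZxid = 0;
        }
    }

    synchronized boolean check(long nowNanos) {
        if (currentTime == 0) {
            return true;
        }
        long msDelay = (nowNanos - currentTime) / 1_000_000;
        return msDelay < timeoutMs;
    }
}
```

With a timeout of 100 ms and a proposal sent at t = 1 ms, check passes at t = 50 ms, fails at t = 201 ms, and passes again as soon as the ACK for that proposal has been recorded.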