ZooKeeper 源碼分析 session建立及超時機制 (基于3.4.6)

1. ZooKeeper session建立及超時機制 概述

首先說一下, 為什么要寫下這篇, 原因也很簡單, 因為session的建立及超時機制特別

1. ZooKeeper 集群的所有 sessionImpl 都在 Leader端, 而Follower端只將 sessionId 與 timeout 存儲到 HashMap里面
2. 在 Leader 端 每個 LearnerHandler 會定期的向Follower/Observer發(fā)送給ping 包, Follower/Observer在接受到之后, 將會將對應的要檢查超時的 sessionId 發(fā)給 Leader, 統(tǒng)一讓Leader進行檢查
3. Leader用SessionTrackerImpl線程來檢查Session是否超時, 而 session 將放在一個以 expirationTime為Key的HashMap里面, 定時的獲取并檢查, 超時的話就進行刪除, 不超時的話將session移到下一個將要超時的 Bucket 里面(見touchSession)

接下來就直接上代碼(我們這里從Follower的角度出發(fā))

1. Client 連接 Follower

當Client連接Follower時, 會調用 FollowerZooKeeperServer.processPacket 來進行處理(這里不涉及Zookeeper自己的NIO/NettyNIO處理部分), 最后會直接調用 LeaderZooKeeperServer.submitRequest方法將對應的Request進行提交, 到這里有必要說一下 Follower的RequestProcessor處理鏈

/**
 * Follower 的 RequestProcessor 處理鏈 (2條)
 * 第一條 鏈
 * FollowerRequestProcessor: 區(qū)分處理 Request, 將 Request 交由下個 RequestProcessor, 而若涉及事務的操作, 則 交由 Follower 提交給 leader (zks.getFollower().request())
 * CommitProcessor: 這條鏈決定這著 Request 能否提交, 里面主要有兩條鏈 , queuedRequests : 存儲著 等待 ACK 過半確認的 Request, committedRequests :存儲著 已經(jīng)經(jīng)過 ACK 過半確認的 Request
 * FinalRequestProcessor: 前面的 Request 只是在經(jīng)過 SynRequestProcessor 持久化到 txnLog 里面, 而 這里做的就是真正的將數(shù)據(jù)改變到 ZKDataBase 里面(作為  Follower 一定會在 FollowerZooKeeperServer.logRequest 進行同步Request 數(shù)據(jù)到磁盤里面后再到 FinalRequestProcessor)
 *
 * 第二條 鏈
 * SynRequestProcessor: 主要是將 Request 持久化到 TxnLog 里面, 其中涉及到 TxnLog 的滾動, 及 Snapshot 文件的生成
 * AckRequestProcessor: 主要完成 針對 Request 的 ACK 回復, 對 在Leader中就是完成 leader 自己提交 Request, 自己回復 ACK
 *
 * 1. FollowerRequestProcessor --> CommitProcessor --> FinalRequestProcessor
 * 2. SyncRequestProcessor --> SendAckRequestProcessor
 */
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true);
    commitProcessor.start();
    firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
    ((FollowerRequestProcessor) firstProcessor).start();
    syncProcessor = new SyncRequestProcessor(this, new SendAckRequestProcessor((Learner)getFollower()));
    syncProcessor.start();
}

Leader 的RequestProcessor處理鏈

/**
 * Leader 的 RequestProcessor 處理鏈
 *
 * 第一條 RequestProcessor 鏈
 * PreRequestProcessor : 創(chuàng)建和修改 TxnRequest
 * ProposalRequestProcessor : 提交  Proposal 給各個 Follower 包括 Leader自己 (Leader自己是在 ProposalRequestProcessor 里面通過 syncProcessor.processRequest(request) 直接提交 Proposal)
 * CommitProcessor : 將 經(jīng)過集群中的過半 Proposal 提交(提交的操作直接在 Leader.processAck -> zk.commitProcessor.commit(p.request))
 * ToBeAppliedRequestProcessor: 這個處理鏈其實是 Request 處理時經(jīng)過的最后一個 RequestProcessor, 其中最令人困惑的是 toBeApplied, 而 toBeApplied 中其實維護的是 集群中經(jīng)過 過半 ACK 同意的 proposal, 只有經(jīng)過 FinalRequestProcessor 處理過的 Request 才會在 toBeApplied 中進行刪除
 * FinalRequestProcessor: 前面的 Request 只是在經(jīng)過 SynRequestProcessor 持久化到 txnLog 里面, 而 這里做的就是真正的將數(shù)據(jù)改變到 ZKDataBase 里面
 *
 * 第二條 RequestProcessor 鏈
 * 在 leader 中, SynRequestProcessor, AckRequestProcessor 的創(chuàng)建其實是在 ProposalRequestProcessor 中完成的
 * SynRequestProcessor: 主要是將 Request 持久化到 TxnLog 里面, 其中涉及到 TxnLog 的滾動, 及 Snapshot 文件的生成
 * AckRequestProcessor: 主要完成 針對 Request 的 ACK 回復, 對 在Leader中就是完成 leader 自己提交 Request, 自己回復 ACK
 *
 * PrepRequestProcessor --> ProposalRequestProcessor --> CommitProcessor --> ToBeAppliedRequestProcessor --> FinalRequestProcessor
 *                                                    \
 *                                                     SynRequestProcessor --> AckRequestProcessor (這條分支是在 ProposalRequestProcessor 里面進行構建的)
 */
@Override
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
            finalProcessor, getLeader().toBeApplied);
    // 投票確認處理器
    commitProcessor = new CommitProcessor(toBeAppliedProcessor,
            Long.toString(getServerId()), false);
    commitProcessor.start();
    // 投票發(fā)起處理器
    ProposalRequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
            commitProcessor);
    proposalProcessor.initialize();
    // 預處理器
    firstProcessor = new PrepRequestProcessor(this, proposalProcessor);
    ((PrepRequestProcessor)firstProcessor).start();
}

下面先來張總體的流程圖:

session檢查.png

上面這張圖片有點大, 建議在 百度云 里面進行下載預覽, 接下來我們會一步一步進行下去PS: 吐槽一下簡書的圖片系統(tǒng), 圖片一旦大了就預覽出問題(不清晰)

整個流程涉及好幾個過程, 下面一一分析:

2. FollowerZooKeeperServer createSession
// 創(chuàng)建 session
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    long sessionId = sessionTracker.createSession(timeout);     // 1. 創(chuàng)建 會話 Session, 生成 SessionImpl 放入對應的 sessionsById, sessionsWithTimeout, sessionSets 里面, 返回 sessionid
    Random r = new Random(sessionId ^ superSecret);
    r.nextBytes(passwd);                                        // 2. 生成一個隨機的 byte[] passwd
    ByteBuffer to = ByteBuffer.allocate(4);
    to.putInt(timeout);
    cnxn.setSessionId(sessionId);                               // 3. 提交 Request 到RequestProcessor 處理鏈
    submitRequest(cnxn, sessionId, OpCode.createSession, 0, to, null);
    return sessionId;                                           // 4. 返回此回話對應的 sessionId
}
3. FinalRequestProcessor 處理請求
switch (request.type) {
case OpCode.sync:                                // 2. 處理同步數(shù)據(jù)
    zks.pendingSyncs.add(request);
    zks.getFollower().request(request);
    break;
case OpCode.create:                              // 3. 從這里 看出 path 創(chuàng)建/刪除/設置數(shù)據(jù)/設置訪問權限/創(chuàng)建,關閉session, 多個操作 -> 都 是 Follower 交給 leader 進行處理
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
case OpCode.multi:
    zks.getFollower().request(request);          // 4. 將事務類的請求都交給 Leader 處理
    break;
}

follower提交 Request 到Leader

/**
 * send a request packet to the leader
 *
 * @param request
 *                the request from the client
 * @throws IOException
 */
void request(Request request) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();       // 1. 將要發(fā)送給 Leader 的數(shù)據(jù)包序列化
    DataOutputStream oa = new DataOutputStream(baos);
    oa.writeLong(request.sessionId);
    oa.writeInt(request.cxid);
    oa.writeInt(request.type);
    if (request.request != null) {
        request.request.rewind();
        int len = request.request.remaining();
        byte b[] = new byte[len];
        request.request.get(b);
        request.request.rewind();
        oa.write(b);
    }
    oa.close();                                                     // 2. 封裝請求數(shù)據(jù)包
    QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos.toByteArray(), request.authInfo);

    writePacket(qp, true);                                         // 3. 將 事務請求 request 發(fā)送給 Leader
}

FinalRequestProcessor處理了Request后一方面是follower提交給Leader, 另一方面是提交給 CommitProcessor

4. CommitProcessor 處理請求
@Override
public void run() {
    try {
        Request nextPending = null;            
        while (!finished) {                                            // while loop
            int len = toProcess.size();
            for (int i = 0; i < len; i++) {
                Request request = toProcess.get(i);
                LOG.info("request:"+ request);                         // 1. Follower 里面就是 丟給 FinalRequestProcessor 處理
                nextProcessor.processRequest(request);                 // 2. 將 ack 過半的 Request 丟給 ToBeAppliedRequestProcessor 來進行處理 (Leader 中是這樣處理)
            }
            toProcess.clear();
            synchronized (this) {
                if ((queuedRequests.size() == 0 || nextPending != null)// 3. 如果沒有 Commit 的請求, 則進行wait, 直到 commit 請求的到來
                        && committedRequests.size() == 0) {
                    wait();
                    continue;
                }
                // First check and see if the commit came in for the pending
                // request
                if ((queuedRequests.size() == 0 || nextPending != null)// 4. 當 Leader 通過了 過半ACK確認后, 則會將這個 Request 丟給 Follower 來處理, Follower 會直接將 Request 丟到 committedRequests 里面, 進而處理
                        && committedRequests.size() > 0) {
                    Request r = committedRequests.remove();
                    /*
                     * We match with nextPending so that we can move to the
                     * next request when it is committed. We also want to
                     * use nextPending because it has the cnxn member set
                     * properly.
                     */
                    if (nextPending != null                            // 5. 這里其實就是比較 nextPending 與 committedRequests 中的 request 請求
                            && nextPending.sessionId == r.sessionId    // 6. 而 nextPending 又是從 queuedRequests 里面拿出來的, 若相同, 則直接用 committedRequests 里面的 消息頭, 消息體, zxid
                            && nextPending.cxid == r.cxid) {
                        // we want to send our version of the request.
                        // the pointer to the connection in the request
                        nextPending.hdr = r.hdr;
                        nextPending.txn = r.txn;
                        nextPending.zxid = r.zxid;
                        toProcess.add(nextPending);                    // 7. 將 請求 直接加入 toProcess, 直到下次 loop 被 nextProcessor 處理
                        nextPending = null;
                    } else {                                           // 8. Leader 直接 調用 commit 方法提交的 請求, 直接加入 toProcess, 直到下次 loop 被 nextProcessor 處理 (這個 IF 判斷中是 Leader 中處理的)
                        // this request came from someone else so just
                        // send the commit packet
                        toProcess.add(r);
                    }
                }
            }

            // We haven't matched the pending requests, so go back to
            // waiting
            if (nextPending != null) {
                continue;
            }

            synchronized (this) {
                // Process the next requests in the queuedRequests
                while (nextPending == null && queuedRequests.size() > 0) {
                    Request request = queuedRequests.remove();
                    switch (request.type) {
                    case OpCode.create:
                    case OpCode.delete:
                    case OpCode.setData:
                    case OpCode.multi:
                    case OpCode.setACL:
                    case OpCode.createSession:
                    case OpCode.closeSession:
                        nextPending = request;                          // 9. 若請求是事務請求, 則將 follower 自己提交的 request 賦值給 nextPending
                        break;
                    case OpCode.sync:
                        if (matchSyncs) {
                            nextPending = request;
                        } else {
                            toProcess.add(request);
                        }
                        break;
                    default:                                            // 10.這里直接加入到 隊列 toProcess 中的其實是 非 事務的請求 (比如getData), 丟到 toProcess 里面的請求會丟到下個 RequestProcessor
                        toProcess.add(request);
                    }
                }
            }
        }
    } catch (InterruptedException e) {
        LOG.warn("Interrupted exception while waiting", e);
    } catch (Throwable e) {
        LOG.error("Unexpected exception causing CommitProcessor to exit", e);
    }
    LOG.info("CommitProcessor exited loop!");
}

在CommitProcessor 里面有幾個特別的隊列

/**
 * Requests that we are holding until the commit comes in.
 */
// 等待 ACK 確認的 Request
LinkedList<Request> queuedRequests = new LinkedList<Request>();

/**
 * Requests that have been committed.
 */
// 已經(jīng) Proposal ACK 過半確認過的 Request, 一般的要么是 Leader 自己 commit, 要么就是 Follower 接收到 Leader 的 commit 消息
LinkedList<Request> committedRequests = new LinkedList<Request>();

// 等待被 nextProcessor 處理的隊列, 其里面的數(shù)據(jù)是從 committedRequests, queuedRequests 里面獲取來的
ArrayList<Request> toProcess = new ArrayList<Request>();

5. LearnerHandler 處理Request請求

此時LearnerHandler在while loop里面處理對應的Request請求

while (true) {
    qp = new QuorumPacket();
    ia.readRecord(qp, "packet");                         // 47. 這里其實就是不斷的從數(shù)據(jù)流(來源于 Follower 的) 讀取數(shù)據(jù)
    LOG.info("qp:" + qp);

    long traceMask = ZooTrace.SERVER_PACKET_TRACE_MASK;
    if (qp.getType() == Leader.PING) {
        traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
    }
    if (LOG.isTraceEnabled()) {
        ZooTrace.logQuorumPacket(LOG, traceMask, 'i', qp);
    }
    tickOfNextAckDeadline = leader.self.tick + leader.self.syncLimit;
    LOG.info("tickOfNextAckDeadline :" + tickOfNextAckDeadline);

    ByteBuffer bb;
    long sessionId;
    int cxid;
    int type;

    LOG.info("qp.getType() : " + qp);
    switch (qp.getType()) {
    case Leader.ACK:                                   // 48. 處理 Follower 回復給 Leader 的ACK 包看看之前的投票是否結束 ( 這里是 Follower 在處理 UPTODATE 后恢復 ACK)
        if (this.learnerType == LearnerType.OBSERVER) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Received ACK from Observer  " + this.sid);
            }
        }
        LOG.info("syncLimitCheck.updateAck(qp.getZxid()):"  + qp.getZxid());
        syncLimitCheck.updateAck(qp.getZxid());
        LOG.info("this.sid:" + this.sid + ", qp.getZxid():" + qp.getZxid() + ", sock.getLocalSocketAddress():" + sock.getLocalSocketAddress());
                                                      // 49. ack 包處理成功, 如果 follower 數(shù)據(jù)同步成功, 則將它添加到 NEWLEADER 這個投票的結果中
        leader.processAck(this.sid, qp.getZxid(), sock.getLocalSocketAddress());        
        break;
    case Leader.PING:                                 // 50. ping 數(shù)據(jù)包, 更新 session 的超時時間
        // Process the touches
        ByteArrayInputStream bis = new ByteArrayInputStream(qp.getData());
        DataInputStream dis = new DataInputStream(bis);
        while (dis.available() > 0) {
            long sess = dis.readLong();
            int to = dis.readInt();
            LOG.info("leader.zk.touch: sess" + sess + ", to:"+to);
            leader.zk.touch(sess, to);
        }
        break;
    case Leader.REVALIDATE:                          // 51. 檢查 session 是否還存活
        bis = new ByteArrayInputStream(qp.getData());
        dis = new DataInputStream(bis);
        long id = dis.readLong();
        int to = dis.readInt();
        LOG.info("id:"+id + ", to:" + to);
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(bos);
        dos.writeLong(id);
        boolean valid = leader.zk.touch(id, to);
        LOG.info("id:" + id + ", to:" + to + ", valid:" + valid);
        if (valid) {
            try {
                //set the session owner
                // as the follower that
                // owns the session
                leader.zk.setOwner(id, this);
            } catch (SessionExpiredException e) {
                LOG.error("Somehow session " + Long.toHexString(id) + " expired right after being renewed! (impossible)", e);
            }
        }
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG,
                                     ZooTrace.SESSION_TRACE_MASK,
                                     "Session 0x" + Long.toHexString(id)
                                     + " is valid: "+ valid);
        }
        dos.writeBoolean(valid);
        qp.setData(bos.toByteArray());
        queuedPackets.add(qp);                         // 52. 將數(shù)據(jù)包返回給對應的 follower
        break;
    case Leader.REQUEST:                               // 53. REQUEST 數(shù)據(jù)包, follower 會將事務請求轉發(fā)給 leader 進行處理
        bb = ByteBuffer.wrap(qp.getData());
        sessionId = bb.getLong();
        cxid = bb.getInt();
        type = bb.getInt();
        bb = bb.slice();                               // 54. 讀取事務信息
        Request si;
        LOG.info(" sessionId:" + sessionId + ", cxid:" + cxid + ", type:" + type);
        if(type == OpCode.sync){
            si = new LearnerSyncRequest(this, sessionId, cxid, type, bb, qp.getAuthinfo());
        } else {
            si = new Request(null, sessionId, cxid, type, bb, qp.getAuthinfo());
        }
        si.setOwner(this);
        LOG.info("si:" + si);
        leader.zk.submitRequest(si);                   // 55. 將事務請求的信息交由 Leader 的 RequestProcessor 處理
        break;
    default:
    }
}

LearnerHandler調用Leader.zk.submitRequest(Request request) 到RequestProcessor處理鏈里面;

6. PrepRequestProcessor 處理請求
case OpCode.createSession:                                  // 創(chuàng)建 session
    request.request.rewind();
    int to = request.request.getInt();
    request.txn = new CreateSessionTxn(to);                 // 組裝事務體, 事務頭在最前面已經(jīng)弄好了
    request.request.rewind();
    zks.sessionTracker.addSession(request.sessionId, to);   // 調用 sessionTracker.addSession() 將follower里的session加入到Leader的sessionsWithTimeout里面
    zks.setOwner(request.sessionId, request.getOwner());
    break;

這里的操作就是將session加入到Leader的sessionsById里面

7. ProposalRequestProcessor 處理請求
    nextProcessor.processRequest(request);         // 1. 這里的 nextProcessor 其實就是 CommitProcessor
if (request.hdr != null) {                         // 2. 若是 事務請求
    // We need to sync and get consensus on any transactions
    try {
        zks.getLeader().propose(request);          // 3. Leader 進行 Request 的投票 (Proposal) 將 request 發(fā)送給 Follower
    } catch (XidRolloverException e) {
        throw new RequestProcessorException(e.getMessage(), e);
    }
    syncProcessor.processRequest(request);         // 4. 將 request 交給 syncProcessor 進行落磁盤
}

這里就這幾步:

1. 提交請求到CommitProcessor.queuedRequests里面
2. 通過zks.getLeader().propose(request) 向各個Follower提交 Leader.PROPOSAL
3. 本機的 syncProcessor處理請求(持久化, 接下來就是本機的 AckRequestProcessor回復ack給 Leader.processAck 阻塞這里, ACK過半了就不會阻塞)
8. Follower.processPacket 處理請求

接著就是 Follower處理Leader提出的Proposal

case Leader.PROPOSAL:                                             // 1. 處理 Leader 發(fā)來的 Proposal 包, 投票處理
    TxnHeader hdr = new TxnHeader();
    Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);// 2. 反序列化出 Request
    if (hdr.getZxid() != lastQueued + 1) {                        // 3. 這里說明什么呢, 說明 Follower 可能少掉了 Proposal
        LOG.warn("Got zxid 0x"
                + Long.toHexString(hdr.getZxid())
                + " expected 0x"
                + Long.toHexString(lastQueued + 1));
    }
    lastQueued = hdr.getZxid();
    fzk.logRequest(hdr, txn);                                     // 4. 將 Request 交給 FollowerZooKeeperServer 來進行處理

fzk.logRequest 提交Request到syncProcessor里面, 而后就是通過SendAckRequestProcessor向Leader發(fā)送剛才Proposal對應的ack

9. Leader.processAck 處理Follower發(fā)來的ack
/**
 * 參考資料
 * http://blog.csdn.net/vinowan/article/details/22196707
 *
 * Keep a count of acks that are received by the leader for a particular
 * proposal
 * 
 * @param zxid the zxid of the proposal sent out
 */
synchronized public void processAck(long sid, long zxid, SocketAddress followerAddr) {
    LOG.info("sid:" + sid + ", zxid:" + zxid + ", followerAddr:" + followerAddr);
    if (LOG.isTraceEnabled()) {
        LOG.trace("Ack zxid: 0x{}", Long.toHexString(zxid));
        for (Proposal p : outstandingProposals.values()) {
            long packetZxid = p.packet.getZxid();
            LOG.trace("outstanding proposal: 0x{}",
                    Long.toHexString(packetZxid));
        }
        LOG.trace("outstanding proposals all");
    }

    LOG.info("(zxid & 0xffffffffL) == 0 :" + ((zxid & 0xffffffffL) == 0));
    if ((zxid & 0xffffffffL) == 0) {                              // 1. zxid 全是 0
        /*
         * We no longer process NEWLEADER ack by this method. However,
         * the learner sends ack back to the leader after it gets UPTODATE
         * so we just ignore the message.
         */
        return;
    }

    LOG.info("outstandingProposals :" + outstandingProposals);
    if (outstandingProposals.size() == 0) {                       // 2. 沒有要回應 ack 的 Proposal 存在
        if (LOG.isDebugEnabled()) {
            LOG.debug("outstanding is 0");
        }
        return;
    }
    LOG.info("lastCommitted :" + lastCommitted + ", zxid:" + zxid);
    if (lastCommitted >= zxid) {                                  // 3. Leader 端處理的 lastCommited >= zxid, 說明 zxid 對應的 proposal 已經(jīng)處理過了
        if (LOG.isDebugEnabled()) {
            LOG.debug("proposal has already been committed, pzxid: 0x{} zxid: 0x{}", Long.toHexString(lastCommitted), Long.toHexString(zxid));
        }
        // The proposal has already been committed
        return;
    }
    Proposal p = outstandingProposals.get(zxid);                  // 4. 從投票箱 outstandingProposals 獲取 zxid 對應的 Proposal
    LOG.info("p:" + p);
    if (p == null) {
        LOG.warn("Trying to commit future proposal: zxid 0x{} from {}", Long.toHexString(zxid), followerAddr);
        return;
    }
    LOG.info("p:" + p + ", sid:" + sid);

    p.ackSet.add(sid);                                            // 5. 將 follower 的 myid 加入結果列表
    if (LOG.isDebugEnabled()) {
        LOG.info("Count for zxid: 0x{} is {}", Long.toHexString(zxid), p.ackSet.size());
    }

    LOG.info("self.getQuorumVerifier().containsQuorum(p.ackSet):" + self.getQuorumVerifier().containsQuorum(p.ackSet));
    if (self.getQuorumVerifier().containsQuorum(p.ackSet)){       // 6. 判斷是否票數(shù)夠了, 則啟動  leader 的 CommitProcessor 來進行處理

        LOG.info("zxid:" + zxid + ", lastCommitted:" + lastCommitted);
        if (zxid != lastCommitted+1) {
            LOG.warn("Commiting zxid 0x{} from {} not first!", Long.toHexString(zxid), followerAddr);
            LOG.warn("First is 0x{}", Long.toHexString(lastCommitted + 1));
        }

        LOG.info("outstandingProposals:" + outstandingProposals);
        outstandingProposals.remove(zxid);                        // 7. 從 outstandingProposals 里面刪除那個可以提交的 Proposal
        if (p.request != null) {
            toBeApplied.add(p);                                   // 8. 加入到 toBeApplied 隊列里面, 這里的 toBeApplied 是 ToBeAppliedRequestProcessor, Leader 共用的隊列, 在經(jīng)過 CommitProcessor 處理過后, 就到 ToBeAppliedRequestProcessor 里面進行處理
            LOG.info("toBeApplied:" + toBeApplied);               // 9. toBeApplied 對應的刪除操作在 ToBeAppliedRequestProcessor 里面, 在進行刪除時, 其實已經(jīng)經(jīng)過 FinalRequestProcessor 處理過的
        }

        if (p.request == null) {
            LOG.warn("Going to commmit null request for proposal: {}", p);
        }
        commit(zxid);                                             // 10. 向 集群中的 Followers 發(fā)送 commit 消息, 來通知大家, zxid 對應的 Proposal 可以 commit 了
        inform(p);                                                // 11. 向 集群中的 Observers 發(fā)送 commit 消息, 來通知大家, zxid 對應的 Proposal 可以 commit 了
        zk.commitProcessor.commit(p.request);                     // 12. 自己進行 proposal 的提交 (直接調用 commitProcessor 進行提交 )
                                                                  // 13. 其實這里隱藏一個細節(jié), 就是有可能 有些 Proposal 在 Follower 上進行了 commit, 而 Leader 上還沒來得及提交, 就有可能與集群間的其他節(jié)點斷開連接
        LOG.info("pendingSyncs :" + pendingSyncs);
        if(pendingSyncs.containsKey(zxid)){
            for(LearnerSyncRequest r: pendingSyncs.remove(zxid)) {
                sendSync(r);
            }
        }
    }
}

這里處理ACK時, 若已經(jīng)收到集群中過半的ack則就可以向集群中的其他節(jié)點發(fā)送commit, 或inform其他Observer節(jié)點, 然后 zk.commitProcessor.commit(p.request) 提交request到Leader的commitProcessor.committedRequests里面, 最后就是 先在FinalRequestProcessor處理一下, 再在ToBeAppliedRequestProcessor.toBeApplied刪除request

10. FollowerZooKeeperServer.commit(long zxid) 提交Proposal
/**
 * When a COMMIT message is received, eventually this method is called, 
 * which matches up the zxid from the COMMIT with (hopefully) the head of
 * the pendingTxns queue and hands it to the commitProcessor to commit.
 * @param zxid - must correspond to the head of pendingTxns if it exists
 */
public void commit(long zxid) {
    if (pendingTxns.size() == 0) {
        LOG.warn("Committing " + Long.toHexString(zxid)
                + " without seeing txn");
        return;
    }
    long firstElementZxid = pendingTxns.element().zxid;         // 1. http://blog.csdn.net/fei33423/article/details/53749138
    if (firstElementZxid != zxid) {                             // 2. 這里就有經(jīng)典問題, 在 Leader 端提交了 3 個 Proposal 的信息(comit 1, comit 2, comit 3), 但 follower 在接收到 comit 1 后就接收到 comit 3
        LOG.error("Committing zxid 0x" + Long.toHexString(zxid) // 3. 則就會打印這里的日志, 并且進行退出
                + " but next pending txn 0x"
                + Long.toHexString(firstElementZxid));
        System.exit(12);
    }
    Request request = pendingTxns.remove();
    commitProcessor.commit(request);                            // 4. 提交到 commitProcessor.committedRequests 里面
}

而后就是Follower.FinalRequestProcessor進行最終的響應客戶端處理

11. Session超時機制 Leader.ping()

在Leader上有個while loop會遍歷 LearnerHandler 然后發(fā)送 ping請求給 Follower/Observer

/**
 * ping calls from the leader to the peers
 */
// 這里其實是 Leader 向 Follower 發(fā)送 ping 請求
// 在向 Learner 發(fā)送ping消息之前, 首先會通過 syncLimitCheck 來檢查
public void ping() {
    long id;
    if (syncLimitCheck.check(System.nanoTime())) {
        synchronized(leader) {
            id = leader.lastProposed;
        }
        QuorumPacket ping = new QuorumPacket(Leader.PING, id, null, null);
        queuePacket(ping);
    } else {
        LOG.warn("Closing connection to peer due to transaction timeout.");
        shutdown();
    }
}
12. Follower處理Leader發(fā)來的ping請求

Follower在接到Leader的ping請求后會將sessionId及timeout發(fā)送給Leader, 進行超時機制檢查

// Follower 將自己的 sessionId 及超時時間發(fā)送給 Leader, 讓 Leader 進行 touch 操作, 校驗是否 session 超時
protected void ping(QuorumPacket qp) throws IOException {
    // Send back the ping with our session data
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    DataOutputStream dos = new DataOutputStream(bos);
    HashMap<Long, Integer> touchTable = zk   // 1. 獲取 Follower/Observer 的 touchTable(sessionId <-> sessionTimeout) 發(fā)給 Leader 進行session超時的檢測
            .getTouchSnapshot();
    for (Entry<Long, Integer> entry : touchTable.entrySet()) {
        dos.writeLong(entry.getKey());
        dos.writeInt(entry.getValue());
    }
    qp.setData(bos.toByteArray());          // 2. 轉化成字節(jié)數(shù)組, 進行數(shù)據(jù)的寫入
    writePacket(qp, true);                  // 3. 發(fā)送數(shù)據(jù)包
}
13. Leader處理Follower發(fā)來的sessionId及timeout

Leader在接收到Follower發(fā)來的sessionId及timeout, 將會調用SessionTrackerImpl.touchSession(long sessionId, int timeout)來進行校驗

// 更新 session 的過期時間
synchronized public boolean touchSession(long sessionId, int timeout) {
    ZooTrace.logTraceMessage(LOG,
            ZooTrace.CLIENT_PING_TRACE_MASK,
            "SessionTrackerImpl --- Touch session: 0x"
                    + Long.toHexString(sessionId) + " with timeout " + timeout);

    SessionImpl s = sessionsById.get(sessionId);    // 1. 從 sessionsById 獲取 session, sessionsById 是一個 SessionId <-> SessionImpl 的 map
    // Return false, if the session doesn't exists or marked as closing
    if (s == null || s.isClosing()) {
        return false;
    }                                               // 2. 計算過期時間
    long expireTime = roundToInterval(System.currentTimeMillis() + timeout); 
    if (s.tickTime >= expireTime) {
        // Nothing needs to be done
        return true;
    }
    SessionSet set = sessionSets.get(s.tickTime);   // 3. 這里的 SessionSet 就是一個 timeout 對應額 Bucket, 將有一個線程, 在超時時間點檢查這個 SessionSet
    if (set != null) {
        set.sessions.remove(s);
    }
    s.tickTime = expireTime;                        // 4. 下面的步驟就是將 session 以 tickTime 為單位放入 sessionSets 中
    set = sessionSets.get(s.tickTime);
    if (set == null) {
        set = new SessionSet();
        sessionSets.put(expireTime, set);
    }
    set.sessions.add(s);                            // 5. 將 SessionImpl 放入對應的 SessionSets 里面
    return true;
}
總結

zookeeper的session機制只適用于有少量client連接Server的場景(zookeeper的默認maxClientCnxns 是60, 超過的話就會socket主動關閉), 當有百萬連接時, 用這種session集中, 用一條線程檢測超時的機制可能在性能上出現(xiàn)問題, 當zookeeper還是給出了一種很好的思考方向! 在理解了session創(chuàng)建機制后, 對應的create/setData/delete就很好理解了!

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 本文將從系統(tǒng)模型、序列化與協(xié)議、客戶端工作原理、會話、服務端工作原理以及數(shù)據(jù)存儲等方面來揭示ZooKeeper的技...
    端木軒閱讀 3,905評論 0 42
  • 一個真正的寫數(shù)據(jù)流程是怎么樣的?一個真正的讀數(shù)據(jù)流程是怎么樣的?一個真正的同步數(shù)據(jù)流程是怎么樣的?從哪里到哪里?什...
    時待吾閱讀 4,309評論 0 14
  • ZooKeeper是一個分布式的,開放源碼的分布式應用程序協(xié)調服務,它包含一個簡單的原語集,分布式應用程序可以基于...
    rthsfjhtrj閱讀 606評論 0 1
  • 在閱讀了Zab的論文<<Zab:High-performance broadcast for primary-ba...
    AlstonWilliams閱讀 4,270評論 1 8
  • ZooKeeper是一個分布式的,開放源碼的分布式應用程序協(xié)調服務,它包含一個簡單的原語集,分布式應用程序可以基于...
    caryr閱讀 395評論 0 0

友情鏈接更多精彩內容