Zab系列8 集群工作原理Follower篇

Zab系列博客

Raft Vs Zab
http://www.itdecent.cn/p/24307e7ca9da
Zab系列1 核心概念
http://www.itdecent.cn/p/76e5dba31ea4
Zab系列2 角色和存儲
http://www.itdecent.cn/p/d80f9250ffd1
Zab系列3 選舉
http://www.itdecent.cn/p/0d2390c242f6
Zab系列4 zookeeper特性
http://www.itdecent.cn/p/08b62ca1fe4e
Zab系列5 選舉恢復(fù)(源碼分析)
http://www.itdecent.cn/p/b6acd99921b7
Zab系列6 zk單機版工作原理
http://www.itdecent.cn/p/ed45982b18b4
Zab系列7 集群工作原理Leader篇
http://www.itdecent.cn/p/59240c36ba1b
Zab系列8 集群工作原理Follower篇
http://www.itdecent.cn/p/8d7c7f1b2838
Zab系列9 消息順序性
http://www.itdecent.cn/p/0aa96b6a2070

概述

Follower完成一個事務(wù)請求完整的流程:

  • 通過請求轉(zhuǎn)發(fā)器,把所有事務(wù)請求轉(zhuǎn)發(fā)給Leader
  • 每個Follower在身份確定之后,開啟一個循環(huán),監(jiān)聽和處理Leader發(fā)過來的消息(Follower.followLeader()),最終調(diào)用processPacket(QuorumPacket)
  • follower監(jiān)聽到leader的proposal消息的時候,會將該proposal進行持久化,持久化完成之后,反饋一個ACK消息給leader
  • 當(dāng)leader統(tǒng)計ack過半的時候,再發(fā)一個commit的消息過來,follower監(jiān)聽到之后,會把該proposal在本地提交,更新 lastProcessZxid,把事務(wù)結(jié)果apply到自己的內(nèi)存樹當(dāng)中

Follower的監(jiān)聽方法followLeader()

循環(huán)監(jiān)聽leader的消息,并且處理

void followLeader() throws InterruptedException {
    QuorumServer leaderServer = findLeader();
    QuorumPacket qp = new QuorumPacket();
    while (this.isRunning()) {
        readPacket(qp);
        processPacket(qp);
    }
}

根據(jù)消息的類型的不同,處理的方式不同

  • ping消息
  • Proposal消息
  • commit消息
 protected void processPacket(QuorumPacket qp) throws Exception{
        switch (qp.getType()) {
        case Leader.PING:            
            ping(qp);            
            break;
        case Leader.PROPOSAL:           
            TxnHeader hdr = new TxnHeader();
            Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
            lastQueued = hdr.getZxid();
        case Leader.COMMIT:
            fzk.commit(qp.getZxid());

核心流程代碼

  1. 當(dāng)follower監(jiān)聽到Leader的Proposal消息時,最終會調(diào)用 syncProcessor.processRequest(request);
    public void logRequest(TxnHeader hdr, Record txn) {
        Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
        pendingTxns.add(request);
        syncProcessor.processRequest(request);
    }
  1. syncProcessor完成持久化之后之后,根據(jù)Follower初始化時,把下一個處理器綁定為SendAckRequestProcessor。

  2. SendAckRequestProcessor再調(diào)用learner.writePacket,告知到leader

    public void processRequest(Request si) {
        QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
        learner.writePacket(qp, false);
    }
  1. leader收集到ack時觸發(fā)Leader.tryToCommit,當(dāng)收到過半的Ack后,再廣播commit
 synchronized public boolean tryToCommit() {
        if (!p.hasAllQuorums()) {
           return false;
        }
        commit(zxid);
        inform(p);
        zk.commitProcessor.commit(p.request);

FollowerZooKeeperServer的責(zé)任鏈拼裝方式

有兩條責(zé)任鏈:

  • 第一條既可以處理非事務(wù)請求,又可以處理leaderCommit之后的請求。firstProcessor :FollowerRequestProcessor -->CommitProcessor-->FinalRequestProcessor
  • 第二條是用來申明SyncRequestProcessor的下一個處理器為SendAckRequestProcessor的,這個在leader發(fā)起proposal時會用到
FollowerZooKeeperServer.setupRequestProcessors()
        protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        
        commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
        commitProcessor.start();
        
        firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
        ((FollowerRequestProcessor) firstProcessor).start();
        
        syncProcessor = new SyncRequestProcessor(this,new SendAckRequestProcessor((Learner)getFollower()));
        syncProcessor.start();
    }

參考

這個大佬很牛逼,一篇博客講明白了整個事務(wù)操作的流程,以及各個Processor的各個細節(jié)
http://www.itdecent.cn/p/64f6e8124625

大佬視角很高,簡單明了
http://www.itdecent.cn/p/a8b5783eec63

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容