Zab系列博客
Raft Vs Zab
http://www.itdecent.cn/p/24307e7ca9da
Zab系列1 核心概念
http://www.itdecent.cn/p/76e5dba31ea4
Zab系列2 角色和存儲
http://www.itdecent.cn/p/d80f9250ffd1
Zab系列3 選舉
http://www.itdecent.cn/p/0d2390c242f6
Zab系列4 zookeeper特性
http://www.itdecent.cn/p/08b62ca1fe4e
Zab系列5 選舉恢復(fù)(源碼分析)
http://www.itdecent.cn/p/b6acd99921b7
Zab系列6 zk單機版工作原理
http://www.itdecent.cn/p/ed45982b18b4
Zab系列7 集群工作原理Leader篇
http://www.itdecent.cn/p/59240c36ba1b
Zab系列8 集群工作原理Follower篇
http://www.itdecent.cn/p/8d7c7f1b2838
Zab系列9 消息順序性
http://www.itdecent.cn/p/0aa96b6a2070
概述
Follower完成一個事務(wù)請求完整的流程:
- 通過請求轉(zhuǎn)發(fā)器,把所有事務(wù)請求轉(zhuǎn)發(fā)給Leader
- 每個Follower在身份確定之后,開啟一個循環(huán),監(jiān)聽和處理Leader發(fā)過來的消息(Follower.followLeader()),最終調(diào)用processPacket(QuorumPacket)
- follower監(jiān)聽到leader的proposal消息的時候,會將該proposal進行持久化,持久化完成之后,反饋一個ACK消息給leader
- 當(dāng)leader統(tǒng)計ack過半的時候,再發(fā)一個commit的消息過來,follower監(jiān)聽到之后,會把該proposal在本地提交,更新 lastProcessZxid,把事務(wù)結(jié)果apply到自己的內(nèi)存樹當(dāng)中
Follower的監(jiān)聽方法followLeader()
循環(huán)監(jiān)聽leader的消息,并且處理
void followLeader() throws InterruptedException {
QuorumServer leaderServer = findLeader();
QuorumPacket qp = new QuorumPacket();
while (this.isRunning()) {
readPacket(qp);
processPacket(qp);
}
}
根據(jù)消息的類型的不同,處理的方式不同
- ping消息
- Proposal消息
- commit消息
protected void processPacket(QuorumPacket qp) throws Exception{
switch (qp.getType()) {
case Leader.PING:
ping(qp);
break;
case Leader.PROPOSAL:
TxnHeader hdr = new TxnHeader();
Record txn = SerializeUtils.deserializeTxn(qp.getData(), hdr);
lastQueued = hdr.getZxid();
case Leader.COMMIT:
fzk.commit(qp.getZxid());
核心流程代碼
- 當(dāng)follower監(jiān)聽到Leader的Proposal消息時,最終會調(diào)用 syncProcessor.processRequest(request);
public void logRequest(TxnHeader hdr, Record txn) {
Request request = new Request(hdr.getClientId(), hdr.getCxid(), hdr.getType(), hdr, txn, hdr.getZxid());
pendingTxns.add(request);
syncProcessor.processRequest(request);
}
syncProcessor完成持久化之后之后,根據(jù)Follower初始化時,把下一個處理器綁定為SendAckRequestProcessor。
SendAckRequestProcessor再調(diào)用learner.writePacket,告知到leader
public void processRequest(Request si) {
QuorumPacket qp = new QuorumPacket(Leader.ACK, si.getHdr().getZxid(), null,null);
learner.writePacket(qp, false);
}
- leader收集到ack時觸發(fā)Leader.tryToCommit,當(dāng)收到過半的Ack后,再廣播commit
synchronized public boolean tryToCommit() {
if (!p.hasAllQuorums()) {
return false;
}
commit(zxid);
inform(p);
zk.commitProcessor.commit(p.request);
FollowerZooKeeperServer的責(zé)任鏈拼裝方式
有兩條責(zé)任鏈:
- 第一條既可以處理非事務(wù)請求,又可以處理leaderCommit之后的請求。firstProcessor :FollowerRequestProcessor -->CommitProcessor-->FinalRequestProcessor
- 第二條是用來申明SyncRequestProcessor的下一個處理器為SendAckRequestProcessor的,這個在leader發(fā)起proposal時會用到
FollowerZooKeeperServer.setupRequestProcessors()
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this);
commitProcessor = new CommitProcessor(finalProcessor, Long.toString(getServerId()), true, getZooKeeperServerListener());
commitProcessor.start();
firstProcessor = new FollowerRequestProcessor(this, commitProcessor);
((FollowerRequestProcessor) firstProcessor).start();
syncProcessor = new SyncRequestProcessor(this,new SendAckRequestProcessor((Learner)getFollower()));
syncProcessor.start();
}
參考
這個大佬很牛逼,一篇博客講明白了整個事務(wù)操作的流程,以及各個Processor的各個細節(jié)
http://www.itdecent.cn/p/64f6e8124625
大佬視角很高,簡單明了
http://www.itdecent.cn/p/a8b5783eec63