基礎(chǔ)組件介紹
QuorumPeerMain,服務(wù)端啟動類;
? QuorumPeer,集群中的一臺server對象實例;
? QuorunCnxManager,負(fù)責(zé)leader選舉;
? WorkerReceiver,
? WorkerSender,
? FastLeaderElection,leader選舉算法;
QuorumVerifier,包括allMembers、votingMembers、observingMembers
? QuorumServer,(serverId,serverAddress)
QuorumPeerConfig,包含所有配置文件的配置信息,從zoo.cfg解析得到;
ServerConfig,服務(wù)端基本配置;
JettyAdminServer,啟動jetty服務(wù)器,用于通過調(diào)用url執(zhí)行zookeeper命令;
ServerCnxnFactory,啟動客戶端監(jiān)聽,接收連接請求,負(fù)責(zé)socket讀寫;
? ClientCnxn,客戶端連接;
ContainerManager,負(fù)責(zé)清空container的znode,應(yīng)當(dāng)只在leader服務(wù)器運(yùn)行;
ZooKeeperServer,zookeeper核心服務(wù),使用CounDownLatch保證服務(wù)不會退出;
? ZKDatabase,zookeeper內(nèi)存數(shù)據(jù)庫,封裝對zk節(jié)點的操作
? FileTxnSnapLog,實際對節(jié)點進(jìn)行操作
? Snapshot,內(nèi)存數(shù)據(jù)的快照,存放于dataDir;
? TxnLog,事務(wù)操作日志,存放于dataLogDir;
QuorumPocket,網(wǎng)絡(luò)請求對象,所有server間的交互最終都封裝成該對象;
Request,請求對象實體,寫請求對象的hdr(header)不為空,讀請求的hdr為空;
ZooKeeper啟動流程
集群模式:
- 解析配置文件;(zoo.cfg -> QuorumPeerConfig)
- 加載ZKDatabase數(shù)據(jù);(ZKDatabase)
- 根據(jù)snapshot初始化內(nèi)存鏡像;
- 判斷txtlog是否存在zxid大于當(dāng)前內(nèi)存lastProcessedZxid,是則添加到內(nèi)存鏡像中,同時更新lastProcessedZxid;
- 啟動ServerCnxnFactory;(ServerCnxnFactory,接收客戶端連接)
- 啟動AdminServer;(AdminServer,用于接收通過url執(zhí)行的命令)
- 開始leader選舉;(由QuorumCnxManager負(fù)責(zé)投票的接收和發(fā)送,F(xiàn)astLeaderElection實現(xiàn)具體的選舉邏輯)
- 創(chuàng)建選舉算法,當(dāng)前只支持FastLeaderElection;
- 啟動選舉流程;
- 根據(jù)選舉結(jié)果,執(zhí)行各自的leader/follower/observer邏輯;
- follower.followLeader/observer.observeLeader:
- connectToLeader
- registerWithLeader (接收新的epoch)
- syncWithLeader
- leader.lead
- 刪除過期session;
- 啟動learner監(jiān)聽線程,每個連接的服務(wù)器由一個單獨(dú)的LearnerHandler處理;
- syncFollower,同步leader和follower差異;
- 如果lastProcessedZxid == peerLastZxid, 發(fā)送diff消息,不進(jìn)行操作;
- 如果lastProcessedZxid < peerLastZxid,發(fā)送trunc消息,follower放棄更新;
- 如果lastProcessedZxid > peerLastZxid,發(fā)送commit消息;
- 啟動leader消息發(fā)送線程,用于將leader消息發(fā)送給follower;
- 循環(huán)監(jiān)聽follower發(fā)送到leader的消息,根據(jù)消息類型進(jìn)行處理;
- syncFollower,同步leader和follower差異;
- 更新epoch;
- follower.followLeader/observer.observeLeader:
ServerState:
? LOOKING, 該狀態(tài)的server會發(fā)起投票邏輯,選舉成功后轉(zhuǎn)為FOLLOWING或LEADING狀態(tài);
? FOLLOWING, 集群follower,可獨(dú)立處理讀請求,并能將寫請求發(fā)送給leader;
? LEADING, 集群Leader;
? OBSERVING,observer,只能從leader同步數(shù)據(jù);
LearnerType:
? PARTICIPANT,可參與投票選舉;
? OBSERVER,無法發(fā)起投票;
Server交互
zookeeper集群配置中的服務(wù)器,默認(rèn)都有選舉權(quán)限,可手動配置為observer,此時只能從follower同步數(shù)據(jù),不能發(fā)起投票;
默認(rèn)選舉算法實現(xiàn)類:FastLeaderElection,包含WorkerSender和WorkerReceiver線程;
選舉過程管理類:QuorumCnxManager,包括消息發(fā)送(SendWorker)、接收(RecvWorker)以及連接監(jiān)聽(Listener)線程;
logicalclock,當(dāng)前選舉輪次;
1. 選主

服務(wù)啟動時選主,啟動QuorumPeer時,
- QuorumPeer.start(),調(diào)用startLeaderElection方法,初始化當(dāng)前選票、選舉算法以及初始化QuorumCnxManager,其中包括選舉的消息發(fā)送、接收以及連接監(jiān)聽線程;
- QuorumPeer.run(),調(diào)用lookForLeader方法,開始leader選舉;
- server的首張選票都投給自己(myid,lastProcessedZxid,currentEpoch)
- WorkerSender.run(),發(fā)送選票,如果選舉的leader是自身,則直接入隊;如果是其他server,則將選票加入到發(fā)送隊列中,通過網(wǎng)絡(luò)進(jìn)行發(fā)送;
- QuorumCnxManager通過queueSendMap,對每個server都維持了一個發(fā)送隊列;
- 如果當(dāng)前server和目標(biāo)server還沒建立連接,則開始進(jìn)行連接的創(chuàng)建,由QuorumCnxManager中的Listener監(jiān)聽連接請求,初始化SendWorker和RecvWorker線程;
- QuorumCnxManager.SendWorker.run(),從發(fā)送隊列獲取待發(fā)送消息進(jìn)行發(fā)送;
- QuorumCnxManager.RecvWorker.run(),監(jiān)聽到請求消息,將消息添加到接收隊列中recvQueue;
- WorkerReceiver.run(),從接收隊列中獲取消息進(jìn)行解析,添加到recvqueue隊列中;
- QuorumPeer從recvqueue獲取消息,判斷是否接收當(dāng)前投票(n為當(dāng)前接收到的選票,self為自身);
- n.electionEpoch>self.electionEpoch,
- 1.清空票箱
- 2.判斷是否接收選票,接收則更新自身選票
- 3.重新發(fā)送投票;
- n.electionEpoch<self.electionEpoch,忽略當(dāng)前選票;
- n.electionEpoch=self.electionEpoch,
- 判斷是否接收當(dāng)前選票,接收則更新自身選票,重新發(fā)送投票;
- n.electionEpoch>self.electionEpoch,
- 將選票加入票箱,統(tǒng)計投票,判斷當(dāng)前是否已收到所有服務(wù)器的投票,
- 是,等待一段時間,看后續(xù)有沒有新的投票進(jìn)來;
- 有新的投票進(jìn)來,則繼續(xù)下一輪投票;
- 無新的投票進(jìn)來,判斷當(dāng)前服務(wù)是否為leader(是則更新服務(wù)狀態(tài)為leading),否則改為following狀態(tài),結(jié)束投票;
- 否,繼續(xù)投票;
- 是,等待一段時間,看后續(xù)有沒有新的投票進(jìn)來;
- leader執(zhí)行l(wèi)ead方法,follower執(zhí)行followLeader;(Leader,LearnerHandler,F(xiàn)ollower,Learner,RequestProcessor)
- Leader.lead,leader啟動監(jiān)聽線程,epoch+1,監(jiān)聽follower的請求,將leader已提交的提案同步給learner;
- ?
zookeeper選主時消息發(fā)送、接收邏輯如圖所示:

選票(Vote)結(jié)構(gòu)如下所示:
final private int version; //
final private long id; //當(dāng)前zookeeper server的id,當(dāng)前投票所選的leader id
final private long zxid; //會話id
final private long electionEpoch;//當(dāng)前選舉周期,每次投票時,通過logicalclock自增
final private long peerEpoch; //當(dāng)前被選舉的leader周期(leader的選舉輪次)
//zxid由peerEpoch(高32位)和electionEpoch(低32位)和組成
electionEpoch,每執(zhí)行一次leader選舉,electionEpoch就會自增,用來標(biāo)記leader選舉的輪次;
peerEpoch,每次leader選舉完成之后,都會選舉出一個新的peerEpoch,用來標(biāo)記事務(wù)請求所屬的輪次;
判斷是否接收當(dāng)前選票的邏輯如下:
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
ZAB相關(guān)文章:
https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeperArticles
https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0
2. 請求處理

1. 讀請求
follower、leader可獨(dú)立響應(yīng)讀請求,server間不需要交互;
2. 寫請求
leader可單獨(dú)響應(yīng)寫請求;
follower在接收到寫請求后,需將請求轉(zhuǎn)發(fā)給leader進(jìn)行處理,leader返回結(jié)果后,follower再響應(yīng)client請求;
主要功能類:
Leader(LeaderZooKeeperServer),LearnerHandler(處理follower請求),
Follower(FollowerZooKeeper),Learner(接收leader發(fā)送的proposal/commit等請求),
Server在接收到客戶端請求之后,將請求提交給RequestProcessor進(jìn)行處理;
Leader和Follower有各自的RequestProcessor處理鏈;
| ZooKeeperServer | RequestProcessor |
|---|---|
| LeaderZooKeeperServer | LeaderRequestProcessor->PrepRequestProcessor->ProposalRequestProcessor(SyncRequestProcessor->AckRequestProcessor)->CommitProcessor-> Leader.ToBeAppliedRequestProcessor->FinalRequestProcessor |
| FollowerZooKeeperServer | FollowerRequestProcessor-> CommitProcessor->FinalRequestProcessor;SyncRequestProcessor->SendAckRequestProcessor |
| ObserverZooKeeperServer | ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor;SyncRequestProcessor |
| ReadOnlyZooKeeperServer | ReadOnlyRequestProcessor->PrepRequestProcessor->FinalRequestProcessor |
Follower RequestProcessor Chain:

Leader RequestProcessor Chain:

ProposalRequestProcessor:
- 將請求轉(zhuǎn)發(fā)到下一個requestProcessor;
- 如果是事務(wù)請求,向follower發(fā)送proposal請求,成功后,調(diào)用SyncRequestProcessor將請求持久化到本地;
CommitProcessor:處理client請求或leader發(fā)起的commit請求;
- 將請求提交到后續(xù)RequestProcessor進(jìn)行處理,根據(jù)請求的sessionId將請求分配給worker線程,因此同一個sessionId的讀請求和寫請求會分配給同一個worker線程,保證了請求的順序性;請求最終由nextProcessor進(jìn)行處理;
SyncRequestProcessor:將請求批量持久化到本地,如果有nextProcessor,再將請求轉(zhuǎn)發(fā)給nextProcessor;
- leader端,如果是事務(wù)性請求,leader將proposal發(fā)送到follower后,調(diào)用syncRequestProcessor處理;
- follower端,follower收到leader發(fā)送的proposal請求后,將請求添加到隊列中,由syncRequestProcessor處理;
- 滾動事務(wù)文件,將sessions和datatree保存至snapshot文件;
SendAckRequestProcessor:learner發(fā)送ack請求到leader;
AckRequestProcessor:
- leader處理ack請求,向所有follower發(fā)送commit請求,同時也通知所有observer;
- 請求入隊列,由CommitProcessor進(jìn)行處理,最終由FinalRequestProcessor進(jìn)行本地持久化調(diào)用;
FollowerRequestProcessor:將請求轉(zhuǎn)發(fā)到下一個RequestProcessor,對于事務(wù)請求,同時會將請求發(fā)給leader;
FinalRequestProcessor:執(zhí)行實際的查詢和事務(wù)請求;
Server - Client交互
1. 請求處理基本流程
-
ServerCnxnFactory接收客戶端請求;
- readable
- 連接請求
- 如果請求sessionId為0,則新建session
- 否則首先關(guān)閉sessionId對應(yīng)的session,再重新打開
- 其他請求
- 讀請求
- 寫請求
- 連接請求
- writeable
- 服務(wù)端結(jié)果響應(yīng)
- readable
-
ServerCnxnFactory將客戶端請求提交給RequestProcessor進(jìn)行處理;
?
NIOServerCnxnFactory:
NIOServerCnxn
AcceptThread(接收客戶端連接請求) -> SelectorThread(接收客戶端io請求) -> WorkerService(執(zhí)行具體io操作)
ConnectionExpirerThread(關(guān)閉過期連接)
NettyServerCnxnFactory:
NettyServerCnxn
CnxnChannelHandler(處理客戶端請求)
2. session管理
session設(shè)計:
zookeeper session設(shè)計,主要出于2方面考慮。
- 為了滿足臨時節(jié)點的問題,當(dāng)session斷開時,需要將關(guān)聯(lián)的臨界節(jié)點都刪除。
- 同一個client連接服務(wù)端進(jìn)行的操作,必需順序執(zhí)行,通過同樣的sessionid將請求交由同一個線程進(jìn)行處理,保證了請求的順序執(zhí)行。
- 服務(wù)端接收到客戶端連接請求時,創(chuàng)建session;
- 由SessionTracker維護(hù)對session的操作,包括檢測過期的session,調(diào)用SessionExpirer關(guān)閉session;
SessionTracker<Interface> -> SessionTrackerImpl
? session管理,負(fù)責(zé)過期session的檢測,以及調(diào)用session清除方法;
SessionExpirer<Interface> -> ZooKeeperServer
? 提交session關(guān)閉請求,關(guān)閉session;
LocalSession:通過localSessionsEnabled在配置文件中配置;
1. 創(chuàng)建
客戶端與服務(wù)端建立連接后,發(fā)送ConnectRequest到服務(wù)端,服務(wù)端接收到請求后,創(chuàng)建session,設(shè)置owner為當(dāng)前LearnerHandler(一個LearnerHandler對應(yīng)一個Learner);
如果請求的zxid大于服務(wù)端lastProcessedZxid,返回連接異常,客戶端需要連接其他服務(wù)器;
如果連接請求自帶的sessionid=0,則創(chuàng)建新的session,否則校驗sessionId和passwd是否合法,是則重新激活對應(yīng)的session,設(shè)置session owner為連接的服務(wù)器;
2. 激活
session激活:
- 客戶端訪問follower,follower會更新本地session過期時間;在follower響應(yīng)leader的ping請求時,會將本地session過期時間同步給leader;
- 客戶端訪問leader,直接更新session過期時間;
session激活時,重新計算過期時間,將session轉(zhuǎn)移到新的過期時間對應(yīng)的桶中,SessionTracker.touchSession;
3. 過期
session過期策略:分桶,每個桶以過期時間為id,關(guān)聯(lián)一系列過期時間相同的session;
執(zhí)行過期超過時,發(fā)送closeSession請求,刪除內(nèi)存數(shù)據(jù)庫中該sessionId關(guān)聯(lián)的所有臨時節(jié)點;
4. 校驗
對于客戶端的事務(wù)請求,由PrepRequestProcessor校驗session的有效性,服務(wù)端可能返回SessionExpiredException和SessionMovedException異常(由于網(wǎng)絡(luò)原因,session未過期,但客戶端與新的服務(wù)器建立了連接);
http://www.leexide.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/zookeeper/651.html
3. 寫數(shù)據(jù)
Leader處理:
Leader將需要發(fā)送的Proposal入隊列,由LearnerHandler啟動的獨(dú)立線程獲取并發(fā)送給Follower;
LearnerHandler同時監(jiān)聽Follower的ACK請求,收到響應(yīng)后加入投票隊列,判斷當(dāng)前收到的投票是否已滿足多數(shù);滿足則將發(fā)送commit請求給follower,同時通知observer,最后本機(jī)提交當(dāng)前Proposal;否則繼續(xù)等待響應(yīng);
Follower處理:
事務(wù)請求轉(zhuǎn)發(fā)到Leader;
收到Leader的Proposal請求時,等待寫磁盤,寫成功后,發(fā)送ACK響應(yīng);
收到commit請求時,本地提交該P(yáng)roposal;

ZooKeeper請求處理流程:
http://blog.csdn.net/u011277123/article/details/53637037
4. 讀數(shù)據(jù)
leader/follower接收到client讀請求后,返回請求結(jié)果;
5. Watcher管理
zookeeper維護(hù)2各watch列表,節(jié)點數(shù)據(jù)的watch和子節(jié)點的watch;
watcher是一次性監(jiān)聽,觸發(fā)后需要重新設(shè)置監(jiān)聽;
異步通知;
- watcher觸發(fā)
| Trigger | EventType | Watches |
|---|---|---|
| setData/reconfig | EventType.NodeDataChanged | path |
| create | EventType.NodeCreated | path |
| create | EventType.NodeChildrenChanged | parent_path |
| delete/deleteContainer | EventType.NodeDeleted | path |
| delete/deleteContainer | EventType.NodeChildrenChanged | parent_path |
- watcher設(shè)置
| Trigger | Watches |
|---|---|
| getChildren/getChildren2 | child_path |
| getData | path |
| setWatches | path |
| exists | path |
note:上述內(nèi)容是個人學(xué)習(xí)zookeeper源碼后的理解,難免有不當(dāng)之處,歡迎交流。