zookeeper服務(wù)端實現(xiàn)簡述

基礎(chǔ)組件介紹

QuorumPeerMain,服務(wù)端啟動類;

? QuorumPeer,集群中的一臺server對象實例;

? QuorunCnxManager,負(fù)責(zé)leader選舉;

? WorkerReceiver,

? WorkerSender,

? FastLeaderElection,leader選舉算法;

QuorumVerifier,包括allMembers、votingMembers、observingMembers

? QuorumServer,(serverId,serverAddress)

QuorumPeerConfig,包含所有配置文件的配置信息,從zoo.cfg解析得到;

ServerConfig,服務(wù)端基本配置;

JettyAdminServer,啟動jetty服務(wù)器,用于通過調(diào)用url執(zhí)行zookeeper命令;

ServerCnxnFactory,啟動客戶端監(jiān)聽,接收連接請求,負(fù)責(zé)socket讀寫;

? ClientCnxn,客戶端連接;

ContainerManager,負(fù)責(zé)清空container的znode,應(yīng)當(dāng)只在leader服務(wù)器運(yùn)行;

ZooKeeperServer,zookeeper核心服務(wù),使用CounDownLatch保證服務(wù)不會退出;

? ZKDatabase,zookeeper內(nèi)存數(shù)據(jù)庫,封裝對zk節(jié)點的操作

? FileTxnSnapLog,實際對節(jié)點進(jìn)行操作

? Snapshot,內(nèi)存數(shù)據(jù)的快照,存放于dataDir;

? TxnLog,事務(wù)操作日志,存放于dataLogDir;

QuorumPocket,網(wǎng)絡(luò)請求對象,所有server間的交互最終都封裝成該對象;

Request,請求對象實體,寫請求對象的hdr(header)不為空,讀請求的hdr為空;

ZooKeeper啟動流程

集群模式:

  1. 解析配置文件;(zoo.cfg -> QuorumPeerConfig)
  2. 加載ZKDatabase數(shù)據(jù);(ZKDatabase)
    1. 根據(jù)snapshot初始化內(nèi)存鏡像;
    2. 判斷txtlog是否存在zxid大于當(dāng)前內(nèi)存lastProcessedZxid,是則添加到內(nèi)存鏡像中,同時更新lastProcessedZxid;
  3. 啟動ServerCnxnFactory;(ServerCnxnFactory,接收客戶端連接)
  4. 啟動AdminServer;(AdminServer,用于接收通過url執(zhí)行的命令)
  5. 開始leader選舉;(由QuorumCnxManager負(fù)責(zé)投票的接收和發(fā)送,F(xiàn)astLeaderElection實現(xiàn)具體的選舉邏輯)
    1. 創(chuàng)建選舉算法,當(dāng)前只支持FastLeaderElection;
    2. 啟動選舉流程;
  6. 根據(jù)選舉結(jié)果,執(zhí)行各自的leader/follower/observer邏輯;
    • follower.followLeader/observer.observeLeader:
      1. connectToLeader
      2. registerWithLeader (接收新的epoch)
      3. syncWithLeader
    • leader.lead
      1. 刪除過期session;
      2. 啟動learner監(jiān)聽線程,每個連接的服務(wù)器由一個單獨(dú)的LearnerHandler處理;
        1. syncFollower,同步leader和follower差異;
          1. 如果lastProcessedZxid == peerLastZxid, 發(fā)送diff消息,不進(jìn)行操作;
          2. 如果lastProcessedZxid < peerLastZxid,發(fā)送trunc消息,follower放棄更新;
          3. 如果lastProcessedZxid > peerLastZxid,發(fā)送commit消息;
        2. 啟動leader消息發(fā)送線程,用于將leader消息發(fā)送給follower;
        3. 循環(huán)監(jiān)聽follower發(fā)送到leader的消息,根據(jù)消息類型進(jìn)行處理;
      3. 更新epoch;

ServerState:

? LOOKING, 該狀態(tài)的server會發(fā)起投票邏輯,選舉成功后轉(zhuǎn)為FOLLOWING或LEADING狀態(tài);

? FOLLOWING, 集群follower,可獨(dú)立處理讀請求,并能將寫請求發(fā)送給leader;

? LEADING, 集群Leader;

? OBSERVING,observer,只能從leader同步數(shù)據(jù);

LearnerType:

? PARTICIPANT,可參與投票選舉;

? OBSERVER,無法發(fā)起投票;

Server交互

zookeeper集群配置中的服務(wù)器,默認(rèn)都有選舉權(quán)限,可手動配置為observer,此時只能從follower同步數(shù)據(jù),不能發(fā)起投票;

默認(rèn)選舉算法實現(xiàn)類:FastLeaderElection,包含WorkerSender和WorkerReceiver線程;

選舉過程管理類:QuorumCnxManager,包括消息發(fā)送(SendWorker)、接收(RecvWorker)以及連接監(jiān)聽(Listener)線程;

logicalclock,當(dāng)前選舉輪次;

1. 選主

leader-election.png

服務(wù)啟動時選主,啟動QuorumPeer時,

  1. QuorumPeer.start(),調(diào)用startLeaderElection方法,初始化當(dāng)前選票、選舉算法以及初始化QuorumCnxManager,其中包括選舉的消息發(fā)送、接收以及連接監(jiān)聽線程;
  2. QuorumPeer.run(),調(diào)用lookForLeader方法,開始leader選舉;
    1. server的首張選票都投給自己(myid,lastProcessedZxid,currentEpoch)
  3. WorkerSender.run(),發(fā)送選票,如果選舉的leader是自身,則直接入隊;如果是其他server,則將選票加入到發(fā)送隊列中,通過網(wǎng)絡(luò)進(jìn)行發(fā)送;
    • QuorumCnxManager通過queueSendMap,對每個server都維持了一個發(fā)送隊列;
    • 如果當(dāng)前server和目標(biāo)server還沒建立連接,則開始進(jìn)行連接的創(chuàng)建,由QuorumCnxManager中的Listener監(jiān)聽連接請求,初始化SendWorker和RecvWorker線程;
  4. QuorumCnxManager.SendWorker.run(),從發(fā)送隊列獲取待發(fā)送消息進(jìn)行發(fā)送;
  5. QuorumCnxManager.RecvWorker.run(),監(jiān)聽到請求消息,將消息添加到接收隊列中recvQueue;
  6. WorkerReceiver.run(),從接收隊列中獲取消息進(jìn)行解析,添加到recvqueue隊列中;
  7. QuorumPeer從recvqueue獲取消息,判斷是否接收當(dāng)前投票(n為當(dāng)前接收到的選票,self為自身);
    • n.electionEpoch>self.electionEpoch,
      • 1.清空票箱
      • 2.判斷是否接收選票,接收則更新自身選票
      • 3.重新發(fā)送投票;
    • n.electionEpoch<self.electionEpoch,忽略當(dāng)前選票;
    • n.electionEpoch=self.electionEpoch,
      • 判斷是否接收當(dāng)前選票,接收則更新自身選票,重新發(fā)送投票;
  8. 將選票加入票箱,統(tǒng)計投票,判斷當(dāng)前是否已收到所有服務(wù)器的投票,
    • 是,等待一段時間,看后續(xù)有沒有新的投票進(jìn)來;
      1. 有新的投票進(jìn)來,則繼續(xù)下一輪投票;
      2. 無新的投票進(jìn)來,判斷當(dāng)前服務(wù)是否為leader(是則更新服務(wù)狀態(tài)為leading),否則改為following狀態(tài),結(jié)束投票;
    • 否,繼續(xù)投票;
  9. leader執(zhí)行l(wèi)ead方法,follower執(zhí)行followLeader;(Leader,LearnerHandler,F(xiàn)ollower,Learner,RequestProcessor)
    • Leader.lead,leader啟動監(jiān)聽線程,epoch+1,監(jiān)聽follower的請求,將leader已提交的提案同步給learner;
    • ?

zookeeper選主時消息發(fā)送、接收邏輯如圖所示:

zookeeper messenger.png

選票(Vote)結(jié)構(gòu)如下所示:

final private int version;      //

final private long id;          //當(dāng)前zookeeper server的id,當(dāng)前投票所選的leader id
    
final private long zxid;        //會話id
    
final private long electionEpoch;//當(dāng)前選舉周期,每次投票時,通過logicalclock自增
    
final private long peerEpoch;    //當(dāng)前被選舉的leader周期(leader的選舉輪次)

//zxid由peerEpoch(高32位)和electionEpoch(低32位)和組成

electionEpoch,每執(zhí)行一次leader選舉,electionEpoch就會自增,用來標(biāo)記leader選舉的輪次;

peerEpoch,每次leader選舉完成之后,都會選舉出一個新的peerEpoch,用來標(biāo)記事務(wù)請求所屬的輪次;

判斷是否接收當(dāng)前選票的邏輯如下:

 /*
 * We return true if one of the following three cases hold:
 * 1- New epoch is higher
 * 2- New epoch is the same as current epoch, but new zxid is higher
 * 3- New epoch is the same as current epoch, new zxid is the same
 *  as current zxid, but server id is higher.
 */
return ((newEpoch > curEpoch) ||
        ((newEpoch == curEpoch) &&
         ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));

ZAB相關(guān)文章:

https://cwiki.apache.org/confluence/display/ZOOKEEPER/ZooKeeperArticles

https://cwiki.apache.org/confluence/display/ZOOKEEPER/Zab1.0

2. 請求處理

server-sync.png
1. 讀請求

follower、leader可獨(dú)立響應(yīng)讀請求,server間不需要交互;

2. 寫請求

leader可單獨(dú)響應(yīng)寫請求;

follower在接收到寫請求后,需將請求轉(zhuǎn)發(fā)給leader進(jìn)行處理,leader返回結(jié)果后,follower再響應(yīng)client請求;

主要功能類:

Leader(LeaderZooKeeperServer),LearnerHandler(處理follower請求),

Follower(FollowerZooKeeper),Learner(接收leader發(fā)送的proposal/commit等請求),

Server在接收到客戶端請求之后,將請求提交給RequestProcessor進(jìn)行處理;

Leader和Follower有各自的RequestProcessor處理鏈;

ZooKeeperServer RequestProcessor
LeaderZooKeeperServer LeaderRequestProcessor->PrepRequestProcessor->ProposalRequestProcessor(SyncRequestProcessor->AckRequestProcessor)->CommitProcessor-> Leader.ToBeAppliedRequestProcessor->FinalRequestProcessor
FollowerZooKeeperServer FollowerRequestProcessor-> CommitProcessor->FinalRequestProcessor;SyncRequestProcessor->SendAckRequestProcessor
ObserverZooKeeperServer ObserverRequestProcessor->CommitProcessor->FinalRequestProcessor;SyncRequestProcessor
ReadOnlyZooKeeperServer ReadOnlyRequestProcessor->PrepRequestProcessor->FinalRequestProcessor

Follower RequestProcessor Chain:

follower-requestprocessor-chain.png

Leader RequestProcessor Chain:

leader-requestprocessor-chain.png

ProposalRequestProcessor:

  1. 將請求轉(zhuǎn)發(fā)到下一個requestProcessor;
  2. 如果是事務(wù)請求,向follower發(fā)送proposal請求,成功后,調(diào)用SyncRequestProcessor將請求持久化到本地;

CommitProcessor:處理client請求或leader發(fā)起的commit請求;

  1. 將請求提交到后續(xù)RequestProcessor進(jìn)行處理,根據(jù)請求的sessionId將請求分配給worker線程,因此同一個sessionId的讀請求和寫請求會分配給同一個worker線程,保證了請求的順序性;請求最終由nextProcessor進(jìn)行處理;

SyncRequestProcessor:將請求批量持久化到本地,如果有nextProcessor,再將請求轉(zhuǎn)發(fā)給nextProcessor;

  • leader端,如果是事務(wù)性請求,leader將proposal發(fā)送到follower后,調(diào)用syncRequestProcessor處理;
  • follower端,follower收到leader發(fā)送的proposal請求后,將請求添加到隊列中,由syncRequestProcessor處理;
  • 滾動事務(wù)文件,將sessions和datatree保存至snapshot文件;

SendAckRequestProcessor:learner發(fā)送ack請求到leader;

AckRequestProcessor:

  1. leader處理ack請求,向所有follower發(fā)送commit請求,同時也通知所有observer;
  2. 請求入隊列,由CommitProcessor進(jìn)行處理,最終由FinalRequestProcessor進(jìn)行本地持久化調(diào)用;

FollowerRequestProcessor:將請求轉(zhuǎn)發(fā)到下一個RequestProcessor,對于事務(wù)請求,同時會將請求發(fā)給leader;

FinalRequestProcessor:執(zhí)行實際的查詢和事務(wù)請求;

Server - Client交互

1. 請求處理基本流程

  1. ServerCnxnFactory接收客戶端請求;

    1. readable
      • 連接請求
        1. 如果請求sessionId為0,則新建session
        2. 否則首先關(guān)閉sessionId對應(yīng)的session,再重新打開
      • 其他請求
        • 讀請求
        • 寫請求
    2. writeable
      • 服務(wù)端結(jié)果響應(yīng)
  2. ServerCnxnFactory將客戶端請求提交給RequestProcessor進(jìn)行處理;

    ?

NIOServerCnxnFactory

NIOServerCnxn

AcceptThread(接收客戶端連接請求) -> SelectorThread(接收客戶端io請求) -> WorkerService(執(zhí)行具體io操作)

ConnectionExpirerThread(關(guān)閉過期連接)

NettyServerCnxnFactory

NettyServerCnxn

CnxnChannelHandler(處理客戶端請求)

2. session管理

session設(shè)計:
zookeeper session設(shè)計,主要出于2方面考慮。

  1. 為了滿足臨時節(jié)點的問題,當(dāng)session斷開時,需要將關(guān)聯(lián)的臨界節(jié)點都刪除。
  2. 同一個client連接服務(wù)端進(jìn)行的操作,必需順序執(zhí)行,通過同樣的sessionid將請求交由同一個線程進(jìn)行處理,保證了請求的順序執(zhí)行。
  1. 服務(wù)端接收到客戶端連接請求時,創(chuàng)建session;
  2. 由SessionTracker維護(hù)對session的操作,包括檢測過期的session,調(diào)用SessionExpirer關(guān)閉session;

SessionTracker<Interface> -> SessionTrackerImpl

? session管理,負(fù)責(zé)過期session的檢測,以及調(diào)用session清除方法;

SessionExpirer<Interface> -> ZooKeeperServer

? 提交session關(guān)閉請求,關(guān)閉session;

LocalSession:通過localSessionsEnabled在配置文件中配置;

1. 創(chuàng)建

客戶端與服務(wù)端建立連接后,發(fā)送ConnectRequest到服務(wù)端,服務(wù)端接收到請求后,創(chuàng)建session,設(shè)置owner為當(dāng)前LearnerHandler(一個LearnerHandler對應(yīng)一個Learner);

如果請求的zxid大于服務(wù)端lastProcessedZxid,返回連接異常,客戶端需要連接其他服務(wù)器;

如果連接請求自帶的sessionid=0,則創(chuàng)建新的session,否則校驗sessionId和passwd是否合法,是則重新激活對應(yīng)的session,設(shè)置session owner為連接的服務(wù)器;

2. 激活

session激活:

  1. 客戶端訪問follower,follower會更新本地session過期時間;在follower響應(yīng)leader的ping請求時,會將本地session過期時間同步給leader;
  2. 客戶端訪問leader,直接更新session過期時間;

session激活時,重新計算過期時間,將session轉(zhuǎn)移到新的過期時間對應(yīng)的桶中,SessionTracker.touchSession;

3. 過期

session過期策略:分桶,每個桶以過期時間為id,關(guān)聯(lián)一系列過期時間相同的session;

執(zhí)行過期超過時,發(fā)送closeSession請求,刪除內(nèi)存數(shù)據(jù)庫中該sessionId關(guān)聯(lián)的所有臨時節(jié)點;

4. 校驗

對于客戶端的事務(wù)請求,由PrepRequestProcessor校驗session的有效性,服務(wù)端可能返回SessionExpiredException和SessionMovedException異常(由于網(wǎng)絡(luò)原因,session未過期,但客戶端與新的服務(wù)器建立了連接);

http://www.leexide.com/%E5%88%86%E5%B8%83%E5%BC%8F%E7%B3%BB%E7%BB%9F/zookeeper/651.html

3. 寫數(shù)據(jù)

Leader處理:

Leader將需要發(fā)送的Proposal入隊列,由LearnerHandler啟動的獨(dú)立線程獲取并發(fā)送給Follower;

LearnerHandler同時監(jiān)聽Follower的ACK請求,收到響應(yīng)后加入投票隊列,判斷當(dāng)前收到的投票是否已滿足多數(shù);滿足則將發(fā)送commit請求給follower,同時通知observer,最后本機(jī)提交當(dāng)前Proposal;否則繼續(xù)等待響應(yīng);

Follower處理:

事務(wù)請求轉(zhuǎn)發(fā)到Leader;

收到Leader的Proposal請求時,等待寫磁盤,寫成功后,發(fā)送ACK響應(yīng);

收到commit請求時,本地提交該P(yáng)roposal;

txn-request-process.png

ZooKeeper請求處理流程:

http://blog.csdn.net/u011277123/article/details/53637037

4. 讀數(shù)據(jù)

leader/follower接收到client讀請求后,返回請求結(jié)果;

5. Watcher管理

zookeeper維護(hù)2各watch列表,節(jié)點數(shù)據(jù)的watch和子節(jié)點的watch;

watcher是一次性監(jiān)聽,觸發(fā)后需要重新設(shè)置監(jiān)聽;

異步通知;

  1. watcher觸發(fā)
Trigger EventType Watches
setData/reconfig EventType.NodeDataChanged path
create EventType.NodeCreated path
create EventType.NodeChildrenChanged parent_path
delete/deleteContainer EventType.NodeDeleted path
delete/deleteContainer EventType.NodeChildrenChanged parent_path
  1. watcher設(shè)置
Trigger Watches
getChildren/getChildren2 child_path
getData path
setWatches path
exists path

note:上述內(nèi)容是個人學(xué)習(xí)zookeeper源碼后的理解,難免有不當(dāng)之處,歡迎交流。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容