Zookeeper Watcher機(jī)制

ZooKeeper是用來(lái)協(xié)調(diào)(同步)分布式進(jìn)程的服務(wù),多個(gè)分布式進(jìn)程通過(guò)ZooKeeper提供的API來(lái)操作共享的ZooKeeper內(nèi)存數(shù)據(jù)對(duì)象ZNode來(lái)達(dá)成某種一致的行為或結(jié)果,這種模式本質(zhì)上是基于狀態(tài)共享的并發(fā)模型。ZooKeeper實(shí)現(xiàn)這些分布式進(jìn)程的狀態(tài)(ZNode的Data、Children)共享時(shí),基于性能的考慮采用了類似的異步非阻塞的主動(dòng)通知模式即Watch機(jī)制,使得分布式進(jìn)程之間的“共享狀態(tài)通信”更加實(shí)時(shí)高效。注意,這種共享也需要zookeeper使得分布式進(jìn)程能夠順序執(zhí)行,保證結(jié)果的正確性,Zab協(xié)議使得ZooKeeper的內(nèi)部修改狀態(tài)操作直接是有序串行的。在此不討論zab協(xié)議。

Zookeeper Watcher架構(gòu)

Zookeeper Watcher流程是客戶端向服務(wù)端的某個(gè)節(jié)點(diǎn)路徑上注冊(cè)一個(gè)watcher,客戶端同時(shí)會(huì)在本地watcherManager中存儲(chǔ)特定的watcher,當(dāng)發(fā)生節(jié)點(diǎn)數(shù)據(jù)或者節(jié)點(diǎn)子節(jié)點(diǎn)變化時(shí),服務(wù)端會(huì)通知客戶端節(jié)點(diǎn)變化信息,然后客戶端收到通知后,會(huì)調(diào)用回調(diào)函數(shù)。

實(shí)現(xiàn)原理
Watcher接口:客戶端用來(lái)接收從 ZooKeeper 服務(wù)端發(fā)過(guò)來(lái)的消息并且同步地處理這些消息,如果要處理這個(gè)消息,需要為客戶端注冊(cè)一個(gè) CallBack(回調(diào))watcher對(duì)象。設(shè)計(jì)如下:


在 Watcher 接口里面,除了回調(diào)函數(shù) process 以外,還包含 KeeperState 和 EventType 兩個(gè)枚舉類,分別代表了事件發(fā)生時(shí)ZooKeeper連接狀態(tài)和事件的類型。
watcher通知狀態(tài)和事件類型表

根據(jù)特定的事件,調(diào)用process(WatchedEvent event)方法對(duì)事件進(jìn)行處理。

WatchedEventWatcherEvent都表示的是同一個(gè)事物,都是對(duì)一個(gè)服務(wù)端事件的封裝。不同的是,WatchedEvent 是一個(gè)邏輯事件,用于服務(wù)端和客戶端程序執(zhí)行過(guò)程中所需的邏輯對(duì)象,而 WatcherEvent 因?yàn)閷?shí)現(xiàn)了序列化接口,因此可以用于網(wǎng)絡(luò)傳輸。
服務(wù)端在線程 WatchedEvent 事件之后,會(huì)調(diào)用 getWrapper 方法將自己包裝成一個(gè)可序列化的 WatcherEvent 事,以便通過(guò)網(wǎng)絡(luò)傳輸?shù)娇蛻舳恕?蛻舳嗽诮邮盏椒?wù)端的這個(gè)事件對(duì)象后,首先會(huì)將 WatcherEvent 事件還原成一個(gè) WatchedEvent 事件,并傳遞給 process 方法處理,回調(diào)方法 process 根據(jù)入?yún)⒕湍軌蚪馕龀鐾暾姆?wù)端事件了。

客戶端注冊(cè)Watcher
涉及接口:

//創(chuàng)建zk客戶端對(duì)象實(shí)例時(shí)注冊(cè),這個(gè) Watcher 將作為整個(gè) ZooKeeper 會(huì)話期間的默認(rèn) Watcher,
//會(huì)一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 里面,
//如果這個(gè)被創(chuàng)建的節(jié)點(diǎn)在其它時(shí)候被創(chuàng)建watcher并注冊(cè),則這個(gè)默認(rèn)的watcher會(huì)被覆蓋
//watcher觸發(fā)一次就會(huì)失效,不管是創(chuàng)建節(jié)點(diǎn)時(shí)的 watcher 還是以后創(chuàng)建的 watcher.因?yàn)榉?wù)端每次觸發(fā)之后就會(huì)刪掉服務(wù)端的watcher
ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

getChildren(String path, Watcher watcher)
//Boolean watch表示是否使用上下文中默認(rèn)的watcher,即創(chuàng)建zk實(shí)例時(shí)設(shè)置的watcher
getChildren(String path, boolean watch)

getData(String path, boolean watch, Stat stat)
getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)

exists(String path, boolean watch)
exists(String path, Watcher watcher)

客戶端注冊(cè)流程

在 ZooKeeper 中,Packet 是一個(gè)最小的通信協(xié)議單元,即數(shù)據(jù)包。Pakcet 用于進(jìn)行客戶端與服務(wù)端之間的網(wǎng)絡(luò)傳輸,任何需要傳輸?shù)膶?duì)象都需要包裝成一個(gè) Packet 對(duì)象。在 ClientCnxn 中 WatchRegistration 也會(huì)被封裝到 Pakcet 中,然后由 SendThread 線程調(diào)用 queuePacke 方法把 Packet 放入發(fā)送隊(duì)列中等待客戶端發(fā)送,這又是一個(gè)異步過(guò)程,分布式系統(tǒng)采用異步通信是一個(gè)普遍認(rèn)同的觀念。隨后,SendThread 線程會(huì)通過(guò) readResponse 方法接收來(lái)自服務(wù)端的響應(yīng),異步地調(diào)用 finishPacket 方法從 Packet 中取出對(duì)應(yīng)的 Watcher 并注冊(cè)到 ZKWatchManager 中去。
WatcherRegistation 除了 Header 和 request 兩個(gè)屬性被傳遞到了服務(wù)端,其他都沒(méi)有到服務(wù)端,否則服務(wù)端就容易出現(xiàn)內(nèi)存緊張甚至溢出的危險(xiǎn),因?yàn)閿?shù)據(jù)量太大了。這就是 ZooKeeper 為什么適用于分布式環(huán)境的原因,它在網(wǎng)絡(luò)中傳輸?shù)氖窍?,而不是?shù)據(jù)包實(shí)體。
服務(wù)端處理 Watcher 流程

對(duì)于注冊(cè) Watcher 請(qǐng)求,F(xiàn)inalRequestProcessor 的 ProcessRequest 方法會(huì)判斷當(dāng)前請(qǐng)求是否需要注冊(cè) Watcher,如果為 true,就會(huì)將當(dāng)前的 ServerCnxn 對(duì)象和數(shù)據(jù)節(jié)點(diǎn)路徑傳入 getData 方法中去。ServerCnxn 是一個(gè) ZooKeeper 客戶端和服務(wù)器之間的連接接口,代表了一個(gè)客戶端和服務(wù)器的連接,我們后面講到的 process 回調(diào)方法,實(shí)際上也是從這里回調(diào)的,所以可以把 ServerCnxn 看作是一個(gè) Watcher 對(duì)象。數(shù)據(jù)節(jié)點(diǎn)的節(jié)點(diǎn)路徑和 ServerCnxn 最終會(huì)被存儲(chǔ)在 WatchManager 的 watchTable 和 watch2Paths 中。
WatchManager 負(fù)責(zé) Watcher 事件的觸發(fā),它是一個(gè)統(tǒng)稱,在服務(wù)端 DataTree 會(huì)托管兩個(gè) WatchManager,分別是watchTable和 watch2Paths,分別對(duì)應(yīng)數(shù)據(jù)變更 Watcher 和節(jié)點(diǎn)變更 Watcher。

當(dāng)DataTree中節(jié)點(diǎn)數(shù)據(jù)內(nèi)容或版本發(fā)生變化或節(jié)點(diǎn)變更時(shí),會(huì)調(diào)用相應(yīng)方法去觸發(fā) WatchManager 的 triggerWatch 方法,該方法返回 ZNODE 的信息,自此進(jìn)入到回調(diào)本地 process 的序列。

public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
 KeeperState.SyncConnected, path);
//將事件類型(EventType)、通知狀態(tài)(WatchedEvent)、節(jié)點(diǎn)路徑封裝成一個(gè) WatchedEvent 對(duì)象
 HashSet<Watcher> watchers;
 synchronized (this) {
//根據(jù)數(shù)據(jù)節(jié)點(diǎn)的節(jié)點(diǎn)路徑從 watchTable 里面取出對(duì)應(yīng)的 Watcher。如果沒(méi)有找到 Watcher 對(duì)象,
//說(shuō)明沒(méi)有任何客戶端在該數(shù)據(jù)節(jié)點(diǎn)上注冊(cè)過(guò) Watcher,直接退出。如果找打了 Watcher 就將其提取出來(lái),
//同時(shí)會(huì)直接從 watchTable 和 watch2Paths 里刪除 Watcher,即 Watcher 是一次性的,觸發(fā)一次就失效了。
 watchers = watchTable.remove(path);
for (Watcher w : watchers) {
 HashSet<String> paths = watch2Paths.get(w);
 }
 }
 for (Watcher w : watchers) {
 if (supress != null && supress.contains(w)) {
 continue;
 }
//對(duì)于需要注冊(cè) Watcher 的請(qǐng)求,ZooKeeper 會(huì)把請(qǐng)求對(duì)應(yīng)的ServerCnxn 作為一個(gè) Watcher 存儲(chǔ),
//所以這里調(diào)用的 process 方法實(shí)質(zhì)上是 ServerCnxn 的對(duì)應(yīng)方法
 w.process(e);
 }
 return watchers;
}

ServerCnxn 類代碼

synchronized public void process(WatchedEvent event) {
 ReplyHeader h = new ReplyHeader(-1, -1L, 0);
 if (LOG.isTraceEnabled()) {
 ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
 "Deliver event " + event + " to 0x"
 + Long.toHexString(this.sessionId)
 + " through " + this);
 }
 
 // Convert WatchedEvent to a type that can be sent over the wire
 WatcherEvent e = event.getWrapper();
 
 sendResponse(h, e, "notification");
 }

客戶端收到消息后,會(huì)調(diào)用 ClientCnxn 的 SendThread.readResponse 方法來(lái)進(jìn)行統(tǒng)一處理,如清單所示。如果響應(yīng)頭 replyHdr 中標(biāo)識(shí)的 Xid 為 02,表示是 ping,如果為-4,表示是驗(yàn)證包,如果是-1,表示這是一個(gè)通知類型的響應(yīng),然后進(jìn)行反序列化、處理 chrootPath、還原 WatchedEvent、回調(diào) Watcher 等步驟,其中回調(diào) Watcher 步驟將 WacthedEvent 對(duì)象交給 EventThread 線程,在下一個(gè)輪詢周期中進(jìn)行 Watcher 回調(diào)。

Zookeeper Watcher特點(diǎn)
注冊(cè)只能確保一次消費(fèi)

無(wú)論是服務(wù)端還是客戶端,一旦一個(gè) Watcher 被觸發(fā),ZooKeeper 都會(huì)將其從相應(yīng)的存儲(chǔ)中移除。因此,開發(fā)人員在 Watcher 的使用上要記住的一點(diǎn)是需要反復(fù)注冊(cè)。這樣的設(shè)計(jì)有效地減輕了服務(wù)端的壓力。如果注冊(cè)一個(gè) Watcher 之后一直有效,那么針對(duì)那些更新非常頻繁的節(jié)點(diǎn),服務(wù)端會(huì)不斷地向客戶端發(fā)送事件通知,這無(wú)論對(duì)于網(wǎng)絡(luò)還是服務(wù)端性能的影響都非常大。
持久Watcher需要每次收到通知事件后重復(fù)注冊(cè)。
客戶端串行執(zhí)行

客戶端 Watcher 回調(diào)的過(guò)程是一個(gè)串行同步的過(guò)程,這為我們保證了順序,同時(shí),需要開發(fā)人員注意的一點(diǎn)是,千萬(wàn)不要因?yàn)橐粋€(gè) Watcher 的處理邏輯影響了整個(gè)客戶端的 Watcher 回調(diào)。

輕量級(jí)設(shè)計(jì)

WatchedEvent 是 ZooKeeper 整個(gè) Watcher 通知機(jī)制的最小通知單元,這個(gè)數(shù)據(jù)結(jié)構(gòu)中只包含三部分的內(nèi)容:通知狀態(tài)、事件類型和節(jié)點(diǎn)路徑。也就是說(shuō),Watcher 通知非常簡(jiǎn)單,只會(huì)告訴客戶端發(fā)生了事件,而不會(huì)說(shuō)明事件的具體內(nèi)容。例如針對(duì) NodeDataChanged 事件,ZooKeeper 的 Watcher 只會(huì)通知客戶指定數(shù)據(jù)節(jié)點(diǎn)的數(shù)據(jù)內(nèi)容發(fā)生了變更,而對(duì)于原始數(shù)據(jù)以及變更后的新數(shù)據(jù)都無(wú)法從這個(gè)事件中直接獲取到,而是需要客戶端主動(dòng)重新去獲取數(shù)據(jù),這也是 ZooKeeper 的 Watcher 機(jī)制的一個(gè)非常重要的特性。另外,客戶端向服務(wù)端注冊(cè) Watcher 的時(shí)候,并不會(huì)把客戶端真實(shí)的 Watcher 對(duì)象傳遞到服務(wù)端,僅僅只是在客戶端請(qǐng)求中使用 boolean 類型屬性進(jìn)行了標(biāo)記,同時(shí)服務(wù)端也僅僅只是保存了當(dāng)前連接的 ServerCnxn 對(duì)象。這樣輕量級(jí)的 Watcher 機(jī)制設(shè)計(jì),在網(wǎng)絡(luò)開銷和服務(wù)端內(nèi)存開銷上都是非常廉價(jià)的。

參考資料:
ZooKeeper Watcher機(jī)制
Apache ZooKeeper Watcher 機(jī)制源碼解釋
品味ZooKeeper之Watcher機(jī)制

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容