Zookeeper Watch機(jī)制
Watcher是一種簡單的機(jī)制,使客戶端得到關(guān)于ZooKeeper集合中的更改的通知。 客戶端可以在讀取特定znode時設(shè)置Watcher。Watcher會向注冊的客戶端發(fā)送任何znode(客戶端注冊表)更改的通知。
1. 概述
ZooKeeper Watch 機(jī)制是指,客戶端在所有的讀命令上告知服務(wù)端:這個節(jié)點或者子節(jié)點變化時通知我,具體來說,支持的寫操作有:
- getData
- getChildren
- exists
例如,我們在命令行可以輸入 get -w /foo,其中 -w 參數(shù)就是用于告知 ZooKeeper 服務(wù)端,當(dāng)前客戶端想在 /foo 節(jié)點上設(shè)置一個監(jiān)聽器。
ZooKeeper Watch 機(jī)制的兩個細(xì)節(jié):
- wactch 是一次性觸發(fā)的(除了永久遞歸 watch),如果客戶端如果在一個 watch 通知后繼續(xù)收到相同節(jié)點的 watch 通知,那么必須再次注冊 watch 一次;
- 服務(wù)端發(fā)給客戶端的 watch 通知并不包含具體的節(jié)點數(shù)據(jù),其起到的作用非常存粹:告知客戶端其關(guān)注的節(jié)點發(fā)生了 watch 事件;
本篇博客在客戶端角度,從底層出發(fā),看一下Zookeeper Watch機(jī)制。開始之前,先思考一下以下疑問,帶著這些問題進(jìn)行Zookeeper客戶端的學(xué)習(xí)。
- Zookeeper 客戶端如何進(jìn)行網(wǎng)絡(luò)請求
- Zookeeper 如何處理同步和異步請求
- Zookeeper如何注冊和觸發(fā)Watcher
- 我們常用的ZkClient又做了什么?
2. 客戶端網(wǎng)絡(luò)IO模型
Copy From ZooKeeper客戶端源碼解讀(網(wǎng)絡(luò)I/O)
2.1 整體結(jié)構(gòu)圖
ClientCnxnSocket 封裝了底層Socket通信層, ClientCnxnSocket整體結(jié)構(gòu)如圖所示:

2.2 Packet
Packet是ClientCnxn內(nèi)部定義的一個對協(xié)議層的封裝,作為ZooKeeper]中請求與響應(yīng)的載體。


從上圖可以看出,Packet中包含了請求頭、響應(yīng)頭、請求體、響應(yīng)體、節(jié)點路徑和注冊的Watcher等信息。
2.3 SenderThread
2.3.1 基本概念
SendThread是客戶端ClientCnxn內(nèi)部一個核心的I/O調(diào)度線程,用于管理客戶端和服務(wù)端之間的所有網(wǎng)絡(luò)I/O操作。在ZooKeeper客戶端的實際運行過程中
- SendThead維護(hù)了客戶端和服務(wù)端之間的會話生命周期,其通過在一定的周期頻率內(nèi)向服務(wù)器發(fā)送一個PING包來實現(xiàn)心跳檢測,同時,在會話周期內(nèi),如果客戶端和服務(wù)端之間出現(xiàn)TCP連接斷開的情況,那么就會自動而且透明化完成重連操作。
- 另一方面,SendThread管理了客戶端所有的請求發(fā)送和響應(yīng)接收操作,其將上層客戶端API操作轉(zhuǎn)換成相應(yīng)的請求協(xié)議并發(fā)送到服務(wù)端,并完成對同步調(diào)用的返回和異步調(diào)用的回調(diào)。
- 同時,SendThread還負(fù)責(zé)將來自服務(wù)端的事件傳遞給EventThread去處理。
Sender進(jìn)程就一直嘗試與Zookeeper服務(wù)器進(jìn)行交互:
//org.apache.zookeeper.ClientCnxn.SendThread
@Override
public void run() {
// ...
while (state.isAlive()) {
clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
}
//...
}
// org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
void doTransport(...) {
...
//監(jiān)聽Selector,對讀和寫進(jìn)行操作
for (SelectionKey k : selected) {
...
if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
//doIO
doIO(pendingQueue, cnxn);
}
...
}
}
void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
SocketChannel sock = (SocketChannel) sockKey.channel();
if (sockKey.isReadable()) {
// 讀操作
}
if (sockKey.isWritable()) {
//寫操作
}
}
2.3.2 outgoingQueue和pendingQueue
/**
* These are the packets that have been sent and are waiting for a response.
*/
private final Queue<Packet> pendingQueue = new ArrayDeque<>();
/**
* These are the packets that need to be sent.
*/
private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
-
ClientCnxn中,有兩個比較核心的隊列outgoingQueue和pendingQueue,分別代表客戶端的請求發(fā)送隊列和服務(wù)端響應(yīng)的等待隊列。
- outgoingQueue隊列是一個請求發(fā)送隊列,專門用于存儲那些需要發(fā)送到服務(wù)端的Packet集合。
- pendingQueue隊列是為了存儲那些已經(jīng)從客戶端發(fā)送到服務(wù)端的,但是需要等待服務(wù)端響應(yīng)的Packet集合。(實現(xiàn)同步異步請求的關(guān)鍵)
2.3.3 發(fā)送數(shù)據(jù)
在正常情況下(即客戶端與服務(wù)端之間的TCP連接正常且會話有效的情況下):
- 用戶通過各種接口發(fā)送請求,都會通過submitRequest方法,將請求封裝為packet, 被保存到outgoingQueue隊列方法
//org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
- SenderThread 從outgoingQueue隊列中提取一個可發(fā)送的Packet對象,同時生成一個客戶端請求序號XID,并將其設(shè)置到Packet請求頭中去,然后將其進(jìn)行序列化后進(jìn)行發(fā)送。這里提到的獲取一個可發(fā)送的Packet對象指的哪些Packet呢?在outgoingQueue隊列中的Packet整體上是按照先進(jìn)先出的順序被處理的,但是如果檢測到客戶端與服務(wù)端之間正在進(jìn)行SASL權(quán)限的話,那么那些不含請求頭(requestHeader)的Packet(例如會話創(chuàng)建請求)是可以被發(fā)送的,其余的都無法發(fā)送。
- 請求發(fā)送完畢后,會立即將該Packet保存到pendingQueue隊列中,以便等待服務(wù)端響應(yīng)返回后進(jìn)行相應(yīng)的處理。
//// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isWritable()) {
// 會從outgoingQueue隊列中提取一個可發(fā)送的Packet對象
Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
...
// 發(fā)送請求
sock.write(p.bb);
if (!p.bb.hasRemaining()) {
sentCount.getAndIncrement();
outgoingQueue.removeFirstOccurrence(p);
if (p.requestHeader != null
&& p.requestHeader.getType() != OpCode.ping
&& p.requestHeader.getType() != OpCode.auth) {
synchronized (pendingQueue) {
//寫入pendingQueue
pendingQueue.add(p);
}
}
}
...
}
2.3.4 響應(yīng)接收
客戶端獲取到來自服務(wù)端的完整響應(yīng)數(shù)據(jù)后,根據(jù)不同的客戶端請求類型,會進(jìn)行不同的處理
- 如果檢測到當(dāng)前客戶端還未進(jìn)行初始化,那么說明當(dāng)前客戶端與服務(wù)端之間正在進(jìn)行會話創(chuàng)建,那么就直接將接收到的ByteBuffer(incomingBuffer)序列化為ConnectResponse對象
- 如果當(dāng)前客戶端已經(jīng)處于正常的會話周期,那么接收到的服務(wù)端響應(yīng)是一個事件,讓eventThread觸發(fā)相應(yīng)的watcher。
- 如果是一個常規(guī)的請求響應(yīng)(指的是Create、GetData和Exist等操作請求),那么會從pendingQueue隊列中取出一個Packet來進(jìn)行相應(yīng)的處理。通過在finishPacket方法中處理響應(yīng):
- 如果存在Watcher,就注冊
- 如果是同步請求,可以讓調(diào)用方從阻塞中恢復(fù)。
- 如果是異步請求,放入EventQueue等待后續(xù)通知
// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isReadable()) {
int rc = sock.read(incomingBuffer);
sendThread.readResponse(incomingBuffer);
lenBuffer.clear();
incomingBuffer = lenBuffer;
updateLastHeard();
}
// org.apache.zookeeper.ClientCnxnSocketNIO#readResponse
void readResponse(ByteBuffer incomingBuffer) throws IOException {
ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
ReplyHeader replyHdr = new ReplyHeader();
replyHdr.deserialize(bbia, "header");
switch (replyHdr.getXid()) {
...
// -1 means notification(WATCHER_EVENT)
// 如果是事務(wù)通知
case NOTIFICATION_XID:
LOG.debug("Got notification session id: 0x{}",
Long.toHexString(sessionId));
WatcherEvent event = new WatcherEvent();
event.deserialize(bbia, "response");
...
WatchedEvent we = new WatchedEvent(event);
LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
//讓eventThread觸發(fā)相應(yīng)的watcher
eventThread.queueEvent(we);
return;
default:
break;
}
//如果是常規(guī)應(yīng)答
Packet packet;
synchronized (pendingQueue) {
if (pendingQueue.size() == 0) {
throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
}
packet = pendingQueue.remove();
}
...
// 處理Watcher注冊等邏輯
finishPacket(packet);
}
2.4 EventThread
EventThread中有一個waitingEvents隊列,用于臨時存放那么需要被觸發(fā)的Object,包括那些客戶點注冊的Watcher和異步接口中注冊的回到器AsyncCallBack。
- SenderThread收到 event通知請求 時,會將Watcher 加入到 EventThread
- SenderThread收到 應(yīng)答請求 時,會將AsyncCallBack 加入到 EventThread
同時,EventThread會不斷地從waitingEvents這個隊列中取出Object,識別出其具體類型(Watcher或者AsynCallBack),并分別調(diào)用process和processResult接口方法來實現(xiàn)對事件的觸發(fā)和回調(diào)
//org.apache.zookeeper.ClientCnxn.EventThread#run
public void run() {
...
while (true) {
Object event = waitingEvents.take();
if (event == eventOfDeath) {
wasKilled = true;
} else {
processEvent(event);
}
}
...
//org.apache.zookeeper.ClientCnxn.EventThread#processEvent
private void processEvent(Object event) {
try {
if (event instanceof WatcherSetEventPair) {
WatcherSetEventPair pair = (WatcherSetEventPair) event;
for (Watcher watcher : pair.watchers) {
try {
watcher.process(pair.event);
} catch (Throwable t) {
LOG.error("Error while calling watcher ", t);
}
}
} else {
Packet p = (Packet) event;
int rc = 0;
StatCallback cb = (StatCallback) p.cb;
cb.processResult(rc, clientPath, p.ctx,
((ExistsResponse) p.response)
.getStat());
}
}
}
3. Zookeeper 客戶端Watcher機(jī)制原理
ZooKeeper 允許客戶端向服務(wù)端注冊一個Watcher監(jiān)聽,當(dāng)服務(wù)端的一些指定事件觸發(fā)了這個Watcher,那么就會向指定客戶端發(fā)送一個事件通知來實現(xiàn)分布式的通知功能。ZooKeeper的Watcher機(jī)制主要包括客戶端線程、客戶端WatchManager和ZooKeeper服務(wù)器三部分。
客戶端向 ZooKeeper 服務(wù)器注冊 Watcher
ZooKeeper 注冊成功后,會對客戶端做出應(yīng)答。
客戶端將 Watcher 對象存儲在客戶端的 WatchManager 中;
ZooKeeper 服務(wù)端觸發(fā) Watcher 事件后,向客戶端發(fā)送通知;
-
客戶端線程從 WatchManager 中取出對應(yīng)的 Watcher 對象來執(zhí)行回調(diào)邏輯。
以 getData接口為例,過一下客戶端的注冊邏輯:
注冊
- 當(dāng)發(fā)送一個帶有 Watch 事件的請求時,客戶端首先會把該會話標(biāo)記為帶有 Watch 監(jiān)控的事件請求,發(fā)送給服務(wù)器。
// org.apache.zookeeper.ZooKeeper#getData
public byte[] getData(final String path, Watcher watcher, Stat stat){
...
WatchRegistration wcb = null;
if (watcher != null) {
wcb = new DataWatchRegistration(watcher, clientPath);
}
RequestHeader h = new RequestHeader();
request.setWatch(watcher != null);
...
GetDataResponse response = new GetDataResponse();
ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
}
//org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
Record response, WatchRegistration watchRegistration)
throws InterruptedException {
ReplyHeader r = new ReplyHeader();
Packet packet = queuePacket(h, r, request, response, null, null, null,
null, watchRegistration);
synchronized (packet) {
while (!packet.finished) {
packet.wait();
}
}
return r;
}
- 上一步的“發(fā)送”其實就是寫入
outgoingQueue, 等待SenderThread發(fā)送 - 調(diào)用負(fù)責(zé)處理服務(wù)器響應(yīng)的
SendThread線程類中的readResponse方法接收服務(wù)端的回調(diào),并在最后執(zhí)行finishPacket()方法將 Watch 注冊到ZKWatchManager中。
// org.apache.zookeeper.ClientCnxn#finishPacket
private void finishPacket(Packet p) {
int err = p.replyHeader.getErr();
if (p.watchRegistration != null) {
p.watchRegistration.register(err);
}
...
}
客戶端回調(diào)的處理過程
- 客戶端使用
SendThread.readResponse()方法來統(tǒng)一處理服務(wù)端的相應(yīng)。通過請求頭信息判斷為事件通知類型,首先將己收到的字節(jié)流反序列化轉(zhuǎn)換成WatcherEvent對象。然后調(diào)用eventThread.queueEvent( )方法將接收到的事件交給 EventThread 線程進(jìn)行處理。 - 按照通知的事件類型,從 ZKWatchManager 中查詢注冊過的客戶端 Watch 信息。客戶端在查詢到對應(yīng)的 Watch 信息后,會將其從 ZKWatchManager 的管理中刪除。
- 將查詢到的 Watcher 存儲到
waitingEvents隊列中,調(diào)用 EventThread 類中的 run 方法會循環(huán)取出在waitingEvents隊列中等待的 Watcher 事件進(jìn)行處理。
public void queueEvent(WatchedEvent event) {
if (event.getType() == EventType.None
&& sessionState == event.getState()) {
return;
}
sessionState = event.getState();
// materialize the watchers based on the event
WatcherSetEventPair pair = new WatcherSetEventPair(
watcher.materialize(event.getState(), event.getType(),
event.getPath()),
event);
// queue the pair (watch set & event) for later processing
waitingEvents.add(pair);
}
4. ZkClient”奪權(quán)“EventThread
我們常用的ZkClient其實就是一個Watcher:
public class ZkClient implements Watcher {
}
在創(chuàng)建Zookeeper客戶端的時候,它將自己當(dāng)作DefaultWatcher傳入,并且之后再設(shè)置監(jiān)聽都 watch = false,對所有注冊的事件都采用ZkClient來處理。
即ZkClient全面接手waitingEvents的事件處理邏輯,調(diào)用自己內(nèi)部實現(xiàn)的一個Event隊列。

