The basic Watcher flow
ZooKeeper's Watcher mechanism can, broadly speaking, be split into three phases: the client registers a Watcher, the server processes the Watcher, and the client invokes the Watcher callback.
The client can register a watcher in three ways: getData, exists, and getChildren.

Let's use the following code to analyze how the whole trigger mechanism works.
Issue a data operation from the client (the dependency below is the zkclient wrapper, which pulls in the ZooKeeper client jar; the example itself uses the native ZooKeeper API):
<dependency>
    <groupId>com.101tec</groupId>
    <artifactId>zkclient</artifactId>
    <version>0.10</version>
</dependency>
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
    ZooKeeper zookeeper = new ZooKeeper("192.168.13.102:2181", 4000, new Watcher() {
        @Override
        public void process(WatchedEvent event) {
            System.out.println("event.type" + event.getType());
        }
    });
    zookeeper.create("/watch", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); // create the node
    zookeeper.exists("/watch", true); // register a watch
    Thread.sleep(1000);
    zookeeper.setData("/watch", "1".getBytes(), -1); // change the node's value to trigger the watch
    System.in.read();
}
How the ZooKeeper API is initialized
ZooKeeper zookeeper = new ZooKeeper("192.168.13.102:2181", 4000, new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        System.out.println("event.type" + event.getType());
    }
});
When creating a ZooKeeper client instance, we pass a default Watcher into the constructor via new Watcher(). This Watcher serves as the default Watcher for the entire ZooKeeper session and is kept in the client-side ZKWatchManager as defaultWatcher.
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                 boolean canBeReadOnly, HostProvider aHostProvider,
                 ZKClientConfig clientConfig) throws IOException {
    LOG.info("Initiating client connection, connectString=" + connectString
            + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
    if (clientConfig == null) {
        clientConfig = new ZKClientConfig();
    }
    this.clientConfig = clientConfig;
    watchManager = defaultWatchManager();
    // store the watcher in ZKWatchManager here
    watchManager.defaultWatcher = watcher;
    ConnectStringParser connectStringParser = new ConnectStringParser(
            connectString);
    hostProvider = aHostProvider;
    // initialize ClientCnxn and call cnxn.start()
    cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
            hostProvider, sessionTimeout, this, watchManager,
            getClientCnxnSocket(), canBeReadOnly);
    cnxn.start();
}
ClientCnxn is the main class through which the ZooKeeper client communicates with the server and handles event notifications. It contains two inner classes:
- SendThread: handles data communication between client and server, including transporting event information
- EventThread: invokes the registered Watchers on the client side to deliver notifications
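The two-thread split can be illustrated with a minimal sketch (MiniCnxn, deliver and drain are hypothetical illustration names, not ZooKeeper's real API): the I/O side enqueues decoded events onto a blocking queue, and a separate event loop drains the queue and runs callbacks, so a slow user callback never blocks network I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of ClientCnxn's SendThread/EventThread split.
// All names here are illustration-only.
public class MiniCnxn {
    private final BlockingQueue<String> waitingEvents = new LinkedBlockingQueue<>();
    public final List<String> processed = new ArrayList<>();

    // "SendThread" side: enqueue the decoded event, never run callbacks inline
    public void deliver(String event) {
        waitingEvents.add(event);
    }

    // "EventThread" side: drain queued events and invoke the callback for each
    public void drain() {
        String e;
        while ((e = waitingEvents.poll()) != null) {
            processed.add("processed:" + e);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MiniCnxn cnxn = new MiniCnxn();
        cnxn.deliver("NodeDataChanged:/watch"); // the I/O thread hands the event off
        Thread eventThread = new Thread(cnxn::drain);
        eventThread.start();
        eventThread.join();
        System.out.println(cnxn.processed);
    }
}
```

The real EventThread blocks on `waitingEvents.take()` in a loop instead of draining once, but the hand-off pattern is the same.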
ClientCnxn initialization
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
                  ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
                  long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
    this.zooKeeper = zooKeeper;
    this.watcher = watcher;
    this.sessionId = sessionId;
    this.sessionPasswd = sessionPasswd;
    this.sessionTimeout = sessionTimeout;
    this.hostProvider = hostProvider;
    this.chrootPath = chrootPath;
    connectTimeout = sessionTimeout / hostProvider.size();
    readTimeout = sessionTimeout * 2 / 3;
    readOnly = canBeReadOnly;
    // initialize sendThread
    sendThread = new SendThread(clientCnxnSocket);
    // initialize eventThread
    eventThread = new EventThread();
    this.clientConfig = zooKeeper.getClientConfig();
}
public void start() {
    // start both threads
    sendThread.start();
    eventThread.start();
}
How the server receives and processes requests
NIOServerCnxn
The server has an NIOServerCnxn class that handles requests sent by the client.
ZookeeperServer - zks.processPacket(this, bb)
Processes the packet sent by the client:
public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
    // We have the request, now process and setup for next
    InputStream bais = new ByteBufferInputStream(incomingBuffer);
    BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
    RequestHeader h = new RequestHeader();
    h.deserialize(bia, "header"); // deserialize the client's request header
    // Through the magic of byte buffers, txn will not be
    // pointing
    // to the start of the txn
    incomingBuffer = incomingBuffer.slice();
    if (h.getType() == OpCode.auth) { // check the operation type; for an auth operation, run the code below
        LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
        AuthPacket authPacket = new AuthPacket();
        ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
        String scheme = authPacket.getScheme();
        ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
        Code authReturn = KeeperException.Code.AUTHFAILED;
        if (ap != null) {
            try {
                authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());
            } catch (RuntimeException e) {
                LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " due to " + e);
                authReturn = KeeperException.Code.AUTHFAILED;
            }
        }
        if (authReturn == KeeperException.Code.OK) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Authentication succeeded for scheme: " + scheme);
            }
            LOG.info("auth success " + cnxn.getRemoteSocketAddress());
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                    KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, null, null);
        } else {
            if (ap == null) {
                LOG.warn("No authentication provider for scheme: "
                        + scheme + " has "
                        + ProviderRegistry.listProviders());
            } else {
                LOG.warn("Authentication failed for scheme: " + scheme);
            }
            // send a response...
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                    KeeperException.Code.AUTHFAILED.intValue());
            cnxn.sendResponse(rh, null, null);
            // ... and close connection
            cnxn.sendBuffer(ServerCnxnFactory.closeConn);
            cnxn.disableRecv();
        }
        return;
    } else { // not an auth operation; next check whether it is a sasl operation
        if (h.getType() == OpCode.sasl) {
            Record rsp = processSasl(incomingBuffer, cnxn);
            ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
            cnxn.sendResponse(rh, rsp, "response"); // not sure about 3rd arg..what is it?
            return;
        } else { // ordinary requests finally end up in this block
            // wrap the request object
            Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                    h.getType(), incomingBuffer, cnxn.getAuthInfo());
            si.setOwner(ServerCnxn.me);
            // Always treat packet from the client as a possible
            // local request.
            setLocalSessionFlag(si);
            submitRequest(si); // submit the request
        }
    }
    cnxn.incrOutstandingRequests(h);
}
submitRequest
Submits the current request on the server side:
public void submitRequest(Request si) {
    if (firstProcessor == null) { // an incoming request will pass through a chain of processors
        synchronized (this) {
            try {
                while (state == State.INITIAL) {
                    wait(1000);
                }
            } catch (InterruptedException e) {
                LOG.warn("Unexpected interruption", e);
            }
            if (firstProcessor == null || state != State.RUNNING) {
                throw new RuntimeException("Not started");
            }
        }
    }
    try {
        touch(si.cnxn);
        boolean validpacket = Request.isValid(si.type); // check that the request type is valid
        if (validpacket) {
            // hand the request to firstProcessor; firstProcessor is an interface with several implementations
            firstProcessor.processRequest(si);
            if (si.cnxn != null) {
                incInProcess();
            }
        } else {
            LOG.warn("Received packet at server of unknown type " + si.type);
            new UnimplementedRequestProcessor().processRequest(si);
        }
    } catch (MissingSessionException e) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Dropping request: " + e.getMessage());
        }
    } catch (RequestProcessorException e) {
        LOG.error("Unable to process request:" + e.getMessage(), e);
    }
}
How the firstProcessor request chain is assembled
firstProcessor is initialized in ZookeeperServer's setupRequestProcessors method, shown below:
protected void setupRequestProcessors() {
    RequestProcessor finalProcessor = new FinalRequestProcessor(this);
    RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
    ((SyncRequestProcessor) syncProcessor).start();
    // note that PrepRequestProcessor is handed the syncProcessor
    firstProcessor = new PrepRequestProcessor(this, syncProcessor);
    ((PrepRequestProcessor) firstProcessor).start();
}
From the above we can see that firstProcessor is a PrepRequestProcessor, and its constructor is handed another processor, which forms a call chain.
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
syncProcessor's constructor in turn receives another processor, the FinalRequestProcessor.
So the full chain is PrepRequestProcessor -> SyncRequestProcessor -> FinalRequestProcessor.
PrepRequestProcessor.processRequest(si)
With the chain relationships clear, let's continue with firstProcessor.processRequest(si), which lands in PrepRequestProcessor:
public void processRequest(Request request) {
    submittedRequests.add(request);
}
Here processRequest simply adds the request to submittedRequests. Based on what we have seen so far, this is naturally another asynchronous hand-off, and submittedRequests is a blocking queue:
LinkedBlockingQueue<Request> submittedRequests = new LinkedBlockingQueue<Request>();
PrepRequestProcessor also extends the thread class, so we go straight to the run method in this class:
public void run() {
    try {
        while (true) {
            Request request = submittedRequests.take(); // take a request off the queue for processing
            long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
            if (request.type == OpCode.ping) {
                traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
            }
            if (LOG.isTraceEnabled()) {
                ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
            }
            if (Request.requestOfDeath == request) {
                break;
            }
            pRequest(request); // call pRequest to pre-process it
        }
    } catch (RequestProcessorException e) {
        if (e.getCause() instanceof XidRolloverException) {
            LOG.info(e.getCause().getMessage());
        }
        handleException(this.getName(), e);
    } catch (Exception e) {
        handleException(this.getName(), e);
    }
    LOG.info("PrepRequestProcessor exited loop!");
}
pRequest
The pre-processing code is too long to paste in full. Most of it branches on the current OP type and handles each case accordingly; on the last line of the method we find the following:
nextProcessor.processRequest(request);
SyncRequestProcessor.processRequest
public void processRequest(Request request) {
    // request.addRQRec(">sync");
    queuedRequests.add(request);
}
This method follows the same pattern: an asynchronous hand-off that adds the request to queuedRequests. So again we look for the run method in this class:
public void run() {
    try {
        int logCount = 0;
        // we do this in an attempt to ensure that not all of the servers
        // in the ensemble take a snapshot at the same time
        int randRoll = r.nextInt(snapCount / 2);
        while (true) {
            Request si = null;
            // take a request off the blocking queue
            if (toFlush.isEmpty()) {
                si = queuedRequests.take();
            } else {
                si = queuedRequests.poll();
                if (si == null) {
                    flush(toFlush);
                    continue;
                }
            }
            if (si == requestOfDeath) {
                break;
            }
            if (si != null) {
                // track the number of records written to the log
                // roughly speaking, the block below decides when a snapshot is due
                // and starts a snapshot thread
                if (zks.getZKDatabase().append(si)) {
                    logCount++;
                    if (logCount > (snapCount / 2 + randRoll)) {
                        randRoll = r.nextInt(snapCount / 2);
                        // roll the log
                        zks.getZKDatabase().rollLog();
                        // take a snapshot
                        if (snapInProcess != null && snapInProcess.isAlive()) {
                            LOG.warn("Too busy to snap, skipping");
                        } else {
                            snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                public void run() {
                                    try {
                                        zks.takeSnapshot();
                                    } catch (Exception e) {
                                        LOG.warn("Unexpected exception", e);
                                    }
                                }
                            };
                            snapInProcess.start();
                        }
                        logCount = 0;
                    }
                } else if (toFlush.isEmpty()) {
                    if (nextProcessor != null) {
                        nextProcessor.processRequest(si); // pass the request on to the next processor
                        if (nextProcessor instanceof Flushable) {
                            ((Flushable) nextProcessor).flush();
                        }
                    }
                    continue;
                }
                toFlush.add(si);
                if (toFlush.size() > 1000) {
                    flush(toFlush);
                }
            }
        }
    } catch (Throwable t) {
        handleException(this.getName(), t);
    } finally {
        running = false;
    }
    LOG.info("SyncRequestProcessor exited!");
}
FinalRequestProcessor.processRequest
This is the method we analyzed in class. FinalRequestProcessor.processRequest updates the in-memory session information or znode data according to the operation carried by the Request object.
The method runs to nearly 300 lines, so instead of pasting it all, let's jump straight to the key code: branching on the client's OP type, we find the following:
case OpCode.exists: {
    lastOp = "EXIS";
    // TODO we need to figure out the security requirement for this!
    ExistsRequest existsRequest = new ExistsRequest();
    // deserialize the ByteBuffer into an ExistsRequest; this is the Request
    // object the client passed over when it issued the call
    ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
    String path = existsRequest.getPath(); // get the requested path
    if (path.indexOf('\0') != -1) {
        throw new KeeperException.BadArgumentsException();
    }
    // finally, a key piece of code: if the request's getWatch flag is set,
    // pass in cnxn (the ServerCnxn); an exists request needs to watch for
    // data-change events, so a watcher is added
    Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
    rsp = new ExistsResponse(stat); // look the result up by path in the server's in-memory database and wrap it in an ExistsResponse
    break;
}
How the client receives the server's completed response
ClientCnxnSocketNetty.messageReceived
After the server finishes processing, it sends the response back via NettyServerCnxn.sendResponse,
and the client receives the server's reply in ClientCnxnSocketNetty.messageReceived:
public void messageReceived(ChannelHandlerContext ctx,
                            MessageEvent e) throws Exception {
    updateNow();
    ChannelBuffer buf = (ChannelBuffer) e.getMessage();
    while (buf.readable()) {
        if (incomingBuffer.remaining() > buf.readableBytes()) {
            int newLimit = incomingBuffer.position()
                    + buf.readableBytes();
            incomingBuffer.limit(newLimit);
        }
        buf.readBytes(incomingBuffer);
        incomingBuffer.limit(incomingBuffer.capacity());
        if (!incomingBuffer.hasRemaining()) {
            incomingBuffer.flip();
            if (incomingBuffer == lenBuffer) {
                recvCount++;
                readLength();
            } else if (!initialized) {
                readConnectResult();
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                initialized = true;
                updateLastHeard();
            } else {
                // a full message has arrived: trigger SendThread.readResponse
                sendThread.readResponse(incomingBuffer);
                lenBuffer.clear();
                incomingBuffer = lenBuffer;
                updateLastHeard();
            }
        }
    }
    wakeupCnxn();
}
SendThread.readResponse
The main flow of this method:
- First read the header; if its xid == -2, this is a ping response: return.
- If the xid is -4, this is an AuthPacket response: return.
- If the xid is -1, this is a notification; continue reading, construct an event, dispatch it via EventThread.queueEvent, and return.
In all other cases:
take a Packet from pendingQueue, validate it, and update the packet's information.
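The dispatch rules above can be summarized as a small pure function (a sketch; classify is an illustration helper, but the constants -2/-4/-1 match the values checked in readResponse):

```java
// Sketch of readResponse's dispatch on the reply header's xid.
public class XidDispatch {
    public static String classify(int xid) {
        if (xid == -2) return "ping";         // ping response, logged and dropped
        if (xid == -4) return "auth";         // AuthPacket response
        if (xid == -1) return "notification"; // watcher event, handed to EventThread
        return "reply";                       // normal reply, matched against pendingQueue
    }

    public static void main(String[] args) {
        System.out.println(classify(-1)); // notification
        System.out.println(classify(7));  // reply
    }
}
```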
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    ByteBufferInputStream bbis = new ByteBufferInputStream(
            incomingBuffer);
    BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
    ReplyHeader replyHdr = new ReplyHeader();
    replyHdr.deserialize(bbia, "header"); // deserialize the header
    if (replyHdr.getXid() == -2) {
        // -2 is the xid for pings
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got ping response for sessionid: 0x"
                    + Long.toHexString(sessionId)
                    + " after "
                    + ((System.nanoTime() - lastPingSentNs) / 1000000)
                    + "ms");
        }
        return;
    }
    if (replyHdr.getXid() == -4) {
        // -4 is the xid for AuthPacket
        if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
            state = States.AUTH_FAILED;
            eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                    Watcher.Event.KeeperState.AuthFailed, null));
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got auth sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        return;
    }
    if (replyHdr.getXid() == -1) { // the message is a notification (i.e. a server-side event)
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response"); // deserialize the event information
        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }
        WatchedEvent we = new WatchedEvent(event);
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                    + Long.toHexString(sessionId));
        }
        eventThread.queueEvent(we);
        return;
    }
    // If SASL authentication is currently in progress, construct and
    // send a response packet immediately, rather than queuing a
    // response as with other packets.
    if (tunnelAuthInProgress()) {
        GetSASLRequest request = new GetSASLRequest();
        request.deserialize(bbia, "token");
        zooKeeperSaslClient.respondToServer(request.getToken(),
                ClientCnxn.this);
        return;
    }
    Packet packet;
    synchronized (pendingQueue) {
        if (pendingQueue.size() == 0) {
            throw new IOException("Nothing in the queue, but got "
                    + replyHdr.getXid());
        }
        packet = pendingQueue.remove(); // this packet's response has arrived, so remove it from pendingQueue
    }
    /*
     * Since requests are processed in order, we better get a response
     * to the first request!
     */
    try { // validate the packet; on success, update its info with the server's reply
        if (packet.requestHeader.getXid() != replyHdr.getXid()) {
            packet.replyHeader.setErr(
                    KeeperException.Code.CONNECTIONLOSS.intValue());
            throw new IOException("Xid out of order. Got Xid "
                    + replyHdr.getXid() + " with err " +
                    +replyHdr.getErr() +
                    " expected Xid "
                    + packet.requestHeader.getXid()
                    + " for a packet with details: "
                    + packet);
        }
        packet.replyHeader.setXid(replyHdr.getXid());
        packet.replyHeader.setErr(replyHdr.getErr());
        packet.replyHeader.setZxid(replyHdr.getZxid());
        if (replyHdr.getZxid() > 0) {
            lastZxid = replyHdr.getZxid();
        }
        if (packet.response != null && replyHdr.getErr() == 0) {
            // deserialize the server's response into packet.response; this is why,
            // on the last line of exists(), the call's result can be read from packet.response
            packet.response.deserialize(bbia, "response");
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Reading reply sessionid:0x"
                    + Long.toHexString(sessionId) + ", packet:: " + packet);
        }
    } finally {
        finishPacket(packet); // finally, call finishPacket to finish up
    }
}
The finishPacket method
Its main job is to take the corresponding Watcher out of the Packet and register it with ZKWatchManager:
private void finishPacket(Packet p) {
    int err = p.replyHeader.getErr();
    if (p.watchRegistration != null) {
        // register the watch with ZKWatchManager. watchRegistration: look familiar?
        // we initialized this object when assembling the request. This stores the
        // Watcher instance held by the watchRegistration subclass into the
        // appropriate map in ZKWatchManager, e.g. existsWatches.
        p.watchRegistration.register(err);
    }
    // queue events for all removed watches, so the client receives an event
    // of type "the data/child watch has been removed"
    if (p.watchDeregistration != null) {
        Map<EventType, Set<Watcher>> materializedWatchers = null;
        try {
            materializedWatchers = p.watchDeregistration.unregister(err);
            for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
                Set<Watcher> watchers = entry.getValue();
                if (watchers.size() > 0) {
                    queueEvent(p.watchDeregistration.getClientPath(), err,
                            watchers, entry.getKey());
                    p.replyHeader.setErr(Code.OK.intValue());
                }
            }
        } catch (KeeperException.NoWatcherException nwe) {
            p.replyHeader.setErr(nwe.code().intValue());
        } catch (KeeperException ke) {
            p.replyHeader.setErr(ke.code().intValue());
        }
    }
    // cb is the AsyncCallback; if it is null, this was a synchronous call
    // with no async callback needed, so a plain notifyAll is enough
    if (p.cb == null) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    } else {
        p.finished = true;
        eventThread.queuePacket(p);
    }
}
watchRegistration
public void register(int rc) {
    if (shouldAddWatch(rc)) {
        // the subclass implementation returns the right map from ZKWatchManager, e.g. existsWatches
        Map<String, Set<Watcher>> watches = getWatches(rc);
        synchronized (watches) {
            Set<Watcher> watchers = watches.get(clientPath);
            if (watchers == null) {
                watchers = new HashSet<Watcher>();
                watches.put(clientPath, watchers);
            }
            watchers.add(watcher); // store the Watcher object in that map inside ZKWatchManager
        }
    }
}
The code below shows the maps in which the client stores watchers, one for each of the three kinds of watch registration:
static class ZKWatchManager implements ClientWatchManager {
    private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
    private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
}
In summary: when a Watcher is registered with the ZooKeeper server, either through the ZooKeeper constructor or through the getData, exists, and getChildren calls, the request is first sent to the server; once it succeeds, the server acknowledges it, and the client then stores the mapping between the path and the Watcher for later use.
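The path-to-watcher bookkeeping can be sketched as follows (a simplified, single-map stand-in for ZKWatchManager's three maps; register and fire are illustration names, and fire mimics the remove-then-return behavior that makes client watches one-shot):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified stand-in for ZKWatchManager: one map instead of the
// data/exist/child trio.
public class MiniWatchManager {
    private final Map<String, Set<String>> watches = new HashMap<>();

    public void register(String path, String watcher) {
        watches.computeIfAbsent(path, k -> new HashSet<>()).add(watcher);
    }

    // Like materialize(): remove the entry, so each watch fires only once.
    public Set<String> fire(String path) {
        Set<String> ws = watches.remove(path);
        return ws == null ? new HashSet<>() : ws;
    }

    public static void main(String[] args) {
        MiniWatchManager m = new MiniWatchManager();
        m.register("/watch", "w1");
        System.out.println(m.fire("/watch").size()); // 1: watcher notified
        System.out.println(m.fire("/watch").size()); // 0: one-shot, already removed
    }
}
```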
EventThread.queuePacket()
finishPacket eventually calls eventThread.queuePacket to add the current packet to the queue of events awaiting notification:
public void queuePacket(Packet packet) {
    if (wasKilled) {
        synchronized (waitingEvents) {
            if (isRunning) waitingEvents.add(packet);
            else processEvent(packet);
        }
    } else {
        waitingEvents.add(packet);
    }
}
Triggering the event
Everything above, long as it was, only explains how an event gets registered. The actual firing still requires a transactional operation:
zookeeper.setData("/watch", "1".getBytes(), -1); // change the node's value to trigger the watch
The client/server interaction is the same as described earlier, so we won't repeat it; the only difference is that this time an event is triggered.
The server-side event response: DataTree.setData()
public Stat setData(String path, byte data[], int version, long zxid,
                    long time) throws KeeperException.NoNodeException {
    Stat s = new Stat();
    DataNode n = nodes.get(path);
    if (n == null) {
        throw new KeeperException.NoNodeException();
    }
    byte lastdata[] = null;
    synchronized (n) {
        lastdata = n.data;
        n.data = data;
        n.stat.setMtime(time);
        n.stat.setMzxid(zxid);
        n.stat.setVersion(version);
        n.copyStat(s);
    }
    // now update if the path is in a quota subtree.
    String lastPrefix = getMaxPrefixWithQuota(path);
    if (lastPrefix != null) {
        this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                - (lastdata == null ? 0 : lastdata.length));
    }
    dataWatches.triggerWatch(path, EventType.NodeDataChanged); // trigger the NodeDataChanged event for this node
    return s;
}
WatchManager.triggerWatch
Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
    // build a WatchedEvent from the event type, connection state, and node path
    WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path);
    HashSet<Watcher> watchers;
    synchronized (this) {
        // remove the path from the watch table and get its watcher set
        watchers = watchTable.remove(path);
        if (watchers == null || watchers.isEmpty()) {
            if (LOG.isTraceEnabled()) {
                ZooTrace.logTraceMessage(LOG,
                        ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                        "No watchers for " + path);
            }
            return null;
        }
        for (Watcher w : watchers) { // for each watcher
            HashSet<String> paths = watch2Paths.get(w); // look up the watcher's path set
            if (paths != null) {
                paths.remove(path); // remove the path
            }
        }
    }
    for (Watcher w : watchers) { // for each watcher
        if (supress != null && supress.contains(w)) {
            continue;
        }
        w.process(e); // OK, here is the key again: what does w.process do?
    }
    return watchers;
}
w.process(e);
Remember what the watcher bound on the server side actually was? It was the ServerCnxn. So w.process(e) in fact calls ServerCnxn's process method. ServerCnxn is an abstract class with two implementations, NIOServerCnxn and NettyServerCnxn. Let's open up NIOServerCnxn's process method and see what it does:
public void process(WatchedEvent event) {
    ReplyHeader h = new ReplyHeader(-1, -1L, 0);
    if (LOG.isTraceEnabled()) {
        ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                "Deliver event " + event + " to 0x"
                        + Long.toHexString(this.sessionId)
                        + " through " + this);
    }
    // Convert WatchedEvent to a type that can be sent over the wire
    WatcherEvent e = event.getWrapper();
    try {
        sendResponse(h, e, "notification"); // an event is sent to the client here; the event object is a WatcherEvent
    } catch (IOException e1) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
        }
        close();
    }
}
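Putting the two pieces together, here is a hedged sketch of the server side: the "watcher" stored in the watch table is the connection itself, and triggering removes it, so one data change produces at most one notification per registration. FakeCnxn, WatchTable, and the method names are illustration-only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch: on the server the watcher is the ServerCnxn; triggering removes it
// from the table (one-shot) and calls process(), which "sends" the notification.
public class MiniServerWatch {
    interface Watcher { void process(String event); }

    // Stand-in for NIOServerCnxn: process() records the notification it would send
    static class FakeCnxn implements Watcher {
        final List<String> sent = new ArrayList<>();
        public void process(String event) { sent.add("notification:" + event); }
    }

    static class WatchTable {
        private final Map<String, Set<Watcher>> watchTable = new HashMap<>();

        void addWatch(String path, Watcher w) {
            watchTable.computeIfAbsent(path, k -> new HashSet<>()).add(w);
        }

        // Like WatchManager.triggerWatch: remove first, then notify each watcher
        int triggerWatch(String path, String type) {
            Set<Watcher> ws = watchTable.remove(path);
            if (ws == null) return 0;
            for (Watcher w : ws) w.process(type + ":" + path);
            return ws.size();
        }
    }

    public static void main(String[] args) {
        WatchTable table = new WatchTable();
        FakeCnxn cnxn = new FakeCnxn();
        table.addWatch("/watch", cnxn);
        System.out.println(table.triggerWatch("/watch", "NodeDataChanged")); // 1
        System.out.println(table.triggerWatch("/watch", "NodeDataChanged")); // 0: already removed
        System.out.println(cnxn.sent);
    }
}
```

Storing the connection as the watcher is what lets the server deliver the event without keeping any reference to client-side callback objects.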
Next, the client receives this response, which triggers the SendThread.readResponse method.
How the client handles the event response
SendThread.readResponse
This code was already shown above, so we'll only walk through the part relevant to the current flow. As noted earlier, a notification message carries xid -1, so we jump straight to the -1 branch:
void readResponse(ByteBuffer incomingBuffer) throws IOException {
    // ... header deserialization and the -2/-4 branches were shown earlier ...
    if (replyHdr.getXid() == -1) {
        // -1 means notification
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got notification sessionid:0x"
                    + Long.toHexString(sessionId));
        }
        WatcherEvent event = new WatcherEvent();
        event.deserialize(bbia, "response"); // deserialize the server's WatcherEvent here
        // convert from a server path to a client path
        if (chrootPath != null) {
            String serverPath = event.getPath();
            if (serverPath.compareTo(chrootPath) == 0)
                event.setPath("/");
            else if (serverPath.length() > chrootPath.length())
                event.setPath(serverPath.substring(chrootPath.length()));
            else {
                LOG.warn("Got server path " + event.getPath()
                        + " which is too short for chroot path "
                        + chrootPath);
            }
        }
        WatchedEvent we = new WatchedEvent(event); // assemble the WatchedEvent object
        if (LOG.isDebugEnabled()) {
            LOG.debug("Got " + we + " for sessionid 0x"
                    + Long.toHexString(sessionId));
        }
        eventThread.queueEvent(we); // hand the event to the EventThread for processing
        return;
    }
    // ... the remainder of the method was shown earlier ...
}
eventThread.queueEvent
After SendThread receives the server's notification event, it calls EventThread's queueEvent method to pass the event to the EventThread. Based on the notification, queueEvent retrieves all related Watchers from ZKWatchManager; any Watcher retrieved this way is also removed from the manager, i.e. it is invalidated after firing.
private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
    if (event.getType() == EventType.None && sessionState == event.getState()) { // check the type
        return;
    }
    sessionState = event.getState();
    final Set<Watcher> watchers;
    if (materializedWatchers == null) {
        // materialize the watchers based on the event
        watchers = watcher.materialize(event.getState(),
                event.getType(), event.getPath());
    } else {
        watchers = new HashSet<Watcher>();
        watchers.addAll(materializedWatchers);
    }
    // wrap into a WatcherSetEventPair and add it to the waitingEvents queue
    WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
    // queue the pair (watch set & event) for later processing
    waitingEvents.add(pair);
}
The materialize method
It removes the matching watch from dataWatches, existWatches, or childWatches via remove, which shows that a client-side watch is likewise registered once and removed once it fires.
It also returns the set of Watchers that should be notified, based on keeperState, eventType, and path:
public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                Watcher.Event.EventType type,
                                String clientPath) {
    Set<Watcher> result = new HashSet<Watcher>();
    switch (type) {
    case None:
        result.add(defaultWatcher);
        boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnected;
        synchronized (dataWatches) {
            for (Set<Watcher> ws : dataWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                dataWatches.clear();
            }
        }
        synchronized (existWatches) {
            for (Set<Watcher> ws : existWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                existWatches.clear();
            }
        }
        synchronized (childWatches) {
            for (Set<Watcher> ws : childWatches.values()) {
                result.addAll(ws);
            }
            if (clear) {
                childWatches.clear();
            }
        }
        return result;
    case NodeDataChanged:
    case NodeCreated:
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        synchronized (existWatches) {
            addTo(existWatches.remove(clientPath), result);
        }
        break;
    case NodeChildrenChanged:
        synchronized (childWatches) {
            addTo(childWatches.remove(clientPath), result);
        }
        break;
    case NodeDeleted:
        synchronized (dataWatches) {
            addTo(dataWatches.remove(clientPath), result);
        }
        // XXX This shouldn't be needed, but just in case
        synchronized (existWatches) {
            Set<Watcher> list = existWatches.remove(clientPath);
            if (list != null) {
                addTo(list, result);
                LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
            }
        }
        synchronized (childWatches) {
            addTo(childWatches.remove(clientPath), result);
        }
        break;
    default:
        String msg = "Unhandled watch event type " + type
                + " with state " + state + " on path " + clientPath;
        LOG.error(msg);
        throw new RuntimeException(msg);
    }
    return result;
}
waitingEvents.add
The last step; we're close to the full picture now.
waitingEvents is the blocking queue inside EventThread, a thread that, as you'll recall, was instantiated back in our very first step. As the name suggests, waitingEvents is a queue of Watchers awaiting processing; EventThread's run() method keeps taking items off this queue and hands them to processEvent:
public void run() {
    try {
        isRunning = true;
        while (true) { // loop forever
            Object event = waitingEvents.take(); // take an event off the pending-event queue
            if (event == eventOfDeath) {
                wasKilled = true;
            } else {
                processEvent(event); // process the event
            }
            if (wasKilled)
                synchronized (waitingEvents) {
                    if (waitingEvents.isEmpty()) {
                        isRunning = false;
                        break;
                    }
                }
        }
    } catch (InterruptedException e) {
        LOG.error("Event thread exiting due to interruption", e);
    }
    LOG.info("EventThread shut down for session: 0x{}",
            Long.toHexString(getSessionId()));
}
processEvent
The method is long, so only the core code is shown here; this is where event triggering is actually handled:
private void processEvent(Object event) {
    try {
        if (event instanceof WatcherSetEventPair) { // check the event type
            // each watcher will process the event
            WatcherSetEventPair pair = (WatcherSetEventPair) event;
            for (Watcher watcher : pair.watchers) { // loop over every watcher that matched the trigger
                try {
                    // invoke the client-side callback: this calls the process method of
                    // the new Watcher() we wrote at the very beginning, i.e.
                    // System.out.println("event.type" + event.getType());
                    watcher.process(pair.event);
                } catch (Throwable t) {
                    LOG.error("Error while calling watcher ", t);
                }
            }
        }
        // ... handling of the other event types is omitted ...
How the server receives the data request
Where does the server actually receive the packet?
When ZooKeeper starts, the following code constructs a
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
NIOServerCnxnFactory, which extends Thread, so once started, its run method loops continuously, accepting client requests and dispatching them.
NIOServerCnxnFactory.run
public void run() {
    while (!ss.socket().isClosed()) {
        try {
            for (SelectionKey k : selectedList) {
                // accept a client connection request
                if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {
                } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                    // handle the client's read/write request
                    NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                    c.doIO(k); // perform the I/O
                } else {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Unexpected ops in select "
                                + k.readyOps());
                    }
                }
            }
            selected.clear();
        } catch (RuntimeException e) {
            LOG.warn("Ignoring unexpected runtime exception", e);
        } catch (Exception e) {
            LOG.warn("Ignoring exception", e);
        }
    }
    closeAll();
    LOG.info("NIOServerCnxn factory exited run method");
}
NIOServerCnxn.doIO
void doIO(SelectionKey k) {
    try {
        // ... some code omitted ...
        if (k.isReadable()) { // handle a readable key, i.e. incoming data
            // the logic in between parses the wire format and handles TCP message boundaries
            if (isPayload) { // not the case for 4letterword
                readPayload(); // process the payload
            } else {
                // four letter words take care
                // need not do anything else
                return;
            }
        }
    }
    // ... catch blocks and the rest of the method omitted ...
}
NIOServerCnxn.readRequest
Reads the client's request and processes it:
private void readRequest() throws IOException {
    zkServer.processPacket(this, incomingBuffer);
}
ZookeeperServer.processPacket
This method handles different packet types; for read/write requests, the code we care about is the following:
Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
si.setOwner(ServerCnxn.me);
submitRequest(si);
ZookeeperServer.submitRequest
Submits the request. This involves firstProcessor, a chain of responsibility. If the current request arrived at the Leader server, the chain is set up in LeaderZooKeeperServer.setupRequestProcessors, and it is composed as follows:
Request flow on the leader node

PrepRequestProcessor performs a series of pre-processing steps on transactional requests, such as creating the request transaction header, session checks, ACL checks, and so on.
The request is then passed to ProposalRequestProcessor, which prepares the proposal and sends it to the follower nodes; it also forwards the transactional request to SyncRequestProcessor.
SyncRequestProcessor persists the transaction to disk and triggers an AckRequestProcessor, which generates an ack to itself. The leader waits for an ack from every node in the cluster, including its own.
CommitRequestProcessor commits the proposal, provided enough acks have been received.
ToBeAppliedRequestProcessor mainly maintains the Leader class's toBeApplied queue, which holds proposals that have completed voting (i.e. been committed) but have not yet been applied to this machine's memory.
FinalRequestProcessor is the last link in the chain: it applies committed writes to the local machine, and for reads it fetches the data locally and returns it to the client.
Request flow on a follower node
If the current request was sent to a Follower server, the chain is set up in
FollowerZooKeeperServer.setupRequestProcessors, and it is composed as follows:

FollowerRequestProcessor first receives and handles the client's request and forwards it to CommitRequestProcessor; it also forwards write requests to the leader and routes read requests directly to FinalRequestProcessor. Write requests behave differently: CommitRequestProcessor holds a write request until it has been committed before it reaches FinalRequestProcessor.
Processing flow in cluster mode
In cluster mode the ZAB protocol comes into play, so the flow is more complex; the diagram here can be used to locate the corresponding code paths.

-- Based on course material from 咕泡學(xué)院 (Gupao Academy)