Zookeeper如何網(wǎng)絡(luò)通信+監(jiān)聽?看一看Watch機(jī)制

Zookeeper Watch機(jī)制

Watcher是一種簡單的機(jī)制,使客戶端得到關(guān)于ZooKeeper集合中的更改的通知。 客戶端可以在讀取特定znode時設(shè)置Watcher。Watcher會向注冊的客戶端發(fā)送任何znode(客戶端注冊表)更改的通知。

1. 概述

ZooKeeper Watch 機(jī)制是指,客戶端在所有的讀命令上告知服務(wù)端:這個節(jié)點或者子節(jié)點變化時通知我,具體來說,支持的寫操作有:

  • getData
  • getChildren
  • exists

例如,我們在命令行可以輸入 get -w /foo,其中 -w 參數(shù)就是用于告知 ZooKeeper 服務(wù)端,當(dāng)前客戶端想在 /foo 節(jié)點上設(shè)置一個監(jiān)聽器。

ZooKeeper Watch 機(jī)制的兩個細(xì)節(jié):

  • wactch 是一次性觸發(fā)的(除了永久遞歸 watch),如果客戶端如果在一個 watch 通知后繼續(xù)收到相同節(jié)點的 watch 通知,那么必須再次注冊 watch 一次;
  • 服務(wù)端發(fā)給客戶端的 watch 通知并不包含具體的節(jié)點數(shù)據(jù),其起到的作用非常存粹:告知客戶端其關(guān)注的節(jié)點發(fā)生了 watch 事件;

本篇博客在客戶端角度,從底層出發(fā),看一下Zookeeper Watch機(jī)制。開始之前,先思考一下以下疑問,帶著這些問題進(jìn)行Zookeeper客戶端的學(xué)習(xí)。

  • Zookeeper 客戶端如何進(jìn)行網(wǎng)絡(luò)請求
  • Zookeeper 如何處理同步和異步請求
  • Zookeeper如何注冊和觸發(fā)Watcher
  • 我們常用的ZkClient又做了什么?

2. 客戶端網(wǎng)絡(luò)IO模型

Copy From ZooKeeper客戶端源碼解讀(網(wǎng)絡(luò)I/O)

2.1 整體結(jié)構(gòu)圖

ClientCnxnSocket 封裝了底層Socket通信層, ClientCnxnSocket整體結(jié)構(gòu)如圖所示:


2.2 Packet

Packet是ClientCnxn內(nèi)部定義的一個對協(xié)議層的封裝,作為ZooKeeper]中請求與響應(yīng)的載體。



從上圖可以看出,Packet中包含了請求頭、響應(yīng)頭、請求體、響應(yīng)體、節(jié)點路徑和注冊的Watcher等信息。

2.3 SenderThread

2.3.1 基本概念

SendThread是客戶端ClientCnxn內(nèi)部一個核心的I/O調(diào)度線程,用于管理客戶端和服務(wù)端之間的所有網(wǎng)絡(luò)I/O操作。在ZooKeeper客戶端的實際運行過程中

  • SendThead維護(hù)了客戶端和服務(wù)端之間的會話生命周期,其通過在一定的周期頻率內(nèi)向服務(wù)器發(fā)送一個PING包來實現(xiàn)心跳檢測,同時,在會話周期內(nèi),如果客戶端和服務(wù)端之間出現(xiàn)TCP連接斷開的情況,那么就會自動而且透明化完成重連操作。
  • 另一方面,SendThread管理了客戶端所有的請求發(fā)送和響應(yīng)接收操作,其將上層客戶端API操作轉(zhuǎn)換成相應(yīng)的請求協(xié)議并發(fā)送到服務(wù)端,并完成對同步調(diào)用的返回和異步調(diào)用的回調(diào)。
  • 同時,SendThread還負(fù)責(zé)將來自服務(wù)端的事件傳遞給EventThread去處理。

Sender進(jìn)程就一直嘗試與Zookeeper服務(wù)器進(jìn)行交互:

//org.apache.zookeeper.ClientCnxn.SendThread

@Override
public void run() {
   // ...
   while (state.isAlive()) {
       clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
   }
   //...
}

 // org.apache.zookeeper.ClientCnxnSocketNIO#doTransport
 void doTransport(...) {
         ...
        //監(jiān)聽Selector,對讀和寫進(jìn)行操作
        for (SelectionKey k : selected) {
            ...
            if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                //doIO
                doIO(pendingQueue, cnxn);
            }
          ...
        }

    }

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
     SocketChannel sock = (SocketChannel) sockKey.channel();
     if (sockKey.isReadable()) {
       // 讀操作
     }

      if (sockKey.isWritable()) {
        //寫操作
      } 
}


2.3.2 outgoingQueue和pendingQueue

    /**
     * These are the packets that have been sent and are waiting for a response.
     */
    private final Queue<Packet> pendingQueue = new ArrayDeque<>();

    /**
     * These are the packets that need to be sent.
     */
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();

  • ClientCnxn中,有兩個比較核心的隊列outgoingQueue和pendingQueue,分別代表客戶端的請求發(fā)送隊列和服務(wù)端響應(yīng)的等待隊列。

    • outgoingQueue隊列是一個請求發(fā)送隊列,專門用于存儲那些需要發(fā)送到服務(wù)端的Packet集合
    • pendingQueue隊列是為了存儲那些已經(jīng)從客戶端發(fā)送到服務(wù)端的,但是需要等待服務(wù)端響應(yīng)的Packet集合。(實現(xiàn)同步異步請求的關(guān)鍵

2.3.3 發(fā)送數(shù)據(jù)

在正常情況下(即客戶端與服務(wù)端之間的TCP連接正常且會話有效的情況下):

  • 用戶通過各種接口發(fā)送請求,都會通過submitRequest方法,將請求封裝為packet, 被保存到outgoingQueue隊列方法
//org.apache.zookeeper.ClientCnxn#submitRequest
public ReplyHeader submitRequest(RequestHeader h, Record request,
            Record response, WatchRegistration watchRegistration)
            throws InterruptedException {
        ReplyHeader r = new ReplyHeader();
        Packet packet = queuePacket(h, r, request, response, null, null, null,
                    null, watchRegistration);
        synchronized (packet) {
            while (!packet.finished) {
                packet.wait();
            }
        }
        return r;
    }

  • SenderThreadoutgoingQueue隊列中提取一個可發(fā)送的Packet對象,同時生成一個客戶端請求序號XID,并將其設(shè)置到Packet請求頭中去,然后將其進(jìn)行序列化后進(jìn)行發(fā)送。這里提到的獲取一個可發(fā)送的Packet對象指的哪些Packet呢?在outgoingQueue隊列中的Packet整體上是按照先進(jìn)先出的順序被處理的,但是如果檢測到客戶端與服務(wù)端之間正在進(jìn)行SASL權(quán)限的話,那么那些不含請求頭(requestHeader)的Packet(例如會話創(chuàng)建請求)是可以被發(fā)送的,其余的都無法發(fā)送。
  • 請求發(fā)送完畢后,會立即將該Packet保存到pendingQueue隊列中,以便等待服務(wù)端響應(yīng)返回后進(jìn)行相應(yīng)的處理。
//// org.apache.zookeeper.ClientCnxnSocketNIO#doIO
if (sockKey.isWritable()) {
    // 會從outgoingQueue隊列中提取一個可發(fā)送的Packet對象
    Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
    ...
    // 發(fā)送請求
    sock.write(p.bb);
    if (!p.bb.hasRemaining()) {
         sentCount.getAndIncrement();
          outgoingQueue.removeFirstOccurrence(p);
          if (p.requestHeader != null
              && p.requestHeader.getType() != OpCode.ping
               && p.requestHeader.getType() != OpCode.auth) {
                   synchronized (pendingQueue) {

                     //寫入pendingQueue
                      pendingQueue.add(p);
                   }
             }
       }
     ... 
}

2.3.4 響應(yīng)接收

客戶端獲取到來自服務(wù)端的完整響應(yīng)數(shù)據(jù)后,根據(jù)不同的客戶端請求類型,會進(jìn)行不同的處理

  • 如果檢測到當(dāng)前客戶端還未進(jìn)行初始化,那么說明當(dāng)前客戶端與服務(wù)端之間正在進(jìn)行會話創(chuàng)建,那么就直接將接收到的ByteBuffer(incomingBuffer)序列化為ConnectResponse對象
  • 如果當(dāng)前客戶端已經(jīng)處于正常的會話周期,那么接收到的服務(wù)端響應(yīng)是一個事件,讓eventThread觸發(fā)相應(yīng)的watcher。
  • 如果是一個常規(guī)的請求響應(yīng)(指的是Create、GetData和Exist等操作請求),那么會從pendingQueue隊列中取出一個Packet來進(jìn)行相應(yīng)的處理。通過在finishPacket方法中處理響應(yīng):
    • 如果存在Watcher,就注冊
    • 如果是同步請求,可以讓調(diào)用方從阻塞中恢復(fù)。
    • 如果是異步請求,放入EventQueue等待后續(xù)通知
            // org.apache.zookeeper.ClientCnxnSocketNIO#doIO
            if (sockKey.isReadable()) {
                  int rc = sock.read(incomingBuffer);
                  sendThread.readResponse(incomingBuffer);
                  lenBuffer.clear();
                  incomingBuffer = lenBuffer;
                  updateLastHeard();

             }

            // org.apache.zookeeper.ClientCnxnSocketNIO#readResponse
             void readResponse(ByteBuffer incomingBuffer) throws IOException {
                        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
                        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
                        ReplyHeader replyHdr = new ReplyHeader();
                        replyHdr.deserialize(bbia, "header");
                 switch (replyHdr.getXid()) {
                     ...

                    // -1 means notification(WATCHER_EVENT)
                    // 如果是事務(wù)通知
                     case NOTIFICATION_XID:
                            LOG.debug("Got notification session id: 0x{}",
                                Long.toHexString(sessionId));
                            WatcherEvent event = new WatcherEvent();
                            event.deserialize(bbia, "response");
                            ...
                            WatchedEvent we = new WatchedEvent(event);
                            LOG.debug("Got {} for session id 0x{}", we, Long.toHexString(sessionId));
                            //讓eventThread觸發(fā)相應(yīng)的watcher
                            eventThread.queueEvent(we);
                            return;
                      default:
                            break;
                }

               //如果是常規(guī)應(yīng)答
               Packet packet;
               synchronized (pendingQueue) {
                    if (pendingQueue.size() == 0) {
                          throw new IOException("Nothing in the queue, but got " + replyHdr.getXid());
                     }
                     packet = pendingQueue.remove();
                }
               ...
               // 處理Watcher注冊等邏輯
               finishPacket(packet);

             }  

2.4 EventThread

EventThread中有一個waitingEvents隊列,用于臨時存放那么需要被觸發(fā)的Object,包括那些客戶點注冊的Watcher和異步接口中注冊的回到器AsyncCallBack

  • SenderThread收到 event通知請求 時,會將Watcher 加入到 EventThread
  • SenderThread收到 應(yīng)答請求 時,會將AsyncCallBack 加入到 EventThread

同時,EventThread會不斷地從waitingEvents這個隊列中取出Object,識別出其具體類型(Watcher或者AsynCallBack),并分別調(diào)用process和processResult接口方法來實現(xiàn)對事件的觸發(fā)和回調(diào)

           //org.apache.zookeeper.ClientCnxn.EventThread#run   
           public void run() {
                    ...
                  while (true) {
                     Object event = waitingEvents.take();
                     if (event == eventOfDeath) {
                          wasKilled = true;
                      } else {
                           processEvent(event);
                       }

                   }

                        ...

           //org.apache.zookeeper.ClientCnxn.EventThread#processEvent   
           private void processEvent(Object event) {
                     try {
                         if (event instanceof WatcherSetEventPair) {
                            WatcherSetEventPair pair = (WatcherSetEventPair) event;
                                   for (Watcher watcher : pair.watchers) {
                                 try {
                                     watcher.process(pair.event);
                                 } catch (Throwable t) {
                                     LOG.error("Error while calling watcher ", t);
                                 }
                             } 
                         } else {
                             Packet p = (Packet) event;
                             int rc = 0;
                             StatCallback cb = (StatCallback) p.cb;
                            cb.processResult(rc, clientPath, p.ctx,
                                                 ((ExistsResponse) p.response)
                                                         .getStat());
                         }
                     }

              }

3. Zookeeper 客戶端Watcher機(jī)制原理

ZooKeeper 允許客戶端向服務(wù)端注冊一個Watcher監(jiān)聽,當(dāng)服務(wù)端的一些指定事件觸發(fā)了這個Watcher,那么就會向指定客戶端發(fā)送一個事件通知來實現(xiàn)分布式的通知功能。ZooKeeper的Watcher機(jī)制主要包括客戶端線程客戶端WatchManagerZooKeeper服務(wù)器三部分。

  1. 客戶端向 ZooKeeper 服務(wù)器注冊 Watcher

  2. ZooKeeper 注冊成功后,會對客戶端做出應(yīng)答。

  3. 客戶端將 Watcher 對象存儲在客戶端的 WatchManager 中;

  4. ZooKeeper 服務(wù)端觸發(fā) Watcher 事件后,向客戶端發(fā)送通知;

  5. 客戶端線程從 WatchManager 中取出對應(yīng)的 Watcher 對象來執(zhí)行回調(diào)邏輯。

getData接口為例,過一下客戶端的注冊邏輯:

注冊

  1. 當(dāng)發(fā)送一個帶有 Watch 事件的請求時,客戶端首先會把該會話標(biāo)記為帶有 Watch 監(jiān)控的事件請求,發(fā)送給服務(wù)器。
// org.apache.zookeeper.ZooKeeper#getData
               public byte[] getData(final String path, Watcher watcher, Stat stat){
                    ...
                    WatchRegistration wcb = null;
                    if (watcher != null) {
                    wcb = new DataWatchRegistration(watcher, clientPath);
                    }
                    RequestHeader h = new RequestHeader();
                    request.setWatch(watcher != null);
                    ...
                    GetDataResponse response = new GetDataResponse();
                    ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
               }

             //org.apache.zookeeper.ClientCnxn#submitRequest
             public ReplyHeader submitRequest(RequestHeader h, Record request,
                         Record response, WatchRegistration watchRegistration)
                         throws InterruptedException {
                     ReplyHeader r = new ReplyHeader();
                     Packet packet = queuePacket(h, r, request, response, null, null, null,
                                 null, watchRegistration);
                     synchronized (packet) {
                         while (!packet.finished) {
                             packet.wait();
                         }
                     }
                     return r;
                 }

  1. 上一步的“發(fā)送”其實就是寫入outgoingQueue, 等待SenderThread發(fā)送
  2. 調(diào)用負(fù)責(zé)處理服務(wù)器響應(yīng)的 SendThread線程類中的readResponse方法接收服務(wù)端的回調(diào),并在最后執(zhí)行 finishPacket()方法將 Watch 注冊到 ZKWatchManager中。
               // org.apache.zookeeper.ClientCnxn#finishPacket
               private void finishPacket(Packet p) {
                    int err = p.replyHeader.getErr();
                    if (p.watchRegistration != null) {
                        p.watchRegistration.register(err);
                    }
                    ...
               }

客戶端回調(diào)的處理過程

  1. 客戶端使用 SendThread.readResponse()方法來統(tǒng)一處理服務(wù)端的相應(yīng)。通過請求頭信息判斷為事件通知類型,首先將己收到的字節(jié)流反序列化轉(zhuǎn)換成 WatcherEvent對象。然后調(diào)用 eventThread.queueEvent( )方法將接收到的事件交給 EventThread 線程進(jìn)行處理。
  2. 按照通知的事件類型,從 ZKWatchManager 中查詢注冊過的客戶端 Watch 信息。客戶端在查詢到對應(yīng)的 Watch 信息后,會將其從 ZKWatchManager 的管理中刪除。
  3. 將查詢到的 Watcher 存儲到 waitingEvents隊列中,調(diào)用 EventThread 類中的 run 方法會循環(huán)取出在 waitingEvents隊列中等待的 Watcher 事件進(jìn)行處理。
                 public void queueEvent(WatchedEvent event) {
                            if (event.getType() == EventType.None
                                    && sessionState == event.getState()) {
                                return;
                            }
                            sessionState = event.getState();

                            // materialize the watchers based on the event
                            WatcherSetEventPair pair = new WatcherSetEventPair(
                                    watcher.materialize(event.getState(), event.getType(),
                                            event.getPath()),
                                            event);
                            // queue the pair (watch set & event) for later processing
                            waitingEvents.add(pair);
                        }

4. ZkClient”奪權(quán)“EventThread

我們常用的ZkClient其實就是一個Watcher:

               public class ZkClient implements Watcher {

               }

在創(chuàng)建Zookeeper客戶端的時候,它將自己當(dāng)作DefaultWatcher傳入,并且之后再設(shè)置監(jiān)聽都 watch = false,對所有注冊的事件都采用ZkClient來處理。

即ZkClient全面接手waitingEvents的事件處理邏輯,調(diào)用自己內(nèi)部實現(xiàn)的一個Event隊列。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容