Source-Code Analysis of the ZooKeeper Communication Mechanism

This article covers the client side of ZooKeeper communication.
The discussion splits into two parts: the first is how messages are sent and received; the second is how the client and server establish a session.

1. Sending and Receiving Messages

We usually construct a zk client with code like this:

    ZooKeeper zooKeeper = new ZooKeeper("127.0.0.1:2181", 1000, new Watcher() {
        public void process(WatchedEvent event) {
            // do sth
        }
    });

Tracing into this call leads to the following constructor:

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
            boolean canBeReadOnly, HostProvider aHostProvider,
            ZKClientConfig clientConfig) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);

        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        //1--1: create the default watcher
        watchManager = defaultWatchManager();
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        //1--2: set the list of zk server addresses
        hostProvider = aHostProvider;
        //1--3: create ClientCnxn and initialize outgoingQueue and pendingQueue
        cnxn = createConnection(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        //1--4: initialize sendThread and eventThread
        cnxn.start();
    }

Parameter notes:

  • connectString: the server address list of the ZooKeeper ensemble.
  • sessionTimeout: this ultimately yields three timeout settings: the sessionTimeout negotiated with the server, readTimeout, and connectTimeout. The server uses the negotiated sessionTimeout: if the client sends no request within that window (normally the client sends a heartbeat at regular intervals, and each one makes the server recompute the client's expiry point), the server declares the session expired and cleans up its state. At that point the client's ZooKeeper object no longer works, and a new one must be created.
    The client uses connectTimeout and readTimeout to detect connection timeouts and read timeouts respectively; once either fires, the client deems that server unstable and reconnects to the next server address.
  • watcher: the default Watcher of the ZooKeeper object, used to receive event notifications such as connected to the server, disconnected, and session expired.
  • canBeReadOnly: whether this is a read-only client.
  • aHostProvider: the list of zk servers (derived from connectString).
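
For reference, in the 3.5.x client ClientCnxn derives connectTimeout and readTimeout from the sessionTimeout with simple arithmetic. A minimal sketch of that derivation; the class and method names below are mine, only the formulas mirror the source:

```java
// Sketch of how ClientCnxn (3.5.x) derives its client-side timeouts.
// Illustrative names; only the arithmetic follows the ZooKeeper source.
public class TimeoutSketch {
    // connectTimeout: the session timeout split evenly across the server
    // list, so each candidate address gets a share of the connect budget.
    static int connectTimeout(int sessionTimeout, int serverCount) {
        return sessionTimeout / serverCount;
    }

    // readTimeout: two thirds of the session timeout; if nothing is heard
    // within this window, the client drops the server and tries the next.
    static int readTimeout(int sessionTimeout) {
        return sessionTimeout * 2 / 3;
    }
}
```

With a 9000 ms session timeout and three servers, this gives a 3000 ms connect budget per address and a 6000 ms read timeout.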

This constructor does four things:

  • 1: Creates the watcher. Note that the watcher is handed over to the ZooKeeper class's ZKWatchManager; that topic comes later in this series and is not covered here.
  • 2: Sets the zk server list, via a HostProvider implementation.
  • 3: Creates ClientCnxn and initializes outgoingQueue and pendingQueue.
  • 4: Initializes sendThread and eventThread. These two threads have to be discussed together with the two queues above; all of them are detailed below.
    The two queues and two threads all belong to ClientCnxn, so the protagonist of this article, ClientCnxn, now takes the stage.

ClientCnxn is the low-level communication interface between client and server; you can think of it as the component that orchestrates the whole exchange. Open the ClientCnxn source and the following fields and inner classes deserve your attention:


(Figure: the two queues and two threads)

(Figure: the threads and the Packet class)

With those pieces in hand, the concrete communication flow falls out:

ZooKeeper builds a Packet and places it on the outgoingQueue. The sendThread inside ClientCnxn sends the contents of the outgoingQueue, and the same sendThread also handles server responses: SendThread#readResponse processes them (clientCnxnSocket has two implementations, ClientCnxnSocketNIO and ClientCnxnSocketNetty, but both ultimately call into SendThread#readResponse). For a watch notification it builds a WatchedEvent and puts it on the eventThread's waitingEvents queue for the eventThread to handle. Each sent request is also placed on the pendingQueue; its purpose is that when a response arrives it can be compared against the original request that should have been answered, which guarantees responses are consumed in order. Note that these steps omit the client–server connection establishment; that part is complex as well and is treated separately below.
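
The flow above can be compressed into a single-threaded toy model; a sketch under the assumption that a packet carries only its xid (all names below are mine, not ZooKeeper's):

```java
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of ClientCnxn's two queues. A caller enqueues a Packet on
// outgoingQueue; the send loop writes it out and parks it on pendingQueue;
// a reply must match the xid at the head of pendingQueue, which is what
// enforces in-order consumption of responses.
public class TwoQueueSketch {
    static class Packet {
        final int xid;
        Packet(int xid) { this.xid = xid; }
    }

    final LinkedBlockingQueue<Packet> outgoingQueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Packet> pendingQueue = new LinkedBlockingQueue<>();

    // What queuePacket() amounts to: hand the packet to the send loop.
    void submit(Packet p) { outgoingQueue.add(p); }

    // One turn of the send loop (the real SendThread blocks on the socket):
    // send the head packet, then remember it until its reply arrives.
    void sendOne() {
        Packet p = outgoingQueue.poll();
        if (p == null) return;
        // ... serialize and write p to the socket here ...
        pendingQueue.add(p);
    }

    // On a reply: the head of pendingQueue must carry the reply's xid,
    // otherwise replies arrived out of order.
    Packet matchReply(int replyXid) {
        Packet p = pendingQueue.poll();
        if (p == null || p.xid != replyXid) {
            throw new IllegalStateException("Xid out of order");
        }
        return p;
    }
}
```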

That is the concrete client communication flow. It is convoluted, so let's walk through the code with an example: creating a node.

    //the client calls create to create a node in the ensemble; it returns the path of the created node
    zooKeeper.create("/test","".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

    //--The ZooKeeper object builds the Request, hands it to ClientCnxn to execute, and then processes the returned result.
    public String create(final String path, byte data[], List<ACL> acl,
            CreateMode createMode)
        throws KeeperException, InterruptedException
    {
        final String clientPath = path;
        PathUtils.validatePath(clientPath, createMode.isSequential());
        EphemeralType.validateTTL(createMode, -1);

        final String serverPath = prependChroot(clientPath);

        RequestHeader h = new RequestHeader();
        h.setType(createMode.isContainer() ? ZooDefs.OpCode.createContainer : ZooDefs.OpCode.create);
        //--1: build the CreateRequest packet
        CreateRequest request = new CreateRequest();
        CreateResponse response = new CreateResponse();
        request.setData(data);
        request.setFlags(createMode.toFlag());
        request.setPath(serverPath);
        if (acl != null && acl.size() == 0) {
            throw new KeeperException.InvalidACLException();
        }
        request.setAcl(acl);
        //2: hand off to the server
        //submitRequest wraps the CreateRequest into a Packet and queues it on outgoingQueue for the send thread to deliver to the server, then the calling thread waits for the reply
        //3: once the server responds, the result is filled into the response object and returned to the client
        ReplyHeader r = cnxn.submitRequest(h, request, response, null);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()),
                    clientPath);
        }
        if (cnxn.chrootPath == null) {
            return response.getPath();
        } else {
            return response.getPath().substring(cnxn.chrootPath.length());
        }
    }

You can trace the code in this order:
ClientCnxn#submitRequest() -> ClientCnxn#queuePacket()

    public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request,
            Record response, AsyncCallback cb, String clientPath,
            String serverPath, Object ctx, WatchRegistration watchRegistration,
            WatchDeregistration watchDeregistration) {
        Packet packet = null;

        // Note that we do not generate the Xid for the packet yet. It is
        // generated later at send-time, by an implementation of ClientCnxnSocket::doIO(),
        // where the packet is actually sent.
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        // The synchronized block here is for two purpose:
        // 1. synchronize with the final cleanup() in SendThread.run() to avoid race
        // 2. synchronized against each packet. So if a closeSession packet is added,
        // later packet will be notified.
        synchronized (state) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                // If the client is asking to close the session then
                // mark as closing
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                //put it on outgoingQueue, waiting for the send thread to deliver it to the server
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }
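
Before queuePacket returns control to the caller, submitRequest blocks the calling thread on the Packet itself until finishPacket() (seen later in readResponse's finally block) wakes it up. A stripped-down sketch of that wait/notify hand-off, with illustrative names rather than the real fields:

```java
// Toy version of the synchronous hand-off in ClientCnxn.submitRequest():
// the caller parks on the Packet monitor; finishPacket() flips the flag
// and wakes it.
public class SubmitSketch {
    static class Packet {
        boolean finished;   // guarded by the Packet monitor
    }

    // What submitRequest does after queuing: wait until the reply is in.
    static void waitForReply(Packet p) {
        synchronized (p) {
            while (!p.finished) {
                try {
                    p.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // What finishPacket does once readResponse has filled in the reply.
    static void finishPacket(Packet p) {
        synchronized (p) {
            p.finished = true;
            p.notifyAll();
        }
    }
}
```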

At this point the packet has been placed on the outgoingQueue, waiting for sendThread to handle it. Recall that this thread was started when we created the zk client; let's see what it actually does.
For now I blank out the code that is irrelevant to this part and expand it later (zk heartbeat checks, reconnection after disconnect, and so on are all elided here).

        @Override
        public void run() {
            //---here the socket is bound to a concrete implementation: ClientCnxnSocketNIO or ClientCnxnSocketNetty
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;
            while (state.isAlive()) {
                try {
                    //....elided code

                    //sendThread sends
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    //...elided code
                }
            }
            synchronized (state) {
                // When it comes to this point, it guarantees that later queued
                // packet to outgoingQueue will be notified of death.
                cleanup();
            }
            clientCnxnSocket.close();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
            }
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Closed, null));
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                    "SendThread exited loop for session: 0x"
                           + Long.toHexString(getSessionId()));
        }

clientCnxnSocket has two implementations, ClientCnxnSocketNIO and ClientCnxnSocketNetty. Step into either one and keep going down: you will find that every packet to be sent comes from the outgoingQueue in ClientCnxn described earlier, which matches the flow above, and that the final serialization for the server also goes through ClientCnxn.Packet#createBB. After all the detours we are back in ClientCnxn. createBB turns a Packet taken off the outgoingQueue into the byte buffer that actually gets written to the server; its source:

        public void createBB() {
            try {
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    request.serialize(boa, "request");
                }
                baos.close();
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                this.bb.putInt(this.bb.capacity() - 4);
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Ignoring unexpected exception", e);
            }
        }
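
The buffer createBB() produces is a simple length-prefixed frame: a 4-byte big-endian length (which excludes itself, hence capacity - 4) followed by the serialized header and request. A minimal model of that framing, with illustrative names:

```java
import java.nio.ByteBuffer;

// Minimal model of the frame createBB() emits: [len][payload], where the
// 4-byte big-endian len counts only the payload (capacity - 4).
public class FrameSketch {
    static ByteBuffer frame(byte[] payload) {
        ByteBuffer bb = ByteBuffer.allocate(4 + payload.length);
        bb.putInt(-1);                    // placeholder, like boa.writeInt(-1, "len")
        bb.put(payload);
        bb.putInt(0, bb.capacity() - 4);  // patch the real length in afterwards
        bb.rewind();
        return bb;
    }
}
```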

Next comes the callback side. Communication always means a sends to b, b receives and replies, and a receives the reply; here a is the client and b is the server. Having covered a sending to b, let's look at how a receives from b.
We said earlier that SendThread#readResponse handles server responses, but it is not the only callback entry point, because the NIO and Netty paths differ. The answers up front:

  • For ClientCnxnSocketNIO the callback is ClientCnxnSocketNIO.doIO(); this method handles both sending and receiving.
  • For ClientCnxnSocketNetty the callback is ClientCnxnSocketNetty.ZKClientHandler#channelRead0 (this varies across versions; mine is 3.5.7, but the code is essentially the same).

Let's look at ClientCnxnSocketNIO first:
    //1--handles callbacks (reads) as well as request sending
    void doIO(List<Packet> pendingQueue, ClientCnxn cnxn)
      throws InterruptedException, IOException {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        //1--callback (read side)
        if (sockKey.isReadable()) {
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException(
                        "Unable to read additional data from server sessionid 0x"
                                + Long.toHexString(sessionId)
                                + ", likely server has closed socket");
            }
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount.getAndIncrement();
                    readLength();
                } else if (!initialized) {//1--the client and server are in the middle of establishing the session
                    //1--i.e. the client has not yet completed initialization
                    readConnectResult();
                    enableRead();
                    if (findSendablePacket(outgoingQueue,
                            sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {//1--regular requests: create, getData, exists; watch notifications are also handled in here
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        if (sockKey.isWritable()) {
            //4--并把Packet放入pendingQueue(CLientCnxn接受服務端返回結果的隊列),以便等待服務端響應后進行相應的處理
            //4--p.requestHeader != null&& p.requestHeader.getType() != OpCode.ping&& p.requestHeader.getType() != OpCode.auth
            Packet p = findSendablePacket(outgoingQueue,
                    sendThread.tunnelAuthInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    if ((p.requestHeader != null) &&
                            (p.requestHeader.getType() != OpCode.ping) &&
                            (p.requestHeader.getType() != OpCode.auth)) {
                        //1--generate a client xid and set it in the packet's request header
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    //---serialize the packet into the byte buffer that the write below sends to the server
                    p.createBB();
                }
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    sentCount.getAndIncrement();
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                            && p.requestHeader.getType() != OpCode.ping
                            && p.requestHeader.getType() != OpCode.auth) {
                        //1--這個if判斷在后面pendingQueue取出來的前面是有判斷的(常規(guī)的請求,create,getdata,exist)
                        //1--也就是在sendThread.readResponse
                        //1--ClientCnxn的pendingQueue
                        synchronized (pendingQueue) {
                            //1---here the packet is added to pendingQueue
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                // No more packets to send: turn off write interest flag.
                // Will be turned on later by a later call to enableWrite(),
                // from within ZooKeeperSaslClient (if client is configured
                // to attempt SASL authentication), or in either doIO() or
                // in doTransport() if not.
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // On initial connection, write the complete connect request
                // packet, but then disable further writes until after
                // receiving a successful connection response.  If the
                // session is expired, then the server sends the expiration
                // response and immediately closes its end of the socket.  If
                // the client is simultaneously writing on its end, then the
                // TCP stack may choose to abort with RST, in which case the
                // client would never receive the session expired event.  See
                // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                disableWrite();
            } else {
                // Just in case
                enableWrite();
            }
        }
    }

ClientCnxnSocketNetty.ZKClientHandler#channelRead0 ultimately also calls sendThread.readResponse(), so I won't walk through it again.
Let's go straight to sendThread.readResponse():

        void readResponse(ByteBuffer incomingBuffer) throws IOException {
            ByteBufferInputStream bbis = new ByteBufferInputStream(
                    incomingBuffer);
            BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
            ReplyHeader replyHdr = new ReplyHeader();

            //deserialize (decode) the header
            replyHdr.deserialize(bbia, "header");
            //1--these xid checks pair up with the restrictions applied when adding to pendingQueue
            if (replyHdr.getXid() == -2) {
                // -2 is the xid for pings
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got ping response for sessionid: 0x"
                            + Long.toHexString(sessionId)
                            + " after "
                            + ((System.nanoTime() - lastPingSentNs) / 1000000)
                            + "ms");
                }
                return;
            }
            if (replyHdr.getXid() == -4) {
                // -4 is the xid for AuthPacket               
                if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                    state = States.AUTH_FAILED;                    
                    eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, 
                            Watcher.Event.KeeperState.AuthFailed, null) );
                    eventThread.queueEventOfDeath();
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got auth sessionid:0x"
                            + Long.toHexString(sessionId));
                }
                return;
            }
            if (replyHdr.getXid() == -1) {//10--watch notification
                // -1 means notification
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
                }
                //10--1: deserialize the WatcherEvent from the byte stream
                WatcherEvent event = new WatcherEvent();
                event.deserialize(bbia, "response");

                // convert from a server path to a client path
                //10--2: handle the chroot path
                if (chrootPath != null) {
                    String serverPath = event.getPath();
                    if(serverPath.compareTo(chrootPath)==0)
                        event.setPath("/");
                    else if (serverPath.length() > chrootPath.length())
                        event.setPath(serverPath.substring(chrootPath.length()));
                    else {
                        LOG.warn("Got server path " + event.getPath()
                                + " which is too short for chroot path "
                                + chrootPath);
                    }
                }
                //10--3: rebuild the WatchedEvent
                WatchedEvent we = new WatchedEvent(event);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Got " + we + " for sessionid 0x"
                            + Long.toHexString(sessionId));
                }
                //10--4: put the watcher event on the eventThread's waiting queue, from which the watcher is called back
                eventThread.queueEvent( we );
                return;
            }

            // If SASL authentication is currently in progress, construct and
            // send a response packet immediately, rather than queuing a
            // response as with other packets.
            if (tunnelAuthInProgress()) {
                GetSASLRequest request = new GetSASLRequest();
                request.deserialize(bbia,"token");
                zooKeeperSaslClient.respondToServer(request.getToken(),
                  ClientCnxn.this);
                return;
            }

            Packet packet;
            synchronized (pendingQueue) {
                if (pendingQueue.size() == 0) {
                    throw new IOException("Nothing in the queue, but got "
                            + replyHdr.getXid());
                }
                packet = pendingQueue.remove();
            }
            /*
             * Since requests are processed in order, we better get a response
             * to the first request!
             */
            try {
                //guarantee the ordering of replies
                if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                    packet.replyHeader.setErr(
                            KeeperException.Code.CONNECTIONLOSS.intValue());
                    throw new IOException("Xid out of order. Got Xid "
                            + replyHdr.getXid() + " with err " +
                            + replyHdr.getErr() +
                            " expected Xid "
                            + packet.requestHeader.getXid()
                            + " for a packet with details: "
                            + packet );
                }

                packet.replyHeader.setXid(replyHdr.getXid());
                packet.replyHeader.setErr(replyHdr.getErr());
                packet.replyHeader.setZxid(replyHdr.getZxid());
                if (replyHdr.getZxid() > 0) {
                    lastZxid = replyHdr.getZxid();
                }
                //--fill the response payload into the response object
                if (packet.response != null && replyHdr.getErr() == 0) {
                    packet.response.deserialize(bbia, "response");
                }

                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reading reply sessionid:0x"
                            + Long.toHexString(sessionId) + ", packet:: " + packet);
                }
            } finally {
                //1--finish watch registration and other bookkeeping
                finishPacket(packet);
            }
        }
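
Note how the reply header's xid doubles as a message-type tag in this method; a sketch of the dispatch readResponse() performs (the constants match the source comments above, the class is illustrative):

```java
// The xid in the reply header tags the message kind before any
// pendingQueue matching happens.
public class XidDispatchSketch {
    static String classify(int xid) {
        switch (xid) {
            case -2: return "ping";          // ping responses are dropped
            case -4: return "auth";          // auth result, may fail the session
            case -1: return "notification";  // watch event for the eventThread
            default: return "reply";         // matched against pendingQueue head
        }
    }
}
```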

Here you can see that the logic ends in eventThread.queueEvent(): the watcher event is created and placed on the eventThread's waitingEvents queue, matching the flow described earlier.
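
That hand-off decouples user callbacks from the I/O path: the send thread only enqueues, and a dedicated dispatcher invokes the watcher. A toy model (illustrative names, plain Strings instead of WatchedEvent, and a manual dispatch step where the real EventThread loops on take()):

```java
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the EventThread hand-off: readResponse() enqueues events on
// waitingEvents, and the dispatcher drains them and invokes the watcher,
// so user callbacks never run on (or block) the send thread.
public class EventThreadSketch {
    interface Watcher { void process(String event); }

    private final LinkedBlockingQueue<String> waitingEvents = new LinkedBlockingQueue<>();
    private final Watcher watcher;

    EventThreadSketch(Watcher watcher) { this.watcher = watcher; }

    // Called from the I/O side; mirrors eventThread.queueEvent(we).
    void queueEvent(String event) { waitingEvents.add(event); }

    // One turn of the dispatcher loop; returns false when the queue is empty.
    boolean dispatchOne() {
        String e = waitingEvents.poll();
        if (e == null) return false;
        watcher.process(e);
        return true;
    }
}
```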

2. Establishing the Client–Server Session

We began by explaining what the sendThread and eventThread do; with that in mind, the following diagram should be clearer.


(Figure: zk client session establishment)

Session establishment actually consists of two parts:

  • the TCP connection between client and server
  • the session association built on top of that TCP connection

Now let's look at the code that was elided earlier:
        @Override
        public void run() {
            //---here the socket is bound to a concrete implementation: ClientCnxnSocketNIO or ClientCnxnSocketNetty
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            clientCnxnSocket.updateNow();
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds
            InetSocketAddress serverAddress = null;
            while (state.isAlive()) {
                try {
                    //--not connected yet
                    //--while the ZooKeeper object is being created, the client state is CONNECTING; once a server connection succeeds it becomes CONNECTED.
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        //--the client is shutting down
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            //--1000毫秒后嘗試下一個服務端地址
                            //--當在sessionTimeout時間內,即還未超時,此時TCP連接斷開
                            //--服務器端仍然認為該sessionId處于存活狀態(tài)。此時,客戶端會選擇下一個ZooKeeper服務器地址進行TCP連接建立
                            //--TCP連接建立完成后,拿著之前的sessionId和密碼發(fā)送ConnectRequest請求
                            //--如果還未到該sessionId的超時時間,則表示自動重連成功,對客戶端用戶是透明的,一切都在背后默默執(zhí)行,ZooKeeper對象是有效的
                            //--休息的原因在于可能列表中地址都連不上,所以休息一段時間再去鏈接
                            serverAddress = hostProvider.next(1000);
                        }
                        //--initiate the connection
                        //--no tcp connection yet
                        //--session establishment also happens inside
                        startConnect(serverAddress);
                        //--更新最后一次發(fā)送和接受時間
                        clientCnxnSocket.updateLastSendAndHeard();
                    }

                    //--connected
                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                   LOG.error("SASL authentication with Zookeeper Quorum member failed: " + e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent) {
                                eventThread.queueEvent(new WatchedEvent(
                                      Watcher.Event.EventType.None,
                                      authState,null));
                                if (state == States.AUTH_FAILED) {
                                    //--enqueue the event of death
                                  eventThread.queueEventOfDeath();
                                }
                            }
                        }
                        //--while connected, check against the read timeout
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        //--while disconnected, check against the connect timeout
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    
                    if (to <= 0) {
                        String warnInfo;
                        warnInfo = "Client session timed out, have not heard from server in "
                            + clientCnxnSocket.getIdleRecv()
                            + "ms"
                            + " for sessionid 0x"
                            + Long.toHexString(sessionId);
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                    //--keep sending pings; the server recomputes the session expiry from the current time (session activation/migration)
                    //--expiry bucketing formula:
                    //long expireTime = currentTime + sessionTimeout;
                    //expireTime = (expireTime / expirationInterval + 1) * expirationInterval;
                    if (state.isConnected()) {
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small 
                        int timeToNextPing = readTimeout / 2 - clientCnxnSocket.getIdleSend() - 
                                ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        //--最后一次發(fā)送數(shù)據(jù)包的時間與當前時間的間隔  clientCnxnSocket.getIdleSend()
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            //--pings also go onto outgoingQueue to await sending
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }

                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout =
                                Math.min(2*pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
                    //sendThread sends
                    //--ClientCnxnSocket maintains the long-lived TCP connection to the server (the connect itself is initiated in startConnect above)
                    //---performs the I/O: sends the queued requests and reads the server's responses ---NIO
                    //---Netty sends requests
                    //--- ClientCnxnSocketNIO.doTransport()->doIO()->(two send paths inside)
                    //--- ClientCnxnSocketNetty.doTransport()->doWrite()->sendPktOnly()->sendPkt()
                    //---in short, this method not only sends requests but also receives replies, i.e. the callback (true for ClientCnxnSocketNIO; not for ClientCnxnSocketNetty)
                    //4 callback: ClientCnxnSocketNIO.doIO()
                    //4 callback: ClientCnxnSocketNetty.channelRead0
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        if (LOG.isDebugEnabled()) {
                            // closing so this is expected
                            LOG.debug("An exception was thrown while closing send thread for session 0x"
                                    + Long.toHexString(getSessionId())
                                    + " : " + e.getMessage());
                        }
                        break;
                    } else {
                        // this is ugly, you have a better way speak up
                        if (e instanceof SessionExpiredException) {
                            LOG.info(e.getMessage() + ", closing socket connection");
                        } else if (e instanceof SessionTimeoutException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof EndOfStreamException) {
                            LOG.info(e.getMessage() + RETRY_CONN_MSG);
                        } else if (e instanceof RWServerFoundException) {
                            LOG.info(e.getMessage());
                        } else if (e instanceof SocketException) {
                            LOG.info("Socket error occurred: {}: {}", serverAddress, e.getMessage());
                        } else {
                            LOG.warn("Session 0x{} for server {}, unexpected error{}",
                                            Long.toHexString(getSessionId()),
                                            serverAddress,
                                            RETRY_CONN_MSG,
                                            e);
                        }
                        // At this point, there might still be new packets appended to outgoingQueue.
                        // they will be handled in next connection or cleared up if closed.
                        cleanAndNotifyState();
                    }
                }
            }
            synchronized (state) {
                // When it comes to this point, it guarantees that later queued
                // packet to outgoingQueue will be notified of death.
                cleanup();
            }
            clientCnxnSocket.close();
            if (state.isAlive()) {
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Disconnected, null));
            }
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None,
                        Event.KeeperState.Closed, null));
            ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
                    "SendThread exited loop for session: 0x"
                           + Long.toHexString(getSessionId()));
        }
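
The expiry formula quoted in the comments above is the server-side session bucketing: deadlines are rounded up to the next expirationInterval boundary, so the server can expire whole buckets of sessions at once instead of tracking each one individually. As plain arithmetic (illustrative class name):

```java
// Server-side session expiry bucketing, per the formula in the comments:
// round the raw deadline up to the next expirationInterval boundary.
public class ExpiryBucketSketch {
    static long bucketedExpiry(long currentTime, long sessionTimeout, long expirationInterval) {
        long expireTime = currentTime + sessionTimeout;
        return (expireTime / expirationInterval + 1) * expirationInterval;
    }
}
```

For example, with currentTime 1000 ms, a 4000 ms session timeout, and a 2000 ms expiration interval, the raw deadline 5000 rounds up to the 6000 bucket. Each heartbeat the client sends moves its session into a later bucket.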

Keep tracing and you arrive at ClientCnxn.SendThread#primeConnection:

        //--和服務器建立連接成功后,客戶端sendThread發(fā)送ConnectRequest請求,申請建立session關聯(lián),此時服務器端會為該客戶端分配sessionId和密碼,同時開啟對該session是否超時的檢測。
        //--把該請求放到outgoingQueue請求隊列中,等待被發(fā)送給服務器。
        //--將該請求包裝成網(wǎng)絡I/O層的Packet對象,放入發(fā)送隊列outgoingQueue中去。
        void primeConnection() throws IOException {
            LOG.info("Socket connection established, initiating session, client: {}, server: {}",
                    clientCnxnSocket.getLocalSocketAddress(),
                    clientCnxnSocket.getRemoteSocketAddress());
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            ConnectRequest conReq = new ConnectRequest(0, lastZxid,
                    sessionTimeout, sessId, sessionPasswd);
            // We add backwards since we are pushing into the front
            // Only send if there's a pending watch
            // TODO: here we have the only remaining use of zooKeeper in
            // this class. It's to be eliminated!
            if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
                List<String> dataWatches = zooKeeper.getDataWatches();
                List<String> existWatches = zooKeeper.getExistWatches();
                List<String> childWatches = zooKeeper.getChildWatches();
                if (!dataWatches.isEmpty()
                        || !existWatches.isEmpty() || !childWatches.isEmpty()) {
                    Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                    Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                    Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                    long setWatchesLastZxid = lastZxid;

                    while (dataWatchesIter.hasNext()
                           || existWatchesIter.hasNext() || childWatchesIter.hasNext()) {
                        List<String> dataWatchesBatch = new ArrayList<String>();
                        List<String> existWatchesBatch = new ArrayList<String>();
                        List<String> childWatchesBatch = new ArrayList<String>();
                        int batchLength = 0;

                        // Note, we may exceed our max length by a bit when we add the last
                        // watch in the batch. This isn't ideal, but it makes the code simpler.
                        while (batchLength < SET_WATCHES_MAX_LENGTH) {
                            final String watch;
                            if (dataWatchesIter.hasNext()) {
                                watch = dataWatchesIter.next();
                                dataWatchesBatch.add(watch);
                            } else if (existWatchesIter.hasNext()) {
                                watch = existWatchesIter.next();
                                existWatchesBatch.add(watch);
                            } else if (childWatchesIter.hasNext()) {
                                watch = childWatchesIter.next();
                                childWatchesBatch.add(watch);
                            } else {
                                break;
                            }
                            batchLength += watch.length();
                        }

                        SetWatches sw = new SetWatches(setWatchesLastZxid,
                                                       dataWatchesBatch,
                                                       existWatchesBatch,
                                                       childWatchesBatch);
                        RequestHeader header = new RequestHeader(-8, OpCode.setWatches);
                        Packet packet = new Packet(header, new ReplyHeader(), sw, null, null);
                        outgoingQueue.addFirst(packet);
                    }
                }
            }

            for (AuthData id : authInfo) {
                outgoingQueue.addFirst(new Packet(new RequestHeader(-4,
                        OpCode.auth), null, new AuthPacket(0, id.scheme,
                        id.data), null, null));
            }
            //-- Put the ConnectRequest on the outgoingQueue, waiting to be sent to the server.
            outgoingQueue.addFirst(new Packet(null, null, conReq,
                    null, null, readOnly));
            clientCnxnSocket.connectionPrimed();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Session establishment request sent on "
                        + clientCnxnSocket.getRemoteSocketAddress());
            }
        }
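Note the ordering in primeConnection: every packet is pushed with addFirst(), so the last one pushed, the ConnectRequest, ends up at the head of outgoingQueue and is sent first, followed by the auth packets and then the SetWatches batches. A small sketch using strings in place of the real Packet class:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch (strings stand in for ZooKeeper's Packet class) of the addFirst()
// ordering in primeConnection(): the send order is the reverse of push order.
public class PrimeOrder {
    public static Deque<String> primeQueue() {
        Deque<String> outgoingQueue = new ArrayDeque<>();
        outgoingQueue.addFirst("setWatches");     // pushed first in primeConnection
        outgoingQueue.addFirst("auth");           // pushed next, once per AuthData
        outgoingQueue.addFirst("connectRequest"); // pushed last, so sent first
        return outgoingQueue;
    }
}
```

The head of the queue is the ConnectRequest, which is exactly what the server must see before it will accept any other operation on the session.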

This request, too, is placed on the outgoingQueue for the SendThread to send. But it clearly differs from the ordinary message send/receive path described earlier, because when it is sent the client and server have not yet established a session. Next, in the callbacks ClientCnxnSocketNIO#doIO and ClientCnxnSocketNetty.ZKClientHandler#channelRead0 there is a method, ClientCnxnSocket#readConnectResult, which handles the server's response to the client's connection request:

   void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                buf.append(Integer.toHexString(b) + ",");
            }
            buf.append("]");
            LOG.trace("readConnectResult " + incomingBuffer.remaining() + " "
                    + buf.toString());
        }
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        //4 Deserialize into a ConnectResponse object,
        //4 from which we get the sessionId and password the server assigned to this client, plus the negotiated sessionTimeout.
        ConnectResponse conRsp = new ConnectResponse();
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }

        //4 Take the sessionId assigned by the ZooKeeper server
        this.sessionId = conRsp.getSessionId();
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId,
                conRsp.getPasswd(), isRO);
    }
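The interesting detail above is the trailing "readOnly" flag: old servers do not send it, so the read is wrapped in a try/catch and the absence of the field is tolerated. A minimal sketch of that parsing pattern (field layout and names are simplified assumptions, not the real jute serialization, which also carries a protocol version and password):

```java
import java.nio.ByteBuffer;

// Simplified sketch of parsing a connect response: fixed fields first, then an
// optional trailing readOnly flag that old servers omit.
public class ConnectResponseSketch {
    public final int negotiatedTimeout;
    public final long sessionId;
    public final boolean readOnly;

    public ConnectResponseSketch(int t, long s, boolean ro) {
        negotiatedTimeout = t;
        sessionId = s;
        readOnly = ro;
    }

    public static ConnectResponseSketch parse(ByteBuffer buf) {
        int timeout = buf.getInt();
        long sessionId = buf.getLong();
        boolean isRO = false;
        if (buf.remaining() >= 1) { // tolerate old servers lacking the flag
            isRO = buf.get() != 0;
        }
        return new ConnectResponseSketch(timeout, sessionId, isRO);
    }
}
```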
        //-- This method runs purely on the client side.
        //-- If, by the time the TCP connection is re-established, the session has already timed out (the server has cleaned up all data for that sessionId),
        //   the server returns a sessionTimeout of 0, a sessionId of 0 and an empty password byte array. (That part happens on the server; the client handles the response in this method.)
        //   On receiving this response, the client checks whether the negotiated sessionTimeout is less than or equal to 0.
        //   If so, the EventThread first delivers a KeeperState.Expired event to the relevant Watchers,
        //   and then the EventThread loop is terminated and shuts down. At that point the ZooKeeper object is unusable; a new one must be created to obtain a new sessionId.
        void onConnected(int _negotiatedSessionTimeout, long _sessionId,
                byte[] _sessionPasswd, boolean isRO) throws IOException {
            negotiatedSessionTimeout = _negotiatedSessionTimeout;
            if (negotiatedSessionTimeout <= 0) {
                state = States.CLOSED;

                eventThread.queueEvent(new WatchedEvent(
                        Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.Expired, null));
                eventThread.queueEventOfDeath();

                String warnInfo;
                warnInfo = "Unable to reconnect to ZooKeeper service, session 0x"
                    + Long.toHexString(sessionId) + " has expired";
                LOG.warn(warnInfo);
                throw new SessionExpiredException(warnInfo);
            }
            if (!readOnly && isRO) {
                LOG.error("Read/write client got connected to read-only server");
            }
            readTimeout = negotiatedSessionTimeout * 2 / 3;
            connectTimeout = negotiatedSessionTimeout / hostProvider.size();
            //1-- Callback to notify the HostProvider
            hostProvider.onConnected();
            sessionId = _sessionId;
            sessionPasswd = _sessionPasswd;
            state = (isRO) ?
                    States.CONNECTEDREADONLY : States.CONNECTED;
            seenRwServerBefore |= !isRO;
            LOG.info("Session establishment complete on server "
                    + clientCnxnSocket.getRemoteSocketAddress()
                    + ", sessionid = 0x" + Long.toHexString(sessionId)
                    + ", negotiated timeout = " + negotiatedSessionTimeout
                    + (isRO ? " (READ-ONLY mode)" : ""));
            //4 Deliver a SyncConnected (connection successful) event via the EventThread
            KeeperState eventState = (isRO) ?
                    KeeperState.ConnectedReadOnly : KeeperState.SyncConnected;
            //4 This marks the client-server session as successfully established; the event is handed to the EventThread.
            //4 The EventThread looks up the matching Watchers in the ClientWatchManager;
            //4 for a SyncConnected-None event this is the stored default Watcher, which is then placed
            //4 on the EventThread's waitingEvents queue.
            eventThread.queueEvent(new WatchedEvent(
                    Watcher.Event.EventType.None,
                    eventState, null));
        }
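Also note how onConnected derives the two client-side timeouts from the negotiated session timeout: the client will read for at most two thirds of the session timeout before treating the connection as stale, and it divides the session timeout evenly across the server list to bound each per-host connect attempt. A sketch of just that arithmetic (class and method names are mine, not ZooKeeper's):

```java
// Sketch of the timeout derivation in onConnected():
//   readTimeout    = negotiatedSessionTimeout * 2 / 3
//   connectTimeout = negotiatedSessionTimeout / hostProvider.size()
public class Timeouts {
    public static int readTimeout(int negotiatedSessionTimeout) {
        // stale-read threshold: 2/3 of the session timeout
        return negotiatedSessionTimeout * 2 / 3;
    }

    public static int connectTimeout(int negotiatedSessionTimeout, int hostCount) {
        // per-host connect budget: session timeout split across all servers
        return negotiatedSessionTimeout / hostCount;
    }
}
```

For example, with a negotiated timeout of 30000 ms and three servers in the list, the client reads for at most 20000 ms and gives each connect attempt 10000 ms.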

In the end, every path calls eventThread.queueEvent(); only the type of event queued differs.
That concludes this article.
