ZooKeeper standalone mode: client startup

Preface

The previous article covered how the ZooKeeper server starts up. This one analyzes how the ZooKeeper client starts up.

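Creating the client connection object

With the native ZooKeeper library, a client is normally created like this:

ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);

Let's go straight to the source of the ZooKeeper constructor. The three-argument constructor above eventually delegates to this one: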

public ZooKeeper(
        String connectString,
        int sessionTimeout,
        Watcher watcher,
        boolean canBeReadOnly,
        HostProvider aHostProvider,
        ZKClientConfig clientConfig) throws IOException {
        LOG.info(
            "Initiating client connection, connectString={} sessionTimeout={} watcher={}",
            connectString,
            sessionTimeout,
            watcher);

        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        //clientConfig holds the client's configurable properties
        this.clientConfig = clientConfig;
        //Create the client-side watcher manager
        watchManager = defaultWatchManager();
        //Set the watcher manager's default watcher (the one passed to the constructor)
        watchManager.defaultWatcher = watcher;
        //Build a ConnectStringParser from the user's connectString; its role is explained below
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        //The connect string may list several servers. Which one should the client connect to?
        //HostProvider encapsulates that choice
        hostProvider = aHostProvider;
        //The client connection object, the core of the client-server connection; analyzed in detail below
        cnxn = createConnection(
            connectStringParser.getChrootPath(),
            hostProvider,
            sessionTimeout,
            this,
            watchManager,
            getClientCnxnSocket(),
            canBeReadOnly);
        //Start the client's worker threads (SendThread and EventThread)
        cnxn.start();
    }
ConnectStringParser

Let's start with the ConnectStringParser class.


ConnectStringParser.png

As the figure above shows, ConnectStringParser has two important fields, chrootPath and serverAddresses, and its job is to derive both from the connect string the user passes in. chrootPath is the path part of the connect string: for "192.168.11.1:2181,192.168.11.2:2181/tt", parsing yields chrootPath = "/tt" (the leading slash is kept). serverAddresses holds the parsed host:port pairs as unresolved InetSocketAddress objects; for the same example it contains 192.168.11.1:2181 and 192.168.11.2:2181.
Here is the ConnectStringParser constructor that parses the connect string:

public ConnectStringParser(String connectString) {
        // parse out chroot, if any
        //Find the position of the chroot separator '/'
        int off = connectString.indexOf('/');
        if (off >= 0) {
            //Extract chrootPath (including the leading '/')
            String chrootPath = connectString.substring(off);
            // ignore "/" chroot spec, same as null
            if (chrootPath.length() == 1) {
                this.chrootPath = null;
            } else {
                PathUtils.validatePath(chrootPath);
                this.chrootPath = chrootPath;
            }
            //Keep only the ip:port part for the server list
            connectString = connectString.substring(0, off);
        } else {
            this.chrootPath = null;
        }
        //Split the ip:port entries on commas
        List<String> hostsList = split(connectString, ",");
        for (String host : hostsList) {
            int port = DEFAULT_PORT;
            try {
                //Split the entry into host and port
                String[] hostAndPort = ConfigUtils.getHostAndPort(host);
                host = hostAndPort[0];
                if (hostAndPort.length == 2) {
                    port = Integer.parseInt(hostAndPort[1]);
                }
            } catch (ConfigException e) {
                e.printStackTrace();
            }
            //Create an unresolved InetSocketAddress from host and port and add it to serverAddresses
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
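
To make the parsing rules concrete, here is a small stdlib-only sketch of the same logic. The class name SimpleConnectStringParser is hypothetical; unlike the real class it skips PathUtils.validatePath and uses a plain lastIndexOf(':') split instead of ConfigUtils.getHostAndPort, so it does not handle bracketed IPv6 literals.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for ConnectStringParser (no path validation, IPv4/hostnames only). */
class SimpleConnectStringParser {
    static final int DEFAULT_PORT = 2181;

    final String chrootPath;   // null when there is no chroot or the chroot is just "/"
    final List<InetSocketAddress> serverAddresses = new ArrayList<>();

    SimpleConnectStringParser(String connectString) {
        int off = connectString.indexOf('/');
        if (off >= 0) {
            String path = connectString.substring(off);     // keeps the leading '/'
            chrootPath = path.length() == 1 ? null : path;  // "/" alone means "no chroot"
            connectString = connectString.substring(0, off);
        } else {
            chrootPath = null;
        }
        for (String host : connectString.split(",")) {
            host = host.trim();
            if (host.isEmpty()) {
                continue;
            }
            int port = DEFAULT_PORT;
            int colon = host.lastIndexOf(':');
            if (colon >= 0) {
                port = Integer.parseInt(host.substring(colon + 1));
                host = host.substring(0, colon);
            }
            // Unresolved: no DNS lookup happens here
            serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        }
    }
}
```

For the example connect string above, the sketch yields chrootPath "/tt" and two unresolved addresses; a bare "/" chroot is treated as no chroot, just as in the original.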


HostProvider

From ConnectStringParser we know the user may supply several IP:Port pairs. Which one should the client connect to? That is HostProvider's job; its default implementation is StaticHostProvider.


StaticHostProvider.png

The figure above shows StaticHostProvider's fields. The three outlined in red, serverAddresses, lastIndex and currentIndex, are the core of how StaticHostProvider picks a server to connect to, and its next() method exposes that choice to callers:

next()
public InetSocketAddress next(long spinDelay) {
        boolean needToSleep = false;
        InetSocketAddress addr;

        synchronized (this) {
            //reconfigMode supports rebalancing client connections across servers after the server list changes (dynamic reconfiguration)
            if (reconfigMode) {
                addr = nextHostInReconfigMode();
                if (addr != null) {
                    currentIndex = serverAddresses.indexOf(addr);
                    return resolve(addr);
                }
                //tried all servers and couldn't connect
                reconfigMode = false;
                needToSleep = (spinDelay > 0);
            }
            //Advance currentIndex
            ++currentIndex;
            //If currentIndex reaches the size of the server list, wrap it back to 0
            if (currentIndex == serverAddresses.size()) {
                currentIndex = 0;
            }
            //Take the server at currentIndex from the list
            addr = serverAddresses.get(currentIndex);
            //Decide whether to sleep before returning
            needToSleep = needToSleep || (currentIndex == lastIndex && spinDelay > 0);
            if (lastIndex == -1) {
                // We don't want to sleep on the first ever connect attempt.
                //lastIndex starts at -1 and is set to 0 here. Had it started at 0, needToSleep would already be true on the very first attempt, which is clearly not wanted
                lastIndex = 0;
            }
        }
        if (needToSleep) {
            try {
                //Once every server in the list has been tried, sleep before starting another round
                //(my reading: if none of the servers could be reached they may be busy, so give them a moment to recover)
                Thread.sleep(spinDelay);
            } catch (InterruptedException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
        //Return the (resolved) server address
        return resolve(addr);
    }
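
The round-robin and back-off behaviour of next() can be reproduced in isolation. The sketch below is a hypothetical simplification: it drops reconfigMode and address resolution, and it does not shuffle the list (the real StaticHostProvider shuffles the addresses when it is constructed). Its onConnected() mirrors HostProvider.onConnected(), which the client calls after a successful connection so that lastIndex points at the server that worked.

```java
import java.util.List;

/** Hypothetical, simplified version of StaticHostProvider.next(): no reconfigMode, no shuffling. */
class RoundRobinHostPicker {
    private final List<String> servers;
    private int currentIndex = -1;   // index of the server returned last
    private int lastIndex = -1;      // index of the last successful server (-1: nothing tried yet)
    boolean sleptOnLastCall;         // exposed only so the behaviour can be observed

    RoundRobinHostPicker(List<String> servers) {
        this.servers = servers;
    }

    String next(long spinDelay) throws InterruptedException {
        boolean needToSleep;
        String addr;
        synchronized (this) {
            ++currentIndex;
            if (currentIndex == servers.size()) {
                currentIndex = 0;                       // wrap around
            }
            addr = servers.get(currentIndex);
            // Back at lastIndex means every server has been tried once since then
            needToSleep = currentIndex == lastIndex && spinDelay > 0;
            if (lastIndex == -1) {
                lastIndex = 0;                          // never sleep on the very first attempt
            }
        }
        sleptOnLastCall = needToSleep;
        if (needToSleep) {
            Thread.sleep(spinDelay);                    // back off outside the lock
        }
        return addr;
    }

    /** Mirrors onConnected(): remember which server the connection succeeded on. */
    synchronized void onConnected() {
        lastIndex = currentIndex;
    }
}
```

With three servers and no successful connection, the first three calls return a, b and c without sleeping, and the fourth call (back at a) sleeps for spinDelay first.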

ClientCnxn

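ClientCnxn is the client connection object. It holds the state of the client's connection to the server and has many fields; the figure below annotates a few of them.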


ClientCnxn.png

Here is the constructor that all ClientCnxn constructors end up in:

 public ClientCnxn(
        String chrootPath,
        HostProvider hostProvider,
        int sessionTimeout,
        ZooKeeper zooKeeper,
        ClientWatchManager watcher,
        ClientCnxnSocket clientCnxnSocket,
        long sessionId,
        byte[] sessionPasswd,
        boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;

        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        //Create SendThread, which handles I/O; by default clientCnxnSocket is a ClientCnxnSocketNIO
        sendThread = new SendThread(clientCnxnSocket);
        //Create EventThread, which dispatches the events that watchers are interested in
        eventThread = new EventThread();
        this.clientConfig = zooKeeper.getClientConfig();
        initRequestTimeout();
    }
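
The two timeouts computed in the constructor are plain arithmetic. A worked example, assuming sessionTimeout = 30000 ms and three servers in the connect string: connectTimeout = 30000 / 3 = 10000 ms and readTimeout = 30000 * 2 / 3 = 20000 ms. The hypothetical helper below just restates the two formulas. As far as I can tell, ClientCnxn recomputes both from the timeout negotiated with the server once the session is established, so these are only the initial values.

```java
/** Hypothetical helper restating the two timeout formulas from the ClientCnxn constructor. */
class CnxnTimeouts {
    final int connectTimeout;
    final int readTimeout;

    CnxnTimeouts(int sessionTimeout, int hostCount) {
        // The session timeout is divided evenly among the hosts in the connect string
        this.connectTimeout = sessionTimeout / hostCount;
        // A connected client gives up on a silent server after 2/3 of the session timeout
        this.readTimeout = sessionTimeout * 2 / 3;
    }
}
```
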
ClientCnxn.start()

ClientCnxn.start() starts SendThread and EventThread:

  public void start() {
        sendThread.start();
        eventThread.start();
    }
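
Splitting the work across two threads means watcher callbacks never run on the I/O thread, so a slow Watcher cannot delay pings or reads. The sketch below is loosely modeled on the hand-off from SendThread to EventThread: a LinkedBlockingQueue plus a sentinel object in the spirit of queueEventOfDeath() (which appears in SendThread.run below). MiniEventThread and its string events are hypothetical; the real EventThread delivers WatchedEvent objects to Watcher.process.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical sketch of an event-dispatch thread fed by an I/O thread; not ZooKeeper's EventThread. */
class MiniEventThread extends Thread {
    private static final Object EVENT_OF_DEATH = new Object();   // sentinel: stop after this
    private final BlockingQueue<Object> waitingEvents = new LinkedBlockingQueue<>();
    final List<String> delivered = new ArrayList<>();            // stands in for invoking watchers

    /** Called from the I/O thread; an unbounded queue means this never blocks. */
    void queueEvent(String event) {
        waitingEvents.add(event);
    }

    void queueEventOfDeath() {
        waitingEvents.add(EVENT_OF_DEATH);
    }

    @Override
    public void run() {
        try {
            while (true) {
                Object event = waitingEvents.take();
                if (event == EVENT_OF_DEATH) {
                    break;                            // events queued after the sentinel are dropped
                }
                delivered.add((String) event);        // the real thread would call the watcher here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```
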

SendThread

SendThread handles all of the client's I/O. Here is its run method:

SendThread.run
 public void run() {
            //Hand sessionId and outgoingQueue to clientCnxnSocket
            //On the first connection the server has not assigned a sessionId yet, so it is still the default 0
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            clientCnxnSocket.updateNow(); 
            clientCnxnSocket.updateLastSendAndHeard();
            int to;
            long lastPingRwServer = Time.currentElapsedTime();
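            //Upper bound on how long the client may go without sending anything before it forces a ping.
            //ZooKeeper's value is 10000 ms (10 s); I had set it much larger while debugging
            final int MAX_SEND_PING_INTERVAL = 10000; //10 seconds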
            InetSocketAddress serverAddress = null;
            //Note that this is a while loop: as long as the connection state is alive, the logic below keeps running
            while (state.isAlive()) {
                try {
                    //If the client has no connection to the server yet, go through connection setup
                    if (!clientCnxnSocket.isConnected()) {
                        // don't re-establish connection if we are closing
                        if (closing) {
                            break;
                        }
                        if (rwServerAddress != null) {
                            serverAddress = rwServerAddress;
                            rwServerAddress = null;
                        } else {
                            //Ask hostProvider for the next server in the list to connect to
                            serverAddress = hostProvider.next(1000);
                        }
                        //Open the socket connection to the server; the source is shown below
                        startConnect(serverAddress);
                        clientCnxnSocket.updateLastSendAndHeard();
                    }
                    //If the connection to the server is already established
                    if (state.isConnected()) {
                        // determine whether we need to send an AuthFailed event.
                        if (zooKeeperSaslClient != null) {
                            boolean sendAuthEvent = false;
                            if (zooKeeperSaslClient.getSaslState() == ZooKeeperSaslClient.SaslState.INITIAL) {
                                try {
                                    zooKeeperSaslClient.initialize(ClientCnxn.this);
                                } catch (SaslException e) {
                                    LOG.error("SASL authentication with Zookeeper Quorum member failed.", e);
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                }
                            }
                            KeeperState authState = zooKeeperSaslClient.getKeeperState();
                            if (authState != null) {
                                if (authState == KeeperState.AuthFailed) {
                                    // An authentication error occurred during authentication with the Zookeeper Server.
                                    state = States.AUTH_FAILED;
                                    sendAuthEvent = true;
                                } else {
                                    if (authState == KeeperState.SaslAuthenticated) {
                                        sendAuthEvent = true;
                                    }
                                }
                            }

                            if (sendAuthEvent) {
                                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None, authState, null));
                                if (state == States.AUTH_FAILED) {
                                    eventThread.queueEventOfDeath();
                                }
                            }
                        }
                        to = readTimeout - clientCnxnSocket.getIdleRecv();
                    } else {
                        //Still connecting: time left before connectTimeout expires
                        to = connectTimeout - clientCnxnSocket.getIdleRecv();
                    }
                    //If to <= 0 the operation has timed out, so throw
                    if (to <= 0) {
                        String warnInfo = String.format(
                            "Client session timed out, have not heard from server in %dms for session id 0x%s",
                            clientCnxnSocket.getIdleRecv(),
                            Long.toHexString(sessionId));
                        LOG.warn(warnInfo);
                        throw new SessionTimeoutException(warnInfo);
                    }
                  
                    if (state.isConnected()) {
                        //Connected: compute when the next ping to the server is due
                        //1000(1 second) is to prevent race condition missing to send the second ping
                        //also make sure not to send too many pings when readTimeout is small
                        int timeToNextPing = readTimeout / 2
                                             - clientCnxnSocket.getIdleSend()
                                             - ((clientCnxnSocket.getIdleSend() > 1000) ? 1000 : 0);
                        //send a ping request either time is due or no packet sent out within MAX_SEND_PING_INTERVAL
                        //Ping if timeToNextPing <= 0 (a ping is due) or if nothing has been sent for longer than MAX_SEND_PING_INTERVAL
                        if (timeToNextPing <= 0 || clientCnxnSocket.getIdleSend() > MAX_SEND_PING_INTERVAL) {
                            sendPing();
                            clientCnxnSocket.updateLastSend();
                        } else {
                            if (timeToNextPing < to) {
                                to = timeToNextPing;
                            }
                        }
                    }
                    //When state == States.CONNECTEDREADONLY, see the English comment below
                    // If we are in read-only mode, seek for read/write server
                    if (state == States.CONNECTEDREADONLY) {
                        long now = Time.currentElapsedTime();
                        int idlePingRwServer = (int) (now - lastPingRwServer);
                        if (idlePingRwServer >= pingRwTimeout) {
                            lastPingRwServer = now;
                            idlePingRwServer = 0;
                            pingRwTimeout = Math.min(2 * pingRwTimeout, maxPingRwTimeout);
                            pingRwServer();
                        }
                        to = Math.min(to, pingRwTimeout - idlePingRwServer);
                    }
                    //Handle pending I/O events; analyzed in detail below
                    clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                } catch (Throwable e) {
                    if (closing) {
                        // closing so this is expected
                        LOG.warn(
                            "An exception was thrown while closing send thread for session 0x{}.",
                            Long.toHexString(getSessionId()),
                            e);
                        break;
                    } else {
                        LOG.warn(
                            "Session 0x{} for sever {}, Closing socket connection. "
                                + "Attempting reconnect except it is a SessionExpiredException.",
                            Long.toHexString(getSessionId()),
                            serverAddress,
                            e);

                        // At this point, there might still be new packets appended to outgoingQueue.
                        // they will be handled in next connection or cleared up if closed.
                        cleanAndNotifyState();
                    }
                }
            }
            //Reaching this point means the loop has exited: the client is closing, or the connection is no longer alive
            synchronized (state) {
                // When it comes to this point, it guarantees that later queued
                // packet to outgoingQueue will be notified of death.
                cleanup();
            }
            clientCnxnSocket.close();
            if (state.isAlive()) {
                //Tell the event thread that the connection to the server was lost
                eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Disconnected, null));
            }
            eventThread.queueEvent(new WatchedEvent(Event.EventType.None, Event.KeeperState.Closed, null));
            ZooTrace.logTraceMessage(
                LOG,
                ZooTrace.getTextTraceLevel(),
                "SendThread exited loop for session: 0x" + Long.toHexString(getSessionId()));
        }
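
The ping scheduling inside run() is easier to follow with numbers. Assuming sessionTimeout = 30000 ms, readTimeout is 20000 ms, so the target ping interval is readTimeout / 2 = 10000 ms. Once the client has been idle on the send side for 9000 ms, timeToNextPing = 10000 - 9000 - 1000 = 0 and a ping goes out. The hypothetical helper below restates that arithmetic with MAX_SEND_PING_INTERVAL = 10000 ms (ZooKeeper's own value) and shows how the select timeout `to` is capped by the time to the next ping.

```java
/** Hypothetical helper restating SendThread's ping-scheduling arithmetic. */
class PingSchedule {
    static final int MAX_SEND_PING_INTERVAL = 10000;   // 10 s

    /** Milliseconds until the next ping is due; <= 0 means a ping is due now. */
    static int timeToNextPing(int readTimeout, int idleSend) {
        // Target readTimeout / 2, minus an extra second once idleSend exceeds 1 s
        return readTimeout / 2 - idleSend - (idleSend > 1000 ? 1000 : 0);
    }

    static boolean shouldPing(int readTimeout, int idleSend) {
        return timeToNextPing(readTimeout, idleSend) <= 0 || idleSend > MAX_SEND_PING_INTERVAL;
    }

    /** The select timeout: remaining read time, shortened to the next ping unless a ping is sent now. */
    static int selectTimeout(int readTimeout, int idleRecv, int idleSend) {
        int to = readTimeout - idleRecv;
        if (!shouldPing(readTimeout, idleSend)) {
            to = Math.min(to, timeToNextPing(readTimeout, idleSend));
        }
        return to;
    }
}
```

With a 30 s session a send-idle client therefore pings roughly every 9 s; with a 60 s session, timeToNextPing alone would wait about 19 s, so the MAX_SEND_PING_INTERVAL cap forces the ping after 10 s instead.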

The client's socket connection to the server

The client opens its socket to the server in ClientCnxnSocketNIO.connect, which SendThread.startConnect calls:

  void connect(InetSocketAddress addr) throws IOException {
        //Create the client SocketChannel
        SocketChannel sock = createSock();
        try {
            //Register the SocketChannel with the selector for OP_CONNECT and
            //start a non-blocking connect to the remote server
            registerAndConnect(sock, addr);
        } catch (IOException e) {
            LOG.error("Unable to open socket to {}", addr);
            sock.close();
            throw e;
        }
        //Marks whether the session handshake has completed; false until the ConnectResponse arrives
        initialized = false;

        /*
         * Reset incomingBuffer
         */
        //ZooKeeper's default transport is NIO. Each message on the wire has two parts: a length and the message itself,
        //and the message in turn is [header, body]. The length is a fixed 4-byte field giving the size of everything
        //that follows it, and lenBuffer is the ByteBuffer that holds that length
        lenBuffer.clear();
        incomingBuffer = lenBuffer;
    }
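
The lenBuffer/incomingBuffer pair forms a small state machine: fill 4 bytes, read the length, allocate a buffer of that size, fill it, hand it off, then go back to reading a length. The sketch below reproduces that loop for frames that may arrive split at arbitrary points. LengthPrefixedReader is hypothetical; as far as I know the real readLength() additionally rejects lengths at or above the configured maximum packet size.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder for [4-byte big-endian length][payload] frames, mirroring lenBuffer/incomingBuffer. */
class LengthPrefixedReader {
    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer incomingBuffer = lenBuffer;   // start by reading a length
    final List<byte[]> frames = new ArrayList<>();

    /** Feed bytes as they arrive; a chunk may end anywhere, even inside the length prefix. */
    void feed(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // Copy as much as fits into the buffer currently being filled
            int n = Math.min(chunk.remaining(), incomingBuffer.remaining());
            ByteBuffer slice = chunk.duplicate();
            slice.limit(slice.position() + n);
            incomingBuffer.put(slice);
            chunk.position(chunk.position() + n);

            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    int len = lenBuffer.getInt();        // like readLength(): size the body buffer
                    if (len < 0) {
                        throw new IllegalStateException("negative length " + len);
                    }
                    incomingBuffer = ByteBuffer.allocate(len);
                    if (len == 0) {                      // empty body: complete immediately
                        frames.add(new byte[0]);
                        lenBuffer.clear();
                        incomingBuffer = lenBuffer;
                    }
                } else {
                    byte[] body = new byte[incomingBuffer.remaining()];
                    incomingBuffer.get(body);
                    frames.add(body);
                    lenBuffer.clear();                   // ready for the next length
                    incomingBuffer = lenBuffer;
                }
            }
        }
    }

    /** Helper: encode one frame. */
    static ByteBuffer frame(byte[] body) {
        ByteBuffer bb = ByteBuffer.allocate(4 + body.length);
        bb.putInt(body.length).put(body);
        bb.flip();
        return bb;
    }
}
```
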

Once the client has issued the connection request, SendThread's loop calls ClientCnxnSocketNIO.doTransport, the entry point for the client's I/O handling:

ClientCnxnSocketNIO.doTransport
 void doTransport(
        int waitTimeOut,
        Queue<Packet> pendingQueue,
        ClientCnxn cnxn) throws IOException, InterruptedException {
        //Wait for one of the registered events to fire
        selector.select(waitTimeOut);
        Set<SelectionKey> selected;
        synchronized (this) {
            selected = selector.selectedKeys();
        }
        // Everything below and until we get back to the select is
        // non blocking, so time is effectively a constant. That is
        // Why we just have to do this once, here
        updateNow();
        for (SelectionKey k : selected) {
            SocketChannel sc = ((SocketChannel) k.channel());
            if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0) {
                //OP_CONNECT fired: finish connecting the SocketChannel
                if (sc.finishConnect()) {
                    updateLastSendAndHeard();
                    updateSocketAddresses();
                    //Sets up the session (sessionId, authentication and so on); analyzed in detail below
                    sendThread.primeConnection();
                }
            } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                //A read or write event fired: run doIO, analyzed in detail below
                doIO(pendingQueue, cnxn);
            }
        }
        if (sendThread.getZkState().isConnected()) {
            //If outgoingQueue has a packet ready to send, register OP_WRITE interest with the selector.
            //Note: a single write may not flush all pending data, so after doIO we check whether outgoingQueue
            //still holds something to send and, if so, keep OP_WRITE enabled
            if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                enableWrite();
            }
        }
        //Clear the selected-key set for the next select
        selected.clear();
    }
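
The OP_CONNECT handling above follows the standard non-blocking connect pattern in java.nio: register for OP_CONNECT, start connect(), and call finishConnect() when the selector reports the key as connectable. The sketch below shows that pattern on its own; NioConnectSketch is hypothetical. In the versions I have read, ZooKeeper's registerAndConnect also checks whether connect() completed immediately (which can happen on loopback) and calls primeConnection() right away in that case, so the sketch handles that case too. After connecting it switches interest to OP_READ | OP_WRITE, roughly what connectionPrimed() does.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;

/** Hypothetical sketch of a Selector-driven non-blocking connect (not ZooKeeper code). */
class NioConnectSketch {
    /** Returns true once the channel is connected, false if timeoutMs elapses first. */
    static boolean connect(InetSocketAddress addr, long timeoutMs) throws IOException {
        try (Selector selector = Selector.open();
             SocketChannel sc = SocketChannel.open()) {
            sc.configureBlocking(false);
            // Register for OP_CONNECT, then start connecting
            SelectionKey key = sc.register(selector, SelectionKey.OP_CONNECT);
            if (sc.connect(addr)) {
                // Connected immediately: no OP_CONNECT event will follow
                key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                return true;
            }
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (System.currentTimeMillis() < deadline) {
                selector.select(100);
                for (SelectionKey k : selector.selectedKeys()) {
                    SocketChannel ch = (SocketChannel) k.channel();
                    // On OP_CONNECT, complete the handshake with finishConnect()
                    if ((k.readyOps() & SelectionKey.OP_CONNECT) != 0 && ch.finishConnect()) {
                        // Connected: switch interest to reads and writes
                        k.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                        return true;
                    }
                }
                selector.selectedKeys().clear();
            }
            return false;
        }
    }
}
```

The sketch closes the channel on return because it only demonstrates the handshake; a real client keeps the channel and its SelectionKey for the doIO loop.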

Once the socket connection between client and server is up, the next step is establishing the session, which sendThread.primeConnection handles.

sendThread.primeConnection

After the socket connection is established, the client asks the server to create a session. That logic is implemented here:

void primeConnection() throws IOException {
            LOG.info(
                "Socket connection established, initiating session, client: {}, server: {}",
                clientCnxnSocket.getLocalSocketAddress(),
                clientCnxnSocket.getRemoteSocketAddress());
            isFirstConnect = false;
            long sessId = (seenRwServerBefore) ? sessionId : 0;
            //Build the session-creation request
            ConnectRequest conReq = new ConnectRequest(0, lastZxid, sessionTimeout, sessId, sessionPasswd);
            // We add backwards since we are pushing into the front
            // Only send if there's a pending watch
            // TODO: here we have the only remaining use of zooKeeper in
            // this class. It's to be eliminated!
            if (!clientConfig.getBoolean(ZKClientConfig.DISABLE_AUTO_WATCH_RESET)) {
                //The long block below re-sends the client's registered watches to the server
                List<String> dataWatches = zooKeeper.getDataWatches();
                List<String> existWatches = zooKeeper.getExistWatches();
                List<String> childWatches = zooKeeper.getChildWatches();
                List<String> persistentWatches = zooKeeper.getPersistentWatches();
                List<String> persistentRecursiveWatches = zooKeeper.getPersistentRecursiveWatches();
                if (!dataWatches.isEmpty() || !existWatches.isEmpty() || !childWatches.isEmpty()
                        || !persistentWatches.isEmpty() || !persistentRecursiveWatches.isEmpty()) {
                    Iterator<String> dataWatchesIter = prependChroot(dataWatches).iterator();
                    Iterator<String> existWatchesIter = prependChroot(existWatches).iterator();
                    Iterator<String> childWatchesIter = prependChroot(childWatches).iterator();
                    Iterator<String> persistentWatchesIter = prependChroot(persistentWatches).iterator();
                    Iterator<String> persistentRecursiveWatchesIter = prependChroot(persistentRecursiveWatches).iterator();
                    long setWatchesLastZxid = lastZxid;

                    while (dataWatchesIter.hasNext() || existWatchesIter.hasNext() || childWatchesIter.hasNext()
                            || persistentWatchesIter.hasNext() || persistentRecursiveWatchesIter.hasNext()) {
                        List<String> dataWatchesBatch = new ArrayList<String>();
                        List<String> existWatchesBatch = new ArrayList<String>();
                        List<String> childWatchesBatch = new ArrayList<String>();
                        List<String> persistentWatchesBatch = new ArrayList<String>();
                        List<String> persistentRecursiveWatchesBatch = new ArrayList<String>();
                        int batchLength = 0;

                        // Note, we may exceed our max length by a bit when we add the last
                        // watch in the batch. This isn't ideal, but it makes the code simpler.
                        while (batchLength < SET_WATCHES_MAX_LENGTH) {
                            final String watch;
                            if (dataWatchesIter.hasNext()) {
                                watch = dataWatchesIter.next();
                                dataWatchesBatch.add(watch);
                            } else if (existWatchesIter.hasNext()) {
                                watch = existWatchesIter.next();
                                existWatchesBatch.add(watch);
                            } else if (childWatchesIter.hasNext()) {
                                watch = childWatchesIter.next();
                                childWatchesBatch.add(watch);
                            }  else if (persistentWatchesIter.hasNext()) {
                                watch = persistentWatchesIter.next();
                                persistentWatchesBatch.add(watch);
                            } else if (persistentRecursiveWatchesIter.hasNext()) {
                                watch = persistentRecursiveWatchesIter.next();
                                persistentRecursiveWatchesBatch.add(watch);
                            } else {
                                break;
                            }
                            batchLength += watch.length();
                        }

                        Record record;
                        int opcode;
                        if (persistentWatchesBatch.isEmpty() && persistentRecursiveWatchesBatch.isEmpty()) {
                            // maintain compatibility with older servers - if no persistent/recursive watchers
                            // are used, use the old version of SetWatches
                            record = new SetWatches(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch, childWatchesBatch);
                            opcode = OpCode.setWatches;
                        } else {
                            record = new SetWatches2(setWatchesLastZxid, dataWatchesBatch, existWatchesBatch,
                                    childWatchesBatch, persistentWatchesBatch, persistentRecursiveWatchesBatch);
                            opcode = OpCode.setWatches2;
                        }
                        //Header for the set-watches request
                        RequestHeader header = new RequestHeader(ClientCnxn.SET_WATCHES_XID, opcode);
                        //Wrap header and body in a Packet and push it onto outgoingQueue to be sent
                        Packet packet = new Packet(header, new ReplyHeader(), record, null, null);
                        outgoingQueue.addFirst(packet);
                    }
                }
            }
           
            for (AuthData id : authInfo) {
                //Queue the client's authentication info on outgoingQueue
                outgoingQueue.addFirst(
                    new Packet(
                        new RequestHeader(ClientCnxn.AUTHPACKET_XID, OpCode.auth),
                        null,
                        new AuthPacket(0, id.scheme, id.data),
                        null,
                        null));
            }
            //Finally push the connect request onto the head of outgoingQueue, so it is sent first
            outgoingQueue.addFirst(new Packet(null, null, conReq, null, null, readOnly));
            //connectionPrimed registers OP_READ and OP_WRITE interest with the selector
            clientCnxnSocket.connectionPrimed();
            LOG.debug("Session establishment request sent on {}", clientCnxnSocket.getRemoteSocketAddress());
        }
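
Two details of primeConnection are easy to miss. Watch paths are sent in batches whose combined path length is capped near SET_WATCHES_MAX_LENGTH, and because the check happens after adding, the last path may push a batch slightly over. And every packet is pushed with addFirst, so the packet added last, the ConnectRequest, is written first, ahead of the auth packets, the SetWatches packets, and anything the application queued while disconnected. The hypothetical sketch below illustrates both, with strings standing in for packets.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical illustration of primeConnection()'s watch batching and addFirst ordering. */
class PrimeConnectionSketch {
    /** Splits watch paths into batches, closing a batch once its total path length reaches maxLength. */
    static List<List<String>> batch(List<String> paths, int maxLength) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int length = 0;
        for (String p : paths) {
            current.add(p);
            length += p.length();
            if (length >= maxLength) {        // checked after adding, so a batch may overshoot
                batches.add(current);
                current = new ArrayList<>();
                length = 0;
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    /** Builds the queue in the same order primeConnection() does, using addFirst throughout. */
    static Deque<String> buildQueue(List<String> alreadyQueued, int watchBatches, List<String> auths) {
        Deque<String> outgoing = new ArrayDeque<>(alreadyQueued);  // queued by the application earlier
        for (int i = 1; i <= watchBatches; i++) {
            outgoing.addFirst("setWatches#" + i);
        }
        for (String a : auths) {
            outgoing.addFirst("auth:" + a);
        }
        outgoing.addFirst("ConnectRequest");   // added last, so it sits at the head
        return outgoing;
    }
}
```
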

Because connectionPrimed enables OP_WRITE, the next select reports the socket as writable, and ClientCnxnSocketNIO.doIO sends the queued connect request.

ClientCnxnSocketNIO.doIO

This is where the client handles I/O events. The method is long, but it rewards a patient read.

void doIO(Queue<Packet> pendingQueue, ClientCnxn cnxn) throws InterruptedException, IOException {
        //Get the SocketChannel from the SelectionKey
        SocketChannel sock = (SocketChannel) sockKey.channel();
        if (sock == null) {
            throw new IOException("Socket is null!");
        }
        //Readable event
        if (sockKey.isReadable()) {
            //Read from the socket into incomingBuffer
            int rc = sock.read(incomingBuffer);
            if (rc < 0) {
                throw new EndOfStreamException("Unable to read additional data from server sessionid 0x"
                                               + Long.toHexString(sessionId)
                                               + ", likely server has closed socket");
            }
            
            if (!incomingBuffer.hasRemaining()) {
                //incomingBuffer is full: flip it so it can be read
                incomingBuffer.flip();
   
                if (incomingBuffer == lenBuffer) {
                    //incomingBuffer == lenBuffer means we have just read the 4-byte length
                    recvCount.getAndIncrement();
                    //Now that the length is known, allocate a ByteBuffer of that size to read the message into
                    readLength();
                } else if (!initialized) {
                    //Not initialized yet: the session has not been established,
                    //so readConnectResult handles the server's ConnectResponse (analyzed below)
                    readConnectResult();
                    //Enable OP_READ interest
                    enableRead();
                    //Likewise, enable OP_WRITE depending on what is waiting in outgoingQueue
                    if (findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress()) != null) {
                        // Since SASL authentication has completed (if client is configured to do so),
                        // outgoing packets waiting in the outgoingQueue can now be sent.
                        enableWrite();
                    }
                    //Reset the buffers and bookkeeping
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                    initialized = true;
                } else {
                    //The branches above handle the length prefix and the connect response; this one reads a regular message body.
                    //readResponse will be covered in detail later
                    sendThread.readResponse(incomingBuffer);
                    //Reset lenBuffer
                    lenBuffer.clear();
                    //Point incomingBuffer back at lenBuffer to read the next length
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        //Handle the writable event
        if (sockKey.isWritable()) {
            //Take the first sendable packet from outgoingQueue
            Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());

            if (p != null) {
                updateLastSend();
                // If we already started writing p, p.bb will already exist
                if (p.bb == null) {
                    if ((p.requestHeader != null)
                        && (p.requestHeader.getType() != OpCode.ping)
                        && (p.requestHeader.getType() != OpCode.auth)) {
                        //For anything other than ping and auth, stamp an xid into requestHeader so requests are handled in order;
                        //xids are generated on the client from an incrementing counter
                        p.requestHeader.setXid(cnxn.getXid());
                    }
                    //Serialize the request into a ByteBuffer (covered below)
                    p.createBB();
                }
                //Write the packet to the server over the socket
                sock.write(p.bb);
                if (!p.bb.hasRemaining()) {
                    //The whole packet has been written, so remove it from outgoingQueue. If a single write does not finish
                    //a packet, it stays in outgoingQueue and the next writable event continues where it left off
                    sentCount.getAndIncrement();
                    outgoingQueue.removeFirstOccurrence(p);
                    if (p.requestHeader != null
                        && p.requestHeader.getType() != OpCode.ping
                        && p.requestHeader.getType() != OpCode.auth) {
                        //Requests other than ping and auth expect a response from the server, so move them to pendingQueue
                        synchronized (pendingQueue) {
                            pendingQueue.add(p);
                        }
                    }
                }
            }
            if (outgoingQueue.isEmpty()) {
                //Every packet in outgoingQueue has been sent, so stop watching OP_WRITE
                // No more packets to send: turn off write interest flag.
                // Will be turned on later by a later call to enableWrite(),
                // from within ZooKeeperSaslClient (if client is configured
                // to attempt SASL authentication), or in either doIO() or
                // in doTransport() if not.
                disableWrite();
            } else if (!initialized && p != null && !p.bb.hasRemaining()) {
                // On initial connection, write the complete connect request
                // packet, but then disable further writes until after
                // receiving a successful connection response.  If the
                // session is expired, then the server sends the expiration
                // response and immediately closes its end of the socket.  If
                // the client is simultaneously writing on its end, then the
                // TCP stack may choose to abort with RST, in which case the
                // client would never receive the session expired event.  See
                // http://docs.oracle.com/javase/6/docs/technotes/guides/net/articles/connection_release.html
                //In short: if the session expires right after the initial connection, the server sends the expiration
                //response and closes the socket. If the client is writing at that moment, TCP may abort with RST and the
                //client never sees the session-expired event. So while initialized == false, once the connect request
                //has been fully written the client stops watching OP_WRITE
                disableWrite();
            } else {
                // Just in case
                //As the comment says, just in case: outgoingQueue still has data, so keep OP_WRITE registered
                enableWrite();
            }
        }
    }
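
The write branch of doIO boils down to: write from the head packet, leave it at the head if the write was partial, and once it is fully written move it to pendingQueue if it expects a reply. The sketch below models that with a channel that accepts at most a few bytes per write, imitating a socket whose send buffer is nearly full. WritePathSketch and its nested classes are hypothetical; the real code also stamps xids, skips packets held back by SASL via findSendablePacket, and toggles OP_WRITE.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.Deque;
import java.util.Queue;

/** Hypothetical model of doIO's write branch: partial writes keep a packet at the head of the queue. */
class WritePathSketch {
    static final class Packet {
        final ByteBuffer bb;
        final boolean expectsReply;      // false for ping/auth packets in ZooKeeper
        Packet(byte[] data, boolean expectsReply) {
            this.bb = ByteBuffer.wrap(data);
            this.expectsReply = expectsReply;
        }
    }

    /** A channel that accepts at most maxPerWrite bytes per call. */
    static final class TrickleChannel implements WritableByteChannel {
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();
        private final int maxPerWrite;
        TrickleChannel(int maxPerWrite) { this.maxPerWrite = maxPerWrite; }
        @Override public int write(ByteBuffer src) {
            int n = Math.min(maxPerWrite, src.remaining());
            for (int i = 0; i < n; i++) {
                sink.write(src.get());
            }
            return n;
        }
        @Override public boolean isOpen() { return true; }
        @Override public void close() { }
    }

    /** Handles one writable event. */
    static void onWritable(WritableByteChannel sock, Deque<Packet> outgoing, Queue<Packet> pending)
            throws IOException {
        Packet p = outgoing.peekFirst();
        if (p == null) {
            return;
        }
        sock.write(p.bb);                // may write only part of the packet
        if (!p.bb.hasRemaining()) {
            outgoing.removeFirst();      // fully written: leave outgoingQueue
            if (p.expectsReply) {
                pending.add(p);          // now wait for the server's response
            }
        }
    }
}
```
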

That covers the logic of doIO. A few details remain, starting with how a message is turned into a ByteBuffer.
First, the message object:


Packet.png

On the sending side, Packet has two relevant fields: the request header requestHeader and the request body request.
Both are serialized into the ByteBuffer field bb, as follows:

 public void createBB() {
            try {
                //Create the output stream
                ByteArrayOutputStream baos = new ByteArrayOutputStream();
                BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
                //The first 4 bytes of the stream hold the message length, which is not known yet, so write -1 as a placeholder
                boa.writeInt(-1, "len"); // We'll fill this in later
                if (requestHeader != null) {
                    //Serialize requestHeader into the stream. It has two fields, the client transaction id (xid) and the request
                    //type code (type), so its serialized form has a fixed length: 4 (xid) + 4 (type) = 8 bytes
                    requestHeader.serialize(boa, "header");
                }
                if (request instanceof ConnectRequest) {
                    //A session-creation request: serialize the ConnectRequest into the stream
                    request.serialize(boa, "connect");
                    // append "am-I-allowed-to-be-readonly" flag
                    boa.writeBool(readOnly, "readOnly");
                } else if (request != null) {
                    //Serialize the request into the stream
                    request.serialize(boa, "request");
                }
         
                baos.close();
                //Wrap the stream's bytes in a ByteBuffer
                this.bb = ByteBuffer.wrap(baos.toByteArray());
                //Overwrite the placeholder at position 0 with the real length (everything after the 4-byte prefix)
                this.bb.putInt(this.bb.capacity() - 4);
                //Rewind so the buffer is ready to be written
                this.bb.rewind();
            } catch (IOException e) {
                LOG.warn("Unexpected exception", e);
            }
        }
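
The length-patching trick in createBB is easy to reproduce. The sketch below writes a -1 placeholder, a header of two ints and a length-prefixed string body, then overwrites the placeholder with capacity - 4. It uses DataOutputStream in place of Jute, assuming (as I understand Jute's BinaryOutputArchive) that ints are written big-endian and strings as a 4-byte length followed by UTF-8 bytes. FrameBuilder and the xid/type values in the example are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical re-creation of Packet.createBB() using DataOutputStream instead of Jute. */
class FrameBuilder {
    static ByteBuffer createBB(int xid, int type, String path) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(baos);
        out.writeInt(-1);                 // length placeholder, patched below
        out.writeInt(xid);                // header: xid (4 bytes)
        out.writeInt(type);               // header: type (4 bytes)
        byte[] p = path.getBytes(StandardCharsets.UTF_8);
        out.writeInt(p.length);           // body: length-prefixed string
        out.write(p);
        out.flush();
        ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());   // position 0
        bb.putInt(bb.capacity() - 4);     // overwrites the placeholder with the size after the prefix
        bb.rewind();                      // back to position 0 so the whole frame gets written
        return bb;
    }
}
```

For xid = 1, type = 4 and path "/tt", the frame is 4 + 4 + 4 + 4 + 3 = 19 bytes and the length field holds 15.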

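ZooKeeper uses its own serialization library, Jute, which is worth a look if you are interested.
Having covered the layout of outgoing messages, let's look at how the client handles the response that completes session creation.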

ClientCnxnSocket.readConnectResult
void readConnectResult() throws IOException {
        if (LOG.isTraceEnabled()) {
            StringBuilder buf = new StringBuilder("0x[");
            for (byte b : incomingBuffer.array()) {
                buf.append(Integer.toHexString(b)).append(",");
            }
            buf.append("]");
            if (LOG.isTraceEnabled()) {
                LOG.trace("readConnectResult {} {}", incomingBuffer.remaining(), buf.toString());
            }
        }
        //Create an input stream over the ByteBuffer
        ByteBufferInputStream bbis = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ConnectResponse conRsp = new ConnectResponse();
        //Deserialize the ConnectResponse
        conRsp.deserialize(bbia, "connect");

        // read "is read-only" flag
        boolean isRO = false;
        try {
            isRO = bbia.readBool("readOnly");
        } catch (IOException e) {
            // this is ok -- just a packet from an old server which
            // doesn't contain readOnly field
            LOG.warn("Connected to an old server; r-o mode will be unavailable");
        }
        //Take the sessionId from the deserialized ConnectResponse
        this.sessionId = conRsp.getSessionId();
        //Report the result to SendThread.onConnected, which updates the connection state and queues the connection event for eventThread
        sendThread.onConnected(conRsp.getTimeOut(), this.sessionId, conRsp.getPasswd(), isRO);
    }
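
readConnectResult tolerates old servers by treating a missing readOnly flag as false. The sketch below parses the same layout with DataInputStream, assuming the ConnectResponse field order protocolVersion, timeOut, sessionId, passwd (as I read it from zookeeper.jute), with passwd encoded as a 4-byte length plus bytes. ConnectResponseSketch is hypothetical; the real code uses the Jute-generated ConnectResponse class and catches IOException rather than only EOFException.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

/** Hypothetical parser for a ConnectResponse body plus the optional trailing readOnly flag. */
class ConnectResponseSketch {
    int protocolVersion;
    int timeOut;          // session timeout negotiated by the server
    long sessionId;
    byte[] passwd;
    boolean readOnly;

    static ConnectResponseSketch parse(byte[] body) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(body));
        ConnectResponseSketch r = new ConnectResponseSketch();
        r.protocolVersion = in.readInt();
        r.timeOut = in.readInt();
        r.sessionId = in.readLong();
        int len = in.readInt();                 // length-prefixed byte buffer (-1 would mean null)
        r.passwd = new byte[Math.max(len, 0)];
        in.readFully(r.passwd);
        try {
            r.readOnly = in.readBoolean();      // newer servers append this flag
        } catch (EOFException e) {
            r.readOnly = false;                 // old server: flag absent, read-only mode unavailable
        }
        return r;
    }
}
```
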

At this point the ZooKeeper client has finished starting up. Client startup consists of establishing the socket connection and then the session; the figure below summarizes the process.


Figure: overview of client connection setup (连接创建.png)
© Copyright belongs to the author. Contact the author before reprinting or for content partnerships.
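Platform notice: the article (including any images or video) was uploaded and published by the author and represents only the author's views. Jianshu is an information-publishing platform that only provides information storage services.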
