Zookeeper源碼分析3-Watcher機(jī)制

Watcher 的基本流程

ZooKeeper 的 Watcher 機(jī)制,總的來(lái)說(shuō)可以分為三個(gè)過(guò)程:客戶端注冊(cè) Watcher、服務(wù)器處理 Watcher 和客戶端回調(diào)Watcher
客戶端注冊(cè) watcher 有 3 種方式,getData、exists、getChildren;


image.png

以如下代碼為例來(lái)分析整個(gè)觸發(fā)機(jī)制的原理

基于 zkclient 客戶端發(fā)起一個(gè)數(shù)據(jù)操作

<dependency>
  <groupId>com.101tec</groupId>
  <artifactId>zkclient</artifactId>
  <version>0.10</version>
</dependency>
public static void main(String[] args) throws KeeperException, InterruptedException, IOException {
        ZooKeeper zookeeper = new ZooKeeper("192.168.13.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("event.type" + event.getType());
            }
        });
        zookeeper.create("/watch", "0".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); //創(chuàng)建節(jié)點(diǎn)
        zookeeper.exists("/watch", true); //注冊(cè)監(jiān)聽(tīng)
        Thread.sleep(1000);
        zookeeper.setData("/watch", "1".getBytes(), -1); //修改節(jié)點(diǎn)的值觸發(fā)監(jiān)聽(tīng)
        System.in.read();
    }

ZooKeeper API 的初始化過(guò)程

    ZooKeeper zookeeper = new ZooKeeper("192.168.13.102:2181", 4000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                System.out.println("event.type" + event.getType());
            }
        });

在創(chuàng)建一個(gè) ZooKeeper 客戶端對(duì)象實(shí)例時(shí),我們通過(guò) new Watcher()向構(gòu)造方法中傳入一個(gè)默認(rèn)的 Watcher, 這個(gè)Watcher 將作為整個(gè) ZooKeeper 會(huì)話期間的默認(rèn) Watcher,會(huì)一直被保存在客戶端 ZKWatchManager 的 defaultWatcher 中

    public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,
                     boolean canBeReadOnly, HostProvider aHostProvider,
                     ZKClientConfig clientConfig) throws IOException {
        LOG.info("Initiating client connection, connectString=" + connectString
                + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        if (clientConfig == null) {
            clientConfig = new ZKClientConfig();
        }
        this.clientConfig = clientConfig;
        watchManager = defaultWatchManager();
        //在這里將 watcher 設(shè)置到 ZKWatchManager
        watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(
                connectString);
        hostProvider = aHostProvider;
        //初始化了 ClientCnxn,并且調(diào)用 cnxn.start()方法
        cnxn = new ClientCnxn(connectStringParser.getChrootPath(),
                hostProvider, sessionTimeout, this, watchManager,
                getClientCnxnSocket(), canBeReadOnly);
        cnxn.start();
    }

ClientCnxn:是 Zookeeper 客戶端和 Zookeeper 服務(wù)器端進(jìn)行通信和事件通知處理的主要類(lèi),它內(nèi)部包含兩個(gè)類(lèi):

    1. SendThread :負(fù)責(zé)客戶端和服務(wù)器端的數(shù)據(jù)通信, 也包括事件信息的傳輸
    1. EventThread : 主要在客戶端回調(diào)注冊(cè)的 Watchers 進(jìn)行通知處理

ClientCnxn 初始化

 public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper,
                      ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket,
                      long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) {
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
        connectTimeout = sessionTimeout / hostProvider.size();
        readTimeout = sessionTimeout * 2 / 3;
        readOnly = canBeReadOnly;
        //初始化 sendThread
        sendThread = new SendThread(clientCnxnSocket);
        //初始化 eventThread
        eventThread = new EventThread();
        this.clientConfig = zooKeeper.getClientConfig();
    }

    public void start() {
        //啟動(dòng)兩個(gè)線程
        sendThread.start();
        eventThread.start();
    }

服務(wù)端接收請(qǐng)求處理流程

NIOServerCnxn

服務(wù)端有一個(gè) NIOServerCnxn 類(lèi),用來(lái)處理客戶端發(fā)送過(guò)來(lái)的請(qǐng)求

ZookeeperServer-zks.processPacket(this, bb)

處理客戶端傳送過(guò)來(lái)的數(shù)據(jù)包

 public void processPacket(ServerCnxn cnxn, ByteBuffer incomingBuffer) throws IOException {
        // We have the request, now process and setup for next
        InputStream bais = new ByteBufferInputStream(incomingBuffer);
        BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
        RequestHeader h = new RequestHeader();
        h.deserialize(bia, "header"); //反序列化客戶端 header 頭信息
        // Through the magic of byte buffers, txn will not be
        // pointing
        // to the start of the txn
        incomingBuffer = incomingBuffer.slice();
        if (h.getType() == OpCode.auth) { //判斷當(dāng)前操作類(lèi)型,如果是 auth 操作,則執(zhí)行下面的代碼
            LOG.info("got auth packet " + cnxn.getRemoteSocketAddress());
            AuthPacket authPacket = new AuthPacket();
            ByteBufferInputStream.byteBuffer2Record(incomingBuffer, authPacket);
            String scheme = authPacket.getScheme();
            ServerAuthenticationProvider ap = ProviderRegistry.getServerProvider(scheme);
            Code authReturn = KeeperException.Code.AUTHFAILED;
            if (ap != null) {
                try {
                    authReturn = ap.handleAuthentication(new ServerAuthenticationProvider.ServerObjs(this, cnxn), authPacket.getAuth());
                } catch (RuntimeException e) {
                    LOG.warn("Caught runtime exception from AuthenticationProvider: " + scheme + " dueto" + e);
                            authReturn = KeeperException.Code.AUTHFAILED;
                }
            }
            if (authReturn == KeeperException.Code.OK) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Authentication succeeded for scheme: " + scheme);
                }
                LOG.info("auth success " + cnxn.getRemoteSocketAddress());
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, null, null);
            } else {
                if (ap == null) {
                    LOG.warn("No authentication provider for scheme: "
                            + scheme + " has "
                            + ProviderRegistry.listProviders());
                } else {
                    LOG.warn("Authentication failed for scheme: " + scheme);
                }
                // send a response...
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
                        KeeperException.Code.AUTHFAILED.intValue());
                cnxn.sendResponse(rh, null, null);
                // ... and close connection
                cnxn.sendBuffer(ServerCnxnFactory.closeConn);
                cnxn.disableRecv();
            }
            return;
        } else { //如果不是授權(quán)操作,再判斷是否為 sasl 操作
            if (h.getType() == OpCode.sasl) {
                Record rsp = processSasl(incomingBuffer, cnxn);
                ReplyHeader rh = new ReplyHeader(h.getXid(), 0, KeeperException.Code.OK.intValue());
                cnxn.sendResponse(rh, rsp, "response"); // not sure about 3rd arg..what is it?
                return;
            } else {//最終進(jìn)入這個(gè)代碼塊進(jìn)行處理
                //封裝請(qǐng)求對(duì)象
                Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(),
                        h.getType(), incomingBuffer, cnxn.getAuthInfo());
                si.setOwner(ServerCnxn.me);
                // Always treat packet from the client as a possible
                // local request.
                setLocalSessionFlag(si);
                submitRequest(si); //提交請(qǐng)求
            }
        }
        cnxn.incrOutstandingRequests(h);
    }

submitRequest

負(fù)責(zé)在服務(wù)端提交當(dāng)前請(qǐng)求

public void submitRequest(Request si) {
        if (firstProcessor == null) { //processor 處理器,request 過(guò)來(lái)以后會(huì)經(jīng)歷一系列處理器的處理過(guò)程
            synchronized (this) {
                try {
                    while (state == State.INITIAL) {
                        wait(1000);
                    }
                } catch (InterruptedException e) {
                    LOG.warn("Unexpected interruption", e);
                }
                if (firstProcessor == null || state != State.RUNNING) {
                    throw new RuntimeException("Not started");
                }
            }
        }
        try {
            touch(si.cnxn);
            boolean validpacket = Request.isValid(si.type); //判斷是否合法
            if (validpacket) {
                //調(diào)用 firstProcessor 發(fā)起請(qǐng)求,而這個(gè) firstProcess 是一個(gè)接口,有多個(gè)實(shí)現(xiàn)類(lèi)
                firstProcessor.processRequest(si);
                if (si.cnxn != null) {
                    incInProcess();
                }
            } else {
                LOG.warn("Received packet at server of unknown type " + si.type);
                new UnimplementedRequestProcessor().processRequest(si);
            }
        } catch (MissingSessionException e) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Dropping request: " + e.getMessage());
            }
        } catch (RequestProcessorException e) {
            LOG.error("Unable to process request:" + e.getMessage(), e);
        }
    }

firstProcessor 的請(qǐng)求鏈組成

firstProcessor 的初始化是在 ZookeeperServer 的 setupRequestProcessor 中完成的,代碼如下

 protected void setupRequestProcessors() {
        RequestProcessor finalProcessor = new FinalRequestProcessor(this);
        RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
        ((SyncRequestProcessor) syncProcessor).start();
        firstProcessor = new PrepRequestProcessor(this, syncProcessor);//需要注意的是,PrepRequestProcessor 中傳遞的是一個(gè) syncProcessor
                ((PrepRequestProcessor) firstProcessor).start();
    }

從上面我們可以看到 firstProcessor 的實(shí)例是一個(gè) PrepRequestProcessor,而這個(gè)構(gòu)造方法中又傳遞了一個(gè) Processor構(gòu)成了一個(gè)調(diào)用鏈。
RequestProcessor syncProcessor = new SyncRequestProcessor(this, finalProcessor);
而 syncProcessor 的構(gòu)造方法傳遞的又是一個(gè) Processor,對(duì)應(yīng)的是 FinalRequestProcessor

所以整個(gè)調(diào)用鏈?zhǔn)?PrepRequestProcessor -> SyncRequestProcessor ->FinalRequestProcessor

PredRequestProcessor.processRequest(si);

通過(guò)上面了解到調(diào)用鏈關(guān)系以后,我們繼續(xù)再看 firstProcessor.processRequest(si); 會(huì)調(diào)用到 PrepRequestProcessor

  public void processRequest(Request request) {
        submittedRequests.add(request);
    }

這里processRequest 只是把 request 添加到 submittedRequests 中,根據(jù)前面的經(jīng)驗(yàn),很自然的想到這里又是一個(gè)異步操作。而 subittedRequests 又是一個(gè)阻塞隊(duì)列

LinkedBlockingQueue submittedRequests = new LinkedBlockingQueue();

而 PrepRequestProcessor 這個(gè)類(lèi)又繼承了線程類(lèi),因此我們直接找到當(dāng)前類(lèi)中的 run 方法如下

 public void run() {
        try {
            while (true) {
                Request request = submittedRequests.take(); //ok,從隊(duì)列中拿到請(qǐng)求進(jìn)行處理
                long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
                if (request.type == OpCode.ping) {
                    traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
                }
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logRequest(LOG, traceMask, 'P', request, "");
                }
                if (Request.requestOfDeath == request) {
                    break;
                }
                pRequest(request); //調(diào)用 pRequest 進(jìn)行預(yù)處理
            }
        } catch (RequestProcessorException e) {
            if (e.getCause() instanceof XidRolloverException) {
                LOG.info(e.getCause().getMessage());
            }
            handleException(this.getName(), e);
        } catch (Exception e) {
            handleException(this.getName(), e);
        }
        LOG.info("PrepRequestProcessor exited loop!");
    }

pRequest

預(yù)處理這塊的代碼太長(zhǎng),就不好貼了。前面的 N 行代碼都是根據(jù)當(dāng)前的 OP 類(lèi)型進(jìn)行判斷和做相應(yīng)的處理,在這個(gè)方法中的最后一行中,我們會(huì)看到如下代碼:

nextProcessor.processRequest(request);

SyncRequestProcessor. processRequest

 public void processRequest(Request request) {
        // request.addRQRec(">sync");
        queuedRequests.add(request);
    }

這個(gè)方法的代碼也是一樣,基于異步化的操作,把請(qǐng)求添加到 queuedRequets 中,那么我們繼續(xù)在當(dāng)前類(lèi)找到 run 方法

  public void run() {
        try {
            int logCount = 0;
            // we do this in an attempt to ensure that not all of the servers
            // in the ensemble take a snapshot at the same time
            int randRoll = r.nextInt(snapCount / 2);
            while (true) {
                Request si = null;
                //從阻塞隊(duì)列中獲取請(qǐng)求
                if (toFlush.isEmpty()) {
                    si = queuedRequests.take();
                } else {
                    si = queuedRequests.poll();
                    if (si == null) {
                        flush(toFlush);
                        continue;
                    }
                }
                if (si == requestOfDeath) {
                    break;
                }
                if (si != null) {
                    // track the number of records written to the log
                    //下面這塊代碼,粗略看來(lái)是觸發(fā)快照操作,啟動(dòng)一個(gè)處理快照的線程
                    if (zks.getZKDatabase().append(si)) {
                        logCount++;
                        if (logCount > (snapCount / 2 + randRoll)) {
                            randRoll = r.nextInt(snapCount / 2);
                            // roll the log
                            zks.getZKDatabase().rollLog();
                            // take a snapshot
                            if (snapInProcess != null && snapInProcess.isAlive()) {
                                LOG.warn("Too busy to snap, skipping");
                            } else {
                                snapInProcess = new ZooKeeperThread("Snapshot Thread") {
                                    public void run() {
                                        try {
                                            zks.takeSnapshot();
                                        } catch (Exception e) {
                                            LOG.warn("Unexpected exception", e);
                                        }
                                    }
                                };
                                snapInProcess.start();
                            }
                            logCount = 0;
                        }
                    } else if (toFlush.isEmpty()) {
                        if (nextProcessor != null) {
                            nextProcessor.processRequest(si); //繼續(xù)調(diào)用下一個(gè)處理器來(lái)處理請(qǐng)求
                            if (nextProcessor instanceof Flushable) {
                                ((Flushable) nextProcessor).flush();
                            }
                        }
                        continue;
                    }
                    toFlush.add(si);
                    if (toFlush.size() > 1000) {
                        flush(toFlush);
                    }
                }
            }
        } catch (Throwable t) {
            handleException(this.getName(), t);
        } finally {
            running = false;
        }
        LOG.info("SyncRequestProcessor exited!");
    }

FinalRequestProcessor. processRequest

這個(gè)方法就是我們?cè)谡n堂上分析到的方法了,F(xiàn)inalRequestProcessor.processRequest 方法并根據(jù) Request 對(duì)象中的操作更新內(nèi)存中 Session 信息或者 znode 數(shù)據(jù)。
這塊代碼有小 300 多行,就不全部貼出來(lái)了,我們直接定位到關(guān)鍵代碼,根據(jù)客戶端的 OP 類(lèi)型找到如下的代碼:

  case OpCode.exists: {
        lastOp = "EXIS";
        // TODO we need to figure out the security requirement for this!
        ExistsRequest existsRequest = new ExistsRequest();
        //反序列化 (將 ByteBuffer 反序列化成為 ExitsRequest.這個(gè)就是我們?cè)诳蛻舳税l(fā)起請(qǐng)求的時(shí)候傳遞過(guò)來(lái)的 Request 對(duì)象
        ByteBufferInputStream.byteBuffer2Record(request.request, existsRequest);
        String path = existsRequest.getPath(); //得到請(qǐng)求的路徑
        if (path.indexOf('\0') != -1) {
            throw new KeeperException.BadArgumentsException();
        }
        //終于找到一個(gè)很關(guān)鍵的代碼,判斷請(qǐng)求的 getWatch 是否存在,如果存在,則傳遞 cnxn(servercnxn)
        //對(duì)于 exists 請(qǐng)求,需要監(jiān)聽(tīng) data 變化事件,添加 watcher 
        Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null);
        rsp = new ExistsResponse(stat); //在服務(wù)端內(nèi)存數(shù)據(jù)庫(kù)中根據(jù)路徑得到結(jié)果進(jìn)行組裝,設(shè)置為 ExistsResponse
        break;
    }

客戶端接收服務(wù)端處理完成的響應(yīng)

ClientCnxnSocketNetty.messageReceived

服務(wù)端處理完成以后,會(huì)通過(guò) NettyServerCnxn.sendResponse 發(fā)送返回的響應(yīng)信息,
客戶端會(huì)在 ClientCnxnSocketNetty.messageReceived 接收服務(wù)端的返回

 public void messageReceived(ChannelHandlerContext ctx,
                                MessageEvent e) throws Exception {
        updateNow();
        ChannelBuffer buf = (ChannelBuffer) e.getMessage();
        while (buf.readable()) {
            if (incomingBuffer.remaining() > buf.readableBytes()) {
                int newLimit = incomingBuffer.position()
                        + buf.readableBytes();
                incomingBuffer.limit(newLimit);
            }
            buf.readBytes(incomingBuffer);
            incomingBuffer.limit(incomingBuffer.capacity());
            if (!incomingBuffer.hasRemaining()) {
                incomingBuffer.flip();
                if (incomingBuffer == lenBuffer) {
                    recvCount++;
                    readLength();
                } else if (!initialized) {
                    readConnectResult();
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    initialized = true;
                    updateLastHeard();
                } else {
                    //收到消息以后觸發(fā) SendThread.readResponse方法
                    sendThread.readResponse(incomingBuffer);
                    lenBuffer.clear();
                    incomingBuffer = lenBuffer;
                    updateLastHeard();
                }
            }
        }
        wakeupCnxn();
    }

SendThread. readResponse

這個(gè)方法里面主要的流程如下:

  • 首先讀取 header,如果其 xid == -2,表明是一個(gè) ping 的 response,return
  • 如果 xid 是 -4 ,表明是一個(gè) AuthPacket 的 response return
  • 如果 xid 是 -1,表明是一個(gè) notification,此時(shí)要繼續(xù)讀取并構(gòu)造一個(gè) enent,通過(guò) EventThread.queueEvent 發(fā)送,return

其它情況下:
從 pendingQueue 拿出一個(gè) Packet,校驗(yàn)后更新 packet 信息

 void readResponse(ByteBuffer incomingBuffer) throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(
                incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ReplyHeader replyHdr = new ReplyHeader();
        replyHdr.deserialize(bbia, "header"); //反序列化 header
        if (replyHdr.getXid() == -2) { //? 
            // -2 is the xid for pings
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got ping response for sessionid: 0x"
                        + Long.toHexString(sessionId)
                        + " after "
                        + ((System.nanoTime() - lastPingSentNs) / 1000000)
                        + "ms");
            }
            return;
        }
        if (replyHdr.getXid() == -4) {
            // -4 is the xid for AuthPacket 
            if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                state = States.AUTH_FAILED;
                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.AuthFailed, null));

            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got auth sessionid:0x"
                        + Long.toHexString(sessionId));
            }
            return;
        }
        if (replyHdr.getXid() == -1) { //表示當(dāng)前的消息類(lèi)型為一個(gè) notification(意味著是服務(wù)端的一個(gè)響應(yīng)事件)
            // -1 means notification
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
            }
            WatcherEvent event = new WatcherEvent();//?
            event.deserialize(bbia, "response"); //反序列化響應(yīng)信息
            // convert from a server path to a client path
            if (chrootPath != null) {
                String serverPath = event.getPath();
                if (serverPath.compareTo(chrootPath) == 0)
                    event.setPath("/");
                else if (serverPath.length() > chrootPath.length())
                    event.setPath(serverPath.substring(chrootPath.length()));
                else {
                    LOG.warn("Got server path " + event.getPath()
                            + " which is too short for chroot path "
                            + chrootPath);
                }
            }
            WatchedEvent we = new WatchedEvent(event);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got " + we + " for sessionid 0x"
                        + Long.toHexString(sessionId));
            }
            eventThread.queueEvent(we);
            return;
        }
        // If SASL authentication is currently in progress, construct and
        // send a response packet immediately, rather than queuing a
        // response as with other packets.
        if (tunnelAuthInProgress()) {
            GetSASLRequest request = new GetSASLRequest();
            request.deserialize(bbia, "token");
            zooKeeperSaslClient.respondToServer(request.getToken(),
                    ClientCnxn.this);
            return;
        }
        Packet packet;
        synchronized (pendingQueue) {
            if (pendingQueue.size() == 0) {
                throw new IOException("Nothing in the queue, but got "
                        + replyHdr.getXid());
            }
            packet = pendingQueue.remove(); //因?yàn)楫?dāng)前這個(gè)數(shù)據(jù)包已經(jīng)收到了響應(yīng),所以講它從 pendingQueued 中移除
        }
        /*
         * Since requests are processed in order, we better get a response
         * to the first request!
         */
        try {//校驗(yàn)數(shù)據(jù)包信息,校驗(yàn)成功后講數(shù)據(jù)包信息進(jìn)行更新(替換為服務(wù)端的信息)
            if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                packet.replyHeader.setErr(
                        KeeperException.Code.CONNECTIONLOSS.intValue());
                throw new IOException("Xid out of order. Got Xid "
                        + replyHdr.getXid() + " with err " +
                        +replyHdr.getErr() +
                        " expected Xid "
                        + packet.requestHeader.getXid()
                        + " for a packet with details: "
                        + packet);
            }
            packet.replyHeader.setXid(replyHdr.getXid());
            packet.replyHeader.setErr(replyHdr.getErr());
            packet.replyHeader.setZxid(replyHdr.getZxid());
            if (replyHdr.getZxid() > 0) {
                lastZxid = replyHdr.getZxid();
            }
            if (packet.response != null && replyHdr.getErr() == 0) {
                packet.response.deserialize(bbia, "response"); //獲得服務(wù)端的響應(yīng),反序列化以后設(shè)置到 p
                acket.response 屬性中。所以我們可以在 exists 方法的最后一行通過(guò) packet.response 拿到改請(qǐng)求的返回結(jié)果
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Reading reply sessionid:0x"
                        + Long.toHexString(sessionId) + ", packet:: " + packet);
            }
        } finally {
            finishPacket(packet); //最后調(diào)用 finishPacket 方法完成處理
        }
    }

finishPacket 方法

主要功能是把從 Packet 中取出對(duì)應(yīng)的 Watcher 并注冊(cè)到 ZKWatchManager 中去

 private void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err); //將事件注冊(cè)到 zkwatchemanager 中
            // watchRegistration,熟悉嗎?在組裝請(qǐng)求的時(shí)候,我們初始化了這個(gè)對(duì)象
            //把 watchRegistration 子類(lèi)里面的 Watcher 實(shí)例放到 ZKWatchManager 的 existsWatches 中存儲(chǔ)起來(lái)。
        }
        //將所有移除的監(jiān)視事件添加到事件隊(duì)列, 這樣客戶端能收到 “data/child 事件被移除”的事件類(lèi)型
        if (p.watchDeregistration != null) {
            Map<EventType, Set<Watcher>> materializedWatchers = null;
            try {
                materializedWatchers = p.watchDeregistration.unregister(err);
                for (Entry<EventType, Set<Watcher>> entry : materializedWatchers.entrySet()) {
                    Set<Watcher> watchers = entry.getValue();
                    if (watchers.size() > 0) {
                        queueEvent(p.watchDeregistration.getClientPath(), err,
                                watchers, entry.getKey());
                        p.replyHeader.setErr(Code.OK.intValue());
                    }
                }
            } catch (KeeperException.NoWatcherException nwe) {
                p.replyHeader.setErr(nwe.code().intValue());
            } catch (KeeperException ke) {
                p.replyHeader.setErr(ke.code().intValue());
            }
        }
        //cb 就是 AsnycCallback,如果為 null,表明是同步調(diào)用的接口,不需要異步回掉,因此,直接 notifyAll 即可。
        if (p.cb == null) {
            synchronized (p) {
                p.finished = true;
                p.notifyAll();
            }
        } else {
            p.finished = true;
            eventThread.queuePacket(p);
        }
    }

watchRegistration

 public void register(int rc) {
        if (shouldAddWatch(rc)) {
            Map<String, Set<Watcher>> watches = getWatches(rc); // //通過(guò)子類(lèi)的實(shí)現(xiàn)取得 ZKWatchManager 中的 existsWatches
            synchronized(watches) {
                Set<Watcher> watchers = watches.get(clientPath);
                if (watchers == null) {
                    watchers = new HashSet<Watcher>();
                    watches.put(clientPath, watchers);
                }
                watchers.add(watcher); //將 Watcher 對(duì)象放到 ZKWatchManager 中的 existsWatches 里面
            }
        }
    }

下面這段代碼是客戶端存儲(chǔ) watcher 的幾個(gè) map 集合,分別對(duì)應(yīng)三種注冊(cè)監(jiān)聽(tīng)事件

  static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches = new HashMap<String, Set<Watcher>>();
    }

總的來(lái)說(shuō),當(dāng)使用 ZooKeeper 構(gòu)造方法或者使用 getData、exists 和getChildren 三個(gè)接口來(lái)向 ZooKeeper 服務(wù)器注冊(cè) Watcher 的時(shí)候,首先將此消息傳遞給服務(wù)端,傳遞成功后,服務(wù)端會(huì)通知客戶端,然后客戶端將該路徑和 Watcher 對(duì)應(yīng)關(guān)系存儲(chǔ)起來(lái)備用。

EventThread.queuePacket()

finishPacket 方法最終會(huì)調(diào)用 eventThread.queuePacket, 講當(dāng)前的數(shù)據(jù)包添加到等待事件通知的隊(duì)列中

  public void queuePacket(Packet packet) {
        if (wasKilled) {
            synchronized (waitingEvents) {
                if (isRunning) waitingEvents.add(packet);
                else processEvent(packet);
            }
        } else {
            waitingEvents.add(packet);
        }
    }

事件觸發(fā)

前面這么長(zhǎng)的說(shuō)明,只是為了清洗的說(shuō)明事件的注冊(cè)流程,最終的觸發(fā),還得需要通過(guò)事務(wù)型操作來(lái)完成事件的觸發(fā)

zookeeper.setData(“/wei”, “1”.getByte(),-1) ; //修改節(jié)點(diǎn)的值觸發(fā)監(jiān)聽(tīng)

前面的客戶端和服務(wù)端對(duì)接的流程就不再重復(fù)講解了,交互流程是一樣的,唯一的差別在于事件觸發(fā)了

服務(wù)端的事件響應(yīng) DataTree.setData()

public Stat setData(String path, byte data[], int version, long zxid,
                        long time) throws KeeperException.NoNodeException {
        Stat s = new Stat();
        DataNode n = nodes.get(path);
        if (n == null) {
            throw new KeeperException.NoNodeException();
        }
        byte lastdata[] = null;
        synchronized (n) {
            lastdata = n.data;
            n.data = data;
            n.stat.setMtime(time);
            n.stat.setMzxid(zxid);
            n.stat.setVersion(version);
            n.copyStat(s);
        }
        // now update if the path is in a quota subtree.
        String lastPrefix = getMaxPrefixWithQuota(path);
        if (lastPrefix != null) {
            this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
                    - (lastdata == null ? 0 : lastdata.length));
        }
        dataWatches.triggerWatch(path, EventType.NodeDataChanged); //觸發(fā)對(duì)應(yīng)節(jié)點(diǎn)的 NodeDataChanged 事件
        return s;
    }

WatcherManager. triggerWatch

Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
        WatchedEvent e = new WatchedEvent(type, KeeperState.SyncConnected, path); // 根據(jù)事件類(lèi)型、連接狀態(tài)、節(jié)點(diǎn)路徑創(chuàng)建 WatchedEvent
        HashSet<Watcher> watchers;
        synchronized (this) {
            watchers = watchTable.remove(path); // 從 watcher 表中移除 path,并返回其對(duì)應(yīng)的 watcher 集合
            if (watchers == null || watchers.isEmpty()) {
                if (LOG.isTraceEnabled()) {
                    ZooTrace.logTraceMessage(LOG,
                            ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                            "No watchers for " + path);
                }
                return null;
            }
            for (Watcher w : watchers) { // 遍歷 watcher 集合
                HashSet<String> paths = watch2Paths.get(w); // 根據(jù) watcher 從 watcher 表中取出路徑集合
                if (paths != null) {
                    paths.remove(path); //移除路徑
                }
            }
        }
        for (Watcher w : watchers) { // 遍歷 watcher 集合
            if (supress != null && supress.contains(w)) {
                continue;
            }
            w.process(e); //OK,重點(diǎn)又來(lái)了,w.process 是做什么呢?
        }
        return watchers;
    }

w.process(e);

還記得我們?cè)诜?wù)端綁定事件的時(shí)候,watcher 綁定是是什么?是 ServerCnxn, 所以 w.process(e),其實(shí)調(diào)用的應(yīng)該是ServerCnxn 的 process 方法。而 servercnxn 又是一個(gè)抽象方法,有兩個(gè)實(shí)現(xiàn)類(lèi),分別是:NIOServerCnxn 和NettyServerCnxn。那接下來(lái)我們扒開(kāi) NIOServerCnxn 這個(gè)類(lèi)的 process 方法看看究竟

   public void process(WatchedEvent event) {
        ReplyHeader h = new ReplyHeader(-1, -1L, 0);
        if (LOG.isTraceEnabled()) {
            ZooTrace.logTraceMessage(LOG, ZooTrace.EVENT_DELIVERY_TRACE_MASK,
                    "Deliver event " + event + " to 0x"
                            + Long.toHexString(this.sessionId)
                            + " through " + this);
        }
        // Convert WatchedEvent to a type that can be sent over the wire
        WatcherEvent e = event.getWrapper();
        try {
            sendResponse(h, e, "notification"); // 這個(gè)地方發(fā)送了一個(gè)事件,事件對(duì)象為 WatcherEvent
        } catch (IOException e1) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Problem sending to " + getRemoteSocketAddress(), e1);
            }
            close();
        }
    }

那接下里,客戶端會(huì)收到這個(gè) response,觸發(fā) SendThread.readResponse 方法

客戶端處理事件響應(yīng)

SendThread.readResponse

這塊代碼上面已經(jīng)貼過(guò)了,所以我們只挑選當(dāng)前流程的代碼進(jìn)行講解,按照前面我們將到過(guò)的,notifacation 通知消息的 xid 為-1,意味著~直接找到-1 的判斷進(jìn)行分析

  void readResponse(ByteBuffer incomingBuffer) throws IOException {
        ByteBufferInputStream bbis = new ByteBufferInputStream(
                incomingBuffer);
        BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis);
        ReplyHeader replyHdr = new ReplyHeader();
        replyHdr.deserialize(bbia, "header");
        if (replyHdr.getXid() == -2) { //?
            // -2 is the xid for pings
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got ping response for sessionid: 0x"
                        + Long.toHexString(sessionId)
                        + " after "
                        + ((System.nanoTime() - lastPingSentNs) / 1000000)
                        + "ms");
            }
            return;
        }
        if (replyHdr.getXid() == -4) {
            // -4 is the xid for AuthPacket
            if (replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) {
                state = States.AUTH_FAILED;
                eventThread.queueEvent(new WatchedEvent(Watcher.Event.EventType.None,
                        Watcher.Event.KeeperState.AuthFailed, null));

            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got auth sessionid:0x"
                        + Long.toHexString(sessionId));
            }
            return;
        }
        if (replyHdr.getXid() == -1) {
            // -1 means notification
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got notification sessionid:0x"
                        + Long.toHexString(sessionId));
            }
            WatcherEvent event = new WatcherEvent();
            event.deserialize(bbia, "response"); //這個(gè)地方,是反序列化服務(wù)端的 WatcherEvent 事件。
            // convert from a server path to a client path
            if (chrootPath != null) {
                String serverPath = event.getPath();
                if (serverPath.compareTo(chrootPath) == 0)
                    event.setPath("/");
                else if (serverPath.length() > chrootPath.length())
                    event.setPath(serverPath.substring(chrootPath.length()));
                else {
                    LOG.warn("Got server path " + event.getPath()
                            + " which is too short for chroot path "
                            + chrootPath);
                }
            }
            WatchedEvent we = new WatchedEvent(event); //組裝 watchedEvent 對(duì)象。
            if (LOG.isDebugEnabled()) {
                LOG.debug("Got " + we + " for sessionid 0x"
                        + Long.toHexString(sessionId));
            }
            eventThread.queueEvent(we); //通過(guò) eventTherad 進(jìn)行事件處理
            return;
        }
        // If SASL authentication is currently in progress, construct and
        // send a response packet immediately, rather than queuing a
        // response as with other packets.
        if (tunnelAuthInProgress()) {
            GetSASLRequest request = new GetSASLRequest();
            request.deserialize(bbia, "token");
            zooKeeperSaslClient.respondToServer(request.getToken(),
                    ClientCnxn.this);
            return;
        }
        Packet packet;
        synchronized (pendingQueue) {
            if (pendingQueue.size() == 0) {
                throw new IOException("Nothing in the queue, but got "
                        + replyHdr.getXid());
            }
            packet = pendingQueue.remove();
        }
        /*
         * Since requests are processed in order, we better get a response
         * to the first request!
         */
        try {
            if (packet.requestHeader.getXid() != replyHdr.getXid()) {
                packet.replyHeader.setErr(
                        KeeperException.Code.CONNECTIONLOSS.intValue());
                throw new IOException("Xid out of order. Got Xid "
                        + replyHdr.getXid() + " with err " +
                        +replyHdr.getErr() +
                        " expected Xid "
                        + packet.requestHeader.getXid()
                        + " for a packet with details: "
                        + packet);
            }
            packet.replyHeader.setXid(replyHdr.getXid());
            packet.replyHeader.setErr(replyHdr.getErr());
            packet.replyHeader.setZxid(replyHdr.getZxid());
            if (replyHdr.getZxid() > 0) {
                lastZxid = replyHdr.getZxid();
            }
            if (packet.response != null && replyHdr.getErr() == 0) {
                packet.response.deserialize(bbia, "response");
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("Reading reply sessionid:0x"
                        + Long.toHexString(sessionId) + ", packet:: " + packet);
            }
        } finally {
            finishPacket(packet);
        }
    }

eventThread.queueEvent

SendThread 接收到服務(wù)端的通知事件后,會(huì)通過(guò)調(diào)用 EventThread 類(lèi)的 queueEvent 方法將事件傳給 EventThread 線程,queueEvent 方法根據(jù)該通知事件,從 ZKWatchManager 中取出所有相關(guān)的 Watcher,如果獲取到相應(yīng)的 Watcher,就會(huì)讓 Watcher 移除失效。

  private void queueEvent(WatchedEvent event, Set<Watcher> materializedWatchers) {
        if (event.getType() == EventType.None && sessionState == event.getState()) { //判斷類(lèi)型
            return;
        }
        sessionState = event.getState();
        final Set<Watcher> watchers;
        if (materializedWatchers == null) {
            // materialize the watchers based on the event
            watchers = watcher.materialize(event.getState(),
                    event.getType(), event.getPath());
        } else {
            watchers = new HashSet<Watcher>();
            watchers.addAll(materializedWatchers);
        }
        //封裝 WatcherSetEventPair 對(duì)象,添加到 waitngEvents 隊(duì)列中
        WatcherSetEventPair pair = new WatcherSetEventPair(watchers, event);
        // queue the pair (watch set & event) for later processing
        waitingEvents.add(pair);
    }

Meterialize 方法

通過(guò) dataWatches 或者 existWatches 或者 childWatches 的 remove 取出對(duì)應(yīng)的 watch,表明客戶端 watch 也是注冊(cè)一次就移除
同時(shí)需要根據(jù) keeperState、eventType 和 path 返回應(yīng)該被通知的 Watcher 集合

 public Set<Watcher> materialize(Watcher.Event.KeeperState state,
                                    Watcher.Event.EventType type,
                                    String clientPath) {
        Set<Watcher> result = new HashSet<Watcher>();
        switch (type) {
            case None:
                result.add(defaultWatcher);
                boolean clear = disableAutoWatchReset && state != Watcher.Event.KeeperState.SyncConnect
                ed;
                synchronized (dataWatches) {
                    for (Set<Watcher> ws : dataWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        dataWatches.clear();
                    }
                }
                synchronized (existWatches) {
                    for (Set<Watcher> ws : existWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        existWatches.clear();
                    }
                }
                synchronized (childWatches) {
                    for (Set<Watcher> ws : childWatches.values()) {
                        result.addAll(ws);
                    }
                    if (clear) {
                        childWatches.clear();
                    }
                }
                return result;
            case NodeDataChanged:
            case NodeCreated:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                synchronized (existWatches) {
                    addTo(existWatches.remove(clientPath), result);
                }
                break;
            case NodeChildrenChanged:
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            case NodeDeleted:
                synchronized (dataWatches) {
                    addTo(dataWatches.remove(clientPath), result);
                }
                // XXX This shouldn't be needed, but just in case
                synchronized (existWatches) {
                    Set<Watcher> list = existWatches.remove(clientPath);
                    if (list != null) {
                        addTo(existWatches.remove(clientPath), result);
                        LOG.warn("We are triggering an exists watch for delete! Shouldn't happen!");
                    }
                }
                synchronized (childWatches) {
                    addTo(childWatches.remove(clientPath), result);
                }
                break;
            default:
                String msg = "Unhandled watch event type " + type
                        + " with state " + state + " on path " + clientPath;
                LOG.error(msg);
                throw new RuntimeException(msg);
        }
        return result;
    }

waitingEvents.add

最后一步,接近真相了
waitingEvents 是 EventThread 這個(gè)線程中的阻塞隊(duì)列,很明顯,又是在我們第一步操作的時(shí)候?qū)嵗囊粋€(gè)線程。從名字可以指導(dǎo),waitingEvents 是一個(gè)待處理 Watcher 的隊(duì)列,EventThread 的 run() 方法會(huì)不斷從隊(duì)列中取數(shù)據(jù),交由 processEvent 方法處理:

  public void run() {
        try {
            isRunning = true;
            while (true) { //死循環(huán)
                Object event = waitingEvents.take(); //從待處理的事件隊(duì)列中取出事件
                if (event == eventOfDeath) {
                    wasKilled = true;
                } else {
                    processEvent(event); //執(zhí)行事件處理
                }
                if (wasKilled)
                    synchronized (waitingEvents) {
                        if (waitingEvents.isEmpty()) {
                            isRunning = false;
                            break;
                        }
                    }
            }
        } catch (InterruptedException e) {
            LOG.error("Event thread exiting due to interruption", e);
        }
        LOG.info("EventThread shut down for session: 0x{}",
                Long.toHexString(getSessionId()));
    }

ProcessEvent

由于這塊的代碼太長(zhǎng),只把核心的代碼貼出來(lái),這里就是處理事件觸發(fā)的核心代碼

    private void processEvent(Object event) {
        try {
            if (event instanceof WatcherSetEventPair) { //判斷事件類(lèi)型
                // each watcher will process the event
                WatcherSetEventPair pair = (WatcherSetEventPair) event; //得到 watcherseteventPair
                for (Watcher watcher : pair.watchers) { //拿到符合觸發(fā)機(jī)制的所有 watcher 列表,循環(huán)進(jìn)行調(diào)用
                    try {
                        watcher.process(pair.event); //調(diào)用客戶端的回調(diào) process,就是會(huì)調(diào)用開(kāi)篇寫(xiě)的new Watcher()的實(shí)現(xiàn)方法System.out.println("event.type" + event.getType());
                    } catch (Throwable t) {
                        LOG.error("Error while calling watcher ", t);
                    }
                }
            }

服務(wù)端接收數(shù)據(jù)請(qǐng)求

服務(wù)端收到的數(shù)據(jù)包應(yīng)該在哪里呢?
zookeeper 啟動(dòng)的時(shí)候,通過(guò)下面的代碼構(gòu)建了一個(gè)
ServerCnxnFactory cnxnFactory = ServerCnxnFactory.createFactory();
NIOServerCnxnFactory,它實(shí)現(xiàn)了 Thread,所以在啟動(dòng)的時(shí)候,會(huì)在 run 方法中不斷循環(huán)接收客戶端的請(qǐng)求進(jìn)行分發(fā)

NIOServerCnxnFactory.run

 public void run() {
        while (!ss.socket().isClosed()) {
            try {
                for (SelectionKey k : selectedList) {
                    // 獲取 client 的連接請(qǐng)求  
                    if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) {

                    } else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) {
                        //處理客戶端的讀/寫(xiě)請(qǐng)求
                        NIOServerCnxn c = (NIOServerCnxn) k.attachment();
                        c.doIO(k);//處理 IO 操作
                    } else {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Unexpected ops in select "
                                    + k.readyOps());
                        }
                    }
                }
                selected.clear();
            } catch (RuntimeException e) {
                LOG.warn("Ignoring unexpected runtime exception", e);
            } catch (Exception e) {
                LOG.warn("Ignoring exception", e);
            }
        }
        closeAll();
        LOG.info("NIOServerCnxn factory exited run method");
    }

NIOServerCnxn.doIO

 void doIO(SelectionKey k) {
        try {
        //省略部分代碼..
        if (k.isReadable()) {//處理讀請(qǐng)求,表示接收
            //中間這部分邏輯用來(lái)處理報(bào)文以及粘包問(wèn)題
            if (isPayload) { // not the case for 4letterword
                readPayload();//處理報(bào)文
            }
            else {
                // four letter words take care
                // need not do anything else
                return;
            }
        }
    }

NIOServerCnxn.readRequest

讀取客戶端的請(qǐng)求,進(jìn)行具體的處理

private void readRequest() throws IOException {
        zkServer.processPacket(this, incomingBuffer);
    }

ZookeeperServer.processPacket

這個(gè)方法根據(jù)數(shù)據(jù)包的類(lèi)型來(lái)處理不同的數(shù)據(jù)包,對(duì)于讀寫(xiě)請(qǐng)求,我們主要關(guān)注下面這塊代碼即可

   Request si = new Request(cnxn, cnxn.getSessionId(), h.getXid(), h.getType(), incomingBuffer, cnxn.getAuthInfo());
    si.setOwner(ServerCnxn.me);
    submitRequest(si);

ZookeeperServer.submitRequest

提交請(qǐng)求,這里面涉及到一個(gè) firstProcessor. 這個(gè)是一個(gè)責(zé)任鏈模式,如果當(dāng)前請(qǐng)求發(fā)到了 Leader 服務(wù)器,那么這條鏈路的設(shè)置過(guò)程應(yīng)該在LeaderZookeeperServer. setupRequestProcessors。那么它所組成的責(zé)任鏈為:

leader 節(jié)點(diǎn)請(qǐng)求流程

image.png

prepRequestProcessor,對(duì)于事務(wù)請(qǐng)求進(jìn)行一些列的預(yù)處理,比如創(chuàng)建請(qǐng)求事務(wù)頭、會(huì)話檢查、ACl 檢查等.
接下來(lái)把請(qǐng)求傳遞給 ProposalRequestProcessor,它負(fù)責(zé)準(zhǔn)備 proposal 并且發(fā)送給follower 節(jié)點(diǎn),另外,還會(huì)把事務(wù)請(qǐng)求轉(zhuǎn)發(fā)給 SyncRequestProcessor。
SynRequestProcessor 負(fù)責(zé)把事務(wù)持久化到磁盤(pán),并且會(huì)觸發(fā)一個(gè)AckRequestProcessor,它會(huì)產(chǎn)生一個(gè) ack 給自己。leader 會(huì)等待集群中的每個(gè)節(jié)點(diǎn)的ack,包括自己的。
CommitRequestProcessor 負(fù)責(zé)提交 proposal。前提是如果收到足夠多的 ack 時(shí)。
ToBeAppliedRequestProcessor,主要是為了維護(hù) Leader 類(lèi)的 toBeApplied 隊(duì)列,這個(gè)隊(duì)列中保存了已經(jīng)完成投票(即 commit)的 proposal,但是這些 proposal 還沒(méi)有應(yīng)用到本機(jī)的內(nèi)存中FinalRequestProcessos 是責(zé)任鏈中的最后一個(gè),主要負(fù)責(zé)把已經(jīng) commit 的寫(xiě)操作應(yīng)用到本機(jī),對(duì)于讀操作則從本機(jī)中讀取數(shù)據(jù)并返回給 client

follower 節(jié)點(diǎn)請(qǐng)求流程

如果當(dāng)前請(qǐng)求發(fā)送到了 Follow 服務(wù)器,那么這條鏈路的設(shè)置過(guò)程應(yīng)該在
FollowerZooKeeperServer. setupRequestProcessors。那么它所組成的責(zé)任鏈為


image.png

首先 FollowRequestProcessor 接收并處理 client 的請(qǐng)求,并把請(qǐng)求轉(zhuǎn)發(fā)到CommitRequestProcessor,此外還轉(zhuǎn)發(fā)寫(xiě)請(qǐng)求到 leader,還把讀請(qǐng)求直接轉(zhuǎn)發(fā)到FinalRequestProcessor。寫(xiě)請(qǐng)求則不一樣,CommitRequestProcessor 在寫(xiě)請(qǐng)求在FinalRequestProcessor 上 commit 之前保持等待狀態(tài)

集群模式下的處理流程

集群模式下,涉及到 zab 協(xié)議,所以處理流程比較復(fù)雜,可以基于這個(gè)圖來(lái)定位代碼的流程


image.png

——學(xué)自咕泡學(xué)院

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容