Netty源碼_AbstractChannel和ChannelOutboundBuffer詳解

一. AbstractChannel

1.1 構(gòu)造方法

    /**
     * 創(chuàng)建一個(gè)新實(shí)例。
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        // 創(chuàng)建
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

    /**
     * 創(chuàng)建一個(gè)新實(shí)例。
     */
    protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

可以看出在構(gòu)造方法中,就綁定了這個(gè)通道的四個(gè)成員變量 parent,id,unsafe,pipeline

    protected ChannelId newId() {
        return DefaultChannelId.newInstance();
    }
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
    }
    /**
     * 由子類(lèi)來(lái)實(shí)現(xiàn),創(chuàng)建對(duì)應(yīng)的 Unsafe 類(lèi)型實(shí)例
     */
    protected abstract AbstractUnsafe newUnsafe();
  • idpipeline都是直接創(chuàng)建,默認(rèn)是 DefaultChannelIdDefaultChannelPipeline 類(lèi)型。
  • newUnsafe() 是抽樣方法,有子類(lèi)才能創(chuàng)建對(duì)應(yīng)的 Unsafe 類(lèi)型實(shí)例。

1.2 ChannelOutboundInvoker 接口方法

Channel 還繼承了 ChannelOutboundInvoker 接口,也就是說(shuō)通道是可以發(fā)送出站 IO 操作的。

 @Override
    public ChannelFuture bind(SocketAddress localAddress) {
        return pipeline.bind(localAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress) {
        return pipeline.connect(remoteAddress);
    }

    @Override
    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
        return pipeline.connect(remoteAddress, localAddress);
    }

    @Override
    public ChannelFuture disconnect() {
        return pipeline.disconnect();
    }

    @Override
    public ChannelFuture close() {
        return pipeline.close();
    }

    @Override
    public ChannelFuture deregister() {
        return pipeline.deregister();
    }
  @Override
    public Channel flush() {
        pipeline.flush();
        return this;
    }

    @Override
    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }
    ........

你會(huì)發(fā)現(xiàn)基本上都是調(diào)用 ChannelPipeline 對(duì)應(yīng)的方法。

  • 也就是說(shuō)直接調(diào)用通道Channel 的發(fā)送出站IO事件的方法,和調(diào)用管道pipeline() 發(fā)送出站IO事件的方法是一樣的。
  • 根據(jù) DefaultChannelPipeline 的分析,我們知道這些出站 IO 事件最后都會(huì)調(diào)用到該通道的 Unsafe 屬性對(duì)應(yīng)方法進(jìn)行處理。

1.3 抽樣方法

AbstractChannel 還有幾個(gè)需要子類(lèi)實(shí)現(xiàn)抽樣方法,由子類(lèi)提供不同的處理邏輯:

  1. AbstractUnsafe newUnsafe()

    不同類(lèi)型的 Channel有自己特定的 Unsafe 類(lèi)型。

  2. boolean isCompatible(EventLoop loop)

    判斷給定的事件輪詢(xún)器 EventLoop 和當(dāng)前的通道類(lèi)型是不是兼容。每種類(lèi)型的通道Channel 都有自己特定的事件輪詢(xún)器。

  3. SocketAddress localAddress0()SocketAddress remoteAddress0()

    通道綁定的本地地址和通道連接的遠(yuǎn)程地址。

  4. void doBind(SocketAddress localAddress)

    進(jìn)行綁定操作,每種類(lèi)型的通道綁定處理是不一樣的。

  5. void doDisconnect()

    進(jìn)行連接操作。

  6. void doClose()

    進(jìn)行關(guān)閉連接操作。

  7. void doBeginRead()

    將通道設(shè)為開(kāi)始讀操作。

  8. void doWrite(ChannelOutboundBuffer in)

    進(jìn)行寫(xiě)操作。

AbstractChannel 真正重點(diǎn)的操作都是在 AbstractUnsafe 中實(shí)現(xiàn)的啊,下面講解 AbstractUnsafe。

二. AbstractUnsafe 類(lèi)

2.1 成員屬性

        // 寫(xiě)緩沖區(qū) ChannelOutboundBuffer
        private volatile ChannelOutboundBuffer outboundBuffer = new ChannelOutboundBuffer(AbstractChannel.this);

        // 用于在接收數(shù)據(jù)時(shí)分配緩存區(qū) ByteBuf
        private RecvByteBufAllocator.Handle recvHandle;

        // 當(dāng)前是否正在刷新數(shù)據(jù),防止重復(fù)刷新數(shù)據(jù)
        private boolean inFlush0;

        // 如果通道從未被注冊(cè),則為true,否則為false
        private boolean neverRegistered = true;

2.2 注冊(cè) register

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            ObjectUtil.checkNotNull(eventLoop, "eventLoop");

            if (isRegistered()) {
                // 當(dāng)前通道已經(jīng)注冊(cè),失敗,調(diào)用 promise 的setFailure方法進(jìn)行通知
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                // 這個(gè)事件輪詢(xún)器和當(dāng)前通道不兼容,失敗,調(diào)用 promise 的setFailure方法進(jìn)行通知
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                // 當(dāng)前線(xiàn)程就是通道 事件輪詢(xún)器線(xiàn)程,直接調(diào)用 register0 方法
                register0(promise);
            } else {
                try {
                    // 通過(guò) eventLoop.execute 方法,
                    // 保證 register0 方法在通道事件輪詢(xún)器線(xiàn)程中調(diào)用
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    // 發(fā)生異常,要關(guān)閉通道,并進(jìn)行相關(guān)通知
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }

        private void register0(ChannelPromise promise) {
            try {
                // 檢查通道是否仍然打開(kāi)
                // 當(dāng)注冊(cè)操作在 eventLoop 線(xiàn)程之外調(diào)用的話(huà),
                // 有可能這時(shí)通道被別的線(xiàn)程關(guān)閉了
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                // 第一次注冊(cè)
                boolean firstRegistration = neverRegistered;
                // 調(diào)用 AbstractChannel 的 doRegister 方法,進(jìn)行注冊(cè)操作
                doRegister();
                neverRegistered = false;
                // 設(shè)置 AbstractChannel 的 registered 成員屬性,表示已經(jīng)注冊(cè)
                registered = true;

                // 確保在通道未注冊(cè)前添加到管道上的 ChannelHandler 的 handlerAdded(…) 也會(huì)被調(diào)用
                // 這是必需的,因?yàn)橛脩?hù)可能已經(jīng)通過(guò)ChannelFutureListener中的管道觸發(fā)了事件。
                pipeline.invokeHandlerAddedIfNeeded();

                // 注冊(cè)成功的通知
                safeSetSuccess(promise);
                // 發(fā)送注冊(cè) 入站IO事件
                pipeline.fireChannelRegistered();
                // 只有在通道從未注冊(cè)的情況下才觸發(fā) channelActive 事件。
                // 這可以防止在通道被取消注冊(cè)和重新注冊(cè)時(shí)觸發(fā)多個(gè)通道 channelActive 事件。
                if (isActive()) {
                    if (firstRegistration) {
                        // 第一次注冊(cè)時(shí),才會(huì)發(fā)送 channelActive 事件
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        // 此通道在注冊(cè)之前,設(shè)置了 autoRead()。
                        // 這意味著我們需要重新設(shè)置開(kāi)始讀取操作,以便介紹入站數(shù)據(jù)。

                        // See https://github.com/netty/netty/issues/4805
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                // 發(fā)生異常,要關(guān)閉通道,并進(jìn)行相關(guān)通知
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

將通道注冊(cè)到事件輪詢(xún)器EventLoop 上:

  • 如果當(dāng)前通道已經(jīng)注冊(cè),或者當(dāng)前通道和事件輪詢(xún)器不兼容,那么注冊(cè)失敗,調(diào)用 promisesetFailure方法進(jìn)行通知。
  • 保證在事件輪詢(xún)器線(xiàn)程調(diào)用實(shí)際注冊(cè)register0方法。
  • 調(diào)用 AbstractChanneldoRegister 方法,進(jìn)行注冊(cè)操作,發(fā)送注冊(cè)事件。
  • 如果通道已活躍,第一次注冊(cè)的時(shí)候,就會(huì)發(fā)送 channelActive 事件;
  • 如果不是,那么就可能設(shè)置開(kāi)始讀的操作。
  • 如果這期間發(fā)生異常,就關(guān)閉通道,并進(jìn)行相關(guān)通知。

2.3 取消注冊(cè) deregister

        @Override
        public final void deregister(final ChannelPromise promise) {
            assertEventLoop();

            deregister(promise, false);
        }

        private void deregister(final ChannelPromise promise, final boolean fireChannelInactive) {
            // 如果 promise不是 不可取消的,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            if (!registered) {
                // 當(dāng)前通道沒(méi)有注冊(cè),那么也表示取消注冊(cè)成功,進(jìn)行成功通知
                safeSetSuccess(promise);
                return;
            }

            // As a user may call deregister() from within any method while doing processing in the ChannelPipeline,
            // we need to ensure we do the actual deregister operation later. This is needed as for example,
            // we may be in the ByteToMessageDecoder.callDecode(...) method and so still try to do processing in
            // the old EventLoop while the user already registered the Channel to a new EventLoop. Without delay,
            // the deregister operation this could lead to have a handler invoked by different EventLoop and so
            // threads.

            // See:
            // https://github.com/netty/netty/issues/4435

            // 通過(guò) invokeLater 方法,將 doDeregister() 方法放在下一個(gè)事件輪詢(xún)周期進(jìn)行
            invokeLater(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 調(diào)用 AbstractChannel 的 doDeregister 方法,進(jìn)行取消注冊(cè)操作
                        doDeregister();
                    } catch (Throwable t) {
                        logger.warn("Unexpected exception occurred while deregistering a channel.", t);
                    } finally {
                        if (fireChannelInactive) {
                            pipeline.fireChannelInactive();
                        }
                        // Some transports like local and AIO does not allow the deregistration of
                        // an open channel.  Their doDeregister() calls close(). Consequently,
                        // close() calls deregister() again - no need to fire channelUnregistered, so check
                        // if it was registered.
                        if (registered) {
                            // 如果通道之前是注冊(cè)成功了,
                            // 這里才發(fā)送取消注冊(cè)的 IO 事件
                            registered = false;
                            pipeline.fireChannelUnregistered();
                        }
                        // 取消綁定成功通知
                        safeSetSuccess(promise);
                    }
                }
            });
        }

重點(diǎn)就是調(diào)用AbstractChanneldoDeregister 方法,進(jìn)行取消注冊(cè)操作。
如果 fireChannelInactive == true,將發(fā)送 ChannelInactive 事件。

2.4 綁定 bind

       @Override
        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();
            // 檢查通道是否仍然打開(kāi)
            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
                // Warn a user about the fact that a non-root user can't receive a
                // broadcast packet on *nix if the socket is bound on non-wildcard address.
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                // 調(diào)用 AbstractChannel 的 doBind 方法,進(jìn)行綁定操作
                doBind(localAddress);
            } catch (Throwable t) {
                // 綁定失敗的通知
                safeSetFailure(promise, t);

                // doBind(localAddress) 方法有可能關(guān)閉這個(gè)通道,
                // 就可能需要進(jìn)行關(guān)閉通道的通知
                closeIfClosed();
                return;
            }

           // 如果綁定操作后,通道從不活躍變成活躍,就要發(fā)送 ChannelActive 事件
            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }
            // 綁定成功的通知
            safeSetSuccess(promise);
        }
  • 這個(gè)方法邏輯比較簡(jiǎn)單,重點(diǎn)就是調(diào)用AbstractChanneldoBind方法,進(jìn)行綁定操作。
  • 如果綁定操作后,通道從不活躍變成活躍,就要發(fā)送 ChannelActive 事件。

2.5 取消連接 disconnect

        @Override
        public final void disconnect(final ChannelPromise promise) {
            assertEventLoop();
            // 如果 promise不是 不可取消的,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            boolean wasActive = isActive();
            try {
                doDisconnect();
                // 重置 remoteAddress and localAddress
                remoteAddress = null;
                localAddress = null;
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                // doDisconnect() 方法有可能關(guān)閉這個(gè)通道,
                // 就可能需要進(jìn)行關(guān)閉通道的通知
                closeIfClosed();
                return;
            }

            // 如果取消連接后,通道從活躍變成不活躍,就要發(fā)送 ChannelInactive 事件
            if (wasActive && !isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelInactive();
                    }
                });
            }

            safeSetSuccess(promise);
            // doDisconnect() 方法有可能關(guān)閉這個(gè)通道,
            // 就可能需要進(jìn)行關(guān)閉通道的通知
            closeIfClosed();
        }
  • 這個(gè)方法邏輯比較簡(jiǎn)單,重點(diǎn)就是調(diào)用AbstractChanneldoDisconnect()方法,進(jìn)行取消連接操作。
  • 如果取消連接操作成功后,通道從活躍變成不活躍,就要發(fā)送 ChannelInactive 事件。

2.6關(guān)閉 close

        public void close(final ChannelPromise promise) {
            assertEventLoop();

            ClosedChannelException closedChannelException =
                    StacklessClosedChannelException.newInstance(AbstractChannel.class, "close(ChannelPromise)");
            close(promise, closedChannelException, closedChannelException, false);
        }

     private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {
            // 如果 promise不是 不可取消的,那么直接返回
            if (!promise.setUncancellable()) {
                return;
            }

            if (closeInitiated) {
                // closeInitiated == true,已經(jīng)調(diào)用過(guò)關(guān)閉操作了,就要return 返回了。
                if (closeFuture.isDone()) {
                    // 已經(jīng)通道已經(jīng)關(guān)閉了,通知 promise
                    safeSetSuccess(promise);
                } else if (!(promise instanceof VoidChannelPromise)) { // Only needed if no VoidChannelPromise.
                    // 當(dāng)前通道正在關(guān)閉,那么就添加一個(gè)監(jiān)聽(tīng)器,當(dāng)關(guān)閉成功后,再通知 promise
                    closeFuture.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            promise.setSuccess();
                        }
                    });
                }
                return;
            }

            // 保證關(guān)閉方法只調(diào)用一次,不能重復(fù)調(diào)用
            closeInitiated = true;

            final boolean wasActive = isActive();

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作。
            this.outboundBuffer = null;
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // Execute the close.
                            doClose0(promise);
                        } finally {
                            // Call invokeLater so closeAndDeregister is executed in the EventLoop again!
                            invokeLater(new Runnable() {
                                @Override
                                public void run() {
                                    if (outboundBuffer != null) {
                                        // 使寫(xiě)緩沖區(qū)中所有排隊(duì)的消息失敗
                                        outboundBuffer.failFlushed(cause, notify);
                                         // 關(guān)閉寫(xiě)緩沖區(qū)
                                        outboundBuffer.close(closeCause);
                                    }
                                    fireChannelInactiveAndDeregister(wasActive);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // Close the channel and fail the queued messages in all cases.
                    doClose0(promise);
                } finally {
                    if (outboundBuffer != null) {
                         // 使寫(xiě)緩沖區(qū)中所有排隊(duì)的消息失敗
                        outboundBuffer.failFlushed(cause, notify);
                         // 關(guān)閉寫(xiě)緩沖區(qū)
                        outboundBuffer.close(closeCause);
                    }
                }

                if (inFlush0) {
                    // 如果正在刷新操作,那么就讓 fireChannelInactiveAndDeregister 操作,
                    // 放到下一個(gè)事件輪詢(xún)周期中處理
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            fireChannelInactiveAndDeregister(wasActive);
                        }
                    });
                } else {
                    // 取消注冊(cè)和可能發(fā)送 ChannelInactive 事件
                    fireChannelInactiveAndDeregister(wasActive);
                }
            }
        }

        private void doClose0(ChannelPromise promise) {
            try {
                doClose();
                closeFuture.setClosed();
                safeSetSuccess(promise);
            } catch (Throwable t) {
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

        private void fireChannelInactiveAndDeregister(final boolean wasActive) {
            deregister(voidPromise(), wasActive && !isActive());
        }

方法流程:

  1. 通過(guò) closeInitiated 成員屬性保證關(guān)閉方法只調(diào)用一次,不能重復(fù)調(diào)用。
  2. 因?yàn)殛P(guān)閉連接,需要考慮寫(xiě)緩沖區(qū) ChannelOutboundBuffer 中的待寫(xiě)入數(shù)據(jù)的問(wèn)題。
  3. 通過(guò) prepareToClose() 方法,返回一個(gè)關(guān)閉通道的事件執(zhí)行器。
    • 如果不為空,那么就在這個(gè)事件執(zhí)行器中進(jìn)行接下來(lái)的關(guān)閉操作。
    • 如果為空,那么就在當(dāng)前線(xiàn)程進(jìn)行接下來(lái)的關(guān)閉操作。
  4. 調(diào)用 doClose0(promise) 方法,進(jìn)行關(guān)閉以及操作成功或失敗的相關(guān)通知。
  5. 處理寫(xiě)緩沖區(qū) outboundBuffer 中的數(shù)據(jù),并關(guān)閉寫(xiě)緩沖區(qū)。
  6. 最后調(diào)用 fireChannelInactiveAndDeregister 方法,取消管道注冊(cè),以及可能會(huì)發(fā)送 ChannelInactive 事件。

    如果在 doClose() 方法之后,通道從活躍變成不活躍的情況下,才會(huì)發(fā)送 ChannelInactive 事件。

2.7 shutdownOutput

       @UnstableApi
        public final void shutdownOutput(final ChannelPromise promise) {
            assertEventLoop();
            shutdownOutput(promise, null);
        }

        /**
         * 關(guān)閉相應(yīng)通道的輸出部分。
         * 例如,這將清理ChannelOutboundBuffer并不再允許任何寫(xiě)操作。
         */
        private void shutdownOutput(final ChannelPromise promise, Throwable cause) {
            if (!promise.setUncancellable()) {
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                promise.setFailure(new ClosedChannelException());
                return;
            }
            // 禁止再向?qū)懢彌_區(qū) outboundBuffer 添加任何消息和刷新操作。
            this.outboundBuffer = null;

            final Throwable shutdownCause = cause == null ?
                    new ChannelOutputShutdownException("Channel output shutdown") :
                    new ChannelOutputShutdownException("Channel output shutdown", cause);
            Executor closeExecutor = prepareToClose();
            if (closeExecutor != null) {
                closeExecutor.execute(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            // 調(diào)用 AbstractChannel 的 doShutdownOutput 方法,
                            // 進(jìn)行 shutdown 操作
                            doShutdownOutput();
                            // 操作成功通知
                            promise.setSuccess();
                            // 操作失敗通知
                        } catch (Throwable err) {
                            promise.setFailure(err);
                        } finally {
                            // Dispatch to the EventLoop
                            eventLoop().execute(new Runnable() {
                                @Override
                                public void run() {
                                    // 在 Shutdown 的時(shí)候,關(guān)閉寫(xiě)緩沖區(qū) ChannelOutboundBuffer,
                                    // 并發(fā)送用戶(hù)通知事件
                                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                                }
                            });
                        }
                    }
                });
            } else {
                try {
                    // 調(diào)用 AbstractChannel 的 doShutdownOutput 方法,
                    // 進(jìn)行 shutdown 操作
                    doShutdownOutput();
                    // 操作成功通知
                    promise.setSuccess();
                } catch (Throwable err) {
                    // 操作失敗通知
                    promise.setFailure(err);
                } finally {
                    // 在 Shutdown 的時(shí)候,關(guān)閉寫(xiě)緩沖區(qū) ChannelOutboundBuffer,
                    // 并發(fā)送用戶(hù)通知事件
                    closeOutboundBufferForShutdown(pipeline, outboundBuffer, shutdownCause);
                }
            }
        }

        /**
         * 在 Shutdown 的時(shí)候,關(guān)閉寫(xiě)緩沖區(qū) ChannelOutboundBuffer,
         * 并發(fā)送用戶(hù)通知事件
         */
        private void closeOutboundBufferForShutdown(
                ChannelPipeline pipeline, ChannelOutboundBuffer buffer, Throwable cause) {

            // 使寫(xiě)緩沖區(qū)中所有排隊(duì)的消息失敗
            buffer.failFlushed(cause, false);
            // 關(guān)閉寫(xiě)緩沖區(qū)
            buffer.close(cause, true);
            // 發(fā)送一個(gè)通道 Shutdown 的用戶(hù)通知事件
            pipeline.fireUserEventTriggered(ChannelOutputShutdownEvent.INSTANCE);
        }

shutdown 的方法流程和 close 很像,區(qū)別點(diǎn):

  • shutdown 是調(diào)用AbstractChanneldoShutdownOutput() 方法進(jìn)行相關(guān)操作,而 close 是調(diào)用AbstractChanneldoClose() 方法。
  • close 最后會(huì)取消注冊(cè),以及可能會(huì)發(fā)送ChannelInactive 事件。
  • shutdown 會(huì)發(fā)送一個(gè) ChannelOutputShutdownEvent.INSTANCE 用戶(hù)自定義的通知事件。

2.8 強(qiáng)制關(guān)閉 closeForcibly

        @Override
        public final void closeForcibly() {
            assertEventLoop();

            try {
                doClose();
            } catch (Exception e) {
                logger.warn("Failed to close a channel.", e);
            }
        }

你會(huì)發(fā)現(xiàn)只調(diào)用了AbstractChanneldoClose() 方法進(jìn)行關(guān)閉操作,不觸發(fā)任何事件,也不處理寫(xiě)緩沖區(qū)。只可能在某些特殊情況下調(diào)用,例如嘗試注冊(cè)失敗的時(shí)候。

2.9 開(kāi)始讀 beginRead

        @Override
        public final void beginRead() {
            assertEventLoop();

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

調(diào)用AbstractChanneldoBeginRead() 方法設(shè)置通道開(kāi)始讀取數(shù)據(jù)。

2.10 寫(xiě)操作 write

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // 寫(xiě)緩沖區(qū)為 null,
                try {
                    // 現(xiàn)在釋放資源,以防止資源泄漏
                    ReferenceCountUtil.release(msg);
                } finally {
                    //  如果outboundBuffer為空,我們就知道通道被關(guān)閉了,所以立即進(jìn)行失敗通知。
                    // See https://github.com/netty/netty/issues/2362
                    safeSetFailure(promise,
                            newClosedChannelException(initialCloseCause, "write(Object, ChannelPromise)"));
                }
                return;
            }

            int size;
            try {
                // 進(jìn)行消息的轉(zhuǎn)換,例如將堆緩沖區(qū)變成直接緩沖區(qū)
                msg = filterOutboundMessage(msg);
                // 估算數(shù)據(jù)的大小
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                try {
                    // 失敗時(shí)需要釋放資源,以防止資源泄漏
                    ReferenceCountUtil.release(msg);
                } finally {
                    // 進(jìn)行操作失敗的通知
                    safeSetFailure(promise, t);
                }
                return;
            }

            // 將數(shù)據(jù)添加到寫(xiě)緩沖區(qū) outboundBuffer 中
            outboundBuffer.addMessage(msg, size, promise);
        }

方法流程

  1. 先判斷寫(xiě)緩沖區(qū) outboundBuffer 是不是為 null,為空說(shuō)明通道已關(guān)閉,進(jìn)行失敗通知。
  2. 通過(guò) filterOutboundMessage(msg) 方法進(jìn)行數(shù)據(jù)轉(zhuǎn)換,例如將堆緩沖區(qū)變成直接緩沖區(qū)。
  3. 估算數(shù)據(jù)大小。
  4. 通過(guò) outboundBuffer.addMessage(...) 方法,將數(shù)據(jù)添加到寫(xiě)緩沖區(qū) outboundBuffer 中。
  5. 如果發(fā)送異常,記得釋放數(shù)據(jù) msg 的引用,防止內(nèi)存泄露,并進(jìn)行操作失敗通知。

2.11 刷新 flush

       @Override
        public final void flush() {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 寫(xiě)緩沖區(qū)為空,直接返回
            if (outboundBuffer == null) {
                return;
            }

            // 將寫(xiě)緩沖區(qū)中的消息都標(biāo)記成待刷新
            outboundBuffer.addFlush();
            // 進(jìn)行刷新操作
            flush0();
        }

        @SuppressWarnings("deprecation")
        protected void flush0() {
            if (inFlush0) {
                // 避免重復(fù)刷新
                return;
            }

            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            // 當(dāng)前寫(xiě)緩沖區(qū)沒(méi)有數(shù)據(jù),那么直接返回
            if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                return;
            }

            // 避免重復(fù)刷新
            inFlush0 = true;

            // 如果通道處于非活動(dòng)狀態(tài),則將所有掛起的寫(xiě)請(qǐng)求標(biāo)記為失敗。
            if (!isActive()) {
                try {
                    // Check if we need to generate the exception at all.
                    if (!outboundBuffer.isEmpty()) {
                        if (isOpen()) {
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
                            // Do not trigger channelWritabilityChanged because the channel is closed already.
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause, "flush0()"), false);
                        }
                    }
                } finally {
                    // 刷新操作完成,將 inFlush0重新設(shè)置為 false,以便下次刷新。
                    inFlush0 = false;
                }
                return;
            }

            try {
                // 將給定緩沖區(qū)的內(nèi)容刷新到遠(yuǎn)端
                doWrite(outboundBuffer);
            } catch (Throwable t) {
                handleWriteError(t);
            } finally {
                // 刷新操作完成,將 inFlush0重新設(shè)置為 false,以便下次刷新。
                inFlush0 = false;
            }
        }
  • 通過(guò) inFlush0 成員屬性,來(lái)避免重復(fù)刷新。
  • 如果通道處于非活動(dòng)狀態(tài),則將所有掛起的寫(xiě)請(qǐng)求標(biāo)記為失敗。
  • 通過(guò) AbstractChanneldoWrite(outboundBuffer) 方法,將緩沖區(qū)的內(nèi)容刷新到遠(yuǎn)端。

2.12 小結(jié)

對(duì)比 Unsafe 的方法,你會(huì)發(fā)現(xiàn) AbstractUnsafe 中沒(méi)有實(shí)現(xiàn) connect(...) 連接方法。

對(duì)比發(fā)送入站IO事件:

  1. ChannelRegisteredChannelUnregistered

    • register 方法會(huì)發(fā)送 ChannelRegistered 事件。
    • deregister 方法只有在通道之前已經(jīng)注冊(cè)之后,才會(huì)發(fā)送 ChannelUnregistered 事件。
  2. ChannelActiveChannelInactive

    • 一般都是通道Channel從不活躍變成活躍,要發(fā)送 ChannelActive 事件;可能引起這個(gè)變化的操作有 bindconnect 操作。
    • 通道Channel從活躍變成不活躍,就要發(fā)送 ChannelInactive 事件;可能引起這個(gè)變化的操作有 disconnect,closeshutdown。
    • 最后如果第一次注冊(cè)時(shí),且當(dāng)前通道是活躍狀態(tài),也會(huì)發(fā)送 ChannelActive 事件。

三. ChannelOutboundBuffer

AbstractChannel.Unsafe 中看到用戶(hù)調(diào)用write(...) 方法寫(xiě)的數(shù)據(jù),會(huì)先添加到寫(xiě)緩沖區(qū) ChannelOutboundBuffer 中,然后調(diào)用 flush() 方法,才將寫(xiě)緩沖區(qū)中的數(shù)據(jù)發(fā)送到遠(yuǎn)端。

3.1 重要成員屬性

    // 在鏈表結(jié)構(gòu)中第一個(gè)被刷新的節(jié)點(diǎn)
    private Entry flushedEntry;

    // 在鏈表結(jié)構(gòu)中第一個(gè)未刷新的節(jié)點(diǎn)
    private Entry unflushedEntry;

    // 表示鏈表中最后一個(gè)節(jié)點(diǎn)
    private Entry tailEntry;
    // 等待刷新節(jié)點(diǎn)的數(shù)量
    private int flushed;

寫(xiě)緩沖區(qū)通過(guò)鏈表來(lái)儲(chǔ)存數(shù)據(jù)(依靠 Entry.next 來(lái)實(shí)現(xiàn)鏈表),鏈表形式 Entry(flushedEntry) --> ... Entry(unflushedEntry) --> ... Entry(tailEntry)

  • flushedEntry 表示第一個(gè)被刷新的節(jié)點(diǎn),在鏈表頭,當(dāng)然也是通過(guò) addFlush() 方法設(shè)置的。
  • unflushedEntry 表示第一個(gè)未刷新的節(jié)點(diǎn),表示還沒(méi)有被標(biāo)記刷新的第一個(gè)節(jié)點(diǎn)。
  • tailEntry 最后一個(gè)節(jié)點(diǎn)。
  • flushed 刷新節(jié)點(diǎn)的數(shù)量,這個(gè)屬性很重要,靠它來(lái)標(biāo)記刷新節(jié)點(diǎn),也就是說(shuō)從 flushedEntry 開(kāi)始, flushed 數(shù)量的節(jié)點(diǎn)都被標(biāo)記為刷新節(jié)點(diǎn)了。

3.2 重要方法

3.2.1 添加數(shù)據(jù)

這個(gè)方法一般在 AbstractChannel.AbstractUnsafewrite(...) 方法中調(diào)用。

 /**
     * 將給定的消息 msg 添加到ChannelOutboundBuffer中。
     * 一旦消息寫(xiě)入,給定的ChannelPromise將被通知。
     */
    public void addMessage(Object msg, int size, ChannelPromise promise) {
        // 將給定消息封裝成一個(gè)節(jié)點(diǎn)
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
        } else {
            // 將新消息節(jié)點(diǎn)添加到隊(duì)列尾
            Entry tail = tailEntry;
            tail.next = entry;
        }
        tailEntry = entry;
        if (unflushedEntry == null) {
            // 如果未刷新節(jié)點(diǎn)為空,說(shuō)明隊(duì)列節(jié)點(diǎn)都變成刷新節(jié)點(diǎn)了,
            // 那么這個(gè)新添加的節(jié)點(diǎn),就是未刷新節(jié)點(diǎn)的頭了。
            unflushedEntry = entry;
        }

        // See https://github.com/netty/netty/issues/1619
        // 向未刷新的數(shù)組添加消息后,增加掛起的字節(jié)數(shù)。
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }
  • 先將數(shù)據(jù) msg 封裝成一個(gè)節(jié)點(diǎn) entry,并將節(jié)點(diǎn)添加到鏈表尾。
  • 如果 unflushedEntrynull,那么這個(gè)節(jié)點(diǎn)就是第一個(gè)未刷新節(jié)點(diǎn)。
  • incrementPendingOutboundBytes(...) 方法,增加掛起的字節(jié)數(shù),看是否需要改變通道的 可寫(xiě)屬性。

3.2.2 標(biāo)記刷新

這個(gè)方法一般在 AbstractChannel.AbstractUnsafeflush() 方法中調(diào)用。

  /**
     * 向此ChannelOutboundBuffer添加刷新。
     * 這意味著所有以前添加的消息都被標(biāo)記為刷新,因此您將能夠處理它們。
     */
    public void addFlush() {
        // There is no need to process all entries if there was already a flush before and no new messages
        // where added in the meantime.
        //
        // See https://github.com/netty/netty/issues/2577
        // 未刷新節(jié)點(diǎn)后面的鏈表示新添加的節(jié)點(diǎn)列表,都是要加入到刷新中
        Entry entry = unflushedEntry;
        if (entry != null) {
            if (flushedEntry == null) {
                // there is no flushedEntry yet, so start with the entry
                flushedEntry = entry;
            }
            do {
                flushed ++;
                // 將所有要刷新的節(jié)點(diǎn)變成不可取消的
                if (!entry.promise.setUncancellable()) {
                    // Was cancelled so make sure we free up memory and notify about the freed bytes
                    // 掛起消息被取消,所以確保我們釋放內(nèi)存并通知釋放的字節(jié)
                    int pending = entry.cancel();
                    decrementPendingOutboundBytes(pending, false, true);
                }
                entry = entry.next;
            } while (entry != null);

            // 節(jié)點(diǎn)都變成已刷新的了,未刷新節(jié)點(diǎn)就設(shè)置為 null
            unflushedEntry = null;
        }
    }
  • 將從 unflushedEntry 未刷新節(jié)點(diǎn)開(kāi)始到鏈表尾的所有節(jié)點(diǎn)都標(biāo)記為刷新。通過(guò) flushed++ 來(lái)增加刷新節(jié)點(diǎn)數(shù)量。
  • 調(diào)用 setUncancellable(...) 要寫(xiě)入的節(jié)點(diǎn)是不可取消的,如果設(shè)置失敗,就要取消掛起數(shù)據(jù),并調(diào)用 decrementPendingOutboundBytes(...) 減少掛起字節(jié)數(shù),看是否需要改變通道的 可寫(xiě)屬性。

3.2.3 刪除節(jié)點(diǎn)


    /**
     * 將刪除當(dāng)前消息,將其ChannelPromise標(biāo)記為success并返回true。
     * 如果在調(diào)用此方法時(shí)不存在刷新的消息,則返回false,表示沒(méi)有準(zhǔn)備好處理的消息。
     */
    public boolean remove() {
        Entry e = flushedEntry;
        if (e == null) {
            clearNioBuffers();
            return false;
        }
        Object msg = e.msg;

        ChannelPromise promise = e.promise;
        int size = e.pendingSize;

        removeEntry(e);

        if (!e.cancelled) {
            // only release message, notify and decrement if it was not canceled before.
            ReferenceCountUtil.safeRelease(msg);
            safeSuccess(promise);
            decrementPendingOutboundBytes(size, false, true);
        }

        // recycle the entry
        e.recycle();

        return true;
    }

    private void removeEntry(Entry e) {
        if (-- flushed == 0) {
            // flushed == 0, 表示所有刷新節(jié)點(diǎn)都被處理了
            flushedEntry = null;
            if (e == tailEntry) {
                tailEntry = null;
                unflushedEntry = null;
            }
        } else {
            // 將下一個(gè)節(jié)點(diǎn)變成刷新節(jié)點(diǎn)
            flushedEntry = e.next;
        }
    }

當(dāng)緩存區(qū)當(dāng)前刷新節(jié)點(diǎn)數(shù)據(jù)被寫(xiě)入到遠(yuǎn)端了,那么調(diào)用這個(gè) remove() 方法,移除當(dāng)前節(jié)點(diǎn),得到下一個(gè)刷新節(jié)點(diǎn)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容