Netty服務(wù)端流程源碼分析(Netty4.1.25)

Netty的4個(gè)重要內(nèi)容

通過閱讀Netty的相關(guān)實(shí)現(xiàn),剛好看到網(wǎng)上的一篇文章給的總結(jié),覺得挺適合作為Netty的核心內(nèi)容來研究的
1.Reactor線程模型:高性能多線程設(shè)計(jì)思路
2.Netty中自己定義的channel概念:增強(qiáng)版的NIOchannel
3.ChannelPipeline責(zé)任鏈設(shè)計(jì)模式:事件處理機(jī)制
4.內(nèi)存管理:增強(qiáng)型byteBuf緩沖區(qū)

Server端啟動(dòng)相關(guān)代碼例子

public static void main(String[] args) throws Exception {
    EventLoopGroup pGroup = new NioEventLoopGroup(1);
    EventLoopGroup cGroup = new NioEventLoopGroup();
    ServerBootstrap b = new ServerBootstrap();
    b.group(pGroup, cGroup)

            // Server端 NioServerSocketChannel.class,Client 端 NioSocketChannel.class
            .channel(NioServerSocketChannel.class)

            // options.put(option, value)
            .option(ChannelOption.SO_BACKLOG, 1024)

            // childOptions.put(childOption, value);
            .childOption( ChannelOption.SO_TIMEOUT, 20000)

            // 設(shè)置 handler 屬性,
            .handler(new LoggingHandler(LogLevel.INFO))

            // 設(shè)置 childHandler 屬性(多個(gè)可以用 ChannelInitializer 來設(shè)置)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                protected void initChannel(SocketChannel sc) throws Exception {
                    sc.pipeline().addLast(new NettyMessageDecoder<>(AttachmentRequest.class, ProtostuffSerializer.class, 1 << 20, 2, 4));
                    sc.pipeline().addLast(new NettyMessageEncoder<>(ResponseBean.class, ProtostuffSerializer.class));
                    sc.pipeline().addLast(new ServerHandler());
                }
            });

    ChannelFuture cf = b.bind(8765).sync();

    cf.channel().closeFuture().sync();
    pGroup.shutdownGracefully();
    cGroup.shutdownGracefully();

}

NioEventLoopGroup 創(chuàng)建以及初始化工作

我們通常會(huì)通過下面代碼來實(shí)例化兩個(gè) Group

EventLoopGroup pGroup = new NioEventLoopGroup(1);

EventLoopGroup cGroup = new NioEventLoopGroup();

當(dāng)我們通過 new NioEventLoopGroup() 實(shí)例化一個(gè) NioEventLoopGroup 對(duì)象時(shí),由于繼承關(guān)系 NioEventLoopGroup --> MultithreadEventLoopGroup --> MultithreadEventExecutorGroup。通過 NioEventLoopGroup 的構(gòu)造器調(diào)用鏈,我們直接來到其父類 io.netty.util.concurrent.MultithreadEventExecutorGroup 構(gòu)造器中進(jìn)行分析相關(guān)實(shí)現(xiàn)邏輯

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
                                        EventExecutorChooserFactory chooserFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (executor == null) {
        executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
    }

    children = new EventExecutor[nThreads];

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(executor, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
               // 關(guān)閉資源
                }
            }
        }
    }

    chooser = chooserFactory.newChooser(children);

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }

    Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
    Collections.addAll(childrenSet, children);
    readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

NioEventLoopGroup 線程數(shù)

初始化NioEventLoopGroup時(shí),如果我們?cè)O(shè)置線程數(shù),則會(huì)根據(jù)我們?cè)O(shè)置的線程數(shù)來在需要的時(shí)候創(chuàng)建線程,如果不設(shè)置,則首先根據(jù)如下代碼獲取可創(chuàng)建的線程數(shù):

SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2);
//首先是檢查我們是否配置了 io.netty.eventLoopThreads 參數(shù),如果沒配置則根據(jù)當(dāng)前 JVM 可使用的 CPU 數(shù)量乘以2 來初始化線程數(shù)。

1、初始化 executor 變量

NioEventLoopGroup創(chuàng)建過程中,會(huì)遞歸調(diào)用父類的構(gòu)造器完成相應(yīng)的初始化操作,在父類 MultithreadEventExecutorGroup 中會(huì)執(zhí)行一個(gè) executor變量的初始化,此變量是一個(gè) ThreadPerTaskExecutor 類型,也就是需要實(shí)例化一個(gè) ThreadPerTaskExecutor 對(duì)象賦值給 executor,而實(shí)例化 ThreadPerTaskExecutor 時(shí)需要傳入一個(gè) ThreadFactory 對(duì)象作為構(gòu)造參數(shù),在 MultithreadEventExecutorGroup 類中是通過實(shí)例化 DefaultThreadFactory 對(duì)象作為 ThreadFactory 參數(shù)傳入的。

就如 ThreadPerTaskExecutor 名稱所示,只要我們調(diào)用 execute(Runnable command) 方法來提交任務(wù),此executor就會(huì)新建一個(gè)線程來執(zhí)行任務(wù)。

2、初始化 children 數(shù)組并完成各個(gè)元素的實(shí)例化

  1. 根據(jù)線程數(shù)來創(chuàng)建一個(gè) children 數(shù)組(長(zhǎng)度為線程數(shù)),數(shù)組類型是 EventExecutor 類型的;
  2. 循環(huán)為每個(gè)元素初始化一個(gè) NioEventLoop 對(duì)象(由子類來創(chuàng)建EventLoop 對(duì)象);需要傳入兩個(gè)參數(shù),一個(gè)是 上一步始化好的 executor 對(duì)象,一個(gè)是 NioEventLoopGroup 中傳來的參數(shù)信息,包括的元素為: SelectorProvider,SelectStrategyFactory ,RejectedExecutionHandler 此三個(gè)元素。在 NioEventLoop 構(gòu)造方法被調(diào)用時(shí)(包括父類構(gòu)造方法)主要做了如下工作:
  • 【1】在超類 SingleThreadEventLoop 中創(chuàng)建了一個(gè)引用為 tailTasks 的 LinkedBlockingQueue 隊(duì)列。

  • 【2】 在超類 SingleThreadEventExecutor 中也創(chuàng)建了 一個(gè)引用為 taskQueue 的 LinkedBlockingQueue 隊(duì)列。

  • 【3】 NioEventLoop 把 所屬的 NioEventLoopGroup 對(duì)象以 parent 變量傳給超類,并存入超類的 parent 變量中。

  • 【4】初始化 Nio Selector 對(duì)象信息,

  • 【5】初始化 NioEventLoop 的 executor 信息,此信息是公用 MultithreadEventExecutorGroup 中的 executor 實(shí)例(復(fù)用)所以后面邏輯 每個(gè) NioEventLoop 都各自執(zhí)被調(diào)用 execute 提交任務(wù)并啟動(dòng) EventLoop線程來執(zhí)行 selector.select() 獲取就緒事件并調(diào)用 pipeline中的 handler進(jìn)行處理,以及執(zhí)行 taskQueue中的方法(也就是各自啟動(dòng)一個(gè)線程來執(zhí)行自己的 selector 事件 以及執(zhí)行自己 taskQueue中的任務(wù));

其中上面的兩個(gè)隊(duì)列長(zhǎng)度,最小為 16 ,最大為 Integer.MAX , 取決于 io.netty.eventLoop.maxPendingTasks 參數(shù)

DEFAULT_MAX_PENDING_TASKS = Math.max(16, SystemPropertyUtil.getInt("io.netty.eventLoop.maxPendingTasks", Integer.MAX_VALUE))

以上 SystemPropertyUtil.getInt( v1, v2) : 如果v1 不為空返回 v1, 否則范湖 v2;

3、初始化 chooser 對(duì)象

chooser 對(duì)象是根據(jù)上一步初始化好的 children 數(shù)組長(zhǎng)度來創(chuàng)建一個(gè) EventExecutorChooser 對(duì)象;其中根據(jù) children 數(shù)組元素個(gè)數(shù)是否是 2的N 次方來看是實(shí)例化 PowerOfTowEventExecutorChooser 還是實(shí)例化 GenericEventExecutorChooser ;其中兩個(gè)功能都是一樣,根據(jù)任務(wù)數(shù)取余來獲取 children 中的 NioEventLoop 對(duì)象;但當(dāng) children 的元素個(gè)數(shù)是 2 的 N 次方時(shí)可以根據(jù)“與運(yùn)算” 來取余,速更度快。

4、為 children 數(shù)組中的每個(gè) NioEventLoop 添加一個(gè) terminationListener

首先是創(chuàng)建一個(gè) FutureListener 實(shí)例作為 terminationListener ,然后循環(huán) children 為每個(gè)元素(的 terminationListener )注冊(cè)一個(gè)FutureListener ,此監(jiān)聽器的實(shí)現(xiàn)就是當(dāng)當(dāng) terminatedChildren 數(shù)值等于 children.length 的時(shí)候,則調(diào)用 terminationFuture.setSuccess(null) ,具體作用和具體什么場(chǎng)景下調(diào)用應(yīng)該是終止程序有關(guān)。

其中 e.terminationFuture() 返回的是 NioEventLoop 的父類 SingleThreadEventExecutor 中的 terminationFuture 值。

5、最后完成 readonlyChildren 的初始化

最后剩余的代碼就是根據(jù) children 中的元素來生成一個(gè)不可變的 Set 集合,并賦值給 readonlyChildren 變量,方便后續(xù)的遍歷

ServerBootstrap 初始化以及啟動(dòng)流程分析

ServerBootstrap創(chuàng)建以及初始化工作

ServerBootstrap 創(chuàng)建時(shí),會(huì)完成相關(guān)全局變量的初始化(包括一些集合等信息)

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {

private static final InternalLogger logger = InternalLoggerFactory.getInstance(ServerBootstrap.class);

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();

private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

private volatile EventLoopGroup childGroup;

private volatile ChannelHandler childHandler;

public ServerBootstrap() { }

當(dāng)我們調(diào)用 ServerBootstrap 默認(rèn)構(gòu)造參數(shù)來創(chuàng)建 ServerBootstrap 對(duì)象時(shí),會(huì)執(zhí)行全局參數(shù)的初始話, 包括父類的 options 和 attrs 集合信息(也就是為 childGroup parentGroup 分別初始化好這些集合信息),這些集合為我們后面?zhèn)魅氲某跏蓟畔⒆霰4娴?,因此后面我們就可以通過調(diào)用 ServerBootstrap 相關(guān)方法來完成相應(yīng)屬性的初始化工作,類似;

ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)

        // Server端 NioServerSocketChannel.class,Client 端 NioSocketChannel.class
        .channel(NioServerSocketChannel.class)

        // options.put(option, value)
        .option(ChannelOption.SO_BACKLOG, 1024)

        // childOptions.put(childOption, value);
        .childOption( ChannelOption.SO_TIMEOUT, 20000)

        // 設(shè)置 handler 屬性,
        .handler(new LoggingHandler(LogLevel.INFO))

        // 設(shè)置 childHandler 屬性(多個(gè)可以用 ChannelInitializer 來設(shè)置)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            protected void initChannel(SocketChannel sc) throws Exception {
                sc.pipeline().addLast(new NettyMessageDecoder<>(AttachmentRequest.class, ProtostuffSerializer.class, 1 << 20, 2, 4));
                sc.pipeline().addLast(new NettyMessageEncoder<>(ResponseBean.class, ProtostuffSerializer.class));
                sc.pipeline().addLast(new ServerHandler());
            }
        });

調(diào)用 Bootstrap#bind(int) 方法完成相應(yīng)信息的綁定以及注冊(cè)

對(duì)應(yīng)例子中的如下bind() 方法

ChannelFuture cf = b.bind(8765).sync()

最終調(diào)用到 io.netty.bootstrap.AbstractBootstrap#doBind 方法:

private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }

    if (regFuture.isDone()) {
        // At this point we know that the registration was complete and successful.
        ChannelPromise promise = channel.newPromise();
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();

                    doBind0(regFuture, channel, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

接下來我們根據(jù)源碼進(jìn)行分析

NioServerSocketChannel的創(chuàng)建以及EventLoop的注冊(cè)

在doBind() 方法中通過調(diào)用 initAndRegister() 來完成Channel 的初始化以及完成Channel到EventLoop的注冊(cè)邏輯

1. 根據(jù) channelFactory創(chuàng)建一個(gè) Channel 實(shí)例,服務(wù)端這里對(duì)應(yīng)的是 NioServerSocketChannel 實(shí)例

  • 【1】在創(chuàng)建 NioServerSocketChannel 時(shí)會(huì)根據(jù) java.nio.channels.spi.SelectorProvider#openServerSocketChannel 創(chuàng)建一個(gè) ServerSocketChannel 實(shí)例,此實(shí)例信息被記錄在父類 AbstractNioChannel 的 ch 變量中(后續(xù)可調(diào)用 AbstractNioChannel#javaChannel() 獲取),并設(shè)置 ch.configureBlocking(false) 以及 readInterestOp = SelectionKey.OP_ACCEPT

  • 【2】同時(shí)初始化 unsafe 變量以及 pipeline = new DefaultChannelPipeline(this);

  • 【3】 調(diào)用 config = new NioServerSocketChannelConfig(this, javaChannel().socket()) 完成 config的初始化,此初始化會(huì)創(chuàng)建一個(gè) AdaptiveRecvByteBufAllocator 實(shí)例來為 channel 管理 ByteBuffer信息(擴(kuò)容縮容等);

2. 調(diào)用ServerBootstrap#init( channel ) 初始化Channel信息

  • 【1】根據(jù) options 信息以及 attrs 信息初始化NioServerSocketChannel 對(duì)象信息(以備 NIO channel使用)

  • 【2】調(diào)用 channel.pipeline() 獲取上面創(chuàng)建的 pipeline 對(duì)象 (DefaultChannelPipeline ),然后調(diào)用 pipeline 的 addLast( ChannelHandler )方法首先添加 bootstrap 中配置的 handler 信息,然后提交一個(gè)任務(wù)到 channel eventLoop 中,此任務(wù)是為 當(dāng)前Pipeline 添加一個(gè) ServerBootstrapAcceptor 類型的Handler信息,整體代碼為:

p.addLast(new ChannelInitializer<Channel>() {
    @Override
    public void initChannel(final Channel ch) throws Exception {
        final ChannelPipeline pipeline = ch.pipeline();
        ChannelHandler handler = config.handler();
        if (handler != null) {
            pipeline.addLast(handler);
        }

        ch.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                pipeline.addLast(new ServerBootstrapAcceptor(
                        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
            }
        });
    }
});

【重點(diǎn)】在 DefaultChannelPipeline#addLast( ChannelHandler)方法中,首先會(huì)根據(jù)當(dāng)前的 ChannelHandler來創(chuàng)建 ctx 實(shí)例并插入到 pipeline中,同時(shí)會(huì)調(diào)用 ChannelHandler#handlerAdded() 方法,而 ChannelInitializer#handlerAdded() 方法中又調(diào)用了 ChannelInitializer#initChannel() 方法,所以會(huì)輪訓(xùn)調(diào)用到 我們實(shí)現(xiàn)的 initChannel() 來完成相關(guān) Handler 的加入邏輯;

以上代碼中的 channel 目前還未配置上 eventLoop 變量信息,需要在下面的 register(channel) 進(jìn)行注冊(cè)之后才配置上 EventLoop;

3. 完成 Channel 到 EventLoop 以及對(duì)應(yīng)的 Selelctor 的注冊(cè)
調(diào)用 ChannelFuture regFuture = config().group().register(channel) 并返回此 regFutre 對(duì)象(DefaultChannelPromise 類型);在創(chuàng)建 NioEventLoopGroup 時(shí)就通過 config = new ServerBootstrapConfig(this) 初始化了 config 變量;所以此處的 config().group()獲取的是注入ServerBootstrap 中的 NioEventLoopGroup 對(duì)象(return bootstrap.group()),也就是 parentGroup 信息;

  • 【1】調(diào)用 MultithreadEventLoopGroup#register(io.netty.channel.Channel) 方法,在此方法中 調(diào)用 next().register(channel) 來完成相關(guān)業(yè)務(wù)邏輯

  • 【2】next() 方法的邏輯 是調(diào)用 MultithreadEventExecutorGroup#chooser 中的 next() 方法來獲取下一個(gè) NioEventLoop 類型的 EventExecutor 信息,實(shí)現(xiàn)邏輯就是 chooser 內(nèi)部維護(hù)一個(gè)AtomicInteger idx, 根據(jù) idx.getAndIncrement() 來與 children 數(shù)組長(zhǎng)度求余來獲取數(shù)組中的元素并返回;實(shí)現(xiàn)的策略根據(jù)任務(wù)數(shù)取余來獲取 children 中的 NioEventLoop 對(duì)象;但當(dāng) children 的元素個(gè)數(shù)是 2 的 N 次方時(shí) 可以根據(jù)位與運(yùn)算來取余,速更度快;

  • 【3】由于 Eventloop 都繼承于 SingleThreadEventLoop 所以 最終會(huì)調(diào)用 SingleThreadEventLoop#register(Channel) 來完成相關(guān)邏輯最終調(diào)用鏈:AbstractChannel 類中的 AbstractUnsafe#register -> SingleThreadEventExecutor#execute() --> SingleThreadEventExecutor#addTask() ; 最后返回 DefaultChannelPromise 實(shí)例信息

AbstractChannel 類中的 AbstractUnsafe#register() :初始化當(dāng)前的 AbstractChannel.this.eventLoop = eventLoop , 也就是完成當(dāng)前channel 到 NioEventLoop 的綁定;提交 pipeline 任務(wù)到 taskQuene 任務(wù)列表中(由于不是 eventLoop執(zhí)行線程,所以以任務(wù)方式提交執(zhí)行內(nèi)容,也就是調(diào)用 SingleThreadEventExecutor#execute() 提交 Runnable),并通過 CAS 來啟動(dòng) NioEventLoop中的線程;大概流程為:

SingleThreadEventExecutor#execute( Runnable ) --> SingleThreadEventExecutor#addTask() (如果添加task 失敗則調(diào)用拒絕策略 rejectedExecutionHandler.rejected(task, this) ) ; 并且判斷當(dāng)前線程是不是負(fù)責(zé)執(zhí)行 EventLoop 中的任務(wù)(SingleThreadEventExecutor#taskQueue)的線程,如果是則直接退出,否則調(diào)用 SingleThreadEventExecutor#startThread() 通過 CAS來嘗試啟動(dòng) SingleThreadEventExecutor 中的線程(如果啟動(dòng)過了則不執(zhí)行任何邏輯,否則通過向 ThreadPerTaskExecutor 提交一個(gè) Runable 來啟動(dòng)線程執(zhí)行任務(wù)信息)—— 所以如上提到,雖然 SingleThreadEventExecutor 每次提交任務(wù)都會(huì)啟動(dòng)一個(gè)新線程來執(zhí)行,但一個(gè) SingleThreadEventExecutor 只會(huì)提交一次(CAS成功才提交),并且 SingleThreadEventExecutor 中的線程運(yùn)行之后會(huì)一直循環(huán)拉取 selector中就緒的事件和 taskQueue中的任務(wù),直到異?;蛘叱绦蚪K止;

再次總結(jié)上面的執(zhí)行的邏輯:

  • (1)完成 channel 到給定的(通過 chooser 選擇得到的) eventLoop 的綁定任務(wù);
  • (2)提交Runnable 任務(wù)到 eventLoop 的 taskQuene中, 并通過 CAS 檢查是否需要啟動(dòng) eventLoop 的線程(線程會(huì)運(yùn)行 NioEventLoop#run()),如果未啟動(dòng)則進(jìn)行啟動(dòng),否則忽略;

io.netty.channel.AbstractChannel.AbstractUnsafe#register 代碼如下:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
// ........異常檢查等邏輯刪除
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
        }
    }
}

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;

        // 完成 nio channel 到 nio selector 的注冊(cè),其中興趣事件值為0
        doRegister();
        neverRegistered = false;
        registered = true;
        // 會(huì)檢查是否有 pending Handler 完成 handlerAdded 方法的調(diào)用,也就是確保標(biāo)記channel 為 registration 之前確保所有 handler 都加入到 pipeline中
        pipeline.invokeHandlerAddedIfNeeded();
       
         // 標(biāo)記此 future為成功狀態(tài),并通知listeners列表,把  io.netty.util.concurrent.DefaultPromise#result 字段標(biāo)記為 SUCCESS
        safeSetSuccess(promise);
        // 傳播 channelRegistered事件(追蹤主流程handler(head + tail ),沒發(fā)現(xiàn)做什么處理)
        pipeline.fireChannelRegistered();
        //  isActive() 邏輯是判斷是否已完成端口號(hào)的綁定,如果已完成則完成 channel active 事件在 pipeline 上的傳播.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
}
  • 【4】channel 到 selector 的綁定

其中 register0 方法中的 doRegister() 方法是完成 nio Channel 到 selector 的注冊(cè)邏輯,此處注冊(cè)的興趣事件為 0 ,并把當(dāng)前channel 作為 attach 信息:

selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

isActive() 邏輯是判斷是否已完成端口號(hào)的綁定,如果已完成則執(zhí)行 channel active 事件在 pipeline 上的傳播.由于綁定端口號(hào)是在后面邏輯完成,所以此處暫不分析,也就是到目前,channel 雖然注冊(cè)到selector 上了,但注冊(cè)的興趣事件是空的,也就是 ops 為0,后續(xù)我們?cè)僬艺以谀睦镒?cè)上了 ACCPT事件(值為16);

至此 final ChannelFuture regFuture = initAndRegister() 邏輯分析完畢;

調(diào)用 doBind0(regFuture, channel, localAddress, promise) 完成端口號(hào)的綁定

首先是根據(jù) regFuture.isDone() 來判斷當(dāng)前Future 是否已執(zhí)行完成,如果已完成則直接調(diào)用 dobind0() 方法進(jìn)行端口號(hào)的綁定,否則添加一個(gè) pending 任務(wù),把綁定邏輯傳給 regFuture 的監(jiān)聽列表,等Futrue 邏輯處理完成后再進(jìn)行監(jiān)聽列表的回調(diào)來完成綁定邏輯的調(diào)用:由于上面 register0() 中已調(diào)用 safeSetSuccess(promise) 把 futrue result 標(biāo)記為 SUCCESS, 所以此處的 isDone() 返回true, 也就是直接調(diào)用 doBind0(regFuture, channel, localAddress, promise) 進(jìn)行端口號(hào)的綁定, 然后返回 promise 對(duì)象。

if (regFuture.isDone()) {
    // At this point we know that the registration was complete and successful.
    ChannelPromise promise = channel.newPromise();
    doBind0(regFuture, channel, localAddress, promise);
    return promise;
} else {
    // Registration future is almost always fulfilled already, but just in case it's not.
    final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
    regFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            Throwable cause = future.cause();
            if (cause != null) {
                // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                // IllegalStateException once we try to access the EventLoop of the Channel.
                promise.setFailure(cause);
            } else {
                // Registration was successful, so set the correct executor to use.
                // See https://github.com/netty/netty/issues/2586
                promise.registered();

                doBind0(regFuture, channel, localAddress, promise);
            }
        }
    });
    return promise;
}

而 doBindo() 方法的邏輯就是給當(dāng)前 channel 的 eventLoop 提交一個(gè) bind 任務(wù),也就是調(diào)用 SingleThreadEventExecutor#execute( Runnable ) 提交一個(gè)任務(wù);任務(wù)功能是完成端口號(hào)的綁定,調(diào)用任務(wù)鏈為:

DefaultChannelPipeline#bind(SocketAddress, ChannelPromise) ---> AbstractChannel#bind(SocketAddress,ChannelPromise) --> tail.bind(localAddress, promise) ..........tail --> head方向....... --> DefaultChannelPipeline.HeadContext#bind() ----> AbstractUnsafe#bind() -->AbstractUnsafe#bind ---》 NioServerSocketChannel#doBind --》javaChannel().socket().bind(localAddress, config.getBacklog()) (Nio 的端口綁定邏輯)綁定完后,調(diào)用 promise.trySuccess() 進(jìn)行標(biāo)記( Marks this future as a success and notifies all listeners. )

而重要邏輯在 AbstractChannel.AbstractUnsafe#bind 方法中,源代碼如下(僅留要分析的重要代碼):

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}

完成端口號(hào)的綁定邏輯

首先 bind 的調(diào)用會(huì)通過 DefaultChannelPipeline ,從 tail 方向開始,流入head, 最終在 DefaultChannelPipeline.HeadContext#bind()中調(diào)用到 AbstractUnsafe#bind() 方法;

AbstractUnsafe#bind ---> NioServerSocketChannel#doBind --> javaChannel().socket().bind(localAddress, config.getBacklog()) ;

最終到 java.net.ServerSocket#bind(java.net.SocketAddress, int) 方法中,此處便是熟悉的 NIO Server 端端口號(hào)的綁定了;其實(shí)Netty對(duì)端口號(hào)的綁定也沒什么復(fù)雜的邏輯,只是需要通過 pipeline 調(diào)用 bind 從 tail 方向傳播到 head 方向;最終在 head 時(shí)調(diào)用 unsafe 來調(diào)用 jdk 完成 server 端口的綁定;

通過pipeline.fireChannelActive() 的調(diào)用完成 ACCEPT 事件的綁定

在之前的 channel 到 selector 的綁定中,只是注冊(cè)了 ops 為0 ,也就是并綁定任何事件信息。而真正 ACCEPT 事件綁定是在此處代碼才完成綁定的

上面方法中,對(duì)于 if (!wasActive && isActive()) 的判斷,其實(shí)就是判斷在調(diào)用 doBind 之前, javaChannel().socket().isBound() 是false, 而在調(diào)用之后,返回true, 也就是之前socket是未綁定,調(diào)用 doBind 后完成了綁定的情況 則會(huì)調(diào)用到if里面的邏輯。

也就是往 eventLoop 的任務(wù)列表中插入一條對(duì) pipeline.fireChannelActive() 進(jìn)行調(diào)用的任務(wù);此方法會(huì)調(diào)用 AbstractChannelHandlerContext.invokeChannelActive(head) 方法,也就是從pipeline 的 head 方向往 tail 方向調(diào)用channelActive() 方法 ,而在 DefaultChannelPipeline.HeadContext#channelActive 方法中:

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    ctx.fireChannelActive();

    readIfIsAutoRead();
}

private void readIfIsAutoRead() {
    if (channel.config().isAutoRead()) {
        channel.read();
    }
}

也就是會(huì)判斷 channel # config # isAutoRead 是否為true, 為true 的話則調(diào)用 channel.read() 方法。而默認(rèn)則是返回 true的,所以會(huì)調(diào)用 channel read(); 也就是會(huì)從 tail.read() ----> head.read() 方向進(jìn)行調(diào)用,最終在 HeadContext#read 方法中進(jìn)行 AbstractUnsafe#beginRead() 方法的調(diào)用,最終調(diào)用 AbstractNioChannel#doBeginRead,源碼如下:

@Override
protected void doBeginRead() throws Exception {
    // Channel.read() or ChannelHandlerContext.read() was called
    final SelectionKey selectionKey = this.selectionKey;

    final int interestOps = selectionKey.interestOps();
    if ((interestOps & readInterestOp) == 0) {
        selectionKey.interestOps(interestOps | readInterestOp);
    }
}

其中 AbstractNioChannel#readInterestOp 變量在上面我們也有提到過,在實(shí)例化 NioServerSocketChannel 時(shí)會(huì)設(shè)置 ch.configureBlocking(false) 以及 readInterestOp = SelectionKey.OP_ACCEPT;因此也就在此步完成了 channel ACCEPT 事件的綁定;

調(diào)用 safeSetSuccess(promise) 執(zhí)行 future SUCCESS 標(biāo)記

此邏輯在上面也大概提過,就是把promise 的 result 嘗試標(biāo)記為 CUCCESS并完成此 future中的 listeners 列表的信息的回調(diào); 也就是上面提到的調(diào)用 promise.trySuccess() 進(jìn)行標(biāo)記( Marks this future as a success and notifies all listeners. )

NioEventLoop#run() 代碼邏輯分析

每個(gè)EventLoop 都會(huì)對(duì)應(yīng)一個(gè)線程,并在線程中 for 循環(huán):

  1. 拉取自己 Selector 上的 Channel 事件信息(READ、WRITE、 ACCEPT、 CONNECTION);

  2. 調(diào)用 processSelectedKeys() 處理 Channel 事件,包括傳播到 channel 對(duì)應(yīng)的 pipeline 中由各個(gè) handler 處理

  3. 執(zhí)行 taskQueue 中的任務(wù),確保提交的任務(wù)都有機(jī)會(huì)執(zhí)行

io.netty.channel.nio.NioEventLoop#run() 源碼如下(部分已刪除):

@Override
protected void run() {
    for (;;) {
        try {
            switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                case SelectStrategy.CONTINUE:
                    continue;
                case SelectStrategy.SELECT:
                    select(wakenUp.getAndSet(false));
                    if (wakenUp.get()) {
                        selector.wakeup();
                    }
                    // fall through
                default:
            }
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    runAllTasks();
                }
            } else {
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();
                } finally {
                    // Ensure we always run tasks.
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        // Always handle shutdown even if the loop processing threw an exception.
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

下面我們進(jìn)一步具體分析各個(gè)步驟

執(zhí)行select() 拉取準(zhǔn)備事件

首先根據(jù)任務(wù)列表(taskQueue、tailTasks)是否為空來執(zhí)行 select() 的策略,執(zhí)行完select() ,主要是執(zhí)行 selecteNow() 還是 select( timeOut), 以及確定 timeOut 的時(shí)間(默認(rèn)是一秒)

調(diào)用 processSelectedKeys() 處理就緒事件

1、獲取事件

在 NioEventLoop#openSelector() 方法中,在創(chuàng)建 nio selector 對(duì)象時(shí),nettty 通過判斷 io.netty.noKeySetOptimization 參數(shù)來決定是否執(zhí)行 selector 的相關(guān)優(yōu)化功能,如果執(zhí)行優(yōu)化操作的話,netty會(huì)通過 反射把 SelectorImpl#publicKeys 和 SelectorImpl#publicSelectedKeys 對(duì)象置換成自己的 SelectedSelectionKeySet 對(duì)象,然后把 nio selector 封裝成一個(gè) SelectedSelectionKeySetSelector 對(duì)象,具體優(yōu)化可以查看相關(guān)代碼,由于默認(rèn)沒配置,所以此處不進(jìn)行優(yōu)化代碼邏輯的剖析;

在執(zhí)行完 select() 操作之后就可以通過 selector.selectedKeys() 來獲取就緒事件的集合,接著執(zhí)行事件的處理邏輯,對(duì)應(yīng)的 NioEventLoop#processSelectedKeysPlain() 的部分源碼如下 :

private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    Iterator<SelectionKey> i = selectedKeys.iterator();
    for (;;) {
        final SelectionKey k = i.next();
        final Object a = k.attachment();
        i.remove();

        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
        if (!i.hasNext()) {
            break;
        }
    }
}

首先判斷 SelelctionKey # attachment() 對(duì)象是否屬于 AbstractNioChannel 類型,其中在上面我們已經(jīng)分析過 (其中 register0 方法中的 doRegister() 方法是完成 nio Channel 到 selector 的注冊(cè)邏輯,并把當(dāng)前channel 作為 attach 信息),所以此處的 attachment 是屬于 AbstractNioChannel 類型的,所以會(huì)走到 NioEventLoop#processSelectedKey( SelectionKey, AbstractNioChannel) 的方法邏輯:


private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // 略.......
        return;
    }

    try {
        int readyOps = k.readyOps();
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

OP_CONNECT是客戶端事件,OP_WRITE 是我們給channel寫入數(shù)據(jù)時(shí)觸發(fā)的事件(目前我們也沒有注冊(cè) WRITE事件),所以此處我們只分析最后一種情況,也就是 OP_READ | OP_ACCEPT 事件的情況;因此走到 unsafe.read() 方法的調(diào)用。

2、根據(jù)事件獲取信息

首先我們先來看看 channel 的 unsafe.read(), unsafe 有兩種實(shí)現(xiàn)(為了兼容各種數(shù)據(jù)類型,pipeline數(shù)據(jù)傳遞使用了 Object 作為數(shù)據(jù)類型):
NioByteUnsafe # read :此實(shí)現(xiàn)邏輯是通過channel 來讀取數(shù)據(jù)信息,然后在pipeline中傳播 channelRead( Object ) 事件和 channelReadComplete()事件,也就是真正讀取 socket buffer 中的數(shù)據(jù)信息 并從 pipelien 的 head 往 tail 方向傳播讀取的數(shù)據(jù)信息,;
NioMessageUnsafe # read: 此實(shí)現(xiàn)并不是從 channel 中讀取事件信息,而是通過 ServerSocketChannel accept 來獲取 SocketChannel, 并把 SocketChannel 封裝成一個(gè) NioSocketChannel 放入readBbuffer 數(shù)組中,接著再獲取出來,然后同樣在pipeline中傳播 channelRead( Object ) 事件和 channelReadComplete()事件,并從 pipelien 的 head 往 tail 方向傳播獲取的 NioSocketChannel 對(duì)象信息,

而對(duì)于此次 Netty分析,到目前我們只是分析到 Server 端 ParentGroup 啟動(dòng)過程,在上面的 unsafe.read() 方法調(diào)用的其實(shí)是 NioMesaageUnsafe # read 的方法,依據(jù)是:unsafe 是通過調(diào)用 channel#unsafe() 獲取的,而我們此處的 channel 對(duì)應(yīng)的是 NioServerSocketChannel, 而 NioServerSocketChannel 的 unsafe 是在父類 AbstractNioMessageChannel 中創(chuàng)建的,也就是AbstractNioMessageChannel.NioMessageUnsafe 類型的unsafe, 所以調(diào)用鏈為 NioMessageUnsafe#read() --> NioServerSocketChannel#doReadMessages() , 對(duì)應(yīng) NioServerSocketChannel#doReadMessages() 源碼如下(刪減):

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());
    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) { 
    } 
    return 0;
}

以上我們便分析完了信息的獲取邏輯,下面我們分析創(chuàng)建NioSocketChannel 中做了哪些工作;

3、NioSocketChannel 的創(chuàng)建

在上面邏輯中我們已經(jīng)分析了 NioServerSocketChannel 創(chuàng)建的相關(guān)邏輯,其實(shí)NioSocketChannel 的創(chuàng)建同樣也需要完成相關(guān)功能的創(chuàng)建以及初始化工作,下面我們簡(jiǎn)單對(duì)比一下相關(guān)邏輯:

channel.png

從類的關(guān)系圖中可以看出 NioSocketChannel 繼承自 AbstractNioByteChannel , NioServerSocketChannel 繼承自 AbstractNioMessageChannel, 而這兩個(gè) AbstractXXXChannel 都繼承自 AbstractNioChannel;

  • AbstractNioByteChannel : 內(nèi)部維護(hù)了一個(gè) NioByteUnsafe,通過此 unsafe 來為外部提供 read、write 方法的調(diào)用,然后此read/write 方法會(huì)調(diào)用AbstractNioByteChannel 的 doReadBytes / doWriteBytes 方法(抽象方法),而在 NioSocketChannel 類中對(duì)這兩方法進(jìn)行了實(shí)現(xiàn),大概實(shí)現(xiàn)邏輯就是從 SocketChannel 中讀取網(wǎng)絡(luò)傳入的數(shù)據(jù)信息或者寫入數(shù)據(jù)信息;上面說的NioByteUnsafe # read 所提供的功能正是在AbstractNioByteChannel.doReadBytes () 中進(jìn)行實(shí)現(xiàn)的;

  • NioServerSocketChannel: 內(nèi)部維護(hù)了一個(gè) NioMessageUnsafe,同樣通過此 unsafe 來給外部提供 read、write 方法的調(diào)用,不同的是 NioServerSocketChannel 為 unsafe 提供的是 doReadMessages / doWriteMessage 方法,對(duì)于 doReadMessages 方法的功能,正是我們上面說的 NioMessageUnsafe # read 功能,也就是 通過 serverSocket.accept() 獲取到 SocketChannel 對(duì)象的功能,而對(duì)于 doWriteMessage , NioServerSocketChannel 實(shí)現(xiàn)是拋出 UnsupportedOperationException 異常, 也就是不支持調(diào)用;

  • config : NioSocketChannelConfig#NioSocketChannelConfig 中維護(hù)的信息是 NioSocketChannel 和 Socket 信息,NioServerSocketChannel.NioServerSocketChannelConfig維護(hù)的是 NioServerSocketChannel 和 ServerSocket 信息 ;

  • readInterestOp: NioSocketChannel 維護(hù)的值為 OP_READ, NioServerSocketChannel 維護(hù)的值為 OP_ACCEPT。

  • configureBlocking:兩個(gè) Channel 都設(shè)置為 false,也就是都是非阻塞IO,

  • unsafe: 上面我們已經(jīng)分析過了,為子類提供一個(gè) newUnsafe() 方法,NioSocketChannel 提供 NioByteUnsafe 實(shí)現(xiàn),NioServerSocketChannel 提供 NioMessageUnsafe 實(shí)現(xiàn);

  • pipeline:一樣的實(shí)現(xiàn),都會(huì)創(chuàng)建一個(gè) DefaultChannelPipeline 實(shí)例信息;并且同樣完成相關(guān)的初始化工作,包括 head,tail 節(jié)點(diǎn)的初始化等。

以上便是 NioSocketChannel 創(chuàng)建時(shí)會(huì)完成相關(guān)信息的初始化工作(有的是在構(gòu)造器中完成初始化,有的是通過對(duì)象變量進(jìn)行初始化),下面就來看看如何把 NioSocketChannel 注冊(cè)到EventLoopGroup類型的 workerGroup 中的;

4、完成NioSocketChannel 到 EventLoop 的注冊(cè)工作

在上面提到的 ServerBootstrap#init( channel ) 初始化Channel信息時(shí),會(huì)提交一個(gè)任務(wù)到 channel eventLoop 中,此任務(wù)是為 當(dāng)前Pipeline 添加一個(gè) ServerBootstrapAcceptor 類型的Handler信息,也就是為 NioServerSocketChannel 的 pipeline 添加了一個(gè) ServerBootstrapAcceptor 類型的Handler信息,而在 AbstractNioMessageChannel.NioMessageUnsafe#read() 方法中,Netty會(huì)獲取 readBuffer中的信息,并調(diào)用 pipeline.fireChannelRead( Object ) 來把信息傳播給 pipeline 上的handler 去執(zhí)行,源碼如下:

private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        final ChannelPipeline pipeline = pipeline();
        final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
        allocHandle.reset(config);

        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                do {
                     // 根據(jù) ACCEPT 事件獲取 SocketChannel 并封裝成功 NioSocketChannel 放入 readBuff 中
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    allocHandle.incMessagesRead(localRead);
                } while (allocHandle.continueReading());
            } catch (Throwable t) {
                exception = t;
            }

            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                readPending = false;
                // 循環(huán)每一個(gè)事件信息并 傳播 channelRead 方法
                pipeline.fireChannelRead(readBuf.get(i));
            }
            readBuf.clear();
            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();
        } finally {
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}

根據(jù)上面的代碼可以看出,最終會(huì)通過pipeline傳播事件獲取的消息,也就是NioScoketChannel 對(duì)象,因此會(huì)調(diào)用到 ServerBootstrap.ServerBootstrapAcceptor#channelRead() 方法,其對(duì)應(yīng)源碼如下:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

以上channelRead(ChannelHandlerContext ctx, Object msg) 方法中,msg信息就是我們提到的 NioSocketChannel 對(duì)象,通過以上代碼,其主要完成以下幾個(gè)功能:

  1. 為 NioSocketChannel pipeline 添加childHandler,也就是我們?cè)趯?server端代碼時(shí)調(diào)用 ServerBootstrap#childHandler(ChannelHandler)配置的 handler 信息;

  2. 為 NioSocketChannel 初始化我們提供的相關(guān)配置,也是我們服務(wù)端編碼時(shí)通過調(diào)用 childOption(ChannelOption<T> childOption, T value)和 childAttr(.AttributeKey<T> childKey, T value) 寫入的配置信息;

  3. 完成 NioSocketChannel 到 childGropu 中 EventLoop 的注冊(cè),具體注冊(cè)邏輯,跟我們上面分析的 NioServerSocketChannel 注冊(cè)到 parentGriup 中的 EventLoop 的邏輯是一樣的,區(qū)別就是興趣事件的綁定,NioSocketChannel 是在注冊(cè)完 EventLoop 時(shí)便已是 active狀態(tài),所以當(dāng)時(shí)就會(huì)通過 pipeline傳播調(diào)用 channelActive() 并在 head 中綁定 readInterestOp (OP_READ)事件到 selector中。而 NioServerSocketChannel 則是在完成端口號(hào)的綁定后才會(huì)提交一個(gè)任務(wù)來傳播 channelActive 進(jìn)行 readInterestOp (OP_ACCEPT)事件綁定 。

流程圖

1、NioEventLoop 到 Pipeline流程圖

image-pipeline.png

END其他流程圖看后續(xù)有沒有時(shí)間補(bǔ)充吧,有建議什么的隨時(shí)留言我會(huì)及時(shí)回復(fù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容