netty服務端啟動

Netty是基于Nio實現(xiàn)的,所以也離不開selector、serverSocketChannel、socketChannel和selectKey等,只不過Netty把這些實現(xiàn)都封裝在了底層。

來看一個標準的netty程序:

public class EchoServer {
    private final int port;
    public EchoServer(int port) {
        this.port = port;
    }

    public void run() throws Exception {
        // Configure the server.
        EventLoopGroup bossGroup = new NioEventLoopGroup();  // (1)
        EventLoopGroup workerGroup = new NioEventLoopGroup();  
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class) // (3)
             .option(ChannelOption.SO_BACKLOG, 100)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                 @Override
                 public void initChannel(SocketChannel ch) throws Exception {
                     ch.pipeline().addLast(
                             //new LoggingHandler(LogLevel.INFO),
                             new EchoServerHandler());
                 }
             });

            // Start the server.
            ChannelFuture f = b.bind(port).sync(); // (5)

            // Wait until the server socket is closed.
            f.channel().closeFuture().sync();
        } finally {
            // Shut down all event loops to terminate all threads.
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new EchoServer(port).run();
    }
}

EchoServerHandler 實現(xiàn)

public class EchoServerHandler extends ChannelInboundHandlerAdapter {  
  
    private static final Logger logger = Logger.getLogger(  
            EchoServerHandler.class.getName());  
  
    @Override  
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {  
        ctx.write(msg);  
    }  
  
    @Override  
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
        ctx.flush();  
    }  
  
    @Override  
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {  
        // Close the connection when an exception is raised.  
        logger.log(Level.WARNING, "Unexpected exception from downstream.", cause);  
        ctx.close();  
    }  
}  

和客戶端的代碼相比, 沒有很大的差別, 基本上也是進行了如下幾個部分的初始化:

EventLoopGroup: 不論是服務器端還是客戶端, 都必須指定 EventLoopGroup. 在這個例子中, 指定了 NioEventLoopGroup, 表示一個 NIO 的EventLoopGroup, 不過服務器端需要指定兩個 EventLoopGroup, 一個是 bossGroup, 用于處理客戶端的連接請求; 另一個是 workerGroup, 用于處理與各個客戶端連接的 IO 操作.

  • ChannelType: 指定 Channel 的類型. 因為是服務器端, 因此使用了NioServerSocketChannel.
  • Handler: 設(shè)置數(shù)據(jù)的處理器.

在客戶端中, Channel 的類型其實是在初始化時, 通過 Bootstrap.channel() 方法設(shè)置的, 服務器端自然也不例外.
在服務器端, 我們調(diào)用了 ServerBootstarap.channel(NioServerSocketChannel.class), 傳遞了一個 NioServerSocketChannel Class 對象. 這樣的話, 按照和分析客戶端代碼一樣的流程, 我們就可以確定, NioServerSocketChannel 的實例化是通過 BootstrapChannelFactory 工廠類來完成的, 而 BootstrapChannelFactory 中的 clazz 字段被設(shè)置為了 NioServerSocketChannel.class, 因此當調(diào)用 BootstrapChannelFactory.newChannel() 時:

@Override
public T newChannel() {
    // 刪除 try 塊
    return clazz.newInstance();
}

就獲取到了一個 NioServerSocketChannel 的實例.

最后我們也來總結(jié)一下:

  • ServerBootstrap 中的 ChannelFactory 的實現(xiàn)是 BootstrapChannelFactory

  • 生成的 Channel 的具體類型是 NioServerSocketChannel.
    Channel 的實例化過程, 其實就是調(diào)用的 ChannelFactory.newChannel 方法, 而實例化的 Channel 的具體的類型又是和在初始化 ServerBootstrap 時傳入的 channel() 方法的參數(shù)相關(guān). 因此對于我們這個例子中的服務器端的 ServerBootstrap 而言, 生成的的 Channel 實例就是 NioServerSocketChannel.

NioServerSocketChannel 的實例化過程
下面是 NioServerSocketChannel 的類層次結(jié)構(gòu)圖:

image.png

首先, 我們來看一下它的默認的構(gòu)造器. 和 NioSocketChannel 類似, 構(gòu)造器都是調(diào)用了 newSocket 來打開一個 Java 的 NIO Socket, 不過需要注意的是, 客戶端的 newSocket 調(diào)用的是 openSocketChannel, 而服務器端的 newSocket 調(diào)用的是 openServerSocketChannel. 顧名思義, 一個是客戶端的 Java SocketChannel, 一個是服務器端的 Java ServerSocketChannel.

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    return provider.openServerSocketChannel();
}

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

接下來會調(diào)用重載的構(gòu)造器:

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

這個構(gòu)造其中, 調(diào)用父類構(gòu)造器時, 傳入的參數(shù)是 SelectionKey.OP_ACCEPT. 作為對比, 我們回想一下, 在客戶端的 Channel 初始化時, 傳入的參數(shù)是 SelectionKey.OP_READ. 有 Java NIO Socket 開發(fā)經(jīng)驗的朋友就知道了, Java NIO 是一種 Reactor 模式, 我們通過 selector 來實現(xiàn) I/O 的多路復用復用. 在一開始時, 服務器端需要監(jiān)聽客戶端的連接請求, 因此在這里我們設(shè)置了 SelectionKey.OP_ACCEPT, 即通知 selector 我們對客戶端的連接請求感興趣.

接著和客戶端的分析一下, 會逐級地調(diào)用父類的構(gòu)造器 NioServerSocketChannel <- AbstractNioMessageChannel <- AbstractNioChannel <- AbstractChannel.
同樣的, 在 AbstractChannel 中會實例化一個 unsafe 和 pipeline:

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

不過, 這里有一點需要注意的是, 客戶端的 unsafe 是一個 AbstractNioByteChannel#NioByteUnsafe 的實例, 而在服務器端時, 因為 AbstractNioMessageChannel 重寫了newUnsafe 方法:

@Override
protected AbstractNioUnsafe newUnsafe() {
    return new NioMessageUnsafe();
}

因此在服務器端, unsafe 字段其實是一個 AbstractNioMessageChannel#AbstractNioUnsafe 的實例.
我們來總結(jié)一下, 在 NioServerSocketChannsl 實例化過程中, 所需要做的工作:

調(diào)用 NioServerSocketChannel.newSocket(DEFAULT_SELECTOR_PROVIDER) 打開一個新的 Java NIO ServerSocketChannel

AbstractChannel(Channel parent) 中初始化 AbstractChannel 的屬性:

     parent 屬性置為 null

    unsafe 通過newUnsafe() 實例化一個 unsafe 對象, 它的類型是 AbstractNioMessageChannel#AbstractNioUnsafe 內(nèi)部類

     pipeline 是 new DefaultChannelPipeline(this) 新創(chuàng)建的實例.

AbstractNioChannel 中的屬性:

     SelectableChannel ch 被設(shè)置為 Java ServerSocketChannel, 即 NioServerSocketChannel#newSocket 返回的 Java NIO ServerSocketChannel.

     readInterestOp 被設(shè)置為 SelectionKey.OP_ACCEPT

    SelectableChannel ch 被配置為非阻塞的 ch.configureBlocking(false)

NioServerSocketChannel 中的屬性:

     ServerSocketChannelConfig config = new NioServerSocketChannelConfig(this, javaChannel().socket())
Channel 的注冊

服務器端和客戶端的 Channel 的注冊過程一致, 因此就不再單獨分析了.

關(guān)于 bossGroup 與 workerGroup

可以看出一切從ServerBootstrap開始,ServerBootstrap實例中需要兩個NioEventLoopGroup實例,按照職責劃分成boss和work,有著不同的分工:

  • boss負責請求的accept
  • work負責請求的read、write

NioEventLoopGroup

NioEventLoopGroup主要管理eventLoop的生命周期。在客戶端的時候, 我們只提供了一個 EventLoopGroup 對象, 而在服務器端的初始化時, 我們設(shè)置了兩個 EventLoopGroup, 一個是 bossGroup, 另一個是 workerGroup. 那么這兩個 EventLoopGroup 都是干什么用的呢? bossGroup 是用于服務端 的 accept 的, 即用于處理客戶端的連接請求. 我們可以把 Netty 比作一個飯店, bossGroup 就像一個像一個前臺接待, 當客戶來到飯店吃時, 接待員就會引導顧客就坐, 為顧客端茶送水等. 而 workerGroup, 其實就是實際上干活的啦, 它們負責客戶端連接通道的 IO 操作: 當接待員 招待好顧客后, 就可以稍做休息, 而此時后廚里的廚師們(workerGroup)就開始忙碌地準備飯菜了.
關(guān)于 bossGroup 與 workerGroup 的關(guān)系, 我們可以用如下圖來展示:

image.png
image.png

首先, 服務器端 bossGroup 不斷地監(jiān)聽是否有客戶端的連接, 當發(fā)現(xiàn)有一個新的客戶端連接到來時, bossGroup 就會為此連接初始化各項資源, 然后從 workerGroup 中選出一個 EventLoop 綁定到此客戶端連接中. 那么接下來的服務器與客戶端的交互過程就全部在此分配的 EventLoop 中了.
首先在ServerBootstrap 初始化時, 調(diào)用了 b.group(bossGroup, workerGroup) 設(shè)置了兩個 EventLoopGroup, 我們跟蹤進去看一下:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    ...
    this.childGroup = childGroup;
    return this;
}

顯然, 這個方法初始化了兩個字段, 一個是 group = parentGroup, 它是在 super.group(parentGroup) 中初始化的, 另一個是 childGroup = childGroup. 接著我們啟動程序調(diào)用了 b.bind 方法來監(jiān)聽一個本地端口. bind 方法會觸發(fā)如下的調(diào)用鏈:

AbstractBootstrap.bind -> AbstractBootstrap.doBind -> AbstractBootstrap.initAndRegister

final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    ... 省略異常判斷
    init(channel);
    ChannelFuture regFuture = group().register(channel);
    return regFuture;
}

這里 group() 方法返回的是上面我們提到的 bossGroup, 而這里的 channel 我們也已經(jīng)分析過了, 它是一個是一個 NioServerSocketChannsl 實例, 因此我們可以知道, group().register(channel) 將 bossGroup 和 NioServerSocketChannsl 關(guān)聯(lián)起來了.
那么 workerGroup 是在哪里與 NioSocketChannel 關(guān)聯(lián)的呢?
我們繼續(xù)看 init(channel) 方法:

@Override
void init(Channel channel) throws Exception {
    ...
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

init 方法在 ServerBootstrap 中重寫了, 從上面的代碼片段中我們看到, 它為 pipeline 中添加了一個 ChannelInitializer, 而這個 ChannelInitializer 中添加了一個關(guān)鍵的 ServerBootstrapAcceptor handler. 我們看一下 ServerBootstrapAcceptor 類.
ServerBootstrapAcceptor 中重寫了 channelRead 方法, 其主要代碼如下:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    ...
    childGroup.register(child).addListener(...);
}

ServerBootstrapAcceptor 中的 childGroup 是構(gòu)造此對象是傳入的 currentChildGroup, 即我們的 workerGroup, 而 Channel 是一個 NioSocketChannel 的實例, 因此這里的 childGroup.register 就是將 workerGroup 中的 EventLoop 和 NioSocketChannel 關(guān)聯(lián)了. 既然這樣, 那么現(xiàn)在的問題是, ServerBootstrapAcceptor.channelRead 方法是怎么被調(diào)用的呢? 其實當一個 client 連接到 server 時, Java 底層的 NIO ServerSocketChannel 會有一個 SelectionKey.OP_ACCEPT 就緒事件, 接著就會調(diào)用到 NioServerSocketChannel.doReadMessages:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();
    ... 省略異常處理
    buf.add(new NioSocketChannel(this, ch));
    return 1;
}

在 doReadMessages 中, 通過 javaChannel().accept() 獲取到客戶端新連接的 SocketChannel, 接著就實例化一個 NioSocketChannel, 并且傳入 NioServerSocketChannel 對象(即 this), 由此可知, 我們創(chuàng)建的這個 NioSocketChannel 的父 Channel 就是 NioServerSocketChannel 實例 .
接下來就經(jīng)由 Netty 的 ChannelPipeline 機制, 將讀取事件逐級發(fā)送到各個 handler 中, 于是就會觸發(fā)前面我們提到的 ServerBootstrapAcceptor.channelRead 方法啦.

我們再來看看NioEventLoopGroup構(gòu)造方法:

public NioEventLoopGroup() {  
    this(0);  
}  
  
public NioEventLoopGroup(int nThreads) {  
    this(nThreads, null); 
}  
  
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {  
    this(nThreads, threadFactory, SelectorProvider.provider());  
}  
  
public NioEventLoopGroup(  
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {  
    super(nThreads, threadFactory, selectorProvider);  
}  

MultithreadEventLoopGroup是NioEventLoopGroup的父類,構(gòu)造方法:

protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {  
    super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  
}  

其中 DEFAULT_EVENT_LOOP_THREADS 為處理器數(shù)量的兩倍。
MultithreadEventExecutorGroup是核心,管理eventLoop的生命周期,先看看其中幾個變量。
1、children:EventExecutor數(shù)組,保存eventLoop。
2、chooser:從children中選取一個eventLoop的策略。

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }

    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
// 根據(jù)數(shù)組的大小,采用不同策略初始化chooser,如果大小為2的冪次方,則采用PowerOfTwoEventExecutorChooser,否則使用GenericEventExecutorChooser。

    children = new SingleThreadEventExecutor[nThreads];
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}

 protected EventExecutor newChild(  
            ThreadFactory threadFactory, Object... args) throws Exception {  
      return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  
}  

1、根據(jù)數(shù)組的大小,采用不同策略初始化chooser,如果大小為2的冪次方,則采用PowerOfTwoEventExecutorChooser,否則使用GenericEventExecutorChooser。
其中判斷一個數(shù)是否是2的冪次方的方法很有技巧:

private static boolean isPowerOfTwo(int val) {
      return (val & -val) == val;
}

2、newChild方法重載,初始化EventExecutor時,實際執(zhí)行的是NioEventLoopGroup中的newChild方法,所以children元素的實際類型為NioEventLoop。

接下去看看NioEventLoop類。

每個eventLoop會維護一個selector和taskQueue,負責處理客戶端請求和內(nèi)部任務,如ServerSocketChannel注冊和ServerSocket綁定等。

image.png
NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {  
      super(parent, threadFactory, false);  
      if (selectorProvider == null) {  
          throw new NullPointerException("selectorProvider");  
      }  
      provider = selectorProvider;  
      selector = openSelector();  
}

當看到 selector = openSelector() 時,有沒有覺得親切了許多??纯碨ingleThreadEventLoop類。它是NioEventLoop的父類,

protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}

繼續(xù)看SingleThreadEventLoop的父類SingleThreadEventExecutor

從類名上可以看出,這是一個只有一個線程的線程池, 先看看其中的幾個變量:
1、state:線程池當前的狀態(tài)
2、taskQueue:存放任務的隊列
3、thread:線程池維護的唯一線程
4、scheduledTaskQueue:定義在其父類AbstractScheduledEventExecutor中,用以保存延遲執(zhí)行的任務。

protected SingleThreadEventExecutor(EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
    threadProperties = new DefaultThreadProperties(thread);
    taskQueue = newTaskQueue();
}

代碼很長,內(nèi)容很簡單:
1、初始化一個線程,并在線程內(nèi)部執(zhí)行NioEventLoop類的run方法,當然這個線程不會立刻執(zhí)行。
2、使用LinkedBlockingQueue類初始化taskQueue。

ServerBootstrap

通過serverBootstrap.bind(port)啟動服務,過程如下:

/**
 * Create a new {@link Channel} and bind it.
 */
public ChannelFuture bind() {
    validate();
    SocketAddress localAddress = this.localAddress;
    if (localAddress == null) {
       throw new IllegalStateException("localAddress not set");
    }
    return doBind(localAddress);
 }
image.png
image.png
final ChannelFuture initAndRegister() {
    final Channel channel = channelFactory().newChannel();
    try {
        init(channel);
    } catch (Throwable t) {
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

1、負責創(chuàng)建服務端的NioServerSocketChannel實例
2、為NioServerSocketChannel的pipeline添加handler
3、注冊NioServerSocketChannel到selector

NioServerSocketChannel

對Nio的ServerSocketChannel和SelectionKey進行了封裝。

public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}

private static ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
        return provider.openServerSocketChannel();
    } catch (IOException e) {
        throw new ChannelException(
                "Failed to open a server socket.", e);
    }
}

public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}

1、方法newSocket利用 provider.openServerSocketChannel() 生成Nio中的ServerSocketChannel對象。
2、設(shè)置SelectionKey.OP_ACCEPT事件。

父類AbstractNioMessageChannel構(gòu)造方法

protected  AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent, ch, readInterestOp);
}
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

設(shè)置當前ServerSocketChannel為非阻塞通道。

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

1、初始化unsafe,這里的Unsafe并非是jdk中底層Unsafe類,用來負責底層的connect、register、read和write等操作。
2、初始化pipeline,每個Channel都有自己的pipeline,當有請求事件發(fā)生時,pipeline負責調(diào)用相應的hander進行處理。

回到ServerBootstrap的init(Channel channel)方法,添加handler到channel的pipeline中。

void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

1、設(shè)置channel的options和attrs。
2、在pipeline中添加一個ChannelInitializer對象。

init執(zhí)行完,需要把當前channel注冊到EventLoopGroup。其實最終目的是為了實現(xiàn)Nio中把ServerSocket注冊到selector上,這樣就可以實現(xiàn)client請求的監(jiān)聽了

public ChannelFuture register(Channel channel, ChannelPromise promise) {
    return next().register(channel, promise);
}

public EventLoop next() {
    return (EventLoop) super.next();
}

public EventExecutor next() {
    return children[Math.abs(childIndex.getAndIncrement() % children.length)];
}

因為EventLoopGroup中維護了多個eventLoop,next方法會調(diào)用chooser策略找到下一個eventLoop,并執(zhí)行eventLoop的register方法進行注冊。

public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    ...
    channel.unsafe().register(this, promise);
    return promise;
}

channel.unsafe()是什么?
NioServerSocketChannel初始化時,會創(chuàng)建一個NioMessageUnsafe實例,用于實現(xiàn)底層的register、read、write等操作。

eventLoop.execute(new Runnable() {
   @Override
   public void run() {
      register0(promise);
   }
});

private void register0(ChannelPromise promise) {
    try {
        if (!ensureOpen(promise)) {
            return;
        }
        Runnable postRegisterTask = doRegister();
        registered = true;
        promise.setSuccess();
        pipeline.fireChannelRegistered();
        if (postRegisterTask != null) {
            postRegisterTask.run();
        }
        if (isActive()) {
            pipeline.fireChannelActive();
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        if (!promise.tryFailure(t)) {
            
        }
        closeFuture.setClosed();
    }
}

public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }

    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        startThread();
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp) {
        wakeup(inEventLoop);
    }
}

1、register0方法提交到eventLoop線程池中執(zhí)行,這個時候會啟動eventLoop中的線程。
2、方法doRegister()才是最終Nio中的注冊方法,方法javaChannel()獲取ServerSocketChannel。

protected Runnable doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().selector, 0, this);
            return null;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now  as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

ServerSocketChannel注冊完之后,通知pipeline執(zhí)行fireChannelRegistered方法,pipeline中維護了handler鏈表,通過遍歷鏈表,執(zhí)行InBound類型handler的channelRegistered方法,最終執(zhí)行init中添加的ChannelInitializer handler。

 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel((C) ctx.channel());
        ctx.pipeline().remove(this);
        ctx.fireChannelRegistered();
    }

1、initChannel方法最終把ServerBootstrapAcceptor添加到ServerSocketChannel的pipeline,負責accept客戶端請求。
2、在pipeline中刪除對應的handler。
3、觸發(fā)fireChannelRegistered方法,可以自定義handler的channelRegistered方法。

到目前為止,ServerSocketChannel完成了初始化并注冊到seletor上,啟動線程執(zhí)行selector.select()方法準備接受客戶端請求。

,ServerSocketChannel的socket還未綁定到指定端口,那么這一塊Netty是如何實現(xiàn)的? Netty把注冊操作放到eventLoop中執(zhí)行。

private static void doBind0(
        final ChannelFuture regFuture, 
        final Channel channel,
        final SocketAddress localAddress, 
        final ChannelPromise promise) {
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return pipeline.bind(localAddress, promise);
}

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}


@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    if (localAddress == null) {
        throw new NullPointerException("localAddress");
    }
    validatePromise(promise, false);
    return findContextOutbound().invokeBind(localAddress, promise);
}

private ChannelFuture invokeBind(final SocketAddress localAddress, final ChannelPromise promise) {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        invokeBind0(localAddress, promise);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                invokeBind0(localAddress, promise);
            }
        });
    }
    return promise;
}

private void invokeBind0(SocketAddress localAddress, ChannelPromise promise) {
    try {
        ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise);
    } catch (Throwable t) {
        notifyOutboundHandlerException(t, promise);
    }
}

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise)
        throws Exception {
    unsafe.bind(localAddress, promise);
}

最終由unsafe實現(xiàn)端口的bind操作。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
        if (!ensureOpen(promise)) {
            return;
        }

        try {
            boolean wasActive = isActive();
            ...        
            doBind(localAddress);
            promise.setSuccess();
            if (!wasActive && isActive()) {
                pipeline.fireChannelActive();
            }
        } catch (Throwable t) {
            promise.setFailure(t);
            closeIfClosed();
        }
    }

protected void doBind(SocketAddress localAddress) throws Exception {
    javaChannel().socket().bind(localAddress, config.getBacklog());
}

bind完成后,且ServerSocketChannel也已經(jīng)注冊完成,則觸發(fā)pipeline的fireChannelActive方法,所以在這里可以自定義fireChannelActive方法,默認執(zhí)行tail的fireChannelActive。

@Override
public ChannelPipeline fireChannelActive() {
    head.fireChannelActive();

    if (channel.config().isAutoRead()) {
        channel.read();
    }

    return this;
}

channel.read()方法會觸發(fā)pipeline的行為:

@Override
public Channel read() {
    pipeline.read();
    return this;
}

@Override
public ChannelPipeline read() {
    tail.read();
    return this;
}

@Override
public ChannelHandlerContext read() {
    findContextOutbound().invokeRead();
    return this;
}

private void invokeRead() {
    EventExecutor executor = executor();
    if (executor.inEventLoop()) {
        invokeRead0();
    } else {
        Runnable task = invokeRead0Task;
        if (task == null) {
            invokeRead0Task = task = new Runnable() {
                @Override
                public void run() {
                    invokeRead0();
                }
            };
        }
        executor.execute(task);
    }
}

private void invokeRead0() {
    try {
        ((ChannelOutboundHandler) handler()).read(this);
    } catch (Throwable t) {
        notifyHandlerException(t);
    }
}

handler 的添加過程

和 EventLoopGroup 一樣, 服務器端的 handler 也有兩個, 一個是通過 handler() 方法設(shè)置 handler 字段, 另一個是通過 childHandler() 設(shè)置 childHandler 字段. 通過前面的 bossGroup 和 workerGroup 的分析, 其實我們在這里可以大膽地猜測: handler 字段與 accept 過程有關(guān), 即這個 handler 負責處理客戶端的連接請求; 而 childHandler 就是負責和客戶端的連接的 IO 交互.

在 上面 bossGroup 與 workerGroup , 我們提到, ServerBootstrap 重寫了 init 方法, 在這個方法中添加了 handler:

@Override
void init(Channel channel) throws Exception {
    ...
    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

上面代碼的 initChannel 方法中, 首先通過 handler() 方法獲取一個 handler, 如果獲取的 handler 不為空,則添加到 pipeline 中. 然后接著, 添加了一個 ServerBootstrapAcceptor 實例. 那么這里 handler() 方法返回的是哪個對象呢? 其實它返回的是 handler 字段, 而這個字段就是我們在服務器端的啟動代碼中設(shè)置的:

b.group(bossGroup, workerGroup)
 ...
 .handler(new LoggingHandler(LogLevel.INFO))

那么這個時候, pipeline 中的 handler 情況如下:

image.png

根據(jù)我們原來分析客戶端的經(jīng)驗, 我們指定, 當 channel 綁定到 eventLoop 后(在這里是 NioServerSocketChannel 綁定到 bossGroup)中時, 會在 pipeline 中發(fā)出 fireChannelRegistered 事件, 接著就會觸發(fā) ChannelInitializer.initChannel 方法的調(diào)用.

 private void register0(ChannelPromise promise) {
            try {
            ...
                boolean firstRegistration = neverRegistered;
                doRegister();
                neverRegistered = false;
                registered = true;
                safeSetSuccess(promise);
                pipeline.fireChannelRegistered(); // 這里觸發(fā)
                // Only fire a channelActive if the channel has never been registered. This prevents firing
                // multiple channel actives if the channel is deregistered and re-registered.
                if (firstRegistration && isActive()) {
                    pipeline.fireChannelActive();
                }

當 channel 綁定到 eventLoop 后(在這里是 NioServerSocketChannel 綁定到 bossGroup)中時, 會在 pipeline 中發(fā)出 fireChannelRegistered 事件, 接著就會觸發(fā) ChannelInitializer.initChannel 方法的調(diào)用.

 public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        initChannel((C) ctx.channel());
        ctx.pipeline().remove(this);
        ctx.fireChannelRegistered();
    }

因此在綁定完成后, 此時的 pipeline 的內(nèi)如如下:

image.png

前面我們在分析 bossGroup 和 workerGroup 時, 已經(jīng)知道了在 ServerBootstrapAcceptor.channelRead 中會為新建的 Channel 設(shè)置 handler 并注冊到一個 eventLoop 中, 即:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;
    child.pipeline().addLast(childHandler);
    ...
    childGroup.register(child).addListener(...);
}

而這里的 childHandler 就是我們在服務器端啟動代碼中設(shè)置的 handler:

b.group(bossGroup, workerGroup)
 ...
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         if (sslCtx != null) {
             p.addLast(sslCtx.newHandler(ch.alloc()));
         }
         //p.addLast(new LoggingHandler(LogLevel.INFO));
         p.addLast(new EchoServerHandler());
     }
 });

, 當這個客戶端連接 Channel 注冊后, 就會觸發(fā) ChannelInitializer.initChannel 方法的調(diào)用, 此后的客戶端連接的 ChannelPipeline 狀態(tài)如下:


image.png

最后我們來總結(jié)一下服務器端的 handler 與 childHandler 的區(qū)別與聯(lián)系:

  • 在服務器 NioServerSocketChannel 的 pipeline 中添加的是 handler 與 ServerBootstrapAcceptor.
  • 當有新的客戶端連接請求時, ServerBootstrapAcceptor.channelRead 中負責新建此連接的 NioSocketChannel 并添加 childHandler 到 NioSocketChannel 對應的 pipeline 中, 并將此 channel 綁定到 workerGroup 中的某個 eventLoop 中.
  • handler 是在 accept 階段起作用, 它處理客戶端的連接請求.
  • childHandler 是在客戶端連接建立以后起作用, 它負責客戶端連接的 IO 交互.


    image.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容