Netty服務(wù)啟動流程分析

服務(wù)啟動

服務(wù)啟動可分為以下幾步:
①:Boostrap.bind()
②:創(chuàng)建NioServerSocketChannel
③:將NioServerSocketChannel注冊到EventLoopGroup(這里的EventLoopGroup指的是我們前面服務(wù)創(chuàng)建時指定的bossGroup)
下面結(jié)合源碼進行分析:

ServerBoostrap-bind源碼分析
//bind(port)最終調(diào)用doBind方法
public ChannelFuture bind(int inetPort) {
    return bind(new InetSocketAddress(inetPort));
}
//在doBind方法,主要看其第一行代碼,intAndRegister方法:初始化NioServerSocketChannel并注冊
private ChannelFuture doBind(final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    if (regFuture.cause() != null) {
        return regFuture;
    }
    .....
}
/**
 * 服務(wù)端NioServerSocketChannel的創(chuàng)建、初始化、注冊
 * @return
 */
final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        //這里的newChannel其實就是對我們之前channel方法傳入的class進行實例化,即NioServerSocketChannel的創(chuàng)建
        channel = channelFactory().newChannel();
        //NioServerSocketChannle的初始化
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }
    //服務(wù)端Channel-NioServerSocketChannel的注冊到EventLoop上
    ChannelFuture regFuture = group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}
NioServerSocketChannel的創(chuàng)建
/**
* NioServerSocketChannel的創(chuàng)建,服務(wù)端socketChannel的創(chuàng)建包含的內(nèi)容:
*       實例化的時候注冊O(shè)P_Accept事件
*       默認pipeline由DefaultChannelPipeline提供,其默認只包含HeadContext和TailContext兩個不含ChannelHandler的AbstratChannelHandlerContext組成。
*/
/**
 * NioServerSocketChannel默認的構(gòu)造函數(shù)
 */
public NioServerSocketChannel() {
    this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
 * 最終調(diào)用的構(gòu)造器函數(shù),可發(fā)現(xiàn)其①首先注冊了OP_Accept事件,具體的操作發(fā)生在父類AbstractNioChannel的構(gòu)造函數(shù)中
 * 而創(chuàng)建默認的Pipeline的動作則發(fā)生在父類AbstractChannel的構(gòu)造函數(shù)中
 * ②然后初始化了一些服務(wù)端的基本配置
 */
public NioServerSocketChannel(ServerSocketChannel channel) {
    super(null, channel, SelectionKey.OP_ACCEPT);
    config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//AbstractNioChannel父類中:Channel感興趣的事件
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;//感興趣的事件,后面,將Channel注冊到EventLoop上的時候會用到
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }
        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}
//AbstractChannel父類中:創(chuàng)建默認的Pipeline
protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

NioServerSocketChannel的初始化

NioServerSocketChannel的初始化,除了初始化一些參數(shù)外,還為pipeline添加一個ChannelInitializer的handler,這個特殊的handler會在channel register的時候被調(diào)用,然后回調(diào)initChannel方法,回調(diào)完成之后,會將當前channel從所屬的pipeline中移除。

NioServerSocketChannel的初始化
//ServerBoostrap.init方法
//這里需重點關(guān)注最后為NioServerSocketChannel添加的Channelhandler,當OP_Accept事件發(fā)生的時候,NioSocketChannel的初始化會調(diào)用這里
void init(Channel channel) throws Exception {
    final Map<ChannelOption<?>, Object> options = options();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }
 
    final Map<AttributeKey<?>, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }
 
    ChannelPipeline p = channel.pipeline();
 
    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions;
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }
    //重點
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) throws Exception {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            //將我們之前添加的ChannelHanndler添加到SocketChannel的初始化中
            if (handler != null) {
                pipeline.addLast(handler);
            }
 
            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}

上述初始化代碼中為pipeline添加了一個重要的類 ServerBootstrapAcceptor 這個類是專門負責處理IO監(jiān)聽事件的。它的里面提供了一個重要的方法channelRead,這個方法做了兩件事:

  1. 為接收到的Channel添加我們在服務(wù)啟動前設(shè)置的自定義的handler。
  2. 將接收到的channel注冊到childGroup所在的EventLoopGroup中
NioServerSocketChannel的ChannelHandler
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
    //省略部分代碼...
    @Override
    @SuppressWarnings("unchecked")
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
        child.pipeline().addLast(childHandler);
        setChannelOptions(child, childOptions, logger);
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
    //省略部分代碼...
}

NioServerSocketChannel的注冊

/**
 * NioServerSocketChannel的注冊,這里的group是我們創(chuàng)建ServerBoostrap傳入的第一個group即bossGroup
 */
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
  
    //next方法用于順序返回group中的一個EventLoop,因為我們創(chuàng)建bossGroup的時候只指定了一個EventLoop,所以其實這里只有一個Eventloop可供選擇
    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }
    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
}
//最終的注冊會調(diào)用channel自身的register方法
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
    ....
    channel.unsafe().register(this, promise);
    return promise;
}
//看一下channel自身的register方法,發(fā)現(xiàn)其會判斷調(diào)用當前register的線程是否就是支撐當前EventLoop的線程
//如果是,則直接進行register,否則封裝成一個任務(wù),等待EventLoop之后進行調(diào)度執(zhí)行
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;
    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        try {
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
//在register0方法中,channel將自身注冊到了EventLoop上Selector上,只不過注冊的ops是0,表示只注冊,不監(jiān)聽任何事件
//而真正的注冊O(shè)P_Accept事件發(fā)生在,java原生的ServerSocketChannel綁定了localAddress之后,
public final void bind(final SocketAddress localAddress, final ChannelPromise){
 
    boolean wasActive = isActive();
    try {
        //java原生接口ServerSocketChannle的bind方法
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }
    //這里修改了NioServerSocketChannle注冊的事件,將0->改為了感興趣的事件即OP_Accpet事件
    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容