服務(wù)啟動
服務(wù)啟動可分為以下幾步:
①:Boostrap.bind()
②:創(chuàng)建NioServerSocketChannel
③:將NioServerSocketChannel注冊到EventLoopGroup(這里的EventLoopGroup指的是我們前面服務(wù)創(chuàng)建時指定的bossGroup)
下面結(jié)合源碼進行分析:
ServerBoostrap-bind源碼分析
//bind(port)最終調(diào)用doBind方法
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
//在doBind方法,主要看其第一行代碼,intAndRegister方法:初始化NioServerSocketChannel并注冊
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
.....
}
/**
* 服務(wù)端NioServerSocketChannel的創(chuàng)建、初始化、注冊
* @return
*/
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
//這里的newChannel其實就是對我們之前channel方法傳入的class進行實例化,即NioServerSocketChannel的創(chuàng)建
channel = channelFactory().newChannel();
//NioServerSocketChannle的初始化
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
//服務(wù)端Channel-NioServerSocketChannel的注冊到EventLoop上
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
NioServerSocketChannel的創(chuàng)建
/**
* NioServerSocketChannel的創(chuàng)建,服務(wù)端socketChannel的創(chuàng)建包含的內(nèi)容:
* 實例化的時候注冊O(shè)P_Accept事件
* 默認pipeline由DefaultChannelPipeline提供,其默認只包含HeadContext和TailContext兩個不含ChannelHandler的AbstratChannelHandlerContext組成。
*/
/**
* NioServerSocketChannel默認的構(gòu)造函數(shù)
*/
public NioServerSocketChannel() {
this(newSocket(DEFAULT_SELECTOR_PROVIDER));
}
/**
* 最終調(diào)用的構(gòu)造器函數(shù),可發(fā)現(xiàn)其①首先注冊了OP_Accept事件,具體的操作發(fā)生在父類AbstractNioChannel的構(gòu)造函數(shù)中
* 而創(chuàng)建默認的Pipeline的動作則發(fā)生在父類AbstractChannel的構(gòu)造函數(shù)中
* ②然后初始化了一些服務(wù)端的基本配置
*/
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
//AbstractNioChannel父類中:Channel感興趣的事件
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;//感興趣的事件,后面,將Channel注冊到EventLoop上的時候會用到
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
//AbstractChannel父類中:創(chuàng)建默認的Pipeline
protected AbstractChannel(Channel parent) {
this.parent = parent;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
NioServerSocketChannel的初始化
NioServerSocketChannel的初始化,除了初始化一些參數(shù)外,還為pipeline添加一個ChannelInitializer的handler,這個特殊的handler會在channel register的時候被調(diào)用,然后回調(diào)initChannel方法,回調(diào)完成之后,會將當前channel從所屬的pipeline中移除。
NioServerSocketChannel的初始化
//ServerBoostrap.init方法
//這里需重點關(guān)注最后為NioServerSocketChannel添加的Channelhandler,當OP_Accept事件發(fā)生的時候,NioSocketChannel的初始化會調(diào)用這里
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
//重點
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
//將我們之前添加的ChannelHanndler添加到SocketChannel的初始化中
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
上述初始化代碼中為pipeline添加了一個重要的類 ServerBootstrapAcceptor 這個類是專門負責處理IO監(jiān)聽事件的。它的里面提供了一個重要的方法channelRead,這個方法做了兩件事:
- 為接收到的Channel添加我們在服務(wù)啟動前設(shè)置的自定義的handler。
- 將接收到的channel注冊到childGroup所在的EventLoopGroup中
NioServerSocketChannel的ChannelHandler
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
//省略部分代碼...
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
//省略部分代碼...
}
NioServerSocketChannel的注冊
/**
* NioServerSocketChannel的注冊,這里的group是我們創(chuàng)建ServerBoostrap傳入的第一個group即bossGroup
*/
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
//next方法用于順序返回group中的一個EventLoop,因為我們創(chuàng)建bossGroup的時候只指定了一個EventLoop,所以其實這里只有一個Eventloop可供選擇
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
}
//最終的注冊會調(diào)用channel自身的register方法
@Override
public ChannelFuture register(final Channel channel, final ChannelPromise promise) {
....
channel.unsafe().register(this, promise);
return promise;
}
//看一下channel自身的register方法,發(fā)現(xiàn)其會判斷調(diào)用當前register的線程是否就是支撐當前EventLoop的線程
//如果是,則直接進行register,否則封裝成一個任務(wù),等待EventLoop之后進行調(diào)度執(zhí)行
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
//在register0方法中,channel將自身注冊到了EventLoop上Selector上,只不過注冊的ops是0,表示只注冊,不監(jiān)聽任何事件
//而真正的注冊O(shè)P_Accept事件發(fā)生在,java原生的ServerSocketChannel綁定了localAddress之后,
public final void bind(final SocketAddress localAddress, final ChannelPromise){
boolean wasActive = isActive();
try {
//java原生接口ServerSocketChannle的bind方法
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
//這里修改了NioServerSocketChannle注冊的事件,將0->改為了感興趣的事件即OP_Accpet事件
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
}