ServerBootstrapAcceptor 用來處理 BossGroup(EventLoopGroup) 中產生新客戶端連接的情況：它會將新連接(NioSocketChannel)交給 WorkerGroup(EventLoopGroup)。
/**
 * Inbound handler that takes each message it reads as a child {@code Channel},
 * installs {@code childHandler} on that channel's pipeline, applies
 * {@code childOptions}, and registers the channel with {@code childGroup}.
 *
 * NOTE(review): this is an abridged excerpt. {@code childHandler}, {@code childOptions},
 * {@code childGroup}, {@code logger}, {@code setChannelOptions} and {@code forceClose}
 * are declared outside the visible code.
 */
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter {
/**
 * Handles one read message, which is cast to a {@code Channel} (the new child connection).
 *
 * @param ctx context of this handler; not used in the visible body
 * @param msg the read message; cast unconditionally to {@code Channel}
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// Add the new ChannelHandler (MySocketServerInitializer in the author's example) to the child's pipeline.
// Author's note: this also queues a PendingHandlerCallback that holds the current ChannelHandlerContext.
// Author's note: when the register method is called later, the pipeline's handlerAdded callbacks are
// triggered, which run the methods stored in that PendingHandlerCallback.
child.pipeline().addLast(childHandler);
setChannelOptions(child, childOptions, logger);
try {
// Register the new connection (NioSocketChannel) with one WorkerEventLoop (EventLoop) of the WorkerGroup.
// Author's note: the WorkerGroup picks one WorkerEventLoop; the selection method is worth reading.
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Registration completed asynchronously: on failure, close the child with the failure cause.
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
// register(...) itself threw: close the child with the thrown error.
forceClose(child, t);
}
}
}
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
/**
 * Performs the channel registration, runs pending handler-added callbacks, completes
 * the promise, and then fires the registered (and possibly active) pipeline events.
 * On any {@code Throwable} the channel is force-closed and the promise is failed.
 *
 * NOTE(review): this is an abridged excerpt; {@code firstRegistration} is used below but
 * is not declared in the visible code.
 *
 * @param promise completed via {@code safeSetSuccess} once registration and the
 *                handler-added callbacks have run, or via {@code safeSetFailure} on error
 */
private void register0(ChannelPromise promise) {
try {
// Author's note: this is where the channel is actually registered with the Selector of the WorkerEventLoop.
doRegister();
// Author's note: this ends up calling pipeline.callHandlerAddedForAllHandlers(), i.e.
// PendingHandlerCallback task
// ctx.callHandlerAdded()
// which triggers the method shown at @2.
// Note: at this point the pipeline holds only three handlers: head, MySocketServerInitializer and tail.
// head and tail are generated by default when the pipeline is created together with the NioSocketChannel;
// one NioSocketChannel is bound to exactly one pipeline, see @1.
// MySocketServerInitializer was added directly by ServerBootstrapAcceptor when it received the connection.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// Re-registration of an already active channel with auto-read enabled: start reading again.
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
@1
/**
 * Excerpt of the channel base class showing only its final fields.
 * Each instance holds a single {@code pipeline} field, which is the point
 * the surrounding notes refer to as @1.
 *
 * NOTE(review): the constructor that assigns these final fields is not part of this excerpt.
 */
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
// Parent channel reference.
private final Channel parent;
// Identifier of this channel.
private final ChannelId id;
// Unsafe instance used by this channel.
private final Unsafe unsafe;
// The one pipeline held by this channel.
private final DefaultChannelPipeline pipeline;
}
@2
/**
 * Excerpt of the initializer handler showing only {@code handlerAdded}; this is the
 * callback the surrounding notes refer to as @2.
 *
 * NOTE(review): {@code initChannel(ChannelHandlerContext)} and {@code removeState} are
 * declared outside the visible code.
 *
 * @param <C> the channel type this initializer is declared for
 */
public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {
/**
 * Runs channel initialization when this handler is added, but only if the
 * context's channel is already registered.
 *
 * @param ctx context whose channel is checked and then passed to {@code initChannel}
 * @throws Exception declared by the signature; the visible body has no explicit throw
 */
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
// Only when initChannel reports true is the state for this context removed.
if (initChannel(ctx)) {
removeState(ctx);
}
}
}
}