在基于nio的編程中,一般是聲明一個ServerSocketChannel對象,然后注冊到selector中,監(jiān)聽accept事件,當有新的連接請求時,select方法返回,通過ServerSocketChannel的accept方法獲取到新建立的SocketChannel連接,并注冊到selector,同時監(jiān)聽read事件,那么netty是如何處理這一過程的呢
我們先來看下一個netty的常規(guī)編程模式,然后再跟蹤源碼,來了解這個注冊監(jiān)聽流程
//1. 啟動器,負責組裝netty組件,啟動服務器
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
new ServerBootstrap()
.group(bossGroup, workerGroup)
//選擇服務器ServerSocketChannel的實現(xiàn)
.channel(NioServerSocketChannel.class)
//4 boss 負責處理連接 worker(child)負責處理讀寫,決定worker能執(zhí)行哪些操作
.childHandler(
//5 代表和客戶端進行數(shù)據(jù)讀寫的通道
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
.bind(8899);
這里面除了ServerBootstrap組件沒有講過,其他的在前面的幾篇文章都說過了,可以簡單的理解為ServerBootstrap就是負責連接各個組件,啟動服務器。
這里直接看下bind方法的邏輯
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
doBind方法,大概可以分為這三個大步驟init、register、doBind0
其中,init就是初始化一個Channel對象,生成關聯(lián)的pipeline對象,并且添加一個入站處理器ChannelInitializer,這個入站處理器的handlerAdd方法會在channel注冊到selector后才會被調用,handlerAdd方法
public void initChannel(final Channel ch) {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
這里面有一個很重要是,當handlerAdd方法觸發(fā)時,又會往pipeline添加一個入站處理器ServerBootstrapAcceptor,并且將當前的處理器從pipeline移除。這個處理器就是用來注冊新連接的讀事件。
初始化channel對象后,接著就調用register方法,就會從NioEventLoopGroup(boss group)中挑選出一個NioEventLoop對象來注冊channel,而NioEventLoop對象對channel的注冊邏輯,跟進代碼可以看到如下
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//省略....
}
}
}
這里最重要的有一個判斷eventLoop.inEventLoop(),這個是判斷currentThread是否為eventLoop的執(zhí)行線程(EventLoop第一次執(zhí)行任務時會關聯(lián)到一個thread對象),若是的話,直接執(zhí)行register0方法;不是的話,將其作為runnable對象提交給eventLoop的任務隊列,在事件循環(huán)中處理。
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
這個注冊方法里面第一步先執(zhí)行doRegister,這個就是將channel注冊到selector上,但此時并沒有監(jiān)控accept事件。然后再調用pipeline上ChannelHandler的handlerAdd方法,這里會執(zhí)行上面提到的ChannelInitializer的initChannel方法,添加一個ServerBootstrapAcceptor入站處理器。
接著,調用safeSetSuccess方法,表明這個channel的注冊完成,回調promise的監(jiān)聽器,然后調用pipeline上的ChannelHandler的channelRegistered方法。方法后面會判斷isActive方法,因為netty中調用register方法是異步的,當方法register返回后,便會緊接著調bind方法,也就是剛剛說的三大步驟的第三部doBind0方法。