本篇文章主要梳理了Netty服務(wù)端的一個(gè)啟動(dòng)過(guò)程,比較直接,閱讀此篇文章需要對(duì)Netty的基本組件以及模型有一個(gè)基本的了解。
一個(gè)典型的Netty服務(wù)端代碼如下所示:
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.INFO));
ch.pipeline().addLast(new LineBasedFrameDecoder(2014));
ch.pipeline().addLast(new StringDecoder());
ch.pipeline().addLast(new TimeServerHandler());
}
});
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
步驟如下:
1、創(chuàng)建兩個(gè)EventLoopGroup,一個(gè)是bossGroup,一個(gè)是workerGroup,前者主要負(fù)責(zé)獲取新連接的操作,后者負(fù)責(zé)處理新連接的I/O操作。
2、定義一個(gè)ServerBootStrap實(shí)例,初始化線程池(兩個(gè))、channel類型(NioServerSocketChannel)、channel參數(shù)選項(xiàng)(SO_BACKLOG)、添加NioServerSocketChannel的handler(LoggingHandler)以及新連接channel的handler(ChannelInitializer)。
3、綁定端口號(hào),啟動(dòng)服務(wù)器,主線程同步阻塞等待。
4、服務(wù)端的channel關(guān)閉之后,優(yōu)雅關(guān)閉線程池
========================bind(port)=================================
bootstrap的bind調(diào)用鏈:
AbstractBootstrap.bind(int inetPort)
-> AbstractBootstrap.bind(SocketAddress localAddress)
-> AbstractBootstrap.doBind(final SocketAddress localAddress)
最終調(diào)用的是下面的邏輯:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
通過(guò)上面的代碼,可以看出來(lái)doBind主要做了兩件事:
1、initAndRegister();主要負(fù)責(zé)初始化NioServerSocketChannel實(shí)例、以及將該channel注冊(cè)到eventLoop等操作。
2、綁定地址;如果上一步的注冊(cè)操作完成了,直接綁定地址, 沒(méi)完成的話,對(duì)注冊(cè)返回的Future綁定監(jiān)聽(tīng)器,在監(jiān)聽(tīng)器中綁定地址。
======================initAndRegister()========================
initAndRegister()的邏輯,去除了非關(guān)鍵的try catch語(yǔ)句以及相關(guān)的異常處理:
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
通過(guò)上面的代碼,可以看出來(lái)initAndRegister主要做了以下幾件事:
1、創(chuàng)建ServerSocketChannel實(shí)例,并為之創(chuàng)建了關(guān)聯(lián)ChannelPipeline實(shí)例,具體邏輯可以看ServerSocketChannel的無(wú)參構(gòu)造函數(shù),這里就不詳述了。
2、執(zhí)行init方法,進(jìn)行channel的初始化操作。
3、執(zhí)行注冊(cè)邏輯,將ServerSocketChannel實(shí)例注冊(cè)到NioEventLoop中。
======================init邏輯==============================
init方法在ServerBootStrap類中實(shí)現(xiàn):
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}
通過(guò)上面的代碼,可以看出來(lái)初始化操作的主要邏輯有:
1、設(shè)置NioServerSocketChannel的options參數(shù)以及attrs參數(shù)。
2、準(zhǔn)備NioSocketChannel的各種參數(shù)配置,包括eventLoop線程池、事件handler、options參數(shù)以及attrs參數(shù),以備ServerBootstrapAcceptor之用。
3、向NioServerSocketChannel實(shí)例的pipeline中添加一個(gè)ChannelInitializer實(shí)例。ChannelInitializer的主要作用就是負(fù)責(zé)channel被注冊(cè)到eventLoop后的初始化操作。
這里重點(diǎn)看一下pipeline的addLast()操作,該方法在DefaultChannelPipeline實(shí)現(xiàn):
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//1、
checkMultiplicity(handler);
//2、
newCtx = newContext(group, filterName(name, handler), handler);
//3、
addLast0(newCtx);
//4、
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
//下面的邏輯都不會(huì)執(zhí)行
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
主要的執(zhí)行邏輯就在上面這個(gè)addLast方法里,可以看一下該方法的主要邏輯,這里被添加進(jìn)pipeline的handler就是上面的ChannelInitializer實(shí)例:
1、首先檢查要添加的handler的重復(fù)性。如果該handler不是Sharable的并且已經(jīng)被添加到其他的pipeline,就拋異常。
2、根據(jù)所給的eventLoopGroup、handler的name以及handler創(chuàng)建一個(gè)ChannelHandlerContext實(shí)例。
3、將新創(chuàng)建的context實(shí)例添加到pipeline的雙向鏈表中。
4、此時(shí)registered為false,也就是說(shuō)pipeline對(duì)應(yīng)的channel還沒(méi)有被注冊(cè)到eventLoop中,那么就設(shè)置該context的狀態(tài)為ADD_PENDING,同時(shí)將該context封裝成一個(gè)PendingHandlerAddedTask實(shí)例,將該實(shí)例添加到pipeline的一個(gè)專門的鏈表中,以備在channel被注冊(cè)到eventLoop后執(zhí)行該實(shí)例的調(diào)用。簡(jiǎn)單地看一下具體實(shí)現(xiàn):
private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
assert !registered;
PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
PendingHandlerCallback pending = pendingHandlerCallbackHead;
if (pending == null) {
pendingHandlerCallbackHead = task;
} else {
while (pending.next != null) {
pending = pending.next;
}
pending.next = task;
}
}
pendingHandlerCallbackHead就是pipeline的一個(gè)專門的鏈表,用來(lái)維護(hù)需要執(zhí)行的PendingHandlerAddedTask實(shí)例任務(wù)。
以上的邏輯就是init初始化的一個(gè)具體操作。
======================register()=============================
接下來(lái)是register邏輯,首先回顧一下initAndRegister方法:
final ChannelFuture initAndRegister() {
Channel channel = null;
channel = channelFactory.newChannel();
init(channel);
ChannelFuture regFuture = config().group().register(channel);
return regFuture;
}
register的調(diào)用鏈(config().group()返回的就是bossGroup):
MultithreadEventLoopGroup.register(Channel channel)
-> SingleThreadEventLoop.register(Channel channel)
-> SingleThreadEventLoop.register(final ChannelPromise promise)
-> AbstractUnsafe.register(EventLoop eventLoop, final ChannelPromise promise)
調(diào)用的代碼邏輯如下所示:
public ChannelFuture register(Channel channel) {
//在EventLoopGroup中選擇一個(gè)EventLoop線程進(jìn)行注冊(cè)操作。
return next().register(channel);
}
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
promise.channel().unsafe().register(this, promise);
return promise;
}
//AbstractUnsafe類
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//前面是檢查操作,不貼代碼了,
//如果eventLoop == null,拋出異常
//如果該Channel已經(jīng)注冊(cè)過(guò),ChannelPromise設(shè)置失敗
//如果該eventLoop不是NioEventLoop,ChannelPromise設(shè)置失敗
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
//異常處理
}
}
}
register的主要操作邏輯如下:
1、將channel注冊(cè)到所選擇的eventLoop上,執(zhí)行到這,channel就算找到了自己的歸屬。之后在該channel的整個(gè)生命周期內(nèi),所有的事件執(zhí)行操作都由eventLoop負(fù)責(zé)。
2、判斷當(dāng)前執(zhí)行線程是不是eventLoop所屬的線程
如果是,直接執(zhí)行register0操作
否則,在eventLoop的線程中執(zhí)行register0
3、如果你自己調(diào)試的話,此時(shí)你會(huì)發(fā)現(xiàn)eventLoop里面的線程屬性還為空,所以當(dāng)前執(zhí)行線程肯定不是eventLoop中的線程。那么eventLoop中的線程什么時(shí)候創(chuàng)建呢,別急,往下看。
緊接著會(huì)執(zhí)行eventLoop.execute()(SingleThreadEventExecutor類中):
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
**startThread();**
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
當(dāng)前執(zhí)行線程還是主線程,雖然進(jìn)入到eventLoop的執(zhí)行邏輯里面,但是線程還沒(méi)有切換(也沒(méi)有線程可以切換,因?yàn)榇藭r(shí)eventLoop的線程還沒(méi)有被創(chuàng)建)。所以緊接著會(huì)執(zhí)行下面的邏輯。
private void startThread() {
if (state == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
//當(dāng)前線程就是新創(chuàng)建的線程,分配給eventLoop:eventloop.thread=Thread.currentThread();
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
//非關(guān)鍵代碼不貼
}
}
});
}
上面代碼中executor的類型是ThreadPerTaskExecutor,其execute()方法的邏輯就是每來(lái)一個(gè)任務(wù),創(chuàng)建一個(gè)新線程執(zhí)行,所以上面的new Runnable(...)任務(wù)會(huì)在新創(chuàng)建的線程中執(zhí)行,而這個(gè)新創(chuàng)建的線程就被分配給了eventLoop。
//ThreadPerTaskExecutor
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
到現(xiàn)在為止,eventLoop中的新線程已經(jīng)開(kāi)始執(zhí)行任務(wù)了,什么任務(wù)?那就是NioEventLoop中的run方法:
SingleThreadEventExecutor.this.run();
回到eventLoop中的execute代碼中來(lái),現(xiàn)在startThread方法已經(jīng)執(zhí)行完,新線程已經(jīng)創(chuàng)建,正在運(yùn)行當(dāng)中,接下來(lái)就是向eventLoop所屬的隊(duì)列中添加任務(wù)。代碼比較簡(jiǎn)單就不貼了。
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
添加了一個(gè)什么任務(wù)呢?就是執(zhí)行register0的任務(wù)。
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
任務(wù)添加到隊(duì)列之后,eventLoop線程就去隊(duì)列中取任務(wù),然后執(zhí)行。所以register0是在eventLoop線程中執(zhí)行的。
下面是eventLoop現(xiàn)成的調(diào)用棧,可以沿著調(diào)用棧查看eventLoop線程的執(zhí)行邏輯,這里就不詳述了:

=========================register0==========================
以下邏輯都是在eventLoop線程中執(zhí)行的:
private void register0(ChannelPromise promise) {
try {
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See [https://github.com/netty/netty/issues/4805](https://github.com/netty/netty/issues/4805)
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
}
}
從上面的代碼可以看出,主要的邏輯有:
1、執(zhí)行doRegister操作,將channel注冊(cè)到selector上。
2、設(shè)置已注冊(cè)標(biāo)志:registered=true
3、回調(diào)handlerAdded方法
4、回調(diào)ChannelRegistered方法
這里重點(diǎn)看一下3和4:
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
assert !registered;
registered = true;
pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;
this.pendingHandlerCallbackHead = null;
}
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
HandlerAdded方法確保是在eventLoop線程中執(zhí)行的??梢钥吹?code>callHandlerAddedForAllHandlers的方法比較簡(jiǎn)單:首先拿到PendingHandlerCallBack的鏈表頭,依次執(zhí)行鏈表中的每一個(gè)任務(wù)。PendingHandlerCallBack實(shí)現(xiàn)了Runnable接口,可以被線程調(diào)用。
private abstract static class PendingHandlerCallback implements Runnable
下面是PendingHandlerAddedTask實(shí)現(xiàn)的execute方法:
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
//當(dāng)前線程不是eventLoop線程,調(diào)用eventLoop線程執(zhí)行this任務(wù),
//最終還是執(zhí)行callHandlerAdded0(ctx)方法,看下面實(shí)現(xiàn)的run方法。
executor.execute(this);
} catch (RejectedExecutionException e) {
remove0(ctx);
ctx.setRemoved();
}
}
}
@Override
public void run() {
callHandlerAdded0(ctx);
}
eventLoop線程直接執(zhí)行callHandlerAdded0(ctx)方法。
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
//異常處理,將handler從pipeline中移除掉,并調(diào)用handler的handlerRemoved方法。
}
}
這里面,ctx.handler()返回的就是之前init過(guò)程中的ChannelInitializer實(shí)例。所以這里調(diào)用的是ChannelInitializer里面的handlerAdded方法。
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
initChannel主要做了這幾件事:
1、初始化channel,這里面的initChannel方法就是在init里面實(shí)現(xiàn)的。
2、出現(xiàn)異常的話進(jìn)行異常捕獲
3、最終,將ChannelInitializer的實(shí)例從pipeline中移除。
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
initChannel的邏輯就是:
1、向pipeline中添加用戶在ServerBootStrap中指定的handler。
2、調(diào)用eventLoop線程執(zhí)行pipeline的addLast邏輯,向pipeline中添加一個(gè)ServerBootstrapAcceptor實(shí)例,該實(shí)例的作用就是當(dāng)有連接到來(lái),創(chuàng)建了新channel之后,對(duì)該channel進(jìn)行初始化。
這里由于當(dāng)前線程就是eventLoop線程,所以execute執(zhí)行邏輯就是將該任務(wù)添加到隊(duì)列中,等該線程執(zhí)行完當(dāng)前任務(wù)后再?gòu)年?duì)列中取任務(wù)執(zhí)行:
public void execute(Runnable task) {
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
//非關(guān)鍵代碼不貼
}
//非關(guān)鍵代碼不貼
}
以上的邏輯就是channel注冊(cè)到eventLoop之后回調(diào)的handlerAdded方法。一句話,就是執(zhí)行ChannelInitializer的initChannel方法,對(duì)channel進(jìn)行初始化,添加額外的handler。
接下來(lái)將回調(diào)channelRegistered方法。
pipeline.fireChannelRegistered();
public final ChannelPipeline fireChannelRegistered() {
AbstractChannelHandlerContext.invokeChannelRegistered(head);
return this;
}
回調(diào)完成之后,整個(gè)register0的邏輯計(jì)算大體執(zhí)行完了。
整個(gè)register0方法就是在eventLoop中執(zhí)行的,執(zhí)行完之后,eventLoop線程繼續(xù)進(jìn)行無(wú)限for循環(huán):如果隊(duì)列中有任務(wù),就取任務(wù)執(zhí)行,否則進(jìn)行selector.select操作,具體邏輯可以到NioEventLoop的run方法去查看。整個(gè)eventLoop線程的大體執(zhí)行邏輯就是這樣。
===========================================================
然后我們回到主線程:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
整個(gè)register邏輯就算執(zhí)行完了,返回regFuture。
緊接著就是執(zhí)行doBind0操作
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
//非關(guān)鍵代碼不貼
}
}
可以看到綁定地址的操作也是在eventLoop線程中執(zhí)行的,這里execute就是提交任務(wù)到隊(duì)列,由eventLoop線程去隊(duì)列取任務(wù)執(zhí)行。
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
執(zhí)行完之后,返回了一個(gè)ChannelPromise實(shí)例,然后調(diào)用其同步方法sync:
ChannelFuture future = bootstrap.bind(port).sync();
此時(shí)主線程已經(jīng)被阻塞住,查看線程狀態(tài):

主線程阻塞,eventLoop線程一直loop。至此,整個(gè)服務(wù)端就算已經(jīng)啟動(dòng)完畢了。