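If you are not yet familiar with socket programming in native NIO, read the prerequisite post first.
A Simple Demo Program
To start, here is the echo server code from Netty's bundled examples.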
/*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.example.echo;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
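The code is concise but hard to follow, because the classes it uses look very different from native NIO programming. Let's go through it briefly.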
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup);
This first creates a ServerBootstrap, the startup class, and assigns the boss and worker thread groups to it.
b.channel(NioServerSocketChannel.class);
This sets the channel type. Internally, a ServerBootstrapChannelFactory is created to hold the class so that channel objects can be created from it later.
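The idea of keeping a Class around and instantiating it later can be sketched with plain reflection. This is only an illustration, not Netty's factory: the class name ReflectiveFactory is hypothetical, and it assumes a public no-argument constructor, whereas the factory in the Netty version analyzed here passes the event loop and child group to newChannel.

```java
/** Hypothetical sketch: remember a class now, create instances of it later. */
final class ReflectiveFactory<T> {
    private final Class<? extends T> type;

    ReflectiveFactory(Class<? extends T> type) {
        if (type == null) {
            throw new NullPointerException("type");
        }
        this.type = type;
    }

    /** Creates a fresh instance through the no-argument constructor (an assumption of this sketch). */
    T newInstance() {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Unable to create an instance of " + type, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveFactory<StringBuilder> factory = new ReflectiveFactory<>(StringBuilder.class);
        StringBuilder first = factory.newInstance();
        StringBuilder second = factory.newInstance();
        System.out.println(first != second); // each call yields a new object
    }
}
```

Storing the class instead of an instance lets the bootstrap defer object creation until bind time.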
b.option(ChannelOption.SO_BACKLOG, 100);
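This sets a socket option. Note that option() applies to the listening server channel (SO_BACKLOG is the length of its pending-connection queue); options for accepted client connections are set with childOption().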
b.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
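This sets the initialization logic that runs on each SocketChannel once a client connection has been established.
All of the code above just assigns parameters on the ServerBootstrap object. What actually gets Netty running is the following line.
ChannelFuture f = b.bind(PORT).sync();
Before reading this code, let's leave it as a cliffhanger and first get to know another class: NioEventLoop. Once NioEventLoop is understood, Netty's threading model becomes clear and the rest of the analysis takes much less effort.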
NioEventLoop
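NioEventLoop is the most important object for interacting with JDK-level NIO. It is created inside the NioEventLoopGroup object.
NioEventLoopGroup holds an array named children, which we can think of as a ring joined head to tail. Each call to NioEventLoopGroup.next() returns the next element of the ring, and that element is a NioEventLoop.
What determines the size of children? The number of threads passed in when the NioEventLoopGroup is constructed.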
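The ring behaviour of next() can be sketched as a round-robin pick over a fixed array. This is a minimal sketch, not Netty's implementation: the class name RoundRobinGroup and the string elements are made up for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a group that hands out its children in a ring. */
final class RoundRobinGroup<T> {
    private final T[] children;
    private final AtomicInteger index = new AtomicInteger();

    @SafeVarargs
    RoundRobinGroup(T... children) {
        if (children.length == 0) {
            throw new IllegalArgumentException("children must not be empty");
        }
        this.children = children.clone();
    }

    /** Returns the next child, wrapping around to the first one after the last. */
    T next() {
        // floorMod keeps the index non-negative even after the counter overflows.
        return children[Math.floorMod(index.getAndIncrement(), children.length)];
    }

    public static void main(String[] args) {
        RoundRobinGroup<String> group = new RoundRobinGroup<>("loop-0", "loop-1", "loop-2");
        for (int i = 0; i < 5; i++) {
            System.out.println(group.next()); // loop-0, loop-1, loop-2, loop-0, loop-1
        }
    }
}
```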
Next, let's look at the implementation of NioEventLoop, starting with the constructor.
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider) {
super(parent, executor, false);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
provider = selectorProvider;
selector = openSelector();
}
Here is the implementation of openSelector().
private Selector openSelector() {
final Selector selector;
try {
selector = provider.openSelector();
} catch (IOException e) {
throw new ChannelException("failed to open a new selector", e);
}
if (DISABLE_KEYSET_OPTIMIZATION) {
return selector;
}
try {
SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();
Class<?> selectorImplClass =
Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());
// Ensure the current selector implementation is what we can instrument.
if (!selectorImplClass.isAssignableFrom(selector.getClass())) {
return selector;
}
Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");
Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");
selectedKeysField.setAccessible(true);
publicSelectedKeysField.setAccessible(true);
selectedKeysField.set(selector, selectedKeySet);
publicSelectedKeysField.set(selector, selectedKeySet);
selectedKeys = selectedKeySet;
logger.trace("Instrumented an optimized java.util.Set into: {}", selector);
} catch (Throwable t) {
selectedKeys = null;
logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);
}
return selector;
}
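The constructor, through openSelector(), calls provider.openSelector() to create a multiplexing selector object.
In the JDK's native NIO implementation, the selector holds a HashSet named selectedKeys that stores the result set of a select call. Unless the optimization is disabled, Netty also uses reflection to replace the selector's internal selectedKeys with its own object. One benefit is that after every call to the Selector's select function, the event loop can inspect selectedKeys directly to determine whether new events have occurred.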
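The run loop shown later calls selectedKeys.flip() and receives a SelectionKey[], which suggests the replacement set is array-backed. The following is a simplified sketch of that idea, assuming an append-only set with no duplicate check; the name ArrayBackedSet and the reset() method are hypothetical, and Netty's real SelectedSelectionKeySet differs in detail.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical append-only set: add() is a plain array store, with no hashing. */
final class ArrayBackedSet<E> extends AbstractSet<E> {
    private Object[] elements = new Object[4];
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == elements.length) {
            elements = Arrays.copyOf(elements, size * 2); // grow by doubling
        }
        elements[size++] = e;
        return true;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int i;

            @Override
            public boolean hasNext() {
                return i < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (i >= size) {
                    throw new NoSuchElementException();
                }
                return (E) elements[i++];
            }
        };
    }

    /** Clears the set so it can be reused for the next select round. */
    void reset() {
        Arrays.fill(elements, 0, size, null);
        size = 0;
    }

    public static void main(String[] args) {
        ArrayBackedSet<String> keys = new ArrayBackedSet<>();
        keys.add("key-1");
        keys.add("key-2");
        System.out.println(keys.size()); // 2
        keys.reset();
        System.out.println(keys.size()); // 0
    }
}
```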
External callers can submit tasks through NioEventLoop's execute method. Here is its implementation.
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp) {
wakeup(inEventLoop);
}
}
private void startThread() {
synchronized (stateLock) {
if (state == ST_NOT_STARTED) {
state = ST_STARTED;
delayedTaskQueue.add(new ScheduledFutureTask<Void>(
this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
doStartThread();
}
}
}
NioEventLoop has an internal state variable, state, which guarantees that doStartThread is invoked only once no matter how many times startThread is called. On that first call, doStartThread creates a new thread. Here is doStartThread:
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
if (state < ST_SHUTTING_DOWN) {
state = ST_SHUTTING_DOWN;
}
// Check if confirmShutdown() was called at the end of the loop.
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
"before run() implementation terminates.");
}
try {
// Run all remaining tasks and shutdown hooks.
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
cleanup();
} finally {
synchronized (stateLock) {
state = ST_TERMINATED;
}
threadLock.release();
if (!taskQueue.isEmpty()) {
logger.warn(
"An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
terminationFuture.setSuccess(null);
}
}
}
}
});
}
The key player is executor. Here is its default implementation.
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
}
As you can see, every call to the executor's execute method creates a new thread. Because doStartThread is only ever called once, only one thread is created per NioEventLoop.
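The combination of a thread-per-task executor and a start-once state flag can be sketched as follows. This is an illustration under simplifying assumptions, not Netty's code: the class LazySingleThreadExecutor is hypothetical, an AtomicBoolean replaces the state field and lock, and the loop only runs queued tasks.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical executor that lazily starts exactly one thread on first use. */
final class LazySingleThreadExecutor implements Executor {
    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
    private final AtomicBoolean started = new AtomicBoolean();
    // Plays the role of ThreadPerTaskExecutor: one new thread per execute() call.
    private final Executor threadPerTask = command -> {
        Thread t = new Thread(command, "lazy-loop");
        t.setDaemon(true);
        t.start();
    };

    @Override
    public void execute(Runnable task) {
        tasks.add(task);
        // The flag flips only once, so the thread-creating executor is invoked only once.
        if (started.compareAndSet(false, true)) {
            threadPerTask.execute(this::runLoop);
        }
    }

    private void runLoop() {
        try {
            for (;;) {
                tasks.take().run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs n tasks and returns how many distinct threads executed them. */
    static int distinctThreads(int n) {
        LazySingleThreadExecutor executor = new LazySingleThreadExecutor();
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(n);
        for (int i = 0; i < n; i++) {
            executor.execute(() -> {
                threads.add(Thread.currentThread());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return threads.size();
    }

    public static void main(String[] args) {
        System.out.println(distinctThreads(10)); // 1
    }
}
```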
The new thread ends up calling "SingleThreadEventExecutor.this.run();". We are now very close to the truth. Here is the implementation of run.
protected void run() {
for (;;) {
oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
selectNow();
} else {
select();
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
}
cancelledKeys = 0;
final long ioStartTime = System.nanoTime();
needsToSelectAgain = false;
if (selectedKeys != null) {
processSelectedKeysOptimized(selectedKeys.flip());
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
final long ioTime = System.nanoTime() - ioStartTime;
final int ioRatio = this.ioRatio;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
break;
}
}
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t);
// Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Ignore.
}
}
}
}
The run method is an infinite loop. It repeatedly calls the Selector's select function to fetch and handle events, and also executes the tasks that have been thrown into its queue.
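The loop also balances the two kinds of work: the argument passed to runAllTasks, ioTime * (100 - ioRatio) / ioRatio, is the time budget for queued tasks. A small worked example of that arithmetic follows; the class and method names are hypothetical, and only the expression itself comes from the code above.

```java
/** Worked example of the task time budget computed in run(). */
final class IoRatioBudget {
    /** Mirrors the expression ioTime * (100 - ioRatio) / ioRatio. */
    static long taskBudgetNanos(long ioTimeNanos, int ioRatio) {
        if (ioRatio <= 0 || ioRatio > 100) {
            throw new IllegalArgumentException("ioRatio: " + ioRatio);
        }
        return ioTimeNanos * (100 - ioRatio) / ioRatio;
    }

    public static void main(String[] args) {
        long ioTime = 1_000_000L; // pretend I/O processing took 1 ms
        System.out.println(taskBudgetNanos(ioTime, 50)); // 1000000: tasks get as long as I/O did
        System.out.println(taskBudgetNanos(ioTime, 80)); // 250000: tasks get a quarter of the I/O time
        System.out.println(taskBudgetNanos(ioTime, 20)); // 4000000: tasks get four times the I/O time
    }
}
```

A higher ioRatio therefore leaves less time for queued tasks relative to the time just spent on I/O.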
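1. select()/selectNow(): inside these functions the native Selector's select method is called, so the first step has surfaced. Following the NIO call flow (the detailed code is in the prerequisite post), the next step should be the ServerSocketChannel calling accept to accept the client connection. Let's find it.
2. If events occurred during the select call, selectedKeys changes (note that the selectedKeys variable is actually a reference to the underlying Selector's set of triggered events), and processing enters processSelectedKeysOptimized: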
private void processSelectedKeysOptimized(SelectionKey[] selectedKeys) {
for (int i = 0;; i ++) {
final SelectionKey k = selectedKeys[i];
if (k == null) {
break;
}
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
if (needsToSelectAgain) {
selectAgain();
// Need to flip the optimized selectedKeys to get the right reference to the array
// and reset the index to -1 which will then set to 0 on the for loop
// to start over again.
//
// See https://github.com/netty/netty/issues/1523
selectedKeys = this.selectedKeys.flip();
i = -1;
}
}
}
Looking further into processSelectedKey:
private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
return;
}
try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
} catch (CancelledKeyException e) {
unsafe.close(unsafe.voidPromise());
}
}
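The bit tests in processSelectedKey work because the SelectionKey operation constants are distinct bit flags. The helper below decodes a readyOps value using only the JDK constants; the class name ReadyOps and the describe method are made up for this illustration.

```java
import java.nio.channels.SelectionKey;
import java.util.ArrayList;
import java.util.List;

/** Decodes a readyOps bit mask into the names of the operations it contains. */
final class ReadyOps {
    static List<String> describe(int readyOps) {
        List<String> names = new ArrayList<>();
        if ((readyOps & SelectionKey.OP_READ) != 0) {
            names.add("OP_READ");
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            names.add("OP_WRITE");
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            names.add("OP_CONNECT");
        }
        if ((readyOps & SelectionKey.OP_ACCEPT) != 0) {
            names.add("OP_ACCEPT");
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(describe(16));    // [OP_ACCEPT]
        System.out.println(describe(1 | 4)); // [OP_READ, OP_WRITE]
    }
}
```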
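When a client initiates a connection, readyOps should be 16, which corresponds to SelectionKey.OP_ACCEPT. (If OP_READ is triggered instead, the client's data is read; that path is analyzed in detail in the next post.) Next, look at the doReadMessages method called from unsafe.read().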
public void read() {
assert eventLoop().inEventLoop();
if (!config().isAutoRead()) {
removeReadOp();
}
final ChannelConfig config = config();
final int maxMessagesPerRead = config.getMaxMessagesPerRead();
final boolean autoRead = config.isAutoRead();
final ChannelPipeline pipeline = pipeline();
boolean closed = false;
Throwable exception = null;
try {
for (;;) {
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
if (readBuf.size() >= maxMessagesPerRead | !autoRead) {
break;
}
}
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
pipeline.fireChannelReadComplete();
if (exception != null) {
if (exception instanceof IOException) {
// ServerChannel should not be closed even on IOException because it can often continue
// accepting incoming connections. (e.g. too many open files)
closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
}
pipeline.fireExceptionCaught(exception);
}
if (closed) {
if (isOpen()) {
close(voidPromise());
}
}
}
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, childEventLoopGroup().next(), ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
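So we have found accept, hidden quite deep. After calling accept, we get the socket that serves this particular client connection, bind it to a worker thread, and put it into the list buf. Then we return to the read method one level up, which passes every element of readBuf to pipeline.fireChannelRead.
Step by step, the call chain arrives at the following channelRead.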

public void channelRead(ChannelHandlerContext ctx, Object msg) {
Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
child.unsafe().register(child.newPromise());
}
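First, the line "child.pipeline().addLast(childHandler);" should look familiar: childHandler is the handler object we passed to ServerBootstrap's childHandler method at the beginning. Next, the socket options and attributes are set on the child channel.
Here is the implementation of register.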
public final void register(final ChannelPromise promise) {
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
promise.setFailure(t);
}
}
}
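A register task is submitted to the eventLoop. On the eventLoop (NioEventLoop) thread, which at this point is a thread from workerGroup, the SocketChannel is registered with that eventLoop's selector. Note that this differs from our native NIO usage: every thread runs its own Selector object to poll for events.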
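The handoff pattern (another thread queues a registration task, and the loop thread registers the channel with its own selector) can be sketched with JDK classes alone. This is a rough sketch under stated assumptions: SelectorWorker is a hypothetical class, a java.nio Pipe stands in for the accepted SocketChannel so that no network is needed, and error handling is minimal.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.charset.StandardCharsets;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical worker: one thread, one Selector, plus a queue for tasks from other threads. */
final class SelectorWorker implements Runnable {
    private final Selector selector;
    private final Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
    final BlockingQueue<String> received = new LinkedBlockingQueue<>();

    SelectorWorker() throws IOException {
        selector = Selector.open();
    }

    /** Called from any thread: queue the registration, then wake the selector. */
    void register(SelectableChannel channel) {
        tasks.add(() -> {
            try {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
        selector.wakeup();
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                selector.select();
                Runnable task;
                while ((task = tasks.poll()) != null) {
                    task.run(); // registrations always happen on this thread
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    ByteBuffer buf = ByteBuffer.allocate(64);
                    int n = ((ReadableByteChannel) key.channel()).read(buf);
                    if (n > 0) {
                        received.add(new String(buf.array(), 0, n, StandardCharsets.UTF_8));
                    } else if (n < 0) {
                        key.cancel();
                    }
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Hands a pipe to a worker from the calling thread and returns what the worker read. */
    static String demo() {
        try {
            SelectorWorker worker = new SelectorWorker();
            Thread t = new Thread(worker, "worker-0");
            t.setDaemon(true);
            t.start();
            Pipe pipe = Pipe.open();
            worker.register(pipe.source()); // the caller plays the boss role here
            pipe.sink().write(ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
            return worker.received.poll(5, TimeUnit.SECONDS);
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Queuing the registration instead of calling register directly keeps all selector manipulation on the thread that owns the selector.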
Now let's return to the demo program at the beginning and see what bind does.
public ChannelFuture bind(SocketAddress localAddress) {
validate(); // check that the parameters are valid
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
Now doBind:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise;
if (regFuture.isDone()) {
promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
promise = new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doBind0(regFuture, channel, localAddress, promise);
}
});
}
return promise;
}
This initializes a Channel and binds it to the boss thread. Let's look further at initAndRegister.
final ChannelFuture initAndRegister() {
Channel channel;
try {
channel = createChannel();
} catch (Throwable t) {
return VoidChannel.INSTANCE.newFailedFuture(t);
}
try {
init(channel);
} catch (Throwable t) {
channel.unsafe().closeForcibly();
return channel.newFailedFuture(t);
}
ChannelPromise regFuture = channel.newPromise();
channel.unsafe().register(regFuture);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
// If we are here and the promise is not failed, it's one of the following cases:
// 1) If we attempted registration from the event loop, the registration has been completed at this point.
// i.e. It's safe to attempt bind() or connect() now beause the channel has been registered.
// 2) If we attempted registration from the other thread, the registration request has been successfully
// added to the event loop's task queue for later execution.
// i.e. It's safe to attempt bind() or connect() now:
// because bind() or connect() will be executed *after* the scheduled registration task is executed
// because register(), bind(), and connect() are all bound to the same thread.
return regFuture;
}
This breaks down into three steps: createChannel, init, and register.
Channel createChannel() {
EventLoop eventLoop = group().next();
return channelFactory().newChannel(eventLoop, childGroup);
}
void init(Channel channel) throws Exception {
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
if (handler() != null) {
p.addLast(handler());
}
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption<?>, Object>[] currentChildOptions;
final Entry<AttributeKey<?>, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildHandler, currentChildOptions,
currentChildAttrs));
}
});
}
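As the implementation of createChannel shows, this is where the Channel type set via ServerBootstrap.channel comes into play. A NioEventLoop from bossGroup is bound to the newly created channel. Why is workerGroup bound as well? Because the client connections this ServerChannel accepts have to be handed to a specific worker for processing.
The init function applies the options and attributes, and adds the corresponding ChannelHandlers to the ServerChannel's pipeline.
Next, let's focus on the implementation of register.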
public final void register(final ChannelPromise promise) {
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
promise.setFailure(t);
}
}
}
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!ensureOpen(promise)) {
return;
}
doRegister();
registered = true;
promise.setSuccess();
pipeline.fireChannelRegistered();
if (isActive()) {
pipeline.fireChannelActive();
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
if (!promise.tryFailure(t)) {
logger.warn(
"Tried to fail the registration promise, but it is complete already. " +
"Swallowing the cause of the registration failure:", t);
}
}
}
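At last we see the call to eventLoop.execute. Because the caller is not the EventLoop's own internal thread, the code takes the execute branch. Combined with our earlier analysis of NioEventLoop, the first call creates a new thread to run the run method of the submitted Runnable, which finally executes the ServerChannel's registration logic. Note that the promise passed in is a future object: once registration succeeds, other threads can check through the promise whether it has completed.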
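The role of the promise can be sketched with the JDK's CompletableFuture: the caller submits the work to the loop thread and later observes completion through the future. This is an analogy, not Netty's ChannelPromise; the class name, the "boss-0" thread name, and the string result are all hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical sketch of registering on a loop thread and observing it from the caller. */
final class RegistrationPromiseDemo {
    /** Submits a pretend registration to the loop and returns a future for its outcome. */
    static CompletableFuture<String> register(ExecutorService loop, String channelName) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        // The lambda body is a stand-in for register0(promise).
        loop.execute(() ->
                promise.complete(channelName + " registered on " + Thread.currentThread().getName()));
        return promise;
    }

    /** Runs the demo on a single-threaded loop named boss-0 and waits for the result. */
    static String demo() {
        ExecutorService loop = Executors.newSingleThreadExecutor(r -> new Thread(r, "boss-0"));
        try {
            return register(loop, "server-channel").join(); // the calling thread waits on the promise
        } finally {
            loop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // server-channel registered on boss-0
    }
}
```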
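At this point, let's summarize.
ServerBootstrap is configured with two thread groups, bossGroup and workerGroup. Each thread in them owns a selector and runs select in a loop to look for the events it is listening for. In the normal case there is only one listening port, so only one thread in bossGroup is doing work.
The boss thread's selector has just one ServerSocketChannel registered. When it accepts a client connection, it calls the thread group's next() function to obtain a NioEventLoop and hands the SocketChannel to that worker to run its logic.
In addition, NioEventLoop has an execute method that lets other threads throw Runnable tasks to its internal thread. The main scenarios are the boss thread registering a channel with the worker thread group when it detects a new connection, and the user thread registering the serverChannel with the boss thread when it calls ServerBootstrap's bind.
Areas Still to Explore
Many questions remain: how the splitting and framing of data is implemented, how data flows through a ChannelHandler, and how multiple ChannelHandlers work together when several are added. Those will be analyzed next time.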