Bootstrap適用于創(chuàng)建客戶(hù)端連接的一個(gè)引導(dǎo)類(lèi),我們可以通過(guò)它很方便的創(chuàng)建出Netty客戶(hù)端的連接,接下來(lái)我以官方源碼里面的example echo項(xiàng)目為例來(lái)具體分析其實(shí)現(xiàn):
例子來(lái)自官方的Example示例下面的echo項(xiàng)目,example\src\main\java\io\netty\example\echo
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
上面的代碼向我們展示了Netty客戶(hù)端初始化所需的內(nèi)容:
1、EventLoopGroup:事件線程循環(huán)調(diào)度組,這里使用的是NioEventLoopGroup。
2、Bootstrap:客戶(hù)端引導(dǎo)類(lèi)
3、NioSocketChannel:Nio客戶(hù)端通道
4、ChannelInitializer:客戶(hù)端Handler初始器
5、ChannelPipeline:Channel管道
在研究Bootstrap實(shí)現(xiàn)之前,我們先來(lái)看下Bootstrap的類(lèi)圖結(jié)構(gòu):

可以看到,Bootstrap的類(lèi)圖結(jié)構(gòu)還是比較簡(jiǎn)單的,上層有個(gè)抽象類(lèi)AbstractBootstrap,以及兩個(gè)頂層接口分別是Cloneable、Channel。
繼續(xù)研究,我們看到上面的代碼中是直接new 出來(lái)了一個(gè)Bootstrap,這個(gè)是個(gè)空的構(gòu)造函數(shù),所以這一步驟沒(méi)啥可說(shuō)的,接下來(lái)把EventLoopGroup設(shè)置進(jìn)了b.group(group),我們進(jìn)入這里看下其實(shí)現(xiàn):
public B group(EventLoopGroup group) {
// 忽略參數(shù)校驗(yàn)
this.group = group;
return self();
}
其實(shí)這一步驟是由AbstractBootstrap來(lái)完成的,group的定義如下:
volatile EventLoopGroup group;
繼續(xù)往下走,來(lái)到了 channel(NioSocketChannel.class),這里設(shè)置了一個(gè)Chanenl類(lèi)型,此處使用的是NioSocketChannel,這里的具體實(shí)現(xiàn)也是由AbstractBootstrap來(lái)完成的,代碼如下:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
我們來(lái)看下ReflectiveChannelFactory里面的實(shí)現(xiàn)是怎么樣的:
private final Constructor<? extends T> constructor;
public ReflectiveChannelFactory(Class<? extends T> clazz) {
this.constructor = clazz.getConstructor();
}
@Override
public T newChannel() {
return constructor.newInstance();
}
上面的代碼去除了參數(shù)校驗(yàn)以及異常判斷,這樣看代碼更加清晰明了一點(diǎn),可以看到的是這不就是我們經(jīng)常使用的Java反射實(shí)例對(duì)象的方式嗎,首先先將傳遞進(jìn)來(lái)的NioSocketChanenl.class,通過(guò)獲取其默認(rèn)的構(gòu)造函數(shù)對(duì)象,并將其默認(rèn)構(gòu)造函數(shù)保存起來(lái),后期實(shí)現(xiàn)的使用,直接通過(guò)newChannel()獲取一個(gè)新的實(shí)例即可。ReflectiveChannelFactory實(shí)例對(duì)象后面通過(guò)channelFactory()方法設(shè)置給了AbstractBootstrap的channelFactory字段。
接下來(lái),代碼使用了option(ChannelOption.TCP_NODELAY, true),這里也是由AbstractBootstrap父類(lèi)的進(jìn)行處理的:
public <T> B option(ChannelOption<T> option, T value) {
synchronized (options) {
if (value == null) {
options.remove(option);
} else {
options.put(option, value);
}
}
return self();
}
這里可以看到如果ChannelOption里面的常量值如果設(shè)置的是null則會(huì)進(jìn)行移除該選項(xiàng),否則就將其設(shè)置到options中,下面我們來(lái)看下options在AbstractBootstrap中是如何定義的:
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
這里就是一個(gè)簡(jiǎn)單的LinkedHashMap用于保存ChannelOption的鍵值集合,比較簡(jiǎn)單。
下面設(shè)置了handler(),也是由AbstractBootstrap來(lái)設(shè)置,代碼實(shí)現(xiàn)如下:
public B handler(ChannelHandler handler) {
this.handler = ObjectUtil.checkNotNull(handler, "handler");
return self();
}
這里不過(guò)多展開(kāi),后面會(huì)有單獨(dú)的章節(jié)討論。
繼續(xù)往下走,b.connect(HOST, PORT).sync();看下其實(shí)現(xiàn)先:
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
代碼雖長(zhǎng),但是最重要的還是initAndRegister()方法,繼續(xù)跟進(jìn)看看:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
channel = channelFactory.newChannel();
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
}
ChannelFuture regFuture = config().group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
走到這里,我們可以看到我們之前通過(guò)傳遞NioSocketChannel.class構(gòu)建的ReflectiveChannelFactory工廠排上用場(chǎng)了,這里也是通過(guò)newChannel()方法直接通過(guò)構(gòu)造函數(shù)直接new出來(lái)了一個(gè)NioSocketChannel對(duì)象出來(lái),因?yàn)檫@里使用的是默認(rèn)構(gòu)造函數(shù)構(gòu)造的對(duì)象,具體的代碼實(shí)現(xiàn)我們還是需要手動(dòng)定位到NioSocketChannel中一看究竟,如下:
private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();
private static SocketChannel newSocket(SelectorProvider provider) {
return provider.openSocketChannel();
}
public NioSocketChannel() {
this(DEFAULT_SELECTOR_PROVIDER);
}
public NioSocketChannel(SelectorProvider provider) {
this(newSocket(provider));
}
public NioSocketChannel(SocketChannel socket) {
this(null, socket);
}
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
可以看到這里使用了JDK的SelectorProvider提供器打開(kāi)了一個(gè)新的SocketChannel通道,然后調(diào)用了父類(lèi)AbstractNioByteChannel的構(gòu)造函數(shù):
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
AbstractNioByteChannel又調(diào)用了它的父類(lèi)AbstractNioChannel構(gòu)造函數(shù):
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
ch.configureBlocking(false);
}
這里我們調(diào)用了父類(lèi)的構(gòu)造函數(shù)將parent(這里是null)傳遞給了AbstractChannel,然后設(shè)置了感興趣的事件為SelectionKey.OP_READ,并且設(shè)置該通道為非阻塞,這里沒(méi)有太多的東西,看下AbstractChannel的構(gòu)造函數(shù)實(shí)現(xiàn):
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
可以看到這里的東西還是有點(diǎn)兒多的,首先將子類(lèi)傳遞過(guò)來(lái)的Channel parent設(shè)置到this.parent,這里是個(gè)null,接下來(lái)再實(shí)例化id,這里的id實(shí)現(xiàn)為DefaultChannelId,接下來(lái)實(shí)例化了一個(gè)Unsafe的實(shí)現(xiàn),這個(gè)Unsafe是Netty的自定義實(shí)現(xiàn),看下其接口聲明:
interface Unsafe {
//返回指定的RecvByteBufAllocator.Handle,它將用于在接收數(shù)據(jù)時(shí)分配ByteBuf。
RecvByteBufAllocator.Handle recvBufAllocHandle();
//返回本地綁定的SocketAddress
SocketAddress localAddress();
//返回遠(yuǎn)程綁定的SocketAddress
SocketAddress remoteAddress();
// 注冊(cè)ChannelPromise到Channel,并在注冊(cè)完成后通知ChannelFuture
void register(EventLoop eventLoop, ChannelPromise promise);
//將SocketAddress綁定到ChannelPromise的Channel上,當(dāng)完成后將通知ChannelPromise
void bind(SocketAddress localAddress, ChannelPromise promise);
// 將給定ChannelFuture的Channel與給定的遠(yuǎn)程SocketAddress連接,如果要使用本地localAddress只需要給定參數(shù),其它情況只需要傳遞null即可,連接操作完成后,ChannelPromise將收到通知
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
//斷開(kāi)ChannelFuture的Channel連接并且當(dāng)操作完成后通知ChannelPromise
void disconnect(ChannelPromise promise);
//關(guān)閉ChannelFuture的Channel連接并且當(dāng)操作完成后通知ChannelPromise
void close(ChannelPromise promise);
//立即關(guān)閉Channel而不觸發(fā)任何事件,可能只有在注冊(cè)嘗試失敗時(shí)才有用。
void closeForcibly();
//從EventLoop注銷(xiāo)ChannelPromise的Channel,并在操作完成后通知ChannelPromise
void deregister(ChannelPromise promise);
//調(diào)度一個(gè)讀取操作,該操作將填充ChannelPipeline中第一個(gè)ChannelInboundHandler的入站緩沖區(qū)。如果已經(jīng)存在掛起的讀取操作,則此方法不執(zhí)行任何操作。
void beginRead();
// 調(diào)度一個(gè)寫(xiě)操作
void write(Object msg, ChannelPromise promise);
// 調(diào)度write(Object, ChannelPromise)方法,刷新所有的寫(xiě)操作
void flush();
//返回一個(gè)特殊的ChannelPromise,它可以被重用并傳遞給{@link Unsafe}中的操作。它永遠(yuǎn)不會(huì)收到成功或錯(cuò)誤的通知,因此只是操作的占位符以ChannelPromise作為參數(shù),但不希望得到通知。
ChannelPromise voidPromise();
//返回存儲(chǔ)掛起寫(xiě)入請(qǐng)求的Channel的ChannelOutboundBuffer
ChannelOutboundBuffer outboundBuffer();
}
這么看來(lái)所有的網(wǎng)絡(luò)底層操作都封裝到了Netty實(shí)現(xiàn)的Unsafe接口中,這里我們看些newUnsafe代碼的實(shí)現(xiàn),在該父類(lèi)中定義的是一個(gè)抽象方法,具體的實(shí)現(xiàn)在NioSocketChannel子類(lèi)中:
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
可以看到Unsafe的具體實(shí)現(xiàn)是由NioSocketChannelUnsafe()類(lèi)來(lái)實(shí)現(xiàn)的,其類(lèi)層級(jí)圖如下所示:

這里就不展開(kāi)講了。
繼續(xù)往下走,接下來(lái)就到了pipeline = newChannelPipeline()了,這也對(duì)應(yīng)了每個(gè)Channel都有一個(gè)對(duì)應(yīng)的ChannelPipeline,這也是Netty實(shí)現(xiàn)無(wú)鎖化的關(guān)鍵,后面的事件都是在該P(yáng)ipeline中流轉(zhuǎn)的,看下其實(shí)現(xiàn)代碼吧:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
這里的實(shí)現(xiàn)比較簡(jiǎn)單,直接實(shí)例化了一個(gè)DefaultChannelPipeline,并且將當(dāng)前對(duì)象以構(gòu)造參數(shù)的形式傳遞過(guò)去了,繼續(xù)深入進(jìn)去看看:
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
這里可以看到這里有兩個(gè)節(jié)點(diǎn)分別是tail、head,并且它們進(jìn)行了關(guān)聯(lián),組成了一個(gè)雙向鏈表的形式,接下來(lái)我們看下TailContext的類(lèi)結(jié)構(gòu)圖:

接下來(lái)看下構(gòu)造函數(shù)代碼實(shí)現(xiàn):
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
可以看到調(diào)用了父類(lèi)AbstractChannelHandlerContext父類(lèi)的構(gòu)造函數(shù):
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
接下來(lái),我們繼續(xù)跟進(jìn)HeadContext的實(shí)現(xiàn),首先看下HeadContext的類(lèi)結(jié)構(gòu)圖:

代碼實(shí)現(xiàn)和TailContext一致:
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
和TailContext一樣,HeadContext的父類(lèi)也是AbstractChannelHandlerContext抽象類(lèi),到此為止,我們就完成了整個(gè)Channel的實(shí)例化工作了。我們繼續(xù)回到AbstractBootstrap.initAndRegister()方法,下一步操作并是執(zhí)行init(channel)方法:
void init(Channel channel) {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
setChannelOptions(channel, newOptionsArray(), logger);
setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}
這里的ChannelPipeline并是我們前面實(shí)例化的DefaultChannelPipeline,然后將我們EchoClient中設(shè)置的Handler添加到ChannelPipeline中,并設(shè)置ChannelOption以及屬性等操作。
繼續(xù)回到AbstractBootstrap.initAndRegister()方法,我們可以看到下面的代碼塊:
ChannelFuture regFuture = config().group().register(channel);
這里通過(guò)config()方法獲取到BootstrapConfig實(shí)例,然后根據(jù)該實(shí)例的group()方法,里面的實(shí)現(xiàn)是直接bootstrap的group屬性,所以這里的group就是我們程序里面設(shè)置的NioEventLoopGroup對(duì)象,我們定位到NioEventLoopGroup對(duì)象的register(channel)方法,該方法由父類(lèi)MultithreadEventLoopGroup實(shí)現(xiàn):
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
可以看到通過(guò)next()方法從線程組里面選取一個(gè)NioEventLoop線程來(lái)執(zhí)行操作的,我們來(lái)跟進(jìn)next()方法的實(shí)現(xiàn),該方法由父類(lèi)MultithreadEventExecutorGroup來(lái)實(shí)現(xiàn):
public EventExecutor next() {
return chooser.next();
}
可以看到的是,這里是由我們之前設(shè)置的EventExecutorChooserFactory.EventExecutorChooser來(lái)實(shí)現(xiàn)的,如果傳遞線程的數(shù)量是2的冪則使用的是PowerOfTwoEventExecutorChooser,否則使用的是GenericEventExecutorChooser,這里不過(guò)是使用獲取線程的方式不同,下面來(lái)分別看下兩者的實(shí)現(xiàn):
PowerOfTwoEventExecutorChooser的實(shí)現(xiàn):
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
可以看到PowerOfTwoEventExecutorChooser的實(shí)現(xiàn)是通過(guò)一個(gè)原子整型的自增器不斷增加然后與線程池里面的數(shù)量-1與之進(jìn)行邏輯與獲取該線程池里面的一個(gè)線程,其效率是非常高效的。
接下來(lái)看下GenericEventExecutorChooser的實(shí)現(xiàn):
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
private final AtomicInteger idx = new AtomicInteger();
private final EventExecutor[] executors;
GenericEventExecutorChooser(EventExecutor[] executors) {
this.executors = executors;
}
@Override
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
與PowerOfTwoEventExecutorChooser不同的是,GenericEventExecutorChooser的實(shí)現(xiàn)方式則是通過(guò)原子整型的自增器不斷增加然后與線程池里面的數(shù)量進(jìn)行模除操作來(lái)定義一個(gè)線程,其效率是沒(méi)有按位與高效。
回到之前的代碼,next()方法與設(shè)置的線程數(shù)是有很大關(guān)系的,推薦做法是使用2的冪來(lái)設(shè)置線程的數(shù)量,這樣使用按位與能獲得更好的性能。next()方法返回的是NioEventLoop線程對(duì)象,我們定位到該方法是由其父類(lèi)SingleThreadEventLoop來(lái)實(shí)現(xiàn)的:
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
可以看到具體的實(shí)現(xiàn)是由channel的Unsafe類(lèi)的實(shí)現(xiàn)類(lèi)NioSocketChannelUnsafe來(lái)實(shí)現(xiàn)的,不過(guò)其register()方法是由其父類(lèi)AbstractUnsafe來(lái)實(shí)現(xiàn)的:
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
繼續(xù)跟進(jìn)register0:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
if (isActive()) {
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
這里我們跟進(jìn)doRegister()方法,這里的實(shí)現(xiàn)是由AbstractNioChannel來(lái)完成的,源碼如下:
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
// Force the Selector to select now as the "canceled" SelectionKey may still be
// cached and not removed because no Select.select(..) operation was called yet.
eventLoop().selectNow();
selected = true;
} else {
// We forced a select operation on the selector before but the SelectionKey is still cached
// for whatever reason. JDK bug ?
throw e;
}
}
}
}
該方法用于將SocketChannel注冊(cè)到到Selector中,并且監(jiān)聽(tīng)0事件(表示不監(jiān)聽(tīng)任何事件),attachment為當(dāng)前channel。
在一系列操作完成之后,最終調(diào)用的是Bootstrap#doConnect:
private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
這里會(huì)通過(guò)調(diào)用對(duì)應(yīng)的NioEventLoop線程來(lái)發(fā)起NioSocketChannel#connect方法的調(diào)用,而這個(gè)方法對(duì)應(yīng)調(diào)用的是DefaultChannelPipeline#connect方法,如下:
public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return tail.connect(remoteAddress, promise);
}
在此之前我們分析過(guò)DefaultChannelPipeline對(duì)象中有一個(gè)雙向關(guān)聯(lián)的鏈表,這里我們可以看到調(diào)用的是尾部鏈表,繼續(xù)跟進(jìn)去,其對(duì)應(yīng)位置在AbstractChannelHandlerContext#connect:
public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
return connect(remoteAddress, null, promise);
}
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
這里我們看到主要是通過(guò)findContextOutbound方法傳遞一個(gè)MASK_CONNECT參數(shù)獲取一個(gè)對(duì)應(yīng)的AbstractChannelHandlerContext對(duì)象來(lái)執(zhí)行下面的操作邏輯,所以我們跟進(jìn)該方法看下其實(shí)現(xiàn):
private AbstractChannelHandlerContext findContextOutbound(int mask) {
AbstractChannelHandlerContext ctx = this;
EventExecutor currentExecutor = executor();
do {
ctx = ctx.prev;
} while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
return ctx;
}
因?yàn)樵贒efaultChannelPipeline中使用的是雙向鏈表保存著各個(gè)ChannelHandler,所以這里采用的是do-while循環(huán)的方式查找對(duì)應(yīng)的AbstractChannelContext,而do-while結(jié)束的條件是通過(guò)skipContext方法計(jì)算得出,來(lái)看看里面的實(shí)現(xiàn):
private static boolean skipContext(AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
return (ctx.executionMask & (onlyMask | mask)) == 0 ||
(ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}
這里采用的是位運(yùn)算操作,可以大概的理解為利用位運(yùn)算符計(jì)算得出合適的HandlerContext即可,感興趣的小伙伴可以自行參考ChannelHandlerMask#mask方法,也就是我們沒(méi)實(shí)例化一個(gè)Handler對(duì)象,在該對(duì)象的屬性中都會(huì)存放一個(gè)executionMask,而在我們的DefaultChannelPipeline中的TailContext和HeadContext中都有著對(duì)應(yīng)關(guān)于mask的特殊處理,也就是處理所有的事件,下面是TailContext的mask源碼:
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
}
這里調(diào)用了父類(lèi)的構(gòu)造方法,也就是AbstractChannelHandlerContext的構(gòu)造方法:
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.executionMask = mask(handlerClass);
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
ChannelHandlerMask對(duì)應(yīng)的常量字段:
// Using to mask which methods must be called for a ChannelHandler.
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;
static final int MASK_ONLY_INBOUND = MASK_CHANNEL_REGISTERED |
MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND = MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;
這里調(diào)用了ChannelHandlerMask#mask方法,如下:
static int mask(Class<? extends ChannelHandler> clazz) {
// MASKS為一個(gè)Netty自定義的ThreadLocal實(shí)現(xiàn)
Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
Integer mask = cache.get(clazz);
if (mask == null) {
mask = mask0(clazz);
cache.put(clazz, mask);
}
return mask;
}
這里主要的邏輯是mask0方法,如下:
private static int mask0(Class<? extends ChannelHandler> handlerType) {
int mask = MASK_EXCEPTION_CAUGHT;
try {
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_REGISTERED;
}
if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_UNREGISTERED;
}
if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_ACTIVE;
}
if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_INACTIVE;
}
if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_CHANNEL_READ;
}
if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_READ_COMPLETE;
}
if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
}
if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
mask &= ~MASK_USER_EVENT_TRIGGERED;
}
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_BIND;
}
if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
SocketAddress.class, ChannelPromise.class)) {
mask &= ~MASK_CONNECT;
}
if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DISCONNECT;
}
if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_CLOSE;
}
if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
mask &= ~MASK_DEREGISTER;
}
if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
mask &= ~MASK_READ;
}
if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
Object.class, ChannelPromise.class)) {
mask &= ~MASK_WRITE;
}
if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
mask &= ~MASK_FLUSH;
}
}
if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
mask &= ~MASK_EXCEPTION_CAUGHT;
}
} catch (Exception e) {
// Should never reach here.
PlatformDependent.throwException(e);
}
return mask;
}
該方法用于根據(jù)傳遞的Handler類(lèi)型計(jì)算得出對(duì)應(yīng)的mask,TailContext因?yàn)閷?shí)現(xiàn)了ChannelInboundHandler接口,所以走的是下面的邏輯:
int mask = MASK_EXCEPTION_CAUGHT;
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
}
最終TailContext的executionMask也就為511了。
接下來(lái),我們看下HeadContext的executionMask是如何計(jì)算得出的,看下HeadContext的構(gòu)造方法:
final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
可以看到的是,HeadContext和TailContext的不同之處是該對(duì)象既實(shí)現(xiàn)了ChannelOutboundHandler接口也實(shí)現(xiàn)了ChannelInboundHandler接口,正所謂能力越大責(zé)任越大啊,不過(guò)它們相同之處便是都繼承了AbstractChannelHandlerContext類(lèi),因?yàn)檫@里處理的方式相同,所以省略看下上文即可,不同的地方是,因?yàn)镠eadContext實(shí)現(xiàn)了2個(gè)接口,所以對(duì)應(yīng)的ChannelHandlerMask#mask0對(duì)應(yīng)的處理有所不同,如下所示:
int mask = MASK_EXCEPTION_CAUGHT;
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_INBOUND;
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
mask |= MASK_ALL_OUTBOUND;
}
所以HeadContext對(duì)應(yīng)處理了所有關(guān)于in和out的事件,所以HeadContext的executionMask最終得到了131071。
所以回到我們之前的findContextOutbound方法,最終遍歷鏈表找到的便是HeadContext對(duì)象了。找到了該對(duì)象之后回到AbstractChannelHandlerContext#connect方法,如下:
public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//...
//這里返回的是HeadContext
final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
EventExecutor executor = next.executor();
//如果是在本線程之內(nèi)直接發(fā)起調(diào)用
if (executor.inEventLoop()) {
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
//否則使用異步任務(wù)發(fā)起處理
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null, false);
}
return promise;
}
這里通過(guò)找到HeadContext調(diào)用了invokeConnect方法,如下:
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
//當(dāng)前ChannelHandler的handlerAdded方法是否已經(jīng)被調(diào)用或者不保證順序并且ChannelHandler的handlerAdded方法即將被調(diào)用
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
繼續(xù)根據(jù)connect方法,HeadContext#connect:
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) {
unsafe.connect(remoteAddress, localAddress, promise);
}
這里的unsafe對(duì)象對(duì)應(yīng)了NioSocketChannelUnsafe對(duì)象,這里的connect調(diào)用了AbstractNioChannel#connect方法:
public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
//doConnect方法會(huì)返回false
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
//保存起來(lái)用于檢測(cè)是否發(fā)起重復(fù)的連接操作
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
//處理連接超時(shí)的情況
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
這里跟進(jìn)doConnect方法,位置在NioSocketChannel#doConnect:
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
//利用Netty的工具類(lèi)連接到遠(yuǎn)程主機(jī),這里返回的是false
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
//注冊(cè)連接事件
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
到此為止,關(guān)于Netty的Bootstrap的整個(gè)流程就分析完成了。
整個(gè)流程圖如下所示。
