1.1 Bootstrap的作用
Bootstrap的作用可以參考AbstractBootstrap的javadoc:
AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.
Bootstrap存在的意義就是為了方便的"引導(dǎo)"Channel.
在netty中, 存在兩種類型的Channel, 因此也對(duì)應(yīng)有兩種Bootstrap
| channel類型 | 用于引導(dǎo)的bootstrap實(shí)現(xiàn)類 |
|---|---|
| ServerChannel | ServerBootstrap |
| Channel | Bootstrap |
1.2 Bootstrap的繼承結(jié)構(gòu)
在netty的代碼中, 類ServerBootstrap和類Bootstrap都繼承自基類AbstractBootstrap:

2.類AbstractBootstrap
2.1 類定義
AbstractBootstrap是Bootstrap的基類, 類定義如下:
package io.netty.bootstrap;
public abstract class AbstractBootstrap
<B extends AbstractBootstrap<B, C>, C extends Channel>
implements Cloneable {}
類定義中的泛型B要求是AbstractBootstrap的子類, 而泛型C要求是Channel的子類.
注意這里的泛型的用法,非常的巧妙。
2.2 成員變量
group屬性
volatile EventLoopGroup group;
public B group(EventLoopGroup group) {
if (group == null) {
throw new NullPointerException("group");
}
if (this.group != null) {
throw new IllegalStateException("group set already");
}
this.group = group;
return (B) this;
}
public EventLooattrspGroup group() {
return group;
}
注意this.group只能設(shè)置一次, 這意味著group(group)方法只能被調(diào)用一次.
localAddress屬性
localAddress用于綁定本地終端, 有多個(gè)設(shè)值的方法:
private volatile SocketAddress localAddress;
public B localAddress(SocketAddress localAddress) {
this.localAddress = localAddress;
return (B) this;
}
public B localAddress(int inetPort) {
return localAddress(new InetSocketAddress(inetPort));
}
public B localAddress(String inetHost, int inetPort) {
return localAddress(new InetSocketAddress(inetHost, inetPort));
}
public B localAddress(InetAddress inetHost, int inetPort) {
return localAddress(new InetSocketAddress(inetHost, inetPort));
}
final SocketAddress localAddress() {
return localAddress;
}
這些重載的localAddress(), 最終都指向了InetSocketAddress的幾個(gè)構(gòu)造函數(shù).
options屬性
options屬性是一個(gè)LinkedHashMap, option()方法用于設(shè)置單個(gè)的key/value, 如果value為null則刪除該key.
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
public <T> B option(ChannelOption<T> option, T value) {
if (option == null) {
throw new NullPointerException("option");
}
if (value == null) {
synchronized (options) {
options.remove(option);
}
} else {
synchronized (options) {
options.put(option, value);
}
}
return (B) this;
}
final Map<ChannelOption<?>, Object> options() {
return options;
}
attrs屬性
attrs和options屬性類似.
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();
public <T> B attr(AttributeKey<T> key, T value) {
if (key == null) {
throw new NullPointerException("key");
}
if (value == null) {
synchronized (attrs) {
attrs.remove(key);
}
} else {
synchronized (attrs) {
attrs.put(key, value);
}
}
return (B) this;
}
final Map<AttributeKey<?>, Object> attrs() {
return attrs;
}
handler屬性
private volatile ChannelHandler handler;
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return (B) this;
}
final ChannelHandler handler() {
return handler;
}
channelFactory屬性
新舊兩個(gè)ChannelFactory
channelFactory這個(gè)屬性有點(diǎn)麻煩, 根源在于ChannelFactory這個(gè)類,netty中有新舊兩個(gè)ChannelFactory,具體介紹見 Channel Factory
混合使用
但是現(xiàn)在的情況是內(nèi)部已經(jīng)轉(zhuǎn)為使用新類, 對(duì)外的接口還是繼續(xù)保持使用原來的舊類, 因此代碼有些混亂:
// 這里的channelFactory的類型定義用的是舊類,因此需要加SuppressWarnings
@SuppressWarnings("deprecation")
private volatile ChannelFactory< ? extends C> channelFactory;
// 返回的channelFactory也是用的舊類, 沒的說, 繼續(xù)SuppressWarnings
@SuppressWarnings("deprecation")
final ChannelFactory< ? extends C> channelFactory() {
return channelFactory;
}
// 這個(gè)方法的參數(shù)是舊的"io.netty.bootstrap.ChannelFactory",已經(jīng)被標(biāo)志為"@Deprecated",盡量用下面的方法
@Deprecated
public B channelFactory(ChannelFactory< ? extends C> channelFactory) {
if (channelFactory == null) {
throw new NullPointerException("channelFactory");
}
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return (B) this;
}
// 這個(gè)方法是現(xiàn)在推薦使用的設(shè)置channelFactory的方法, 使用新類"io.netty.channel.ChannelFactory"
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory< ? extends C> channelFactory) {
// 但是底層的實(shí)現(xiàn)還是調(diào)用回上面被廢棄的channelFactory()方法
// 因?yàn)樾骂愂抢^承自舊類的,所有只要簡單轉(zhuǎn)一下類型就好
return channelFactory((ChannelFactory<C>) channelFactory);
}
此外還有一個(gè)channel()方法可以非常方便的設(shè)置channelFactory:
public B channel(Class< ? extends C> channelClass) {
if (channelClass == null) {
throw new NullPointerException("channelClass");
}
return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}
2.3 類方法
validate()
validate()用于檢驗(yàn)所有的參數(shù), 實(shí)際代碼中檢查的是group和channelFactory兩個(gè)參數(shù), 這兩個(gè)參數(shù)必須設(shè)置不能為空:
public B validate() {
if (group == null) {
throw new IllegalStateException("group not set");
}
if (channelFactory == null) {
throw new IllegalStateException("channel or channelFactory not set");
}
return (B) this;
}
register()
register()方法創(chuàng)建一個(gè)新的Channel并將它注冊(cè)到EventLoop, 在執(zhí)行前會(huì)調(diào)用validate()方法做前置檢查:
public ChannelFuture register() {
validate();
return initAndRegister();
}
initAndRegister()是關(guān)鍵代碼, 細(xì)細(xì)讀一下:
final ChannelFuture initAndRegister() {
// 創(chuàng)建一個(gè)新的Channel
final Channel channel = channelFactory().newChannel();
try {
// 調(diào)用抽象方法, 子類來做初始化
init(channel);
} catch (Throwable t) {
// 如果出錯(cuò), 強(qiáng)行關(guān)閉這個(gè)channel
channel.unsafe().closeForcibly();
// as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 創(chuàng)建成功則將這個(gè)channel注冊(cè)到eventloop中
ChannelFuture regFuture = group().register(channel);
// 如果注冊(cè)出錯(cuò)
if (regFuture.cause() != null) {
// 判斷是否已經(jīng)注冊(cè)
if (channel.isRegistered()) {
// channel已經(jīng)注冊(cè)的就關(guān)閉
channel.close();
} else {
// 還沒有注冊(cè)的就強(qiáng)行關(guān)閉
channel.unsafe().closeForcibly();
}
}
// 如果代碼走到這里而且promise沒有失敗, 那么是下面兩種情況之一:
// 1) 如果我們嘗試了從event loop中注冊(cè), 那么現(xiàn)在注冊(cè)已經(jīng)完成
// 現(xiàn)在可以安全的嘗試 bind()或者connect(), 因?yàn)閏hannel已經(jīng)注冊(cè)成功
// 2) 如果我們嘗試了從另外一個(gè)線程中注冊(cè), 注冊(cè)請(qǐng)求已經(jīng)成功添加到event loop的任務(wù)隊(duì)列中等待后續(xù)執(zhí)行
// 現(xiàn)在可以安全的嘗試 bind()或者connect():
// 因?yàn)?bind() 或 connect() 會(huì)在安排的注冊(cè)任務(wù)之后執(zhí)行
// 而register(), bind(), 和 connect() 都被確認(rèn)是同一個(gè)線程
return regFuture;
}
中途調(diào)用的init()方法定義如下, 后面看具體子類代碼時(shí)再展開.
abstract void init(Channel channel) throws Exception;
bind()
bind()方法有多個(gè)重載, 差異只是bind操作所需的InetSocketAddress參數(shù)從何而來而已:
??從屬性this.localAddress來
??這個(gè)時(shí)候bind()方法無需參數(shù), 直接使用屬性this.localAddress, 當(dāng)前調(diào)用之前this.localAddress必須有賦值(通過函數(shù)localAddress()):
public ChannelFuture bind() {
validate();
SocketAddress localAddress = this.localAddress;
if (localAddress == null) {
throw new IllegalStateException("localAddress not set");
}
return doBind(localAddress);
}
從bind()方法的輸入?yún)?shù)中來
在輸入?yún)?shù)中來直接指定localAddress:
public ChannelFuture bind(SocketAddress localAddress) {
validate();
if (localAddress == null) {
throw new NullPointerException("localAddress");
}
return doBind(localAddress);
}
另外為了方便, 重載了下面三個(gè)方法, 用不同的方式來創(chuàng)建InetSocketAddress而已:
public ChannelFuture bind(int inetPort) {
return bind(new InetSocketAddress(inetPort));
}
public ChannelFuture bind(String inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}
public ChannelFuture bind(InetAddress inetHost, int inetPort) {
return bind(new InetSocketAddress(inetHost, inetPort));
}
注: 使用帶參數(shù)的bind()方法, 忽略了localAddress()設(shè)定的參數(shù). 而且也沒有設(shè)置localAddress屬性. 這里的差異, 后面留意.
繼續(xù)看doBind()方法的細(xì)節(jié), 這個(gè)依然是這個(gè)類的核心內(nèi)容:
private ChannelFuture doBind(final SocketAddress localAddress) {
// 調(diào)用initAndRegister()方法, 先初始化channel,并注冊(cè)到event loop
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
// 檢查注冊(cè)的channel是否出錯(cuò)
if (regFuture.cause() != null) {
return regFuture;
}
// 檢查注冊(cè)操作是否完成
if (regFuture.isDone()) {
// 如果完成
// 在這個(gè)點(diǎn)上我們知道注冊(cè)已經(jīng)完成并且成功
// 繼續(xù)bind操作, 創(chuàng)建一個(gè)ChannelPromise
ChannelPromise promise = channel.newPromise();
// 調(diào)用doBind0()方法來繼續(xù)真正的bind操作
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 通常這個(gè)時(shí)候注冊(cè)的future應(yīng)該都已經(jīng)完成,但是萬一沒有, 我們也需要處理
// 為這個(gè)channel創(chuàng)建一個(gè)PendingRegistrationPromise
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
// 然后給注冊(cè)的future添加一個(gè)listener, 在operationComplete()回調(diào)時(shí)
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
// 檢查是否出錯(cuò)
if (cause != null) {
// 在event loop上注冊(cè)失敗, 因此直接讓ChannelPromise失敗, 避免一旦我們?cè)噲D訪問這個(gè)channel的eventloop導(dǎo)致IllegalStateException
promise.setFailure(cause);
} else {
// 注冊(cè)已經(jīng)成功, 因此設(shè)置正確的executor以便使用
// 注: 這里已經(jīng)以前有過一個(gè)bug, 有issue記錄
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
}
// 調(diào)用doBind0()方法來繼續(xù)真正的bind操作
doBind0(regFuture, channel, localAddress, promise);
}
});
return promise;
}
}
關(guān)鍵的doBind0()方法
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// 這個(gè)方法在channelRegistered()方法觸發(fā)前被調(diào)用.
// 讓handler有機(jī)會(huì)在它的channelRegistered()實(shí)現(xiàn)中構(gòu)建pipeline
// 給channel的event loop增加一個(gè)一次性任務(wù)
channel.eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
// 檢查注冊(cè)是否成功
if (regFuture.isSuccess()) {
// 如果成功則綁定localAddress到channel
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
// 如果不成功則設(shè)置錯(cuò)誤到promise
promise.setFailure(regFuture.cause());
}
}
});
}
3 類Bootstrap
類Bootstrap用于幫助客戶端引導(dǎo)Channel.
bind()方法用于無連接傳輸如datagram (UDP)。對(duì)于常規(guī)TCP鏈接,用connect()方法。
3.1 類定義
package io.netty.bootstrap;
public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {}
3.2 類成員
resolver屬性
resolver默認(rèn)設(shè)置為DefaultAddressResolverGroup.INSTANCE, 可以通過resolver()方法來賦值:
private static final AddressResolverGroup< ? > DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
private volatile AddressResolverGroup<SocketAddress> resolver = (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;
public Bootstrap resolver(AddressResolverGroup< ? > resolver) {
if (resolver == null) {
throw new NullPointerException("resolver");
}
this.resolver = (AddressResolverGroup<SocketAddress>) resolver;
return this;
}
remoteAddress屬性
remoteAddress可以通過remoteAddress()方法賦值, 有多個(gè)重載方法:
private volatile SocketAddress remoteAddress;
public Bootstrap remoteAddress(SocketAddress remoteAddress) {
this.remoteAddress = remoteAddress;
return this;
}
public Bootstrap remoteAddress(String inetHost, int inetPort) {
remoteAddress = InetSocketAddress.createUnresolved(inetHost, inetPort);
return this;
}
public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) {
remoteAddress = new InetSocketAddress(inetHost, inetPort);
return this;
}
3.3 類方法
validate()方法
重寫了validate()方法, 在調(diào)用AbstractBootstrap的validate()方法(檢查group和channelFactory)外, 增加了對(duì)handler的檢查:
@Override
public Bootstrap validate() {
super.validate();
if (handler() == null) {
throw new IllegalStateException("handler not set");
}
return this;
}
connect()方法
有多個(gè)connect()方法重載, 功能都是一樣, 拿到輸入的remoteAddress然后調(diào)用doResolveAndConnect()方法:
private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) {
// 先初始化channel并注冊(cè)到event loop
final ChannelFuture regFuture = initAndRegister();
if (regFuture.cause() != null) {
// 如果注冊(cè)失敗則退出
return regFuture;
}
final Channel channel = regFuture.channel();
final EventLoop eventLoop = channel.eventLoop();
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver 不知道該怎么處理給定的遠(yuǎn)程地址, 或者已經(jīng)解析
return doConnect(remoteAddress, localAddress, regFuture, channel.newPromise());
}
// 開始解析遠(yuǎn)程地址
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// 如果地址解析失敗, 則立即失敗
channel.close();
return channel.newFailedFuture(resolveFailureCause);
}
if (resolveFuture.isDone()) {
// 理解成功的解析了遠(yuǎn)程地址, 開始做連接
return doConnect(resolveFuture.getNow(), localAddress, regFuture, channel.newPromise());
}
// 地址解析還沒有完成, 只能等待完成后在做connectio, 增加一個(gè)promise來操作
final ChannelPromise connectPromise = channel.newPromise();
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
connectPromise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, regFuture, connectPromise);
}
}
});
return connectPromise;
}
doConnect()方法中才是真正的開始處理連接操作, 但是還是需要檢查注冊(cè)操作是否完成:
private static ChannelFuture doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress,
final ChannelFuture regFuture, final ChannelPromise connectPromise) {
// 判斷一下前面的注冊(cè)操作是否已經(jīng)完成
// 因?yàn)樽?cè)操作是異步操作, 前面只是返回一個(gè)feature, 代碼執(zhí)行到這里時(shí), 可能完成, 也可能還在進(jìn)行中
if (regFuture.isDone()) {
// 如果注冊(cè)已經(jīng)完成, 可以執(zhí)行連接了
doConnect0(remoteAddress, localAddress, regFuture, connectPromise);
} else {
// 如果注冊(cè)還在進(jìn)行中, 增加一個(gè)ChannelFutureListener, 等操作完成之后再在回調(diào)方法中執(zhí)行連接操作
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(remoteAddress, localAddress, regFuture, connectPromise);
}
});
}
return connectPromise;
}
異步操作就是這點(diǎn)比較麻煩, 總是需要一個(gè)一個(gè)future的做判斷/處理, 如果沒有完成還的加promise/future來依靠回調(diào)函數(shù)繼續(xù)工作處理流程.
終于到了最后的doConnect0()方法, 總算可以真的連接了:
private static void doConnect0(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture,
final ChannelPromise connectPromise) {
// 這個(gè)方法在channelRegistered()方法被觸發(fā)前調(diào)用.
// 給我們的handler一個(gè)在它的channelRegistered()實(shí)現(xiàn)中構(gòu)建pipeline的機(jī)會(huì)
final Channel channel = connectPromise.channel();
// 取當(dāng)前channel的eventlopp, 執(zhí)行一個(gè)一次性任務(wù)
channel.eventLoop().execute(new OneTimeTask() {
@Override
public void run() {
if (regFuture.isSuccess()) {
// 如果注冊(cè)成功
if (localAddress == null) {
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
connectPromise.setFailure(regFuture.cause());
}
}
});
}
init(channel)方法
前面看基類AbstractBootstrap時(shí)看到過, 這個(gè)init()方法是一個(gè)模板方法, 需要子類做具體實(shí)現(xiàn).
看看Bootstrap是怎么做channel初始化的:
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
// 取channel的ChannelPipeline
ChannelPipeline p = channel.pipeline();
// 增加當(dāng)前Bootstrap的handle到ChannelPipeline中
p.addLast(handler());
// 取當(dāng)前Bootstrap設(shè)置的options, 逐個(gè)設(shè)置到channel中
final Map<ChannelOption< ? >, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption< ? >, Object> e: options.entrySet()) {
try {
if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
// 同樣取當(dāng)前Bootstrap的attrs, 逐個(gè)設(shè)置到channel中
final Map<AttributeKey< ? >, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey< ? >, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
總結(jié)上在init()方法中, Bootstrap只做了一個(gè)事情: 將Bootstrap中保存的信息(handle/options/attrs)設(shè)置到新創(chuàng)建的channel.
clone()
深度克隆當(dāng)前Bootstrap對(duì)象,有完全一樣的配置,但是使用給定的EventLoopGroup。
這個(gè)方法適合用相同配置創(chuàng)建多個(gè)Channel。
public Bootstrap clone(EventLoopGroup group) {
Bootstrap bs = new Bootstrap(this);
bs.group = group;
return bs;
}
4 類ServerBootstrap
類ServerBootstrap用于幫助服務(wù)器端引導(dǎo)ServerChannel.
ServerBootstrap除了處理ServerChannel外, 還需要處理從ServerChannel下創(chuàng)建的Channel.Netty中稱這兩個(gè)關(guān)系為parent和child.
4.1 類定義
public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {}
4.2 類屬性
childGroup屬性
childGroup屬性用于指定處理客戶端連接的EventLoopGroup, 設(shè)置的方式有兩種:
group(parentGroup, childGroup)方法用于單獨(dú)設(shè)置parentGroup, childGroup, 分別用于處理ServerChannel和Channel.
group(group)方法設(shè)置parentGroup, childGroup為使用同一個(gè)EventLoopGroup. 注意這個(gè)方法覆蓋了基類的方法.
private volatile EventLoopGroup childGroup;
@Override
public ServerBootstrap group(EventLoopGroup group) {
return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
super.group(parentGroup);
if (childGroup == null) {
throw new NullPointerException("childGroup");
}
if (this.childGroup != null) {
throw new IllegalStateException("childGroup set already");
}
this.childGroup = childGroup;
return this;
}
public EventLoopGroup childGroup() {
return childGroup;
}
4.3 childOptions/childAttrs/childHandler屬性
這三個(gè)屬性和parent的基本對(duì)應(yīng), 設(shè)值方法和檢驗(yàn)都是一模一樣的:
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler childHandler;
4.4 類方法
init()方法
ServerBootstrap的init(channel)方法相比Bootstrap的要復(fù)雜一些, 除了設(shè)置options/attrs/handler到channel外, 還需要為child設(shè)置childGroup, childHandler, childOptions, childAttrs:
@Override
void init(Channel channel) throws Exception {
final Map<ChannelOption< ? >, Object> options = options();
synchronized (options) {
channel.config().setOptions(options);
}
final Map<AttributeKey< ? >, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey< ? >, Object> e: attrs.entrySet()) {
@SuppressWarnings("unchecked")
AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
channel.attr(key).set(e.getValue());
}
}
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup;
final ChannelHandler currentChildHandler = childHandler;
final Entry<ChannelOption< ? >, Object>[] currentChildOptions;
final Entry<AttributeKey< ? >, Object>[] currentChildAttrs;
synchronized (childOptions) {
currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
}
synchronized (childAttrs) {
currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
}
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
ServerBootstrapAcceptor的實(shí)現(xiàn), 主要看channelRead()方法:
private static class ServerBootstrapAcceptor extends ChannelHandlerAdapter {
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 獲取child channel
final Channel child = (Channel) msg;
// 設(shè)置childHandler到child channel
child.pipeline().addLast(childHandler);
// 設(shè)置childOptions到child channel
for (Entry<ChannelOption< ? >, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
// 設(shè)置childAttrs到child channel
for (Entry<AttributeKey< ? >, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
// 將child channel注冊(cè)到childGroup
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
}