長文預(yù)警,本文為源碼分析部分,夾雜大量源碼可能會引起不適,請選擇性閱讀。如果你只想知道Bootstrap的使用,可以閱讀前一篇文章:自頂向下深入分析Netty(三)--Bootstrap
2.源碼分析
首先看Bootstrap類圖,可見類圖比較簡單。在分析時也使用自頂向下的方法,首先分析頂層的AbstractBootstrap,然后分析其子類Bootstrap和ServerBootstrap。
2.1 AbstractBootstrap
首先看其中的類簽名:
public abstract class
AbstractBootstrap<B extends AbstractBootstrap<B, C>, C extends Channel>
implements Cloneable
這個泛型這么復(fù)雜,我已經(jīng)不想繼續(xù)了。別急,這個泛型其實也是一個標(biāo)準(zhǔn)用法,在JDK的Enum中就有類似的使用方法:Enum<E extends Enum<E>>。還記得在ServerBootstarp中可以使用.group().channel().option().handler()這樣的形式嗎?這正是這個泛型的功能:在子類中返回子類本身,無需轉(zhuǎn)型。更多的細(xì)節(jié),可到這個連接查看。
再看其中的字段含義:
// reactor線程池
volatile EventLoopGroup group;
// 通道工廠,主要用來創(chuàng)建初始的Channel,比如服務(wù)端的第一個執(zhí)行bind()方法的serverChannel,
// 客戶端第一個執(zhí)行connect()方法的Channel
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
// channel相關(guān)的選項參數(shù)
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<>();
// 初始化channel的屬性值
private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<>();
// 業(yè)務(wù)邏輯Handler,主要是HandlerInitializer,也可能是普通Handler
private volatile ChannelHandler handler;
接著看其中的關(guān)鍵方法說明:
// 設(shè)置Channel的無參構(gòu)造工廠
public B channel(Class<? extends C> channelClass);
// 設(shè)置Channel工廠
public B channelFactory(ChannelFactory<? extends C> channelFactory);
// 創(chuàng)建一個Channel并綁定到本地端口
public ChannelFuture bind(SocketAddress localAddress);
// 初始化Channel并且注冊到線程池
final ChannelFuture initAndRegister();
// 初始化一個Channel
abstract void init(Channel channel) throws Exception;
2.1.1 channelFactory方法
bootstrap包中有一個ChannelFactory接口,代碼如下:
public interface ChannelFactory<T extends Channel> {
T newChannel();
}
其中僅聲明了一個newChannel()方法用來創(chuàng)建一個Channel,一般使用時,會有以下兩種情況:
- 在服務(wù)端創(chuàng)建一個ServerChannel用于接受客戶端的連接
- 在客戶端創(chuàng)建一個Channel用于連接服務(wù)端
在這兩種情況下,僅僅創(chuàng)建一個Channel似乎使用Factory過于大材小用??紤]這種情況:在構(gòu)造一個代理服務(wù)器時,服務(wù)端需要創(chuàng)建大量的Channel連接目標(biāo)服務(wù)器,這樣使用Factory就很好。并且,當(dāng)Channel的無參構(gòu)造方法不能滿足需求時,可以方便用戶定義自己獨有的工廠創(chuàng)建滿足需求的Channel。
接下來分析channelFacotry()方法,這是一個簡單的setter方法(代碼中的null檢查不再列出):
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
this.channelFactory = channelFactory;
return (B) this;
}
以及另一個默認(rèn)提供無參構(gòu)造方法Channel的工廠setter方法:
public B channel(Class<? extends C> channelClass) {
return channelFactory(new BootstrapChannelFactory<C>(channelClass));
}
其中的BootstrapChannelFactory用于反射調(diào)用無參構(gòu)造方法創(chuàng)建一個Chanel,代碼如下:
public T newChannel() {
try {
return clazz.newInstance();
} catch (Throwable t) {
throw new ChannelException("Unable to create Channel from class ");
}
}
2.1.2 bind方法
bind()方法是AbstarctBootstrap的核心方法,用于綁定前述工廠創(chuàng)建的Channel到本地的一個端口,其中有很多變體方法,關(guān)鍵的一個如下:
public ChannelFuture bind(SocketAddress localAddress) {
validate(); //確保關(guān)鍵參數(shù)設(shè)置正確
return doBind(localAddress);
}
validate()對參數(shù)進(jìn)行驗證,確保關(guān)鍵參數(shù)設(shè)置正確,由于其實現(xiàn)簡單,不貼出代碼。再分析關(guān)鍵的doBind()方法:
private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); // 創(chuàng)建Channel并注冊到線程池
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
if (regFuture.isDone()) {
// 一般情況下,channel注冊完成且注冊成功
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 由于注冊是異步事件,可能此時沒有注冊完成,那么使用異步操作
final PendingRegistrationPromise promise =
new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
// 該方法在注冊完成時調(diào)用
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// 注冊過程中有異常則失敗
promise.setFailure(cause);
} else {
// 注冊完成且成功
promise.executor = channel.eventLoop(); // 設(shè)置為注冊到的線程
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
為了更好的理解這段代碼,先回顧Netty的特點:事件驅(qū)動,比如上面代碼中的兩個動作:注冊和綁定,在Netty實現(xiàn)中是兩個異步事件,其中注冊是指Channel注冊到reactor線程池,綁定是指Channel獲得了本機的一個TCP端口。如果你熟悉javascript或者GUI編程,對回調(diào)函數(shù)也不會陌生,Netty也采用類似的機制,所以能在Netty源代碼里看到大量的ChannelFuture(細(xì)節(jié)可看這一章),當(dāng)一個事件完成時,會回調(diào)注冊到ChannelFuture上的FutureListener從而實現(xiàn)異步操作。此外,Netty4為了簡化邏輯處理邏輯,遵循這樣一條規(guī)則:一個Channel產(chǎn)生的IO事件始終由其注冊到的線程處理,可知注冊和綁定事件都將在同一個線程也就是Channel注冊到的線程執(zhí)行。
從代碼中可以看出bind分為兩步:initAndRegister()以及doBind0()。initAndRegister()用于創(chuàng)建Channel、綁定用戶定義的Handler、以及將該Chanel注冊到一個Reactor中,代碼如下:
final ChannelFuture initAndRegister() {
Channel channel = null;
try {
// 創(chuàng)建一個Channel
channel = channelFactory().newChannel();
// 初始化處理器Handler
init(channel);
} catch (Throwable t) {
if (channel != null) {
channel.unsafe().closeForcibly();
}
// 還沒有注冊到線程池,使用默認(rèn)線程GlobalEventExecutor
return new DefaultChannelPromise(channel,
GlobalEventExecutor.INSTANCE).setFailure(t);
}
// 將channel注冊到Reactor線程池
ChannelFuture regFuture = group().register(channel);
if (regFuture.cause() != null) {
if (channel.isRegistered()) {
channel.close();
} else {
channel.unsafe().closeForcibly();
}
}
return regFuture;
}
abstract void init(Channel channel) throws Exception;
其中的init()主要用于初始化處理器Handler,可視為一個模板方法,由子類Bootstrap或ServerBootstrap實現(xiàn)具體的初始化細(xì)節(jié)。
接著分析實現(xiàn)綁定本地端口的doBind0()方法:
private static void doBind0(final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise)
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
如果你對channel.eventLoop().execute()的用法有疑問,再次回顧這條規(guī)則:一個Channel產(chǎn)生的IO事件始終由其注冊到的線程處理。綁定事件是一個類似write的出站事件,所以需要由channel注冊到的線程完成。為什么不使用regFuture直接添加Futurelistener完成綁定處理呢?代碼中的解釋是注冊不一定成功,失敗后可能執(zhí)行的線程并不是注冊的線程(我查看代碼在這里僅僅調(diào)用register(Channel)并不會有這樣的情況)。
這個bind過程夾雜很多私貨,總結(jié)一下流程:
- 使用ChannelFactory創(chuàng)建一個Channel
- 調(diào)用init()方法初始化Channel,主要是綁定處理器
- 注冊到一個Reactor線程池
- 對注冊的異步結(jié)果處理:注冊成功進(jìn)行綁定操作,此時綁定失敗將會關(guān)閉Channel返回異步綁定失敗,綁定成功返回異步成功;注冊失敗直接返回異步注冊失敗。
2.2 ServerBootstrap
關(guān)鍵字段如下:
// 為accept的客戶端channel設(shè)置的選項參數(shù)
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<>();
// 為accept的客戶端channel設(shè)置的屬性鍵值對
private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<>();
// subReactor線程池,父類的group成為mainReactor線程池
private volatile EventLoopGroup childGroup;
// subReactor線程池的事件處理器,一般為ChannelInitializer初始化處理器鏈
private volatile ChannelHandler childHandler;
ServerBootstrap中的關(guān)鍵方法只有父類中的模板方法init(channel)。
2.2.1 init方法
ServerBootstrap的init(channel)方法,完成的工作有:
- 設(shè)置serverChannel的選項參數(shù)
- 設(shè)置serverChannel的屬性鍵值對
- 添加處理serverChannel的IO事件處理器
其中1和2兩條都比較簡單,不再列出代碼,主要看3的處理,代碼如下:
void init(Channel channel) throws Exception {
// 1.設(shè)置serverChannel的選項參數(shù)
// 2.設(shè)置serverChannel的屬性鍵值對
// 3.添加處理serverChannel事件的處理器
ChannelPipeline p = channel.pipeline();
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup,currentChildHandler,
currentChildOptions, currentChildAttrs));
}
});
}
});
}
可見,向serverChannel添加是一個初始化處理器ChannelInitializer,完成的主要工作是將用戶Handler以及一個ServerBootstrapAcceptor添加到serverChannel的處理器鏈中。此處需要注意的是,結(jié)合文章最開始的示例,p.addLast()方法在main線程中執(zhí)行,而initChannel()方法將在Channel注冊到的線程中執(zhí)行,執(zhí)行的時機是該ChannelInitializer被添加到ChannelPipeline中時但晚于p.addLast()方法。明白了這點,繼續(xù)分析ch.eventLoop().execute()的使用,這是因為需要保證ServerBootstrapAcceptor被添加到處理器鏈的最尾部以便不破壞mainReactor將accept接受的Channel連接傳遞給subReactor。但是當(dāng)通過handler()獲得的用戶Handler也是一個ChannelInitializer,如果只是常規(guī)的使用pipeline.addLast(acceptor)將導(dǎo)致acceptor并不在處理器鏈的最尾部。
2.2.2 ServerBootstrapAcceptor靜態(tài)內(nèi)部類
在前面的分析中,不斷提到了ServerBootstrapAcceptor,正對應(yīng)文章最開始圖中綠色的acceptor。Netty默認(rèn)實現(xiàn)了這個acceptor處理器,主要功能是將mainReactor接受的Channel傳遞給subReactor。該類的字段如下:
private final EventLoopGroup childGroup;
private final ChannelHandler childHandler;
private final Entry<ChannelOption<?>, Object>[] childOptions;
private final Entry<AttributeKey<?>, Object>[] childAttrs;
與ServerBootstrap類的關(guān)鍵字段一致,由此可見這個內(nèi)部類是一個關(guān)鍵點,首先看該類的類簽名:
private static class ServerBootstrapAcceptor extends ChannelInboundHandlerAdapter
該類繼承自ChannelInboundHandlerAdapter,作為Netty的使用者對這個類很熟悉,如果你只是初學(xué),可在學(xué)完本章之后繼續(xù)查看這個鏈接。首先明確這個Handler處理器是一個inbound事件處理器,需要注意的是:Netty將ServerChannel接受客戶端連接的accept事件抽象為Read讀取事件。因此,我們重點關(guān)注channelRead()方法,其完成的工作有:
- 配置Channel,包括Channel上的處理器鏈,Channel的選項參數(shù)及屬性鍵值對。
- 將服務(wù)端accept的客戶端Channel注冊到subReactor線程池的一個線程上
其中的代碼清晰易懂,代碼如下:
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 服務(wù)器accept的客戶端channel
final Channel child = (Channel) msg;
// 設(shè)置處理器鏈
child.pipeline().addLast(childHandler);
// 設(shè)置channel的選項參數(shù)
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption(
(ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
// 設(shè)置channel的屬性鍵值對
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
// 將客戶端channel注冊到subReactor線程池,注冊失敗或者拋出異常則關(guān)閉channel
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
private static void forceClose(Channel child, Throwable t) {
child.unsafe().closeForcibly();
logger.warn("Failed to register an accepted channel: " + child, t);
}
其中的exceptionCaught()方法也值得關(guān)注,當(dāng)ServerChannel事件在執(zhí)行中產(chǎn)生異常時,用戶并不希望ServerChannel被關(guān)閉,因為還有其他的客戶端連接需要處理。為此,Netty處理異常時使用這樣的極致:產(chǎn)生異常后暫停接受客戶端連接,1s以后再恢復(fù)接受連接。
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
final ChannelConfig config = ctx.channel().config();
if (config.isAutoRead()) {
// 停止accept客戶端channel的連接
config.setAutoRead(false);
// 1s以后重新接受客戶端channel連接
ctx.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
config.setAutoRead(true);
}
}, 1, TimeUnit.SECONDS);
}
ctx.fireExceptionCaught(cause);
}
2.3 Bootstrap
Bootstrap是客戶端啟動器,只增加了一個字段:
private volatile SocketAddress remoteAddress; // 服務(wù)端地址
2.1 init方法
客戶端的init()方法很簡單,只完成配置Channel的工作,包括Channel上的處理器鏈,Channel的選項參數(shù)及屬性鍵值對。代碼如下:
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// 設(shè)置處理器鏈
p.addLast(handler());
// 設(shè)置channel的選項參數(shù)
final Map<ChannelOption<?>, Object> options = options();
synchronized (options) {
for (Entry<ChannelOption<?>, Object> e: options.entrySet()) {
try {
if(!channel.config().setOption(
(ChannelOption<Object>)e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + channel, t);
}
}
}
// 設(shè)置channel的屬性鍵值對
final Map<AttributeKey<?>, Object> attrs = attrs();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
2.2 connect方法
客戶端需要連接到遠(yuǎn)程服務(wù)端,才能進(jìn)行網(wǎng)絡(luò)通訊,所以Bootstrap作為客戶端啟動器必然要有connect(remoteAddr)方法。該方法有很多變種,關(guān)鍵的一個如下:
public ChannelFuture connect(SocketAddress remoteAddress,
SocketAddress localAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
validate();
return doConnect(remoteAddress, localAddress);
}
繼續(xù)分析doConnect()方法:
private ChannelFuture doConnect(final SocketAddress remoteAddress,
final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister(); //創(chuàng)建Channel并注冊到線程池
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
final ChannelPromise promise = channel.newPromise();
if (regFuture.isDone()) {
// 注冊異步操作完成,再提交一個連接異步任務(wù)
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
} else {
// 注冊異步操作未完成,當(dāng)注冊異步完成時提交一個連接異步任務(wù)
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
doConnect0(regFuture, channel, remoteAddress, localAddress, promise);
}
});
}
return promise;
}
此處的處理與ServerBootstrap的bind()方法很類似,可類比分析。由此推知,doConnect0()方法將向channel注冊的線程池提交一個異步連接任務(wù)。的確如此,代碼如下:
private static void doConnect0(final ChannelFuture regFuture, final Channel channel,
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final ChannelPromise promise) {
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
if (localAddress == null) {
channel.connect(remoteAddress, promise);
} else {
channel.connect(remoteAddress, localAddress, promise);
}
promise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}