1 概述
在介紹了Netty服務(wù)端啟動之后(參考筆者文章Netty源碼-服務(wù)端啟動過程),再看Netty的客戶端啟動會發(fā)現(xiàn)二者十分類似,服務(wù)端啟動通過調(diào)用了ServerBootstrap.bind方法開啟,而客戶端啟動則通過調(diào)用Bootstrap.connect方法啟動。
本文的介紹比較簡單,因為許多操作和服務(wù)端啟動一致,就沒有詳細介紹,讀者可集合服務(wù)端啟動過程一起理解。
2 客戶端的典型編碼
和介紹服務(wù)端一樣,我們先看一下客戶端的典型編碼:
public class TimeClient {
public void connect(int port, String host) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
//客戶端只要準備一個group即可
b.group(group)
//注冊客戶端使用的channel
.channel(NioSocketChannel.class)
//設(shè)置客戶端channel選項和屬性
.option(ChannelOption.TCP_NODELAY, true)
.attr(AttributeKey.valueOf("attrKey"), "attrValue")
//注冊客戶端pipelne中的handler
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
//調(diào)用connect啟動客戶端
ChannelFuture f = b.connect(host, port).sync();
} finally {
//優(yōu)雅停機
group.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
new TimeClient().connect(8080, "127.0.0.1");
}
}
3 一些配置函數(shù)
在第1節(jié)我們提到了Bootstrap和ServerBootstrap類,Bootstrap主要負責(zé)客戶端的啟動,而ServerBootstrap則主要負責(zé)服務(wù)端的啟動,我們在Netty源碼-服務(wù)端啟動過程第3節(jié)一些配置函數(shù)介紹了ServerBootstrap的一些常用配置函數(shù),因為ServerBootstrap即需要配置Accept線程和Server channel,又需要配置客戶端連接的線程和客戶端channel,所以ServerBootstrap的配置方法都是option和childOption、handler和childHandler這樣成對出現(xiàn)的,因為Bootstrap主要負責(zé)客戶端啟動,所以只需要配置客戶端線程和channel即可,所以其配置方法則沒有child*這一類。相關(guān)配置方法在文章Netty源碼-服務(wù)端啟動過程也都介紹過,本文也就不再介紹了。
4 客戶端啟動
客戶端的啟動由Bootstrap.connect方法開啟,下面看其源碼:
//Bootstrap
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(String inetHost, int inetPort) {
return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
//驗證一些必要配置是否都已經(jīng)配置過
validate();
//進行地址解析和實際的連接
return doResolveAndConnect(remoteAddress, config.localAddress());
}
@Override
public Bootstrap validate() {
//父類中的驗證主要是保證group和channelFactory不為空
super.validate();
//驗證設(shè)置了handler
if (config.handler() == null) {
throw new IllegalStateException("handler not set");
}
return this;
}
//客戶端的啟動也分為三個步驟,第一為初始化通道,第二為向
//EventLoopGroup中的某個NioEventLoop持有的Selector注冊
//通道,第三個為解析地址和連接
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
//這里完成了第一和第二步驟,即初始化和注冊
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.isDone()) {
if (!regFuture.isSuccess()) {
return regFuture;
}
//這里完成第三個步驟:解析地址和連接
return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
// Directly obtain the cause and do a null check so we only need one volatile read in case of a
// failure.
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
}
}
});
return promise;
}
}
4.1 Channel初始化和注冊
其實客戶端Channel初始化和注冊實現(xiàn)與服務(wù)端基本一樣,在初始化時會調(diào)用AbstractBootstrap.init方法,這個方法根據(jù)在具體的子類中進行了重寫,客戶端的子類為Bootstrap,其實現(xiàn)如下:
//Bootstrap
//邏輯比較簡單,首先向客戶端channel的pipeline添加handler
//然后進行選項和attr的設(shè)置
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
setChannelOptions(channel, options, logger);
}
final Map<AttributeKey<?>, Object> attrs = attrs0();
synchronized (attrs) {
for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
}
}
除此之外,Channel初始化和注冊與服務(wù)端一樣,可參見筆者文章Netty源碼-服務(wù)端啟動過程4.1節(jié)相關(guān)內(nèi)容,這里不再介紹。
doResolveAndConnect0方法主要完成第三個步驟,解析服務(wù)器地址并連接:
//Bootstrap
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
try {
final EventLoop eventLoop = channel.eventLoop();
//首先進行域名解析(如果指定的服務(wù)端地址時域名而不是
//IP時需要進行解析)
final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);
if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
// Resolver has no idea about what to do with the specified remote address or it's resolved already.
//解析成功之后進行連接操作
doConnect(remoteAddress, localAddress, promise);
return promise;
}
final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
if (resolveFuture.isDone()) {
final Throwable resolveFailureCause = resolveFuture.cause();
if (resolveFailureCause != null) {
// Failed to resolve immediately
channel.close();
promise.setFailure(resolveFailureCause);
} else {
// Succeeded to resolve immediately; cached? (or did a blocking lookup)
doConnect(resolveFuture.getNow(), localAddress, promise);
}
return promise;
}
// Wait until the name resolution is finished.
resolveFuture.addListener(new FutureListener<SocketAddress>() {
@Override
public void operationComplete(Future<SocketAddress> future) throws Exception {
if (future.cause() != null) {
channel.close();
promise.setFailure(future.cause());
} else {
doConnect(future.getNow(), localAddress, promise);
}
}
});
} catch (Throwable cause) {
promise.tryFailure(cause);
}
return promise;
}
4.2 地址解析和連接
在完成第一和第二個步驟之后,通道初始化和注冊都已經(jīng)完成,Bootstrap就會調(diào)用doResolveAndConnect0方法解析服務(wù)器地址并連接。
4.2.1 地址解析
因為我們在指定服務(wù)端地址不僅可以使用IP,還可以使用域名,所以我們在指定域名時,就需要Netty將其解析為IP地址,實現(xiàn)邏輯也比較簡單,默認的解析器為DefaultNameResolver,根據(jù)域名解析出IP調(diào)用的方法為java.net.InetAddress.getAllByName(hostname),這里也不再展開介紹。
4.2.2 連接
在將域名(如果配置的服務(wù)端地址為域名而不是IP時會進行解析操作)解析為IP之后,會調(diào)用doConnect進行連接操作:
//Bootstrap
private static void doConnect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
final Channel channel = connectPromise.channel();
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (localAddress == null) {
//直接調(diào)用通道的connect方法
channel.connect(remoteAddress, connectPromise);
} else {
channel.connect(remoteAddress, localAddress, connectPromise);
}
connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
});
}
從上面的源碼可以看出,連接操作直接通過調(diào)用Channel.connect方法完成,Channel.connect方法我們直接看起子類AbstractChannel中的實現(xiàn):
//AbstractChannel
//connect都是直接通過調(diào)用Pipeline的connect進行連接操作
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
return pipeline.connect(remoteAddress);
}
@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return pipeline.connect(remoteAddress, localAddress);
}
根據(jù)Netty源碼-ChannelPipeline和ChannelHandler中的介紹,connect方法屬于Outbound事件,所以最終會調(diào)用HeadContext.connect方法:
//HeadContext
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
HeadContext.connect方法調(diào)用了Unsafe.connect方法,我們看其在子類AbstractNioUnsafe中的實現(xiàn):
//AbstractNioUnsafe
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
try {
if (connectPromise != null) {
// Already a connect in process.
throw new ConnectionPendingException();
}
boolean wasActive = isActive();
//這里調(diào)用了外部類AbstractNioChannel.doConnect
//方法執(zhí)行實際的連接動作
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isCancelled()) {
if (connectTimeoutFuture != null) {
connectTimeoutFuture.cancel(false);
}
connectPromise = null;
close(voidPromise());
}
}
});
}
} catch (Throwable t) {
promise.tryFailure(annotateConnectException(t, remoteAddress));
closeIfClosed();
}
}
因為這里介紹的是客戶端的啟動,所以我們看AbstractNioChannel.doConnect在其子類NioSocketChannel中的實現(xiàn):
//NioSocketChannel
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
//如何本地地址不為空,表示要進行本地地址綁定
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
//最終調(diào)用了java channel.connect方法
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
//本地地址綁定,調(diào)用java channel的bind方法進行綁定
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
}