Netty源碼-客戶端啟動過程

1 概述

在介紹了Netty服務(wù)端啟動之后(參考筆者文章Netty源碼-服務(wù)端啟動過程),再看Netty的客戶端啟動會發(fā)現(xiàn)二者十分類似,服務(wù)端啟動通過調(diào)用了ServerBootstrap.bind方法開啟,而客戶端啟動則通過調(diào)用Bootstrap.connect方法啟動。

本文的介紹比較簡單,因為許多操作和服務(wù)端啟動一致,就沒有詳細介紹,讀者可集合服務(wù)端啟動過程一起理解。

2 客戶端的典型編碼

和介紹服務(wù)端一樣,我們先看一下客戶端的典型編碼:

public class TimeClient {
    public void connect(int port, String host) throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            //客戶端只要準備一個group即可
            b.group(group)
            //注冊客戶端使用的channel
            .channel(NioSocketChannel.class)
            //設(shè)置客戶端channel選項和屬性
            .option(ChannelOption.TCP_NODELAY, true)
            .attr(AttributeKey.valueOf("attrKey"), "attrValue")
            //注冊客戶端pipelne中的handler
            .handler(new ChannelInitializer<Channel>() {
                @Override
                protected void initChannel(Channel ch) throws Exception {
                    ch.pipeline().addLast(new TimeClientHandler());
                }
            });
            //調(diào)用connect啟動客戶端
            ChannelFuture f = b.connect(host, port).sync();
        } finally {
            //優(yōu)雅停機
            group.shutdownGracefully();
        }
    }
    public static void main(String[] args) throws Exception {
        new TimeClient().connect(8080, "127.0.0.1");
    }

}

3 一些配置函數(shù)

在第1節(jié)我們提到了BootstrapServerBootstrap類,Bootstrap主要負責(zé)客戶端的啟動,而ServerBootstrap則主要負責(zé)服務(wù)端的啟動,我們在Netty源碼-服務(wù)端啟動過程第3節(jié)一些配置函數(shù)介紹了ServerBootstrap的一些常用配置函數(shù),因為ServerBootstrap即需要配置Accept線程和Server channel,又需要配置客戶端連接的線程和客戶端channel,所以ServerBootstrap的配置方法都是optionchildOptionhandlerchildHandler這樣成對出現(xiàn)的,因為Bootstrap主要負責(zé)客戶端啟動,所以只需要配置客戶端線程和channel即可,所以其配置方法則沒有child*這一類。相關(guān)配置方法在文章Netty源碼-服務(wù)端啟動過程也都介紹過,本文也就不再介紹了。

4 客戶端啟動

客戶端的啟動由Bootstrap.connect方法開啟,下面看其源碼:

//Bootstrap
/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(String inetHost, int inetPort) {
    return connect(InetSocketAddress.createUnresolved(inetHost, inetPort));
}

/**
* Connect a {@link Channel} to the remote peer.
*/
public ChannelFuture connect(SocketAddress remoteAddress) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    }
    //驗證一些必要配置是否都已經(jīng)配置過
    validate();
    //進行地址解析和實際的連接
    return doResolveAndConnect(remoteAddress, config.localAddress());
}

@Override
public Bootstrap validate() {
    //父類中的驗證主要是保證group和channelFactory不為空
    super.validate();
    //驗證設(shè)置了handler
    if (config.handler() == null) {
        throw new IllegalStateException("handler not set");
    }
    return this;
}

//客戶端的啟動也分為三個步驟,第一為初始化通道,第二為向
//EventLoopGroup中的某個NioEventLoop持有的Selector注冊
//通道,第三個為解析地址和連接
private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    //這里完成了第一和第二步驟,即初始化和注冊
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        //這里完成第三個步驟:解析地址和連接
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        // Registration future is almost always fulfilled already, but just in case it's not.
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                // Directly obtain the cause and do a null check so we only need one volatile read in case of a
                // failure.
                Throwable cause = future.cause();
                if (cause != null) {
                    // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
                    // IllegalStateException once we try to access the EventLoop of the Channel.
                    promise.setFailure(cause);
                } else {
                    // Registration was successful, so set the correct executor to use.
                    // See https://github.com/netty/netty/issues/2586
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

4.1 Channel初始化和注冊

其實客戶端Channel初始化和注冊實現(xiàn)與服務(wù)端基本一樣,在初始化時會調(diào)用AbstractBootstrap.init方法,這個方法根據(jù)在具體的子類中進行了重寫,客戶端的子類為Bootstrap,其實現(xiàn)如下:

//Bootstrap
//邏輯比較簡單,首先向客戶端channel的pipeline添加handler
//然后進行選項和attr的設(shè)置
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    final Map<ChannelOption<?>, Object> options = options0();
    synchronized (options) {
        setChannelOptions(channel, options, logger);
    }

    final Map<AttributeKey<?>, Object> attrs = attrs0();
    synchronized (attrs) {
        for (Entry<AttributeKey<?>, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}

除此之外,Channel初始化和注冊與服務(wù)端一樣,可參見筆者文章Netty源碼-服務(wù)端啟動過程4.1節(jié)相關(guān)內(nèi)容,這里不再介紹。

doResolveAndConnect0方法主要完成第三個步驟,解析服務(wù)器地址并連接:

//Bootstrap
private ChannelFuture doResolveAndConnect0(final Channel channel, SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    try {
        final EventLoop eventLoop = channel.eventLoop();
        //首先進行域名解析(如果指定的服務(wù)端地址時域名而不是
        //IP時需要進行解析)
        final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

        if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
            // Resolver has no idea about what to do with the specified remote address or it's resolved already.
            //解析成功之后進行連接操作
            doConnect(remoteAddress, localAddress, promise);
            return promise;
        }

        final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);

        if (resolveFuture.isDone()) {
            final Throwable resolveFailureCause = resolveFuture.cause();

            if (resolveFailureCause != null) {
                // Failed to resolve immediately
                channel.close();
                promise.setFailure(resolveFailureCause);
            } else {
                // Succeeded to resolve immediately; cached? (or did a blocking lookup)
                doConnect(resolveFuture.getNow(), localAddress, promise);
            }
            return promise;
        }

        // Wait until the name resolution is finished.
        resolveFuture.addListener(new FutureListener<SocketAddress>() {
            @Override
            public void operationComplete(Future<SocketAddress> future) throws Exception {
                if (future.cause() != null) {
                    channel.close();
                    promise.setFailure(future.cause());
                } else {
                    doConnect(future.getNow(), localAddress, promise);
                }
            }
        });
    } catch (Throwable cause) {
        promise.tryFailure(cause);
    }
    return promise;
}

4.2 地址解析和連接

在完成第一和第二個步驟之后,通道初始化和注冊都已經(jīng)完成,Bootstrap就會調(diào)用doResolveAndConnect0方法解析服務(wù)器地址并連接。

4.2.1 地址解析

因為我們在指定服務(wù)端地址不僅可以使用IP,還可以使用域名,所以我們在指定域名時,就需要Netty將其解析為IP地址,實現(xiàn)邏輯也比較簡單,默認的解析器為DefaultNameResolver,根據(jù)域名解析出IP調(diào)用的方法為java.net.InetAddress.getAllByName(hostname),這里也不再展開介紹。

4.2.2 連接

在將域名(如果配置的服務(wù)端地址為域名而不是IP時會進行解析操作)解析為IP之后,會調(diào)用doConnect進行連接操作:

//Bootstrap
private static void doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                //直接調(diào)用通道的connect方法
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

從上面的源碼可以看出,連接操作直接通過調(diào)用Channel.connect方法完成,Channel.connect方法我們直接看起子類AbstractChannel中的實現(xiàn):

//AbstractChannel
//connect都是直接通過調(diào)用Pipeline的connect進行連接操作
@Override
public ChannelFuture connect(SocketAddress remoteAddress) {
    return pipeline.connect(remoteAddress);
}

@Override
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return pipeline.connect(remoteAddress, localAddress);
}

根據(jù)Netty源碼-ChannelPipeline和ChannelHandler中的介紹,connect方法屬于Outbound事件,所以最終會調(diào)用HeadContext.connect方法:

//HeadContext
 @Override
public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) throws Exception {
    unsafe.connect(remoteAddress, localAddress, promise);
}

HeadContext.connect方法調(diào)用了Unsafe.connect方法,我們看其在子類AbstractNioUnsafe中的實現(xiàn):

//AbstractNioUnsafe
@Override
public final void connect(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        //這里調(diào)用了外部類AbstractNioChannel.doConnect
        //方法執(zhí)行實際的連接動作
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

因為這里介紹的是客戶端的啟動,所以我們看AbstractNioChannel.doConnect在其子類NioSocketChannel中的實現(xiàn):

//NioSocketChannel
 @Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    //如何本地地址不為空,表示要進行本地地址綁定
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        //最終調(diào)用了java channel.connect方法
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

//本地地址綁定,調(diào)用java channel的bind方法進行綁定
 private void doBind0(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        SocketUtils.bind(javaChannel(), localAddress);
    } else {
        SocketUtils.bind(javaChannel().socket(), localAddress);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容