Netty源碼筆記之Bootstrap

Bootstrap適用于創(chuàng)建客戶(hù)端連接的一個(gè)引導(dǎo)類(lèi),我們可以通過(guò)它很方便的創(chuàng)建出Netty客戶(hù)端的連接,接下來(lái)我以官方源碼里面的example echo項(xiàng)目為例來(lái)具體分析其實(shí)現(xiàn):

例子來(lái)自官方的Example示例下面的echo項(xiàng)目,example\src\main\java\io\netty\example\echo

 // Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
    Bootstrap b = new Bootstrap();
    b.group(group)
     .channel(NioSocketChannel.class)
     .option(ChannelOption.TCP_NODELAY, true)
     .handler(new ChannelInitializer<SocketChannel>() {
         @Override
         public void initChannel(SocketChannel ch) throws Exception {
             ChannelPipeline p = ch.pipeline();
             if (sslCtx != null) {
                 p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
             }
             //p.addLast(new LoggingHandler(LogLevel.INFO));
             p.addLast(new EchoClientHandler());
         }
     });

    // Start the client.
    ChannelFuture f = b.connect(HOST, PORT).sync();

    // Wait until the connection is closed.
    f.channel().closeFuture().sync();
} finally {
    // Shut down the event loop to terminate all threads.
    group.shutdownGracefully();
}

上面的代碼向我們展示了Netty客戶(hù)端初始化所需的內(nèi)容:

1、EventLoopGroup:事件線程循環(huán)調(diào)度組,這里使用的是NioEventLoopGroup。
2、Bootstrap:客戶(hù)端引導(dǎo)類(lèi)
3、NioSocketChannel:Nio客戶(hù)端通道
4、ChannelInitializer:客戶(hù)端Handler初始器
5、ChannelPipeline:Channel管道

在研究Bootstrap實(shí)現(xiàn)之前,我們先來(lái)看下Bootstrap的類(lèi)圖結(jié)構(gòu):

Bootstrap類(lèi)圖結(jié)構(gòu)

可以看到,Bootstrap的類(lèi)圖結(jié)構(gòu)還是比較簡(jiǎn)單的,上層有個(gè)抽象類(lèi)AbstractBootstrap,以及兩個(gè)頂層接口分別是Cloneable、Channel。

繼續(xù)研究,我們看到上面的代碼中是直接new 出來(lái)了一個(gè)Bootstrap,這個(gè)是個(gè)空的構(gòu)造函數(shù),所以這一步驟沒(méi)啥可說(shuō)的,接下來(lái)把EventLoopGroup設(shè)置進(jìn)了b.group(group),我們進(jìn)入這里看下其實(shí)現(xiàn):

public B group(EventLoopGroup group) {
    // 忽略參數(shù)校驗(yàn)
    this.group = group;
    return self();
}

其實(shí)這一步驟是由AbstractBootstrap來(lái)完成的,group的定義如下:

volatile EventLoopGroup group;

繼續(xù)往下走,來(lái)到了 channel(NioSocketChannel.class),這里設(shè)置了一個(gè)Chanenl類(lèi)型,此處使用的是NioSocketChannel,這里的具體實(shí)現(xiàn)也是由AbstractBootstrap來(lái)完成的,代碼如下:

public B channel(Class<? extends C> channelClass) {
    return channelFactory(new ReflectiveChannelFactory<C>(
            ObjectUtil.checkNotNull(channelClass, "channelClass")
    ));
}

我們來(lái)看下ReflectiveChannelFactory里面的實(shí)現(xiàn)是怎么樣的:

private final Constructor<? extends T> constructor;

public ReflectiveChannelFactory(Class<? extends T> clazz) {
    this.constructor = clazz.getConstructor();
}

@Override
public T newChannel() {
    return constructor.newInstance();
}

上面的代碼去除了參數(shù)校驗(yàn)以及異常判斷,這樣看代碼更加清晰明了一點(diǎn),可以看到的是這不就是我們經(jīng)常使用的Java反射實(shí)例對(duì)象的方式嗎,首先先將傳遞進(jìn)來(lái)的NioSocketChanenl.class,通過(guò)獲取其默認(rèn)的構(gòu)造函數(shù)對(duì)象,并將其默認(rèn)構(gòu)造函數(shù)保存起來(lái),后期實(shí)現(xiàn)的使用,直接通過(guò)newChannel()獲取一個(gè)新的實(shí)例即可。ReflectiveChannelFactory實(shí)例對(duì)象后面通過(guò)channelFactory()方法設(shè)置給了AbstractBootstrap的channelFactory字段。

接下來(lái),代碼使用了option(ChannelOption.TCP_NODELAY, true),這里也是由AbstractBootstrap父類(lèi)的進(jìn)行處理的:

public <T> B option(ChannelOption<T> option, T value) {
    synchronized (options) {
        if (value == null) {
            options.remove(option);
        } else {
            options.put(option, value);
        }
    }
    return self();
}

這里可以看到如果ChannelOption里面的常量值如果設(shè)置的是null則會(huì)進(jìn)行移除該選項(xiàng),否則就將其設(shè)置到options中,下面我們來(lái)看下options在AbstractBootstrap中是如何定義的:

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

這里就是一個(gè)簡(jiǎn)單的LinkedHashMap用于保存ChannelOption的鍵值集合,比較簡(jiǎn)單。

下面設(shè)置了handler(),也是由AbstractBootstrap來(lái)設(shè)置,代碼實(shí)現(xiàn)如下:

public B handler(ChannelHandler handler) {
    this.handler = ObjectUtil.checkNotNull(handler, "handler");
    return self();
}

這里不過(guò)多展開(kāi),后面會(huì)有單獨(dú)的章節(jié)討論。

繼續(xù)往下走,b.connect(HOST, PORT).sync();看下其實(shí)現(xiàn)先:

private ChannelFuture doResolveAndConnect(final SocketAddress remoteAddress, final SocketAddress localAddress) {
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();

    if (regFuture.isDone()) {
        if (!regFuture.isSuccess()) {
            return regFuture;
        }
        return doResolveAndConnect0(channel, remoteAddress, localAddress, channel.newPromise());
    } else {
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                if (cause != null) {
                    promise.setFailure(cause);
                } else {
                    promise.registered();
                    doResolveAndConnect0(channel, remoteAddress, localAddress, promise);
                }
            }
        });
        return promise;
    }
}

代碼雖長(zhǎng),但是最重要的還是initAndRegister()方法,繼續(xù)跟進(jìn)看看:

final ChannelFuture initAndRegister() {
    Channel channel = null;
    try {
        channel = channelFactory.newChannel();
        init(channel);
    } catch (Throwable t) {
        if (channel != null) {
            channel.unsafe().closeForcibly();
            return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
        }
        return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    ChannelFuture regFuture = config().group().register(channel);
    if (regFuture.cause() != null) {
        if (channel.isRegistered()) {
            channel.close();
        } else {
            channel.unsafe().closeForcibly();
        }
    }
    return regFuture;
}

走到這里,我們可以看到我們之前通過(guò)傳遞NioSocketChannel.class構(gòu)建的ReflectiveChannelFactory工廠排上用場(chǎng)了,這里也是通過(guò)newChannel()方法直接通過(guò)構(gòu)造函數(shù)直接new出來(lái)了一個(gè)NioSocketChannel對(duì)象出來(lái),因?yàn)檫@里使用的是默認(rèn)構(gòu)造函數(shù)構(gòu)造的對(duì)象,具體的代碼實(shí)現(xiàn)我們還是需要手動(dòng)定位到NioSocketChannel中一看究竟,如下:

private static final SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();

private static SocketChannel newSocket(SelectorProvider provider) {
    return provider.openSocketChannel();
}

public NioSocketChannel() {
    this(DEFAULT_SELECTOR_PROVIDER);
}

public NioSocketChannel(SelectorProvider provider) {
    this(newSocket(provider));
}

public NioSocketChannel(SocketChannel socket) {
    this(null, socket);
}

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

可以看到這里使用了JDK的SelectorProvider提供器打開(kāi)了一個(gè)新的SocketChannel通道,然后調(diào)用了父類(lèi)AbstractNioByteChannel的構(gòu)造函數(shù):

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

AbstractNioByteChannel又調(diào)用了它的父類(lèi)AbstractNioChannel構(gòu)造函數(shù):

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    ch.configureBlocking(false);
}

這里我們調(diào)用了父類(lèi)的構(gòu)造函數(shù)將parent(這里是null)傳遞給了AbstractChannel,然后設(shè)置了感興趣的事件為SelectionKey.OP_READ,并且設(shè)置該通道為非阻塞,這里沒(méi)有太多的東西,看下AbstractChannel的構(gòu)造函數(shù)實(shí)現(xiàn):

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

可以看到這里的東西還是有點(diǎn)兒多的,首先將子類(lèi)傳遞過(guò)來(lái)的Channel parent設(shè)置到this.parent,這里是個(gè)null,接下來(lái)再實(shí)例化id,這里的id實(shí)現(xiàn)為DefaultChannelId,接下來(lái)實(shí)例化了一個(gè)Unsafe的實(shí)現(xiàn),這個(gè)Unsafe是Netty的自定義實(shí)現(xiàn),看下其接口聲明:

interface Unsafe {
    //返回指定的RecvByteBufAllocator.Handle,它將用于在接收數(shù)據(jù)時(shí)分配ByteBuf。
    RecvByteBufAllocator.Handle recvBufAllocHandle();
    //返回本地綁定的SocketAddress
    SocketAddress localAddress();
    //返回遠(yuǎn)程綁定的SocketAddress
    SocketAddress remoteAddress();
    // 注冊(cè)ChannelPromise到Channel,并在注冊(cè)完成后通知ChannelFuture
    void register(EventLoop eventLoop, ChannelPromise promise);
    //將SocketAddress綁定到ChannelPromise的Channel上,當(dāng)完成后將通知ChannelPromise
    void bind(SocketAddress localAddress, ChannelPromise promise);
    // 將給定ChannelFuture的Channel與給定的遠(yuǎn)程SocketAddress連接,如果要使用本地localAddress只需要給定參數(shù),其它情況只需要傳遞null即可,連接操作完成后,ChannelPromise將收到通知
    void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
    //斷開(kāi)ChannelFuture的Channel連接并且當(dāng)操作完成后通知ChannelPromise
    void disconnect(ChannelPromise promise);
    //關(guān)閉ChannelFuture的Channel連接并且當(dāng)操作完成后通知ChannelPromise
    void close(ChannelPromise promise);
    //立即關(guān)閉Channel而不觸發(fā)任何事件,可能只有在注冊(cè)嘗試失敗時(shí)才有用。
    void closeForcibly();
    //從EventLoop注銷(xiāo)ChannelPromise的Channel,并在操作完成后通知ChannelPromise
    void deregister(ChannelPromise promise);
   //調(diào)度一個(gè)讀取操作,該操作將填充ChannelPipeline中第一個(gè)ChannelInboundHandler的入站緩沖區(qū)。如果已經(jīng)存在掛起的讀取操作,則此方法不執(zhí)行任何操作。
    void beginRead();
    // 調(diào)度一個(gè)寫(xiě)操作
    void write(Object msg, ChannelPromise promise);
    // 調(diào)度write(Object, ChannelPromise)方法,刷新所有的寫(xiě)操作
    void flush();
    //返回一個(gè)特殊的ChannelPromise,它可以被重用并傳遞給{@link Unsafe}中的操作。它永遠(yuǎn)不會(huì)收到成功或錯(cuò)誤的通知,因此只是操作的占位符以ChannelPromise作為參數(shù),但不希望得到通知。
    ChannelPromise voidPromise();
    //返回存儲(chǔ)掛起寫(xiě)入請(qǐng)求的Channel的ChannelOutboundBuffer
    ChannelOutboundBuffer outboundBuffer();
}

這么看來(lái)所有的網(wǎng)絡(luò)底層操作都封裝到了Netty實(shí)現(xiàn)的Unsafe接口中,這里我們看些newUnsafe代碼的實(shí)現(xiàn),在該父類(lèi)中定義的是一個(gè)抽象方法,具體的實(shí)現(xiàn)在NioSocketChannel子類(lèi)中:

protected AbstractNioUnsafe newUnsafe() {
    return new NioSocketChannelUnsafe();
}

可以看到Unsafe的具體實(shí)現(xiàn)是由NioSocketChannelUnsafe()類(lèi)來(lái)實(shí)現(xiàn)的,其類(lèi)層級(jí)圖如下所示:

NioSocketChannelUnsafe

這里就不展開(kāi)講了。

繼續(xù)往下走,接下來(lái)就到了pipeline = newChannelPipeline()了,這也對(duì)應(yīng)了每個(gè)Channel都有一個(gè)對(duì)應(yīng)的ChannelPipeline,這也是Netty實(shí)現(xiàn)無(wú)鎖化的關(guān)鍵,后面的事件都是在該P(yáng)ipeline中流轉(zhuǎn)的,看下其實(shí)現(xiàn)代碼吧:

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

這里的實(shí)現(xiàn)比較簡(jiǎn)單,直接實(shí)例化了一個(gè)DefaultChannelPipeline,并且將當(dāng)前對(duì)象以構(gòu)造參數(shù)的形式傳遞過(guò)去了,繼續(xù)深入進(jìn)去看看:

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

這里可以看到這里有兩個(gè)節(jié)點(diǎn)分別是tail、head,并且它們進(jìn)行了關(guān)聯(lián),組成了一個(gè)雙向鏈表的形式,接下來(lái)我們看下TailContext的類(lèi)結(jié)構(gòu)圖:

TailContext

接下來(lái)看下構(gòu)造函數(shù)代碼實(shí)現(xiàn):

TailContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, TAIL_NAME, TailContext.class);
    setAddComplete();
}

可以看到調(diào)用了父類(lèi)AbstractChannelHandlerContext父類(lèi)的構(gòu)造函數(shù):

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
        this.name = ObjectUtil.checkNotNull(name, "name");
        this.pipeline = pipeline;
        this.executor = executor;
        this.executionMask = mask(handlerClass);
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

接下來(lái),我們繼續(xù)跟進(jìn)HeadContext的實(shí)現(xiàn),首先看下HeadContext的類(lèi)結(jié)構(gòu)圖:

HeadContext

代碼實(shí)現(xiàn)和TailContext一致:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, HeadContext.class);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}

和TailContext一樣,HeadContext的父類(lèi)也是AbstractChannelHandlerContext抽象類(lèi),到此為止,我們就完成了整個(gè)Channel的實(shí)例化工作了。我們繼續(xù)回到AbstractBootstrap.initAndRegister()方法,下一步操作并是執(zhí)行init(channel)方法:

void init(Channel channel) {
    ChannelPipeline p = channel.pipeline();
    p.addLast(config.handler());

    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
}

這里的ChannelPipeline并是我們前面實(shí)例化的DefaultChannelPipeline,然后將我們EchoClient中設(shè)置的Handler添加到ChannelPipeline中,并設(shè)置ChannelOption以及屬性等操作。

繼續(xù)回到AbstractBootstrap.initAndRegister()方法,我們可以看到下面的代碼塊:

ChannelFuture regFuture = config().group().register(channel);

這里通過(guò)config()方法獲取到BootstrapConfig實(shí)例,然后根據(jù)該實(shí)例的group()方法,里面的實(shí)現(xiàn)是直接bootstrap的group屬性,所以這里的group就是我們程序里面設(shè)置的NioEventLoopGroup對(duì)象,我們定位到NioEventLoopGroup對(duì)象的register(channel)方法,該方法由父類(lèi)MultithreadEventLoopGroup實(shí)現(xiàn):

public ChannelFuture register(Channel channel) {
    return next().register(channel);
}

可以看到通過(guò)next()方法從線程組里面選取一個(gè)NioEventLoop線程來(lái)執(zhí)行操作的,我們來(lái)跟進(jìn)next()方法的實(shí)現(xiàn),該方法由父類(lèi)MultithreadEventExecutorGroup來(lái)實(shí)現(xiàn):

public EventExecutor next() {
    return chooser.next();
}

可以看到的是,這里是由我們之前設(shè)置的EventExecutorChooserFactory.EventExecutorChooser來(lái)實(shí)現(xiàn)的,如果傳遞線程的數(shù)量是2的冪則使用的是PowerOfTwoEventExecutorChooser,否則使用的是GenericEventExecutorChooser,這里不過(guò)是使用獲取線程的方式不同,下面來(lái)分別看下兩者的實(shí)現(xiàn):

PowerOfTwoEventExecutorChooser的實(shí)現(xiàn):

private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    PowerOfTwoEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[idx.getAndIncrement() & executors.length - 1];
    }
}

可以看到PowerOfTwoEventExecutorChooser的實(shí)現(xiàn)是通過(guò)一個(gè)原子整型的自增器不斷增加然后與線程池里面的數(shù)量-1與之進(jìn)行邏輯與獲取該線程池里面的一個(gè)線程,其效率是非常高效的。

接下來(lái)看下GenericEventExecutorChooser的實(shí)現(xiàn):

private static final class GenericEventExecutorChooser implements EventExecutorChooser {
    private final AtomicInteger idx = new AtomicInteger();
    private final EventExecutor[] executors;

    GenericEventExecutorChooser(EventExecutor[] executors) {
        this.executors = executors;
    }

    @Override
    public EventExecutor next() {
        return executors[Math.abs(idx.getAndIncrement() % executors.length)];
    }
}

與PowerOfTwoEventExecutorChooser不同的是,GenericEventExecutorChooser的實(shí)現(xiàn)方式則是通過(guò)原子整型的自增器不斷增加然后與線程池里面的數(shù)量進(jìn)行模除操作來(lái)定義一個(gè)線程,其效率是沒(méi)有按位與高效。

回到之前的代碼,next()方法與設(shè)置的線程數(shù)是有很大關(guān)系的,推薦做法是使用2的冪來(lái)設(shè)置線程的數(shù)量,這樣使用按位與能獲得更好的性能。next()方法返回的是NioEventLoop線程對(duì)象,我們定位到該方法是由其父類(lèi)SingleThreadEventLoop來(lái)實(shí)現(xiàn)的:

public ChannelFuture register(Channel channel) {
    return register(new DefaultChannelPromise(channel, this));
}

public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    promise.channel().unsafe().register(this, promise);
    return promise;
}

可以看到具體的實(shí)現(xiàn)是由channel的Unsafe類(lèi)的實(shí)現(xiàn)類(lèi)NioSocketChannelUnsafe來(lái)實(shí)現(xiàn)的,不過(guò)其register()方法是由其父類(lèi)AbstractUnsafe來(lái)實(shí)現(xiàn)的:

public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
    } else {
        eventLoop.execute(new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        });
    }
}

繼續(xù)跟進(jìn)register0:

private void register0(ChannelPromise promise) {
    try {
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();
        neverRegistered = false;
        registered = true;

        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();

        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                beginRead();
            }
        }
    } catch (Throwable t) {
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}

這里我們跟進(jìn)doRegister()方法,這里的實(shí)現(xiàn)是由AbstractNioChannel來(lái)完成的,源碼如下:

protected void doRegister() throws Exception {
    boolean selected = false;
    for (;;) {
        try {
            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
            return;
        } catch (CancelledKeyException e) {
            if (!selected) {
                // Force the Selector to select now as the "canceled" SelectionKey may still be
                // cached and not removed because no Select.select(..) operation was called yet.
                eventLoop().selectNow();
                selected = true;
            } else {
                // We forced a select operation on the selector before but the SelectionKey is still cached
                // for whatever reason. JDK bug ?
                throw e;
            }
        }
    }
}

該方法用于將SocketChannel注冊(cè)到到Selector中,并且監(jiān)聽(tīng)0事件(表示不監(jiān)聽(tīng)任何事件),attachment為當(dāng)前channel。

在一系列操作完成之后,最終調(diào)用的是Bootstrap#doConnect:

private static void doConnect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise connectPromise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    final Channel channel = connectPromise.channel();
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (localAddress == null) {
                channel.connect(remoteAddress, connectPromise);
            } else {
                channel.connect(remoteAddress, localAddress, connectPromise);
            }
            connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
        }
    });
}

這里會(huì)通過(guò)調(diào)用對(duì)應(yīng)的NioEventLoop線程來(lái)發(fā)起NioSocketChannel#connect方法的調(diào)用,而這個(gè)方法對(duì)應(yīng)調(diào)用的是DefaultChannelPipeline#connect方法,如下:

public final ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return tail.connect(remoteAddress, promise);
}

在此之前我們分析過(guò)DefaultChannelPipeline對(duì)象中有一個(gè)雙向關(guān)聯(lián)的鏈表,這里我們可以看到調(diào)用的是尾部鏈表,繼續(xù)跟進(jìn)去,其對(duì)應(yīng)位置在AbstractChannelHandlerContext#connect:

public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) {
    return connect(remoteAddress, null, promise);
}

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeConnect(remoteAddress, localAddress, promise);
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeConnect(remoteAddress, localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}

這里我們看到主要是通過(guò)findContextOutbound方法傳遞一個(gè)MASK_CONNECT參數(shù)獲取一個(gè)對(duì)應(yīng)的AbstractChannelHandlerContext對(duì)象來(lái)執(zhí)行下面的操作邏輯,所以我們跟進(jìn)該方法看下其實(shí)現(xiàn):

private AbstractChannelHandlerContext findContextOutbound(int mask) {
    AbstractChannelHandlerContext ctx = this;
    EventExecutor currentExecutor = executor();
    do {
        ctx = ctx.prev;
    } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_OUTBOUND));
    return ctx;
}

因?yàn)樵贒efaultChannelPipeline中使用的是雙向鏈表保存著各個(gè)ChannelHandler,所以這里采用的是do-while循環(huán)的方式查找對(duì)應(yīng)的AbstractChannelContext,而do-while結(jié)束的條件是通過(guò)skipContext方法計(jì)算得出,來(lái)看看里面的實(shí)現(xiàn):

private static boolean skipContext(AbstractChannelHandlerContext ctx, EventExecutor currentExecutor, int mask, int onlyMask) {
    return (ctx.executionMask & (onlyMask | mask)) == 0 ||
            (ctx.executor() == currentExecutor && (ctx.executionMask & mask) == 0);
}

這里采用的是位運(yùn)算操作,可以大概的理解為利用位運(yùn)算符計(jì)算得出合適的HandlerContext即可,感興趣的小伙伴可以自行參考ChannelHandlerMask#mask方法,也就是我們沒(méi)實(shí)例化一個(gè)Handler對(duì)象,在該對(duì)象的屬性中都會(huì)存放一個(gè)executionMask,而在我們的DefaultChannelPipeline中的TailContext和HeadContext中都有著對(duì)應(yīng)關(guān)于mask的特殊處理,也就是處理所有的事件,下面是TailContext的mask源碼:

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

        TailContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, TAIL_NAME, TailContext.class);
            setAddComplete();
        }
}

這里調(diào)用了父類(lèi)的構(gòu)造方法,也就是AbstractChannelHandlerContext的構(gòu)造方法:

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name, Class<? extends ChannelHandler> handlerClass) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.executionMask = mask(handlerClass);
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

ChannelHandlerMask對(duì)應(yīng)的常量字段:

// Using to mask which methods must be called for a ChannelHandler.
static final int MASK_EXCEPTION_CAUGHT = 1;
static final int MASK_CHANNEL_REGISTERED = 1 << 1;
static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
static final int MASK_CHANNEL_ACTIVE = 1 << 3;
static final int MASK_CHANNEL_INACTIVE = 1 << 4;
static final int MASK_CHANNEL_READ = 1 << 5;
static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
static final int MASK_BIND = 1 << 9;
static final int MASK_CONNECT = 1 << 10;
static final int MASK_DISCONNECT = 1 << 11;
static final int MASK_CLOSE = 1 << 12;
static final int MASK_DEREGISTER = 1 << 13;
static final int MASK_READ = 1 << 14;
static final int MASK_WRITE = 1 << 15;
static final int MASK_FLUSH = 1 << 16;

static final int MASK_ONLY_INBOUND =  MASK_CHANNEL_REGISTERED |
        MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
        MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;
private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_INBOUND;
static final int MASK_ONLY_OUTBOUND =  MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
        MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;
private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_ONLY_OUTBOUND;

這里調(diào)用了ChannelHandlerMask#mask方法,如下:

static int mask(Class<? extends ChannelHandler> clazz) {
    // MASKS為一個(gè)Netty自定義的ThreadLocal實(shí)現(xiàn)
    Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
    Integer mask = cache.get(clazz);
    if (mask == null) {
        mask = mask0(clazz);
        cache.put(clazz, mask);
    }
    return mask;
}

這里主要的邏輯是mask0方法,如下:

private static int mask0(Class<? extends ChannelHandler> handlerType) {
    int mask = MASK_EXCEPTION_CAUGHT;
    try {
        if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_INBOUND;

            if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_REGISTERED;
            }
            if (isSkippable(handlerType, "channelUnregistered", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_UNREGISTERED;
            }
            if (isSkippable(handlerType, "channelActive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_ACTIVE;
            }
            if (isSkippable(handlerType, "channelInactive", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_INACTIVE;
            }
            if (isSkippable(handlerType, "channelRead", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_CHANNEL_READ;
            }
            if (isSkippable(handlerType, "channelReadComplete", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_READ_COMPLETE;
            }
            if (isSkippable(handlerType, "channelWritabilityChanged", ChannelHandlerContext.class)) {
                mask &= ~MASK_CHANNEL_WRITABILITY_CHANGED;
            }
            if (isSkippable(handlerType, "userEventTriggered", ChannelHandlerContext.class, Object.class)) {
                mask &= ~MASK_USER_EVENT_TRIGGERED;
            }
        }

        if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
            mask |= MASK_ALL_OUTBOUND;

            if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_BIND;
            }
            if (isSkippable(handlerType, "connect", ChannelHandlerContext.class, SocketAddress.class,
                    SocketAddress.class, ChannelPromise.class)) {
                mask &= ~MASK_CONNECT;
            }
            if (isSkippable(handlerType, "disconnect", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DISCONNECT;
            }
            if (isSkippable(handlerType, "close", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_CLOSE;
            }
            if (isSkippable(handlerType, "deregister", ChannelHandlerContext.class, ChannelPromise.class)) {
                mask &= ~MASK_DEREGISTER;
            }
            if (isSkippable(handlerType, "read", ChannelHandlerContext.class)) {
                mask &= ~MASK_READ;
            }
            if (isSkippable(handlerType, "write", ChannelHandlerContext.class,
                    Object.class, ChannelPromise.class)) {
                mask &= ~MASK_WRITE;
            }
            if (isSkippable(handlerType, "flush", ChannelHandlerContext.class)) {
                mask &= ~MASK_FLUSH;
            }
        }

        if (isSkippable(handlerType, "exceptionCaught", ChannelHandlerContext.class, Throwable.class)) {
            mask &= ~MASK_EXCEPTION_CAUGHT;
        }
    } catch (Exception e) {
        // Should never reach here.
        PlatformDependent.throwException(e);
    }

    return mask;
}

該方法用于根據(jù)傳遞的Handler類(lèi)型計(jì)算得出對(duì)應(yīng)的mask,TailContext因?yàn)閷?shí)現(xiàn)了ChannelInboundHandler接口,所以走的是下面的邏輯:

int mask = MASK_EXCEPTION_CAUGHT;
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
    mask |= MASK_ALL_INBOUND;
}

最終TailContext的executionMask也就為511了。

接下來(lái),我們看下HeadContext的executionMask是如何計(jì)算得出的,看下HeadContext的構(gòu)造方法:

final class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler {

    private final Unsafe unsafe;

    HeadContext(DefaultChannelPipeline pipeline) {
        super(pipeline, null, HEAD_NAME, HeadContext.class);
        unsafe = pipeline.channel().unsafe();
        setAddComplete();
    }
}

可以看到的是,HeadContext和TailContext的不同之處是該對(duì)象既實(shí)現(xiàn)了ChannelOutboundHandler接口也實(shí)現(xiàn)了ChannelInboundHandler接口,正所謂能力越大責(zé)任越大啊,不過(guò)它們相同之處便是都繼承了AbstractChannelHandlerContext類(lèi),因?yàn)檫@里處理的方式相同,所以省略看下上文即可,不同的地方是,因?yàn)镠eadContext實(shí)現(xiàn)了2個(gè)接口,所以對(duì)應(yīng)的ChannelHandlerMask#mask0對(duì)應(yīng)的處理有所不同,如下所示:

int mask = MASK_EXCEPTION_CAUGHT;
if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
    mask |= MASK_ALL_INBOUND;
}
if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
    mask |= MASK_ALL_OUTBOUND;
}

所以HeadContext對(duì)應(yīng)處理了所有關(guān)于inout的事件,所以HeadContext的executionMask最終得到了131071。

所以回到我們之前的findContextOutbound方法,最終遍歷鏈表找到的便是HeadContext對(duì)象了。找到了該對(duì)象之后回到AbstractChannelHandlerContext#connect方法,如下:

public ChannelFuture connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
        //...
        //這里返回的是HeadContext
        final AbstractChannelHandlerContext next = findContextOutbound(MASK_CONNECT);
        EventExecutor executor = next.executor();
        //如果是在本線程之內(nèi)直接發(fā)起調(diào)用
        if (executor.inEventLoop()) {
            next.invokeConnect(remoteAddress, localAddress, promise);
        } else {
            //否則使用異步任務(wù)發(fā)起處理
            safeExecute(executor, new Runnable() {
                @Override
                public void run() {
                    next.invokeConnect(remoteAddress, localAddress, promise);
                }
            }, promise, null, false);
        }
        return promise;
    }

這里通過(guò)找到HeadContext調(diào)用了invokeConnect方法,如下:

private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    //當(dāng)前ChannelHandler的handlerAdded方法是否已經(jīng)被調(diào)用或者不保證順序并且ChannelHandler的handlerAdded方法即將被調(diào)用
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        connect(remoteAddress, localAddress, promise);
    }
}

繼續(xù)根據(jù)connect方法,HeadContext#connect:

public void connect(
        ChannelHandlerContext ctx,
        SocketAddress remoteAddress, SocketAddress localAddress,
        ChannelPromise promise) {
    unsafe.connect(remoteAddress, localAddress, promise);
}

這里的unsafe對(duì)象對(duì)應(yīng)了NioSocketChannelUnsafe對(duì)象,這里的connect調(diào)用了AbstractNioChannel#connect方法:

public final void connect( final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    try {
        if (connectPromise != null) {
            // Already a connect in process.
            throw new ConnectionPendingException();
        }

        boolean wasActive = isActive();
        //doConnect方法會(huì)返回false
        if (doConnect(remoteAddress, localAddress)) {
            fulfillConnectPromise(promise, wasActive);
        } else {
            //保存起來(lái)用于檢測(cè)是否發(fā)起重復(fù)的連接操作
            connectPromise = promise;
            requestedRemoteAddress = remoteAddress;

            // Schedule connect timeout.
            int connectTimeoutMillis = config().getConnectTimeoutMillis();
            if (connectTimeoutMillis > 0) {
                //處理連接超時(shí)的情況
                connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                    @Override
                    public void run() {
                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                        ConnectTimeoutException cause =
                                new ConnectTimeoutException("connection timed out: " + remoteAddress);
                        if (connectPromise != null && connectPromise.tryFailure(cause)) {
                            close(voidPromise());
                        }
                    }
                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
            }

            promise.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isCancelled()) {
                        if (connectTimeoutFuture != null) {
                            connectTimeoutFuture.cancel(false);
                        }
                        connectPromise = null;
                        close(voidPromise());
                    }
                }
            });
        }
    } catch (Throwable t) {
        promise.tryFailure(annotateConnectException(t, remoteAddress));
        closeIfClosed();
    }
}

這里跟進(jìn)doConnect方法,位置在NioSocketChannel#doConnect:

protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
    if (localAddress != null) {
        doBind0(localAddress);
    }

    boolean success = false;
    try {
        //利用Netty的工具類(lèi)連接到遠(yuǎn)程主機(jī),這里返回的是false
        boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
        if (!connected) {
            //注冊(cè)連接事件
            selectionKey().interestOps(SelectionKey.OP_CONNECT);
        }
        success = true;
        return connected;
    } finally {
        if (!success) {
            doClose();
        }
    }
}

到此為止,關(guān)于Netty的Bootstrap的整個(gè)流程就分析完成了。

整個(gè)流程圖如下所示。

Bootstrap時(shí)序圖.jpg
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容