netty Bootstrap

1.1 Bootstrap的作用

Bootstrap的作用可以參考AbstractBootstrap的javadoc:

    AbstractBootstrap is a helper class that makes it easy to bootstrap a Channel.

Bootstrap存在的意義就是為了方便的"引導(dǎo)"Channel.

在netty中, 存在兩種類型的Channel, 因此也對(duì)應(yīng)有兩種Bootstrap

channel類型 用于引導(dǎo)的bootstrap實(shí)現(xiàn)類
ServerChannel ServerBootstrap
Channel Bootstrap

1.2 Bootstrap的繼承結(jié)構(gòu)

在netty的代碼中, 類ServerBootstrap和類Bootstrap都繼承自基類AbstractBootstrap:


圖片.png

2.類AbstractBootstrap

2.1 類定義

AbstractBootstrap是Bootstrap的基類, 類定義如下:

package io.netty.bootstrap;

public abstract class AbstractBootstrap
<B extends AbstractBootstrap<B, C>, C extends Channel> 
implements Cloneable {}

類定義中的泛型B要求是AbstractBootstrap的子類, 而泛型C要求是Channel的子類.
注意這里的泛型的用法,非常的巧妙。

2.2 成員變量

group屬性

volatile EventLoopGroup group;

public B group(EventLoopGroup group) {
    if (group == null) {
        throw new NullPointerException("group");
    }
    if (this.group != null) {
        throw new IllegalStateException("group set already");
    }
    this.group = group;
    return (B) this;
}

public EventLooattrspGroup group() {
    return group;
}

注意this.group只能設(shè)置一次, 這意味著group(group)方法只能被調(diào)用一次.

localAddress屬性

localAddress用于綁定本地終端, 有多個(gè)設(shè)值的方法:

private volatile SocketAddress localAddress;

public B localAddress(SocketAddress localAddress) {
    this.localAddress = localAddress;
    return (B) this;
}

public B localAddress(int inetPort) {
    return localAddress(new InetSocketAddress(inetPort));
}

public B localAddress(String inetHost, int inetPort) {
    return localAddress(new InetSocketAddress(inetHost, inetPort));
}

public B localAddress(InetAddress inetHost, int inetPort) {
    return localAddress(new InetSocketAddress(inetHost, inetPort));
}

final SocketAddress localAddress() {
    return localAddress;
}

這些重載的localAddress(), 最終都指向了InetSocketAddress的幾個(gè)構(gòu)造函數(shù).

options屬性

options屬性是一個(gè)LinkedHashMap, option()方法用于設(shè)置單個(gè)的key/value, 如果value為null則刪除該key.

private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();

public <T> B option(ChannelOption<T> option, T value) {
    if (option == null) {
        throw new NullPointerException("option");
    }
    if (value == null) {
        synchronized (options) {
            options.remove(option);
        }
    } else {
        synchronized (options) {
            options.put(option, value);
        }
    }
    return (B) this;
}

final Map<ChannelOption<?>, Object> options() {
    return options;
}

attrs屬性

attrs和options屬性類似.

private final Map<AttributeKey<?>, Object> attrs = new LinkedHashMap<AttributeKey<?>, Object>();

public <T> B attr(AttributeKey<T> key, T value) {
    if (key == null) {
        throw new NullPointerException("key");
    }
    if (value == null) {
        synchronized (attrs) {
            attrs.remove(key);
        }
    } else {
        synchronized (attrs) {
            attrs.put(key, value);
        }
    }
    return (B) this;
}

final Map<AttributeKey<?>, Object> attrs() {
    return attrs;
}

handler屬性

private volatile ChannelHandler handler;

public B handler(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
    return (B) this;
}

final ChannelHandler handler() {
    return handler;
}

channelFactory屬性

新舊兩個(gè)ChannelFactory

channelFactory這個(gè)屬性有點(diǎn)麻煩, 根源在于ChannelFactory這個(gè)類,netty中有新舊兩個(gè)ChannelFactory,具體介紹見 Channel Factory

混合使用

但是現(xiàn)在的情況是內(nèi)部已經(jīng)轉(zhuǎn)為使用新類, 對(duì)外的接口還是繼續(xù)保持使用原來的舊類, 因此代碼有些混亂:

// 這里的channelFactory的類型定義用的是舊類,因此需要加SuppressWarnings
@SuppressWarnings("deprecation")
private volatile ChannelFactory< ? extends C> channelFactory;

// 返回的channelFactory也是用的舊類, 沒的說, 繼續(xù)SuppressWarnings
@SuppressWarnings("deprecation")
final ChannelFactory< ? extends C> channelFactory() {
    return channelFactory;
}

// 這個(gè)方法的參數(shù)是舊的"io.netty.bootstrap.ChannelFactory",已經(jīng)被標(biāo)志為"@Deprecated",盡量用下面的方法
@Deprecated
public B channelFactory(ChannelFactory< ? extends C> channelFactory) {
    if (channelFactory == null) {
        throw new NullPointerException("channelFactory");
    }
    if (this.channelFactory != null) {
        throw new IllegalStateException("channelFactory set already");
    }

    this.channelFactory = channelFactory;
    return (B) this;
}

// 這個(gè)方法是現(xiàn)在推薦使用的設(shè)置channelFactory的方法, 使用新類"io.netty.channel.ChannelFactory"
@SuppressWarnings({ "unchecked", "deprecation" })
public B channelFactory(io.netty.channel.ChannelFactory< ? extends C> channelFactory) {
    // 但是底層的實(shí)現(xiàn)還是調(diào)用回上面被廢棄的channelFactory()方法
    // 因?yàn)樾骂愂抢^承自舊類的,所有只要簡單轉(zhuǎn)一下類型就好
    return channelFactory((ChannelFactory<C>) channelFactory);
}

此外還有一個(gè)channel()方法可以非常方便的設(shè)置channelFactory:

public B channel(Class< ? extends C> channelClass) {
    if (channelClass == null) {
        throw new NullPointerException("channelClass");
    }
    return channelFactory(new ReflectiveChannelFactory<C>(channelClass));
}

2.3 類方法

validate()

validate()用于檢驗(yàn)所有的參數(shù), 實(shí)際代碼中檢查的是group和channelFactory兩個(gè)參數(shù), 這兩個(gè)參數(shù)必須設(shè)置不能為空:

public B validate() {
    if (group == null) {
        throw new IllegalStateException("group not set");
    }
    if (channelFactory == null) {
        throw new IllegalStateException("channel or channelFactory not set");
    }
    return (B) this;
}

register()

register()方法創(chuàng)建一個(gè)新的Channel并將它注冊(cè)到EventLoop, 在執(zhí)行前會(huì)調(diào)用validate()方法做前置檢查:

public ChannelFuture register() {
    validate();
    return initAndRegister();
}

initAndRegister()是關(guān)鍵代碼, 細(xì)細(xì)讀一下:

final ChannelFuture initAndRegister() {
    // 創(chuàng)建一個(gè)新的Channel
    final Channel channel = channelFactory().newChannel();
    try {
        // 調(diào)用抽象方法, 子類來做初始化
        init(channel);
    } catch (Throwable t) {
        // 如果出錯(cuò), 強(qiáng)行關(guān)閉這個(gè)channel
        channel.unsafe().closeForcibly();
        // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor
        return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t);
    }

    // 創(chuàng)建成功則將這個(gè)channel注冊(cè)到eventloop中
    ChannelFuture regFuture = group().register(channel);
    // 如果注冊(cè)出錯(cuò)
    if (regFuture.cause() != null) {
        // 判斷是否已經(jīng)注冊(cè)
        if (channel.isRegistered()) {
            // channel已經(jīng)注冊(cè)的就關(guān)閉
            channel.close();
        } else {
            // 還沒有注冊(cè)的就強(qiáng)行關(guān)閉
            channel.unsafe().closeForcibly();
        }
    }

    // 如果代碼走到這里而且promise沒有失敗, 那么是下面兩種情況之一:
    // 1) 如果我們嘗試了從event loop中注冊(cè), 那么現(xiàn)在注冊(cè)已經(jīng)完成
    //    現(xiàn)在可以安全的嘗試 bind()或者connect(), 因?yàn)閏hannel已經(jīng)注冊(cè)成功
    // 2) 如果我們嘗試了從另外一個(gè)線程中注冊(cè), 注冊(cè)請(qǐng)求已經(jīng)成功添加到event loop的任務(wù)隊(duì)列中等待后續(xù)執(zhí)行
    //    現(xiàn)在可以安全的嘗試 bind()或者connect():
    //         因?yàn)?bind() 或 connect() 會(huì)在安排的注冊(cè)任務(wù)之后執(zhí)行
    //         而register(), bind(), 和 connect() 都被確認(rèn)是同一個(gè)線程

    return regFuture;
}

中途調(diào)用的init()方法定義如下, 后面看具體子類代碼時(shí)再展開.

abstract void init(Channel channel) throws Exception;

bind()

bind()方法有多個(gè)重載, 差異只是bind操作所需的InetSocketAddress參數(shù)從何而來而已:

??從屬性this.localAddress來

??這個(gè)時(shí)候bind()方法無需參數(shù), 直接使用屬性this.localAddress, 當(dāng)前調(diào)用之前this.localAddress必須有賦值(通過函數(shù)localAddress()):

 public ChannelFuture bind() {
     validate();
     SocketAddress localAddress = this.localAddress;
     if (localAddress == null) {
         throw new IllegalStateException("localAddress not set");
     }
     return doBind(localAddress);
 }

從bind()方法的輸入?yún)?shù)中來

在輸入?yún)?shù)中來直接指定localAddress:

 public ChannelFuture bind(SocketAddress localAddress) {
     validate();
     if (localAddress == null) {
         throw new NullPointerException("localAddress");
     }
     return doBind(localAddress);
 }

另外為了方便, 重載了下面三個(gè)方法, 用不同的方式來創(chuàng)建InetSocketAddress而已:

public ChannelFuture bind(int inetPort) {
     return bind(new InetSocketAddress(inetPort));
 }

 public ChannelFuture bind(String inetHost, int inetPort) {
     return bind(new InetSocketAddress(inetHost, inetPort));
 }

 public ChannelFuture bind(InetAddress inetHost, int inetPort) {
     return bind(new InetSocketAddress(inetHost, inetPort));
 }

注: 使用帶參數(shù)的bind()方法, 忽略了localAddress()設(shè)定的參數(shù). 而且也沒有設(shè)置localAddress屬性. 這里的差異, 后面留意.

繼續(xù)看doBind()方法的細(xì)節(jié), 這個(gè)依然是這個(gè)類的核心內(nèi)容:

private ChannelFuture doBind(final SocketAddress localAddress) {
    // 調(diào)用initAndRegister()方法, 先初始化channel,并注冊(cè)到event loop
    final ChannelFuture regFuture = initAndRegister();
    final Channel channel = regFuture.channel();
    // 檢查注冊(cè)的channel是否出錯(cuò)
    if (regFuture.cause() != null) {
        return regFuture;
    }

    // 檢查注冊(cè)操作是否完成
    if (regFuture.isDone()) {
        // 如果完成
        // 在這個(gè)點(diǎn)上我們知道注冊(cè)已經(jīng)完成并且成功
        // 繼續(xù)bind操作, 創(chuàng)建一個(gè)ChannelPromise
        ChannelPromise promise = channel.newPromise();
        // 調(diào)用doBind0()方法來繼續(xù)真正的bind操作
        doBind0(regFuture, channel, localAddress, promise);
        return promise;
    } else {
        // 通常這個(gè)時(shí)候注冊(cè)的future應(yīng)該都已經(jīng)完成,但是萬一沒有, 我們也需要處理
        // 為這個(gè)channel創(chuàng)建一個(gè)PendingRegistrationPromise
        final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
        // 然后給注冊(cè)的future添加一個(gè)listener, 在operationComplete()回調(diào)時(shí)
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                Throwable cause = future.cause();
                // 檢查是否出錯(cuò)
                if (cause != null) {
                    // 在event loop上注冊(cè)失敗, 因此直接讓ChannelPromise失敗, 避免一旦我們?cè)噲D訪問這個(gè)channel的eventloop導(dǎo)致IllegalStateException
                    promise.setFailure(cause);
                } else {
                    // 注冊(cè)已經(jīng)成功, 因此設(shè)置正確的executor以便使用
                    // 注: 這里已經(jīng)以前有過一個(gè)bug, 有issue記錄
                    // See https://github.com/netty/netty/issues/2586
                    promise.executor = channel.eventLoop();
                }
                // 調(diào)用doBind0()方法來繼續(xù)真正的bind操作
                doBind0(regFuture, channel, localAddress, promise);
            }
        });
        return promise;
    }
}

關(guān)鍵的doBind0()方法

private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {

    // 這個(gè)方法在channelRegistered()方法觸發(fā)前被調(diào)用.
    // 讓handler有機(jī)會(huì)在它的channelRegistered()實(shí)現(xiàn)中構(gòu)建pipeline
    // 給channel的event loop增加一個(gè)一次性任務(wù)
    channel.eventLoop().execute(new OneTimeTask() {
        @Override
        public void run() {
            // 檢查注冊(cè)是否成功
            if (regFuture.isSuccess()) {
                // 如果成功則綁定localAddress到channel
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                // 如果不成功則設(shè)置錯(cuò)誤到promise
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

3 類Bootstrap

類Bootstrap用于幫助客戶端引導(dǎo)Channel.

bind()方法用于無連接傳輸如datagram (UDP)。對(duì)于常規(guī)TCP鏈接,用connect()方法。

3.1 類定義

package io.netty.bootstrap;

public class Bootstrap extends AbstractBootstrap<Bootstrap, Channel> {}

3.2 類成員

resolver屬性

resolver默認(rèn)設(shè)置為DefaultAddressResolverGroup.INSTANCE, 可以通過resolver()方法來賦值:

private static final AddressResolverGroup< ? > DEFAULT_RESOLVER = DefaultAddressResolverGroup.INSTANCE;
private volatile AddressResolverGroup<SocketAddress> resolver = (AddressResolverGroup<SocketAddress>) DEFAULT_RESOLVER;

public Bootstrap resolver(AddressResolverGroup< ? > resolver) {
    if (resolver == null) {
        throw new NullPointerException("resolver");
    }
    this.resolver = (AddressResolverGroup<SocketAddress>) resolver;
    return this;
}

remoteAddress屬性

remoteAddress可以通過remoteAddress()方法賦值, 有多個(gè)重載方法:

private volatile SocketAddress remoteAddress;

public Bootstrap remoteAddress(SocketAddress remoteAddress) {
    this.remoteAddress = remoteAddress;
    return this;
}
public Bootstrap remoteAddress(String inetHost, int inetPort) {
    remoteAddress = InetSocketAddress.createUnresolved(inetHost, inetPort);
    return this;
}
public Bootstrap remoteAddress(InetAddress inetHost, int inetPort) {
    remoteAddress = new InetSocketAddress(inetHost, inetPort);
    return this;
}

3.3 類方法

validate()方法

重寫了validate()方法, 在調(diào)用AbstractBootstrap的validate()方法(檢查group和channelFactory)外, 增加了對(duì)handler的檢查:

@Override
public Bootstrap validate() {
    super.validate();
    if (handler() == null) {
        throw new IllegalStateException("handler not set");
    }
    return this;
}

connect()方法

有多個(gè)connect()方法重載, 功能都是一樣, 拿到輸入的remoteAddress然后調(diào)用doResolveAndConnect()方法:

private ChannelFuture doResolveAndConnect(SocketAddress remoteAddress, final SocketAddress localAddress) {
    // 先初始化channel并注冊(cè)到event loop
    final ChannelFuture regFuture = initAndRegister();
    if (regFuture.cause() != null) {
        // 如果注冊(cè)失敗則退出
        return regFuture;
    }

    final Channel channel = regFuture.channel();
    final EventLoop eventLoop = channel.eventLoop();
    final AddressResolver<SocketAddress> resolver = this.resolver.getResolver(eventLoop);

    if (!resolver.isSupported(remoteAddress) || resolver.isResolved(remoteAddress)) {
        // Resolver 不知道該怎么處理給定的遠(yuǎn)程地址, 或者已經(jīng)解析
        return doConnect(remoteAddress, localAddress, regFuture, channel.newPromise());
    }

    // 開始解析遠(yuǎn)程地址
    final Future<SocketAddress> resolveFuture = resolver.resolve(remoteAddress);
    final Throwable resolveFailureCause = resolveFuture.cause();

    if (resolveFailureCause != null) {
        // 如果地址解析失敗, 則立即失敗
        channel.close();
        return channel.newFailedFuture(resolveFailureCause);
    }

    if (resolveFuture.isDone()) {
        // 理解成功的解析了遠(yuǎn)程地址, 開始做連接
        return doConnect(resolveFuture.getNow(), localAddress, regFuture, channel.newPromise());
    }

    // 地址解析還沒有完成, 只能等待完成后在做connectio, 增加一個(gè)promise來操作
    final ChannelPromise connectPromise = channel.newPromise();
    resolveFuture.addListener(new FutureListener<SocketAddress>() {
        @Override
        public void operationComplete(Future<SocketAddress> future) throws Exception {
            if (future.cause() != null) {
                channel.close();
                connectPromise.setFailure(future.cause());
            } else {
                doConnect(future.getNow(), localAddress, regFuture, connectPromise);
            }
        }
    });

    return connectPromise;
}

doConnect()方法中才是真正的開始處理連接操作, 但是還是需要檢查注冊(cè)操作是否完成:

private static ChannelFuture doConnect(
        final SocketAddress remoteAddress, final SocketAddress localAddress,
        final ChannelFuture regFuture, final ChannelPromise connectPromise) {
    // 判斷一下前面的注冊(cè)操作是否已經(jīng)完成
    // 因?yàn)樽?cè)操作是異步操作, 前面只是返回一個(gè)feature, 代碼執(zhí)行到這里時(shí), 可能完成, 也可能還在進(jìn)行中
    if (regFuture.isDone()) {
        // 如果注冊(cè)已經(jīng)完成, 可以執(zhí)行連接了
        doConnect0(remoteAddress, localAddress, regFuture, connectPromise);
    } else {
        // 如果注冊(cè)還在進(jìn)行中, 增加一個(gè)ChannelFutureListener, 等操作完成之后再在回調(diào)方法中執(zhí)行連接操作
        regFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                doConnect0(remoteAddress, localAddress, regFuture, connectPromise);
            }
        });
    }

    return connectPromise;
}

異步操作就是這點(diǎn)比較麻煩, 總是需要一個(gè)一個(gè)future的做判斷/處理, 如果沒有完成還的加promise/future來依靠回調(diào)函數(shù)繼續(xù)工作處理流程.

終于到了最后的doConnect0()方法, 總算可以真的連接了:

private static void doConnect0(
        final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelFuture regFuture,
        final ChannelPromise connectPromise) {
    // 這個(gè)方法在channelRegistered()方法被觸發(fā)前調(diào)用.
    // 給我們的handler一個(gè)在它的channelRegistered()實(shí)現(xiàn)中構(gòu)建pipeline的機(jī)會(huì)
    final Channel channel = connectPromise.channel();
    // 取當(dāng)前channel的eventlopp, 執(zhí)行一個(gè)一次性任務(wù)
    channel.eventLoop().execute(new OneTimeTask() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                // 如果注冊(cè)成功
                if (localAddress == null) {
                    channel.connect(remoteAddress, connectPromise);
                } else {
                    channel.connect(remoteAddress, localAddress, connectPromise);
                }
                connectPromise.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                connectPromise.setFailure(regFuture.cause());
            }
        }
    });
}

init(channel)方法

前面看基類AbstractBootstrap時(shí)看到過, 這個(gè)init()方法是一個(gè)模板方法, 需要子類做具體實(shí)現(xiàn).

看看Bootstrap是怎么做channel初始化的:

@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
    // 取channel的ChannelPipeline
    ChannelPipeline p = channel.pipeline();
    // 增加當(dāng)前Bootstrap的handle到ChannelPipeline中
    p.addLast(handler());

    // 取當(dāng)前Bootstrap設(shè)置的options, 逐個(gè)設(shè)置到channel中
    final Map<ChannelOption< ? >, Object> options = options();
    synchronized (options) {
        for (Entry<ChannelOption< ? >, Object> e: options.entrySet()) {
            try {
                if (!channel.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + channel, t);
            }
        }
    }

    // 同樣取當(dāng)前Bootstrap的attrs, 逐個(gè)設(shè)置到channel中
    final Map<AttributeKey< ? >, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey< ? >, Object> e: attrs.entrySet()) {
            channel.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    }
}

總結(jié)上在init()方法中, Bootstrap只做了一個(gè)事情: 將Bootstrap中保存的信息(handle/options/attrs)設(shè)置到新創(chuàng)建的channel.

clone()

深度克隆當(dāng)前Bootstrap對(duì)象,有完全一樣的配置,但是使用給定的EventLoopGroup。

這個(gè)方法適合用相同配置創(chuàng)建多個(gè)Channel。

public Bootstrap clone(EventLoopGroup group) {
        Bootstrap bs = new Bootstrap(this);
        bs.group = group;
        return bs;
    }

4 類ServerBootstrap

類ServerBootstrap用于幫助服務(wù)器端引導(dǎo)ServerChannel.

ServerBootstrap除了處理ServerChannel外, 還需要處理從ServerChannel下創(chuàng)建的Channel.Netty中稱這兩個(gè)關(guān)系為parent和child.

4.1 類定義

public class ServerBootstrap extends AbstractBootstrap<ServerBootstrap, ServerChannel> {}

4.2 類屬性

childGroup屬性

childGroup屬性用于指定處理客戶端連接的EventLoopGroup, 設(shè)置的方式有兩種:

group(parentGroup, childGroup)方法用于單獨(dú)設(shè)置parentGroup, childGroup, 分別用于處理ServerChannel和Channel.
group(group)方法設(shè)置parentGroup, childGroup為使用同一個(gè)EventLoopGroup. 注意這個(gè)方法覆蓋了基類的方法.

private volatile EventLoopGroup childGroup;

@Override
public ServerBootstrap group(EventLoopGroup group) {
    return group(group, group);
}
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) {
    super.group(parentGroup);
    if (childGroup == null) {
        throw new NullPointerException("childGroup");
    }
    if (this.childGroup != null) {
        throw new IllegalStateException("childGroup set already");
    }
    this.childGroup = childGroup;
    return this;
}
public EventLoopGroup childGroup() {
    return childGroup;
}

4.3 childOptions/childAttrs/childHandler屬性

這三個(gè)屬性和parent的基本對(duì)應(yīng), 設(shè)值方法和檢驗(yàn)都是一模一樣的:

private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();

private final Map<AttributeKey<?>, Object> childAttrs = new LinkedHashMap<AttributeKey<?>, Object>();

private volatile ChannelHandler childHandler;

4.4 類方法

init()方法

ServerBootstrap的init(channel)方法相比Bootstrap的要復(fù)雜一些, 除了設(shè)置options/attrs/handler到channel外, 還需要為child設(shè)置childGroup, childHandler, childOptions, childAttrs:

@Override
void init(Channel channel) throws Exception {
    final Map<ChannelOption< ? >, Object> options = options();
    synchronized (options) {
        channel.config().setOptions(options);
    }

    final Map<AttributeKey< ? >, Object> attrs = attrs();
    synchronized (attrs) {
        for (Entry<AttributeKey< ? >, Object> e: attrs.entrySet()) {
            @SuppressWarnings("unchecked")
            AttributeKey<Object> key = (AttributeKey<Object>) e.getKey();
            channel.attr(key).set(e.getValue());
        }
    }

    ChannelPipeline p = channel.pipeline();

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption< ? >, Object>[] currentChildOptions;
    final Entry<AttributeKey< ? >, Object>[] currentChildAttrs;
    synchronized (childOptions) {
        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));
    }
    synchronized (childAttrs) {
        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));
    }

    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(Channel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }
            pipeline.addLast(new ServerBootstrapAcceptor(
                    currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}

ServerBootstrapAcceptor的實(shí)現(xiàn), 主要看channelRead()方法:

private static class ServerBootstrapAcceptor extends ChannelHandlerAdapter {
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        // 獲取child channel
        final Channel child = (Channel) msg;
        // 設(shè)置childHandler到child channel
        child.pipeline().addLast(childHandler);
        // 設(shè)置childOptions到child channel
        for (Entry<ChannelOption< ? >, Object> e: childOptions) {
            try {
                if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                    logger.warn("Unknown channel option: " + e);
                }
            } catch (Throwable t) {
                logger.warn("Failed to set a channel option: " + child, t);
            }
        }

        // 設(shè)置childAttrs到child channel
        for (Entry<AttributeKey< ? >, Object> e: childAttrs) {
            child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }

        // 將child channel注冊(cè)到childGroup
        try {
            childGroup.register(child).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (!future.isSuccess()) {
                        forceClose(child, future.cause());
                    }
                }
            });
        } catch (Throwable t) {
            forceClose(child, t);
        }
    }
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容