pipeline的來(lái)源
- 繼續(xù)跟蹤源碼EchoClient中得Bootstrap類(lèi)。
根據(jù)前面得分析總結(jié)如下過(guò)程:
Bootstrap中channel(NioSocketChannel.class)
-》NioSocketChannel的初始化時(shí)創(chuàng)建-》NioSocketChannel this(DEFAULT_SELECTOR_PROVIDER)
-》NioSocketChannel中this(newSocket(provider)) newSocket方法中的參數(shù)provider會(huì)根據(jù)操作系統(tǒng)選擇不同的SelectorProvider(SelectorProvider DEFAULT_SELECTOR_PROVIDER = SelectorProvider.provider();)。 創(chuàng)建socketchannel方法provider.openSocketChannel(),SelectorProvider的用法可參考nio中的
-》繼續(xù)調(diào)用構(gòu)造器this(null, socket);其中的socket為newSocket方法創(chuàng)建的
-》接下來(lái)調(diào)用父類(lèi)的構(gòu)造器和初始化cofnig
查看NioSocketChannel的構(gòu)造代碼:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
-》 轉(zhuǎn)到父類(lèi)的構(gòu)造器中
/**
* Create a new instance
*
* @param parent the parent {@link Channel} by which this instance was created. May be {@code null}
* @param ch the underlying {@link SelectableChannel} on which it operates
*/
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
super(parent, ch, SelectionKey.OP_READ);
}
--》 繼續(xù)調(diào)用父類(lèi)構(gòu)造器會(huì)發(fā)現(xiàn)又一次掉用父類(lèi)的構(gòu)造器 super(parent);
--》 下面找到我們費(fèi)盡心機(jī)的pipeline的創(chuàng)建如下:
/**
* Creates a new instance.
*
* @param parent
* the parent of this channel. {@code null} if there's no parent.
*/
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
--》 上面的newChannelPipeline方法其實(shí)調(diào)用DefaultChannelPipeline的構(gòu)造器我們跟蹤一下piple的創(chuàng)建
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
* 從上面的構(gòu)造方法中可以推算Pipeline包含為一個(gè)雙向鏈表,且會(huì)有一個(gè)頭和尾,類(lèi)圖如下
總結(jié):每創(chuàng)建要給channel中會(huì)創(chuàng)建一個(gè)pipeline. pipeline為一個(gè)雙向鏈表,會(huì)有一個(gè)頭和尾
頭尾,從兩者繼承關(guān)系來(lái)看基本一致。區(qū)別在于實(shí)現(xiàn)接口不同,head實(shí)現(xiàn)ChannelOutboundHandler, ChannelInboundHandler而tail實(shí)現(xiàn)ChannelInboundHandler


再看下他們的父類(lèi) AbstractChannelHandlerContext 的構(gòu)造器, 分別以參數(shù) inbound , outbound .來(lái)區(qū)分head和tail
結(jié)合他們實(shí)現(xiàn)的接口,header 是一個(gè) outboundHandler, 而 tail 是一個(gè)inboundHandler。
2 piple是如何將handler起到作用的呢?
1、 ChannelInitializer 的添加
初始化 Bootstrap, 會(huì)添加我們自定義的 ChannelHandler, 就以我們熟悉的 EchoClient 來(lái)舉例
Bootstrap b = new Bootstrap();
b.group(group)
//初始化工廠ReflectiveChannelFactory為后續(xù)鏈接connect方法創(chuàng)建NioSocketChannel對(duì)象
.channel(NioSocketChannel.class)
//將選項(xiàng)添加到AbstractBootstrap屬性options. 實(shí)現(xiàn)類(lèi)中Bootstrap的init(Channel channel)方法設(shè)置channel的類(lèi)型
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
// Start the client.
//包含channel(注意channel可抽象為socket鏈接來(lái)理解)實(shí)例化Bootstrap.connect -> Bootstrap.doConnect -> AbstractBootstrap.initAndRegister(最終channelFactory.newChannel();這里的factory就是前面設(shè)置的ReflectiveChannelFactory)
// NioSocketChannel最終newSocket 來(lái)打開(kāi)一個(gè)新的 Java NIO SocketChannel, 最后調(diào)用父類(lèi)AbstractChannel(Channel parent)
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
在調(diào)用 handler 時(shí), 傳入了 ChannelInitializer 對(duì)象, 它提供了一個(gè) initChannel 方法供我們初始化 ChannelHandler. 那么這個(gè)初始化過(guò)程是怎樣的呢
源碼追蹤:首先 Bootstrap中調(diào)用handler方法 .handler(new ChannelInitializer<SocketChannel>(),將ChannelInitializer父類(lèi)AbstractBootstrap中方法
/**
* the {@link ChannelHandler} to use for serving the requests.
*/
public B handler(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
return self();
}
初始化后的使用過(guò)程: 其實(shí)再ChannelFuture f = b.connect(HOST, PORT).sync();中的connect方法中,一路看下去會(huì)在父類(lèi)AbstractBootstrap找到initAndRegister()初始化這樓里
是一個(gè)很重要的方法。暫時(shí)我們只關(guān)注 init(channel);這里是我們要關(guān)注的地方因?yàn)檫@里是channel和pipeline交互的關(guān)鍵,而且他的具體實(shí)現(xiàn)在Bootstrap中
@Override
@SuppressWarnings("unchecked")
void init(Channel channel) throws Exception {
ChannelPipeline p = channel.pipeline();
// config使用bootstrap.handler(),就是最初ChannelInitializer,可參考上面分析
p.addLast(config.handler());
final Map<ChannelOption<?>, Object> options = options0();
synchronized (options) {
//設(shè)置channel類(lèi)型
setChannelOptions(channel, options, logger);
}
最終我們要關(guān)注 p.addLast(config.handler());這里將我們出事的handler添加到了pipleline的末端(pipleline的結(jié)構(gòu)借本介紹在前面已介紹)
為了添加一個(gè) handler 到 pipeline 中, 必須把此 handler 包裝成 ChannelHandlerContext. 因此在上面的代碼中我們可以看到新實(shí)例化了一個(gè) newCtx 對(duì)象, 并將 handler 作為參數(shù)傳遞到構(gòu)造方法中
最后追蹤到DefaultChannelPipeline中方法
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//檢查是否重名
checkMultiplicity(handler);
//創(chuàng)建DefaultChannelHandlerContext對(duì)象
newCtx = newContext(group, filterName(name, handler), handler);
// 將生成的newCtx插入handlercontex鏈表中,addLast0方法會(huì)將newCtx插入tail之前
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
- 我們來(lái)關(guān)注下newContext方法是如何構(gòu)造
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
//isInbound isOutbound 方法返回true false
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
ChannelInitializer的繼承關(guān)系如下圖:

下一節(jié)再具體分析pipeline和handler的具體實(shí)現(xiàn)細(xì)節(jié):