pipleline和handler以及pipeline的數(shù)據(jù)流向
- 先理一下思路。首先我們考慮到之前的文章分析,沒創(chuàng)建一個channel就會創(chuàng)建一個pipeline與之對應(yīng)。每個pipeline會有AbstractChannelHandlerContext屬性的tail和head從而組成要給雙向鏈表。那么pipeline的handler添加和數(shù)據(jù)流向其實(shí)都是基于HandlerContext和雙向鏈表的性質(zhì)。下面具體分析。
當(dāng)然我們?nèi)匀幌旅孢@段代碼分析,主要分析pipeline的添加
b.group(group)
//初始化工廠ReflectiveChannelFactory為后續(xù)鏈接connect方法創(chuàng)建NioSocketChannel對象
.channel(NioSocketChannel.class)
//將選項(xiàng)添加到AbstractBootstrap屬性options. 實(shí)現(xiàn)類中Bootstrap的init(Channel channel)方法設(shè)置channel的類型
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc(), HOST, PORT));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new EchoClientHandler());
}
});
pipeline.addXXX 都有一個重載的方法, 例如 addLast, 它有一個重載的版本
直接查看DefaultChannelPipeline源碼:
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
我們一路查看下去,找到重載的方法,且記住我們?nèi)雲(yún)⒗飃roup、和name都是null
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
//檢查是否重復(fù)添加
checkMultiplicity(handler);
//創(chuàng)建DefaultChannelHandlerContext對象
newCtx = newContext(group, filterName(name, handler), handler);
// 將生成的newCtx插入handlercontex鏈表中
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
從addLast0方法看到,這里是將我們的handler添加到了tail的前面
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
```
* 還有一點(diǎn)就是上面addLast方法中newContext方法的入?yún)ilterName(name, handler),這里會生成handler名字并校驗(yàn)是否重復(fù)(有興趣可查看源碼類DefaultChannelPipeline中 generateName(ChannelHandler handler))
2、 那么inbond和outbond是決定pipleline的數(shù)據(jù)流向的關(guān)鍵。
記得我們上面的newContext方法中創(chuàng)建的DefaultChannelHandlerContext里的構(gòu)造器,有isInbound和isOutbound倆方法分別根據(jù)接口來判斷Inbound和Outbound
```java
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
一個Inbound事件通常由Inbound handler來處理。一個Inbound handler通常處理在IO線程產(chǎn)生的Inbound數(shù)據(jù)。Inbound數(shù)據(jù)通過真實(shí)的輸入操作如 SocketChannel#read(ByteBuffer)來獲取。如果一個inbound事件越過了最上面的inbound handler,該事件將會被拋棄到而不會通知你
一個outbound事件由outbound handler來處理。一個outbound handler通常由outbound流量如寫請求產(chǎn)生或者轉(zhuǎn)變的。如果一個outbound事件越過了底部的outbound handler,它將由channel關(guān)聯(lián)的IO線程處理。IO線程通常運(yùn)行的是真實(shí)的輸出操作如 SocketChannel#write(byteBuffer).
inbound 事件傳播方法:
ChannelHandlerContext#fireChannelRegistered()
ChannelHandlerContext#fireChannelActive()
ChannelHandlerContext#fireChannelRead(Object)
ChannelHandlerContext#fireChannelReadComplete()
ChannelHandlerContext#fireExceptionCaught(Throwable)
ChannelHandlerContext#fireUserEventTriggered(Object)
ChannelHandlerContext#fireChannelWritabilityChanged()
ChannelHandlerContext#fireChannelInactive()
ChannelHandlerContext#fireChannelUnregistered()
outbound事件傳播方法:
ChannelHandlerContext#bind(SocketAddress, ChannelPromise)
ChannelHandlerContext#connect(SocketAddress, SocketAddress, ChannelPromise)
ChannelHandlerContext#write(Object, ChannelPromise)
ChannelHandlerContext#flush()
ChannelHandlerContext#read()
ChannelHandlerContext#disconnect(ChannelPromise)
ChannelHandlerContext#close(ChannelPromise)
ChannelHandlerContext#deregister(ChannelPromise)
如果我們捕獲了一個事件, 并且想讓這個事件繼續(xù)傳遞下去, 那么需要調(diào)用 Context 相應(yīng)的傳播方法.
例如:
public class MyInboundHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("Connected!");
ctx.fireChannelActive();
}
}
public clas MyOutboundHandler extends ChannelOutboundHandlerAdapter {
@Override
public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
System.out.println("Closing ..");
ctx.close(promise);
}
}
上面的例子中, MyInboundHandler 收到了一個 channelActive 事件, 它在處理后, 如果希望將事件繼續(xù)傳播下去, 那么需要接著調(diào)用 ctx.fireChannelActive().
Outbound 操作(outbound operations of a channel)
以connect為例
Bootstrap.connect -> Bootstrap.doResolveAndConnect -> Bootstrap.doResolveAndConnect0 ->Bootstrap.doConnect ->AbstractChannel.connect->pipeline.connect->tail.connect-> AbstractChannelHandlerContext.connect
最后我們以AbstractChannelHandlerContext.connect 的源碼分析:
@Override
public ChannelFuture connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
if (remoteAddress == null) {
throw new NullPointerException("remoteAddress");
}
if (isNotValidPromise(promise, false)) {
// cancelled
return promise;
}
// DefaultChannelPipeline 內(nèi)的雙向鏈表的 tail 開始, 不斷向前尋找第一個 outbound 為 true 的 AbstractChannelHandlerContext, 然后調(diào)用它的 invokeConnect
final AbstractChannelHandlerContext next = findContextOutbound();
//next其實(shí)是找到headContext unsafe.connect(remoteAddress, localAddress, promise);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
當(dāng)我們找到了一個 outbound 的 Context 后, 就調(diào)用它的 invokeConnect 方法,
next.invokeConnect(remoteAddress, localAddress, promise);
} else {
safeExecute(executor, new Runnable() {
@Override
public void run() {
next.invokeConnect(remoteAddress, localAddress, promise);
}
}, promise, null);
}
return promise;
}
invokeConnect這個方法中會調(diào)用 Context 所關(guān)聯(lián)著的 ChannelHandler 的 connect 方法。下面的handler()方法會返回一個handlerContex(根據(jù)上面next方法我們知道這里返回的為tailContext,但是tailContext并沒有實(shí)現(xiàn)connect方法,所以這里的connect為其父類AbstractChannelHandlerContext的connect方法。也就是說再次從上面哪個方法開始,知道執(zhí)行到headContext時,它實(shí)現(xiàn)了connect方法如下 方法三)。然后調(diào)用connect,包裝handler所以即為hanler的connect。
private void invokeConnect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
if (invokeHandler()) {
try {
//pipeline.channel().unsafe();
((ChannelOutboundHandler) handler()).connect(this, remoteAddress, localAddress, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
} else {
connect(remoteAddress, localAddress, promise);
}
}
方法三 headContext類找的connect方法
@Override
public void connect(
ChannelHandlerContext ctx,
SocketAddress remoteAddress, SocketAddress localAddress,
ChannelPromise promise) throws Exception {
unsafe.connect(remoteAddress, localAddress, promise);
}
所以最終是unsafe.connect,而這unsafe的由來我們前面也分析過,看HeadContext的構(gòu)造器unsafe = pipeline.channel().unsafe(); 所以它是來自channel。那么channel來自哪呢
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, true, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
還記得 .channel(NioSocketChannel.class)這里就是channel的來源
接下來我們找到unsafe是哪里創(chuàng)建的,查看NioSocketChannel構(gòu)造器
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
構(gòu)造器跟蹤流程:NioSocketChannel->AbstractNioByteChannel->AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
同時newUnsafe() 并沒有再AbstractChannel實(shí)現(xiàn),而是在NioSocketChannel實(shí)現(xiàn),這是為什么呢?
protected AbstractNioUnsafe newUnsafe() {
return new NioSocketChannelUnsafe();
}
其實(shí)在子類實(shí)現(xiàn),是由于不同的協(xié)議用的Unsafe會不同,所以要根據(jù)子類區(qū)別對待
繼續(xù)跟蹤NioSocketChannelUnsafe但是該類并未實(shí)現(xiàn)connect方法,所以查找父類直到找到
AbstractNioUnsafe中的connect方法
@Override
public final void connect(
final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
//省略
//doConnect這里的實(shí)現(xiàn)在子類中
if (doConnect(remoteAddress, localAddress)) {
fulfillConnectPromise(promise, wasActive);
} else {
connectPromise = promise;
requestedRemoteAddress = remoteAddress;
// Schedule connect timeout.
int connectTimeoutMillis = config().getConnectTimeoutMillis();
if (connectTimeoutMillis > 0) {
connectTimeoutFuture = eventLoop().schedule(new Runnable() {
@Override
public void run() {
ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
ConnectTimeoutException cause =
new ConnectTimeoutException("connection timed out: " + remoteAddress);
if (connectPromise != null && connectPromise.tryFailure(cause)) {
close(voidPromise());
}
}
}, connectTimeoutMillis, TimeUnit.MILLISECONDS);
}
//省略
}
這段代碼的重點(diǎn)就在doConnect,而這個方法在該類中是沒有實(shí)現(xiàn)的,實(shí)現(xiàn)類在子類NioSocketChannel中
@Override
protected boolean doConnect(SocketAddress remoteAddress, SocketAddress localAddress) throws Exception {
if (localAddress != null) {
doBind0(localAddress);
}
boolean success = false;
try {
boolean connected = SocketUtils.connect(javaChannel(), remoteAddress);
if (!connected) {
selectionKey().interestOps(SelectionKey.OP_CONNECT);
}
success = true;
return connected;
} finally {
if (!success) {
doClose();
}
}
}
注意:上面這個方法就是和java的NIO聯(lián)系的地方了
重點(diǎn)分析:
1、首先doBind0方法 使用SocketUtils.bind(javaChannel(), localAddress);
其中的javaChannel()根據(jù)java版本選擇nio還是bio
private void doBind0(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
SocketUtils.bind(javaChannel(), localAddress);
} else {
SocketUtils.bind(javaChannel().socket(), localAddress);
}
}
在NIO中javaChannel其實(shí)獲取SelectableChannel,這里SelectableChannel在之前介紹過。所以bind方法
最后調(diào)用socketChannel.bind(address); nio的在前面已介紹這里不再贅述。至此connect方法就追蹤到這里。
總結(jié)connect事件在outbound中的順序,結(jié)合上面Bootstrap.connect最后到達(dá)handler的connect就形成了下面這個循環(huán)
Context.connect -> Connect.findContextOutbound -> next.invokeConnect -> handler.connect -> Context.connect
直到head中的connect,這里我們上面分析過了。所以從connect事件來管中窺豹的話。就借用官網(wǎng)的數(shù)據(jù)流程圖吧
參考官網(wǎng)的事件流轉(zhuǎn)圖
I/O Request
via Channel or
ChannelHandlerContext
|
+---------------------------------------------------+---------------+
| ChannelPipeline | |
| \|/ |
| +---------------------+ +-----------+----------+ |
| | Inbound Handler N | | Outbound Handler 1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler N-1 | | Outbound Handler 2 | |
| +----------+----------+ +-----------+----------+ |
| /|\ . |
| . . |
| ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
| [ method call] [method call] |
| . . |
| . \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 2 | | Outbound Handler M-1 | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
| | \|/ |
| +----------+----------+ +-----------+----------+ |
| | Inbound Handler 1 | | Outbound Handler M | |
| +----------+----------+ +-----------+----------+ |
| /|\ | |
+---------------+-----------------------------------+---------------+
| \|/
+---------------+-----------------------------------+---------------+
| | | |
| [ Socket.read() ] [ Socket.write() ] |
| |
| Netty Internal I/O Threads (Transport Implementation) |
+-------------------------------------------------------------------+
- 由于篇幅過長下篇繼續(xù)講解Inbound事件的源碼