- 1. Pipeline initialization
As analyzed earlier, every time a channel is constructed, newChannelPipeline is called to initialize a pipeline:
protected AbstractChannel(Channel parent, ChannelId id) {
this.parent = parent;
this.id = id;
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
Here is the implementation of newChannelPipeline; `this` is the current channel:
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
// Create the tail and head nodes
tail = new TailContext(this);
head = new HeadContext(this);
// Build a doubly linked list holding the head and tail nodes; each element of the list is actually a ChannelHandlerContext
head.next = tail;
tail.prev = head;
}
To summarize: the pipeline is created at the same moment the channel is created, and it starts out with only the head and tail nodes linked to each other.
- 2. ChannelHandlerContext analysis
First, look at the interfaces that ChannelHandlerContext extends:
public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker
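This declaration carries three meanings:
- extends AttributeMap: the context can store attributes of its own;
- extends ChannelInboundInvoker: the context can fire inbound events, such as read events and registration events;
- extends ChannelOutboundInvoker: the context can trigger outbound operations, such as writes.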
Channel channel();
EventExecutor executor();
String name();
ChannelHandler handler();
boolean isRemoved();
ChannelPipeline pipeline();
ByteBufAllocator alloc();
ChannelHandlerContext itself exposes the current channel, the EventExecutor the context runs on (for NIO channels, a NioEventLoop), the ChannelHandler it wraps, and so on.
- 3. HeadContext and TailContext
- 3.1 TailContext extends AbstractChannelHandlerContext
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, true, false);
setAddComplete();
}
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
// Name of this context
this.name = ObjectUtil.checkNotNull(name, "name");
// Pipeline this context belongs to
this.pipeline = pipeline;
// Executor this context runs on; null means it is driven by the channel's EventLoop
this.executor = executor;
// Flags marking whether the handler is an inbound handler, an outbound handler, or both
this.inbound = inbound;
this.outbound = outbound;
// Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
ordered = executor == null || executor instanceof OrderedEventExecutor;
}
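Since TailContext passes inbound = true and outbound = false, it is an inbound handler that deals with inbound events such as reads and registration.
setAddComplete uses a CAS spin loop to mark the current node as added: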
final void setAddComplete() {
for (; ; ) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
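The CAS spin loop above can be tried out in isolation. The following is a minimal sketch using only the JDK; the class name HandlerStateSketch and the numeric values of the state constants are assumptions made for illustration, not Netty's code.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

// Simplified model of the handler state machine (illustrative names and values).
final class HandlerStateSketch {
    static final int INIT = 0;
    static final int ADD_COMPLETE = 2;
    static final int REMOVE_COMPLETE = 3;

    private static final AtomicIntegerFieldUpdater<HandlerStateSketch> STATE_UPDATER =
            AtomicIntegerFieldUpdater.newUpdater(HandlerStateSketch.class, "state");

    // AtomicIntegerFieldUpdater requires a volatile int field.
    private volatile int state = INIT;

    void setAddComplete() {
        for (;;) {
            int old = state;
            // A removed handler must never go back to ADD_COMPLETE.
            // Otherwise retry until the compare-and-set succeeds.
            if (old == REMOVE_COMPLETE || STATE_UPDATER.compareAndSet(this, old, ADD_COMPLETE)) {
                return;
            }
        }
    }

    void setRemoved() {
        state = REMOVE_COMPLETE;
    }

    int state() {
        return state;
    }

    public static void main(String[] args) {
        HandlerStateSketch s = new HandlerStateSketch();
        s.setAddComplete();
        System.out.println(s.state()); // 2
        s.setRemoved();
        s.setAddComplete();            // no effect once removed
        System.out.println(s.state()); // 3
    }
}
```

The early return on REMOVE_COMPLETE is the interesting part: if a removal races with the add callback, the removed state wins and is not overwritten.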
- 3.2 HeadContext
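Compared with TailContext, the constructor has an extra unsafe field and passes inbound = false, outbound = true to super; everything else is the same: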
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
// The unsafe field performs the low-level reads and writes
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, false, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
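HeadContext implements the methods of both handler interfaces, covering reads and writes, registration, exception propagation, and so on.
- 4. Adding and removing a ChannelHandler
- 4.1 Adding a ChannelHandler
In application code, a handler is usually added like this: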
ch.pipeline().addLast(new EchoServerHandler());
Next, let's look at what addLast actually does:
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
// Add the handlers one at a time; a null entry stops the loop
for (ChannelHandler h : handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1. Check whether the handler has already been added
checkMultiplicity(handler);
// 2. Build a HandlerContext; filterName throws if the name is already in use
newCtx = newContext(group, filterName(name, handler), handler);
// 3. Link the HandlerContext into the list
addLast0(newCtx);
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
// 4. If the current thread is NOT the context's EventLoop thread, submit callHandlerAdded0 to the executor; otherwise fall through and call it directly
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
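Step 4 comes down to a common pattern: run the callback inline when already on the event loop thread, otherwise hand it over to that thread. Below is a minimal sketch of the pattern with a plain single-threaded ExecutorService. EventLoopDispatchSketch, invokeCallback and demo are hypothetical names, and the `registered` check from the original method is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified model of "run inline on the loop thread, otherwise submit to it".
final class EventLoopDispatchSketch {
    private volatile Thread loopThread;
    private final ExecutorService loop = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "event-loop");
        loopThread = t; // remember which thread plays the event loop
        return t;
    });

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread;
    }

    void invokeCallback(Runnable callback) {
        if (!inEventLoop()) {
            loop.execute(callback); // asynchronous hand-over
            return;
        }
        callback.run();             // already on the loop thread: call directly
    }

    void shutdownAndWait() {
        loop.shutdown();
        try {
            loop.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Records the thread each callback ran on.
    static List<String> demo() {
        EventLoopDispatchSketch s = new EventLoopDispatchSketch();
        List<String> threads = Collections.synchronizedList(new ArrayList<>());
        // Called from the caller's thread: handed over to the loop.
        s.invokeCallback(() -> {
            threads.add(Thread.currentThread().getName());
            // Called from the loop thread itself: runs inline.
            s.invokeCallback(() -> threads.add(Thread.currentThread().getName()));
        });
        s.shutdownAndWait();
        return new ArrayList<>(threads);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [event-loop, event-loop]
    }
}
```

Either way the callback ends up running on the loop thread, which is why handler callbacks do not need their own locking.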
private static void checkMultiplicity(ChannelHandler handler) {
// Only ChannelHandlerAdapter instances are checked
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
// If the handler is not annotated with @Sharable and has already been added, throw
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
// Otherwise, mark it as added
h.added = true;
}
}
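To see the effect of this check, here is a small self-contained model. The Sharable annotation and the Handler classes are hypothetical stand-ins defined inside the sketch, not Netty's types, and the exception type is simplified to IllegalStateException.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Simplified model of the "added twice" check (illustrative types only).
final class MultiplicitySketch {
    // Hypothetical stand-in for a "sharable" marker annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Sharable {}

    static class Handler {
        boolean added;

        boolean isSharable() {
            return getClass().isAnnotationPresent(Sharable.class);
        }
    }

    @Sharable
    static final class SharedHandler extends Handler {}

    static final class PlainHandler extends Handler {}

    static void checkMultiplicity(Handler h) {
        // A non-sharable handler instance may be added only once.
        if (!h.isSharable() && h.added) {
            throw new IllegalStateException(
                    h.getClass().getName() + " is not sharable and was already added");
        }
        h.added = true;
    }

    // Returns true if the same instance passes the check twice.
    static boolean canAddTwice(Handler h) {
        try {
            checkMultiplicity(h);
            checkMultiplicity(h);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(canAddTwice(new SharedHandler())); // true
        System.out.println(canAddTwice(new PlainHandler()));  // false
    }
}
```

In practice this means a stateful handler should be created per channel (as in `new EchoServerHandler()` inside the initializer), while a stateless one can be marked sharable and reused.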
newContext builds a DefaultChannelHandlerContext object:
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
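The isInbound and isOutbound calls are not shown in this article. The sketch below assumes they are plain instanceof checks against the two handler interfaces; all type names here are hypothetical stand-ins, so treat it as an illustration of the idea rather than the library's code.

```java
// Simplified model: direction flags derived from the interfaces a handler implements.
final class DirectionSketch {
    interface Handler {}
    interface InboundHandler extends Handler {}
    interface OutboundHandler extends Handler {}

    static final class Decoder implements InboundHandler {}
    static final class Encoder implements OutboundHandler {}
    static final class Duplex implements InboundHandler, OutboundHandler {}

    // Assumption: a handler counts as inbound if it implements the inbound interface.
    static boolean isInbound(Handler h) {
        return h instanceof InboundHandler;
    }

    // Assumption: likewise for outbound.
    static boolean isOutbound(Handler h) {
        return h instanceof OutboundHandler;
    }

    static String describe(Handler h) {
        return "inbound=" + isInbound(h) + ", outbound=" + isOutbound(h);
    }

    public static void main(String[] args) {
        System.out.println(describe(new Decoder())); // inbound=true, outbound=false
        System.out.println(describe(new Encoder())); // inbound=false, outbound=true
        System.out.println(describe(new Duplex()));  // inbound=true, outbound=true
    }
}
```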
addLast0 performs the actual insertion, which simply links a new element into the list just before tail:
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
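The four pointer assignments are easier to follow on a stripped-down list. This is a minimal sketch with a hypothetical Node class in place of AbstractChannelHandlerContext; head and tail are sentinels, as in the pipeline constructor.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of a doubly linked list with head/tail sentinels.
final class AddLastSketch {
    static final class Node {
        final String name;
        Node prev;
        Node next;

        Node(String name) {
            this.name = name;
        }
    }

    final Node head = new Node("head");
    final Node tail = new Node("tail");

    AddLastSketch() {
        // An empty list already links the two sentinels.
        head.next = tail;
        tail.prev = head;
    }

    // Same four assignments as addLast0: insert right before tail.
    void addLast(String name) {
        Node newNode = new Node(name);
        Node prev = tail.prev;
        newNode.prev = prev;
        newNode.next = tail;
        prev.next = newNode;
        tail.prev = newNode;
    }

    // Node names from head to tail, sentinels included.
    List<String> names() {
        List<String> out = new ArrayList<>();
        for (Node cur = head; cur != null; cur = cur.next) {
            out.add(cur.name);
        }
        return out;
    }

    public static void main(String[] args) {
        AddLastSketch p = new AddLastSketch();
        p.addLast("decoder");
        p.addLast("echo");
        System.out.println(p.names()); // [head, decoder, echo, tail]
    }
}
```

Because of the sentinels, the insertion never has to special-case an empty list: tail.prev is always a valid node.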
Once the node is linked in, callHandlerAdded0 fires the handler's handlerAdded callback and marks the context as added:
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
}
- 4.2 Removing a ChannelHandler
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
getContextOrDie looks up the context node that wraps the given ChannelHandler:
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
// Walk the list starting from head.next in an endless for loop; return the context on a match, or null when the end of the list is reached
AbstractChannelHandlerContext ctx = head.next;
for (; ; ) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
remove unlinks the node and then fires the handlerRemoved callback:
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
// head and tail must never be removed, so that the pipeline structure stays intact; the synchronized block below keeps the list update thread-safe
assert ctx != head && ctx != tail;
synchronized (this) {
remove0(ctx);
if (!registered) {
callHandlerCallbackLater(ctx, false);
return ctx;
}
EventExecutor executor = ctx.executor();
if (!executor.inEventLoop()) {
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerRemoved0(ctx);
}
});
return ctx;
}
}
callHandlerRemoved0(ctx);
return ctx;
}
remove0 is the actual unlink operation:
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next;
next.prev = prev;
}
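Lookup and unlink fit together as shown in the sketch below. It is a simplified model under assumed names (RemoveSketch, Node), with plain Objects standing in for handlers; the lookup compares by identity (==), as context(handler) does.

```java
import java.util.Objects;

// Simplified model: find a node by handler identity, then unlink it.
final class RemoveSketch {
    static final class Node {
        final Object handler;
        Node prev;
        Node next;

        Node(Object handler) {
            this.handler = handler;
        }
    }

    final Node head = new Node(null); // sentinels carry no handler here
    final Node tail = new Node(null);

    RemoveSketch() {
        head.next = tail;
        tail.prev = head;
    }

    void addLast(Object handler) {
        Node n = new Node(Objects.requireNonNull(handler, "handler"));
        n.prev = tail.prev;
        n.next = tail;
        tail.prev.next = n;
        tail.prev = n;
    }

    // Walks from head.next up to (not including) tail; null if not found.
    Node context(Object handler) {
        Objects.requireNonNull(handler, "handler");
        for (Node cur = head.next; cur != tail; cur = cur.next) {
            if (cur.handler == handler) {
                return cur;
            }
        }
        return null;
    }

    // Unlinks the node with the same two assignments as remove0.
    boolean remove(Object handler) {
        Node ctx = context(handler);
        if (ctx == null) {
            return false;
        }
        ctx.prev.next = ctx.next;
        ctx.next.prev = ctx.prev;
        return true;
    }

    public static void main(String[] args) {
        RemoveSketch p = new RemoveSketch();
        Object a = new Object();
        Object b = new Object();
        p.addLast(a);
        p.addLast(b);
        System.out.println(p.remove(a));             // true
        System.out.println(p.head.next.handler == b); // true
        System.out.println(p.remove(a));             // false, already gone
    }
}
```

Unlike this sketch, which returns false for an unknown handler, getContextOrDie throws NoSuchElementException.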
callHandlerRemoved0 calls the handler's handlerRemoved method and, whether or not it throws, marks the context as removed by setting its state to REMOVE_COMPLETE:
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
} catch (Throwable t) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
}
}
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
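The nested try/finally inside try/catch guarantees that the state is updated even when the callback throws, and that the exception is still reported afterwards. Here is a minimal sketch of that control flow; RemovedCallbackSketch and its `reported` field are hypothetical, with `reported` standing in for fireExceptionCaught.

```java
// Simplified model of "always mark removed, then report any failure".
final class RemovedCallbackSketch {
    static final int ADD_COMPLETE = 2;    // illustrative values
    static final int REMOVE_COMPLETE = 3;

    int state = ADD_COMPLETE;
    Throwable reported; // stand-in for passing the error down the pipeline

    void callHandlerRemoved(Runnable handlerRemoved) {
        try {
            try {
                handlerRemoved.run();
            } finally {
                // Runs on both the normal and the exceptional path.
                state = REMOVE_COMPLETE;
            }
        } catch (Throwable t) {
            reported = new RuntimeException("handlerRemoved() has thrown an exception.", t);
        }
    }

    public static void main(String[] args) {
        RemovedCallbackSketch bad = new RemovedCallbackSketch();
        bad.callHandlerRemoved(() -> {
            throw new IllegalStateException("boom");
        });
        System.out.println(bad.state == REMOVE_COMPLETE); // true
        System.out.println(bad.reported != null);         // true
    }
}
```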
That covers the pipeline operations.