2019-05-19 pipeline 初始化、新增、刪除操作

  • 1.pipeline的初始化

之前我們分析過,每構(gòu)造一個(gè)channel的時(shí)候會通過newChannelPipeline初始化一個(gè)pipeline;

 protected AbstractChannel(Channel parent, ChannelId id) {
        this.parent = parent;
        this.id = id;
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
    }

newChannelPipeline的實(shí)現(xiàn)邏輯,this 是當(dāng)前的channel

protected DefaultChannelPipeline newChannelPipeline() {
       return new DefaultChannelPipeline(this);
   }
protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);

        //創(chuàng)建tail和head節(jié)點(diǎn)
        tail = new TailContext(this);
        head = new HeadContext(this);

        //構(gòu)造一個(gè)雙向鏈表的數(shù)據(jù)結(jié)構(gòu),包含head和tail兩個(gè)節(jié)點(diǎn),鏈表的元素其實(shí)是ChannelHandlerContext
        head.next = tail;
        tail.prev = head;
    }

總結(jié)下,pipeline的創(chuàng)建是在創(chuàng)建channel的時(shí)候就創(chuàng)建了。

    1. ChannelHandlerContext 解析
      首先看下ChannelHandlerContext的類繼承關(guān)系
      public interface ChannelHandlerContext extends AttributeMap, ChannelInboundInvoker, ChannelOutboundInvoker

包含三層含義:

  • extends AttributeMap : 自身可以存儲一些屬性;
  • extends ChannelInboundInvoker:可以觸發(fā)一些用戶事件,包括讀事件,注冊事件等;
  • extends ChannelOutboundInvoker: 可以觸發(fā)一些用戶事件,包括寫事件
    Channel channel();
    EventExecutor executor();
    String name();
    ChannelHandler handler();
    boolean isRemoved();
    ChannelPipeline pipeline();
    ByteBufAllocator alloc();
  • ChannelHandlerContext本身包含獲取當(dāng)前的channel,獲取當(dāng)前的NioEventloop,當(dāng)前屬于哪個(gè)ChannelHandler等等;

  • 3.HeadContext 和 TailContext

  • 3.1 TailContext繼承AbstractChannelHandlerContext

final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
       TailContext(DefaultChannelPipeline pipeline) {
           super(pipeline, null, TAIL_NAME, true, false);
           setAddComplete();
       }
}
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,boolean inbound, boolean outbound) {
        //當(dāng)前context的名字  
        this.name = ObjectUtil.checkNotNull(name, "name");
       //  當(dāng)前context所屬的pipeline
        this.pipeline = pipeline;
       //  當(dāng)前context所屬的NioEventLoop
        this.executor = executor;
       //  標(biāo)示是inboundHandler還是outboundHandler
        this.inbound = inbound;
        this.outbound = outbound;
        // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
        ordered = executor == null || executor instanceof OrderedEventExecutor;
    }

可以看出TailContext是一個(gè)inBound處理器,用于處理讀事件,注冊事件;

通過cas+自懸操作將當(dāng)前節(jié)點(diǎn)設(shè)置為已經(jīng)添加

final void setAddComplete() {
        for (; ; ) {
            int oldState = handlerState;
            if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
                return;
            }
        }
    }
  • 3.2 HeadContext

構(gòu)造函數(shù)比TailContext多了一個(gè)unsafe屬性,其余的都相同

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        //unsafe 屬性實(shí)現(xiàn)底層數(shù)據(jù)的讀寫
        private final Unsafe unsafe;

        HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, false, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
}

基本實(shí)現(xiàn)了父類的方法,包含讀寫,注冊,異常傳播等;

  • 4.ChannelHandler的添加與刪除
  • 4.1 添加channelhandler
    在業(yè)務(wù)代碼中我們一般添加handler都是通過這樣的方式進(jìn)行添加
  ch.pipeline().addLast(new EchoServerHandler());

接下來我們看下addLast方法中都做了哪些操作?

public final ChannelPipeline addLast(ChannelHandler... handlers) {
     return addLast(null, handlers);
}
 @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }
        //一個(gè)一個(gè)添加
        for (ChannelHandler h : handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }
        return this;
    }
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            //1.判斷是否重復(fù)添加
            checkMultiplicity(handler);

            //2.構(gòu)造一個(gè)HandlerContext,如果有同名則拋異常
            newCtx = newContext(group, filterName(name, handler), handler);

            //3.添加HandlerContext
            addLast0(newCtx);

            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            //4.如果當(dāng)前線程是EventLoop,則異步觸發(fā)HandlerAdded0事件,否則直接觸發(fā)
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
private static void checkMultiplicity(ChannelHandler handler) {
       //判斷是不是ChannelHandlerAdapter的實(shí)例
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            //如果當(dāng)前handler不是用Sharable注解的并且已經(jīng)添加了,則直接拋異常
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                                " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            //否則,標(biāo)示已經(jīng)添加
            h.added = true;
        }
    }

構(gòu)造一個(gè)DefaultChannelHandlerContext對象

 private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
DefaultChannelHandlerContext(DefaultChannelPipeline pipeline,EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }

執(zhí)行添加操作,就是往鏈表中插入一個(gè)元素

   private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }

添加完成,觸發(fā)該handler的一個(gè)handlerAdded事件,并設(shè)置當(dāng)前handler已經(jīng)添加

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
}
  • 4.2 刪除channelHandler
    public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
    }

拿到channelHandler節(jié)點(diǎn)

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
        AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
        if (ctx == null) {
            throw new NoSuchElementException(handler.getClass().getName());
        } else {
            return ctx;
        }
    }
public final ChannelHandlerContext context(ChannelHandler handler) {
        if (handler == null) {
            throw new NullPointerException("handler");
        }

        //從頭開始遍歷節(jié)點(diǎn),無限for循環(huán),如果遍歷到則返回,否則返回null
        AbstractChannelHandlerContext ctx = head.next;
        for (; ; ) {
            if (ctx == null) {
                return null;
            }
            if (ctx.handler() == handler) {
                return ctx;
            }
            ctx = ctx.next;
        }
    }

移除節(jié)點(diǎn),觸發(fā)HandlerRemoved事件

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
      //當(dāng)前節(jié)點(diǎn)不是頭節(jié)點(diǎn)和尾節(jié)點(diǎn),因?yàn)橐WC線程安全,必須保證pipeline的結(jié)構(gòu)
        assert ctx != head && ctx != tail;

        synchronized (this) {
            remove0(ctx);
            if (!registered) {
                callHandlerCallbackLater(ctx, false);
                return ctx;
            }

            EventExecutor executor = ctx.executor();
            if (!executor.inEventLoop()) {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerRemoved0(ctx);
                    }
                });
                return ctx;
            }
        }
        callHandlerRemoved0(ctx);
        return ctx;
    }

移除節(jié)點(diǎn)的操作

private static void remove0(AbstractChannelHandlerContext ctx) {
        AbstractChannelHandlerContext prev = ctx.prev;
        AbstractChannelHandlerContext next = ctx.next;
        prev.next = next;
        next.prev = prev;
    }

調(diào)用對應(yīng)Handler的remove方法,最后標(biāo)示該handler已經(jīng)remove,設(shè)置remove的標(biāo)示

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
        try {
            try {
                ctx.handler().handlerRemoved(ctx);
            } finally {
                ctx.setRemoved();
            }
        } catch (Throwable t) {
            fireExceptionCaught(new ChannelPipelineException(
                    ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
        }
    }
final void setRemoved() {
        handlerState = REMOVE_COMPLETE;
}

pipeline的操作就講到這里了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容