Netty源碼深度解析-Pipeline(1) Pipeline的構(gòu)造

導(dǎo)讀

原創(chuàng)文章,轉(zhuǎn)載請注明出處。

本文源碼地址:netty-source-code-analysis

本文所使用的netty版本4.1.6.Final:帶注釋的netty源碼

Pipeline這個(gè)詞翻譯過來就是“流水線”的意思,讀到這里有了解過設(shè)計(jì)模式的同學(xué)應(yīng)該已經(jīng)想到了,這里用到的是“責(zé)任鏈模式”。本文我們將以DefaultChannelPipeline為例看一下Pipeline的構(gòu)造以及其中重要的數(shù)據(jù)結(jié)構(gòu)。

1 和Pipeline相關(guān)的其他組件

1.1 ChannnelHandler

這是ChannelHandler中的注釋,翻譯過來就是“處理IO事件或者攔截IO操作,并且將其向ChannelPipeline中的下一個(gè)handler傳遞”,說白了就是在責(zé)任鏈中注冊的一系列回調(diào)方法。

Handles an I/O event or intercepts an I/O operation, and forwards it to its next handler in
its ChannelPipeline

這里的I/O event就是很多書中提到的“入站事件”,而I/O operation就是很多書中提到的“出站事件”,前面我說過,這里我并不準(zhǔn)備這么叫,按我的理解我習(xí)慣把這兩者稱之為“事件”和“命令”。很顯然這里eventoperation的含義是不一樣的,event更多地多表示事件發(fā)生了,我們被動(dòng)地收到,而operation則表示我們主動(dòng)地發(fā)起一個(gè)動(dòng)作或者命令。

1.2 ChannelHandlerContext

每一個(gè)ChannelHandler在被添加進(jìn)ChannelPipeline時(shí)會(huì)被包裝進(jìn)一個(gè)ChannelHandlerContext。有兩個(gè)特殊的ChannelHandlerContext除外,分別是HeadContextTailContext,HeadContext繼承了ChannelInBoundHandlerChannelOutBoundHandler,而TailContext繼承了ChannelInBoundHandler。
每個(gè)ChannelHandlerContext中有兩個(gè)指針nextprev,這是用來將多個(gè)ChannelHandlerContext構(gòu)成雙向鏈表的。

2 Pipeline的構(gòu)造方法

我們以DefaultChannelPipeline為例,從它的構(gòu)造方法開始。這里首先將Channel保存到Pipeline的屬性中,又初始化了兩個(gè)屬性succeedFuturevoidPromise。這是兩個(gè)特殊的可以共享的Promise,這兩個(gè)Promise不是重點(diǎn),不理解也沒關(guān)系。

接下來的tailhead是兩個(gè)特殊的ChannelHandlerContext,這兩個(gè)是Pipeline中的重要組件。

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

Pipeline在執(zhí)行完構(gòu)造方法以后的結(jié)構(gòu)如下圖所示,headtail構(gòu)成了最簡單的雙向鏈表。

Pipeline的初始化

圖中藍(lán)色填充的就是ChannelHandlerContext,目前只有HeadContextTailContext,ChannelHandlerContext中的較窄的矩形表示ChannelHandler,由于HeadContextTailContext并沒有包含ChannelHandler,而是繼承ChannelHandler,所以這里我們用虛線表示。上下貫通的ChannelHandler表示既是ChannelInBoundHandler又是ChannelOutBoundHandler,只有上半部分的表示是ChannelInBoundHandler,只有下半部分的表示是ChannelOutBoundHandler。

3 添加ChannelHandler

ChannelPipeline中有很多以add開頭的方法,這些方法就是向ChannelPipeline中添加ChannelHandler的方法。

  • addAfter:向某個(gè)ChannelHandler后邊添加
  • addBefore:向某個(gè)ChannelHandler前面添加
  • addFirst:添加到頭部,不能在head的前面,而是緊挨著head,在head的后面
  • addLast:添加到尾部,不能在tail的后面,而是緊挨著tail,在tail的前面

我們以最常用的的addLast方法為例來分析一下Pipeline中添加ChannelHandler的操作。
這里所貼出的addLast方法其實(shí)我們已經(jīng)在“服務(wù)端啟動(dòng)流程”這篇文章中打過照面了。方法參數(shù)中的EventExecutorGroup意味著我們可以為這個(gè)ChannelHandler單獨(dú)設(shè)置Excutor而不使用Channel所綁定的EventLoop,一般情況下我們不這么做,所以group參數(shù)為null

這里先把ChannelHandler包裝成ChannelHandlerContext,再添加到尾部,隨后調(diào)用ChannelHandlerHandlerAdded方法。

在調(diào)用HandlerAdded方法時(shí)有一點(diǎn)問題,添加ChannelHandler的操作不需要在EventLoop線程中進(jìn)行,而HandlerAdded方法則必須在EventLoop線程中進(jìn)行。也就是說存在添加Handler時(shí)還未綁定EventLoop的情況,此時(shí)則調(diào)用newCtx.setAddPending()將當(dāng)前HandlerContext設(shè)置為ADD_PENDING狀態(tài),并且調(diào)用callHandlerCallbackLater(newCtx, true)將一個(gè)異步任務(wù)添加到一個(gè)單向隊(duì)鏈表中,即pendingHandlerCallbackHead這個(gè)鏈表。

如果當(dāng)前已經(jīng)綁定了EventLoop,則看當(dāng)前調(diào)用線程是否為EventLoop線程,如果不是則向EventLoop提交一個(gè)異步任務(wù)調(diào)用callHandlerAdded0方法,否則直接調(diào)用callHandlerAdded0方法。

下面咱們依次分析一下newContextcallHandlerCallbackLatercallHandlerAdd0方法。

@Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
           //先把`handler`包裝成`HandlerContext`
            newCtx = newContext(group, filterName(name, handler), handler);
            //添加到尾部
            addLast0(newCtx);
            //如果還未綁定`EventLoop`則稍后再發(fā)起對`HandlerAdded`方法的調(diào)用。
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            //如果已經(jīng)綁定了EventLoop,并且當(dāng)前線程非EventLoop線程的話就提交一個(gè)異步任務(wù),就發(fā)起一個(gè)異步任務(wù)去調(diào)用HandlerAdded方法。
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        //如果當(dāng)前線程是EventLoop線程,就直接調(diào)用HandlerAdded方法。
        callHandlerAdded0(newCtx);
        return this;
    }

3.1 newContext

先看來一下newContext方法,這里直接調(diào)用了DefaultChannelHandlerContext的構(gòu)造方法,咱們跟進(jìn)去看看。

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

DefaultChannelHandlerContext的構(gòu)造方法中又調(diào)用了父類AbstractChannelHandlerContext的構(gòu)造方法,保存了handler屬性。在調(diào)用父類構(gòu)造方法之前調(diào)用了isInboudisOutbound方法判斷當(dāng)前的Handler是否為ChannelInBoundHandler或者ChannelOutBoundHandler,這兩個(gè)方法很簡單,不再展開。

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

接下來看AbstractChannelHandlerContext的構(gòu)造方法,這里非常簡單,保存了幾個(gè)屬性,咱們看一下ordered這個(gè)屬性。ordered表示EventExecutor在執(zhí)行異步任務(wù)時(shí)是否按添加順序執(zhí)行,這里一般情況下executornull,表示使用Channel所綁定的EventLoop線程,而EventLoop線程都是OrderedEventExecutor的實(shí)現(xiàn)類。所以這里我們不考慮orderedfalse的情況。

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
    // Its ordered if its driven by the EventLoop or the given Executor is an instanceof OrderedEventExecutor.
    ordered = executor == null || executor instanceof OrderedEventExecutor;
}

上面提到了ChannelHandlerContext可以在構(gòu)造方法里單獨(dú)指定EventExecutor,如果沒有單獨(dú)指定的話就使用Channel所綁定的EventLoop,代碼在哪里呢,就在AbstractChannelHandlerContext#executor方法,非常簡單,如果沒有為當(dāng)前ChannelHandler指定excutor則返回Channel所綁定的EventLoop

@Override
public EventExecutor executor() {
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}

3.2 callHandlerCallbackLater

在添加完ChannelHandler之后將調(diào)用ChannledHandlerhandlerAdded方法,但是此時(shí)有可能還未綁定EventLoop,而handlerAdded方法的調(diào)用必須在EventLoop線程內(nèi)執(zhí)行,此時(shí)就需要調(diào)用callHandlerCallbackLater方法在pendingHandlerCallbackHead鏈表中添加一個(gè)PendingHandlerAddedTask。

private void callHandlerCallbackLater(AbstractChannelHandlerContext ctx, boolean added) {
    assert !registered;

    PendingHandlerCallback task = added ? new PendingHandlerAddedTask(ctx) : new PendingHandlerRemovedTask(ctx);
    PendingHandlerCallback pending = pendingHandlerCallbackHead;
    if (pending == null) {
        pendingHandlerCallbackHead = task;
    } else {
        // Find the tail of the linked-list.
        while (pending.next != null) {
            pending = pending.next;
        }
        pending.next = task;
    }
}

接下來咱們看一下PendingHandlerAddedTask的代碼,邏輯在execute方法里,這里直接調(diào)用了callHandlerAdded0。

private final class PendingHandlerAddedTask extends PendingHandlerCallback {

    PendingHandlerAddedTask(AbstractChannelHandlerContext ctx) {
        super(ctx);
    }

    @Override
    public void run() {
        callHandlerAdded0(ctx);
    }

    @Override
    void execute() {
        EventExecutor executor = ctx.executor();
        if (executor.inEventLoop()) {
            callHandlerAdded0(ctx);
        } else {
            try {
                executor.execute(this);
            } catch (RejectedExecutionException e) {
                
            }
        }
    }
}

3.3 callHandlerAdded0

不管是在未綁定EventLoop的情況下延遲調(diào)用handlerAdded還是在已經(jīng)綁定了EventLoop的情況下立即調(diào)用HandlerAdded,最終都會(huì)調(diào)用到callHandlerAdded0方法。這里干了兩件事,一是調(diào)用ChannelHandlerhandlerAdded方法,二是將HandlerContext的狀態(tài)設(shè)置為ADD_COMPLETE狀態(tài)。

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerAdded(ctx);
        ctx.setAddComplete();
    } catch (Throwable t) {
           
}

3.4 添加多個(gè)ChannelHandler后的Pipeline

還記得咱們的“Netty整體架構(gòu)圖”嗎,在這里咱們把Pipeline部分單獨(dú)放大拿出來看一下,在添加完多個(gè)ChannelHandler之后,Pipeline的結(jié)構(gòu)是這樣的。

多ChannelHandler的Pipeline

4 刪除ChannelHandler

Pipeline中有幾個(gè)以remove開頭的方法,這些方法的作用就是刪除ChannelHandler

  • remove(ChannelHandler handler):從headtail查找,用==判斷是否為同一實(shí)例,只刪除第1個(gè)。
  • remove(Class<T> handlerType):從headtail查找,用isAssignableFrom方法判斷是否為符合條件的類型,只刪除第1個(gè)。
  • remove(String name):從headtail查找,用name精確匹配查找,只刪除第1個(gè),因?yàn)?code>name不能重復(fù),所以這里刪除第1個(gè)也是唯一的1個(gè)。
  • removeFirst:刪除head的后一個(gè),不能刪除tail。
  • removeLast:刪除tail的前一個(gè),不能刪除head。

上述無論哪種刪除方式在查找到對應(yīng)的HandlerContext后都會(huì)調(diào)用到remove(final AbstractChannelHandlerContext ctx)方法,查找過程比較簡單,咱們不再展開,直接看remove(final AbstractChannelHandlerContext ctx)方法。

看看這個(gè)方法的實(shí)現(xiàn),是不是和addLast(EventExecutorGroup group, String name, ChannelHandler handler)很相似,非常相似。首先從雙向鏈表中刪除ChannelHandlerContext,再調(diào)用callHandlerRemoved0方法,callHandlerRemoved0方法內(nèi)會(huì)調(diào)用handlerRemoved方法,這個(gè)調(diào)用必須在EventLoop線程內(nèi)進(jìn)行。如果刪除時(shí)還未綁定EventLoop則添加一個(gè)異步任務(wù)到鏈表pendingHandlerCallbackHead中。

如果已經(jīng)綁定了EventLoop并且當(dāng)前線程非EventLoop線程則向EventLoop提交一個(gè)異步任務(wù),否則直接調(diào)用callHandlerRemoved0方法。

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    synchronized (this) {
        //從雙向鏈表中刪除`ChannelHandlerContext`
        remove0(ctx);
        
        //如果還未綁定`EventLoop`,則稍后調(diào)用`handlerRemoved`方法
        if (!registered) {
            callHandlerCallbackLater(ctx, false);
            return ctx;
        }
        //如果已經(jīng)綁定了`EventLoop`,但是當(dāng)前線程非`EventLoop`線程的話,就發(fā)起一個(gè)異步任務(wù)調(diào)用callHandlerRemoved0方法
        EventExecutor executor = ctx.executor();
        if (!executor.inEventLoop()) {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerRemoved0(ctx);
                }
            });
            return ctx;
        }
    }
    //如果當(dāng)前線程就是`EventLoop`線程,則直接調(diào)用callHandlerRemoved0方法。
    callHandlerRemoved0(ctx);
    return ctx;
}

callHandlerCallbackLater方法咱們前面已經(jīng)分析過,和添加ChannelHandler時(shí)不同的是,這里向鏈表添加的是PendingHandlerRemovedTask,這個(gè)類也很簡單,不再展開。

這里咱們只看一下callHandlerRemoved0方法。這個(gè)方法很簡單,調(diào)用handlerRemoved方法,再把ChannelHandlerContext的狀態(tài)設(shè)置為REMOVE_COMPLETE。

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    // Notify the complete removal.
    try {
        try {
            ctx.handler().handlerRemoved(ctx);
        } finally {
            ctx.setRemoved();
        }
    } catch (Throwable t) {
        fireExceptionCaught(new ChannelPipelineException(
                ctx.handler().getClass().getName() + ".handlerRemoved() has thrown an exception.", t));
    }
}

4 pendingHandlerCallbackHead鏈表中的任務(wù)什么時(shí)候調(diào)用

AbstractUnsaferegister0方法中,在綁定EventLoop以后,會(huì)調(diào)用pipeline.invokeHandlerAddedIfNeeded()方法,我們看一下pipeline.invokeHandlerAddedIfNeeded()方法。

private void register0(ChannelPromise promise) {
try {
    
    // 去完成那些在綁定EventLoop之前觸發(fā)的添加handler操作,這些操作被放在pipeline中的pendingHandlerCallbackHead中,是個(gè)鏈表
    pipeline.invokeHandlerAddedIfNeeded();
    
}

invokeHandlerAddedIfNeeded方法調(diào)用了callHandlerAddedForAllHandlers方法,咱們接著看下去。

final void invokeHandlerAddedIfNeeded() {
    assert channel.eventLoop().inEventLoop();
    if (firstRegistration) {
        firstRegistration = false;

        callHandlerAddedForAllHandlers();
    }
}

callHandlerAddedForAllHandlers方法的邏輯咱就不再展開來說了,非常簡單,就是遍歷pendingHandlerCallbackHead這個(gè)單向鏈表,依次調(diào)用每個(gè)元素的execute方法,并且清空這個(gè)單向鏈表。

private void callHandlerAddedForAllHandlers() {
    final PendingHandlerCallback pendingHandlerCallbackHead;
    synchronized (this) {
        assert !registered;

        registered = true;

        pendingHandlerCallbackHead = this.pendingHandlerCallbackHead;

        this.pendingHandlerCallbackHead = null;
    }

    PendingHandlerCallback task = pendingHandlerCallbackHead;
    while (task != null) {
        task.execute();
        task = task.next;
    }
}

5 總結(jié)

Pipeline中的最重要的數(shù)據(jù)結(jié)構(gòu)就是由多個(gè)ChannelHandlerContext組成的雙向鏈表,而每個(gè)ChannelHandlerContext中包含一個(gè)ChannelHandler,ChannelHandler既可以添加也可以刪除。在Pipeline中有兩個(gè)特殊的ChannelHandlerContext分別是HeadContextTailContext,這兩個(gè)ChannelHandlerContext中不包含ChannelHandler,而是采用繼承的方式。HeadContext實(shí)現(xiàn)了ChannelOutBoundHandlerChannelInBoundHandler,而TailContext實(shí)現(xiàn)了ChannelInBoundHandler。


關(guān)于作者

王建新,轉(zhuǎn)轉(zhuǎn)架構(gòu)部資深Java工程師,主要負(fù)責(zé)服務(wù)治理、RPC框架、分布式調(diào)用跟蹤、監(jiān)控系統(tǒng)等。愛技術(shù)、愛學(xué)習(xí),歡迎聯(lián)系交流。
原創(chuàng)文章,碼字不易,點(diǎn)贊分享,手有余香。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容