Netty源碼-ChannelPipeline和ChannelHandler

1 概述

在Netty事件模型中,在發(fā)生網(wǎng)絡(luò)事件(如Read,Write,Connect)等事件后,是通過注冊(cè)在Pipeline中的一個(gè)個(gè)Handler對(duì)事件進(jìn)行處理的,這種采用多Handler對(duì)事件進(jìn)行處理可以對(duì)事件的處理進(jìn)行邏輯分層,比如在經(jīng)典的編碼、解碼處理中,可以注冊(cè)一個(gè)專門的Handler對(duì)報(bào)文進(jìn)行編碼或者解碼,編碼或者解碼之后的報(bào)文再傳遞給下一個(gè)Handler進(jìn)行處理。另外Netty采用這種Pipeline這種串行的Handler處理各種事件,避免了線程的上下文切換,減少了多線程環(huán)境對(duì)鎖的依賴,也能在一定程度上提高性能。

ChannelPipelineChannelHandler的容器,負(fù)責(zé)管理一系列的ChannelHandler,對(duì)到來的事件進(jìn)行處理。

ChannelHandler則是對(duì)事件處理的一個(gè)個(gè)處理器,分為兩種類型,即ChannelInboundHandlerChannelOutboundHandler,分別負(fù)責(zé)處理Netty中的Inbound和Outbound事件,從兩個(gè)接口中定義的函數(shù)可以知道Inbound和Outbound事件分別有哪些:

ChannelInboundHandler接口函數(shù)定義.png
ChannelOutboundHandler接口函數(shù)定義.png

當(dāng)然在``接口源碼注釋中也列出了Inbound和Outbound方法:

Inbound event propagation methods:

  • ChannelHandlerContext.fireChannelRegistered()
  • ChannelHandlerContext.fireChannelActive()
  • ChannelHandlerContext.fireChannelRead(Object)
  • ChannelHandlerContext.fireChannelReadComplete()
  • ChannelHandlerContext.fireExceptionCaught(Throwable)
  • ChannelHandlerContext.fireUserEventTriggered(Object)
  • ChannelHandlerContext.fireChannelWritabilityChanged()
  • ChannelHandlerContext.fireChannelInactive()
  • ChannelHandlerContext.fireChannelUnregistered()

Outbound event propagation methods:

  • ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
  • ChannelHandlerContext.connect(SocketAddress, SocketAddress, hannelPromise)
  • ChannelHandlerContext.write(Object, ChannelPromise)
  • ChannelHandlerContext.flush()
  • ChannelHandlerContext.read()
  • ChannelHandlerContext.disconnect(ChannelPromise)
  • ChannelHandlerContext.close(ChannelPromise)
  • ChannelHandlerContext.deregister(ChannelPromise)

Pipeline中可以注冊(cè)多個(gè)InboundHandler和多個(gè)OutboundHandler,并使用雙向鏈表連接起來,對(duì)于收到的Inbound或Outbound事件會(huì)調(diào)用相關(guān)類型的Handler進(jìn)行處理,但是Inbound和Outbound事件執(zhí)行handler的順序是不一樣的,Inbound事件則是從前往后調(diào)用handler,最后一個(gè)被調(diào)用的是尾節(jié)點(diǎn);對(duì)于Outbound事件則是從后往前調(diào)用,最后一個(gè)執(zhí)行的是頭結(jié)點(diǎn)。

下面我們分別介紹Netty中ChannelPipelineChannelHandler的相關(guān)實(shí)現(xiàn)。

2 ChannelPipeline

2.1 接口(類)結(jié)構(gòu)

2.1.1 重要域

在這里,我們直接看ChannelPipeline在Netty中的默認(rèn)實(shí)現(xiàn)DefaultChannelPipeline。首先我們要說明一下,雖然說ChannelPipelineChannelHandler的容器,但是ChannelPipeline并不是直接持有ChannelHandler的,ChannelHandler會(huì)被封裝成ChannelHandlerContext,ChannelPipeline則使用雙鏈表持有一個(gè)個(gè)的ChannelHandlerContext。

我們先看DefaultChannelPipeline重要域:

//DefaultChannelPipeline
//該P(yáng)ipeline關(guān)聯(lián)的Channel,由構(gòu)造函數(shù)傳進(jìn)來
private final Channel channel;
//pipeline持有一些列的context,但是其頭和尾context是不可配置的,
//在構(gòu)造函數(shù)中被初始化
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
//這個(gè)平時(shí)沒怎么使用,但是在后文介紹如何向pipeline中添加handler時(shí)
//會(huì)介紹,pipeline默認(rèn)使用channel注冊(cè)的executor執(zhí)行任務(wù),但是也可以在
//向pipeline中加入handler時(shí)傳入EventExecutorGroup,然后從該線程組
//選出線程執(zhí)行任務(wù),傳入的線程組和第一次選出的線程executor會(huì)記錄在
//childExecutors,這樣后面可以保證在向該pipeline添加handler時(shí),如果
//配置了SINGLE_EVENTEXECUTOR_PER_GROUP參數(shù)為true,即單線程執(zhí)行任務(wù),
//childExecutors記錄每個(gè)group第一次選出的executor則可以在下次添加
//handler取出直接使用,保證單線,這個(gè)后面在介紹childExecutor方法時(shí)會(huì)
//再次介紹
private Map<EventExecutorGroup, EventExecutor> childExecutors;

關(guān)于上面提到的配置參數(shù)SINGLE_EVENTEXECUTOR_PER_GROUP,可見參考文章介紹如下:

Netty參數(shù),單線程執(zhí)行ChannelPipeline中的事件,默認(rèn)值為True。該值控制執(zhí)行ChannelPipeline中執(zhí)行ChannelHandler的線程。如果為True,整個(gè)pipeline由一個(gè)線程執(zhí)行,這樣不需要進(jìn)行線程切換以及線程同步,是Netty4的推薦做法;如果為False,ChannelHandler中的處理過程會(huì)由Group中的不同線程執(zhí)行。

但是個(gè)人認(rèn)為上面引用中的整個(gè)pipeline由一個(gè)線程執(zhí)行不太準(zhǔn)確,只能說如果傳入同一個(gè)group,且配置為true,則可以保證由該group中的同一個(gè)線程處理,而不是整個(gè)pipeline由一個(gè)線程執(zhí)行,這個(gè)后面介紹childExecutor再看。

2.1.2 內(nèi)部類

DefaultChannelPipeline重要內(nèi)部類有兩個(gè),也就是上面介紹到默認(rèn)切不可更改的headtail節(jié)點(diǎn),分別為HeadContextTailContext。

這里暫時(shí)不介紹這兩個(gè)類的具體實(shí)現(xiàn),我們?cè)诮榻BChannelHandler時(shí)再介紹。

2.2 重要方法

下面再看DefaultChannelPipeline重要方法,先介紹DefaultChannelPipeline的構(gòu)造函數(shù):

//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
    //記錄該pipeline關(guān)聯(lián)的channel
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    succeededFuture = new SucceededChannelFuture(channel, null);
    voidPromise =  new VoidChannelPromise(channel, true);

    //初始化head和tail節(jié)點(diǎn)
    tail = new TailContext(this);
    head = new HeadContext(this);

    //將首尾節(jié)點(diǎn)連接起來
    head.next = tail;
    tail.prev = head;
}

除了構(gòu)造函數(shù),我們將DefaultChannelPipeline剩下的方法分為三類,第一類是負(fù)責(zé)向pipeline中添加、刪除或者替換handler,另一類負(fù)責(zé)觸發(fā)Inbound handler,最后一類則負(fù)責(zé)觸發(fā)Outbound handler:

2.2.1 Handler添加、刪除方法

這類方法主要包含如下方法:

向Pipeline中添加handler

  • addAfter
  • addBefore
  • addFirst
  • addLast

從Pipeline中移除handler

  • remove
  • removeFirst
  • removeIfExists
  • removeLast

替換Pipeline中的某個(gè)handler

  • replace

為了節(jié)省篇幅,我們介紹一個(gè)方法的實(shí)現(xiàn):addLast(EventExecutorGroup executor, ChannelHandler... handlers)

//DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    if (handlers == null) {
        throw new NullPointerException("handlers");
    }

    for (ChannelHandler h: handlers) {
        if (h == null) {
            break;
        }
        //將傳入的Handler一個(gè)個(gè)添加到pipeline中
        addLast(executor, null, h);
    }

    return this;
}


public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        checkMultiplicity(handler);
        //上面介紹過pipeline中連接的實(shí)際上是封裝了handler的
        //context
        newCtx = newContext(group, filterName(name, handler), handler);

        //將該context連接到鏈表尾端,但是在tail之前
        addLast0(newCtx);

        // If the registered is false it means that the channel was not registered on an eventloop yet.
        // In this case we add the context to the pipeline and add a task that will call
        // ChannelHandler.handlerAdded(...) once the channel is registered.
        if (!registered) {
            newCtx.setAddPending();
            callHandlerCallbackLater(newCtx, true);
            return this;
        }

        EventExecutor executor = newCtx.executor();
        if (!executor.inEventLoop()) {
            newCtx.setAddPending();
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    callHandlerAdded0(newCtx);
                }
            });
            return this;
        }
    }
    callHandlerAdded0(newCtx);
    return this;
}

//新建一個(gè)新的DefaultChannelHandlerContext
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

//處理傳入的線程組group
private EventExecutor childExecutor(EventExecutorGroup group) {
    //如果沒有傳入線程組,則返回空
    if (group == null) {
        return null;
    }
    //獲取SINGLE_EVENTEXECUTOR_PER_GROUP配置,
    //上面介紹過
    Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
    //如果不是采用單一線程執(zhí)行,則調(diào)用next方法選出一個(gè)線程返回
    if (pinEventExecutor != null && !pinEventExecutor) {
        return group.next();
    }

    //如果執(zhí)行到這里,表示傳入了線程組,并且
    //SINGLE_EVENTEXECUTOR_PER_GROUP配置為Ture


    //獲取記錄的group和第一次從該線程組選出的線程
    Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
    //如果為空,則新建一個(gè)IdentityHashMap,這里為什么使用
    //IdentityHashMap,可以看下IdentityHashMap的實(shí)現(xiàn)原理
    if (childExecutors == null) {
        // Use size of 4 as most people only use one extra EventExecutor.
        childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
    }
    // Pin one of the child executors once and remember it so that the same child executor
    // is used to fire events for the same channel.
    //找出group第一次調(diào)用時(shí)選出的線程
    EventExecutor childExecutor = childExecutors.get(group);
    //為空的話,則表示第一次使用該group,則從該group選出一個(gè)線程,
    //并放入該Map中
    if (childExecutor == null) {
        childExecutor = group.next();
        childExecutors.put(group, childExecutor);
    }
    return childExecutor;
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
    //可見雖然是addLast,但是還是放在tail節(jié)點(diǎn)之前
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev;
    newCtx.next = tail;
    prev.next = newCtx;
    tail.prev = newCtx;
}

private void addFirst0(AbstractChannelHandlerContext newCtx) {
    //addFirst0也是同理,添加到列表最前面,但是在head節(jié)點(diǎn)之后
    AbstractChannelHandlerContext nextCtx = head.next;
    newCtx.prev = head;
    newCtx.next = nextCtx;
    head.next = newCtx;
    nextCtx.prev = newCtx;
}

2.2.2 Inbound事件相關(guān)方法

觸發(fā)處理Inbound事件Handler的相關(guān)方法如下:

//DefaultChannelPipeline    
 @Override
public final ChannelPipeline fireChannelActive() {
    AbstractChannelHandlerContext.invokeChannelActive(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelInactive() {
    AbstractChannelHandlerContext.invokeChannelInactive(head);
    return this;
}

@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
    AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
    return this;
}

@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
    AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
    return this;
}

@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

@Override
public final ChannelPipeline fireChannelReadComplete() {
    AbstractChannelHandlerContext.invokeChannelReadComplete(head);
    return this;
}

@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
    AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
    return this;
}

從上面的方法定義可知,Inbound事件第一個(gè)被觸發(fā)的Handler是head節(jié)點(diǎn)對(duì)應(yīng)的handler,我們舉例看下fireChannelRead方法的具體實(shí)現(xiàn):

//DefaultChannelPipeline  
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
    AbstractChannelHandlerContext.invokeChannelRead(head, msg);
    return this;
}

//AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
    final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
    //獲取該context對(duì)應(yīng)的線程
    EventExecutor executor = next.executor();
    //如果當(dāng)前線程就是該context的線程,則直接在該線程執(zhí)行,否則
    //將該任務(wù)放入線程的任務(wù)隊(duì)列中
    if (executor.inEventLoop()) {
        next.invokeChannelRead(m);
    } else {
        executor.execute(new Runnable() {
            @Override
            public void run() {
                next.invokeChannelRead(m);
            }
        });
    }
}


 @Override
public EventExecutor executor() {
    //如果線程為空的話,默認(rèn)返回channel注冊(cè)的線程
    if (executor == null) {
        return channel().eventLoop();
    } else {
        return executor;
    }
}

//獲取context封裝的handler,并執(zhí)行相應(yīng)的方法
private void invokeChannelRead(Object msg) {
    if (invokeHandler()) {
        try {
            ((ChannelInboundHandler) handler()).channelRead(this, msg);
        } catch (Throwable t) {
            notifyHandlerException(t);
        }
    } else {
        //獲取下一個(gè)Inbound handler并執(zhí)行
        fireChannelRead(msg);
    }
}

@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

//對(duì)于Inbound事件,從當(dāng)前context出發(fā),從前往后找
private AbstractChannelHandlerContext findContextInbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.next;
    } while (!ctx.inbound);
    return ctx;
}

除此之外,在一個(gè)handler處理完之后,想調(diào)用下一個(gè)handler繼續(xù)處理,可以調(diào)用如下方法:

//AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    //調(diào)用findContextInbound從找到當(dāng)前節(jié)點(diǎn)后面的第一個(gè)Inbound類型的
    //handler,并觸發(fā)相應(yīng)的函數(shù)
    invokeChannelRead(findContextInbound(), msg);
    return this;
}

2.2.3 Outbound事件相關(guān)方法

Outbond事件發(fā)生時(shí),會(huì)觸發(fā)Outbound類型的handler,流程和上面Inbound事件觸發(fā)Inbound handler的流程類似,這里不再贅述,但是要注意的是,Outbound觸發(fā)Outbound類型的handler是從后向前調(diào)用的,最后一個(gè)調(diào)用head。

@Override
public final ChannelFuture bind(SocketAddress localAddress) {
    return tail.bind(localAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
    return tail.connect(remoteAddress);
}

@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return tail.connect(remoteAddress, localAddress);
}

@Override
public final ChannelFuture disconnect() {
    return tail.disconnect();
}

@Override
public final ChannelFuture close() {
    return tail.close();
}

...

這里大概列一下DefaultChannelPipeline.write方法的源碼:

//DefaultChannelPipeline
@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
    return tail.write(msg, promise);
}

@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
    if (msg == null) {
        throw new NullPointerException("msg");
    }

    try {
        if (isNotValidPromise(promise, true)) {
            ReferenceCountUtil.release(msg);
            // cancelled
            return promise;
        }
    } catch (RuntimeException e) {
        ReferenceCountUtil.release(msg);
        throw e;
    }
    write(msg, false, promise);

    return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
    //從后往前找outbound handler
    AbstractChannelHandlerContext next = findContextOutbound();
    final Object m = pipeline.touch(msg, next);
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        if (flush) {
            next.invokeWriteAndFlush(m, promise);
        } else {
            next.invokeWrite(m, promise);
        }
    } else {
        AbstractWriteTask task;
        if (flush) {
            task = WriteAndFlushTask.newInstance(next, m, promise);
        }  else {
            task = WriteTask.newInstance(next, m, promise);
        }
        safeExecute(executor, task, promise, m);
    }
}

private AbstractChannelHandlerContext findContextOutbound() {
    //從當(dāng)前節(jié)點(diǎn),往前找outbound handler
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while (!ctx.outbound);
    return ctx;
}

3 幾個(gè)重要的ChannelHandler

相信通過上面的介紹,已經(jīng)知道了ChannelHandler是怎么被調(diào)用執(zhí)行相關(guān)方法的。下面我們介紹幾個(gè)重要的ChannelHandler實(shí)現(xiàn)。

  • HeadContext

HeadContext擴(kuò)展了AbstractChannelHandlerContext,也實(shí)現(xiàn)了ChannelOutboundHandler,ChannelInboundHandler,既是Inbound handler,也是outbound handler。

其作為第一個(gè)被調(diào)用的Inbound handler,其Inbound相關(guān)方法沒有做什么實(shí)際工作,僅僅觸發(fā)下一個(gè)handler,如

//HeadContext
 @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}

而作為最后一個(gè)被調(diào)用的outbound handler,其Outbound相關(guān)方法則進(jìn)行實(shí)際的操作,如:

//HeadContext
//調(diào)用unsafe.flush實(shí)際向channel寫數(shù)據(jù)
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
    unsafe.flush();
}
  • TailContext
    TailContext擴(kuò)展了AbstractChannelHandlerContext,并實(shí)現(xiàn)了ChannelInboundHandler接口,是一個(gè)Inbound handler,因?yàn)镮nbound事件從前往后調(diào)用Inbound handler,所以TailContext是最后一個(gè)被調(diào)用的Inbound handler,這里我們盡看一個(gè)有意思的方法:
//TailContext
 @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //如果channelRead事件能夠成功被傳遞到tail節(jié)點(diǎn),會(huì)執(zhí)行此方法
    //作為DefaultChannelPipeline內(nèi)部類,調(diào)用DefaultChannelPipeline
    //的onUnhandledInboundMessage方法
    onUnhandledInboundMessage(msg);
}

//DefaultChannelPipeline
protected void onUnhandledInboundMessage(Object msg) {
    //記錄日志,告訴用戶msg被傳到了tail節(jié)點(diǎn),需要檢查是否沒有
    //在tail節(jié)點(diǎn)之前配置正確的inbound進(jìn)行處理
    try {
        logger.debug(
                "Discarded inbound message {} that reached at the tail of the pipeline. " +
                        "Please check your pipeline configuration.", msg);
    } finally {
        //如下面源碼所示,如果是ReferenceCounted類型,嘗試進(jìn)行
        //釋放操作
        ReferenceCountUtil.release(msg);
    }
}

//ReferenceCountUtil
public static boolean release(Object msg) {
    if (msg instanceof ReferenceCounted) {
        return ((ReferenceCounted) msg).release();
    }
    return false;
}

  • ChannelInitializer
    ChannelInitializer主要用于channel初始化,一般用于在channelRegistered方法中向channel的pipeline中注冊(cè)相關(guān)的handler,ChannelInitializer的特別之處是注冊(cè)完handler之后,會(huì)將自己從pipeline的handler鏈表中刪除,僅僅會(huì)被執(zhí)行一次:
//ChannelInitializer
 @SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
// the handler.
//調(diào)用初始化函數(shù)
if (initChannel(ctx)) {
    // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
    // miss an event.
    ctx.pipeline().fireChannelRegistered();
} else {
    // Called initChannel(...) before which is the expected behavior, so just forward the event.
    ctx.fireChannelRegistered();
}
}

 @SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
        try {
            //調(diào)用提供給子類重寫的初始化函數(shù)
            initChannel((C) ctx.channel());
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            //從pipeline的handler鏈表中移除自己
            remove(ctx);
        }
        return true;
    }
    return false;
}

//從pipeline的handler鏈表中移除自己
private void remove(ChannelHandlerContext ctx) {
    try {
        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
            pipeline.remove(this);
        }
    } finally {
        initMap.remove(ctx);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容