netty源碼分析之pipeline(一)

通過前面的源碼系列文章中的netty reactor線程三部曲,我們已經(jīng)知道,netty的reactor線程就像是一個(gè)發(fā)動(dòng)機(jī),驅(qū)動(dòng)著整個(gè)netty框架的運(yùn)行,而服務(wù)端的綁定新連接的建立正是發(fā)動(dòng)機(jī)的導(dǎo)火線,將發(fā)動(dòng)機(jī)點(diǎn)燃

netty在服務(wù)端端口綁定和新連接建立的過程中會建立相應(yīng)的channel,而與channel的動(dòng)作密切相關(guān)的是pipeline這個(gè)概念,pipeline像是可以看作是一條流水線,原始的原料(字節(jié)流)進(jìn)來,經(jīng)過加工,最后輸出

本文,我將以新連接的建立為例分為以下幾個(gè)部分給你介紹netty中的pipeline是怎么玩轉(zhuǎn)起來的

  • pipeline 初始化
  • pipeline 添加節(jié)點(diǎn)
  • pipeline 刪除節(jié)點(diǎn)

pipeline 初始化

新連接的建立這篇文章中,我們已經(jīng)知道了創(chuàng)建NioSocketChannel的時(shí)候會將netty的核心組件創(chuàng)建出來

channel中的核心組件

pipeline是其中的一員,在下面這段代碼中被創(chuàng)建

AbstractChannel

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    id = newId();
    unsafe = newUnsafe();
    pipeline = newChannelPipeline();
}

AbstractChannel

protected DefaultChannelPipeline newChannelPipeline() {
    return new DefaultChannelPipeline(this);
}

DefaultChannelPipeline

protected DefaultChannelPipeline(Channel channel) {
    this.channel = ObjectUtil.checkNotNull(channel, "channel");
    tail = new TailContext(this);
    head = new HeadContext(this);

    head.next = tail;
    tail.prev = head;
}

pipeline中保存了channel的引用,創(chuàng)建完pipeline之后,整個(gè)pipeline是這個(gè)樣子的

pipeline默認(rèn)結(jié)構(gòu)

pipeline中的每個(gè)節(jié)點(diǎn)是一個(gè)ChannelHandlerContext對象,每個(gè)context節(jié)點(diǎn)保存了它包裹的執(zhí)行器 ChannelHandler 執(zhí)行操作所需要的上下文,其實(shí)就是pipeline,因?yàn)閜ipeline包含了channel的引用,可以拿到所有的context信息

默認(rèn)情況下,一條pipeline會有兩個(gè)節(jié)點(diǎn),head和tail,后面的文章我們具體分析這兩個(gè)特殊的節(jié)點(diǎn),今天我們重點(diǎn)放在pipeline

pipeline添加節(jié)點(diǎn)

下面是一段非常常見的客戶端代碼

bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(new Spliter())
         p.addLast(new Decoder());
         p.addLast(new BusinessHandler())
         p.addLast(new Encoder());
     }
});

首先,用一個(gè)spliter將來源TCP數(shù)據(jù)包拆包,然后將拆出來的包進(jìn)行decoder,傳入業(yè)務(wù)處理器BusinessHandler,業(yè)務(wù)處理完encoder,輸出

整個(gè)pipeline結(jié)構(gòu)如下

pipeline結(jié)構(gòu)

我用兩種顏色區(qū)分了一下pipeline中兩種不同類型的節(jié)點(diǎn),一個(gè)是 ChannelInboundHandler,處理inBound事件,最典型的就是讀取數(shù)據(jù)流,加工處理;還有一種類型的Handler是 ChannelOutboundHandler, 處理outBound事件,比如當(dāng)調(diào)用writeAndFlush()類方法時(shí),就會經(jīng)過該種類型的handler

不管是哪種類型的handler,其外層對象 ChannelHandlerContext 之間都是通過雙向鏈表連接,而區(qū)分一個(gè) ChannelHandlerContext到底是in還是out,在添加節(jié)點(diǎn)的時(shí)候我們就可以看到netty是怎么處理的

DefaultChannelPipeline

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
    return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
    for (ChannelHandler h: handlers) {
        addLast(executor, null, h);
    }
    return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
    final AbstractChannelHandlerContext newCtx;
    synchronized (this) {
        // 1.檢查是否有重復(fù)handler
        checkMultiplicity(handler);
        // 2.創(chuàng)建節(jié)點(diǎn)
        newCtx = newContext(group, filterName(name, handler), handler);
        // 3.添加節(jié)點(diǎn)
        addLast0(newCtx);
    }
   
    // 4.回調(diào)用戶方法
    callHandlerAdded0(handler);
    
    return this;
}

這里簡單地用synchronized方法是為了防止多線程并發(fā)操作pipeline底層的雙向鏈表

我們還是逐步分析上面這段代碼

1.檢查是否有重復(fù)handler

在用戶代碼添加一條handler的時(shí)候,首先會查看該handler有沒有添加過

private static void checkMultiplicity(ChannelHandler handler) {
    if (handler instanceof ChannelHandlerAdapter) {
        ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
        if (!h.isSharable() && h.added) {
            throw new ChannelPipelineException(
                    h.getClass().getName() +
                    " is not a @Sharable handler, so can't be added or removed multiple times.");
        }
        h.added = true;
    }
}

netty使用一個(gè)成員變量added標(biāo)識一個(gè)channel是否已經(jīng)添加,上面這段代碼很簡單,如果當(dāng)前要添加的Handler是非共享的,并且已經(jīng)添加過,那就拋出異常,否則,標(biāo)識該handler已經(jīng)添加

由此可見,一個(gè)Handler如果是sharable的,就可以無限次被添加到pipeline中,我們客戶端代碼如果要讓一個(gè)Handler被共用,只需要加一個(gè)@Sharable標(biāo)注即可,如下

@Sharable
public class BusinessHandler {
    
}

而如果Handler是sharable的,一般就通過spring的注入的方式使用,不需要每次都new 一個(gè)

isSharable() 方法正是通過該Handler對應(yīng)的類是否標(biāo)注@Sharable來實(shí)現(xiàn)的

ChannelHandlerAdapter

public boolean isSharable() {
   Class<?> clazz = getClass();
    Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
    Boolean sharable = cache.get(clazz);
    if (sharable == null) {
        sharable = clazz.isAnnotationPresent(Sharable.class);
        cache.put(clazz, sharable);
    }
    return sharable;
}

這里也可以看到,netty為了性能優(yōu)化到極致,還使用了ThreadLocal來緩存Handler的狀態(tài),高并發(fā)海量連接下,每次有新連接添加Handler都會創(chuàng)建調(diào)用此方法

2.創(chuàng)建節(jié)點(diǎn)

回到主流程,看創(chuàng)建上下文這段代碼

newCtx = newContext(group, filterName(name, handler), handler);

這里我們需要先分析 filterName(name, handler) 這段代碼,這個(gè)函數(shù)用于給handler創(chuàng)建一個(gè)唯一性的名字

private String filterName(String name, ChannelHandler handler) {
    if (name == null) {
        return generateName(handler);
    }
    checkDuplicateName(name);
    return name;
}

顯然,我們傳入的name為null,netty就給我們生成一個(gè)默認(rèn)的name,否則,檢查是否有重名,檢查通過的話就返回

netty創(chuàng)建默認(rèn)name的規(guī)則為 簡單類名#0,下面我們來看些具體是怎么實(shí)現(xiàn)的

private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
        new FastThreadLocal<Map<Class<?>, String>>() {
    @Override
    protected Map<Class<?>, String> initialValue() throws Exception {
        return new WeakHashMap<Class<?>, String>();
    }
};

private String generateName(ChannelHandler handler) {
    // 先查看緩存中是否有生成過默認(rèn)name
    Map<Class<?>, String> cache = nameCaches.get();
    Class<?> handlerType = handler.getClass();
    String name = cache.get(handlerType);
    // 沒有生成過,就生成一個(gè)默認(rèn)name,加入緩存 
    if (name == null) {
        name = generateName0(handlerType);
        cache.put(handlerType, name);
    }

    // 生成完了,還要看默認(rèn)name有沒有沖突
    if (context0(name) != null) {
        String baseName = name.substring(0, name.length() - 1);
        for (int i = 1;; i ++) {
            String newName = baseName + i;
            if (context0(newName) == null) {
                name = newName;
                break;
            }
        }
    }
    return name;
}

netty使用一個(gè) FastThreadLocal(后面的文章會細(xì)說)變量來緩存Handler的類和默認(rèn)名稱的映射關(guān)系,在生成name的時(shí)候,首先查看緩存中有沒有生成過默認(rèn)name(簡單類名#0),如果沒有生成,就調(diào)用generateName0()生成默認(rèn)name,然后加入緩存

接下來還需要檢查name是否和已有的name有沖突,調(diào)用context0(),查找pipeline里面有沒有對應(yīng)的context

private AbstractChannelHandlerContext context0(String name) {
    AbstractChannelHandlerContext context = head.next;
    while (context != tail) {
        if (context.name().equals(name)) {
            return context;
        }
        context = context.next;
    }
    return null;
}

context0()方法鏈表遍歷每一個(gè) ChannelHandlerContext,只要發(fā)現(xiàn)某個(gè)context的名字與待添加的name相同,就返回該context,最后拋出異常,可以看到,這個(gè)其實(shí)是一個(gè)線性搜索的過程

如果context0(name) != null 成立,說明現(xiàn)有的context里面已經(jīng)有了一個(gè)默認(rèn)name,那么就從 簡單類名#1 往上一直找,直到找到一個(gè)唯一的name,比如簡單類名#3

如果用戶代碼在添加Handler的時(shí)候指定了一個(gè)name,那么要做到事僅僅為檢查一下是否有重復(fù)

private void checkDuplicateName(String name) {
    if (context0(name) != null) {
        throw new IllegalArgumentException("Duplicate handler name: " + name);
    }
}

處理完name之后,就進(jìn)入到創(chuàng)建context的過程,由前面的調(diào)用鏈得知,group為null,因此childExecutor(group)也返回null

DefaultChannelPipeline

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
    return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private EventExecutor childExecutor(EventExecutorGroup group) {
    if (group == null) {
        return null;
    }
    //..
}

DefaultChannelHandlerContext

DefaultChannelHandlerContext(
        DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
    super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
    if (handler == null) {
        throw new NullPointerException("handler");
    }
    this.handler = handler;
}

構(gòu)造函數(shù)中,DefaultChannelHandlerContext將參數(shù)回傳到父類,保存Handler的引用,進(jìn)入到其父類

AbstractChannelHandlerContext

AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
                              boolean inbound, boolean outbound) {
    this.name = ObjectUtil.checkNotNull(name, "name");
    this.pipeline = pipeline;
    this.executor = executor;
    this.inbound = inbound;
    this.outbound = outbound;
}

netty中用兩個(gè)字段來表示這個(gè)channelHandlerContext屬于inBound還是outBound,或者兩者都是,兩個(gè)boolean是通過下面兩個(gè)小函數(shù)來判斷(見上面一段代碼)

DefaultChannelHandlerContext

private static boolean isInbound(ChannelHandler handler) {
    return handler instanceof ChannelInboundHandler;
}

private static boolean isOutbound(ChannelHandler handler) {
    return handler instanceof ChannelOutboundHandler;
}

通過instanceof關(guān)鍵字根據(jù)接口類型來判斷,因此,如果一個(gè)Handler實(shí)現(xiàn)了兩類接口,那么他既是一個(gè)inBound類型的Handler,又是一個(gè)outBound類型的Handler,比如下面這個(gè)類

ChannelDuplexHandler

常用的,將decode操作和encode操作合并到一起的codec,一般會繼承 MessageToMessageCodec,而MessageToMessageCodec就是繼承ChannelDuplexHandler

MessageToMessageCodec

public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {

    protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
            throws Exception;

    protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
            throws Exception;
 }

context 創(chuàng)建完了之后,接下來終于要將創(chuàng)建完畢的context加入到pipeline中去了

3.添加節(jié)點(diǎn)

private void addLast0(AbstractChannelHandlerContext newCtx) {
    AbstractChannelHandlerContext prev = tail.prev;
    newCtx.prev = prev; // 1
    newCtx.next = tail; // 2
    prev.next = newCtx; // 3
    tail.prev = newCtx; // 4
}

用下面這幅圖可見簡單的表示這段過程,說白了,其實(shí)就是一個(gè)雙向鏈表的插入操作

添加節(jié)點(diǎn)過程

操作完畢,該context就加入到pipeline中

添加節(jié)點(diǎn)之后

到這里,pipeline添加節(jié)點(diǎn)的操作就完成了,你可以根據(jù)此思路掌握所有的addxxx()系列方法

4.回調(diào)用戶方法

AbstractChannelHandlerContext

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
    ctx.handler().handlerAdded(ctx);
    ctx.setAddComplete();
}

到了第四步,pipeline中的新節(jié)點(diǎn)添加完成,于是便開始回調(diào)用戶代碼 ctx.handler().handlerAdded(ctx);,常見的用戶代碼如下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 節(jié)點(diǎn)被添加完畢之后回調(diào)到此
        // do something
    }
}

接下來,設(shè)置該節(jié)點(diǎn)的狀態(tài)

AbstractChannelHandlerContext

final void setAddComplete() {
    for (;;) {
        int oldState = handlerState;
        if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
            return;
        }
    }
}

用cas修改節(jié)點(diǎn)的狀態(tài)至:REMOVE_COMPLETE(說明該節(jié)點(diǎn)已經(jīng)被移除) 或者 ADD_COMPLETE

pipeline刪除節(jié)點(diǎn)

netty 有個(gè)最大的特性之一就是Handler可插拔,做到動(dòng)態(tài)編織pipeline,比如在首次建立連接的時(shí)候,需要通過進(jìn)行權(quán)限認(rèn)證,在認(rèn)證通過之后,就可以將此context移除,下次pipeline在傳播事件的時(shí)候就就不會調(diào)用到權(quán)限認(rèn)證處理器

下面是權(quán)限認(rèn)證Handler最簡單的實(shí)現(xiàn),第一個(gè)數(shù)據(jù)包傳來的是認(rèn)證信息,如果校驗(yàn)通過,就刪除此Handler,否則,直接關(guān)閉連接

public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
        if (verify(authDataPacket)) {
            ctx.pipeline().remove(this);
        } else {
            ctx.close();
        }
    }

    private boolean verify(ByteBuf byteBuf) {
        //...
    }
}

重點(diǎn)就在 ctx.pipeline().remove(this) 這段代碼

@Override
public final ChannelPipeline remove(ChannelHandler handler) {
    remove(getContextOrDie(handler));
    
    return this;
}

remove操作相比add簡單不少,分為三個(gè)步驟:

1.找到待刪除的節(jié)點(diǎn)
2.調(diào)整雙向鏈表指針刪除
3.回調(diào)用戶函數(shù)

1.找到待刪除的節(jié)點(diǎn)

DefaultChannelPipeline

private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
    AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
    if (ctx == null) {
        throw new NoSuchElementException(handler.getClass().getName());
    } else {
        return ctx;
    }
}

@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
    if (handler == null) {
        throw new NullPointerException("handler");
    }

    AbstractChannelHandlerContext ctx = head.next;
    for (;;) {

        if (ctx == null) {
            return null;
        }

        if (ctx.handler() == handler) {
            return ctx;
        }

        ctx = ctx.next;
    }
}

這里為了找到Handler對應(yīng)的context,照樣是通過依次遍歷雙向鏈表的方式,直到某一個(gè)context的Handler和當(dāng)前Handler相同,便找到了該節(jié)點(diǎn)

2.調(diào)整雙向鏈表指針刪除

DefaultChannelPipeline

private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
    assert ctx != head && ctx != tail;

    synchronized (this) {
        // 2.調(diào)整雙向鏈表指針刪除
        remove0(ctx);
    }
    // 3.回調(diào)用戶函數(shù)
    callHandlerRemoved0(ctx);
    return ctx;
}

private static void remove0(AbstractChannelHandlerContext ctx) {
    AbstractChannelHandlerContext prev = ctx.prev;
    AbstractChannelHandlerContext next = ctx.next;
    prev.next = next; // 1
    next.prev = prev; // 2
}

經(jīng)歷的過程要比添加節(jié)點(diǎn)要簡單,可以用下面一幅圖來表示

刪除節(jié)點(diǎn)過程

最后的結(jié)果為

刪除節(jié)點(diǎn)之后

結(jié)合這兩幅圖,可以很清晰地了解權(quán)限驗(yàn)證Handler的工作原理,另外,被刪除的節(jié)點(diǎn)因?yàn)闆]有對象引用到,果過段時(shí)間就會被gc自動(dòng)回收

3.回調(diào)用戶函數(shù)

private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
    try {
        ctx.handler().handlerRemoved(ctx);
    } finally {
        ctx.setRemoved();
    }
}

到了第三步,pipeline中的節(jié)點(diǎn)刪除完成,于是便開始回調(diào)用戶代碼 ctx.handler().handlerRemoved(ctx);,常見的代碼如下

AbstractChannelHandlerContext

public class DemoHandler extends SimpleChannelInboundHandler<...> {
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        // 節(jié)點(diǎn)被刪除完畢之后回調(diào)到此,可做一些資源清理
        // do something
    }
}

最后,將該節(jié)點(diǎn)的狀態(tài)設(shè)置為removed

final void setRemoved() {
    handlerState = REMOVE_COMPLETE;
}

removexxx系列的其他方法族大同小異,你可以根據(jù)上面的思路展開其他的系列方法,這里不再贅述

總結(jié)

1.以新連接創(chuàng)建為例,新連接創(chuàng)建的過程中創(chuàng)建channel,而在創(chuàng)建channel的過程中創(chuàng)建了該channel對應(yīng)的pipeline,創(chuàng)建完pipeline之后,自動(dòng)給該pipeline添加了兩個(gè)節(jié)點(diǎn),即ChannelHandlerContext,ChannelHandlerContext中有用pipeline和channel所有的上下文信息。

2.pipeline是雙向個(gè)鏈表結(jié)構(gòu),添加和刪除節(jié)點(diǎn)均只需要調(diào)整鏈表結(jié)構(gòu)

3.pipeline中的每個(gè)節(jié)點(diǎn)包著具體的處理器ChannelHandler,節(jié)點(diǎn)根據(jù)ChannelHandler的類型是ChannelInboundHandler還是ChannelOutboundHandler來判斷該節(jié)點(diǎn)屬于in還是out或者兩者都是

下一篇文章將繼續(xù)pipeline的分析,敬請期待!

如果你覺得看的不過癮,想系統(tǒng)學(xué)習(xí)Netty原理,那么你一定不要錯(cuò)過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Channel 與 ChannelPipeline在Netty中每個(gè)Channel都有且僅有一個(gè)ChannelPi...
    水欣閱讀 2,422評論 0 1
  • 作者: 一字馬胡 轉(zhuǎn)載標(biāo)志 【2017-11-03】 更新日志 ChannelHandler Netty線程模型...
    一字馬胡閱讀 13,107評論 1 24
  • 前言 netty源碼分析之pipeline(一)中,我們已經(jīng)了解了pipeline在netty中所處的角色,像是一...
    簡書閃電俠閱讀 16,302評論 15 35
  • netty常用API學(xué)習(xí) netty簡介 Netty是基于Java NIO的網(wǎng)絡(luò)應(yīng)用框架. Netty是一個(gè)NIO...
    花丶小偉閱讀 6,134評論 0 20
  • 我不在乎別人怎樣評價(jià)我畫的東西。因?yàn)楫嫯媽τ谖襾碚f是觸摸時(shí)間和心靈的方式。我用我力所能及的方式,觸摸自己的心靈,這...
    小豬崽崽閱讀 678評論 2 8

友情鏈接更多精彩內(nèi)容