通過前面的源碼系列文章中的netty reactor線程三部曲,我們已經(jīng)知道,netty的reactor線程就像是一個(gè)發(fā)動(dòng)機(jī),驅(qū)動(dòng)著整個(gè)netty框架的運(yùn)行,而服務(wù)端的綁定和新連接的建立正是發(fā)動(dòng)機(jī)的導(dǎo)火線,將發(fā)動(dòng)機(jī)點(diǎn)燃
netty在服務(wù)端端口綁定和新連接建立的過程中會建立相應(yīng)的channel,而與channel的動(dòng)作密切相關(guān)的是pipeline這個(gè)概念,pipeline像是可以看作是一條流水線,原始的原料(字節(jié)流)進(jìn)來,經(jīng)過加工,最后輸出
本文,我將以新連接的建立為例分為以下幾個(gè)部分給你介紹netty中的pipeline是怎么玩轉(zhuǎn)起來的
- pipeline 初始化
- pipeline 添加節(jié)點(diǎn)
- pipeline 刪除節(jié)點(diǎn)
pipeline 初始化
在新連接的建立這篇文章中,我們已經(jīng)知道了創(chuàng)建NioSocketChannel的時(shí)候會將netty的核心組件創(chuàng)建出來

pipeline是其中的一員,在下面這段代碼中被創(chuàng)建
AbstractChannel
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
AbstractChannel
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
pipeline中保存了channel的引用,創(chuàng)建完pipeline之后,整個(gè)pipeline是這個(gè)樣子的

pipeline中的每個(gè)節(jié)點(diǎn)是一個(gè)ChannelHandlerContext對象,每個(gè)context節(jié)點(diǎn)保存了它包裹的執(zhí)行器 ChannelHandler 執(zhí)行操作所需要的上下文,其實(shí)就是pipeline,因?yàn)閜ipeline包含了channel的引用,可以拿到所有的context信息
默認(rèn)情況下,一條pipeline會有兩個(gè)節(jié)點(diǎn),head和tail,后面的文章我們具體分析這兩個(gè)特殊的節(jié)點(diǎn),今天我們重點(diǎn)放在pipeline
pipeline添加節(jié)點(diǎn)
下面是一段非常常見的客戶端代碼
bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new Spliter())
p.addLast(new Decoder());
p.addLast(new BusinessHandler())
p.addLast(new Encoder());
}
});
首先,用一個(gè)spliter將來源TCP數(shù)據(jù)包拆包,然后將拆出來的包進(jìn)行decoder,傳入業(yè)務(wù)處理器BusinessHandler,業(yè)務(wù)處理完encoder,輸出
整個(gè)pipeline結(jié)構(gòu)如下

我用兩種顏色區(qū)分了一下pipeline中兩種不同類型的節(jié)點(diǎn),一個(gè)是 ChannelInboundHandler,處理inBound事件,最典型的就是讀取數(shù)據(jù)流,加工處理;還有一種類型的Handler是 ChannelOutboundHandler, 處理outBound事件,比如當(dāng)調(diào)用writeAndFlush()類方法時(shí),就會經(jīng)過該種類型的handler
不管是哪種類型的handler,其外層對象 ChannelHandlerContext 之間都是通過雙向鏈表連接,而區(qū)分一個(gè) ChannelHandlerContext到底是in還是out,在添加節(jié)點(diǎn)的時(shí)候我們就可以看到netty是怎么處理的
DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
for (ChannelHandler h: handlers) {
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 1.檢查是否有重復(fù)handler
checkMultiplicity(handler);
// 2.創(chuàng)建節(jié)點(diǎn)
newCtx = newContext(group, filterName(name, handler), handler);
// 3.添加節(jié)點(diǎn)
addLast0(newCtx);
}
// 4.回調(diào)用戶方法
callHandlerAdded0(handler);
return this;
}
這里簡單地用synchronized方法是為了防止多線程并發(fā)操作pipeline底層的雙向鏈表
我們還是逐步分析上面這段代碼
1.檢查是否有重復(fù)handler
在用戶代碼添加一條handler的時(shí)候,首先會查看該handler有沒有添加過
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
netty使用一個(gè)成員變量added標(biāo)識一個(gè)channel是否已經(jīng)添加,上面這段代碼很簡單,如果當(dāng)前要添加的Handler是非共享的,并且已經(jīng)添加過,那就拋出異常,否則,標(biāo)識該handler已經(jīng)添加
由此可見,一個(gè)Handler如果是sharable的,就可以無限次被添加到pipeline中,我們客戶端代碼如果要讓一個(gè)Handler被共用,只需要加一個(gè)@Sharable標(biāo)注即可,如下
@Sharable
public class BusinessHandler {
}
而如果Handler是sharable的,一般就通過spring的注入的方式使用,不需要每次都new 一個(gè)
isSharable() 方法正是通過該Handler對應(yīng)的類是否標(biāo)注@Sharable來實(shí)現(xiàn)的
ChannelHandlerAdapter
public boolean isSharable() {
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
這里也可以看到,netty為了性能優(yōu)化到極致,還使用了ThreadLocal來緩存Handler的狀態(tài),高并發(fā)海量連接下,每次有新連接添加Handler都會創(chuàng)建調(diào)用此方法
2.創(chuàng)建節(jié)點(diǎn)
回到主流程,看創(chuàng)建上下文這段代碼
newCtx = newContext(group, filterName(name, handler), handler);
這里我們需要先分析 filterName(name, handler) 這段代碼,這個(gè)函數(shù)用于給handler創(chuàng)建一個(gè)唯一性的名字
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
顯然,我們傳入的name為null,netty就給我們生成一個(gè)默認(rèn)的name,否則,檢查是否有重名,檢查通過的話就返回
netty創(chuàng)建默認(rèn)name的規(guī)則為 簡單類名#0,下面我們來看些具體是怎么實(shí)現(xiàn)的
private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =
new FastThreadLocal<Map<Class<?>, String>>() {
@Override
protected Map<Class<?>, String> initialValue() throws Exception {
return new WeakHashMap<Class<?>, String>();
}
};
private String generateName(ChannelHandler handler) {
// 先查看緩存中是否有生成過默認(rèn)name
Map<Class<?>, String> cache = nameCaches.get();
Class<?> handlerType = handler.getClass();
String name = cache.get(handlerType);
// 沒有生成過,就生成一個(gè)默認(rèn)name,加入緩存
if (name == null) {
name = generateName0(handlerType);
cache.put(handlerType, name);
}
// 生成完了,還要看默認(rèn)name有沒有沖突
if (context0(name) != null) {
String baseName = name.substring(0, name.length() - 1);
for (int i = 1;; i ++) {
String newName = baseName + i;
if (context0(newName) == null) {
name = newName;
break;
}
}
}
return name;
}
netty使用一個(gè) FastThreadLocal(后面的文章會細(xì)說)變量來緩存Handler的類和默認(rèn)名稱的映射關(guān)系,在生成name的時(shí)候,首先查看緩存中有沒有生成過默認(rèn)name(簡單類名#0),如果沒有生成,就調(diào)用generateName0()生成默認(rèn)name,然后加入緩存
接下來還需要檢查name是否和已有的name有沖突,調(diào)用context0(),查找pipeline里面有沒有對應(yīng)的context
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
context0()方法鏈表遍歷每一個(gè) ChannelHandlerContext,只要發(fā)現(xiàn)某個(gè)context的名字與待添加的name相同,就返回該context,最后拋出異常,可以看到,這個(gè)其實(shí)是一個(gè)線性搜索的過程
如果context0(name) != null 成立,說明現(xiàn)有的context里面已經(jīng)有了一個(gè)默認(rèn)name,那么就從 簡單類名#1 往上一直找,直到找到一個(gè)唯一的name,比如簡單類名#3
如果用戶代碼在添加Handler的時(shí)候指定了一個(gè)name,那么要做到事僅僅為檢查一下是否有重復(fù)
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
處理完name之后,就進(jìn)入到創(chuàng)建context的過程,由前面的調(diào)用鏈得知,group為null,因此childExecutor(group)也返回null
DefaultChannelPipeline
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private EventExecutor childExecutor(EventExecutorGroup group) {
if (group == null) {
return null;
}
//..
}
DefaultChannelHandlerContext
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
構(gòu)造函數(shù)中,DefaultChannelHandlerContext將參數(shù)回傳到父類,保存Handler的引用,進(jìn)入到其父類
AbstractChannelHandlerContext
AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor, String name,
boolean inbound, boolean outbound) {
this.name = ObjectUtil.checkNotNull(name, "name");
this.pipeline = pipeline;
this.executor = executor;
this.inbound = inbound;
this.outbound = outbound;
}
netty中用兩個(gè)字段來表示這個(gè)channelHandlerContext屬于inBound還是outBound,或者兩者都是,兩個(gè)boolean是通過下面兩個(gè)小函數(shù)來判斷(見上面一段代碼)
DefaultChannelHandlerContext
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
通過instanceof關(guān)鍵字根據(jù)接口類型來判斷,因此,如果一個(gè)Handler實(shí)現(xiàn)了兩類接口,那么他既是一個(gè)inBound類型的Handler,又是一個(gè)outBound類型的Handler,比如下面這個(gè)類

常用的,將decode操作和encode操作合并到一起的codec,一般會繼承 MessageToMessageCodec,而MessageToMessageCodec就是繼承ChannelDuplexHandler
MessageToMessageCodec
public abstract class MessageToMessageCodec<INBOUND_IN, OUTBOUND_IN> extends ChannelDuplexHandler {
protected abstract void encode(ChannelHandlerContext ctx, OUTBOUND_IN msg, List<Object> out)
throws Exception;
protected abstract void decode(ChannelHandlerContext ctx, INBOUND_IN msg, List<Object> out)
throws Exception;
}
context 創(chuàng)建完了之后,接下來終于要將創(chuàng)建完畢的context加入到pipeline中去了
3.添加節(jié)點(diǎn)
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev; // 1
newCtx.next = tail; // 2
prev.next = newCtx; // 3
tail.prev = newCtx; // 4
}
用下面這幅圖可見簡單的表示這段過程,說白了,其實(shí)就是一個(gè)雙向鏈表的插入操作

操作完畢,該context就加入到pipeline中

到這里,pipeline添加節(jié)點(diǎn)的操作就完成了,你可以根據(jù)此思路掌握所有的addxxx()系列方法
4.回調(diào)用戶方法
AbstractChannelHandlerContext
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
}
到了第四步,pipeline中的新節(jié)點(diǎn)添加完成,于是便開始回調(diào)用戶代碼 ctx.handler().handlerAdded(ctx);,常見的用戶代碼如下
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
// 節(jié)點(diǎn)被添加完畢之后回調(diào)到此
// do something
}
}
接下來,設(shè)置該節(jié)點(diǎn)的狀態(tài)
AbstractChannelHandlerContext
final void setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return;
}
}
}
用cas修改節(jié)點(diǎn)的狀態(tài)至:REMOVE_COMPLETE(說明該節(jié)點(diǎn)已經(jīng)被移除) 或者 ADD_COMPLETE
pipeline刪除節(jié)點(diǎn)
netty 有個(gè)最大的特性之一就是Handler可插拔,做到動(dòng)態(tài)編織pipeline,比如在首次建立連接的時(shí)候,需要通過進(jìn)行權(quán)限認(rèn)證,在認(rèn)證通過之后,就可以將此context移除,下次pipeline在傳播事件的時(shí)候就就不會調(diào)用到權(quán)限認(rèn)證處理器
下面是權(quán)限認(rèn)證Handler最簡單的實(shí)現(xiàn),第一個(gè)數(shù)據(jù)包傳來的是認(rèn)證信息,如果校驗(yàn)通過,就刪除此Handler,否則,直接關(guān)閉連接
public class AuthHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf data) throws Exception {
if (verify(authDataPacket)) {
ctx.pipeline().remove(this);
} else {
ctx.close();
}
}
private boolean verify(ByteBuf byteBuf) {
//...
}
}
重點(diǎn)就在 ctx.pipeline().remove(this) 這段代碼
@Override
public final ChannelPipeline remove(ChannelHandler handler) {
remove(getContextOrDie(handler));
return this;
}
remove操作相比add簡單不少,分為三個(gè)步驟:
1.找到待刪除的節(jié)點(diǎn)
2.調(diào)整雙向鏈表指針刪除
3.回調(diào)用戶函數(shù)
1.找到待刪除的節(jié)點(diǎn)
DefaultChannelPipeline
private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
if (ctx == null) {
throw new NoSuchElementException(handler.getClass().getName());
} else {
return ctx;
}
}
@Override
public final ChannelHandlerContext context(ChannelHandler handler) {
if (handler == null) {
throw new NullPointerException("handler");
}
AbstractChannelHandlerContext ctx = head.next;
for (;;) {
if (ctx == null) {
return null;
}
if (ctx.handler() == handler) {
return ctx;
}
ctx = ctx.next;
}
}
這里為了找到Handler對應(yīng)的context,照樣是通過依次遍歷雙向鏈表的方式,直到某一個(gè)context的Handler和當(dāng)前Handler相同,便找到了該節(jié)點(diǎn)
2.調(diào)整雙向鏈表指針刪除
DefaultChannelPipeline
private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
assert ctx != head && ctx != tail;
synchronized (this) {
// 2.調(diào)整雙向鏈表指針刪除
remove0(ctx);
}
// 3.回調(diào)用戶函數(shù)
callHandlerRemoved0(ctx);
return ctx;
}
private static void remove0(AbstractChannelHandlerContext ctx) {
AbstractChannelHandlerContext prev = ctx.prev;
AbstractChannelHandlerContext next = ctx.next;
prev.next = next; // 1
next.prev = prev; // 2
}
經(jīng)歷的過程要比添加節(jié)點(diǎn)要簡單,可以用下面一幅圖來表示

最后的結(jié)果為

結(jié)合這兩幅圖,可以很清晰地了解權(quán)限驗(yàn)證Handler的工作原理,另外,被刪除的節(jié)點(diǎn)因?yàn)闆]有對象引用到,果過段時(shí)間就會被gc自動(dòng)回收
3.回調(diào)用戶函數(shù)
private void callHandlerRemoved0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerRemoved(ctx);
} finally {
ctx.setRemoved();
}
}
到了第三步,pipeline中的節(jié)點(diǎn)刪除完成,于是便開始回調(diào)用戶代碼 ctx.handler().handlerRemoved(ctx);,常見的代碼如下
AbstractChannelHandlerContext
public class DemoHandler extends SimpleChannelInboundHandler<...> {
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
// 節(jié)點(diǎn)被刪除完畢之后回調(diào)到此,可做一些資源清理
// do something
}
}
最后,將該節(jié)點(diǎn)的狀態(tài)設(shè)置為removed
final void setRemoved() {
handlerState = REMOVE_COMPLETE;
}
removexxx系列的其他方法族大同小異,你可以根據(jù)上面的思路展開其他的系列方法,這里不再贅述
總結(jié)
1.以新連接創(chuàng)建為例,新連接創(chuàng)建的過程中創(chuàng)建channel,而在創(chuàng)建channel的過程中創(chuàng)建了該channel對應(yīng)的pipeline,創(chuàng)建完pipeline之后,自動(dòng)給該pipeline添加了兩個(gè)節(jié)點(diǎn),即ChannelHandlerContext,ChannelHandlerContext中有用pipeline和channel所有的上下文信息。
2.pipeline是雙向個(gè)鏈表結(jié)構(gòu),添加和刪除節(jié)點(diǎn)均只需要調(diào)整鏈表結(jié)構(gòu)
3.pipeline中的每個(gè)節(jié)點(diǎn)包著具體的處理器ChannelHandler,節(jié)點(diǎn)根據(jù)ChannelHandler的類型是ChannelInboundHandler還是ChannelOutboundHandler來判斷該節(jié)點(diǎn)屬于in還是out或者兩者都是
下一篇文章將繼續(xù)pipeline的分析,敬請期待!
如果你覺得看的不過癮,想系統(tǒng)學(xué)習(xí)Netty原理,那么你一定不要錯(cuò)過我的Netty源碼分析系列視頻:https://coding.imooc.com/class/230.html