1 概述
在Netty事件模型中,在發(fā)生網(wǎng)絡(luò)事件(如Read,Write,Connect)等事件后,是通過注冊(cè)在Pipeline中的一個(gè)個(gè)Handler對(duì)事件進(jìn)行處理的,這種采用多Handler對(duì)事件進(jìn)行處理可以對(duì)事件的處理進(jìn)行邏輯分層,比如在經(jīng)典的編碼、解碼處理中,可以注冊(cè)一個(gè)專門的Handler對(duì)報(bào)文進(jìn)行編碼或者解碼,編碼或者解碼之后的報(bào)文再傳遞給下一個(gè)Handler進(jìn)行處理。另外Netty采用這種Pipeline這種串行的Handler處理各種事件,避免了線程的上下文切換,減少了多線程環(huán)境對(duì)鎖的依賴,也能在一定程度上提高性能。
ChannelPipeline是ChannelHandler的容器,負(fù)責(zé)管理一系列的ChannelHandler,對(duì)到來的事件進(jìn)行處理。
ChannelHandler則是對(duì)事件處理的一個(gè)個(gè)處理器,分為兩種類型,即ChannelInboundHandler和ChannelOutboundHandler,分別負(fù)責(zé)處理Netty中的Inbound和Outbound事件,從兩個(gè)接口中定義的函數(shù)可以知道Inbound和Outbound事件分別有哪些:


當(dāng)然在``接口源碼注釋中也列出了Inbound和Outbound方法:
Inbound event propagation methods:
- ChannelHandlerContext.fireChannelRegistered()
- ChannelHandlerContext.fireChannelActive()
- ChannelHandlerContext.fireChannelRead(Object)
- ChannelHandlerContext.fireChannelReadComplete()
- ChannelHandlerContext.fireExceptionCaught(Throwable)
- ChannelHandlerContext.fireUserEventTriggered(Object)
- ChannelHandlerContext.fireChannelWritabilityChanged()
- ChannelHandlerContext.fireChannelInactive()
- ChannelHandlerContext.fireChannelUnregistered()
Outbound event propagation methods:
- ChannelHandlerContext.bind(SocketAddress, ChannelPromise)
- ChannelHandlerContext.connect(SocketAddress, SocketAddress, hannelPromise)
- ChannelHandlerContext.write(Object, ChannelPromise)
- ChannelHandlerContext.flush()
- ChannelHandlerContext.read()
- ChannelHandlerContext.disconnect(ChannelPromise)
- ChannelHandlerContext.close(ChannelPromise)
- ChannelHandlerContext.deregister(ChannelPromise)
Pipeline中可以注冊(cè)多個(gè)InboundHandler和多個(gè)OutboundHandler,并使用雙向鏈表連接起來,對(duì)于收到的Inbound或Outbound事件會(huì)調(diào)用相關(guān)類型的Handler進(jìn)行處理,但是Inbound和Outbound事件執(zhí)行handler的順序是不一樣的,Inbound事件則是從前往后調(diào)用handler,最后一個(gè)被調(diào)用的是尾節(jié)點(diǎn);對(duì)于Outbound事件則是從后往前調(diào)用,最后一個(gè)執(zhí)行的是頭結(jié)點(diǎn)。
下面我們分別介紹Netty中ChannelPipeline和ChannelHandler的相關(guān)實(shí)現(xiàn)。
2 ChannelPipeline
2.1 接口(類)結(jié)構(gòu)
2.1.1 重要域
在這里,我們直接看ChannelPipeline在Netty中的默認(rèn)實(shí)現(xiàn)DefaultChannelPipeline。首先我們要說明一下,雖然說ChannelPipeline是ChannelHandler的容器,但是ChannelPipeline并不是直接持有ChannelHandler的,ChannelHandler會(huì)被封裝成ChannelHandlerContext,ChannelPipeline則使用雙鏈表持有一個(gè)個(gè)的ChannelHandlerContext。
我們先看DefaultChannelPipeline重要域:
//DefaultChannelPipeline
//該P(yáng)ipeline關(guān)聯(lián)的Channel,由構(gòu)造函數(shù)傳進(jìn)來
private final Channel channel;
//pipeline持有一些列的context,但是其頭和尾context是不可配置的,
//在構(gòu)造函數(shù)中被初始化
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
//這個(gè)平時(shí)沒怎么使用,但是在后文介紹如何向pipeline中添加handler時(shí)
//會(huì)介紹,pipeline默認(rèn)使用channel注冊(cè)的executor執(zhí)行任務(wù),但是也可以在
//向pipeline中加入handler時(shí)傳入EventExecutorGroup,然后從該線程組
//選出線程執(zhí)行任務(wù),傳入的線程組和第一次選出的線程executor會(huì)記錄在
//childExecutors,這樣后面可以保證在向該pipeline添加handler時(shí),如果
//配置了SINGLE_EVENTEXECUTOR_PER_GROUP參數(shù)為true,即單線程執(zhí)行任務(wù),
//childExecutors記錄每個(gè)group第一次選出的executor則可以在下次添加
//handler取出直接使用,保證單線,這個(gè)后面在介紹childExecutor方法時(shí)會(huì)
//再次介紹
private Map<EventExecutorGroup, EventExecutor> childExecutors;
關(guān)于上面提到的配置參數(shù)SINGLE_EVENTEXECUTOR_PER_GROUP,可見參考文章介紹如下:
Netty參數(shù),單線程執(zhí)行ChannelPipeline中的事件,默認(rèn)值為True。該值控制執(zhí)行ChannelPipeline中執(zhí)行ChannelHandler的線程。如果為True,整個(gè)pipeline由一個(gè)線程執(zhí)行,這樣不需要進(jìn)行線程切換以及線程同步,是Netty4的推薦做法;如果為False,ChannelHandler中的處理過程會(huì)由Group中的不同線程執(zhí)行。
但是個(gè)人認(rèn)為上面引用中的整個(gè)pipeline由一個(gè)線程執(zhí)行不太準(zhǔn)確,只能說如果傳入同一個(gè)group,且配置為true,則可以保證由該group中的同一個(gè)線程處理,而不是整個(gè)pipeline由一個(gè)線程執(zhí)行,這個(gè)后面介紹childExecutor再看。
2.1.2 內(nèi)部類
DefaultChannelPipeline重要內(nèi)部類有兩個(gè),也就是上面介紹到默認(rèn)切不可更改的head和tail節(jié)點(diǎn),分別為HeadContext和TailContext。
這里暫時(shí)不介紹這兩個(gè)類的具體實(shí)現(xiàn),我們?cè)诮榻BChannelHandler時(shí)再介紹。
2.2 重要方法
下面再看DefaultChannelPipeline重要方法,先介紹DefaultChannelPipeline的構(gòu)造函數(shù):
//DefaultChannelPipeline
protected DefaultChannelPipeline(Channel channel) {
//記錄該pipeline關(guān)聯(lián)的channel
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
//初始化head和tail節(jié)點(diǎn)
tail = new TailContext(this);
head = new HeadContext(this);
//將首尾節(jié)點(diǎn)連接起來
head.next = tail;
tail.prev = head;
}
除了構(gòu)造函數(shù),我們將DefaultChannelPipeline剩下的方法分為三類,第一類是負(fù)責(zé)向pipeline中添加、刪除或者替換handler,另一類負(fù)責(zé)觸發(fā)Inbound handler,最后一類則負(fù)責(zé)觸發(fā)Outbound handler:
2.2.1 Handler添加、刪除方法
這類方法主要包含如下方法:
向Pipeline中添加handler
- addAfter
- addBefore
- addFirst
- addLast
從Pipeline中移除handler
- remove
- removeFirst
- removeIfExists
- removeLast
替換Pipeline中的某個(gè)handler
- replace
為了節(jié)省篇幅,我們介紹一個(gè)方法的實(shí)現(xiàn):addLast(EventExecutorGroup executor, ChannelHandler... handlers)
//DefaultChannelPipeline
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
//將傳入的Handler一個(gè)個(gè)添加到pipeline中
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//上面介紹過pipeline中連接的實(shí)際上是封裝了handler的
//context
newCtx = newContext(group, filterName(name, handler), handler);
//將該context連接到鏈表尾端,但是在tail之前
addLast0(newCtx);
// If the registered is false it means that the channel was not registered on an eventloop yet.
// In this case we add the context to the pipeline and add a task that will call
// ChannelHandler.handlerAdded(...) once the channel is registered.
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
//新建一個(gè)新的DefaultChannelHandlerContext
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
//處理傳入的線程組group
private EventExecutor childExecutor(EventExecutorGroup group) {
//如果沒有傳入線程組,則返回空
if (group == null) {
return null;
}
//獲取SINGLE_EVENTEXECUTOR_PER_GROUP配置,
//上面介紹過
Boolean pinEventExecutor = channel.config().getOption(ChannelOption.SINGLE_EVENTEXECUTOR_PER_GROUP);
//如果不是采用單一線程執(zhí)行,則調(diào)用next方法選出一個(gè)線程返回
if (pinEventExecutor != null && !pinEventExecutor) {
return group.next();
}
//如果執(zhí)行到這里,表示傳入了線程組,并且
//SINGLE_EVENTEXECUTOR_PER_GROUP配置為Ture
//獲取記錄的group和第一次從該線程組選出的線程
Map<EventExecutorGroup, EventExecutor> childExecutors = this.childExecutors;
//如果為空,則新建一個(gè)IdentityHashMap,這里為什么使用
//IdentityHashMap,可以看下IdentityHashMap的實(shí)現(xiàn)原理
if (childExecutors == null) {
// Use size of 4 as most people only use one extra EventExecutor.
childExecutors = this.childExecutors = new IdentityHashMap<EventExecutorGroup, EventExecutor>(4);
}
// Pin one of the child executors once and remember it so that the same child executor
// is used to fire events for the same channel.
//找出group第一次調(diào)用時(shí)選出的線程
EventExecutor childExecutor = childExecutors.get(group);
//為空的話,則表示第一次使用該group,則從該group選出一個(gè)線程,
//并放入該Map中
if (childExecutor == null) {
childExecutor = group.next();
childExecutors.put(group, childExecutor);
}
return childExecutor;
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
//可見雖然是addLast,但是還是放在tail節(jié)點(diǎn)之前
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
private void addFirst0(AbstractChannelHandlerContext newCtx) {
//addFirst0也是同理,添加到列表最前面,但是在head節(jié)點(diǎn)之后
AbstractChannelHandlerContext nextCtx = head.next;
newCtx.prev = head;
newCtx.next = nextCtx;
head.next = newCtx;
nextCtx.prev = newCtx;
}
2.2.2 Inbound事件相關(guān)方法
觸發(fā)處理Inbound事件Handler的相關(guān)方法如下:
//DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
@Override
public final ChannelPipeline fireChannelInactive() {
AbstractChannelHandlerContext.invokeChannelInactive(head);
return this;
}
@Override
public final ChannelPipeline fireExceptionCaught(Throwable cause) {
AbstractChannelHandlerContext.invokeExceptionCaught(head, cause);
return this;
}
@Override
public final ChannelPipeline fireUserEventTriggered(Object event) {
AbstractChannelHandlerContext.invokeUserEventTriggered(head, event);
return this;
}
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
@Override
public final ChannelPipeline fireChannelReadComplete() {
AbstractChannelHandlerContext.invokeChannelReadComplete(head);
return this;
}
@Override
public final ChannelPipeline fireChannelWritabilityChanged() {
AbstractChannelHandlerContext.invokeChannelWritabilityChanged(head);
return this;
}
從上面的方法定義可知,Inbound事件第一個(gè)被觸發(fā)的Handler是head節(jié)點(diǎn)對(duì)應(yīng)的handler,我們舉例看下fireChannelRead方法的具體實(shí)現(xiàn):
//DefaultChannelPipeline
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
//AbstractChannelHandlerContext
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
//獲取該context對(duì)應(yīng)的線程
EventExecutor executor = next.executor();
//如果當(dāng)前線程就是該context的線程,則直接在該線程執(zhí)行,否則
//將該任務(wù)放入線程的任務(wù)隊(duì)列中
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
@Override
public EventExecutor executor() {
//如果線程為空的話,默認(rèn)返回channel注冊(cè)的線程
if (executor == null) {
return channel().eventLoop();
} else {
return executor;
}
}
//獲取context封裝的handler,并執(zhí)行相應(yīng)的方法
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
//獲取下一個(gè)Inbound handler并執(zhí)行
fireChannelRead(msg);
}
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
//對(duì)于Inbound事件,從當(dāng)前context出發(fā),從前往后找
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
除此之外,在一個(gè)handler處理完之后,想調(diào)用下一個(gè)handler繼續(xù)處理,可以調(diào)用如下方法:
//AbstractChannelHandlerContext
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
//調(diào)用findContextInbound從找到當(dāng)前節(jié)點(diǎn)后面的第一個(gè)Inbound類型的
//handler,并觸發(fā)相應(yīng)的函數(shù)
invokeChannelRead(findContextInbound(), msg);
return this;
}
2.2.3 Outbound事件相關(guān)方法
Outbond事件發(fā)生時(shí),會(huì)觸發(fā)Outbound類型的handler,流程和上面Inbound事件觸發(fā)Inbound handler的流程類似,這里不再贅述,但是要注意的是,Outbound觸發(fā)Outbound類型的handler是從后向前調(diào)用的,最后一個(gè)調(diào)用head。
@Override
public final ChannelFuture bind(SocketAddress localAddress) {
return tail.bind(localAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress) {
return tail.connect(remoteAddress);
}
@Override
public final ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
return tail.connect(remoteAddress, localAddress);
}
@Override
public final ChannelFuture disconnect() {
return tail.disconnect();
}
@Override
public final ChannelFuture close() {
return tail.close();
}
...
這里大概列一下DefaultChannelPipeline.write方法的源碼:
//DefaultChannelPipeline
@Override
public final ChannelFuture write(Object msg, ChannelPromise promise) {
return tail.write(msg, promise);
}
@Override
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
//從后往前找outbound handler
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private AbstractChannelHandlerContext findContextOutbound() {
//從當(dāng)前節(jié)點(diǎn),往前找outbound handler
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
3 幾個(gè)重要的ChannelHandler
相信通過上面的介紹,已經(jīng)知道了ChannelHandler是怎么被調(diào)用執(zhí)行相關(guān)方法的。下面我們介紹幾個(gè)重要的ChannelHandler實(shí)現(xiàn)。
HeadContext
HeadContext擴(kuò)展了AbstractChannelHandlerContext,也實(shí)現(xiàn)了ChannelOutboundHandler,ChannelInboundHandler,既是Inbound handler,也是outbound handler。
其作為第一個(gè)被調(diào)用的Inbound handler,其Inbound相關(guān)方法沒有做什么實(shí)際工作,僅僅觸發(fā)下一個(gè)handler,如
//HeadContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
而作為最后一個(gè)被調(diào)用的outbound handler,其Outbound相關(guān)方法則進(jìn)行實(shí)際的操作,如:
//HeadContext
//調(diào)用unsafe.flush實(shí)際向channel寫數(shù)據(jù)
@Override
public void flush(ChannelHandlerContext ctx) throws Exception {
unsafe.flush();
}
-
TailContext
TailContext擴(kuò)展了AbstractChannelHandlerContext,并實(shí)現(xiàn)了ChannelInboundHandler接口,是一個(gè)Inbound handler,因?yàn)镮nbound事件從前往后調(diào)用Inbound handler,所以TailContext是最后一個(gè)被調(diào)用的Inbound handler,這里我們盡看一個(gè)有意思的方法:
//TailContext
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//如果channelRead事件能夠成功被傳遞到tail節(jié)點(diǎn),會(huì)執(zhí)行此方法
//作為DefaultChannelPipeline內(nèi)部類,調(diào)用DefaultChannelPipeline
//的onUnhandledInboundMessage方法
onUnhandledInboundMessage(msg);
}
//DefaultChannelPipeline
protected void onUnhandledInboundMessage(Object msg) {
//記錄日志,告訴用戶msg被傳到了tail節(jié)點(diǎn),需要檢查是否沒有
//在tail節(jié)點(diǎn)之前配置正確的inbound進(jìn)行處理
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
//如下面源碼所示,如果是ReferenceCounted類型,嘗試進(jìn)行
//釋放操作
ReferenceCountUtil.release(msg);
}
}
//ReferenceCountUtil
public static boolean release(Object msg) {
if (msg instanceof ReferenceCounted) {
return ((ReferenceCounted) msg).release();
}
return false;
}
-
ChannelInitializer
ChannelInitializer主要用于channel初始化,一般用于在channelRegistered方法中向channel的pipeline中注冊(cè)相關(guān)的handler,ChannelInitializer的特別之處是注冊(cè)完handler之后,會(huì)將自己從pipeline的handler鏈表中刪除,僅僅會(huì)被執(zhí)行一次:
//ChannelInitializer
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
// the handler.
//調(diào)用初始化函數(shù)
if (initChannel(ctx)) {
// we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
// miss an event.
ctx.pipeline().fireChannelRegistered();
} else {
// Called initChannel(...) before which is the expected behavior, so just forward the event.
ctx.fireChannelRegistered();
}
}
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
//調(diào)用提供給子類重寫的初始化函數(shù)
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
//從pipeline的handler鏈表中移除自己
remove(ctx);
}
return true;
}
return false;
}
//從pipeline的handler鏈表中移除自己
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}