1、線程池
在業(yè)務channelHandler中,我們有可能會有一些導致同步阻塞的業(yè)務處理邏輯,比如數(shù)據(jù)庫操作,同步的調(diào)用第三方服務等,這時候,為了提升性能,我們可以采用線程池來提升并發(fā)處理能力。
線程池添加策略:
1、業(yè)務自定義線程池執(zhí)行業(yè)務channleHandler


2、Netty提供EventExecutorGroup機制來并行執(zhí)行ChannelHandler


從上面的圖中,可以看出來,Netty提供的的EventExecutorGroup針對的是一個NioEventLoop上的多個客戶端channel并發(fā)處理,如果是一個客戶端channel不斷的請求,那么這種并發(fā)處理并沒有用,服務端還是只有一個線程執(zhí)行客戶端業(yè)務。簡單的說,就是如果不加EventExecutorGroup,則NioEventLoop中的線程會不斷的輪詢處理它所管理的客戶端channel,加上EventExecutorGroup之后,NioEventLoop就可以把其所管理的客戶端channel丟給線程池EventExecutorGroup處理,在EventExecutorGroup線程池中,一個線程處理一個channel的讀寫操作。
但是業(yè)務自定的ExecutorService針對的是所有的客戶端channel業(yè)務請求并發(fā)處理,就算是一個客戶端channel的多個請求,也是可以并發(fā)處理的,這種鎖競爭會很激烈。
所以,在使用中,如果是客戶端的并發(fā)連接數(shù)channel多,且每個客戶端channel的業(yè)務請求阻塞不多,那么使用EventExecutorGroup;
如果客戶端并發(fā)連接數(shù)channel不多,但是客戶端channle的業(yè)務請求阻塞較多(復雜業(yè)務處理和數(shù)據(jù)庫處理),那么使用ExecutorService
補充:


2、ChannelHandler并發(fā)
ChannelHandler的一端是Netty NIO線程,另一端則是業(yè)務線程池(如上面的業(yè)務自定義線程池執(zhí)行業(yè)務channleHandler),那么在多線程并發(fā)場景下理解ChannelHandler的并發(fā)安全性就很重要了。
以下面這段代碼為例,在多線程環(huán)境下loss_connect_time調(diào)用安全么?
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(AcceptorIdleStateTrigger.class);
//非線程安全
private int loss_connect_time = 0;
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
IdleState state=event.state();
if (state==IdleState.READER_IDLE) {
loss_connect_time++;
logger.info(String.valueOf(NettyConstants.SERVER_READ_IDEL_TIME_OUT*loss_connect_time)+"秒沒有接收到客戶端"+ FactoryMap.getDevNoByChannel((SocketChannel)ctx)+"的信息了");
if(loss_connect_time>=NettyConstants.MAX_LOSS_CONNECT_TIME){
logger.info("------------服務器主動關(guān)閉客戶端鏈路");
ctx.channel().close();
}
}
} else {
super.userEventTriggered(ctx,evt);
}
}
}
// 4、TCP連接通道channel建立時創(chuàng)建ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
// 為通道進行初始化: 數(shù)據(jù)傳輸過來的時候會進行攔截和執(zhí)行
ChannelPipeline pipeline = sc.pipeline();
pipeline.addLast(new ObjectDecoder(1024 * 1024,
ClassResolvers.weakCachingConcurrentResolver(this
.getClass().getClassLoader())));
pipeline.addLast(new ObjectEncoder());
//每個鏈路都有自己對應的業(yè)務Handler實例,不共享
pipeline.addLast(new ServerHandler());
}
如果ChannelHandler是非共享的,則它就是線程安全的。
原因:當鏈路完成初始化時會創(chuàng)建ChannelPipeline,每個Channel對應一個ChannelPipeline實例,業(yè)務的channelHandler會被實例化化并加入ChannelPipeline執(zhí)行。
一個channel對應一個channelPipeline,一個channelPipeline上串行執(zhí)行多個Handler。由于一個Channel只能被特定的NioEventLoop線程執(zhí)行,也就是說單個NioEventLoop線程串行執(zhí)行所有channelHandler,
因此ChannelHandler不會被并發(fā)調(diào)用,不用考慮線程安全問題。

跨鏈路共享的ChannelHandler
如果某個ChannelHandler需要全局共享,則只需要在Handler上添加@ChannelHandler.Sharable注解就可以被添加到多個ChannelPipeline上。
@ChannelHandler.Sharable
public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
//代碼略
}
當channelHandler被添加到多個ChannelPipeline,就會面臨多線程并發(fā)訪問問題,需要ChannelHandler保證自身的線程安全,例如通過原子類,讀寫鎖等方式對數(shù)據(jù)做并發(fā)保護。如果加鎖,可能會阻塞NioEventLoop線程,所以@ChannelHandler.Sharable要慎用。
使用場景
用戶自定義的ChannelHandler有兩種場景需要考慮并發(fā)安全。
- 1、通過@ChannelHandler.Sharable注解,多個ChannelPipeline共享的ChannelHandler,它將被多個NioEventLoop線程并發(fā)訪問。在這種場景下,用戶需要保證ChannelHandler共享的合理性,同時需要自己保證它的并發(fā)安全性,盡量通過原子類等方式降低鎖的開銷,防止阻塞NioEventLoop線程。

- 2、ChannelHandler沒有共享,但是在用戶的ChannelPipeline中的一些ChannelHandler綁定了線程池,這樣ChannelPipe的channelHandler就會被異步執(zhí)行。(第一節(jié)中的Netty提供EventExecutorGroup機制來并行執(zhí)行ChannelHandler)

3、內(nèi)存池
Netty從4.X引入內(nèi)存池機制,默認情況下,都是采用內(nèi)存池模式創(chuàng)建ByteBuf對象,這也是其性能表現(xiàn)超級優(yōu)異的一個原因之一,我們開發(fā)的過程中并不需要去考慮,通常情況下,我們是在編解碼(ByteToMessageDecoder)時,從ByteBuf中讀取數(shù)據(jù)到數(shù)組,然后進行解碼操作,那么我們需要手動釋放ByteBuf內(nèi)存么?不釋放會引起OOM么???
我們以下面一個例子來說明這個問題

在上圖這段代碼中,涉及到兩個ByteBuf,一個是框架分配的(msg),一個是業(yè)務中自定義的respMsg,如果出問題后,通常很容易想到是業(yè)務自定義byteBuf出問題,業(yè)務從內(nèi)存池中申請了ByteBuf,但是沒有主動釋放它,看過源碼后,可以發(fā)現(xiàn)這里并不會出問題,因為在調(diào)用ctx.writeAndFlush(respMsg)后,netty框架會主動釋放內(nèi)存。
那msg為啥會出問題呢,同樣看下源碼,尤其對比著看下io.netty.channel.SimpleChannelInboundHandler#channelRead的就知道了,繼承ChannelInboundHandlerAdapter實現(xiàn)其channelRead方法后,是需要用戶自己手動釋放內(nèi)存的。
所以,解決上面的問題有三種辦法,
第一種,其實通過框架來實現(xiàn),我們業(yè)務Handler實現(xiàn)SimpleChannelInboundHandler<ByteBuf>,在其io.netty.channel.SimpleChannelInboundHandler#channelRead方法中框架有做釋放,ReferenceCountUtil.release(msg);

第二種,就是在代碼的最后一行添加ReferenceCountUtil.release(msg)來釋放ByteBuf

第三種,在業(yè)務ChannelInboundHandler中調(diào)用ctx.fireChannelRead(msg)方法,讓請求消息繼續(xù)往后執(zhí)行,直到調(diào)用DefaultChannelPipeline的內(nèi)部類TailContext,由他來負責釋放請求消息;io.netty.channel.DefaultChannelPipeline.TailContext#channelRead -> onUnhandledInboundMessage
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
注:像上面圖中這樣直接使用ctx.writeAndFlush(byteBuf)時,在NettyServerInitializer.initChannel的ChannelPipeline中是不需要再添加解碼器Encoder()的,因為這里已經(jīng)是直接對寫入byteBuf了,看源碼就知道,在解碼器MessageToMessageEncoder中,也是把消息寫入byteBuf,
以源碼的ByteArrayEncoder為例,如下
@Sharable
public class ByteArrayEncoder extends MessageToMessageEncoder<byte[]> {
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, List<Object> out) throws Exception {
out.add(Unpooled.wrappedBuffer(msg));
}
}
總結(jié):這個是很多netty初學者最容易出錯的地方,網(wǎng)上的資料寫的又經(jīng)?;ハ嗝堋J聦嵣衔覀冎灰凑?解碼decoder——業(yè)務處理handler——編碼encoder 這個流程來處理,基本上不會有內(nèi)存釋放的問題。
- 解碼decoder——根據(jù)業(yè)務,把byteBuf中數(shù)據(jù)取出來,然后轉(zhuǎn)換成你想要的數(shù)據(jù)(對象,或者數(shù)組);
- 業(yè)務處理handler(繼承SimpleChannelInboundHandler)——直接構(gòu)建返回對象,或者數(shù)據(jù),然后調(diào)用ctx.writeAndFlush(respMsg);
- 編碼encoder——通常不需要自定義,可以直接調(diào)用框架提供的,比如
ByteArrayEncoder、StringEncoder;
當我們有多種協(xié)議同時處理時,可以添加多個編碼器,也可以不添加,但是要在業(yè)務handler處理中,就直接寫入byteBuf