這一章節(jié),我們通過例子學習netty的一些高級特性。
1、netty客戶端流控
在有些場景下,由于各種原因,會導致客戶端消息發(fā)送積壓,進而導致OOM。
- 1、當netty服務端并發(fā)壓力過大,超過了服務端的處理能力時,channel中的消息服務端不能及時消費,這時channel堵塞,客戶端消息就會堆積在發(fā)送隊列中
- 2、網(wǎng)絡瓶頸,當客戶端發(fā)送速度超過網(wǎng)絡鏈路處理能力,會導致客戶端發(fā)送隊列積壓
- 3、當對端讀取速度小于己方發(fā)送速度,導致自身TCP發(fā)送緩沖區(qū)滿,頻繁發(fā)生write 0字節(jié)時,待發(fā)送消息會在netty發(fā)送隊列中排隊
這三種情況下,如果客戶端沒有流控保護,這時候就很容易發(fā)生內(nèi)存泄露。
原因:
在我們調(diào)用channel的write和writeAndFlush時
io.netty.channel.AbstractChannelHandlerContext#writeAndFlush(java.lang.Object, io.netty.channel.ChannelPromise),如果發(fā)送方為業(yè)務線程,則將發(fā)送操作封裝成WriteTask(繼承Runnable),放到Netty的NioEventLoop中執(zhí)行,當NioEventLoop無法完成如此多的消息的發(fā)送的時候,發(fā)送任務隊列積壓,進而導致內(nèi)存泄漏。
解決方案:
為了防止在高并發(fā)場景下,由于服務端處理慢導致的客戶端消息積壓,客戶端需要做并發(fā)保護,防止自身發(fā)生消息積壓。Netty提供了一個高低水位機制,可以實現(xiàn)客戶端精準的流控。
io.netty.channel.ChannelConfig#setWriteBufferHighWaterMark 高水位
io.netty.channel.ChannelConfig#setWriteBufferLowWaterMark 低水位
當發(fā)送隊列待發(fā)送的字節(jié)數(shù)組達到高水位時,對應的channel就變?yōu)椴豢蓪憼顟B(tài),由于高水位并不影響業(yè)務線程調(diào)用write方法把消息加入到待發(fā)送隊列,因此在消息發(fā)送時要先對channel的狀態(tài)進行判斷(ctx.channel().isWritable)。
這里涉及到的知識點是netty的消息發(fā)送機制。
netty的消息發(fā)送機制
業(yè)務調(diào)用write方法后,經(jīng)過ChannelPipeline職責鏈處理,消息被投遞到發(fā)送緩沖區(qū)待發(fā)送,調(diào)用flush之后會執(zhí)行真正的發(fā)送操作,底層通過調(diào)用Java NIO的SocketChannel進行非阻塞write操作,將消息發(fā)送到網(wǎng)絡上,

當用戶線程(業(yè)務線程)發(fā)起write操作時,Netty會進行判斷,如果發(fā)現(xiàn)不少NioEventLoop(I/O線程),則將發(fā)送消息封裝成WriteTask,放入NioEventLoop的任務隊列,由NioEventLoop線程執(zhí)行,代碼如下
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
@Override
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
if (isNotValidPromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
write(msg, true, promise);
return promise;
}
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
try {
//這里的executor執(zhí)行的是netty自己實現(xiàn)的SingleThreadEventExecutor#execute方法,
executor.execute(runnable);
} catch (Throwable cause) {
try {
promise.setFailure(cause);
} finally {
if (msg != null) {
ReferenceCountUtil.release(msg);
}
}
}
}
io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
Netty的NioEventLoop線程內(nèi)部維護了一個Queue<Runnable> taskQuue,除了處理網(wǎng)絡IO讀寫操作,同時還負責執(zhí)行網(wǎng)絡讀寫相關的Task,NioEventLoop遍歷taskQueue,執(zhí)行消息發(fā)送任務,代碼調(diào)用入路徑如下,具體的就不貼了,太長了
io.netty.channel.nio.NioEventLoop#run
-----> io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
----->io.netty.util.concurrent.AbstractEventExecutor#safeExecute
這里safeExecute執(zhí)行的task,就是前面write寫入時包裝的AbstractWriteTask,AbstractWriteTask的run中
io.netty.channel.AbstractChannelHandlerContext.AbstractWriteTask#run
經(jīng)過一些系統(tǒng)處理操作,最終會調(diào)用io.netty.channel.ChannelOutboundBuffer#addMessage方法,將發(fā)送消息加入發(fā)送隊列(鏈表)。
我們上面寫的流程從NioSocketChannel到ChnnelOutbountBuffer,實際上在這個過程中,為了對發(fā)送速度和消息積壓數(shù)進行控制,Netty還提供了高低水位機制,當消息隊列中積壓的待發(fā)送消息總字節(jié)數(shù)到達高水位時,修改Channel的狀態(tài)為不可寫,并發(fā)送通知事件;當消息發(fā)送完成后,對低水位進行判斷,如果當前積壓的待發(fā)送字節(jié)數(shù)低于低水位時,則修改channel狀態(tài)為可寫,并發(fā)送通知事件,具體代碼見下
io.netty.channel.ChannelOutboundBuffer#incrementPendingOutboundBytes(long);
io.netty.channel.ChannelOutboundBuffer#decrementPendingOutboundBytes(long);

總結:在實際項目中,根據(jù)業(yè)務QPS規(guī)劃,客戶端處理性能、網(wǎng)絡帶寬、鏈路數(shù)、消息平均碼流大小等綜合因數(shù),設置Netty高水位(setWriteBufferHighWaterMark)值,可以防止在發(fā)送隊列處于高水位時繼續(xù)發(fā)送消息,導致積壓更嚴重,甚至發(fā)生內(nèi)存泄漏。在系統(tǒng)中合理利用Netty的高低水位機制做消息發(fā)送的流控,既可以保護自身,同時又能減輕服務端的壓力,可以提升系統(tǒng)的可靠性。
那么代碼中,怎么使用呢?

同時在業(yè)務發(fā)送消息時,添加socketChannel.isWritable()是否可以發(fā)送判斷
public static boolean sendMessage(String clientId,Object message){
if(StringUtils.isEmpty(clientId)){
log.error(" clientId 為空,找不到客戶端!");
return false;
}
SocketChannel socketChannel = FactoryMap.getChannelByDevNo(clientId);
if(socketChannel !=null ){
if(socketChannel.isWritable()){
socketChannel.writeAndFlush(message);
//更新數(shù)據(jù)庫中消息狀態(tài)
return true;
}else {
log.error("channel不可寫");
return false;
}
}else {
log.error(" 客戶端未連接服務器!發(fā)送消息失敗!{}",clientId);
}
return false;
}
2、netty服務端 流量整形
前面講的流控(高低水位控制),主要是根據(jù)發(fā)送消息隊列積壓的大小來控制客戶端channel的寫狀態(tài),然后用戶手動根據(jù)channel.isWritable()來控制消息是否發(fā)送,用戶可以手動控制消息不能及時發(fā)送后的處理方案(比如,過期、超時)。通常用在客戶端比較多。
流量整形呢,是一種主動調(diào)整流量輸出速度的措施,一個典型的應用是基于下游網(wǎng)絡節(jié)點的TPS指標控制本地流量的輸出。大多數(shù)商用系統(tǒng)都由多個網(wǎng)元或者部件組成,例如參與短信互動,會涉及手機,基站,短信中心,短信網(wǎng)關,SP/CP等網(wǎng)元,不同網(wǎng)元或者部件的處理性能不同,為了防止突發(fā)的業(yè)務洪峰的 導致下游網(wǎng)元被沖垮,有時候需要消停提供流量整形功能。

Netty流量整形的主要作用:
1、防止由于上下游網(wǎng)元性能不均衡導致下游網(wǎng)元被沖垮,業(yè)務流程中斷;
2、防止由于通信模塊接收消息過快,后端業(yè)務線程處理不及時,導致出現(xiàn)“撐死”問題。
例如,之前有博客的讀者咨詢過我一個問題,他們設備向服務端不間斷的上報數(shù)據(jù),有1G左右,而服務端處理不過來這么多數(shù)據(jù),這種情況下,其實就可以使用流量整形來控制接收消息速度。
原理和使用
原理:攔截channelRead和write方法,計算當前需要發(fā)送的消息大小,對讀取和發(fā)送閾值進行判斷,如果達到了閾值,則暫停讀取和發(fā)送消息,待下一個周期繼續(xù)處理,以實現(xiàn)在某個周期內(nèi)對消息讀寫速度進行控制。
使用:將流量整形ChannelHandler添加到業(yè)務解碼器之前,

注意事項:
全局流量整形實例只需要創(chuàng)建一次
GlobalChannelTrafficShapingHandler 和 GlobalTrafficShapingHandler 是全局共享的,因此實例只需要創(chuàng)建一次,添加到不同的ChannelPipeline即可,不要創(chuàng)建多個實例,否則流量整形將失效。流量整形參數(shù)調(diào)整不要過于頻繁
消息發(fā)送保護機制
通過流量整形可以控制發(fā)送速度,但是它的控制原理是將待發(fā)送的消息封裝成Task放入消息隊列,等待執(zhí)行時間到達后繼續(xù)發(fā)送,所以如果業(yè)務發(fā)送線程不判斷channle的可以狀態(tài),就可能會導致OOM問題。