本文是Netty文集中“Netty 那些事兒”系列的文章。主要結(jié)合在開發(fā)實(shí)戰(zhàn)中,我們遇到的一些“奇奇怪怪”的問題,以及如何正確且更好的使用Netty框架,并會(huì)對(duì)Netty中涉及的重要設(shè)計(jì)理念進(jìn)行介紹。
Netty實(shí)現(xiàn)“流量整形”原理分析
流量整形
流量整形(Traffic Shaping)是一種主動(dòng)調(diào)整流量輸出速率的措施。流量整形與流量監(jiān)管的主要區(qū)別在于,流量整形對(duì)流量監(jiān)管中需要丟棄的報(bào)文進(jìn)行緩存——通常是將它們放入緩沖區(qū)或隊(duì)列內(nèi),也稱流量整形(Traffic Shaping,簡(jiǎn)稱TS)。當(dāng)報(bào)文的發(fā)送速度過(guò)快時(shí),首先在緩沖區(qū)進(jìn)行緩存;再通過(guò)流量計(jì)量算法的控制下“均勻”地發(fā)送這些被緩沖的報(bào)文。流量整形與流量監(jiān)管的另一區(qū)別是,整形可能會(huì)增加延遲,而監(jiān)管幾乎不引入額外的延遲。
Netty提供了GlobalTrafficShapingHandler、ChannelTrafficShapingHandler、GlobalChannelTrafficShapingHandler三個(gè)類來(lái)實(shí)現(xiàn)流量整形,他們都是AbstractTrafficShapingHandler抽象類的實(shí)現(xiàn)類,下面我們就對(duì)其進(jìn)行介紹,讓我們來(lái)了解Netty是如何實(shí)現(xiàn)流量整形的。
核心類分析
AbstractTrafficShapingHandler
AbstractTrafficShapingHandler允許限制全局的帶寬(見GlobalTrafficShapingHandler)或者每個(gè)session的帶寬(見ChannelTrafficShapingHandler)作為流量整形。
它允許你使用TrafficCounter來(lái)實(shí)現(xiàn)幾乎實(shí)時(shí)的帶寬監(jiān)控,TrafficCounter會(huì)在每個(gè)檢測(cè)間期(checkInterval)調(diào)用這個(gè)處理器的doAccounting方法。
如果你有任何特別的原因想要停止監(jiān)控(計(jì)數(shù))或者改變讀寫的限制或者改變檢測(cè)間期(checkInterval),可以使用如下方法:
① configure:允許你改變讀或?qū)懙南拗?,或者檢測(cè)間期(checkInterval);
② getTrafficCounter:允許你獲得TrafficCounter,并可以停止或啟動(dòng)監(jiān)控,直接改變檢測(cè)間期(checkInterval),或去訪問它的值。
TrafficCounter:對(duì)讀和寫的字節(jié)進(jìn)行計(jì)數(shù)以用于限制流量。
它會(huì)根據(jù)給定的檢測(cè)間期周期性的計(jì)算統(tǒng)計(jì)入站和出站的流量,并會(huì)回調(diào)AbstractTrafficShapingHandler的doAccounting方法。
如果檢測(cè)間期(checkInterval)是0,將不會(huì)進(jìn)行計(jì)數(shù)并且統(tǒng)計(jì)只會(huì)在每次讀或?qū)懖僮鲿r(shí)進(jìn)行計(jì)算。
- configure
public void configure(long newWriteLimit, long newReadLimit,
long newCheckInterval) {
configure(newWriteLimit, newReadLimit);
configure(newCheckInterval);
}
配置新的寫限制、讀限制、檢測(cè)間期。該方法會(huì)盡最大努力進(jìn)行此更改,這意味著已經(jīng)被延遲進(jìn)行的流量將不會(huì)使用新的配置,它僅用于新的流量中。
- ReopenReadTimerTask
static final class ReopenReadTimerTask implements Runnable {
final ChannelHandlerContext ctx;
ReopenReadTimerTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}
@Override
public void run() {
ChannelConfig config = ctx.channel().config();
if (!config.isAutoRead() && isHandlerActive(ctx)) {
// If AutoRead is False and Active is True, user make a direct setAutoRead(false)
// Then Just reset the status
if (logger.isDebugEnabled()) {
logger.debug("Not unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
}
ctx.attr(READ_SUSPENDED).set(false);
} else {
// Anything else allows the handler to reset the AutoRead
if (logger.isDebugEnabled()) {
if (config.isAutoRead() && !isHandlerActive(ctx)) {
logger.debug("Unsuspend: " + config.isAutoRead() + ':' +
isHandlerActive(ctx));
} else {
logger.debug("Normal unsuspend: " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
ctx.attr(READ_SUSPENDED).set(false);
config.setAutoRead(true);
ctx.channel().read();
}
if (logger.isDebugEnabled()) {
logger.debug("Unsuspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
}
}
重啟讀操作的定時(shí)任務(wù)。該定時(shí)任務(wù)總會(huì)實(shí)現(xiàn):
a) 如果Channel的autoRead為false,并且AbstractTrafficShapingHandler的READ_SUSPENDED屬性設(shè)置為null或false(說(shuō)明讀暫停未啟用或開啟),則直接將READ_SUSPENDED屬性設(shè)置為false。
b) 否則,如果Channel的autoRead為true,或者READ_SUSPENDED屬性的值為true(說(shuō)明讀暫停開啟了),則將READ_SUSPENDED屬性設(shè)置為false,并將Channel的autoRead標(biāo)識(shí)為true(該操作底層會(huì)將該Channel的OP_READ事件重新注冊(cè)為感興趣的事件,這樣Selector就會(huì)監(jiān)聽該Channel的讀就緒事件了),最后觸發(fā)一次Channel的read操作。
也就說(shuō),若“讀操作”為“開啟”狀態(tài)(READ_SUSPENDED為null或false)的情況下,Channel的autoRead是保持Channel原有的配置,此時(shí)并不會(huì)做什么操作。但當(dāng)“讀操作”從“暫?!睜顟B(tài)(READ_SUSPENDED為true)轉(zhuǎn)為“開啟”狀態(tài)(READ_SUSPENDED為false)時(shí),則會(huì)將Channel的autoRead標(biāo)志為true,并將“讀操作”設(shè)置為“開啟”狀態(tài)(READ_SUSPENDED為false)。
- channelRead
public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// compute the number of ms to wait before reopening the channel
long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);
wait = checkWaitReadTime(ctx, wait, now);
if (wait >= MINIMAL_WAIT) { // At least 10ms seems a minimal
// time in order to try to limit the traffic
// Only AutoRead AND HandlerActive True means Context Active
ChannelConfig config = ctx.channel().config();
if (logger.isDebugEnabled()) {
logger.debug("Read suspend: " + wait + ':' + config.isAutoRead() + ':'
+ isHandlerActive(ctx));
}
if (config.isAutoRead() && isHandlerActive(ctx)) {
config.setAutoRead(false);
ctx.attr(READ_SUSPENDED).set(true);
// Create a Runnable to reactive the read if needed. If one was create before it will just be
// reused to limit object creation
Attribute<Runnable> attr = ctx.attr(REOPEN_TASK);
Runnable reopenTask = attr.get();
if (reopenTask == null) {
reopenTask = new ReopenReadTimerTask(ctx);
attr.set(reopenTask);
}
ctx.executor().schedule(reopenTask, wait, TimeUnit.MILLISECONDS);
if (logger.isDebugEnabled()) {
logger.debug("Suspend final status => " + config.isAutoRead() + ':'
+ isHandlerActive(ctx) + " will reopened at: " + wait);
}
}
}
}
informReadOperation(ctx, now);
ctx.fireChannelRead(msg);
}
① 『long size = calculateSize(msg);』計(jì)算本次讀取到的消息的字節(jié)數(shù)。
② 如果讀取到的字節(jié)數(shù)大于0,則根據(jù)數(shù)據(jù)的大小、設(shè)定的readLimit、最大延遲時(shí)間等計(jì)算(『long wait = trafficCounter.readTimeToWait(size, readLimit, maxTime, now);』)得到下一次開啟讀操作需要的延遲時(shí)間(距當(dāng)前時(shí)間而言)wait(毫秒)。
③ 如果a)wait >= MINIMAL_WAIT(10毫秒)。并且b)當(dāng)前Channel為自動(dòng)讀?。矗琣utoRead為true)以及c)當(dāng)前的READ_SUSPENDED標(biāo)識(shí)為null或false(即,讀操作未被暫停),那么將Channel的autoRead設(shè)置為false(該操作底層會(huì)將該Channel的OP_READ事件從感興趣的事件中移除,這樣Selector就不會(huì)監(jiān)聽該Channel的讀就緒事件了),并且將READ_SUSPENDED標(biāo)識(shí)為true(說(shuō)明,接下來(lái)的讀操作會(huì)被暫停),并將“重新開啟讀操作“封裝為一個(gè)任務(wù),讓入Channel所注冊(cè)NioEventLoop的定時(shí)任務(wù)隊(duì)列中(延遲wait時(shí)間后執(zhí)行)。
也就說(shuō),只有當(dāng)計(jì)算出的下一次讀操作的時(shí)間大于了MINIMAL_WAIT(10毫秒),并且當(dāng)前Channel是自動(dòng)讀取的,且“讀操作”處于“開啟”狀態(tài)時(shí),才會(huì)去暫停讀操作,而暫停讀操作主要需要完成三件事:[1]將Channel的autoRead標(biāo)識(shí)設(shè)置為false,這使得OP_READ會(huì)從感興趣的事件中移除,這樣Selector就會(huì)不會(huì)監(jiān)聽這個(gè)Channel的讀就緒事件了;[2]將“讀操作”狀態(tài)設(shè)置為“暫?!保≧EAD_SUSPENDED為true);[3]將重啟開啟“讀操作”的操作封裝為一個(gè)task,在延遲wait時(shí)間后執(zhí)行。
當(dāng)你將得Channel的autoRead都會(huì)被設(shè)置為false時(shí),Netty底層就不會(huì)再去執(zhí)行讀操作了,也就是說(shuō),這時(shí)如果有數(shù)據(jù)過(guò)來(lái),會(huì)先放入到內(nèi)核的接收緩沖區(qū),只有我們執(zhí)行讀操作的時(shí)候數(shù)據(jù)才會(huì)從內(nèi)核緩沖區(qū)讀取到用戶緩沖區(qū)中。而對(duì)于TCP協(xié)議來(lái)說(shuō),你不要擔(dān)心一次內(nèi)核緩沖區(qū)會(huì)溢出。因?yàn)槿绻麘?yīng)用進(jìn)程一直沒有讀取,接收緩沖區(qū)滿了之后,發(fā)生的動(dòng)作是:通知對(duì)端TCP協(xié)議中的窗口關(guān)閉。這個(gè)便是滑動(dòng)窗口的實(shí)現(xiàn)。保證TCP套接口接收緩沖區(qū)不會(huì)溢出,從而保證了TCP是可靠傳輸。因?yàn)閷?duì)方不允許發(fā)出超過(guò)所通告窗口大小的數(shù)據(jù)。 這就是TCP的流量控制,如果對(duì)方無(wú)視窗口大小而發(fā)出了超過(guò)窗口大小的數(shù)據(jù),則接收方TCP將丟棄它。
④ 將當(dāng)前的消息發(fā)送給ChannelPipeline中的下一個(gè)ChannelInboundHandler。
- write
public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise)
throws Exception {
long size = calculateSize(msg);
long now = TrafficCounter.milliSecondFromNano();
if (size > 0) {
// compute the number of ms to wait before continue with the channel
long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);
if (wait >= MINIMAL_WAIT) {
if (logger.isDebugEnabled()) {
logger.debug("Write suspend: " + wait + ':' + ctx.channel().config().isAutoRead() + ':'
+ isHandlerActive(ctx));
}
submitWrite(ctx, msg, size, wait, now, promise);
return;
}
}
// to maintain order of write
submitWrite(ctx, msg, size, 0, now, promise);
}
① 『long size = calculateSize(msg);』計(jì)算待寫出的數(shù)據(jù)大小
② 如果待寫出數(shù)據(jù)的字節(jié)數(shù)大于0,則根據(jù)數(shù)據(jù)大小、設(shè)置的writeLimit、最大延遲時(shí)間等計(jì)算(『long wait = trafficCounter.writeTimeToWait(size, writeLimit, maxTime, now);』)得到本次寫操作需要的延遲時(shí)間(距當(dāng)前時(shí)間而言)wait(毫秒)。
③ 如果wait >= MINIMAL_WAIT(10毫秒),則調(diào)用『submitWrite(ctx, msg, size, wait, now, promise);』wait即為延遲時(shí)間,該方法的具體實(shí)現(xiàn)由子類完成;否則,若wait < MINIMAL_WAIT(10毫秒),則調(diào)用『submitWrite(ctx, msg, size, 0, now, promise);』注意這里傳遞的延遲時(shí)間為0了。
GlobalTrafficShapingHandler
這實(shí)現(xiàn)了AbstractTrafficShapingHandler的全局流量整形,也就是說(shuō)它限制了全局的帶寬,無(wú)論開啟了幾個(gè)channel。
注意『 OutboundBuffer.setUserDefinedWritability(index, boolean)』中索引使用’2’。
一般用途如下:
創(chuàng)建一個(gè)唯一的GlobalTrafficShapingHandler
GlobalTrafficShapingHandler myHandler = new GlobalTrafficShapingHandler(executor);
pipeline.addLast(myHandler);
executor可以是底層的IO工作池
注意,這個(gè)處理器是覆蓋所有管道的,這意味著只有一個(gè)處理器對(duì)象會(huì)被創(chuàng)建并且作為所有channel間共享的計(jì)數(shù)器,它必須于所有的channel共享。
所有你可以見到,該類的定義上面有個(gè)@Sharable注解。
在你的處理器中,你需要考慮使用『channel.isWritable()』和『channelWritabilityChanged(ctx)』來(lái)處理可寫性,或通過(guò)在ctx.write()返回的future上注冊(cè)listener來(lái)實(shí)現(xiàn)。
你還需要考慮讀或?qū)懖僮鲗?duì)象的大小需要和你要求的帶寬相對(duì)應(yīng):比如,你將一個(gè)10M大小的對(duì)象用于10KB/s的帶寬將會(huì)導(dǎo)致爆發(fā)效果,若你將100KB大小的對(duì)象用于在1M/s帶寬那么將會(huì)被流量整形處理器平滑處理。
一旦不在需要這個(gè)處理器時(shí)請(qǐng)確保調(diào)用『release()』以釋放所有內(nèi)部的資源。這不會(huì)關(guān)閉EventExecutor,因?yàn)樗赡苁枪蚕淼?,所以這需要你自己做。
GlobalTrafficShapingHandler中持有一個(gè)Channel的哈希表,用于存儲(chǔ)當(dāng)前應(yīng)用所有的Channel:
private final ConcurrentMap<Integer, PerChannel> channelQueues = PlatformDependent.newConcurrentHashMap();
key為Channel的hashCode;value是一個(gè)PerChannel對(duì)象。
PerChannel對(duì)象中維護(hù)有該Channel的待發(fā)送數(shù)據(jù)的消息隊(duì)列(ArrayDeque<ToSend> messagesQueue)。
- submitWrite
void submitWrite(final ChannelHandlerContext ctx, final Object msg,
final long size, final long writedelay, final long now,
final ChannelPromise promise) {
Channel channel = ctx.channel();
Integer key = channel.hashCode();
PerChannel perChannel = channelQueues.get(key);
if (perChannel == null) {
// in case write occurs before handlerAdded is raised for this handler
// imply a synchronized only if needed
perChannel = getOrSetPerChannel(ctx);
}
final ToSend newToSend;
long delay = writedelay;
boolean globalSizeExceeded = false;
// write operations need synchronization
synchronized (perChannel) {
if (writedelay == 0 && perChannel.messagesQueue.isEmpty()) {
trafficCounter.bytesRealWriteFlowControl(size);
ctx.write(msg, promise);
perChannel.lastWriteTimestamp = now;
return;
}
if (delay > maxTime && now + delay - perChannel.lastWriteTimestamp > maxTime) {
delay = maxTime;
}
newToSend = new ToSend(delay + now, msg, size, promise);
perChannel.messagesQueue.addLast(newToSend);
perChannel.queueSize += size;
queuesSize.addAndGet(size);
checkWriteSuspend(ctx, delay, perChannel.queueSize);
if (queuesSize.get() > maxGlobalWriteSize) {
globalSizeExceeded = true;
}
}
if (globalSizeExceeded) {
setUserDefinedWritability(ctx, false);
}
final long futureNow = newToSend.relativeTimeAction;
final PerChannel forSchedule = perChannel;
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
sendAllValid(ctx, forSchedule, futureNow);
}
}, delay, TimeUnit.MILLISECONDS);
}
寫操作提交上來(lái)的數(shù)據(jù)。
① 如果寫延遲為0,且當(dāng)前該Channel的messagesQueue為空(說(shuō)明,在此消息前沒有待發(fā)送的消息了),那么直接發(fā)送該消息包。并返回,否則到下一步。
② 『newToSend = new ToSend(delay + now, msg, size, promise);
perChannel.messagesQueue.addLast(newToSend);』
將待發(fā)送的數(shù)據(jù)封裝成ToSend對(duì)象放入PerChannel的消息隊(duì)列中(messagesQueue)。注意,這里的messagesQueue是一個(gè)ArrayDeque隊(duì)列,我們總是從隊(duì)列尾部插入。然后從隊(duì)列的頭獲取消息來(lái)依次發(fā)送,這就保證了消息的有序性。但是,如果一個(gè)大數(shù)據(jù)包前于一個(gè)小數(shù)據(jù)包發(fā)送的話,小數(shù)據(jù)包也會(huì)因?yàn)榇髷?shù)據(jù)包的延遲發(fā)送而被延遲到大數(shù)據(jù)包發(fā)送后才會(huì)發(fā)送。
ToSend 對(duì)象中持有帶發(fā)送的數(shù)據(jù)對(duì)象、發(fā)送的相對(duì)延遲時(shí)間(即,根據(jù)數(shù)據(jù)包大小以及設(shè)置的寫流量限制值(writeLimit)等計(jì)算出來(lái)的延遲操作的時(shí)間)、消息數(shù)據(jù)的大小、異步寫操作的promise。
③ 『checkWriteSuspend(ctx, delay, perChannel.queueSize);』
檢查單個(gè)Channel待發(fā)送的數(shù)據(jù)包是否超過(guò)了maxWriteSize(默認(rèn)4M),或者延遲時(shí)間是否超過(guò)了maxWriteDelay(默認(rèn)4s)。如果是的話,則調(diào)用『setUserDefinedWritability(ctx, false);』該方法會(huì)將ChannelOutboundBuffer中的unwritable屬性值的相應(yīng)標(biāo)志位置位(unwritable關(guān)系到isWritable方法是否會(huì)返回true。以及會(huì)在unwritable從0到非0間變化時(shí)觸發(fā)ChannelWritabilityChanged事件)。
④ 如果所有待發(fā)送的數(shù)據(jù)大?。ㄟ@里指所有Channel累積的待發(fā)送的數(shù)據(jù)大?。┐笥诹薽axGlobalWriteSize(默認(rèn)400M),則標(biāo)識(shí)globalSizeExceeded為true,并且調(diào)用『setUserDefinedWritability(ctx, false)』將ChannelOutboundBuffer中的unwritable屬性值相應(yīng)的標(biāo)志位置位。
⑤ 根據(jù)指定的延遲時(shí)間delay,將『sendAllValid(ctx, forSchedule, futureNow);』操作封裝成一個(gè)任務(wù)提交至executor的定時(shí)周期任務(wù)隊(duì)列中。
sendAllValid操作會(huì)遍歷該Channel中待發(fā)送的消息隊(duì)列messagesQueue,依次取出perChannel.messagesQueue中的消息包,將滿足發(fā)送條件(即,延遲發(fā)送的時(shí)間已經(jīng)到了)的消息發(fā)送給到ChannelPipeline中的下一個(gè)ChannelOutboundHandler(ctx.write(newToSend.toSend, newToSend.promise);),并且將perChannel.queueSize(當(dāng)前Channel待發(fā)送的總數(shù)據(jù)大?。┖蛁ueuesSize(所有Channel待發(fā)送的總數(shù)據(jù)大小)減小相應(yīng)的值(即,被發(fā)送出去的這個(gè)數(shù)據(jù)包的大?。Qh(huán)遍歷前面的操作直到當(dāng)前的消息不滿足發(fā)送條件則退出遍歷。并且如果該Channel的消息隊(duì)列中的消息全部都發(fā)送出去的話(即,messagesQueue.isEmpty()為true),則會(huì)通過(guò)調(diào)用『releaseWriteSuspended(ctx);』來(lái)釋放寫暫停。而該方法底層會(huì)將ChannelOutboundBuffer中的unwritable屬性值相應(yīng)的標(biāo)志位重置。
ChannelTrafficShapingHandler
ChannelTrafficShapingHandler是針對(duì)單個(gè)Channel的流量整形,和GlobalTrafficShapingHandler的思想是一樣的。只是實(shí)現(xiàn)中沒有對(duì)全局概念的檢測(cè),僅檢測(cè)了當(dāng)前這個(gè)Channel的數(shù)據(jù)。
這里就不再贅述了。
GlobalChannelTrafficShapingHandler
相比于GlobalTrafficShapingHandler增加了一個(gè)誤差概念,以平衡各個(gè)Channel間的讀/寫操作。也就是說(shuō),使得各個(gè)Channel間的讀/寫操作盡量均衡。比如,盡量避免不同Channel的大數(shù)據(jù)包都延遲近乎一樣的是時(shí)間再操作,以及如果小數(shù)據(jù)包在一個(gè)大數(shù)據(jù)包后才發(fā)送,則減少該小數(shù)據(jù)包的延遲發(fā)送時(shí)間等。。
“流量整形”實(shí)戰(zhàn)
這里僅展示服務(wù)端和客戶端中使用“流量整形”功能涉及的關(guān)鍵代碼,完整demo可見github
服務(wù)端
使用GlobalTrafficShapingHandler來(lái)實(shí)現(xiàn)服務(wù)端的“流量整形”,每當(dāng)有客戶端連接至服務(wù)端時(shí)服務(wù)端就會(huì)開始往這個(gè)客戶端發(fā)送26M的數(shù)據(jù)包。我們將GlobalTrafficShapingHandler的writeLimit設(shè)置為10M/S。并使用了ChunkedWriteHandler來(lái)實(shí)現(xiàn)大數(shù)據(jù)包拆分成小數(shù)據(jù)包發(fā)送的功能。
MyServerInitializer實(shí)現(xiàn):在ChannelPipeline中注冊(cè)了GlobalTrafficShapingHandler
public class MyServerInitializer extends ChannelInitializer<SocketChannel> {
Charset utf8 = Charset.forName("utf-8");
final int M = 1024 * 1024;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
GlobalTrafficShapingHandler globalTrafficShapingHandler = new GlobalTrafficShapingHandler(ch.eventLoop().parent(), 10 * M, 50 * M);
// globalTrafficShapingHandler.setMaxGlobalWriteSize(50 * M);
// globalTrafficShapingHandler.setMaxWriteSize(5 * M);
ch.pipeline()
.addLast("LengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4, true))
.addLast("LengthFieldPrepender", new LengthFieldPrepender(4, 0))
.addLast("GlobalTrafficShapingHandler", globalTrafficShapingHandler)
.addLast("chunkedWriteHandler", new ChunkedWriteHandler())
.addLast("myServerChunkHandler", new MyServerChunkHandler())
.addLast("StringDecoder", new StringDecoder(utf8))
.addLast("StringEncoder", new StringEncoder(utf8))
.addLast("myServerHandler", new MyServerHandlerForPlain());
}
}
ServerHandler:當(dāng)有客戶端連接上了后就開始給客戶端發(fā)送消息。并且通過(guò)『Channel#isWritable』方法以及『channelWritabilityChanged』事件來(lái)監(jiān)控可寫性,以判斷啥時(shí)需要停止數(shù)據(jù)的寫出,啥時(shí)可以開始繼續(xù)寫出數(shù)據(jù)。同時(shí)寫了一個(gè)簡(jiǎn)易的task來(lái)計(jì)算每秒數(shù)據(jù)的發(fā)送速率(并非精確的計(jì)算)。
public class MyServerHandlerForPlain extends MyServerCommonHandler {
@Override
protected void sentData(ChannelHandlerContext ctx) {
sentFlag = true;
ctx.writeAndFlush(tempStr, getChannelProgressivePromise(ctx, future -> {
if(ctx.channel().isWritable() && !sentFlag) {
sentData(ctx);
}
}));
}
@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
if(ctx.channel().isWritable() && !sentFlag) {
// System.out.println(" ###### 重新開始寫數(shù)據(jù) ######");
sentData(ctx);
} else {
// System.out.println(" ===== 寫暫停 =====");
}
}
}
public abstract class MyServerCommonHandler extends SimpleChannelInboundHandler<String> {
protected final int M = 1024 * 1024;
protected String tempStr;
protected AtomicLong consumeMsgLength;
protected Runnable counterTask;
private long priorProgress;
protected boolean sentFlag;
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
consumeMsgLength = new AtomicLong();
counterTask = () -> {
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
long length = consumeMsgLength.getAndSet(0);
System.out.println("*** " + ctx.channel().remoteAddress() + " rate(M/S):" + (length / M));
}
};
StringBuilder builder = new StringBuilder();
for (int i = 0; i < M; i++) {
builder.append("abcdefghijklmnopqrstuvwxyz");
}
tempStr = builder.toString();
super.handlerAdded(ctx);
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
sentData(ctx);
new Thread(counterTask).start();
}
protected ChannelProgressivePromise getChannelProgressivePromise(ChannelHandlerContext ctx, Consumer<ChannelProgressiveFuture> completedAction) {
ChannelProgressivePromise channelProgressivePromise = ctx.newProgressivePromise();
channelProgressivePromise.addListener(new ChannelProgressiveFutureListener(){
@Override
public void operationProgressed(ChannelProgressiveFuture future, long progress, long total) throws Exception {
consumeMsgLength.addAndGet(progress - priorProgress);
priorProgress = progress;
}
@Override
public void operationComplete(ChannelProgressiveFuture future) throws Exception {
sentFlag = false;
if(future.isSuccess()){
System.out.println("成功發(fā)送完成!");
priorProgress -= 26 * M;
Optional.ofNullable(completedAction).ifPresent(action -> action.accept(future));
} else {
System.out.println("發(fā)送失?。。。。?!");
future.cause().printStackTrace();
}
}
});
return channelProgressivePromise;
}
protected abstract void sentData(ChannelHandlerContext ctx);
@Override
protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
System.out.println("===== receive client msg : " + msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.channel().close();
}
}
客戶端
客戶端比較簡(jiǎn)單了,使用ChannelTrafficShapingHandler來(lái)實(shí)現(xiàn)“流量整形”,并將readLimit設(shè)置為1M/S。
public class MyClientInitializer extends ChannelInitializer<SocketChannel> {
Charset utf8 = Charset.forName("utf-8");
final int M = 1024 * 1024;
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelTrafficShapingHandler channelTrafficShapingHandler = new ChannelTrafficShapingHandler(10 * M, 1 * M);
ch.pipeline()
.addLast("channelTrafficShapingHandler",channelTrafficShapingHandler)
.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4, true))
.addLast("lengthFieldPrepender", new LengthFieldPrepender(4, 0))
.addLast("stringDecoder", new StringDecoder(utf8))
.addLast("stringEncoder", new StringEncoder(utf8))
.addLast("myClientHandler", new MyClientHandler());
}
}
注意事項(xiàng)
① 注意,trafficShaping是通過(guò)程序來(lái)達(dá)到控制流量的作用,并不是網(wǎng)絡(luò)層真實(shí)的傳輸流量大小的控制。TrafficShapingHandler僅僅是根據(jù)消息大?。ùl(fā)送出去的數(shù)據(jù)包大小)和設(shè)定的流量限制來(lái)得出延遲發(fā)送該包的時(shí)間,即同一時(shí)刻不會(huì)發(fā)送過(guò)大的數(shù)據(jù)導(dǎo)致帶寬負(fù)荷不了。但是并沒有對(duì)大數(shù)據(jù)包進(jìn)行拆分的作用,這會(huì)使在發(fā)送這個(gè)大數(shù)據(jù)包時(shí)同樣可能會(huì)導(dǎo)致帶寬爆掉的情況。所以你需要注意一次發(fā)送數(shù)據(jù)包的大小,不要大于你設(shè)置限定的寫帶寬大小(writeLimit)。你可以通過(guò)在業(yè)務(wù)handler中自己控制的方式,或者考慮使用ChunkedWriteHandler,如果它能滿足你的要求的話。同時(shí)注意,不要將writeLimit和readLimit設(shè)置的過(guò)小,這是沒有意義的,只會(huì)導(dǎo)致讀/寫操作的不斷停頓。。
② 注意,不要在非NioEventLoop線程中不停歇的發(fā)送非ByteBuf、ByteBufHolder或者FileRegion對(duì)象的大數(shù)據(jù)包,如:
new Thread(() -> {
while (true) {
if(ctx.channel().isWritable()) {
ctx.writeAndFlush(tempStr, getChannelProgressivePromise(ctx, null));
}
}
}).start();
因?yàn)閷懖僮魇且粋€(gè)I/O操作,當(dāng)你在非NioEventLoop線程上執(zhí)行了Channel的I/O操作的話,該操作會(huì)封裝為一個(gè)task 被提交至NioEventLoop的任務(wù)隊(duì)列中,以使得I/O操作最終是NioEventLoop線程上得到執(zhí)行。
而提交這個(gè)任務(wù)的流程,僅會(huì)對(duì)ByteBuf、ByteBufHolder或者FileRegion對(duì)象進(jìn)行真實(shí)數(shù)據(jù)大小的估計(jì)(其他情況默認(rèn)估計(jì)大小為8 bytes),并將估計(jì)后的數(shù)據(jù)大小值對(duì)該ChannelOutboundBuffer的totalPendingSize屬性值進(jìn)行累加。而totalPendingSize同WriteBufferWaterMark一起來(lái)控制著Channel的unwritable。所以,如果你在一個(gè)非NioEventLoop線程中不斷地發(fā)送一個(gè)非ByteBuf、ByteBufHolder或者FileRegion對(duì)象的大數(shù)據(jù)包時(shí),最終就會(huì)導(dǎo)致提交大量的任務(wù)到NioEventLoop線程的任務(wù)隊(duì)列中,而當(dāng)NioEventLoop線程在真實(shí)執(zhí)行這些task時(shí)可能發(fā)生OOM。
擴(kuò)展
關(guān)于 “OP_WRITE” 與 “Channel#isWritable()”
首先,我們需要明確的一點(diǎn)是,“OP_WRITE” 與 “Channel#isWritable()” 雖然都是的對(duì)數(shù)據(jù)的可寫性進(jìn)行檢測(cè),但是它們是分別針對(duì)不同層面的可寫性的。
- “OP_WRITE”是當(dāng)內(nèi)核的發(fā)送緩沖區(qū)滿的時(shí)候,我們程序執(zhí)行write操作(這里是真實(shí)寫操作了,將數(shù)據(jù)通過(guò)TCP協(xié)議進(jìn)行網(wǎng)絡(luò)傳輸)無(wú)法將數(shù)據(jù)寫出,這時(shí)我們需要注冊(cè)O(shè)P_WRITE事件。這樣當(dāng)發(fā)送緩沖區(qū)空閑時(shí)就OP_WRITE事件就會(huì)觸發(fā),我們就可以繼續(xù)write未寫完的數(shù)據(jù)了。這可以看做是對(duì)系統(tǒng)層面的可寫性的一種檢測(cè)。
- 而“Channel#isWritable()”則是檢測(cè)程序中的緩存的待寫出的數(shù)據(jù)大小超過(guò)了我們?cè)O(shè)定的相關(guān)最大寫數(shù)據(jù)大小,如果超過(guò)了isWritable()方法將返回false,說(shuō)明這時(shí)我們不應(yīng)該再繼續(xù)進(jìn)行write操作了(這里寫操作一般為通過(guò)ChannelHandlerContext或Channel進(jìn)行的寫操作)。
關(guān)于“OP_WRITE”前面的NIO文章及前面Netty系列文章已經(jīng)進(jìn)行過(guò)不少介紹了,這里不再贅述。下面我們來(lái)看看“Channel#isWritable()”是如果檢測(cè)可寫性的。
public boolean isWritable() {
return unwritable == 0;
}
ChannelOutboundBuffer 的 unwritable屬性為0時(shí),Channel的isWritable()方法將返回true;否之,返回false;
unwritable可以看做是一個(gè)二進(jìn)制的開關(guān)屬性值。它的二進(jìn)制的不同位表示的不同狀態(tài)的開關(guān)。如:

ChannelOutboundBuffer有四個(gè)方法會(huì)對(duì)unwritable屬性值進(jìn)行修改:clearUserDefinedWritability、setUnwritable、setUserDefinedWritability、setWritable。并且,當(dāng)unwritable從0到非0間改變時(shí)還會(huì)觸發(fā)ChannelWritabilityChanged事件,以通知ChannelPipeline中的各個(gè)ChannelHandler當(dāng)前Channel可寫性發(fā)生了改變。
其中setUnwritable、setWritable這對(duì)方法是由于待寫數(shù)據(jù)大小高于或低于了WriteBufferWaterMark的水位線而導(dǎo)致的unwritable屬性值的改變。
我們所執(zhí)行的『ChannelHandlerContext#write』和『Channel#write』操作會(huì)先將待發(fā)送的數(shù)據(jù)包放到Channel的輸出緩沖區(qū)(ChannelOutboundBuffer)中,然后在執(zhí)行flush操作的時(shí)候,會(huì)從ChannelOutboundBuffer中依次出去數(shù)據(jù)包進(jìn)行真實(shí)的網(wǎng)絡(luò)數(shù)據(jù)傳輸。而WriteBufferWaterMark控制的就是ChannelOutboundBuffer中待發(fā)送的數(shù)據(jù)總大?。?,totalPendingSize:包含一個(gè)個(gè)ByteBuf中待發(fā)送的數(shù)據(jù)大小,以及數(shù)據(jù)包對(duì)象占用的大?。?。如果totalPendingSize的大小超過(guò)了WriteBufferWaterMark高水位(默認(rèn)為64KB),則會(huì)unwritable屬性的’WriteBufferWaterMark狀態(tài)位’置位1;隨著數(shù)據(jù)不斷寫出(每寫完一個(gè)ByteBuf后,就會(huì)將totalPendingSize減少相應(yīng)的值),當(dāng)totalPendingSize的大小小于WriteBufferWaterMark低水位(默認(rèn)為32KB)時(shí),則會(huì)將unwritable屬性的’WriteBufferWaterMark狀態(tài)位’置位0。
而本文的主題“流量整形”則是使用了clearUserDefinedWritability、setUserDefinedWritability這對(duì)方法來(lái)控制unwritable相應(yīng)的狀態(tài)位。
當(dāng)數(shù)據(jù)write到GlobalTrafficShapingHandler的時(shí)候,估計(jì)的數(shù)據(jù)大小大于0,且通過(guò)trafficCounter計(jì)算出的延遲時(shí)間大于最小延遲時(shí)間(MINIMAL_WAIT,默認(rèn)為10ms)時(shí),滿足如下任意條件會(huì)使得unwritable的’GlobalTrafficShaping狀態(tài)位’置為1:
- 當(dāng)perChannel.queueSize(單個(gè)Channel中待寫出的總數(shù)據(jù)大小)設(shè)定的最大寫數(shù)據(jù)大小時(shí)(默認(rèn)為4M)
- 當(dāng)queuesSize(所有Channel的待寫出的總數(shù)據(jù)大?。┏^(guò)設(shè)定的最大寫數(shù)據(jù)大小時(shí)(默認(rèn)為400M)
- 對(duì)于Channel發(fā)送的單個(gè)數(shù)據(jù)包如果太大,以至于計(jì)算出的延遲發(fā)送時(shí)間大于了最大延遲發(fā)送時(shí)間(maxWriteDelay,默認(rèn)為4s)時(shí)
隨著寫延遲時(shí)間的到達(dá)GlobalTrafficShaping中積壓的數(shù)據(jù)不斷被寫出,當(dāng)某個(gè)Channel中所有待寫出的數(shù)據(jù)都寫出后(注意,這里指將數(shù)據(jù)寫到ChannelPipeline中的下一個(gè)ChannelOutboundBuffer中)會(huì)將unwritable的’GlobalTrafficShaping狀態(tài)位’置為0。
后記
本文主要對(duì)Netty是如何實(shí)現(xiàn)“流量整形”的原理進(jìn)行了分析,并給出了一個(gè)簡(jiǎn)單demo。而在實(shí)際開發(fā)中,問題往往更加的復(fù)雜,可能會(huì)涉及到不少文中未提及的要點(diǎn)。但對(duì)原理有一定了解后結(jié)合實(shí)際情況和需求在做優(yōu)化和進(jìn)一步的開發(fā)就會(huì)比較有頭緒了,因此非常歡迎大家分享你們的線程問題對(duì)此類問題與我留言討論~
若文章有任何錯(cuò)誤,望大家不吝指教:)