netty是如何實現(xiàn)流控的
netty實現(xiàn)流控的方式可以分兩個大類,第一類依賴于tcp的窗口機制,第二類通過使用流量整形,這里只對第一類進行介紹
滑動窗口和擁塞窗口
tcp協(xié)議使用滑動窗口和擁塞窗口進行流量控制
滑動窗口是在接收端確認的窗口,其實就是接收端的接收緩沖區(qū),隨著數(shù)據(jù)的不斷接收,如果接收端處理的速度沒有接收數(shù)據(jù)的速度快,那么緩沖區(qū)的大小在減小,也即是窗口在減少,那么接收端在發(fā)給發(fā)送端的ack中會包含可以接收的數(shù)據(jù)大小,那么發(fā)送端就會發(fā)送對應(yīng)接收端剩余窗口大小的數(shù)據(jù)
另一方面在發(fā)送端維護了一個擁塞窗口,在開始發(fā)送數(shù)據(jù)時,會使用一個慢啟動算法發(fā)送,窗口從1開始以指數(shù)級增長當(dāng)達到臨界值時,改為線性增加,如果發(fā)現(xiàn)網(wǎng)絡(luò)中有重傳發(fā)生,那么就會將擁塞窗口減小到1,從而避免自己發(fā)的太快導(dǎo)致的網(wǎng)絡(luò)堵塞。
netty依賴窗口實現(xiàn)的流控
- 作為發(fā)送方:當(dāng)我們執(zhí)行channel.write()時,最終的write操作會由unsafe接口處理,實現(xiàn)是AbstractUnsafe,片段代碼如下:
public final void write(Object msg, ChannelPromise promise) {
//。。。省略
outboundBuffer.addMessage(msg, size, promise);
}
緊接著addMessage方法會將消息加入到隊列中,進而增加發(fā)送緩沖區(qū)字節(jié)數(shù),代碼如下:
public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}
// increment pending bytes after adding message to the unflushed arrays.
// See https://github.com/netty/netty/issues/1619
incrementPendingOutboundBytes(entry.pendingSize, false);
}
incrementPendingOutboundBytes會將當(dāng)前字節(jié)大小與配置的寫的高水位比較,如果超過配置的高水位值則執(zhí)行setUnwritable方法,就是將writable屬性設(shè)置為false,若設(shè)置成功則觸發(fā)了pipeline中的fireChannelWritabilityChanged接口,這個方法我們可以自己去實現(xiàn)比如降低我們發(fā)送的速率
private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}
同樣的在將數(shù)據(jù)慢慢發(fā)送去之后,如果小于配置的低水位了,那么就再會觸發(fā)該事件
private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}
long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
setWritable(invokeLater);
}
}
private void setWritable(boolean invokeLater) {
for (;;) {
final int oldValue = unwritable;
final int newValue = oldValue & ~1;
if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
if (oldValue != 0 && newValue == 0) {
fireChannelWritabilityChanged(invokeLater);
}
break;
}
}
}
所以這里要么可以維護一個發(fā)送隊列暫存消息或者快速失敗
- 作為接收方:netty提供了一個配置autoread,該參數(shù)設(shè)置后,那么即使select出的事件為讀時,那么也不會從內(nèi)核中讀數(shù)據(jù),這樣就會導(dǎo)致接收窗口滿,那么就會通知發(fā)送方接收端窗口為0,使得發(fā)送方降低發(fā)送速度,具體實現(xiàn)代碼如下:
#NioEventLoop 的processSelectedKey方法在觸發(fā)read方法時,會調(diào)用NioByteUnsafe的read方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
#該方法里面有個allocHandle.continueReading()判斷,具體實現(xiàn)在MaxMessageHandle
public final void read() {
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);
readPending = false;
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) {
removeReadOp();
}
}
}
#判斷的依據(jù)就是autoread
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
return config.isAutoRead() &&
maybeMoreDataSupplier.get() &&
totalMessages < maxMessagePerRead &&
totalBytesRead > 0;
}
業(yè)務(wù)控制方式:
作為發(fā)送方,發(fā)送前應(yīng)該檢查iswritable 如果返回false,一種方式是延遲發(fā)送可以用隊列或者timeout去實現(xiàn)
作為接收方,如果業(yè)務(wù)處理速度不足以匹配接收速度,結(jié)合隊列控制隊列滿時將autoread設(shè)置為false,不滿時將其打開,但是應(yīng)該注意不滿的條件觸發(fā),不應(yīng)使得autoread頻繁開關(guān),畢竟這也是耗費系統(tǒng)資源的