流量控制

netty是如何實現(xiàn)流控的

netty實現(xiàn)流控的方式可以分兩個大類,第一類依賴于tcp的窗口機制,第二類通過使用流量整形,這里只對第一類進行介紹

滑動窗口和擁塞窗口

tcp協(xié)議使用滑動窗口和擁塞窗口進行流量控制

  1. 滑動窗口是在接收端確認的窗口,其實就是接收端的接收緩沖區(qū),隨著數(shù)據(jù)的不斷接收,如果接收端處理的速度沒有接收數(shù)據(jù)的速度快,那么緩沖區(qū)的大小在減小,也即是窗口在減少,那么接收端在發(fā)給發(fā)送端的ack中會包含可以接收的數(shù)據(jù)大小,那么發(fā)送端就會發(fā)送對應(yīng)接收端剩余窗口大小的數(shù)據(jù)

  2. 另一方面在發(fā)送端維護了一個擁塞窗口,在開始發(fā)送數(shù)據(jù)時,會使用一個慢啟動算法發(fā)送,窗口從1開始以指數(shù)級增長當(dāng)達到臨界值時,改為線性增加,如果發(fā)現(xiàn)網(wǎng)絡(luò)中有重傳發(fā)生,那么就會將擁塞窗口減小到1,從而避免自己發(fā)的太快導(dǎo)致的網(wǎng)絡(luò)堵塞。

netty依賴窗口實現(xiàn)的流控

  1. 作為發(fā)送方:當(dāng)我們執(zhí)行channel.write()時,最終的write操作會由unsafe接口處理,實現(xiàn)是AbstractUnsafe,片段代碼如下:
public final void write(Object msg, ChannelPromise promise) {
           //。。。省略
            outboundBuffer.addMessage(msg, size, promise);
 }

緊接著addMessage方法會將消息加入到隊列中,進而增加發(fā)送緩沖區(qū)字節(jié)數(shù),代碼如下:

public void addMessage(Object msg, int size, ChannelPromise promise) {
        Entry entry = Entry.newInstance(msg, size, total(msg), promise);
        if (tailEntry == null) {
            flushedEntry = null;
            tailEntry = entry;
        } else {
            Entry tail = tailEntry;
            tail.next = entry;
            tailEntry = entry;
        }
        if (unflushedEntry == null) {
            unflushedEntry = entry;
        }

        // increment pending bytes after adding message to the unflushed arrays.
        // See https://github.com/netty/netty/issues/1619
        incrementPendingOutboundBytes(entry.pendingSize, false);
    }

incrementPendingOutboundBytes會將當(dāng)前字節(jié)大小與配置的寫的高水位比較,如果超過配置的高水位值則執(zhí)行setUnwritable方法,就是將writable屬性設(shè)置為false,若設(shè)置成功則觸發(fā)了pipeline中的fireChannelWritabilityChanged接口,這個方法我們可以自己去實現(xiàn)比如降低我們發(fā)送的速率

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
        if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
            setUnwritable(invokeLater);
        }
    }

同樣的在將數(shù)據(jù)慢慢發(fā)送去之后,如果小于配置的低水位了,那么就再會觸發(fā)該事件

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
        if (size == 0) {
            return;
        }

        long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
        if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
            setWritable(invokeLater);
        }
    }

private void setWritable(boolean invokeLater) {
        for (;;) {
            final int oldValue = unwritable;
            final int newValue = oldValue & ~1;
            if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
                if (oldValue != 0 && newValue == 0) {
                    fireChannelWritabilityChanged(invokeLater);
                }
                break;
            }
        }
    }

所以這里要么可以維護一個發(fā)送隊列暫存消息或者快速失敗

  1. 作為接收方:netty提供了一個配置autoread,該參數(shù)設(shè)置后,那么即使select出的事件為讀時,那么也不會從內(nèi)核中讀數(shù)據(jù),這樣就會導(dǎo)致接收窗口滿,那么就會通知發(fā)送方接收端窗口為0,使得發(fā)送方降低發(fā)送速度,具體實現(xiàn)代碼如下:
#NioEventLoop 的processSelectedKey方法在觸發(fā)read方法時,會調(diào)用NioByteUnsafe的read方法
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }

#該方法里面有個allocHandle.continueReading()判斷,具體實現(xiàn)在MaxMessageHandle
 public final void read() {
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            final ByteBufAllocator allocator = config.getAllocator();
            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
            allocHandle.reset(config);

            ByteBuf byteBuf = null;
            boolean close = false;
            try {
                do {
                    byteBuf = allocHandle.allocate(allocator);
                    allocHandle.lastBytesRead(doReadBytes(byteBuf));
                    if (allocHandle.lastBytesRead() <= 0) {
                        // nothing was read. release the buffer.
                        byteBuf.release();
                        byteBuf = null;
                        close = allocHandle.lastBytesRead() < 0;
                        if (close) {
                            // There is nothing left to read as we received an EOF.
                            readPending = false;
                        }
                        break;
                    }

                    allocHandle.incMessagesRead(1);
                    readPending = false;
                    pipeline.fireChannelRead(byteBuf);
                    byteBuf = null;
                } while (allocHandle.continueReading());

                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();

                if (close) {
                    closeOnRead(pipeline);
                }
            } catch (Throwable t) {
                handleReadException(pipeline, byteBuf, t, close, allocHandle);
            } finally {
                // Check if there is a readPending which was not processed yet.
                // This could be for two reasons:
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
                // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
                //
                // See https://github.com/netty/netty/issues/2254
                if (!readPending && !config.isAutoRead()) {
                    removeReadOp();
                }
            }
        }

#判斷的依據(jù)就是autoread
public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
            return config.isAutoRead() &&
                   maybeMoreDataSupplier.get() &&
                   totalMessages < maxMessagePerRead &&
                   totalBytesRead > 0;
        }

業(yè)務(wù)控制方式:
作為發(fā)送方,發(fā)送前應(yīng)該檢查iswritable 如果返回false,一種方式是延遲發(fā)送可以用隊列或者timeout去實現(xiàn)

作為接收方,如果業(yè)務(wù)處理速度不足以匹配接收速度,結(jié)合隊列控制隊列滿時將autoread設(shè)置為false,不滿時將其打開,但是應(yīng)該注意不滿的條件觸發(fā),不應(yīng)使得autoread頻繁開關(guān),畢竟這也是耗費系統(tǒng)資源的

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容