網(wǎng)絡(luò)編程-Netty-writeAndFlush方法原理分析 以及 close以后是否還能寫入數(shù)據(jù)?

前言

在上一講網(wǎng)絡(luò)編程-關(guān)閉連接(2)-Java的NIO在關(guān)閉socket時(shí),究竟用了哪個(gè)系統(tǒng)調(diào)用函數(shù)?中,我們做了個(gè)實(shí)驗(yàn),研究了java nio的close函數(shù)究竟調(diào)用了哪個(gè)系統(tǒng)調(diào)用,答案是close,但在真實(shí)的測試代碼中,其實(shí)我犯了一個(gè)小錯(cuò)誤,在close之后并沒有return,所以在測試close之后,還做了writeAndFlush操作發(fā)送了一條數(shù)據(jù),并且執(zhí)行過程并沒有報(bào)錯(cuò)。這件事讓我關(guān)注起了close和之后的writeAndFlush之間的關(guān)系。為什么在close之后”看起來“還可以繼續(xù)寫入呢?

原始代碼如下:

@Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //寫入本地文件測試字符,然后關(guān)閉channel
        FileWriter fileWriter = new FileWriter("/root/test.txt");
        fileWriter.write("test test hold on");
        fileWriter.flush();
        fileWriter.close();

        //調(diào)用同步方法關(guān)閉
        ChannelFuture sync = ctx.channel().close().sync();
        if(sync.isSuccess()){
            System.out.println("關(guān)閉成功!");
        }else{
            System.out.println("關(guān)閉失?。?);
        }

        //這里開始,是誤執(zhí)行的語句
        this.ctx = ctx;
        //發(fā)送心跳指令
        if (count.intValue() > 150) {
            count.set(1);
        }
        Command0C04 command0C04 = new Command0C04(count.intValue());
        byte[] encode = command0C04.encode();
        logger.info("心跳指令:" + HexStringUtils.toHexString(encode));
        ctx.channel().writeAndFlush(encode).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                System.out.println("success:"+channelFuture.isSuccess());
                System.out.println("cancelled:"+channelFuture.isCancelled());
                System.out.println("done:"+channelFuture.isDone());
                System.out.println("isCancellable:"+channelFuture.isCancellable());
            }
        });
        count.getAndIncrement();
}

我們知道,close系統(tǒng)調(diào)用會(huì)關(guān)閉讀和寫兩個(gè)方向的操作,那么writeAndFlush在close之后具體是如何執(zhí)行的?netty是怎么確保不會(huì)寫入到發(fā)送緩沖區(qū)中呢?

想研究清楚這個(gè)問題,需要先看writeAndFlush操作做了什么,涉及到什么底層的數(shù)據(jù)結(jié)構(gòu)。

writeAndFlush原理

簡言之,writeAndFlush,在底層會(huì)做兩個(gè)操作

  • write操作
  • flush操作

首先分析write操作。

write操作

netty底層會(huì)維護(hù)一個(gè)重要的數(shù)據(jù)結(jié)構(gòu),ChannelOutboundBuffer,這是一個(gè)單向鏈表。我們調(diào)用寫的方法其實(shí)會(huì)把數(shù)據(jù)先緩存到這個(gè)數(shù)據(jù)結(jié)構(gòu)中,等調(diào)用flush之后,就會(huì)真正的把數(shù)據(jù)寫入到發(fā)送緩沖區(qū)當(dāng)中。

ChannelOutBoundBuffer中有以下幾個(gè)重要的指針:


1357217-6c645e2c5d0876d4.png
  • Entry代表了我們發(fā)送的數(shù)據(jù)
  • flushedEntry代表需要寫入到發(fā)送緩沖區(qū)的第一個(gè)Entry
  • unflushedEntry代表第一個(gè)等待寫入發(fā)送緩沖區(qū)的Entry

當(dāng)?shù)谝淮握{(diào)用addMessage方法往ChannelOutBoundBuffer中添加數(shù)據(jù)時(shí)

1357217-2fd7d3a45b0de35d.png

第二次調(diào)用addMessage方法時(shí),數(shù)據(jù)指針如下


1357217-01d5fe86bd63437f.png

如果不調(diào)用Flush,那么flushedEntry指針一直為null,數(shù)據(jù)會(huì)一直寫入到后面的鏈表中。

Flush操作

當(dāng)調(diào)用Flush操作后,指針情況如圖:


1357217-24c29eedd72aeb23.png

之后的代碼,就是遍歷這段節(jié)點(diǎn)數(shù)據(jù),寫入到發(fā)送緩沖區(qū)中,并且寫入后釋放節(jié)點(diǎn)內(nèi)存。

判斷緩沖區(qū)是否可寫(小知識)

在實(shí)際flush之前,netty調(diào)用isFlushPending判斷,這個(gè)channel是否注冊了可寫事件,如果有可寫事件就等會(huì)再發(fā)送。如果沒有,就會(huì)調(diào)用父類的flush0方法直接寫。

  • 注:如果到達(dá)發(fā)送緩沖區(qū)的水位線了,發(fā)送緩沖區(qū)本身就不可寫了,這個(gè)時(shí)候會(huì)(XX會(huì))注冊一個(gè)可寫事件到selector中,netty就是使用這個(gè)可寫判斷是否可以真正的發(fā)送。

protected final void flush0() {
    if (!isFlushPending()) {
        super.flush0();
    }
}


private boolean isFlushPending() {
    SelectionKey selectionKey = selectionKey();
    return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
}

OOM?

如果接收端消費(fèi)速度很慢,接收緩沖區(qū)滿了以后,會(huì)導(dǎo)致發(fā)送緩沖區(qū)無法繼續(xù)發(fā)送數(shù)據(jù),在一直發(fā)送數(shù)據(jù)的前提下,ChannelOutboundBuffer會(huì)一直上漲,可能會(huì)引起OOM問題。

Netty官方提供了兩個(gè)ChannelOutBoundBuffer配置參數(shù)、一個(gè)Channel屬性和一個(gè)用戶回調(diào)方法來幫助我們識別和解決這件事。

兩個(gè)ChannelOutBoundBuffer配置參數(shù):

  • Channel.config().setWriteBufferHighWaterMark:高水位,默認(rèn)64 kb

  • Channel.config().setWriteBufferLowWaterMark :低水位:默認(rèn)32 kb

一個(gè)Channel屬性:isWritable

一個(gè)用戶回調(diào)方法:fireChannelWritabilityChanged

內(nèi)部邏輯如下:

  • 當(dāng)本次需要添加到ChannelOutBoundBuffer的數(shù)據(jù)量超過了高水位,會(huì)改變isWritable對應(yīng)的屬性值從0變?yōu)?,并且觸發(fā)一個(gè)ChannelWritabilityChanged事件。
  • 當(dāng)flush或者remove后,如果數(shù)據(jù)恢復(fù)到最低水位下了,會(huì)改變isWritable對應(yīng)的屬性值從1變?yōu)?,并且觸發(fā)一個(gè)ChannelWritabilityChanged事件。

用戶可以通過屬性和回調(diào)方法來檢查是否可寫,做相關(guān)的業(yè)務(wù)處理。

writeAndFlush總結(jié)

在調(diào)用寫入方法后,netty并不會(huì)直接把數(shù)據(jù)寫入到發(fā)送緩沖區(qū)中,而是存儲(chǔ)在了ChannelOutboundBuffer中,等到調(diào)用flush操作后,再把數(shù)據(jù)真正寫入Socket的發(fā)送緩沖區(qū)中。

close以后是否還能寫入數(shù)據(jù)?

跟蹤close源碼,最后會(huì)跟蹤到io.netty.channel.AbstractChannel 的內(nèi)部類 AbstractUnsafe中的close方法,方法代碼如下(部分代碼省略,只保留這個(gè)問題相關(guān)的核心代碼):

private void close(final ChannelPromise promise, final Throwable cause,
                           final ClosedChannelException closeCause, final boolean notify) {

            final boolean wasActive = isActive();
            final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.
        }

可以看到,這里有一句this.outboundBuffer = null; 相當(dāng)于把上文分析的ChannelOutboundBuffer置空。

結(jié)合同在AbstractUnsafe中的write代碼中的這一部分來看(同樣省略了非問題關(guān)注的代碼)

 @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, newWriteException(initialCloseCause));
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }
}

在write之前,會(huì)做判斷,如果如果ChannelOutboundBuffer為空為空,那么釋放內(nèi)存,不發(fā)送數(shù)據(jù)并返回。

總結(jié)

首先我們了解了,在發(fā)送過程中比較重要的數(shù)據(jù)結(jié)構(gòu)ChannelOutboundBuffer,然后我們了解了在close的時(shí)候,會(huì)把如果ChannelOutboundBuffer置空,并且在write的時(shí)候,會(huì)判斷該buffer是否為空,為空則不發(fā)送,并設(shè)置失敗,到此我們的問題就研究明白了。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容