Netty 源碼剖析之 unSafe.read 方法

目錄:

  1. NioSocketChannel$NioSocketChannelUnsafe 的 read 方法
  2. 首先看 ByteBufAllocator
  3. 再看 RecvByteBufAllocator.Handle
  4. 兩者如何配合進(jìn)行內(nèi)存分配
  5. 如何讀取到 ByteBuf
  6. 總結(jié)

前言

在之前的文章 Netty 核心組件 Pipeline 源碼分析(二)一個(gè)請(qǐng)求的 pipeline 之旅中,我們知道了當(dāng)客戶端請(qǐng)求進(jìn)來的時(shí)候,boss 線程會(huì)將 Socket 包裝后交給 worker 線程,worker 線程會(huì)將這個(gè) Socket 注冊(cè) selector 的讀事件,當(dāng)讀事件進(jìn)來的時(shí)候,會(huì)調(diào)用 unsafe 的 read 方法,這個(gè)方法的主要作用是讀取 Socket 緩沖區(qū)的內(nèi)存,并包裝成 Netty 的 ByteBuf 對(duì)象,最后傳遞進(jìn) pipeline 中的所有節(jié)點(diǎn)完成處理。

今天,我們就要好好的看看這個(gè) read方法的實(shí)現(xiàn)。

1. NioSocketChannel$NioSocketChannelUnsafe 的 read 方法

源碼如下:

public final void read() {
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    // 用來處理內(nèi)存的分配:池化或者非池化 UnpooledByteBufAllocator
    final ByteBufAllocator allocator = config.getAllocator();
    // 用來計(jì)算此次讀循環(huán)應(yīng)該分配多少內(nèi)存 AdaptiveRecvByteBufAllocator 自適應(yīng)計(jì)算緩沖分配
    final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
    allocHandle.reset(config);// 重置為0

    ByteBuf byteBuf = null;
    boolean close = false;
    try {
        do {
            byteBuf = allocHandle.allocate(allocator);
            allocHandle.lastBytesRead(doReadBytes(byteBuf));
            if (allocHandle.lastBytesRead() <= 0) {// 如果上一次讀到的字節(jié)數(shù)小于等于0,清理引用和跳出循環(huán)
                // nothing was read. release the buffer.
                byteBuf.release();// 引用 -1
                byteBuf = null;
                close = allocHandle.lastBytesRead() < 0;// 如果遠(yuǎn)程已經(jīng)關(guān)閉連接
                if (close) {
                    // There is nothing left to read as we received an EOF.
                    readPending = false;
                }
                break;
            }

            allocHandle.incMessagesRead(1);//  totalMessages += amt;
            readPending = false;
            pipeline.fireChannelRead(byteBuf);
            byteBuf = null;
        } while (allocHandle.continueReading());

        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (close) {
            closeOnRead(pipeline);
        }
    } catch (Throwable t) {
        handleReadException(pipeline, byteBuf, t, close, allocHandle);
    } finally {
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}

代碼很長(zhǎng),怎么辦呢?當(dāng)然是拆解,然后逐個(gè)擊破。步驟如下:

  1. 獲取到 Channel 的 config 對(duì)象,并從該對(duì)象中獲取內(nèi)存分配器,還有"計(jì)算內(nèi)存分配器"。
  2. 計(jì)算內(nèi)存分配器 重置。
  3. 進(jìn)入一個(gè)循環(huán),循環(huán)體的作用是:使用內(nèi)存分配器獲取數(shù)據(jù)容器-----ByteBuf,調(diào)用 doReadBytes 方法將數(shù)據(jù)讀取到容器中,如果這次讀取什么都沒有或遠(yuǎn)程連接關(guān)閉,則跳出循環(huán)。還有,如果滿足了跳出推薦,也要結(jié)束循環(huán),不能無限循環(huán),默認(rèn)16 次,默認(rèn)參數(shù)來自 AbstractNioByteChannel 的 屬性 ChannelMetadata 類型的 METADATA 實(shí)例。每讀取一次就調(diào)用 pipeline 的 channelRead 方法,為什么呢?因?yàn)橛捎?TCP 傳輸如果包過大的話,丟失的風(fēng)險(xiǎn)會(huì)更大,導(dǎo)致重傳,所以,大的數(shù)據(jù)流會(huì)分成多次傳輸。而 channelRead 方法也會(huì)被調(diào)用多次,因此,使用 channelRead 方法的時(shí)候需要注意,如果數(shù)據(jù)量大,最好將數(shù)據(jù)放入到緩存中,讀取完畢后,再進(jìn)行處理。
  4. 跳出循環(huán)后,調(diào)用 allocHandle 的 readComplete 方法,表示讀取已完成,并記錄讀取記錄,用于下次分配合理內(nèi)存。
  5. 調(diào)用 pipeline 的方法。

接下來就一步步看。

2. 首先看 ByteBufAllocator

首先看看這個(gè)節(jié)點(diǎn)的定義:

Implementations are responsible to allocate buffers. Implementations of this interface are expected to be hread-safe.
實(shí)現(xiàn)負(fù)責(zé)分配緩沖區(qū)。這個(gè)接口的實(shí)現(xiàn)應(yīng)該是線程安全的。

定義了如上方法

通過這個(gè)接口,可以看出來,主要作用是創(chuàng)建 ByteBuf,這個(gè) ByteBuf 是 Netty 用來替代 NIO 的 ByteBuffer 的,是存儲(chǔ)數(shù)據(jù)的緩存區(qū)。其中,這個(gè)接口有一個(gè)默認(rèn)實(shí)現(xiàn) ByteBufUtil.DEFAULT_ALLOCATOR :該實(shí)現(xiàn)根據(jù)配置創(chuàng)建一個(gè) 池化或非池化的緩存區(qū)分配器。該參數(shù)是 io.netty.allocator.type。

同時(shí),由于很多方法都是重載的,那就說說上面的主要方法作用:

buffer() // 返回一個(gè) ByteBuf 對(duì)象,默認(rèn)直接內(nèi)存。如果平臺(tái)不支持,返回堆內(nèi)存。
heapBuffer()// 返回堆內(nèi)存緩存區(qū)
directBuffer()// 返回直接內(nèi)存緩沖區(qū)
compositeBuffer() // 返回一個(gè)復(fù)合緩沖區(qū)??赡芡瑫r(shí)包含堆內(nèi)存和直接內(nèi)存。
ioBuffer() // 當(dāng)當(dāng)支持 Unsafe 時(shí),返回直接內(nèi)存的 Bytebuf,否則返回返回基于堆內(nèi)存,當(dāng)使用 PreferHeapByteBufAllocator 時(shí)返回堆內(nèi)存

3. 再看 RecvByteBufAllocator.Handle

首先看這個(gè)接口:

上圖中, Handle 是 RecvByteBufAllocator 的內(nèi)部接口。而 RecvByteBufAllocator 是如何定義的呢?

Creates a new handle. The handle provides the actual operations and keeps the internal information which is required for predicting an optimal buffer capacity.
創(chuàng)建一個(gè)新的句柄。句柄提供了實(shí)際操作,并保留了用于預(yù)測(cè)最佳緩沖區(qū)容量所需的內(nèi)部信息。

該接口只定義了一個(gè)方法:newHandle()。

而 handle 的作用是什么呢?


ByteBuf allocate(ByteBufAllocator alloc);//創(chuàng)建一個(gè)新的接收緩沖區(qū),其容量可能大到足以讀取所有入站數(shù)據(jù)和小到數(shù)據(jù)足夠不浪費(fèi)它的空間。
int guess();// 猜測(cè)所需的緩沖區(qū)大小,不進(jìn)行實(shí)際的分配
void reset(ChannelConfig config);// 每次開始讀循環(huán)之前,重置相關(guān)屬性
void incMessagesRead(int numMessages);// 增加本地讀循環(huán)的次數(shù)
void lastBytesRead(int bytes); // 設(shè)置最后一次讀到的字節(jié)數(shù)
int lastBytesRead(); // 最后一次讀到的字節(jié)數(shù)
void attemptedBytesRead(int bytes); // 設(shè)置讀操作嘗試讀取的字節(jié)數(shù)
void attemptedBytesRead(); // 獲取嘗試讀取的字節(jié)數(shù)
boolean continueReading(); // 判斷是否需要繼續(xù)讀
void readComplete(); // 讀結(jié)束后調(diào)用

從上面的方法中,可以看出,該接口的主要作用就是計(jì)算字節(jié)數(shù),如同 RecvByteBufAllocator 的文檔說的那樣,根據(jù)預(yù)測(cè)和計(jì)算最佳大小的緩存區(qū),確保不浪費(fèi)。

4. 兩者如何配合進(jìn)行內(nèi)存分配

在默認(rèn)的 config (NioSocketChannelConfig)中,allocator 來自 ByteBufAllocator 接口的默認(rèn)實(shí)例,allocHandle 來自 AdaptiveRecvByteBufAllocator 自適應(yīng)循環(huán)緩存分配器 的內(nèi)部類 HandleImpl。

好,知道了他們的默認(rèn)實(shí)現(xiàn),我們一個(gè)方法看看。

首先看 reset 方法:

public void reset(ChannelConfig config) {
    this.config = config;
    maxMessagePerRead = maxMessagesPerRead();
    totalMessages = totalBytesRead = 0;
}

設(shè)置了上次獲取的最大消息讀取次數(shù)(默認(rèn)16),將之前計(jì)算的讀取消息總數(shù)歸零。該方法如同他的名字,歸零重置。

再看看 allocHandle.allocate(allocator) 方法的實(shí)現(xiàn)。

public ByteBuf allocate(ByteBufAllocator alloc) {
    return alloc.ioBuffer(guess());
}

我們剛剛說的 ioBuffer 方法,該方法默認(rèn)返回直接內(nèi)存緩沖區(qū)。而 guess() 方法返回一個(gè)猜測(cè)的大小,一個(gè) nextReceiveBufferSize 屬性,默認(rèn) 1024,也就是說,默認(rèn)創(chuàng)建一個(gè) 1024 大小的直接內(nèi)存緩沖區(qū)。這個(gè)值的設(shè)定來自 HandleImpl 的構(gòu)造方法,存儲(chǔ)在一個(gè) SIZE_TABLE 的數(shù)組中。

我們還是看看 RecvByteBufAllocator 的實(shí)現(xiàn)類 AdaptiveRecvByteBufAllocator 的具體內(nèi)容吧

static final int DEFAULT_MINIMUM = 64; // 緩存區(qū)最小值
static final int DEFAULT_INITIAL = 1024; // 緩沖區(qū)初始值
static final int DEFAULT_MAXIMUM = 65536; // 緩沖區(qū)最大值

private static final int INDEX_INCREMENT = 4;// 當(dāng)發(fā)現(xiàn)緩存過小,數(shù)組下標(biāo)自增值
private static final int INDEX_DECREMENT = 1;// 當(dāng)發(fā)現(xiàn)緩沖區(qū)過大,數(shù)組下標(biāo)自減值

private static final int[] SIZE_TABLE;

static {
    List<Integer> sizeTable = new ArrayList<Integer>();
    for (int i = 16; i < 512; i += 16) {
        sizeTable.add(i);
    }

    for (int i = 512; i > 0; i <<= 1) {
        sizeTable.add(i);
    }

    SIZE_TABLE = new int[sizeTable.size()];
    for (int i = 0; i < SIZE_TABLE.length; i ++) {
        SIZE_TABLE[i] = sizeTable.get(i);
    }
}

樓主在上面的代碼中寫了注釋,這個(gè) SIZE_TABLE 的作用是存儲(chǔ)緩存區(qū)大小的一個(gè) int 數(shù)組,從 static 塊中可以看到,這個(gè)數(shù)組從16開始,同時(shí)遞增16,直到值到了 512,也就是下標(biāo) 31 的地方,遞增策略變?yōu)榱?每次 * 2,直到溢出。最終的數(shù)組長(zhǎng)度為 53。而對(duì)應(yīng)的值接近 int 最大值。

好,回到 allocate 方法中,進(jìn)入到 ioBuffer 方法查看:

public ByteBuf ioBuffer(int initialCapacity) {
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    return heapBuffer(initialCapacity);
}

判斷,如果平臺(tái)支持 unSafe,就使用直接內(nèi)存,否則使用堆內(nèi)存,初始大小就是我們剛剛說的 1024。而這個(gè)判斷的標(biāo)準(zhǔn)是:如果嘗試獲取 Unsafe 的時(shí)候有異常了,則賦值給一個(gè) UNSAFE_UNAVAILABILITY_CAUSE 對(duì)象,否則賦值為 null,Netty 通過這個(gè)判 Null 確認(rèn)平臺(tái)是否支持 Unsafe。

我們繼續(xù)看看 directBuffer 方法的實(shí)現(xiàn):


public ByteBuf directBuffer(int initialCapacity) {
    return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}
// 
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// 
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    final ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }
    return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
}

由于方法層層遞進(jìn),樓主將代碼合在一起,最終調(diào)用的是 newDirectBuffer,根據(jù) noCleaner 參數(shù)決定創(chuàng)建一個(gè) ByteBuf,這個(gè)屬性怎么來的呢?當(dāng) unsafe 不是 null 的時(shí)候,會(huì)嘗試獲取 DirectByteBuffer 的構(gòu)造器,如果成功獲取,則 noCleaner 屬性為 true。

這個(gè) noCleaner 屬性的詳細(xì)介紹請(qǐng)看這里Netty 內(nèi)存回收之 noCleaner 策略.

默認(rèn)情況下就是 true,那么,也就是創(chuàng)建了一個(gè) InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf 對(duì)象,該對(duì)象構(gòu)造如下:

@1
InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(
        UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(alloc, initialCapacity, maxCapacity);
}

@2
UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(alloc, initialCapacity, maxCapacity);
}

@3
public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
    super(maxCapacity);
    if (alloc == null) {
        throw new NullPointerException("alloc");
    }
    if (initialCapacity < 0) {
        throw new IllegalArgumentException("initialCapacity: " + initialCapacity);
    }
    if (maxCapacity < 0) {
        throw new IllegalArgumentException("maxCapacity: " + maxCapacity);
    }
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
    }

    this.alloc = alloc;
    setByteBuffer(allocateDirect(initialCapacity), false);
}

@4
static ByteBuffer newDirectBuffer(long address, int capacity) {
    ObjectUtil.checkPositiveOrZero(capacity, "capacity");
    return (ByteBuffer) DIRECT_BUFFER_CONSTRUCTOR.newInstance(address, capacity);
}

最終使用了 DirectByteBuffer 的構(gòu)造器進(jìn)行反射創(chuàng)建。而這個(gè)構(gòu)造器是沒有默認(rèn)的 new 創(chuàng)建的 Cleaner 對(duì)象的。因此稱為 noCleaner。

創(chuàng)建完畢后,調(diào)用 setByteBuffer ,將這個(gè) DirectByteBuffer 包裝一下。

回到 newDirectBuffer 方法。

最后根據(jù) disableLeakDetector 屬性判斷釋放進(jìn)行自動(dòng)內(nèi)存回收(也就是當(dāng)你忘記回收的時(shí)候,幫你回收),原理這里簡(jiǎn)單的說一下,使用虛引用進(jìn)行跟蹤。 FastThreadLocal 的內(nèi)存回收類似。我們將在以后的文章中詳細(xì)說明此策略。

到這里,創(chuàng)建 ByteBuf 的過程就結(jié)束了。

可以說,大部分工作都是 allocator 做的,allocHandle 的作用就是提供了如何分配一個(gè)合理的內(nèi)存的策略。

5. 如何讀取到 ByteBuf

回到 read 方法,doReadBytes(byteBuf) 就是將 Channel 的內(nèi)容讀取到容器中,并返回一個(gè)讀取到的字節(jié)數(shù)。

代碼如下:

protected int doReadBytes(ByteBuf byteBuf) throws Exception {
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.attemptedBytesRead(byteBuf.writableBytes());
    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());
}

獲取到 內(nèi)存預(yù)估器,設(shè)置一個(gè) attemptedBytesRead 屬性為 ByteBuf 的可寫字節(jié)數(shù)。這個(gè)參數(shù)可用于后面分配內(nèi)存時(shí)的一些考量。

然后調(diào)用 byteBuf.writeBytes()方法。傳入了 NIO 的 channel,還有剛剛的可寫字節(jié)數(shù)。進(jìn)入到該方法查看:

@1
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
    ensureWritable(length);
    int writtenBytes = setBytes(writerIndex, in, length);
    if (writtenBytes > 0) {
        writerIndex += writtenBytes;
    }
    return writtenBytes;
}

首先對(duì)長(zhǎng)度進(jìn)行校驗(yàn),確??蓪戦L(zhǎng)度大于0,如果被并發(fā)了導(dǎo)致容量不夠,將這個(gè)底層的 ByteBuffer 的容量增加傳入的長(zhǎng)度。

關(guān)于 ByteBuf 的 wirteIndex ,如下圖:

回到 writeBytes 方法,調(diào)用 setBytes 方法,將流中輸入寫入到緩沖區(qū)。方法如下:

public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
    ensureAccessible();
    ByteBuffer tmpBuf = internalNioBuffer();
    tmpBuf.clear().position(index).limit(index + length);
    try {
        return in.read(tmpBuf);
    } catch (ClosedChannelException ignored) {
        return -1;
    }
}

非常熟悉的 NIO 操作。

首先獲取到內(nèi)部 ByteBuffer 的共享緩沖區(qū),賦值給臨時(shí)的 tmpNioBuf 屬性。然后返回這個(gè)引用。將這個(gè)引用清空,并將指針移動(dòng)到給定 index 為止,然后 limit 方法設(shè)置緩存區(qū)大小。

最后調(diào)用 Channel 的 read 方法,將Channel 數(shù)據(jù)讀入到 ByteBuffer 中。讀的過程時(shí)線程安全的,內(nèi)部使用了 synchronized 關(guān)鍵字控制寫入 buffer 的過程。返回了讀到的字節(jié)數(shù)。

回到 writeBytes 方法,得到字節(jié)數(shù)之后,將這個(gè)字節(jié)數(shù)追加到 writerIndex 屬性,表示可寫字節(jié)變小了。

回到 read 方法。allocHandle 得到讀取到的字節(jié)數(shù),調(diào)用 lastBytesRead 方法,該方法的作用時(shí)調(diào)整下一次分配內(nèi)存的大小。進(jìn)入到該方法查看:

public void lastBytesRead(int bytes) {
    // If we read as much as we asked for we should check if we need to ramp up the size of our next guess.
    // This helps adjust more quickly when large amounts of data is pending and can avoid going back to
    // the selector to check for more data. Going back to the selector can add significant latency for large
    // data transfers.
    if (bytes == attemptedBytesRead()) {
        record(bytes);
    }
    super.lastBytesRead(bytes);
}

Netty 寫了注釋:

如果我們讀的內(nèi)容和我們要求的一樣多,我們應(yīng)該檢查一下是否需要增加下一個(gè)猜測(cè)的大小。
這有助于在等待大量數(shù)據(jù)時(shí)更快地進(jìn)行調(diào)整,并且可以避免返回選擇器以檢查更多數(shù)據(jù)。回到選擇器可以為大型數(shù)據(jù)傳輸添加顯著的延遲。

當(dāng)獲取的字節(jié)數(shù)和預(yù)估的一樣大,則需要進(jìn)行擴(kuò)容??纯?record 方法實(shí)現(xiàn):

private void record(int actualReadBytes) {
    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {
        if (decreaseNow) {
            index = max(index - INDEX_DECREMENT, minIndex);
            nextReceiveBufferSize = SIZE_TABLE[index];
            decreaseNow = false;
        } else {
            decreaseNow = true;
        }
    } else if (actualReadBytes >= nextReceiveBufferSize) {
        index = min(index + INDEX_INCREMENT, maxIndex);
        nextReceiveBufferSize = SIZE_TABLE[index];
        decreaseNow = false;
    }
}

如果實(shí)際讀取到的字節(jié)數(shù)小于等于預(yù)估的字節(jié) 下標(biāo) - 2(排除2以下),則將容量縮小一個(gè)下標(biāo)。如果實(shí)際讀取到的字節(jié)數(shù)大于等于預(yù)估的。則將下標(biāo)增加 4,下次創(chuàng)建的 Buffer 容量也相應(yīng)增加。如果不滿足這兩個(gè)條件,什么都不做。

回答 lastBytesRead 方法,該方法記錄了讀取到的總字節(jié)數(shù)并且更新了最后一次的讀取字節(jié)數(shù)。總字節(jié)數(shù)會(huì)用來判斷是否可以結(jié)束讀取循環(huán)。如果什么都沒有讀到,將最多持續(xù)到讀 16(默認(rèn)) 次。

回到 read 方法。

如果最后一次讀取到字節(jié)數(shù)小于等于0,跳出循環(huán),不做 channelRead 操作。
反之,將 totalMessages 加1,這個(gè)就是用來記錄循環(huán)次數(shù),判斷不能超過 16次。

調(diào)用 fireChannelRead 方法,方法結(jié)束后,將這個(gè) Buffer 的引用置為null,

判斷是否需要繼續(xù)讀取,帶入如下:

public boolean continueReading(UncheckedBooleanSupplier maybeMoreDataSupplier) {
    return config.isAutoRead() &&
           (!respectMaybeMoreData || maybeMoreDataSupplier.get()) &&
           totalMessages < maxMessagePerRead &&
           totalBytesRead > 0;
}

幾個(gè)條件:

  1. 首先是否自動(dòng)讀取。
  2. 且猜測(cè)是否還有更多數(shù)據(jù),如果實(shí)際讀取的和預(yù)估的一致,說明可能還有數(shù)據(jù)沒讀,需要再次循環(huán)。
  3. 如果讀取次數(shù)為達(dá)到 16 次,繼續(xù)讀取。
  4. 如果讀取到的總數(shù)大于0,說明有數(shù)據(jù),繼續(xù)讀取。

這里的循環(huán)的主要原因就像我們剛剛說的,TCP 傳輸過大數(shù)據(jù)容易丟包(帶寬限制),因此會(huì)將大包分好幾次傳輸,還有就是可能預(yù)估的緩沖區(qū)不夠大,沒有充分讀取 Channel 的內(nèi)容。

6. 總結(jié)

NioSocketChannel$NioSocketChannelUnsafe 的實(shí)現(xiàn)看 read 方法。每個(gè) ByteBuf 都會(huì)由一個(gè) Config 實(shí)例中的 ByteBufAllocator 對(duì)象創(chuàng)建,池化或非池化,直接內(nèi)存或堆內(nèi)存,這些都根據(jù)系統(tǒng)是否支持或參數(shù)設(shè)置,底層使用的是 NIO 的 API。今天我們看的是非池化的直接內(nèi)存。同時(shí),為了節(jié)省內(nèi)存,為每個(gè) ByteBufAllocator 配置了一個(gè) handle,用于計(jì)算和預(yù)估緩沖區(qū)大小。

還有一個(gè)需要注意的地方就是 noCleaner 策略。這是 Netty 的一個(gè)優(yōu)化。針對(duì)默認(rèn)的直接內(nèi)存創(chuàng)建和銷毀做了優(yōu)化--------不使用 JDK 的 cleaner 策略。

最終讀取數(shù)據(jù)到封裝了 NIO ByteBuffer 實(shí)例的 Netty 的 ByteBuf 中,其中,如果數(shù)據(jù)量超過 1024,則會(huì)讀取超過兩次,但最多不超過 16 次, 這個(gè)次數(shù)可以設(shè)置,也就是說,可能會(huì)調(diào)用超過2次 fireChannelRead 方法,使用的時(shí)候需要注意(存起來一起在 ChannelReadComplete 使用之類的方法)。

好,關(guān)于 Netty 讀取 Socket 數(shù)據(jù)到容器中的邏輯,就到這里。

good luck?。。?/p>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本文是Netty文集中“Netty in action”系列的文章。主要是對(duì)Norman Maurer and M...
    tomas家的小撥浪鼓閱讀 3,587評(píng)論 0 7
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,502評(píng)論 19 139
  • 前言 netty源碼分析之pipeline(一)中,我們已經(jīng)了解了pipeline在netty中所處的角色,像是一...
    簡(jiǎn)書閃電俠閱讀 16,271評(píng)論 15 35
  • 法語(yǔ),你在等待一個(gè)孩子嗎?=你懷孕了嗎?世界上最美的語(yǔ)言,法語(yǔ),原來不單指它的發(fā)音,還體現(xiàn)在他們字面語(yǔ)言上的浪漫,
    Anita2018閱讀 76評(píng)論 0 0
  • 今天用了三個(gè)多小時(shí)填了喜閱教師公益行動(dòng)問卷表,復(fù)制鏈接了幾次,不知道成功沒有?7-12題是自己用訊飛語(yǔ)記口...
    笨南瓜閱讀 273評(píng)論 0 0

友情鏈接更多精彩內(nèi)容