Netty Source Code, Part 7: ByteBuf

Memory Allocation Overview

This part introduces Netty's memory allocation, the lowest layer of the stack, responsible for reading data from the underlying I/O into a ByteBuf.

Three questions:
+ What categories of memory are there?
+ How is contention between threads reduced during allocation?
+ How are allocations of different sizes handled?

Main topics:

  1. The abstractions for memory and the memory allocator
  2. Allocation strategies for the different size classes and memory types
  3. The memory reclamation process

ByteBuf Structure and Key APIs

  • ByteBuf structure
    The ByteBuf memory layout:
 *      +-------------------+------------------+------------------+
 *      | discardable bytes |  readable bytes  |  writable bytes  |
 *      |                   |     (CONTENT)    |                  |
 *      +-------------------+------------------+------------------+
 *      |                   |                  |                  |
 *      0      <=      readerIndex   <=   writerIndex    <=    capacity

+ read, write, and set methods
Calling a read method advances the readerIndex;
calling a write method advances the writerIndex;
set methods move neither index.
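The pointer behavior above can be sketched with a tiny model (an illustrative simplification, not Netty's actual ByteBuf; the class and field names here are made up):

```java
// Minimal model of ByteBuf's two-index design (illustration only).
class IndexModel {
    final byte[] data;          // backing storage, as in a heap ByteBuf
    int readerIndex;            // advanced by read* methods
    int writerIndex;            // advanced by write* methods

    IndexModel(int capacity) { data = new byte[capacity]; }

    void writeByte(byte b) { data[writerIndex++] = b; }       // write moves writerIndex
    byte readByte()        { return data[readerIndex++]; }    // read moves readerIndex
    void setByte(int i, byte b) { data[i] = b; }              // set moves neither index

    int readableBytes() { return writerIndex - readerIndex; } // the CONTENT region
    int writableBytes() { return data.length - writerIndex; }
}
```

After two writes and one read, readerIndex is 1 and writerIndex is 2, so the invariant 0 <= readerIndex <= writerIndex <= capacity from the diagram still holds.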

  • mark and reset methods
    mark saves the current index.
    reset restores the index to the saved position.

  • readableBytes, writableBytes, and maxWritableBytes
    The names say it all: readableBytes is writerIndex - readerIndex, writableBytes is capacity - writerIndex, and maxWritableBytes is maxCapacity - writerIndex.

ByteBuf Classification

(Figure: ByteBuf class diagram)
  • Pooled vs Unpooled
    Pooled allocates out of pre-allocated memory; Unpooled calls the system API for every allocation.
  • Unsafe vs non-Unsafe
    Unsafe: uses the JDK's Unsafe to obtain the ByteBuf's raw memory address.
    Non-Unsafe: does not depend on the underlying Unsafe.
  • Heap vs Direct
    Heap: allocated on the JVM heap; participates in GC; needs no manual release; backed by a byte[] array.
    Direct: allocated outside the JVM heap's control; does not participate in GC; must be released manually; allocated through the JDK's ByteBuffer.allocateDirect(initialCapacity).

The Memory Allocator: ByteBufAllocator

  • ByteBufAllocator functionality
    ByteBufAllocator is the top-level abstraction. Its API includes: the overloaded buffer methods; the overloaded ioBuffer methods (which prefer direct buffers for I/O); heapBuffer for allocation on the heap; directBuffer for direct memory allocation; and compositeBuffer, which can merge several ByteBufs into one view.

  • AbstractByteBufAllocator
    Implements most of ByteBufAllocator and leaves two abstract methods, newHeapBuffer and newDirectBuffer, as the extension points that distinguish heap from direct memory.

  • The two main subclasses of ByteBufAllocator

ByteBufAllocator classification:

(Figure: ByteBufAllocator class hierarchy)

The two main subclasses of ByteBufAllocator are PooledByteBufAllocator and UnpooledByteBufAllocator, so Pooled vs Unpooled is decided at the subclass level.
How are Unsafe and non-Unsafe distinguished, then? Netty decides automatically: if the platform exposes an Unsafe
object, Netty allocates memory directly through Unsafe.

UnpooledByteBufAllocator Analysis

  • heap allocation logic
  • direct allocation logic

The unsafe path reaches data through a base memory address plus an offset; the non-unsafe path goes through array indexing or the JDK's ByteBuffer API. In general, operating on memory through Unsafe is more efficient than the non-unsafe path.
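Two of these access paths can be contrasted directly (a sketch; the Unsafe path itself is omitted since it requires sun.misc.Unsafe, but it would read the same bytes from a base address plus this offset):

```java
import java.nio.ByteBuffer;

// The non-unsafe access paths: raw array + index arithmetic vs. the JDK ByteBuffer API.
class AccessPaths {
    // array + index: assemble a big-endian int by hand
    static int readIntFromArray(byte[] a, int idx) {
        return (a[idx] & 0xFF) << 24 | (a[idx + 1] & 0xFF) << 16
             | (a[idx + 2] & 0xFF) << 8 | (a[idx + 3] & 0xFF);
    }

    // JDK ByteBuffer API: same result, bounds-checked by the buffer
    static int readIntViaByteBuffer(byte[] a, int idx) {
        return ByteBuffer.wrap(a).getInt(idx);
    }
}
```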

PooledByteBufAllocator Overview

It serves two kinds of memory through newHeapBuffer and newDirectBuffer. The two allocation flows are largely the same, so we will analyze newDirectBuffer.

    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }
  1. Obtain the thread-local cache, PoolThreadCache
    A thread-local cache is used because newDirectBuffer may be called from multiple threads.
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
        private final boolean useCacheForAllThreads;

        PoolThreadLocalCache(boolean useCacheForAllThreads) {
            this.useCacheForAllThreads = useCacheForAllThreads;
        }

        @Override
        protected synchronized PoolThreadCache initialValue() {
            // pick the least-used heapArena and directArena, then create the PoolThreadCache
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

            Thread current = Thread.currentThread();
            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                return new PoolThreadCache(
                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
            }
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }

        @Override
        protected void onRemoval(PoolThreadCache threadCache) {
            threadCache.free();
        }

        private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
            if (arenas == null || arenas.length == 0) {
                return null;
            }

            PoolArena<T> minArena = arenas[0];
            for (int i = 1; i < arenas.length; i++) {
                PoolArena<T> arena = arenas[i];
                if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                    minArena = arena;
                }
            }

            return minArena;
        }
    }

FastThreadLocal is essentially a faster ThreadLocal; from this code we can see that every thread gets its own PoolThreadCache.
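The binding can be sketched with plain JDK classes (a simplified model: the Arena class here is a stub, and a plain ThreadLocal stands in for FastThreadLocal):

```java
import java.util.concurrent.atomic.AtomicInteger;

// Each thread's first access binds it to the arena with the fewest attached threads.
class ArenaBinding {
    static final class Arena {
        final AtomicInteger numThreadCaches = new AtomicInteger();
    }

    // mirrors leastUsedArena above
    static Arena leastUsed(Arena[] arenas) {
        Arena min = arenas[0];
        for (int i = 1; i < arenas.length; i++) {
            if (arenas[i].numThreadCaches.get() < min.numThreadCaches.get()) {
                min = arenas[i];
            }
        }
        return min;
    }

    static final Arena[] ARENAS = { new Arena(), new Arena() };

    // plain ThreadLocal standing in for FastThreadLocal<PoolThreadCache>
    static final ThreadLocal<Arena> BOUND = ThreadLocal.withInitial(() -> {
        Arena a = leastUsed(ARENAS);
        a.numThreadCaches.getAndIncrement(); // record the binding, as PoolThreadCache does
        return a;
    });
}
```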

  2. Allocate on the Arena held by the thread-local cache
    The thread-local cache maintains two kinds of memory: one heap-related, one off-heap. We will follow the off-heap (direct) path.
    heapArena and directArena are passed in when the PoolThreadCache is created; see the initialValue code above.
    Both arenas are created when the allocator, PooledByteBufAllocator, is constructed. Let's look at its constructor.
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        super(preferDirect);
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

        checkPositiveOrZero(nHeapArena, "nHeapArena");
        checkPositiveOrZero(nDirectArena, "nDirectArena");

        checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
        if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
            throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
        }

        if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: power of two)");
        }

        int pageShifts = validateAndCalculatePageShifts(pageSize);

        if (nHeapArena > 0) {
            // initialize the heapArenas array
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, maxOrder, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }

        if (nDirectArena > 0) {
            // initialize the directArenas array
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        metric = new PooledByteBufAllocatorMetric(this);
    }

heapArenas is initialized with heapArenas = newArenaArray(nHeapArena); and directArenas with directArenas = newArenaArray(nDirectArena);.
Where do the constructor's nHeapArena and nDirectArena come from? Follow the call chain upward:

    public PooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
    }

Now, how DEFAULT_NUM_DIRECT_ARENA is computed. By default defaultMinNumArena is smaller than runtime.maxMemory() / defaultChunkSize / 2 / 3, so DEFAULT_NUM_DIRECT_ARENA defaults to twice the number of CPU cores; the same holds for DEFAULT_NUM_HEAP_ARENA. Why create twice as many Arenas as CPU cores? Because the NIO threads created earlier also default to twice the CPU core count. That way each thread can effectively own an Arena, and allocating from an Arena in the array does not require locking against other threads.

/*
         * We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
         * number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
         * allocation and de-allocation needs to be synchronized on the PoolArena.
         *
         * See https://github.com/netty/netty/issues/3888.
         */
        // twice the number of CPU cores
        final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
        final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
        DEFAULT_NUM_HEAP_ARENA = Math.max(0,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numHeapArenas",
                        (int) Math.min(
                                defaultMinNumArena,
                                runtime.maxMemory() / defaultChunkSize / 2 / 3)));
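The arithmetic above can be restated in a few lines (a sketch assuming the default pageSize of 8192 and maxOrder of 11, i.e. a 16 MB chunk; not Netty's exact code, which also consults system properties):

```java
// How the default arena count falls out of the two competing limits.
class ArenaDefaults {
    static final long CHUNK_SIZE = 8192L << 11; // defaultChunkSize: 16 MB

    static int defaultNumArena(int availableProcessors, long maxMemory) {
        long byCpu = availableProcessors * 2L;           // 2 * cores, matching the NIO thread count
        long byMemory = maxMemory / CHUNK_SIZE / 2 / 3;  // don't let chunks dominate the heap
        return (int) Math.max(0, Math.min(byCpu, byMemory));
    }
}
```

With an 8-core machine and a 4 GB max heap the CPU limit wins, giving 16 arenas; only very small heaps push the count below that.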

Here is a structural sketch of PooledByteBufAllocator (assuming four NIO threads):

(Figure: PooledByteBufAllocator structure)

The figure shows four NIO threads; from the earlier code analysis we know there are correspondingly four heapArenas and four directArenas. Their logic is essentially the same, so the figure labels both simply Arena.
How does PooledByteBufAllocator allocate a ByteBuf? First it obtains the matching Arena through the PoolThreadCache: the PoolThreadCache uses a thread local to stash one of the allocator's Arenas in its own field, so when each NIO thread calls get it receives its own underlying Arena; this binds each thread to one Arena. Besides allocating from an Arena, PooledByteBufAllocator can also allocate from the ByteBuf cache lists it maintains underneath.
For example, suppose I allocate 1024 bytes, finish using them, and then need another 1024 bytes. The second allocation does not need to touch the Arena at all; the memory can simply be taken from a cache list maintained inside the PoolThreadCache.
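That hit path can be sketched with a per-size free list (a deliberately tiny model; Netty's real MemoryRegionCache uses a fixed-size MPSC queue and caches chunk/handle pairs rather than arrays):

```java
import java.util.ArrayDeque;

// A per-size cache: repeated allocations of one size are served from a free list.
class SizeCache {
    private final ArrayDeque<byte[]> queue = new ArrayDeque<>();
    int arenaAllocations; // counts slow-path allocations, for illustration

    byte[] allocate(int size) {
        byte[] cached = queue.poll();
        if (cached != null) {
            return cached;        // cache hit: no arena involvement
        }
        arenaAllocations++;       // cache miss: fall back to the "arena"
        return new byte[size];
    }

    void free(byte[] buf) {
        queue.offer(buf);         // returned memory becomes available for the next request
    }
}
```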

PooledByteBufAllocator maintains three ByteBuf cache sizes: tinyCacheSize, smallCacheSize, and normalCacheSize. All three are used when the PoolThreadCache is initialized.

The PoolThreadCache constructor:

   PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            // create the cache arrays
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

The method that creates the cache arrays:

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                // create one cache per slot
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

Each element of the cache array:

    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
        }
    }

        MemoryRegionCache(int size, SizeClass sizeClass) {
            // size: the cache capacity for this size class, rounded up to a power of two
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            // queue: holds at most `size` cached blocks of this size class
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }

7. How directArena Allocates Direct Memory

  • Get a PooledByteBuf from the object pool for reuse
    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        PoolThreadCache cache = threadCache.get();
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

Look at directArena.allocate:

    PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
        // create (or reuse) a PooledByteBuf
        PooledByteBuf<T> buf = newByteBuf(maxCapacity);
        // allocate memory for the PooledByteBuf, trying the cache first
        allocate(cache, buf, reqCapacity);
        return buf;
    }

Let's look at newByteBuf; the DirectArena implementation creates the buffer for off-heap memory:

        @Override
        protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
            if (HAS_UNSAFE) { // the unsafe variant is used by default when available
                return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
            } else {
                return PooledDirectByteBuf.newInstance(maxCapacity);
            }
        }

Then newInstance:

    static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
        // take a ByteBuf from the recyclable object pool, or create one if the pool is empty
        PooledUnsafeDirectByteBuf buf = RECYCLER.get();
        buf.reuse(maxCapacity); // reuse: set capacity and reference count, reset readerIndex, writerIndex and marks
        return buf; // a clean ByteBuf object
    }

Let's look at RECYCLER:

    private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
        @Override
        protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) { 
            return new PooledUnsafeDirectByteBuf(handle, 0); // created when the recycler has none; the handle is responsible for recycling the ByteBuf
        }
    };

Step one of the allocation has produced a ByteBuf; next comes the actual memory allocation, starting from the PoolThreadCache.

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            // the cache attempts above did not succeed, so fall through to a real allocation
            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) { // special case: anything larger than chunkSize is allocated via allocateHuge below
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }

allocate really has two main stages: first try to allocate from the cache, then fall back to allocating from the memory arena.

  • allocate from the cache
  • allocate from the arena

8. Memory Size Classes

(Figure: Netty memory size classes)

The size thresholds are 0, 512 B, 8 KB, and 16 MB:
tiny: 0-512 B
small: 512 B-8 KB
normal: 8 KB-16 MB
huge: > 16 MB
Why is 16 MB a dividing point? 16 MB corresponds to one chunk, and all memory is requested from the operating system in units of chunks; every ByteBuf allocation then operates inside a chunk. For example, to allocate 1 MB, Netty first requests a 16 MB chunk from the OS, then carves a 1 MB range out of it and hands that contiguous range to the ByteBuf.

Why is there an 8 KB threshold? Netty treats 8 KB as one page for allocation purposes. A 16 MB chunk from the OS is fairly large, so Netty subdivides it page by page: one chunk becomes 2048 pages, and a 16 KB allocation needs only 2 pages.
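The chunk and page arithmetic is simple enough to spell out (the constants are Netty's defaults: pageSize 8 KB, maxOrder 11):

```java
// Chunk/page arithmetic: one chunk = pageSize << maxOrder bytes, split into pages.
class ChunkMath {
    static final int PAGE_SIZE = 8192;                    // 8 KB page
    static final int MAX_ORDER = 11;
    static final int CHUNK_SIZE = PAGE_SIZE << MAX_ORDER; // 16 MB chunk

    static int pagesPerChunk() {
        return CHUNK_SIZE / PAGE_SIZE;                    // 2048 pages per chunk
    }

    static int pagesNeeded(int bytes) {
        return (bytes + PAGE_SIZE - 1) / PAGE_SIZE;       // round up to whole pages
    }
}
```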

Allocations of 0-8 KB are handled by what Netty calls a subpage. If a 10 B request were still served with a whole page, most of the page would be wasted; this is where subpages earn their keep.

9. Allocation Flow on a Cache Hit

Look at the allocate method:

    private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
        final int normCapacity = normalizeCapacity(reqCapacity);
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize, where pageSize is 8 KB
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
    }
  1. First, normalize the requested capacity:
    int normalizeCapacity(int reqCapacity) {
        checkPositiveOrZero(reqCapacity, "reqCapacity");

        if (reqCapacity >= chunkSize) { // > 16 MB: return as-is (aligned if required)
            return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
        }

        if (!isTiny(reqCapacity)) { // >= 512
            // Doubled

            int normalizedCapacity = reqCapacity;
            normalizedCapacity --;
            normalizedCapacity |= normalizedCapacity >>>  1;
            normalizedCapacity |= normalizedCapacity >>>  2;
            normalizedCapacity |= normalizedCapacity >>>  4;
            normalizedCapacity |= normalizedCapacity >>>  8;
            normalizedCapacity |= normalizedCapacity >>> 16;
            normalizedCapacity ++;

            if (normalizedCapacity < 0) {
                normalizedCapacity >>>= 1;
            }
            assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;

            return normalizedCapacity;
        }

        if (directMemoryCacheAlignment > 0) {
            return alignCapacity(reqCapacity);
        }

        // Quantum-spaced
        if ((reqCapacity & 15) == 0) {
            return reqCapacity;
        }

        return (reqCapacity & ~15) + 16;
    }
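The effect of normalizeCapacity can be condensed into a few lines (a simplified re-implementation for illustration; it ignores the chunkSize and directMemoryCacheAlignment branches):

```java
// Capacity normalization: >= 512 rounds up to a power of two; tiny rounds up to a multiple of 16.
class Normalize {
    static int normalize(int reqCapacity) {
        if (reqCapacity >= 512) {
            // round up to the next power of two (the "doubling" bit trick above)
            return Integer.highestOneBit(reqCapacity - 1) << 1;
        }
        if ((reqCapacity & 15) == 0) {
            return reqCapacity;          // already a multiple of 16
        }
        return (reqCapacity & ~15) + 16; // round up to the next multiple of 16
    }
}
```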

The cache allocation, in broad strokes:

  • Find the MemoryRegionCache for the normalized size;
  • Poll an entry from its queue and initialize the ByteBuf with it
    The entry holds a chunk, which represents a contiguous block of memory; the chunk assigns a contiguous range to the ByteBuf, which can then read and write data in that range.
  • Put the polled entry back into the object pool for reuse
    To reuse allocated memory as much as possible, Netty manages it through the RECYCLER; this reduces GC pressure and avoids repeatedly creating and destroying pool objects.
    Taking cache.allocateTiny(this, buf, reqCapacity, normCapacity) as the example, here are the three steps.
  1. Find the MemoryRegionCache for the size
    /**
     * Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
     */
    boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
        return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
    }

    private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);  // compute the array index, then fetch the MemoryRegionCache at that index
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }

  // dividing the capacity by 16 gives the index
    static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
    }
  2. Poll an entry from the queue and initialize the ByteBuf
    @SuppressWarnings({ "unchecked", "rawtypes" })
    private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
        if (cache == null) {
            // no cache found so just return false here
            return false;
        }
        boolean allocated = cache.allocate(buf, reqCapacity);
        if (++ allocations >= freeSweepAllocationThreshold) {
            allocations = 0;
            trim();
        }
        return allocated;
    }

        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
            Entry<T> entry = queue.poll();
            if (entry == null) {
                return false;
            }
          // initialize the ByteBuf with the entry's chunk and handle
            initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
            entry.recycle();

            // allocations is not thread-safe which is fine as this is only called from the same thread all time.
            ++ allocations;
            return true;
        }

    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
        }

        @Override
        protected void initBuf( // subpage initialization
                PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
        }
    }

    void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
        initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity);
    }

    private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
                                    long handle, int bitmapIdx, int reqCapacity) {
        assert bitmapIdx != 0;

        int memoryMapIdx = memoryMapIdx(handle);

        PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
        assert subpage.doNotDestroy;
        assert reqCapacity <= subpage.elemSize;

        buf.init(
            this, nioBuffer, handle,
            runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
                reqCapacity, subpage.elemSize, arena.parent.threadCache());
    }

    void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
              long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
    }

    private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
                       long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
        assert handle >= 0;
        assert chunk != null;

        this.chunk = chunk; // the memory block requested from the system
        memory = chunk.memory;
        tmpNioBuf = nioBuffer;
        allocator = chunk.arena.parent;
        this.cache = cache;
        this.handle = handle; // points at the allocated range within the chunk
        this.offset = offset;
        this.length = length;
        this.maxLength = maxLength;
    }
  3. Put the polled entry back into the object pool
    entry.recycle();
        static final class Entry<T> {
            final Handle<Entry<?>> recyclerHandle;
            PoolChunk<T> chunk;
            ByteBuffer nioBuffer;
            long handle = -1;

            Entry(Handle<Entry<?>> recyclerHandle) {
                this.recyclerHandle = recyclerHandle;
            }

            void recycle() {  // reset the fields before returning the entry to the pool
                chunk = null;
                nioBuffer = null;
                handle = -1;
                recyclerHandle.recycle(this);
            }
        }

        @Override
        public void recycle(Object object) {
            if (object != value) {
                throw new IllegalArgumentException("object does not belong to handle");
            }

            Stack<?> stack = this.stack;
            if (lastRecycledId != recycleId || stack == null) {
                throw new IllegalStateException("recycled already");
            }

            stack.push(this); // push onto the stack
        }
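The recycle path amounts to a stack-backed object pool. Here is a minimal sketch (not Netty's Recycler, which additionally handles thread affinity, capacity limits, and double-recycle detection):

```java
import java.util.ArrayDeque;

// A bare-bones object pool: get() reuses a recycled object or creates a new one.
class MiniRecycler<T> {
    interface Factory<T> { T newObject(); }

    private final ArrayDeque<T> stack = new ArrayDeque<>();
    private final Factory<T> factory;
    int created; // number of slow-path creations, for illustration

    MiniRecycler(Factory<T> factory) { this.factory = factory; }

    T get() {
        T obj = stack.poll();       // try to reuse a recycled object first
        if (obj != null) {
            return obj;
        }
        created++;
        return factory.newObject(); // pool empty: create a fresh one
    }

    void recycle(T obj) {
        stack.push(obj);            // make the object available for the next get()
    }
}
```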

10. The Allocation Logic Behind Cache Hits

  • Netty's cache-related data structure

    (Figure: MemoryRegionCache data structure)

Netty's cache-related data structure is MemoryRegionCache, which has three parts: a queue, a sizeClass, and a size.

Each element of the queue is an entry, and every entry carries a chunk and a handle. Netty allocates memory in units of chunks, and a handle uniquely identifies a contiguous range of memory; a chunk and a handle together therefore pin down both the size and the location of a block of memory. All the entries strung together form one cache list, and finding the list that matches a size locates an entry in its queue.

sizeClass is Netty's memory size class; huge allocations are served directly, so MemoryRegionCache has no huge variant.

size is the size of each cached block.

Within a single MemoryRegionCache, every cached block has the same fixed size: if a MemoryRegionCache holds a 1 KB block, then everything in its queue is a 1 KB ByteBuf. As for the possible sizes, the tiny class uses multiples of 16 B up to 512 B; the sizes for the other classes can be read directly off the figure.

    private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }
      // ...
}

MemoryRegionCache instances are maintained by PoolThreadCache.

final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
    
// ...
}

Creating the MemoryRegionCache arrays:

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
        this.heapArena = heapArena;
        this.directArena = directArena;
        if (directArena != null) {
            // create the tiny[32] array
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

Let's look at how the tiny[32] array is created:

 tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);

tinyCacheSize is maintained by the allocator and defaults to 512. PoolArena.numTinySubpagePools defaults to 512 >>> 4; shifting 512 right by 4 bits is equivalent to dividing by 16, giving 32.
createSubPageCaches then creates an array of length 32:

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                // cacheSize is 512 here
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

Following the call chain further:

    private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
        SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
            super(size, sizeClass);
        }

        @Override
        protected void initBuf(
                PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
            chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
        }
    }

        MemoryRegionCache(int size, SizeClass sizeClass) {
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); // still 512 here
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }
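The size rounding in this constructor can be sketched as follows. `nextPowerOfTwo` is a hypothetical stand-in for `MathUtil.safeFindNextPositivePowerOfTwo`: it rounds the requested queue capacity up to a power of two, which the fixed MPSC queue requires (512 is already a power of two, so it is unchanged):

```java
public class PowerOfTwo {

    // Round a positive value up to the next power of two, capping at 2^30
    // so the result stays a positive int.
    static int nextPowerOfTwo(int value) {
        if (value <= 0) return 1;
        if (value >= 0x40000000) return 0x40000000;
        return 1 << (32 - Integer.numberOfLeadingZeros(value - 1));
    }

    public static void main(String[] args) {
        System.out.println(nextPowerOfTwo(512)); // 512, already a power of two
        System.out.println(nextPowerOfTwo(500)); // rounded up to 512
        System.out.println(nextPowerOfTwo(1));   // 1
    }
}
```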

At this point we can see that tinySubPageDirectCaches consists of 32 outer nodes (each a SubPageMemoryRegionCache); each node represents a queue for one memory size (16B, 32B, ..., 496B), and each queue has a default capacity of 512 entries.

MemoryRegionCache.png
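The 32-bucket layout shown in the figure can be reproduced in a few lines (the class name is illustrative; bucket 0 is unused because capacities are normalized to at least 16B):

```java
public class TinyCacheLayout {

    // Bucket i of the tiny caches serves blocks of i * 16 bytes.
    static int bucketSize(int idx) {
        return idx << 4;
    }

    public static void main(String[] args) {
        int numTinySubpagePools = 512 >>> 4; // 32 buckets
        for (int i = 1; i < numTinySubpagePools; i++) {
            System.out.println("tiny[" + i + "] caches " + bucketSize(i) + "B buffers");
        }
    }
}
```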

11. arena, chunk, page, and subpage

  • Arena structure
    Arena結(jié)構(gòu).png

The outermost structure is the chunkList. The chunkLists are connected by a doubly linked list, and each node within a chunkList is a chunk; a chunk is the minimum unit in which Netty requests memory from the operating system (16MB by default). Why are the chunkLists linked together? Netty tracks each chunk's usage in real time and groups chunks into different chunkLists by memory utilization. When allocating, Netty uses this grouping to locate a suitable chunkList and then picks one of its chunks for the allocation.
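The utilization figure that drives this grouping can be sketched as follows. `usage` loosely mirrors `PoolChunk.usage()`, which the chunkLists compare against their min/max usage bounds (the percentage pairs passed to the `PoolChunkList` constructors below) when moving chunks between lists:

```java
public class ChunkUsage {

    // Percentage of the chunk already handed out, loosely modelled on
    // PoolChunk.usage(): 0 = completely free, 100 = completely full.
    static int usage(int freeBytes, int chunkSize) {
        if (freeBytes == 0) {
            return 100;
        }
        int freePercentage = (int) (freeBytes * 100L / chunkSize);
        if (freePercentage == 0) {
            return 99; // almost full, but not quite
        }
        return 100 - freePercentage;
    }

    public static void main(String[] args) {
        int chunkSize = 16 * 1024 * 1024;                    // default 16MB chunk
        System.out.println(usage(chunkSize, chunkSize));     // 0  -> fits qInit/q000
        System.out.println(usage(chunkSize / 2, chunkSize)); // 50 -> moves toward q050
        System.out.println(usage(0, chunkSize));             // 100 -> q100
    }
}
```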

abstract class PoolArena<T> implements PoolArenaMetric {
    static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();

    enum SizeClass {
        Tiny,
        Small,
        Normal
    }

    static final int numTinySubpagePools = 512 >>> 4;

    final PooledByteBufAllocator parent;

    private final int maxOrder;
    final int pageSize;
    final int pageShifts;
    final int chunkSize;
    final int subpageOverflowMask;
    final int numSmallSubpagePools;
    final int directMemoryCacheAlignment;
    final int directMemoryCacheAlignmentMask;
    private final PoolSubpage<T>[] tinySubpagePools;
    private final PoolSubpage<T>[] smallSubpagePools;

    private final PoolChunkList<T> q050;
    private final PoolChunkList<T> q025;
    private final PoolChunkList<T> q000;
    private final PoolChunkList<T> qInit;
    private final PoolChunkList<T> q075;
    private final PoolChunkList<T> q100;
// ...
}

protected PoolArena(PooledByteBufAllocator parent, int pageSize,
          int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
        this.parent = parent;
        this.pageSize = pageSize;
        this.maxOrder = maxOrder;
        this.pageShifts = pageShifts;
        this.chunkSize = chunkSize;
        directMemoryCacheAlignment = cacheAlignment;
        directMemoryCacheAlignmentMask = cacheAlignment - 1;
        subpageOverflowMask = ~(pageSize - 1);
        tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
        for (int i = 0; i < tinySubpagePools.length; i ++) {
            tinySubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        numSmallSubpagePools = pageShifts - 9;
        smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
        for (int i = 0; i < smallSubpagePools.length; i ++) {
            smallSubpagePools[i] = newSubpagePoolHead(pageSize);
        }

        // initialize the chunkLists
        q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
        q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
        q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
        q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
        q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
        qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);

        q100.prevList(q075);
        q075.prevList(q050);
        q050.prevList(q025);
        q025.prevList(q000);
        q000.prevList(null);
        qInit.prevList(qInit);

        List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
        metrics.add(qInit);
        metrics.add(q000);
        metrics.add(q025);
        metrics.add(q050);
        metrics.add(q075);
        metrics.add(q100);
        chunkListMetrics = Collections.unmodifiableList(metrics);
    }
  • Chunk structure

    Chunk結(jié)構(gòu).png

    A chunk splits its memory into 8K pages, and a page can be further split into equal-size subpage elements; the figure shows four 2K elements per page as an example, but the element size actually depends on the requested capacity.
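The default numbers behind this layout can be checked with a little arithmetic (illustrative class; the formula `chunkSize = pageSize << maxOrder` is how PoolArena derives the 16MB chunk from its defaults `pageSize = 8192` and `maxOrder = 11`):

```java
public class ChunkLayout {

    // chunkSize = pageSize << maxOrder; with the defaults 8192 << 11 = 16MB.
    static int chunkSize(int pageSize, int maxOrder) {
        return pageSize << maxOrder;
    }

    public static void main(String[] args) {
        System.out.println(chunkSize(8192, 11)); // 16777216 bytes = 16MB
        System.out.println(8192 / 2048);         // an 8K page holds 4 x 2K elements
        System.out.println(8192 / 16);           // or 512 x 16B elements
    }
}
```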

12. Page-level memory allocation: allocateNormal()

Let's look at the relevant snippet in PoolArena:

        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // try the thread-local cache first
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity); // fall back to the arena
                ++allocationsNormal;
            }
        } 

allocateNormal in the arena:

    private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
        if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
            q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
            q075.allocate(buf, reqCapacity, normCapacity)) {
            return;
        }

        // Add a new chunk and allocate from it
        PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
        boolean success = c.allocate(buf, reqCapacity, normCapacity);
        assert success;
        qInit.add(c);
    }
  • Try to allocate from the existing chunkLists (note the probe order in the code: q050, q025, q000, qInit, q075)
    +if none of them succeeds, create a new chunk and allocate from it
    +initialize the PooledByteBuf

13. Subpage-level memory allocation: allocateTiny()

  • Locate a Subpage object
  • Initialize the subpage
  • Initialize the PooledByteBuf

Let's step through it with a small test program:

public class TinyAllocate {
    public static void main(String[] args) {
        PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
        allocator.directBuffer(16);
    }
}

Here is the relevant fragment of allocate:

            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }
            final PoolSubpage<T> head = table[tableIdx];

            /**
             * Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
             * {@link PoolChunk#free(long)} may modify the doubly linked list as well.
             */
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) { // by default, the head node carries no subpage information
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;

Now look at the structure of tinySubpagePools; by default it mirrors the tiny structure of MemoryRegionCache:
tiny[32]: 0 -> 16B -> 32B -> 48B -> ... -> 496B

    private long allocateSubpage(int normCapacity) {
        // Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
        // This is need as we may add it back and so alter the linked-list structure.
        PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
        int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
        synchronized (head) {
            int id = allocateNode(d);
            if (id < 0) {
                return id;
            }

            final PoolSubpage<T>[] subpages = this.subpages;
            final int pageSize = this.pageSize;

            freeBytes -= pageSize;

            int subpageIdx = subpageIdx(id);
            PoolSubpage<T> subpage = subpages[subpageIdx];
            if (subpage == null) {
                subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
                subpages[subpageIdx] = subpage;
            } else {
                subpage.init(head, normCapacity);
            }
            return subpage.allocate();
        }
    }
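Inside PoolSubpage, the free/used state of each element is tracked with a long[] bitmap. The following is a simplified, illustrative allocator in the same spirit (Netty's real PoolSubpage additionally caches a nextAvail hint and encodes the bitmap index into the upper bits of a 64-bit handle; none of that is shown here):

```java
public class SubpageBitmap {

    private final long[] bitmap;     // one bit per element: 1 = in use
    private final int maxNumElems;   // how many equal-size elements fit in the page

    // A page of pageSize bytes split into equal elemSize slices.
    SubpageBitmap(int pageSize, int elemSize) {
        maxNumElems = pageSize / elemSize;
        bitmap = new long[(maxNumElems + 63) >>> 6]; // 64 elements per long
    }

    // Returns the index of a newly reserved element, or -1 when the page is full.
    int allocate() {
        for (int i = 0; i < maxNumElems; i++) {
            int q = i >>> 6, r = i & 63;
            if ((bitmap[q] & (1L << r)) == 0) {
                bitmap[q] |= 1L << r;
                return i;
            }
        }
        return -1;
    }

    // Marks the element free again so it can be handed out later.
    void free(int idx) {
        bitmap[idx >>> 6] &= ~(1L << (idx & 63));
    }

    public static void main(String[] args) {
        SubpageBitmap page = new SubpageBitmap(8192, 16); // 512 x 16B elements
        System.out.println(page.allocate()); // 0
        System.out.println(page.allocate()); // 1
        page.free(0);
        System.out.println(page.allocate()); // 0 again, the freed slot is reused
    }
}
```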

14. Releasing a ByteBuf

  • Add the contiguous memory region to the cache
  • Mark the contiguous memory region as unused
  • Return the ByteBuf to the object pool
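The first two release steps can be sketched as a bounded per-thread cache with an arena fallback. All names here are illustrative; in Netty the queue is the MemoryRegionCache MPSC queue, and "returned to the arena" means the handle's region is marked unused in the chunk:

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class FreeToCache {

    // Bounded queue standing in for MemoryRegionCache's fixed MPSC queue.
    private final Queue<Long> cache = new ArrayDeque<>();
    private final int capacity;
    int returnedToArena; // how many regions fell through to the arena

    FreeToCache(int capacity) {
        this.capacity = capacity;
    }

    void free(long handle) {
        if (cache.size() < capacity) {
            cache.offer(handle); // step 1: park the region in the thread cache
        } else {
            returnedToArena++;   // cache full: mark the region unused in the arena
        }
    }

    public static void main(String[] args) {
        FreeToCache threadCache = new FreeToCache(2);
        threadCache.free(1L);
        threadCache.free(2L);
        threadCache.free(3L); // queue is full, goes back to the arena
        System.out.println(threadCache.returnedToArena); // 1
    }
}
```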