netty源碼分析(28)- PooledByteBufAllocator分析

上一節(jié)分析了UnpooledByteBufAllocator,包括了堆內(nèi)堆外內(nèi)存是如何分配的,底層時時如何獲取數(shù)據(jù)內(nèi)容的。
本節(jié)分析分析PooledByteBufAllocator,看看它是怎么做Pooled類型的內(nèi)存管理的。

  • 入口PooledByteBufAllocator#newHeapBuffer()PooledByteBufAllocator#newDirectBuffer()
    堆內(nèi)內(nèi)存和堆外內(nèi)存分配的模式都比較固定
  1. 拿到線程局部緩存PoolThreadCache
  2. 拿到不同類型的rena
  3. 使用不同類型的arena進(jìn)行內(nèi)存分配
    @Override
    protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
        //拿到線程局部緩存
        PoolThreadCache cache = threadCache.get();
        //拿到heapArena
        PoolArena<byte[]> heapArena = cache.heapArena;

        final ByteBuf buf;
        if (heapArena != null) {
            //使用heapArena分配內(nèi)存
            buf = heapArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    new UnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }

    @Override
    protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
        //拿到線程局部緩存
        PoolThreadCache cache = threadCache.get();
        //拿到directArena
        PoolArena<ByteBuffer> directArena = cache.directArena;

        final ByteBuf buf;
        if (directArena != null) {
            //使用directArena分配內(nèi)存
            buf = directArena.allocate(cache, initialCapacity, maxCapacity);
        } else {
            buf = PlatformDependent.hasUnsafe() ?
                    UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                    new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }

        return toLeakAwareBuffer(buf);
    }
  • 跟蹤threadCache.get()
    調(diào)用的是FastThreadLocal#get()方法。那么其實threadCache也是一個FastThreadLocal,可以看成是jdk的ThreadLocal,只不過還了一種跟家塊的是西安方法。get方發(fā)住喲啊是調(diào)用了初始化方法initialize
    public final V get() {
        InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get();
        Object v = threadLocalMap.indexedVariable(index);
        if (v != InternalThreadLocalMap.UNSET) {
            return (V) v;
        }
        //調(diào)用初始化方法
        V value = initialize(threadLocalMap);
        registerCleaner(threadLocalMap);
        return value;
    }
private final PoolThreadLocalCache threadCache;

initialValue()方法的邏輯如下

  1. 從預(yù)先準(zhǔn)備好的heapArenasdirectArenas中獲取最少使用的arena
  2. 使用獲取到的arean為參數(shù),實例化一個PoolThreadCache并返回
    final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
        private final boolean useCacheForAllThreads;

        PoolThreadLocalCache(boolean useCacheForAllThreads) {
            this.useCacheForAllThreads = useCacheForAllThreads;
        }

        @Override
        protected synchronized PoolThreadCache initialValue() {
            /**
             * arena翻譯成競技場,關(guān)于內(nèi)存非配的邏輯都在這個競技場中進(jìn)行分配
             */
            //獲取heapArena:從heapArenas堆內(nèi)競技場中拿出使用最少的一個arena
            final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
            //獲取directArena:從directArena堆內(nèi)競技場中拿出使用最少的一個arena
            final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);

            Thread current = Thread.currentThread();
            if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
                //創(chuàng)建PoolThreadCache:該Cache最終被一個線程使用
                //通過heapArena和directArena維護(hù)兩大塊內(nèi)存:堆和堆外內(nèi)存
                //通過tinyCacheSize,smallCacheSize,normalCacheSize維護(hù)ByteBuf緩存列表維護(hù)反復(fù)使用的內(nèi)存塊
                return new PoolThreadCache(
                        heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
                        DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
            }
            // No caching so just use 0 as sizes.
            return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
        }

      //省略代碼......

      }

查看PoolThreadCache其維護(hù)了兩種類型的內(nèi)存分配策略,一種是上述通過持有heapArenadirectArena,另一種是通過維護(hù)tiny,small,normal對應(yīng)的緩存列表來維護(hù)反復(fù)使用的內(nèi)存。

final class PoolThreadCache {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);

    //通過arena的方式維護(hù)內(nèi)存
    final PoolArena<byte[]> heapArena;
    final PoolArena<ByteBuffer> directArena;

    //維護(hù)了tiny, small, normal三種類型的緩存列表
    // Hold the caches for the different size classes, which are tiny, small and normal.
    private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
    private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
    private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
    private final MemoryRegionCache<byte[]>[] normalHeapCaches;
    private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;

    // Used for bitshifting when calculate the index of normal caches later
    private final int numShiftsNormalDirect;
    private final int numShiftsNormalHeap;
    private final int freeSweepAllocationThreshold;
    private final AtomicBoolean freed = new AtomicBoolean();

    private int allocations;

    // TODO: Test if adding padding helps under contention
    //private long pad0, pad1, pad2, pad3, pad4, pad5, pad6, pad7;

    PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
                    int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                    int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
        checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
        this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;

        //通過持有heapArena和directArena,arena的方式管理內(nèi)存分配
        this.heapArena = heapArena;
        this.directArena = directArena;

        //通過tinyCacheSize,smallCacheSize,normalCacheSize創(chuàng)建不同類型的緩存列表并保存到成員變量
        if (directArena != null) {
            tinySubPageDirectCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            smallSubPageDirectCaches = createSubPageCaches(
                    smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalDirect = log2(directArena.pageSize);
            normalDirectCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, directArena);

            directArena.numThreadCaches.getAndIncrement();
        } else {
            // No directArea is configured so just null out all caches
            tinySubPageDirectCaches = null;
            smallSubPageDirectCaches = null;
            normalDirectCaches = null;
            numShiftsNormalDirect = -1;
        }
        if (heapArena != null) {
            // Create the caches for the heap allocations
            //創(chuàng)建規(guī)格化緩存隊列
            tinySubPageHeapCaches = createSubPageCaches(
                    tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
            //創(chuàng)建規(guī)格化緩存隊列
            smallSubPageHeapCaches = createSubPageCaches(
                    smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);

            numShiftsNormalHeap = log2(heapArena.pageSize);
            //創(chuàng)建規(guī)格化緩存隊列
            normalHeapCaches = createNormalCaches(
                    normalCacheSize, maxCachedBufferCapacity, heapArena);

            heapArena.numThreadCaches.getAndIncrement();
        } else {
            // No heapArea is configured so just null out all caches
            tinySubPageHeapCaches = null;
            smallSubPageHeapCaches = null;
            normalHeapCaches = null;
            numShiftsNormalHeap = -1;
        }

        // Only check if there are caches in use.
        if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
                || tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
                && freeSweepAllocationThreshold < 1) {
            throw new IllegalArgumentException("freeSweepAllocationThreshold: "
                    + freeSweepAllocationThreshold + " (expected: > 0)");
        }
    }

    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
            int cacheSize, int numCaches, SizeClass sizeClass) {
        if (cacheSize > 0 && numCaches > 0) {
            //MemoryRegionCache 維護(hù)緩存的一個對象
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
            for (int i = 0; i < cache.length; i++) {
                // TODO: maybe use cacheSize / cache.length
                //每一種MemoryRegionCache(tiny,small,normal)都表示不同內(nèi)存大?。ú煌?guī)格)的一個隊列
                cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
            }
            return cache;
        } else {
            return null;
        }
    }

    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
        if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
            int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
            int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
            //MemoryRegionCache 維護(hù)緩存的一個對象
            @SuppressWarnings("unchecked")
            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
            for (int i = 0; i < cache.length; i++) {
                //每一種MemoryRegionCache(tiny,small,normal)都表示不同內(nèi)存(不同規(guī)格)大小的一個隊列
                cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
            }
            return cache;
        } else {
            return null;
        }
    }

......
}

通過查看分配緩存的方法PoolThreadCache#createSubPageCaches()可以發(fā)現(xiàn)具體維護(hù)的緩存列表對象MemoryRegionCache實際上時維護(hù)了一個Queue<Entry<T>> queue也就是隊列。

    private abstract static class MemoryRegionCache<T> {
        private final int size;
        private final Queue<Entry<T>> queue;
        private final SizeClass sizeClass;
        private int allocations;

        MemoryRegionCache(int size, SizeClass sizeClass) {
            //做一個簡單的規(guī)格化
            this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
            //持有這種規(guī)格的緩存隊列
            queue = PlatformDependent.newFixedMpscQueue(this.size);
            this.sizeClass = sizeClass;
        }
     ......
     }
  • 關(guān)于準(zhǔn)備好的內(nèi)存競技場heapArenadirectArenaPooledByteBufAllocator持有。在實例化分配器的時候被初始化值
    private final PoolArena<byte[]>[] heapArenas;
    private final PoolArena<ByteBuffer>[] directArenas;
    
    //三種緩存列表長度
    private final int tinyCacheSize;
    private final int smallCacheSize;
    private final int normalCacheSize;

跟蹤初始化的過程可以發(fā)現(xiàn),其實headArenadirectArena都是一個PoolArena[],其內(nèi)部分別定義了兩個內(nèi)部類PoolArena.HeapArenaPoolArena.DirectArena分別表示堆內(nèi)內(nèi)存競技場和堆外內(nèi)存競技場。

    public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
                                  int tinyCacheSize, int smallCacheSize, int normalCacheSize,
                                  boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
        super(preferDirect);
        threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
        this.tinyCacheSize = tinyCacheSize;
        this.smallCacheSize = smallCacheSize;
        this.normalCacheSize = normalCacheSize;
        chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);

        checkPositiveOrZero(nHeapArena, "nHeapArena");
        checkPositiveOrZero(nDirectArena, "nDirectArena");

        checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
        if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
            throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
        }

        if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
            throw new IllegalArgumentException("directMemoryCacheAlignment: "
                    + directMemoryCacheAlignment + " (expected: power of two)");
        }

        int pageShifts = validateAndCalculatePageShifts(pageSize);

        //創(chuàng)建兩種內(nèi)存分配的PoolArena數(shù)組,heapArenas和directArenas
        if (nHeapArena > 0) {
            //創(chuàng)建heapArenas內(nèi)存競技場(其實是PoolArena[])
            //nHeapArena:數(shù)組大小
            heapArenas = newArenaArray(nHeapArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
            for (int i = 0; i < heapArenas.length; i ++) {
                //堆內(nèi):PoolArena[]存放它下面的HeapArena
                PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
                        pageSize, maxOrder, pageShifts, chunkSize,
                        directMemoryCacheAlignment);
                heapArenas[i] = arena;
                metrics.add(arena);
            }
            heapArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            heapArenas = null;
            heapArenaMetrics = Collections.emptyList();
        }

        if (nDirectArena > 0) {
            //創(chuàng)建heapArenas內(nèi)存競技場(其實是PoolArena[])
            directArenas = newArenaArray(nDirectArena);
            List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
            for (int i = 0; i < directArenas.length; i ++) {
                //堆外:PoolArena[]存放它下面的DirectArena
                PoolArena.DirectArena arena = new PoolArena.DirectArena(
                        this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
                directArenas[i] = arena;
                metrics.add(arena);
            }
            directArenaMetrics = Collections.unmodifiableList(metrics);
        } else {
            directArenas = null;
            directArenaMetrics = Collections.emptyList();
        }
        metric = new PooledByteBufAllocatorMetric(this);
    }
    private static <T> PoolArena<T>[] newArenaArray(int size) {
        //創(chuàng)建PoolArena數(shù)組
        return new PoolArena[size];
    }

初始化內(nèi)存競技場數(shù)組的大家的默認(rèn)值為defaultMinNumArena,2被的cpu核心數(shù),運(yùn)行時每個線程可獨(dú)享一個arena,內(nèi)存分配的時候就不用加鎖了

    public PooledByteBufAllocator(boolean preferDirect) {
        this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
    }
        //2倍cpu核心數(shù),默認(rèn)創(chuàng)建這個數(shù)量大小的Arena數(shù)組
        // (這個數(shù)字和創(chuàng)建NioEventLoop數(shù)組的數(shù)量一致,每個線程都可以由一個獨(dú)享的arena,這個數(shù)組中的arena其實在分配內(nèi)存的時候是不用加鎖的)
        final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
        final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
        DEFAULT_NUM_HEAP_ARENA = Math.max(0,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numHeapArenas",
                        (int) Math.min(
                                defaultMinNumArena,
                                runtime.maxMemory() / defaultChunkSize / 2 / 3)));
        DEFAULT_NUM_DIRECT_ARENA = Math.max(0,
                SystemPropertyUtil.getInt(
                        "io.netty.allocator.numDirectArenas",
                        (int) Math.min(
                                defaultMinNumArena,
                                PlatformDependent.maxDirectMemory() / defaultChunkSize / 2 / 3)));

  • 整體分配架構(gòu),如圖
    假設(shè)初始化了4個NioEventLoop也就是4個線程的數(shù)組,默認(rèn)cpu核心數(shù)為2。那么內(nèi)存分配器PooledByteBufAllocator持有的arena數(shù)量也是4個。創(chuàng)建一個ByteBuf的過程如下:
  • 首先,通過PoolThreadCache去拿到一個對應(yīng)的arena對象。那么PoolThreadCache的作用就是通過ThreadLoad的方式把內(nèi)存分配器PooledByteBufAllocator持有的arena數(shù)組中其中的一個arena(最少使用的)塞到PoolThreadCache的一個成員變量里面。
  • 然后,當(dāng)每個線程通過它(threadCache)去調(diào)用get方法的時候,會拿到它底層的一個arena,也就是第一個線程拿到第一個,第二個線程拿到第二個以此類推。這樣可以把線程和arena進(jìn)行一個綁定
  • PoolThreadCache除了可以直接在arena管理的這塊內(nèi)存進(jìn)行內(nèi)存分配,還可在它底層維護(hù)的一個ByteBuf緩存列表里進(jìn)行內(nèi)存分配。在PooledByteBufAllocator中持有tinyCacheSize,smallCacheSize,normalCacheSize,分配內(nèi)存時調(diào)用threadCache.get();的時候?qū)嵗?code>PoolThreadCache作為它的構(gòu)造方法參數(shù)傳入,創(chuàng)建了對應(yīng)的緩存列表。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Swift1> Swift和OC的區(qū)別1.1> Swift沒有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對...
    cosWriter閱讀 11,619評論 1 32
  • 在一個方法內(nèi)部定義的變量都存儲在棧中,當(dāng)這個函數(shù)運(yùn)行結(jié)束后,其對應(yīng)的棧就會被回收,此時,在其方法體中定義的變量將不...
    Y了個J閱讀 4,547評論 1 14
  • 在學(xué)習(xí)jemalloc之前可以了解一下glibc malloc,jemalloc沒有'unlinking' 和 '...
    dcharles閱讀 7,058評論 0 7
  • Java SE 基礎(chǔ): 封裝、繼承、多態(tài) 封裝: 概念:就是把對象的屬性和操作(或服務(wù))結(jié)合為一個獨(dú)立的整體,并盡...
    Jayden_Cao閱讀 2,234評論 0 8
  • 所有知識點已整理成app app下載地址 J2EE 部分: 1.Switch能否用string做參數(shù)? 在 Jav...
    侯蛋蛋_閱讀 2,700評論 1 4

友情鏈接更多精彩內(nèi)容