Memory allocation overview
This section introduces Netty's memory allocation, the lowest layer of the stack, responsible for getting data read from the underlying transport into a ByteBuf.
Three questions:
+ What categories of memory are there?
+ How is contention between threads allocating memory reduced?
+ How are allocations of different sizes performed?
Main topics:
- The abstractions for memory and the memory allocator
- Allocation strategies for different size classes and memory types
- The memory reclamation process
ByteBuf structure and key APIs
- ByteBuf structure
ByteBuf memory layout:
* +-------------------+------------------+------------------+
* | discardable bytes | readable bytes | writable bytes |
* | | (CONTENT) | |
* +-------------------+------------------+------------------+
* | | | |
* 0 <= readerIndex <= writerIndex <= capacity
+ The read, write, and set methods
Calling a read method advances the readerIndex;
calling a write method advances the writerIndex;
set methods move neither index.
mark and reset methods
A mark method saves an index position; the matching reset method restores it.
readableBytes, writableBytes, and maxWritableBytes
As the names suggest: readableBytes = writerIndex - readerIndex, writableBytes = capacity - writerIndex, and maxWritableBytes = maxCapacity - writerIndex.
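The index rules above can be captured in a minimal sketch. This is a toy class, not Netty's ByteBuf; the class and field names are illustrative only:

```java
// Toy model of ByteBuf's two-index design: read advances readerIndex,
// write advances writerIndex, set touches neither, mark/reset save and
// restore the reader position.
public class IndexModel {
    private final byte[] data;
    int readerIndex, writerIndex, markedReaderIndex;

    IndexModel(int capacity) { data = new byte[capacity]; }

    void writeByte(byte b) { data[writerIndex++] = b; }   // moves writerIndex
    byte readByte() { return data[readerIndex++]; }       // moves readerIndex
    void setByte(int i, byte b) { data[i] = b; }          // moves nothing

    void markReaderIndex() { markedReaderIndex = readerIndex; }
    void resetReaderIndex() { readerIndex = markedReaderIndex; }

    int readableBytes() { return writerIndex - readerIndex; }
    int writableBytes() { return data.length - writerIndex; }

    public static void main(String[] args) {
        IndexModel buf = new IndexModel(8);
        buf.writeByte((byte) 1);
        buf.writeByte((byte) 2);
        System.out.println(buf.readableBytes()); // 2
        buf.markReaderIndex();
        buf.readByte();
        System.out.println(buf.readableBytes()); // 1
        buf.resetReaderIndex();                  // back to the marked position
        System.out.println(buf.readableBytes()); // 2
    }
}
```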
ByteBuf classification
ByteBuf class diagram
- Pooled and Unpooled
Pooled allocates from pre-allocated memory; Unpooled calls the system API directly for every allocation.
- Unsafe and non-Unsafe
Unsafe: uses the JDK's Unsafe to get at the ByteBuf's underlying memory address.
Non-Unsafe: does not depend on the low-level Unsafe.
- Heap and Direct
Heap: allocated on the JVM heap; the memory participates in GC and needs no manual release; backed by a byte[] array.
Direct: allocated outside the JVM's control; the memory does not participate in GC and must be released manually; allocated via the JDK's ByteBuffer.allocateDirect(initialCapacity).
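The heap/direct distinction already exists at the JDK level; this small demo (plain java.nio, no Netty required) shows the observable differences between the two kinds of buffers:

```java
import java.nio.ByteBuffer;

// A heap buffer is backed by a byte[] and managed by GC; a direct buffer
// lives outside the Java heap and exposes no backing array.
public class HeapVsDirect {
    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(16);         // on-heap, byte[]-backed
        ByteBuffer direct = ByteBuffer.allocateDirect(16); // off-heap

        System.out.println(heap.hasArray());   // true: backed by a byte[]
        System.out.println(heap.isDirect());   // false
        System.out.println(direct.isDirect()); // true
        System.out.println(direct.hasArray()); // false: no backing array
    }
}
```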
The memory allocator: ByteBufAllocator
ByteBufAllocator functionality
ByteBufAllocator is the top-level abstraction. Its functionality: overloaded buffer methods; overloaded ioBuffer methods, which prefer to allocate a direct buffer; heapBuffer, which allocates on the heap; directBuffer, which allocates direct memory; and compositeBuffer, which combines several ByteBufs into one.
AbstractByteBufAllocator
implements most of ByteBufAllocator, leaving two abstract methods, newHeapBuffer and newDirectBuffer, as the extension points that distinguish heap from direct memory.
The two main ByteBufAllocator subclasses
They are PooledByteBufAllocator and UnpooledByteBufAllocator, so Pooled versus Unpooled is decided by the subclass.
How, then, are Unsafe and non-Unsafe distinguished? Netty decides automatically: if the platform exposes an Unsafe object, Netty allocates memory through Unsafe directly.
UnpooledByteBufAllocator analysis
- Heap allocation logic
- Direct allocation logic
The Unsafe path reads data through a base memory address plus an offset; the non-Unsafe path goes through array indexing or the JDK's ByteBuffer API. In general, operating on memory through Unsafe is more efficient than the non-Unsafe way.
PooledByteBufAllocator overview
It allocates two kinds of memory, via newHeapBuffer and newDirectBuffer. The two allocation paths are largely the same, so we analyze newDirectBuffer.
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
- Get the thread-local cache, PoolThreadCache
newDirectBuffer may be called from multiple threads, so each thread gets its own cache:
final class PoolThreadLocalCache extends FastThreadLocal<PoolThreadCache> {
private final boolean useCacheForAllThreads;
PoolThreadLocalCache(boolean useCacheForAllThreads) {
this.useCacheForAllThreads = useCacheForAllThreads;
}
@Override
protected synchronized PoolThreadCache initialValue() {
// pick the least-used heapArena and directArena, then create the PoolThreadCache
final PoolArena<byte[]> heapArena = leastUsedArena(heapArenas);
final PoolArena<ByteBuffer> directArena = leastUsedArena(directArenas);
Thread current = Thread.currentThread();
if (useCacheForAllThreads || current instanceof FastThreadLocalThread) {
return new PoolThreadCache(
heapArena, directArena, tinyCacheSize, smallCacheSize, normalCacheSize,
DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
}
// No caching so just use 0 as sizes.
return new PoolThreadCache(heapArena, directArena, 0, 0, 0, 0, 0);
}
@Override
protected void onRemoval(PoolThreadCache threadCache) {
threadCache.free();
}
private <T> PoolArena<T> leastUsedArena(PoolArena<T>[] arenas) {
if (arenas == null || arenas.length == 0) {
return null;
}
PoolArena<T> minArena = arenas[0];
for (int i = 1; i < arenas.length; i++) {
PoolArena<T> arena = arenas[i];
if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
minArena = arena;
}
}
return minArena;
}
}
FastThreadLocal is essentially a faster ThreadLocal; from this code we can see that each thread gets its own PoolThreadCache.
- Allocate on the Arena held in the thread-local cache
The thread-local cache holds two kinds of memory: one arena for heap memory and one for off-heap (direct) memory. We follow the direct-memory path.
heapArena and directArena are passed in when the PoolThreadCache is created; see the initialValue code above.
Both arena families, heapArena and directArena, are created when the allocator PooledByteBufAllocator is constructed. The constructor:
public PooledByteBufAllocator(boolean preferDirect, int nHeapArena, int nDirectArena, int pageSize, int maxOrder,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
boolean useCacheForAllThreads, int directMemoryCacheAlignment) {
super(preferDirect);
threadCache = new PoolThreadLocalCache(useCacheForAllThreads);
this.tinyCacheSize = tinyCacheSize;
this.smallCacheSize = smallCacheSize;
this.normalCacheSize = normalCacheSize;
chunkSize = validateAndCalculateChunkSize(pageSize, maxOrder);
checkPositiveOrZero(nHeapArena, "nHeapArena");
checkPositiveOrZero(nDirectArena, "nDirectArena");
checkPositiveOrZero(directMemoryCacheAlignment, "directMemoryCacheAlignment");
if (directMemoryCacheAlignment > 0 && !isDirectMemoryCacheAlignmentSupported()) {
throw new IllegalArgumentException("directMemoryCacheAlignment is not supported");
}
if ((directMemoryCacheAlignment & -directMemoryCacheAlignment) != directMemoryCacheAlignment) {
throw new IllegalArgumentException("directMemoryCacheAlignment: "
+ directMemoryCacheAlignment + " (expected: power of two)");
}
int pageShifts = validateAndCalculatePageShifts(pageSize);
if (nHeapArena > 0) {
// initialize the heap arenas
heapArenas = newArenaArray(nHeapArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(heapArenas.length);
for (int i = 0; i < heapArenas.length; i ++) {
PoolArena.HeapArena arena = new PoolArena.HeapArena(this,
pageSize, maxOrder, pageShifts, chunkSize,
directMemoryCacheAlignment);
heapArenas[i] = arena;
metrics.add(arena);
}
heapArenaMetrics = Collections.unmodifiableList(metrics);
} else {
heapArenas = null;
heapArenaMetrics = Collections.emptyList();
}
if (nDirectArena > 0) {
// initialize the direct arenas
directArenas = newArenaArray(nDirectArena);
List<PoolArenaMetric> metrics = new ArrayList<PoolArenaMetric>(directArenas.length);
for (int i = 0; i < directArenas.length; i ++) {
PoolArena.DirectArena arena = new PoolArena.DirectArena(
this, pageSize, maxOrder, pageShifts, chunkSize, directMemoryCacheAlignment);
directArenas[i] = arena;
metrics.add(arena);
}
directArenaMetrics = Collections.unmodifiableList(metrics);
} else {
directArenas = null;
directArenaMetrics = Collections.emptyList();
}
metric = new PooledByteBufAllocatorMetric(this);
}
The heap arenas are initialized with heapArenas = newArenaArray(nHeapArena), and the direct arenas with directArenas = newArenaArray(nDirectArena).
Where do nHeapArena and nDirectArena come from? Following the call chain upward:
public PooledByteBufAllocator(boolean preferDirect) {
this(preferDirect, DEFAULT_NUM_HEAP_ARENA, DEFAULT_NUM_DIRECT_ARENA, DEFAULT_PAGE_SIZE, DEFAULT_MAX_ORDER);
}
Now, how DEFAULT_NUM_DIRECT_ARENA is derived: by default defaultMinNumArena is smaller than runtime.maxMemory() / defaultChunkSize / 2 / 3, so DEFAULT_NUM_DIRECT_ARENA defaults to twice the number of CPU cores; DEFAULT_NUM_HEAP_ARENA is derived the same way. Why create 2 * CPU cores arenas? Because the NIO threads created earlier also default to 2 * CPU cores, so every thread can get an arena of its own, and binding a thread to one of the arenas in the array then requires no locking.
/*
* We use 2 * available processors by default to reduce contention as we use 2 * available processors for the
* number of EventLoops in NIO and EPOLL as well. If we choose a smaller number we will run into hot spots as
* allocation and de-allocation needs to be synchronized on the PoolArena.
*
* See https://github.com/netty/netty/issues/3888.
*/
// twice the number of CPU cores
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
final int defaultChunkSize = DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER;
DEFAULT_NUM_HEAP_ARENA = Math.max(0,
SystemPropertyUtil.getInt(
"io.netty.allocator.numHeapArenas",
(int) Math.min(
defaultMinNumArena,
runtime.maxMemory() / defaultChunkSize / 2 / 3)));
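The arena-count formula above can be restated as a standalone method. The constants follow the defaults shown in the text (8K pages, maxOrder 11, hence a 16M chunk); the class and method names are ours, not Netty's:

```java
// Sketch of DEFAULT_NUM_HEAP_ARENA / DEFAULT_NUM_DIRECT_ARENA:
// min(2 * processors, maxMemory / chunkSize / 2 / 3), floored at 0.
public class DefaultArenaCount {
    static final int DEFAULT_PAGE_SIZE = 8192;
    static final int DEFAULT_MAX_ORDER = 11;

    static int defaultNumArena(int processors, long maxMemory) {
        int defaultMinNumArena = processors * 2;                       // 2 * CPU cores
        long defaultChunkSize = (long) DEFAULT_PAGE_SIZE << DEFAULT_MAX_ORDER; // 16 MiB
        return (int) Math.max(0,
                Math.min(defaultMinNumArena, maxMemory / defaultChunkSize / 2 / 3));
    }

    public static void main(String[] args) {
        // 8 cores, 4 GiB heap: the memory bound (4096/16/6 = 42) does not kick in
        System.out.println(defaultNumArena(8, 4L << 30));   // 16
        // 8 cores but only 256 MiB heap: the memory bound wins (16/6 = 2)
        System.out.println(defaultNumArena(8, 256L << 20)); // 2
    }
}
```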
Here is a structural sketch of the PooledByteBufAllocator, assuming four NIO threads:
[figure: PooledByteBufAllocator structure with four NIO threads]
With four NIO threads there are, per the code analysis above, 4 heapArenas and 4 directArenas; their logic is essentially the same, so the figure labels both simply as Arena.
How does PooledByteBufAllocator allocate a ByteBuf? First it obtains the right Arena through PoolThreadCache: the cache uses a thread local to stash one of the allocator's arenas in a member field, so that when each NIO thread calls get() it receives its own underlying Arena; this binds the thread to that Arena. Besides allocating on an Arena, PooledByteBufAllocator can also allocate from the ByteBuf cache lists it maintains underneath.
For example, after I allocate and finish with 1024 bytes, a second 1024-byte allocation does not need to go to the Arena at all; the buffer can simply be taken from a cache list maintained inside PoolThreadCache and returned.
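The reuse idea in this example can be sketched with a plain queue. This is a simplified stand-in for the per-size cache, not Netty code; all names are illustrative:

```java
import java.util.ArrayDeque;

// One cache bucket for a single buffer size: allocate() prefers a previously
// released buffer from the queue and only falls through to the "arena"
// (here: new byte[]) on a cache miss; release() keeps the buffer for reuse.
public class SimpleRegionCache {
    private final ArrayDeque<byte[]> queue = new ArrayDeque<>();
    private final int bufferSize;
    private final int maxEntries;
    int arenaAllocations; // counts allocations that fell through to the "arena"

    SimpleRegionCache(int bufferSize, int maxEntries) {
        this.bufferSize = bufferSize;
        this.maxEntries = maxEntries;
    }

    byte[] allocate() {
        byte[] cached = queue.poll();
        if (cached != null) return cached;   // cache hit: no arena work
        arenaAllocations++;
        return new byte[bufferSize];         // cache miss: allocate fresh memory
    }

    void release(byte[] buf) {
        if (queue.size() < maxEntries) queue.offer(buf); // keep for the next allocate()
    }

    public static void main(String[] args) {
        SimpleRegionCache cache = new SimpleRegionCache(1024, 512);
        byte[] first = cache.allocate();  // miss: goes to the arena
        cache.release(first);
        byte[] second = cache.allocate(); // hit: the same buffer comes back
        System.out.println(first == second);        // true
        System.out.println(cache.arenaAllocations); // 1
    }
}
```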
PooledByteBufAllocator maintains three cached ByteBuf size settings: tinyCacheSize, smallCacheSize, and normalCacheSize; these three values are used when the PoolThreadCache is initialized.
The PoolThreadCache constructor:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
// create the cache arrays
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
The method that creates the caches:
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
// create one cache per element
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
Each element of the cache array:
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
MemoryRegionCache(int size, SizeClass sizeClass) {
// size: the number of entries this cache can hold, rounded up to a power of two
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
// a fixed-size MPSC queue holding the cached entries of this size class
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
7. How directArena allocates direct memory
- Take a PooledByteBuf from the object pool and reuse it
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
final ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
buf = PlatformDependent.hasUnsafe() ?
UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
Look at directArena.allocate:
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
// create (or reuse) a PooledByteBuf
PooledByteBuf<T> buf = newByteBuf(maxCapacity);
// allocate memory for the PooledByteBuf, trying the cache first
allocate(cache, buf, reqCapacity);
return buf;
}
Now newByteBuf; the DirectArena implementation, which allocates off-heap memory:
@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
if (HAS_UNSAFE) { // the Unsafe variant is used by default when available
return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
} else {
return PooledDirectByteBuf.newInstance(maxCapacity);
}
}
Look at newInstance:
static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
// take a ByteBuf from the recyclable object pool, or create one if the pool is empty
PooledUnsafeDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity); // reuse: set capacity and reference count, reset readerIndex, writerIndex, and the marks
return buf; // a pristine ByteBuf object
}
Now look at RECYCLER:
private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
@Override
protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
return new PooledUnsafeDirectByteBuf(handle, 0); // create a ByteBuf when the recycler has none; the handle is responsible for recycling it
}
};
Step one of allocation has produced the ByteBuf; next, memory is allocated for it, starting with the PoolThreadCache.
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
// the code above tried to allocate from the cache; if that failed, do a real allocation
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) { // requests larger than chunkSize are the special case, handled below by allocateHuge
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
allocate really has two broad steps: first try to allocate from the cache, then fall back to allocating from the memory pool itself.
- allocate from the cache
- allocate from the memory pool
8. Memory size classes
[figure: memory size classes]
Size boundaries: 0, 512B, 8K, 16M
tiny: 0-512B
small: 512B-8K
normal: 8K-16M
huge: >16M
Why is 16M a boundary? 16M is the size of one chunk, and all memory is requested from the operating system in units of a chunk; every ByteBuf allocation then happens inside a chunk. For example, to allocate 1M, Netty first requests a 16M chunk from the operating system, takes a 1M slice of it, and hands that contiguous slice to the ByteBuf.
Why is there an 8K boundary? Netty uses 8K as a page, the unit of allocation within a chunk. The 16M obtained from the system is fairly large, so Netty splits it up by page: one chunk becomes 2048 pages, and a 16K allocation needs only 2 pages.
Memory blocks of 0-8K are called subpages in Netty. Allocating a whole page for a 10B request would be very wasteful; this is where subpages come in.
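The chunk/page arithmetic above can be written out as a tiny sketch (constants follow the defaults stated in the text; the class and method names are ours):

```java
// Relations between the size-class boundaries: an 8K page, a chunk of
// page << maxOrder = 16 MiB, and 2048 pages per chunk.
public class SizeClassMath {
    static final int PAGE_SIZE = 8192;                    // 8K page
    static final int MAX_ORDER = 11;
    static final int CHUNK_SIZE = PAGE_SIZE << MAX_ORDER; // 16 MiB

    static int pagesPerChunk() { return CHUNK_SIZE / PAGE_SIZE; }

    // how many whole pages a normal-sized request occupies
    static int pagesFor(int normCapacity) {
        return (normCapacity + PAGE_SIZE - 1) / PAGE_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(CHUNK_SIZE);      // 16777216
        System.out.println(pagesPerChunk()); // 2048
        System.out.println(pagesFor(16384)); // 2: a 16K allocation takes 2 pages
    }
}
```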
9. Allocation flow on a cache hit
Look at the allocate method:
private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
final int normCapacity = normalizeCapacity(reqCapacity);
if (isTinyOrSmall(normCapacity)) { // capacity < pageSize (pageSize is 8K)
int tableIdx;
PoolSubpage<T>[] table;
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
}
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
++allocationsNormal;
}
} else {
// Huge allocations are never served via the cache so just call allocateHuge
allocateHuge(buf, reqCapacity);
}
}
- First, the requested capacity is normalized:
int normalizeCapacity(int reqCapacity) {
checkPositiveOrZero(reqCapacity, "reqCapacity");
if (reqCapacity >= chunkSize) { // >= 16M: returned directly (huge)
return directMemoryCacheAlignment == 0 ? reqCapacity : alignCapacity(reqCapacity);
}
if (!isTiny(reqCapacity)) { // >= 512
// Doubled
int normalizedCapacity = reqCapacity;
normalizedCapacity --;
normalizedCapacity |= normalizedCapacity >>> 1;
normalizedCapacity |= normalizedCapacity >>> 2;
normalizedCapacity |= normalizedCapacity >>> 4;
normalizedCapacity |= normalizedCapacity >>> 8;
normalizedCapacity |= normalizedCapacity >>> 16;
normalizedCapacity ++;
if (normalizedCapacity < 0) {
normalizedCapacity >>>= 1;
}
assert directMemoryCacheAlignment == 0 || (normalizedCapacity & directMemoryCacheAlignmentMask) == 0;
return normalizedCapacity;
}
if (directMemoryCacheAlignment > 0) {
return alignCapacity(reqCapacity);
}
// Quantum-spaced
if ((reqCapacity & 15) == 0) {
return reqCapacity;
}
return (reqCapacity & ~15) + 16;
}
The broad steps of allocating from the cache:
- find the MemoryRegionCache matching the size;
- poll an entry from its queue to initialize the ByteBuf: the entry holds a chunk, which represents a range of contiguous memory; the chunk hands a contiguous slice to the ByteBuf, and the ByteBuf can then read and write that slice;
- return the polled entry to the object pool for reuse.
To reuse allocated memory as much as possible, Netty manages these objects through the RECYCLER, reducing GC pressure and the repeated creation and destruction of pooled objects.
We illustrate the three steps with cache.allocateTiny(this, buf, reqCapacity, normCapacity).
- Find the MemoryRegionCache matching the size
/**
* Try to allocate a tiny buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
*/
boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}
private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
int idx = PoolArena.tinyIdx(normCapacity); // compute the array index, then fetch the MemoryRegionCache at that index
if (area.isDirect()) {
return cache(tinySubPageDirectCaches, idx);
}
return cache(tinySubPageHeapCaches, idx);
}
// dividing the capacity by 16 yields the index
static int tinyIdx(int normCapacity) {
return normCapacity >>> 4;
}
- Poll an entry from the queue to initialize the ByteBuf
@SuppressWarnings({ "unchecked", "rawtypes" })
private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
if (cache == null) {
// no cache found so just return false here
return false;
}
boolean allocated = cache.allocate(buf, reqCapacity);
if (++ allocations >= freeSweepAllocationThreshold) {
allocations = 0;
trim();
}
return allocated;
}
public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
Entry<T> entry = queue.poll();
if (entry == null) {
return false;
}
// initialize the ByteBuf from the cached chunk and handle
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
entry.recycle();
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
++ allocations;
return true;
}
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf( // subpage initialization
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity);
}
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
PoolSubpage<T> subpage = subpages[subpageIdx(memoryMapIdx)];
assert subpage.doNotDestroy;
assert reqCapacity <= subpage.elemSize;
buf.init(
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk; // the memory block requested from the system
memory = chunk.memory;
tmpNioBuf = nioBuffer;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle; // identifies this buffer's slice within the chunk
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
}
- Return the polled entry to the object pool for reuse
Look at entry.recycle():
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
void recycle() { // reset the fields before returning to the pool
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
}
stack.push(this); // push back onto the stack for reuse
}
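The get/recycle cycle can be sketched with a simple stack-backed pool. This is a toy, not Netty's Recycler (which is considerably more elaborate, with per-thread stacks and handles); all names are illustrative:

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

// Object-pool idea: get() pops a previously recycled object or creates a
// new one via the factory; recycle() pushes the object back for later reuse.
public class SimpleRecycler<T> {
    private final ArrayDeque<T> stack = new ArrayDeque<>();
    private final Supplier<T> factory;
    int created; // how many objects were actually constructed

    public SimpleRecycler(Supplier<T> factory) { this.factory = factory; }

    public T get() {
        T obj = stack.poll();   // reuse a recycled object if one exists
        if (obj != null) return obj;
        created++;
        return factory.get();   // otherwise construct a fresh one
    }

    public void recycle(T obj) {
        stack.push(obj);        // make the object available to the next get()
    }

    public static void main(String[] args) {
        SimpleRecycler<StringBuilder> pool = new SimpleRecycler<>(StringBuilder::new);
        StringBuilder a = pool.get();     // constructed
        pool.recycle(a);
        StringBuilder b = pool.get();     // reused, no new construction
        System.out.println(a == b);       // true
        System.out.println(pool.created); // 1
    }
}
```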
10. Allocation logic on a cache hit
- Cache-related data structures in Netty
MemeoryRegionCache數(shù)據(jù)結(jié)構(gòu).png
Netty's cache-related data structure is MemoryRegionCache, made up of three parts: the queue, the sizeClass, and the size.
Each element of the queue is an entry, and each entry holds a chunk and a handle. Netty allocates memory in units of chunks, and a handle uniquely identifies a contiguous range of memory; together, a chunk and a handle pin down both the size and the location of a block of memory. All the entries together form one cache chain, and finding the matching chain in the cache locates an entry in its queue.
sizeClass is Netty's memory size class; huge allocations are served directly and never cached, so MemoryRegionCache has no Huge class.
size is the size of each small memory block.
Within one MemoryRegionCache the block size is fixed: if a MemoryRegionCache has cached a 1K block, then its queue caches only 1K ByteBufs. As for the possible block sizes: in the tiny class they are the multiples of 16B below 512B; the other classes can be read directly off the figure.
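For the tiny class, those bucket sizes can be enumerated directly (illustrative sketch; index 0 is the unused head slot):

```java
// The 32 tiny buckets hold sizes 0 (unused), 16B, 32B, ..., 496B:
// bucket i caches blocks of i * 16 bytes.
public class TinyBuckets {
    static int[] tinyBucketSizes() {
        int[] sizes = new int[32];
        for (int i = 0; i < sizes.length; i++) sizes[i] = i * 16;
        return sizes;
    }

    public static void main(String[] args) {
        int[] sizes = tinyBucketSizes();
        System.out.println(sizes.length); // 32
        System.out.println(sizes[1]);     // 16
        System.out.println(sizes[31]);    // 496
    }
}
```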
private abstract static class MemoryRegionCache<T> {
private final int size;
private final Queue<Entry<T>> queue;
private final SizeClass sizeClass;
private int allocations;
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
// ...
}
MemoryRegionCache instances are maintained in PoolThreadCache.
final class PoolThreadCache {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(PoolThreadCache.class);
final PoolArena<byte[]> heapArena;
final PoolArena<ByteBuffer> directArena;
// Hold the caches for the different size classes, which are tiny, small and normal.
private final MemoryRegionCache<byte[]>[] tinySubPageHeapCaches;
private final MemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] tinySubPageDirectCaches;
private final MemoryRegionCache<ByteBuffer>[] smallSubPageDirectCaches;
private final MemoryRegionCache<byte[]>[] normalHeapCaches;
private final MemoryRegionCache<ByteBuffer>[] normalDirectCaches;
// ...
}
Creating the MemoryRegionCaches:
PoolThreadCache(PoolArena<byte[]> heapArena, PoolArena<ByteBuffer> directArena,
int tinyCacheSize, int smallCacheSize, int normalCacheSize,
int maxCachedBufferCapacity, int freeSweepAllocationThreshold) {
checkPositiveOrZero(maxCachedBufferCapacity, "maxCachedBufferCapacity");
this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
this.heapArena = heapArena;
this.directArena = directArena;
if (directArena != null) {
// create the tiny[32] array
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageDirectCaches = createSubPageCaches(
smallCacheSize, directArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalDirect = log2(directArena.pageSize);
normalDirectCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, directArena);
directArena.numThreadCaches.getAndIncrement();
} else {
// No directArea is configured so just null out all caches
tinySubPageDirectCaches = null;
smallSubPageDirectCaches = null;
normalDirectCaches = null;
numShiftsNormalDirect = -1;
}
if (heapArena != null) {
// Create the caches for the heap allocations
tinySubPageHeapCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
smallSubPageHeapCaches = createSubPageCaches(
smallCacheSize, heapArena.numSmallSubpagePools, SizeClass.Small);
numShiftsNormalHeap = log2(heapArena.pageSize);
normalHeapCaches = createNormalCaches(
normalCacheSize, maxCachedBufferCapacity, heapArena);
heapArena.numThreadCaches.getAndIncrement();
} else {
// No heapArea is configured so just null out all caches
tinySubPageHeapCaches = null;
smallSubPageHeapCaches = null;
normalHeapCaches = null;
numShiftsNormalHeap = -1;
}
// Only check if there are caches in use.
if ((tinySubPageDirectCaches != null || smallSubPageDirectCaches != null || normalDirectCaches != null
|| tinySubPageHeapCaches != null || smallSubPageHeapCaches != null || normalHeapCaches != null)
&& freeSweepAllocationThreshold < 1) {
throw new IllegalArgumentException("freeSweepAllocationThreshold: "
+ freeSweepAllocationThreshold + " (expected: > 0)");
}
}
Now the creation of tiny[32]:
tinySubPageDirectCaches = createSubPageCaches(
tinyCacheSize, PoolArena.numTinySubpagePools, SizeClass.Tiny);
tinyCacheSize is maintained by the allocator and defaults to 512; PoolArena.numTinySubpagePools defaults to 512 >>> 4, i.e. 512 divided by 16, which is 32.
So an array of length 32 is created:
private static <T> MemoryRegionCache<T>[] createSubPageCaches(
int cacheSize, int numCaches, SizeClass sizeClass) {
if (cacheSize > 0 && numCaches > 0) {
@SuppressWarnings("unchecked")
MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
for (int i = 0; i < cache.length; i++) {
// TODO: maybe use cacheSize / cache.length
// cacheSize is 512 here
cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
}
return cache;
} else {
return null;
}
}
Following the call further:
private static final class SubPageMemoryRegionCache<T> extends MemoryRegionCache<T> {
SubPageMemoryRegionCache(int size, SizeClass sizeClass) {
super(size, sizeClass);
}
@Override
protected void initBuf(
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
MemoryRegionCache(int size, SizeClass sizeClass) {
this.size = MathUtil.safeFindNextPositivePowerOfTwo(size); // still 512 here
queue = PlatformDependent.newFixedMpscQueue(this.size);
this.sizeClass = sizeClass;
}
At this point we can see that tinySubPageDirectCaches has 32 outer nodes (each a SubPageMemoryRegionCache); each node is the queue for one size class (16B, 32B, ..., 496B), and each queue holds up to 512 entries by default.
[figure: tiny cache layout, 32 queues for sizes 16B to 496B]
11. arena, chunk, page, subpage
- The Arena structure
Arena結(jié)構(gòu).png
The outermost layer is a set of chunkLists connected to each other in a doubly linked list; every node inside a chunkList is a chunk, and a chunk (16M) is the smallest unit in which memory is requested from the operating system. Why link the chunkLists together? Netty tracks each chunk's usage in real time and files it into the chunkList for that usage ratio, so that at allocation time Netty can use this grouping to locate a suitable chunkList and then pick one of its chunks to allocate from.
abstract class PoolArena<T> implements PoolArenaMetric {
static final boolean HAS_UNSAFE = PlatformDependent.hasUnsafe();
enum SizeClass {
Tiny,
Small,
Normal
}
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
final int directMemoryCacheAlignment;
final int directMemoryCacheAlignmentMask;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
// ...
}
protected PoolArena(PooledByteBufAllocator parent, int pageSize,
int maxOrder, int pageShifts, int chunkSize, int cacheAlignment) {
this.parent = parent;
this.pageSize = pageSize;
this.maxOrder = maxOrder;
this.pageShifts = pageShifts;
this.chunkSize = chunkSize;
directMemoryCacheAlignment = cacheAlignment;
directMemoryCacheAlignmentMask = cacheAlignment - 1;
subpageOverflowMask = ~(pageSize - 1);
tinySubpagePools = newSubpagePoolArray(numTinySubpagePools);
for (int i = 0; i < tinySubpagePools.length; i ++) {
tinySubpagePools[i] = newSubpagePoolHead(pageSize);
}
numSmallSubpagePools = pageShifts - 9;
smallSubpagePools = newSubpagePoolArray(numSmallSubpagePools);
for (int i = 0; i < smallSubpagePools.length; i ++) {
smallSubpagePools[i] = newSubpagePoolHead(pageSize);
}
// initialize the chunk lists
q100 = new PoolChunkList<T>(this, null, 100, Integer.MAX_VALUE, chunkSize);
q075 = new PoolChunkList<T>(this, q100, 75, 100, chunkSize);
q050 = new PoolChunkList<T>(this, q075, 50, 100, chunkSize);
q025 = new PoolChunkList<T>(this, q050, 25, 75, chunkSize);
q000 = new PoolChunkList<T>(this, q025, 1, 50, chunkSize);
qInit = new PoolChunkList<T>(this, q000, Integer.MIN_VALUE, 25, chunkSize);
q100.prevList(q075);
q075.prevList(q050);
q050.prevList(q025);
q025.prevList(q000);
q000.prevList(null);
qInit.prevList(qInit);
List<PoolChunkListMetric> metrics = new ArrayList<PoolChunkListMetric>(6);
metrics.add(qInit);
metrics.add(q000);
metrics.add(q025);
metrics.add(q050);
metrics.add(q075);
metrics.add(q100);
chunkListMetrics = Collections.unmodifiableList(metrics);
}
- The Chunk structure
Chunk結(jié)構(gòu).png
A chunk splits its memory into 8K pages; a page can in turn be split into subpages (four per page in the figure).
12. Page-level allocation: allocateNormal()
Look at this fragment in PoolArena:
if (normCapacity <= chunkSize) {
if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { // allocate from the cache
// was able to allocate out of the cache so move on
return;
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity); // allocate from the arena
++allocationsNormal;
}
}
allocateNormal in the arena:
private void allocateNormal(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) ||
q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) ||
q075.allocate(buf, reqCapacity, normCapacity)) {
return;
}
// Add a new chunk: create a chunk and allocate from it
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
qInit.add(c);
}
- Try to allocate from the existing chunkLists
+ Create a new chunk and allocate from it if that fails
+ Initialize the ByteBuf
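The steps above can be sketched as a heavily simplified arena. The Chunk type here is an assumed stand-in that only tracks free bytes; Netty additionally keeps the chunks in usage-ratio-ordered lists (q050, q025, q000, qInit, q075) rather than one flat list:

```java
import java.util.ArrayList;
import java.util.List;

// allocateNormal strategy: try existing chunks first, and only request a
// new chunk from the "operating system" when none of them can serve the request.
public class AllocateNormalSketch {
    static final int CHUNK_SIZE = 16 * 1024 * 1024;

    static class Chunk {
        int freeBytes = CHUNK_SIZE;
        boolean allocate(int size) {
            if (size > freeBytes) return false;
            freeBytes -= size;
            return true;
        }
    }

    final List<Chunk> chunks = new ArrayList<>();
    int chunksCreated;

    Chunk allocateNormal(int normCapacity) {
        for (Chunk c : chunks) {          // 1. try the existing chunks
            if (c.allocate(normCapacity)) return c;
        }
        Chunk c = new Chunk();            // 2. otherwise create a new chunk
        chunksCreated++;
        c.allocate(normCapacity);
        chunks.add(c);                    // 3. keep it for future allocations
        return c;
    }

    public static void main(String[] args) {
        AllocateNormalSketch arena = new AllocateNormalSketch();
        arena.allocateNormal(8192);
        arena.allocateNormal(8192);            // served from the same chunk
        System.out.println(arena.chunksCreated); // 1
    }
}
```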
13. Subpage-level allocation: allocateTiny()
- Locate a Subpage object
- Initialize the subpage
- Initialize the PooledByteBuf
Debug with this code:
public class TinyAllocate {
public static void main(String[] args) {
PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
allocator.directBuffer(16);
}
}
Look at this fragment of allocate:
boolean tiny = isTiny(normCapacity);
if (tiny) { // < 512
if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = tinyIdx(normCapacity);
table = tinySubpagePools;
} else {
if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
// was able to allocate out of the cache so move on
return;
}
tableIdx = smallIdx(normCapacity);
table = smallSubpagePools;
}
final PoolSubpage<T> head = table[tableIdx];
/**
* Synchronize on the head. This is needed as {@link PoolChunk#allocateSubpage(int)} and
* {@link PoolChunk#free(long)} may modify the doubly linked list as well.
*/
synchronized (head) {
final PoolSubpage<T> s = head.next;
if (s != head) { // initially the head node carries no subpage information, so s == head
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
}
synchronized (this) {
allocateNormal(buf, reqCapacity, normCapacity);
}
incTinySmallAllocation(tiny);
return;
Now look at the structure of tinySubpagePools; by default it mirrors the tiny structure of MemoryRegionCache:
tiny[32] 0 -> 16B -> 32B -> 48B -> ... -> 496B
When no suitable subpage exists yet, allocation falls through to allocateNormal, which ends up in PoolChunk.allocateSubpage:
private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
int id = allocateNode(d);
if (id < 0) {
return id;
}
final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize;
freeBytes -= pageSize;
int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity);
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}
14. Releasing a ByteBuf
- Add the contiguous memory range to the cache
- Mark the contiguous memory range as unused
- Return the ByteBuf to the object pool