Netty 權(quán)威指南筆記(五):ByteBuf 源碼解讀
功能介紹
Java 本身提供了 ByteBuffer 類,為什么 Netty 還要搞一個 ByteBuf 類呢?因為 ByteBuffer 類有著許多缺點(diǎn):
- ByteBuffer 長度固定,無法動態(tài)伸縮。
- ByteBuffer 只有一個位置指針 position,讀寫的時候需要手工調(diào)用 flip 和 rewind 方法進(jìn)行模式轉(zhuǎn)換,操作繁瑣,容易出錯。
- 功能太少,缺少一些高級特性。
為了彌補(bǔ)這些不足,Netty 提供了自己的緩沖區(qū)類實現(xiàn) ByteBuf。有什么特點(diǎn)呢?
- 兩個位置指針協(xié)助緩沖區(qū)的讀寫操作:readerIndex、writerIndex,讀寫之間不需要調(diào)整指針位置,大大簡化了讀寫操作。
- 可以動態(tài)擴(kuò)容。
- 當(dāng)有部分內(nèi)容已經(jīng)讀取完成時,可以通過 discard 操作對緩沖區(qū)進(jìn)行整理,在不重新申請內(nèi)存的情況下,增大可寫字節(jié)數(shù)目。
- 支持標(biāo)記和回滾的功能。
- 支持在 ByteBuf 中查找某個字符串。
- 派生出另一個 ByteBuf:duplicate、copy、slice。
- 轉(zhuǎn)化成標(biāo)準(zhǔn)的 ByteBuffer,這是因為在使用 NIO 進(jìn)行網(wǎng)絡(luò)讀寫時,操作的對象還是 JDK 標(biāo)準(zhǔn)的 ByteBuffer。
- 隨機(jī)讀寫。
源碼分析
繼承關(guān)系
ByteBuf 的主要功能類繼承關(guān)系如下圖所示:

從內(nèi)存分配的角度看,ByteBuf 可以分為兩類:
- 堆內(nèi)存字節(jié)緩沖區(qū) HeapByteBuf:優(yōu)點(diǎn)是內(nèi)存分配和回收速度快,可以被 JVM 自動回收。缺點(diǎn)是,如果進(jìn)行 Socket 的 I/O 讀寫,需要額外做一次內(nèi)存復(fù)制,在堆內(nèi)存緩沖區(qū)和內(nèi)核 Channel 之間進(jìn)行復(fù)制,性能會有一定程度下降。
- 直接內(nèi)存字節(jié)緩沖區(qū) DirectByteBuf:非堆內(nèi)存,直接在堆外進(jìn)行分配。相比于堆內(nèi)存,內(nèi)存分配和回收稍慢,但是可以減少復(fù)制,提升性能。
兩種內(nèi)存,各有利弊。Netty 最佳實踐表明:在 I/O 通信線程的讀寫緩沖區(qū)使用 DirectByteBuf,后端業(yè)務(wù)消息的編解碼模塊使用 HeapByteBuf,這樣組合可以達(dá)到性能最優(yōu)。
從內(nèi)存回收的角度看,ByteBuf 也分為兩類:基于對象池的 ByteBuf 和普通 ByteBuf。兩者區(qū)別在于基于對象池的 ByteBuf 可以重用 ByteBuf 對象,它自己維護(hù)了一個內(nèi)存池,可以循環(huán)利用創(chuàng)建的 ByteBuf,提升內(nèi)存的使用效率,降低由于高負(fù)載導(dǎo)致的頻繁 GC。內(nèi)存池的缺點(diǎn)是管理和維護(hù)比較復(fù)雜,使用時需要更加謹(jǐn)慎。
下面我們對一些關(guān)鍵類進(jìn)行分析和解讀。
AbstractByteBuf
AbstractByteBuf 都做了哪些事兒呢?我們先看一下其主要的成員變量:
- 讀寫指針。
- 用于標(biāo)記回滾的 marked 讀寫指針。
- 最大容量 maxCapacity,用于進(jìn)行內(nèi)存保護(hù)。
- 與本 ByteBuf 大小端屬性相反的 ByteBuf:SwappedByteBuf。
我們發(fā)現(xiàn)這里沒有真正存儲數(shù)據(jù)的數(shù)據(jù)結(jié)構(gòu),例如 byte 數(shù)組或 DirectByteBuffer,原因是這里還不知道子類是要基于堆內(nèi)存還是直接內(nèi)存。
public abstract class AbstractByteBuf extends ByteBuf {
static final ResourceLeakDetector<ByteBuf> leakDetector = new ResourceLeakDetector<ByteBuf>(ByteBuf.class);
int readerIndex;
int writerIndex;
private int markedReaderIndex;
private int markedWriterIndex;
private int maxCapacity;
private SwappedByteBuf swappedBuf;
}
接下來我們看看讀操作 readBytes 方法,AbstractByteBuf 類做了什么呢?
- 在 checkReadableBytes 方法中,檢查入?yún)⒂行浴?/li>
- 修改讀指針 readerIndex。
@Override
public ByteBuf readBytes(byte[] dst, int dstIndex, int length) {
checkReadableBytes(length);
// getBytes 方法未在 AbstractByteBuf 中實現(xiàn)
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
ensureAccessible();
if (minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
}
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
從當(dāng)前 ByteBuf 中復(fù)制數(shù)據(jù)到 dst 是在 getBytes 方法中,該方法未在 AbstractByteBuf 中實現(xiàn),也是因為此時具體如何存儲數(shù)據(jù)尚不確定。
下面我們看一下寫操作 writeBytes 方法,AbstractByteBuf 負(fù)責(zé)實現(xiàn)了哪些操作呢?
- 有效性檢查,如果引用計數(shù) refCnt 為 0,表示該 ByteBuf 已經(jīng)被回收,不能再寫入。
- 輸入?yún)?shù)有效性檢查:要寫入的數(shù)據(jù)量不能小于 0,寫入之后總數(shù)據(jù)量也不能大于最大容量。
- 當(dāng)容量不足時,如果尚未超過最大容量,則進(jìn)行擴(kuò)容。
- 修改寫指針。
@Override
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureAccessible();
ensureWritable(length);
// setBytes 交給子類實現(xiàn)。
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
protected final void ensureAccessible() {
if (refCnt() == 0) {
throw new IllegalReferenceCountException(0);
}
}
@Override
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
// 具體擴(kuò)容操作由子類實現(xiàn)。
capacity(newCapacity);
return this;
}
在讀寫操作中,AbstractByteBuf 主要負(fù)責(zé)參數(shù)校驗、讀寫指針修改,以及寫操作時的擴(kuò)容計算。
除此之外,AbstractByteBuf 還提供了以下功能:
- 操作索引:修改讀寫指針、mark & reset。
- 重用緩沖區(qū):discardReadBytes。
- 丟棄部分?jǐn)?shù)據(jù):skipBytes。因為丟棄時,只需要修改讀指針即可,與數(shù)據(jù)具體如何存儲無關(guān)。
總結(jié):在 AbstractByteBuf 中實現(xiàn)的是各個子類中通用的功能。
AbstractReferenceCountedByteBuf
從類名可以看出來,該類主要提供引用計數(shù)的功能,類似于 JVM 內(nèi)存回收的對象引用計數(shù)器,用于跟蹤對象的分配和回收,實現(xiàn)手動控制內(nèi)存回收。
首先,我們看一下其成員變量:
- refCnt:記錄對象引用次數(shù)。
- refCntUpdater:用于對 refCnt 進(jìn)行原子更新。
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater;
static {
AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> updater =
PlatformDependent.newAtomicIntegerFieldUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
if (updater == null) {
updater = AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
}
refCntUpdater = updater;
}
private volatile int refCnt = 1;
接下來,我們看一下增加引用計數(shù)的 retain 方法。該方法是用 CAS 操作對 refCnt 進(jìn)行加 1。另外,refCnt 值為 0 或 Integer.MAX_VALUE 值不能再操作,會拋出異常。
@Override
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
另一個 release 方法表示釋放資源,會將引用計數(shù) refCnt 減 1,如果當(dāng)前 refCnt 等于 1,減 1 之后等于 0,表示對象已經(jīng)沒有被引用,可以被回收了,會調(diào)用 deallocate 方法釋放內(nèi)存。
@Override
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
在 UnpooledHeapByteBuf 中,釋放內(nèi)存僅僅是把 array 數(shù)組置為 null,剩下的內(nèi)存回收工作交由 JVM 來完成。
// in UnpooledHeapByteBuf.java
private byte[] array;
@Override
protected void deallocate() {
array = null;
}
在 UnpooledDirectByteBuf 中,則是調(diào)用 PlatformDependent.freeDirectBuffer 來釋放直接內(nèi)存。
// in UnpooledDirectByteBuf.java
@Override
protected void deallocate() {
ByteBuffer buffer = this.buffer;
if (buffer == null) {
return;
}
this.buffer = null;
if (!doNotFree) {
freeDirect(buffer);
}
}
protected void freeDirect(ByteBuffer buffer) {
PlatformDependent.freeDirectBuffer(buffer);
}
UnpooledHeapByteBuf
UnpooledHeapByteBuf 是基于堆內(nèi)存進(jìn)行內(nèi)存分配的字節(jié)緩沖區(qū),它沒有基于對象池實現(xiàn),意味著每次 I/O 讀寫都會創(chuàng)建一個新的 UnpooledHeapByteBuf 對象,頻繁進(jìn)行內(nèi)存的分配和釋放對性能會有一定的影響,但是相對堆外內(nèi)存的申請和釋放,成本稍低。
相比于 PooledHeapByteBuf,不需要自己管理內(nèi)存池,不容易出現(xiàn)內(nèi)存管理方面的問題,更容易使用和維護(hù)。因此,在滿足性能的情況下,推薦使用 UnpooledHeapByteBuf。
首先看一下 UnpooledHeapByteBuf 的成員變量:
- 負(fù)責(zé)內(nèi)存分配的 ByteBufAllocator。
- 緩沖區(qū)實現(xiàn) byte 數(shù)組。
- 從 ByteBuf 到 NIO 的 ByteBuffer 的轉(zhuǎn)換對象 tmpNioBuf。
private final ByteBufAllocator alloc;
private byte[] array;
private ByteBuffer tmpNioBuf;
在將 AbstractByteBuf 的時候,我們提到 getBytes、capacity 等方法是由子類來實現(xiàn)的,這里我們先看看 getBytes 的實現(xiàn),從代碼中可以看出來,是直接調(diào)用 System.arraycopy 進(jìn)行的數(shù)組復(fù)制。
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
checkDstIndex(index, length, dstIndex, dst.length);
System.arraycopy(array, index, dst, dstIndex, length);
return this;
}
接下來看一下動態(tài)伸縮的 capacity 方法,主要做了以下幾件事:
- 參數(shù)校驗,newCapacity 不能小于 0,大于 maxCapacity。
- 如果 maxCapacity 大于 oldCapacity 表示擴(kuò)容,直接申請新的 byte 數(shù)組,進(jìn)行內(nèi)存復(fù)制即可。
- 如果 maxCapacity 小于 oldCapacity 就是縮容了,同樣申請 byte 數(shù)組。不同的是,需要根據(jù)讀指針 readerIndex 與 newCapacity 的大小來決定是否需要進(jìn)行內(nèi)存復(fù)制。當(dāng) readerIndex 小于 newCapacity 時,需要復(fù)制內(nèi)存,否則不需要。
- 設(shè)置合適的讀寫指針位置。
- 更新緩沖區(qū)字節(jié)數(shù)組引用 array 的值。
@Override
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
private void setArray(byte[] initialArray) {
array = initialArray;
tmpNioBuf = null;
}
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity()));
}
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
return this;
}
從 ByteBuf 到 ByteBuffer 的轉(zhuǎn)換,主要是使用了 ByteBuffer 的 wrap 方法:
@Override
public ByteBuffer nioBuffer(int index, int length) {
ensureAccessible();
return ByteBuffer.wrap(array, index, length).slice();
}
PooledByteBuf
PooledByteBuf 是 ByteBuf 的內(nèi)存池實現(xiàn),應(yīng)用自己實現(xiàn)的內(nèi)存池管理策略,一般和操作系統(tǒng)的內(nèi)存管理策略差不多,往往會更簡單些。PooledByteBuf 內(nèi)存池的分配和釋放,主要通過 PoolArena 來實現(xiàn)。比如在 capacity 方法中,最終會使用 arena 的 reallocate 方法來重新分配內(nèi)存。
public final ByteBuf capacity(int newCapacity) {
ensureAccessible();
// If the request capacity does not require reallocation, just update the length of the memory.
if (chunk.unpooled) {
if (newCapacity == length) {
return this;
}
} else {
if (newCapacity > length) {
if (newCapacity <= maxLength) {
length = newCapacity;
return this;
}
} else if (newCapacity < length) {
if (newCapacity > maxLength >>> 1) {
if (maxLength <= 512) {
if (newCapacity > maxLength - 16) {
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
} else { // > 512 (i.e. >= 1024)
length = newCapacity;
setIndex(Math.min(readerIndex(), newCapacity), Math.min(writerIndex(), newCapacity));
return this;
}
}
} else {
return this;
}
}
// 最終使用 arena 的 reallocate 方法來重新分配內(nèi)存。
chunk.arena.reallocate(this, newCapacity, true);
return this;
}
PoolArena 是由多個 PoolChunk 組成的大塊內(nèi)存區(qū)域。
abstract class PoolArena<T> {
static final int numTinySubpagePools = 512 >>> 4;
final PooledByteBufAllocator parent;
private final int maxOrder;
final int pageSize;
final int pageShifts;
final int chunkSize;
final int subpageOverflowMask;
final int numSmallSubpagePools;
private final PoolSubpage<T>[] tinySubpagePools;
private final PoolSubpage<T>[] smallSubpagePools;
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;
}
// PoolChunkList 是 PoolChunk 組成的鏈表
final class PoolChunkList<T> {
private final PoolArena<T> arena;
private final PoolChunkList<T> nextList;
PoolChunkList<T> prevList;
private final int minUsage;
private final int maxUsage;
private PoolChunk<T> head;
}
每個 PoolChunk 由多個 PoolSubpage 組成。
final class PoolChunk<T> {
final PoolArena<T> arena;
final T memory;
final boolean unpooled;
private final byte[] memoryMap;
private final byte[] depthMap;
private final PoolSubpage<T>[] subpages;
/** Used to determine if the requested capacity is equal to or greater than pageSize. */
private final int subpageOverflowMask;
private final int pageSize;
private final int pageShifts;
private final int maxOrder;
private final int chunkSize;
private final int log2ChunkSize;
private final int maxSubpageAllocs;
/** Used to mark memory as unusable */
private final byte unusable;
private int freeBytes; // 當(dāng)前 chunk 空閑字節(jié)數(shù)目
PoolChunkList<T> parent; // 父節(jié)點(diǎn)
PoolChunk<T> prev; // 鏈表前一個節(jié)點(diǎn)
PoolChunk<T> next; // 鏈表后一個節(jié)點(diǎn)
}
PoolSubpage 負(fù)責(zé)管理一個 Page 的內(nèi)存,通過 bitmap 中的每一位來標(biāo)記每一塊兒內(nèi)存的占用狀態(tài)。
final class PoolSubpage<T> {
final PoolChunk<T> chunk;
private final int memoryMapIdx; // 當(dāng)前page在chunk中的id
private final int runOffset; // 當(dāng)前page在chunk.memory的偏移量
private final int pageSize; // page大小
private final long[] bitmap; // 通過對每一個二進(jìn)制位的標(biāo)記來修改一段內(nèi)存的占用狀態(tài)
PoolSubpage<T> prev;
PoolSubpage<T> next;
boolean doNotDestroy;
int elemSize;
private int maxNumElems;
private int bitmapLength;
private int nextAvail;
private int numAvail;
}
PooledDirectByteBuf
PooledDirectByteBuf 基于內(nèi)存池實現(xiàn),與 UnPooledDirectByteBuf 的唯一區(qū)別就是,緩沖區(qū)的分配和銷毀策略不同。不僅緩沖區(qū)所需內(nèi)存使用內(nèi)存池分配管理,PooledDirectByteBuf 對象本身,也使用 Recycler 管理。 比如 PooledDirectByteBuf 創(chuàng)建示例調(diào)用的是 Recycler 的 get 方法。
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.setRefCnt(1);
buf.maxCapacity(maxCapacity);
return buf;
}
}
Recycler 是一個輕量級的對象池,一個對象池最核心的方法是從池中獲取對象和回收對象到池中,分別對應(yīng)其 get 和 recycle 方法。
public abstract class Recycler<T> {
public final T get() {
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
public final boolean recycle(T o, Handle<T> handle) {
DefaultHandle<T> h = (DefaultHandle<T>) handle;
if (h.stack.parent != this) {
return false;
}
h.recycle(o);
return true;
}
}
PooledDirectByteBuf 中的 copy 方法用于復(fù)制一個新的字節(jié)緩沖區(qū)實例,該方法首先調(diào)用 PooledByteBufAllocator 的 directBuffer 來生成新的 ByteBuf,然后復(fù)制數(shù)據(jù)。
@Override
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(length, maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}
directBuffer 是在抽象類 AbstractByteBufAllocator 中實現(xiàn)的,進(jìn)行參數(shù)校驗之后調(diào)用 newDirectBuffer 來獲取 ByteBuf,該方法由子類來實現(xiàn)。
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
在內(nèi)存池版本 PooledByteBufAllocator 的實現(xiàn)中,判斷如果內(nèi)存池 directArena 可用,則從中獲取,否則自行 new 一個。
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
而在非內(nèi)存池版本 UnpooledByteBufAllocator 中,則是直接 new 一個。
@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
輔助類
- 內(nèi)存分配相關(guān): ByteBufAllocator 及其子類 UnpooledByteBufAllocator、PooledByteBufAllocator。
- 組合視圖:CompositeByteBuf。
- 工具類:ByteBufUtil。