ByteBuf基礎(chǔ)
Java Nio 的Buffer
在進(jìn)行數(shù)據(jù)傳輸?shù)倪^程中,我們經(jīng)常會(huì)用到緩沖區(qū)。
在Java NIO 為我們提供了原生的七種緩沖區(qū)實(shí)現(xiàn),對(duì)應(yīng)著Java 的七種基本類型。一般使用ByteBuffer較多。原生的Buffer雖然能滿足我們的日常使用,但是要進(jìn)行復(fù)雜的應(yīng)用的時(shí)候,確有點(diǎn)力不從心了,原生Buffer存在著以下缺點(diǎn)。因此Netty對(duì)其進(jìn)行了封裝,提供了更為友好的接口供我們使用。
- 當(dāng)我們調(diào)用對(duì)應(yīng)Buffer類的allocate方法來創(chuàng)建緩沖區(qū)實(shí)例的時(shí)候,會(huì)分配指定的空間,同時(shí)緩沖區(qū)的長(zhǎng)度就會(huì)被固定,不能進(jìn)行動(dòng)態(tài)的增長(zhǎng)或者收縮。如果我們寫入的數(shù)據(jù)大于緩沖區(qū)的capacity的時(shí)候,就會(huì)發(fā)生數(shù)組越界錯(cuò)誤。
- Buffer只有一個(gè)位置標(biāo)志位屬性Position,我們只能flip或者rewind方法來對(duì)position進(jìn)行修改來處理數(shù)據(jù)的存取位置,一不小心就可能會(huì)導(dǎo)致錯(cuò)誤。
- Buffer只提供了存取、翻轉(zhuǎn)、釋放、標(biāo)志、比較、批量移動(dòng)等緩沖區(qū)的基本操作,我們想使用高級(jí)的功能,就得自己手動(dòng)進(jìn)行封裝及維護(hù),使用非常不方便。
ByteBuf工作原理
ByteBuf也是通過字節(jié)數(shù)組作為緩沖區(qū)來存取數(shù)據(jù),通過外觀模式聚合了JDK NIO元素的ByteBuffer,進(jìn)行封裝。
ByteBuf是通過readerIndex跟writerIndex兩個(gè)位置指針來協(xié)助緩沖區(qū)的讀寫操作的。
在對(duì)象初始化的時(shí)候,readerIndex和writerIndex的值為0,隨著讀操作和寫操作的進(jìn)行,writerIndex和readerIndex都會(huì)增加,不過readerIndex不能超過writerIndex,在進(jìn)行讀取操作之后,0到readerIndex之間的空間會(huì)被視為discard,調(diào)用ByteBuf的discardReadBytes方法,可以對(duì)這部分空間進(jìn)行釋放重用,類似于ByteBuffer的compact操作,對(duì)緩沖區(qū)進(jìn)行壓縮。readerIndex到writerIndex的空間,相當(dāng)于ByteBuffer的position到limit的空間,可以對(duì)其進(jìn)行讀取,WriterIndex到capacity的空間,則相當(dāng)于ByteBuffer的limit到capacity的空間,是可以繼續(xù)寫入的。
readerIndex跟writerIndex讓讀寫操作的位置指針分離,不需要對(duì)同一個(gè)位置指針進(jìn)行調(diào)整,簡(jiǎn)化了緩沖區(qū)的讀寫操作。
同樣,ByteBuf對(duì)讀寫操作進(jìn)行了封裝,提供了動(dòng)態(tài)擴(kuò)展的能力,當(dāng)我們對(duì)緩沖區(qū)進(jìn)行寫操作的時(shí)候,需要對(duì)剩余的可用空間進(jìn)行校驗(yàn),如果可用空間不足,同時(shí)要寫入的字節(jié)數(shù)小于可寫的最大字節(jié)數(shù),會(huì)對(duì)緩沖區(qū)進(jìn)行動(dòng)態(tài)擴(kuò)展,它會(huì)重新創(chuàng)建一個(gè)緩沖區(qū),然后將以前的數(shù)據(jù)復(fù)制到新創(chuàng)建的緩沖區(qū)中,
ByteBuf基本功能
- 順序讀
在進(jìn)行讀操作之前,首先對(duì)緩沖區(qū)可用的空間進(jìn)行校驗(yàn)。如果要讀取的字節(jié)長(zhǎng)度小于0,就會(huì)拋出IllegalArgumentException異常,如果要讀取的字節(jié)長(zhǎng)度大于已寫入的字節(jié)長(zhǎng)度,會(huì)拋出IndexOutOfBoundsException異常。通過校驗(yàn)之后,調(diào)用getBytes方法,從當(dāng)前的readerIndex開始,讀取length長(zhǎng)度的字節(jié)數(shù)據(jù)到目標(biāo)dst中,由于不同的子類實(shí)現(xiàn)不一樣,getBytes是個(gè)抽象方法,由對(duì)應(yīng)的子類去實(shí)現(xiàn)。如果讀取數(shù)據(jù)成功,readerIndex將會(huì)增加相應(yīng)的length。
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
checkReadableBytes(length);
getBytes(readerIndex, dst, dstIndex, length);
readerIndex += length;
return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
ensureAccessible();
if (minimumReadableBytes < 0) {
throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
}
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
- 順序?qū)?br>
讀操作是將源字節(jié)數(shù)組從srcIndex開始,length長(zhǎng)度的數(shù)據(jù)寫入到當(dāng)前的ByteBuf中的。
一開始需要對(duì)寫入數(shù)組的字節(jié)數(shù)進(jìn)行校驗(yàn),如果寫入長(zhǎng)度小于0,將會(huì)拋出IllegalArgumentException異常,如果寫入字節(jié)數(shù)小于當(dāng)前ByteBuf的可寫入字節(jié)數(shù),則通過檢驗(yàn)。如果寫入字節(jié)數(shù)大于緩沖區(qū)最大可動(dòng)態(tài)擴(kuò)展的容量maxCapacity,就會(huì)拋出
IndexOutOfBoundsException異常,否則的話,就會(huì)通過動(dòng)態(tài)擴(kuò)展來滿足寫入需要的字節(jié)數(shù)。首先通過calculateNewCapacity計(jì)算出重新擴(kuò)展后的容量,然后調(diào)用capacity方法進(jìn)行擴(kuò)展,不同的子類有不同實(shí)現(xiàn),所以也是一個(gè)抽象方法。- 計(jì)算擴(kuò)展容量,首先設(shè)置門閥值為4m,如果要擴(kuò)展的容量等于閥值就使用閥值作為緩沖區(qū)新的容量,如果大于閥值就以4M作為步長(zhǎng),每次增加4M,如果擴(kuò)展期間,要擴(kuò)展的容量比最大可擴(kuò)展容量還大的話,就以最大可擴(kuò)展容量maxCapacity為新的容量。否則的話,就從64開始倍增,直到倍增之后的結(jié)果大于要擴(kuò)展的容量,再把結(jié)果作為緩沖區(qū)的新容量。
- 通過先倍增再步長(zhǎng)來擴(kuò)展容量,如果我們只是writerIndex+length的值作為緩沖區(qū)的新容量,那么再以后進(jìn)行寫操作的時(shí)候,每次都需要進(jìn)行容量擴(kuò)展,容量擴(kuò)展的過程需要進(jìn)行內(nèi)存復(fù)制,過多內(nèi)存復(fù)制會(huì)導(dǎo)致系統(tǒng)的性能下降,之所以是倍增再部長(zhǎng),在最初空間比較小的時(shí)候,倍增操作并不會(huì)帶來太多的內(nèi)存浪費(fèi),但是內(nèi)存增長(zhǎng)到一定的時(shí)候,再進(jìn)行倍增的時(shí)候,就會(huì)對(duì)內(nèi)存造成浪費(fèi),因此,需要設(shè)定一個(gè)閥值,到達(dá)閥值之后就通過步長(zhǎng)的方法進(jìn)行平滑的增長(zhǎng)。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
ensureWritable(length);
setBytes(writerIndex, src, srcIndex, length);
writerIndex += length;
return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
if (minWritableBytes < 0) {
throw new IllegalArgumentException(String.format(
"minWritableBytes: %d (expected: >= 0)", minWritableBytes));
}
if (minWritableBytes <= writableBytes()) {
return this;
}
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
// Normalize the current capacity to the power of 2.
int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);
// Adjust to the new capacity.
capacity(newCapacity);
return this;
}
private int calculateNewCapacity(int minNewCapacity) {
final int maxCapacity = this.maxCapacity;
final int threshold = 1048576 * 4; // 4 MiB page
if (minNewCapacity == threshold) {
return threshold;
}
// If over threshold, do not double but just increase by threshold.
if (minNewCapacity > threshold) {
int newCapacity = minNewCapacity / threshold * threshold;
if (newCapacity > maxCapacity - threshold) {
newCapacity = maxCapacity;
} else {
newCapacity += threshold;
}
return newCapacity;
}
// Not over threshold. Double up to 4 MiB, starting from 64.
int newCapacity = 64;
while (newCapacity < minNewCapacity) {
newCapacity <<= 1;
}
return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity實(shí)現(xiàn)
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
- Clear操作
clear操作只是把readerIndex和writerIndex設(shè)置為0,不會(huì)對(duì)存儲(chǔ)的數(shù)據(jù)進(jìn)行修改。
public ByteBuf clear() {
readerIndex = writerIndex = 0;
return this;
}
-
索引操作
- 讀寫位置索引設(shè)置:主要是對(duì)邊界條件進(jìn)行校驗(yàn),設(shè)置readerIndex的時(shí)候,newReaderIndex不能小于0跟大于writerIndex;設(shè)置writerIndex的時(shí)候,newWriterIndex必須大于readerIndex和小于當(dāng)前的capacity。如果不能通過校驗(yàn)的話,就會(huì)拋出IndexOutOfBoundsException異常。
- mark和reset操作:由于有readerIndex和writerIndex,因此進(jìn)行mark或者reset需要指定相應(yīng)的操作位置索引,mark操作會(huì)把當(dāng)前的readerIndex或者writerIndex設(shè)置為markedReaderIndex或者markedWriterIndex;reset操作的話,它是參入對(duì)應(yīng)的mark值調(diào)用對(duì)應(yīng)readerIndex()或者writerIndex();
-
緩沖區(qū)重用
可以通過discardReadByte方法去重用已經(jīng)讀取過的緩沖區(qū)。
首先對(duì)readerIndex進(jìn)行判斷:- 如果readerIndex等于0,就說明沒有讀取數(shù)據(jù),沒有可以用來重用的空間,直接返回;
- 如果readerIndex大于0且不等于writerIndex的話,說明有進(jìn)行數(shù)據(jù)讀取被丟棄的緩沖區(qū),也有還沒有被讀取的緩沖區(qū)。調(diào)用setBytes方法進(jìn)行字節(jié)數(shù)組的復(fù)制,將沒被讀取的數(shù)據(jù)移動(dòng)到緩沖區(qū)的起始位置,重新去設(shè)置readerIndex和writerIndex,readerIndex為0,writerIndex為原writerIndex-readerIndex;同時(shí),也需要對(duì)mark進(jìn)行重新設(shè)置。
- 首先對(duì)markedReaderIndex進(jìn)行備份然后跟decrement進(jìn)行比較,如果markedReaderIndex比decrement小的話,markedReaderIndex設(shè)置為0,再用markedWriterIndex跟decrement比較,如果小于的話,markedWriterIndex也設(shè)置為0,否則的話markedWriterIndex較少decrement;
- 如果markedReaderIndex比decrement大的話,markedReaderIndex和markedReaderIndex都減去decrement就可以了。
- 如果readerIndex等于writerIndex的話,說明沒有可以進(jìn)行重用的緩沖區(qū),直接對(duì)mark重新設(shè)置就可以了,不需要內(nèi)存復(fù)制。
public ByteBuf discardReadBytes() {
ensureAccessible();
if (readerIndex == 0) {
return this;
}
if (readerIndex != writerIndex) {
setBytes(0, this, readerIndex, writerIndex - readerIndex);
writerIndex -= readerIndex;
adjustMarkers(readerIndex);
readerIndex = 0;
} else {
adjustMarkers(readerIndex);
writerIndex = readerIndex = 0;
}
return this;
}
protected final void adjustMarkers(int decrement) {
int markedReaderIndex = this.markedReaderIndex;
if (markedReaderIndex <= decrement) {
this.markedReaderIndex = 0;
int markedWriterIndex = this.markedWriterIndex;
if (markedWriterIndex <= decrement) {
this.markedWriterIndex = 0;
} else {
this.markedWriterIndex = markedWriterIndex - decrement;
}
} else {
this.markedReaderIndex = markedReaderIndex - decrement;
markedWriterIndex -= decrement;
}
}
- skipBytes
當(dāng)我們需要跳過某些不需要的字節(jié)的時(shí)候,可以調(diào)用skipBytes方法來跳過指定長(zhǎng)度的字節(jié)來讀取后面的數(shù)據(jù)。
首先對(duì)跳躍長(zhǎng)度進(jìn)行判斷,如果跳躍長(zhǎng)度小于0的話,會(huì)拋出IllegalArgumentException異常,或者跳躍長(zhǎng)度大于當(dāng)前緩沖區(qū)可讀長(zhǎng)度的話,會(huì)拋出IndexOutOfBoundsException異常。如果校驗(yàn)通過,新的readerindex為原readerIndex+length,如果新的readerIndex大于writerIndex的話,會(huì)拋出IndexOutOfBoundsException異常,否則就更新readerIndex。
public ByteBuf skipBytes(int length) {
checkReadableBytes(length);
int newReaderIndex = readerIndex + length;
if (newReaderIndex > writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
length, readerIndex, writerIndex));
}
readerIndex = newReaderIndex;
return this;
}
ByteBuf源碼分析
[圖片上傳失敗...(image-54964b-1539838427542)]
AbstractReferenceCountedByteBuf
AbstractReferenceCountedByteBuf是ByteBuf實(shí)現(xiàn)對(duì)引用進(jìn)行計(jì)數(shù)的基類,用來跟蹤對(duì)象的分配和銷毀,實(shí)現(xiàn)自動(dòng)內(nèi)存回收。
- 成員變量
- refCntUpdater refCntUpdater是一個(gè)AtomicIntegerFieldUpdater類型的成員變量,它可以對(duì)成員變量進(jìn)行原子性更新操作,達(dá)到線程安全。
- REFCNT_FIELD_OFFSET REFCNT_FIELD_OFFSET是標(biāo)識(shí)refCnt字段在AbstractReferenceCountedByteBuf的內(nèi)存地址,在UnpooledDirectByteBuf和PooledDirectByteBuf兩個(gè)子類中都會(huì)使用到這個(gè)偏移量。
- refCnt volatile修飾保證變量的線程可見性,用來跟蹤對(duì)象的引用次數(shù)
- 對(duì)象引用計(jì)數(shù)器
每調(diào)用retain方法一次,引用計(jì)數(shù)器就會(huì)加一。retain方法通過自旋對(duì)引用計(jì)數(shù)器進(jìn)行加一操作,引用計(jì)數(shù)器的初始值為1,只要程序是正確執(zhí)行的話,它的最小值應(yīng)該為1,當(dāng)申請(qǐng)和釋放次數(shù)相等的時(shí)候,對(duì)應(yīng)的ByteBuf就會(huì)被回收。當(dāng)次數(shù)為0時(shí),表明對(duì)象被錯(cuò)誤的引用,就會(huì)拋出IllegalReferenceCountException異常,如果次數(shù)等于Integer類型的最大值,就會(huì)拋出
IllegalReferenceCountException異常。retain通過refCntUpdater的compareAndSet方法進(jìn)行原子操作更新,compareAndSet會(huì)使用獲取的值與期望值進(jìn)行比較,如果在比較器件,有其他線程對(duì)變量進(jìn)行修改,那么比較失敗,會(huì)再次自旋,獲取引用計(jì)數(shù)器的值再次進(jìn)行比較,否則的話,就會(huì)進(jìn)行加一操作,退出自旋。
release方法的話與retain方法類似,也是通過自旋循環(huán)進(jìn)行判斷和更新,不過當(dāng)refCnt的值等于1的時(shí)候,表明引用計(jì)數(shù)器的申請(qǐng)跟釋放次數(shù)一樣,對(duì)象引用已經(jīng)不可達(dá)了,對(duì)象應(yīng)該要被垃圾收集回收掉了,調(diào)用deallocate方法釋放ByteBuf對(duì)象
public ByteBuf retain() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, 1);
}
if (refCnt == Integer.MAX_VALUE) {
throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
break;
}
}
return this;
}
public final boolean release() {
for (;;) {
int refCnt = this.refCnt;
if (refCnt == 0) {
throw new IllegalReferenceCountException(0, -1);
}
if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
if (refCnt == 1) {
deallocate();
return true;
}
return false;
}
}
}
UnpooledHeapByteBuf
UnpooledHeapByteBuf是一個(gè)非線程池實(shí)現(xiàn)的在堆內(nèi)存進(jìn)行內(nèi)存分配的字節(jié)緩沖區(qū),在每次IO操作的都會(huì)去創(chuàng)建一個(gè)UnpooledHeapByteBuf對(duì)象,如果頻繁地對(duì)內(nèi)存進(jìn)行分配或者釋放會(huì)對(duì)性能造成影響。
- 成員變量
- ByteBufAllocator 用于內(nèi)存分配
- array 字節(jié)數(shù)組作為緩沖區(qū),用于存儲(chǔ)字節(jié)數(shù)據(jù)
- ByteBuffer 用來實(shí)現(xiàn)Netty ByteBuf 到Nio ByteBuffer的變換
- 動(dòng)態(tài)擴(kuò)展緩沖區(qū)
調(diào)用capacity方法動(dòng)態(tài)擴(kuò)展緩沖區(qū),首先要對(duì)擴(kuò)展容量進(jìn)行校驗(yàn),如果新容量的大小小于0或者大于最大可擴(kuò)展容量maxCapacity的話,拋出IllegalArgumentException異常。
通過校驗(yàn)之后,如果新擴(kuò)展容量比原來大的話,則創(chuàng)建一個(gè)新的容量為新擴(kuò)展容量的字節(jié)數(shù)組緩沖區(qū),然后調(diào)用System.arraycopy進(jìn)行內(nèi)存復(fù)制,將舊的數(shù)據(jù)復(fù)制到新數(shù)組中去,然后用setArray進(jìn)行數(shù)組替換。動(dòng)態(tài)擴(kuò)展之后需要原來的視圖tmpNioBuffer設(shè)置為控。
如果新的容量小于當(dāng)前緩沖區(qū)容量的話,不需要進(jìn)行動(dòng)態(tài)擴(kuò)展,但是需要截取部分?jǐn)?shù)據(jù)作為子緩沖區(qū)。- 首先對(duì)當(dāng)前的readerIndex是否小于newCapacity,如果小于的話繼續(xù)對(duì)writerIndex跟newCapacity進(jìn)行比較,如果writerIndex大于newCapacity的話,就將writerIndex設(shè)置為newCapacity,更新完索引之后就通過System.arrayCopy內(nèi)存復(fù)制將當(dāng)前可讀的數(shù)據(jù)復(fù)制到新的緩沖區(qū)字節(jié)數(shù)組中。
- 如果newCapacity小于readerIndex的話,說明沒有新的可讀數(shù)據(jù)要復(fù)制到新的字節(jié)數(shù)組緩沖區(qū)中,只需要把writerIndex跟readerIndex都更新為newCapacity既可,最后調(diào)用setArray更換字節(jié)數(shù)組。
public ByteBuf capacity(int newCapacity) {
ensureAccessible();
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity);
}
int oldCapacity = array.length;
if (newCapacity > oldCapacity) {
byte[] newArray = new byte[newCapacity];
System.arraycopy(array, 0, newArray, 0, array.length);
setArray(newArray);
} else if (newCapacity < oldCapacity) {
byte[] newArray = new byte[newCapacity];
int readerIndex = readerIndex();
if (readerIndex < newCapacity) {
int writerIndex = writerIndex();
if (writerIndex > newCapacity) {
writerIndex(writerIndex = newCapacity);
}
System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
} else {
setIndex(newCapacity, newCapacity);
}
setArray(newArray);
}
return this;
}
- setBytes
字節(jié)數(shù)組復(fù)制,首先對(duì)數(shù)據(jù)進(jìn)行合法性檢驗(yàn),如果srcIndex或者index的值小于0,就會(huì)拋出IllegalArgumentException,如果index+length的值大于capacity的值或者srcIndex+length的值大于src.length的話,就會(huì)拋出IndexOutOfBoundsException異常。通過校驗(yàn)之后,就調(diào)用System.arraycopy進(jìn)行字節(jié)數(shù)組復(fù)制。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
checkSrcIndex(index, length, srcIndex, src.length);
System.arraycopy(src, srcIndex, array, index, length);
return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
checkIndex(index, length);
if (srcIndex < 0 || srcIndex > srcCapacity - length) {
throw new IndexOutOfBoundsException(String.format(
"srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
}
}
- Netty ByteBuf與Nio ByteBuffer轉(zhuǎn)換
要將Netty的ByteBuf轉(zhuǎn)化為Nio ByteBuffer,在ByteBuffer中有wrap靜態(tài)方法,只需要傳入對(duì)應(yīng)的字節(jié)數(shù)組即可創(chuàng)建轉(zhuǎn)化為ByteBuffer,在nioBuffer方法還調(diào)用了slice方法,它可以創(chuàng)建一個(gè)從原ByteBuffer的position開始緩沖區(qū),與原緩沖區(qū)共享同一段數(shù)據(jù)元素。nioBuffer方法不會(huì)重用緩沖區(qū),只能保證writerIndex跟readerIndex的獨(dú)立性。
public ByteBuffer nioBuffer(int index, int length) {
ensureAccessible();
return ByteBuffer.wrap(array, index, length).slice();
}
PooledByteBuf
在Netty4之后加入內(nèi)存池管理,通過內(nèi)存池管理比之前ByteBuf的創(chuàng)建性能得到了極大提高。
- PoolChunk
- Page 可以用來分配的最小內(nèi)存塊單位
- Chunk page的集合
PoolChunk主要負(fù)責(zé)內(nèi)存塊的分配及釋放,chunk中的page會(huì)構(gòu)建成一顆二叉樹,默認(rèn)情況下page的大小是8K,chunk的大小是2^11 page,即16M,構(gòu)成了11層的二叉樹,最下面一層的葉子節(jié)點(diǎn)有8192個(gè),與page的數(shù)目一樣,每一次內(nèi)存的分配必須保證連續(xù)性,方便內(nèi)存操作。每個(gè)節(jié)點(diǎn)會(huì)記錄自己在Memory Area的偏移地址,當(dāng)一個(gè)節(jié)點(diǎn)表示的內(nèi)存區(qū)域被分配之后,那么該節(jié)點(diǎn)會(huì)被標(biāo)志為已分配,該節(jié)點(diǎn)的所有子節(jié)點(diǎn)的內(nèi)存請(qǐng)求都會(huì)忽略。每次內(nèi)存分配的都是8k(2n)大小的內(nèi)存塊,當(dāng)需要分配大小為chunkSize/(2k)的內(nèi)存端時(shí),為了找到可用的內(nèi)存段,會(huì)從第K層左邊開始尋找可用節(jié)點(diǎn)。
- PoolArena
在內(nèi)存分配中,為了能夠集中管理內(nèi)存的分配及釋放,同時(shí)提供分配和釋放內(nèi)存的性能,一般都是會(huì)先預(yù)先分配一大塊連續(xù)的內(nèi)存,不需要重復(fù)頻繁地進(jìn)行內(nèi)存操作,那一大塊連續(xù)的內(nèi)存就叫做memory Arena,而PoolArena是Netty的內(nèi)存池實(shí)現(xiàn)類。
在Netty中,PoolArena是由多個(gè)Chunk組成的,而每個(gè)Chunk則由多個(gè)Page組成。PoolArena是由Chunk和Page共同組織和管理的。
- PoolSubpage
當(dāng)對(duì)于小于一個(gè)Page的內(nèi)存分配的時(shí)候,每個(gè)Page會(huì)被劃分為大小相等的內(nèi)存塊,它的大小是根據(jù)第一次申請(qǐng)內(nèi)存分配的內(nèi)存塊大小來決定的。一個(gè)Page只能分配與第一次內(nèi)存內(nèi)存的內(nèi)存塊的大小相等的內(nèi)存塊,如果想要想要申請(qǐng)大小不想等的內(nèi)存塊,只能在新的Page上申請(qǐng)內(nèi)存分配了。
Page中的存儲(chǔ)區(qū)域的使用情況是通過一個(gè)long數(shù)組bitmap來維護(hù)的,每一位表示一個(gè)區(qū)域的占用情況。
PooledDirectByteBuf
- 創(chuàng)建字節(jié)緩沖區(qū)
由于內(nèi)存池實(shí)現(xiàn),每次創(chuàng)建字節(jié)緩沖區(qū)的時(shí)候,不是直接new,而是從內(nèi)存池中去獲取,然后設(shè)置引用計(jì)數(shù)器跟讀寫Index,跟緩沖區(qū)最大容量返回。
static PooledHeapByteBuf newInstance(int maxCapacity) {
PooledHeapByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
final void reuse(int maxCapacity) {
maxCapacity(maxCapacity);
setRefCnt(1);
setIndex0(0, 0);
discardMarks();
}
- 復(fù)制字節(jié)緩沖區(qū)實(shí)例
copy方法可以復(fù)制一個(gè)字節(jié)緩沖區(qū)實(shí)例,與原緩沖區(qū)獨(dú)立。
首先要對(duì)index和length進(jìn)行合法性判斷,然后調(diào)用PooledByteBufAllocator的directBuffer方法分配一個(gè)新的緩沖區(qū)。newDirectBuffer方法是一個(gè)抽象方法,對(duì)于不同的子類有不同的實(shí)現(xiàn)。如果是unpooled的話,會(huì)直接創(chuàng)建一個(gè)新的緩沖區(qū),如果是pooled的話,它會(huì)從內(nèi)存池中獲取一個(gè)可用的緩沖區(qū)。
public ByteBuf copy(int index, int length) {
checkIndex(index, length);
ByteBuf copy = alloc().directBuffer(length, maxCapacity());
copy.writeBytes(this, index, length);
return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
if (initialCapacity == 0 && maxCapacity == 0) {
return emptyBuf;
}
validate(initialCapacity, maxCapacity);
return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
PoolThreadCache cache = threadCache.get();
PoolArena<ByteBuffer> directArena = cache.directArena;
ByteBuf buf;
if (directArena != null) {
buf = directArena.allocate(cache, initialCapacity, maxCapacity);
} else {
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
}
return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
ByteBuf buf;
if (PlatformDependent.hasUnsafe()) {
buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
} else {
buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
}
return toLeakAwareBuffer(buf);
}
ByteBuf輔助類分析
ByteBufHolder
ByteBufHolder是ByteBuf的一個(gè)容器,它可以更方便地訪問ByteBuf中的數(shù)據(jù),在使用不同的協(xié)議進(jìn)行數(shù)據(jù)傳輸?shù)臅r(shí)候,不同的協(xié)議消息體包含的數(shù)據(jù)格式和字段不一樣,所以抽象一個(gè)ByteBufHolder對(duì)ByteBuf進(jìn)行包裝,不同的子類有不同的實(shí)現(xiàn),使用者可以根據(jù)自己的需要進(jìn)行實(shí)現(xiàn)。Netty提供了一個(gè)默認(rèn)實(shí)現(xiàn)DefaultByteBufHolder。
ByteBufAllocator
ByteBufAllocator是字節(jié)緩沖區(qū)分配器,根據(jù)Netty字節(jié)緩沖區(qū)的實(shí)現(xiàn)不同,分為兩種不同的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他們提供了不同ByteBuf的分配方法。
CompositeByteBuf
CompositeByteBuf是一個(gè)虛擬的Buffer,它可以將多個(gè)ByteBuf組裝為一個(gè)ByteBuf視圖。
在Java NIO中,我們有兩種實(shí)現(xiàn)的方法
- 將其他ByteBuffer的數(shù)據(jù)復(fù)制到一個(gè)ByteBuffer中,或者重新創(chuàng)建一個(gè)新的ByteBuffer,將其他的ByteBuffer復(fù)制到新建的ByteBuffer中。
- 通過容器將多個(gè)ByteBuffer存儲(chǔ)在一起,進(jìn)行統(tǒng)一的管理和維護(hù)。
在Netty中,CompositeByByteBuf中維護(hù)了一個(gè)Component類型的集合。Component是ByteBuf的包裝類,它聚合了ByteBuf.維護(hù)在集合中的位置偏移量等信息。一般情況下,我們應(yīng)該使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法來創(chuàng)建CompositeByteBuf,而不是直接通過構(gòu)造函數(shù)去實(shí)例化一個(gè)CompositeByteBuf對(duì)象。
private int addComponent0(int cIndex, ByteBuf buffer) {
checkComponentIndex(cIndex);
if (buffer == null) {
throw new NullPointerException("buffer");
}
int readableBytes = buffer.readableBytes();
// No need to consolidate - just add a component to the list.
Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
if (cIndex == components.size()) {
components.add(c);
if (cIndex == 0) {
c.endOffset = readableBytes;
} else {
Component prev = components.get(cIndex - 1);
c.offset = prev.endOffset;
c.endOffset = c.offset + readableBytes;
}
} else {
components.add(cIndex, c);
if (readableBytes != 0) {
updateComponentOffsets(cIndex);
}
}
return cIndex;
}
private void consolidateIfNeeded() {
final int numComponents = components.size();
if (numComponents > maxNumComponents) {
final int capacity = components.get(numComponents - 1).endOffset;
ByteBuf consolidated = allocBuffer(capacity);
for (int i = 0; i < numComponents; i ++) {
Component c = components.get(i);
ByteBuf b = c.buf;
consolidated.writeBytes(b);
c.freeIfNecessary();
}
Component c = new Component(consolidated);
c.endOffset = c.length;
components.clear();
components.add(c);
}
}
public CompositeByteBuf removeComponent(int cIndex) {
checkComponentIndex(cIndex);
Component comp = components.remove(cIndex);
comp.freeIfNecessary();
if (comp.length > 0) {
updateComponentOffsets(cIndex);
}
return this;
}
private static final class Component {
final ByteBuf buf;
final int length;
int offset;
int endOffset;
Component(ByteBuf buf) {
this.buf = buf;
length = buf.readableBytes();
}
void freeIfNecessary() {
buf.release(); // We should not get a NPE here. If so, it must be a bug.
}
}
ByteBufUtil
ByteBufUtil是ByteBuf的工具類,它提供了一系列的靜態(tài)方法來操作ByteBuf。