Netty 源碼分析之ByteBuf

ByteBuf基礎(chǔ)

Java Nio 的Buffer

在進(jìn)行數(shù)據(jù)傳輸?shù)倪^程中,我們經(jīng)常會(huì)用到緩沖區(qū)。
在Java NIO 為我們提供了原生的七種緩沖區(qū)實(shí)現(xiàn),對(duì)應(yīng)著Java 的七種基本類型。一般使用ByteBuffer較多。原生的Buffer雖然能滿足我們的日常使用,但是要進(jìn)行復(fù)雜的應(yīng)用的時(shí)候,確有點(diǎn)力不從心了,原生Buffer存在著以下缺點(diǎn)。因此Netty對(duì)其進(jìn)行了封裝,提供了更為友好的接口供我們使用。

  • 當(dāng)我們調(diào)用對(duì)應(yīng)Buffer類的allocate方法來創(chuàng)建緩沖區(qū)實(shí)例的時(shí)候,會(huì)分配指定的空間,同時(shí)緩沖區(qū)的長(zhǎng)度就會(huì)被固定,不能進(jìn)行動(dòng)態(tài)的增長(zhǎng)或者收縮。如果我們寫入的數(shù)據(jù)大于緩沖區(qū)的capacity的時(shí)候,就會(huì)發(fā)生數(shù)組越界錯(cuò)誤。
  • Buffer只有一個(gè)位置標(biāo)志位屬性Position,我們只能flip或者rewind方法來對(duì)position進(jìn)行修改來處理數(shù)據(jù)的存取位置,一不小心就可能會(huì)導(dǎo)致錯(cuò)誤。
  • Buffer只提供了存取、翻轉(zhuǎn)、釋放、標(biāo)志、比較、批量移動(dòng)等緩沖區(qū)的基本操作,我們想使用高級(jí)的功能,就得自己手動(dòng)進(jìn)行封裝及維護(hù),使用非常不方便。

ByteBuf工作原理

ByteBuf也是通過字節(jié)數(shù)組作為緩沖區(qū)來存取數(shù)據(jù),通過外觀模式聚合了JDK NIO元素的ByteBuffer,進(jìn)行封裝。
ByteBuf是通過readerIndex跟writerIndex兩個(gè)位置指針來協(xié)助緩沖區(qū)的讀寫操作的。
在對(duì)象初始化的時(shí)候,readerIndex和writerIndex的值為0,隨著讀操作和寫操作的進(jìn)行,writerIndex和readerIndex都會(huì)增加,不過readerIndex不能超過writerIndex,在進(jìn)行讀取操作之后,0到readerIndex之間的空間會(huì)被視為discard,調(diào)用ByteBuf的discardReadBytes方法,可以對(duì)這部分空間進(jìn)行釋放重用,類似于ByteBuffer的compact操作,對(duì)緩沖區(qū)進(jìn)行壓縮。readerIndex到writerIndex的空間,相當(dāng)于ByteBuffer的position到limit的空間,可以對(duì)其進(jìn)行讀取,WriterIndex到capacity的空間,則相當(dāng)于ByteBuffer的limit到capacity的空間,是可以繼續(xù)寫入的。
readerIndex跟writerIndex讓讀寫操作的位置指針分離,不需要對(duì)同一個(gè)位置指針進(jìn)行調(diào)整,簡(jiǎn)化了緩沖區(qū)的讀寫操作。
同樣,ByteBuf對(duì)讀寫操作進(jìn)行了封裝,提供了動(dòng)態(tài)擴(kuò)展的能力,當(dāng)我們對(duì)緩沖區(qū)進(jìn)行寫操作的時(shí)候,需要對(duì)剩余的可用空間進(jìn)行校驗(yàn),如果可用空間不足,同時(shí)要寫入的字節(jié)數(shù)小于可寫的最大字節(jié)數(shù),會(huì)對(duì)緩沖區(qū)進(jìn)行動(dòng)態(tài)擴(kuò)展,它會(huì)重新創(chuàng)建一個(gè)緩沖區(qū),然后將以前的數(shù)據(jù)復(fù)制到新創(chuàng)建的緩沖區(qū)中,

ByteBuf基本功能

  • 順序讀
    在進(jìn)行讀操作之前,首先對(duì)緩沖區(qū)可用的空間進(jìn)行校驗(yàn)。如果要讀取的字節(jié)長(zhǎng)度小于0,就會(huì)拋出IllegalArgumentException異常,如果要讀取的字節(jié)長(zhǎng)度大于已寫入的字節(jié)長(zhǎng)度,會(huì)拋出IndexOutOfBoundsException異常。通過校驗(yàn)之后,調(diào)用getBytes方法,從當(dāng)前的readerIndex開始,讀取length長(zhǎng)度的字節(jié)數(shù)據(jù)到目標(biāo)dst中,由于不同的子類實(shí)現(xiàn)不一樣,getBytes是個(gè)抽象方法,由對(duì)應(yīng)的子類去實(shí)現(xiàn)。如果讀取數(shù)據(jù)成功,readerIndex將會(huì)增加相應(yīng)的length。
public ByteBuf readBytes(ByteBuf dst, int dstIndex, int length) {
    checkReadableBytes(length);
    getBytes(readerIndex, dst, dstIndex, length);
    readerIndex += length;
    return this;
}
protected final void checkReadableBytes(int minimumReadableBytes) {
    ensureAccessible();
    if (minimumReadableBytes < 0) {
        throw new IllegalArgumentException("minimumReadableBytes: " + minimumReadableBytes + " (expected: >= 0)");
    }
    if (readerIndex > writerIndex - minimumReadableBytes) {
        throw new IndexOutOfBoundsException(String.format(
                "readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
                readerIndex, minimumReadableBytes, writerIndex, this));
    }
}
  • 順序?qū)?br> 讀操作是將源字節(jié)數(shù)組從srcIndex開始,length長(zhǎng)度的數(shù)據(jù)寫入到當(dāng)前的ByteBuf中的。
    一開始需要對(duì)寫入數(shù)組的字節(jié)數(shù)進(jìn)行校驗(yàn),如果寫入長(zhǎng)度小于0,將會(huì)拋出IllegalArgumentException異常,如果寫入字節(jié)數(shù)小于當(dāng)前ByteBuf的可寫入字節(jié)數(shù),則通過檢驗(yàn)。如果寫入字節(jié)數(shù)大于緩沖區(qū)最大可動(dòng)態(tài)擴(kuò)展的容量maxCapacity,就會(huì)拋出
    IndexOutOfBoundsException異常,否則的話,就會(huì)通過動(dòng)態(tài)擴(kuò)展來滿足寫入需要的字節(jié)數(shù)。首先通過calculateNewCapacity計(jì)算出重新擴(kuò)展后的容量,然后調(diào)用capacity方法進(jìn)行擴(kuò)展,不同的子類有不同實(shí)現(xiàn),所以也是一個(gè)抽象方法。
    • 計(jì)算擴(kuò)展容量,首先設(shè)置門閥值為4m,如果要擴(kuò)展的容量等于閥值就使用閥值作為緩沖區(qū)新的容量,如果大于閥值就以4M作為步長(zhǎng),每次增加4M,如果擴(kuò)展期間,要擴(kuò)展的容量比最大可擴(kuò)展容量還大的話,就以最大可擴(kuò)展容量maxCapacity為新的容量。否則的話,就從64開始倍增,直到倍增之后的結(jié)果大于要擴(kuò)展的容量,再把結(jié)果作為緩沖區(qū)的新容量。
    • 通過先倍增再步長(zhǎng)來擴(kuò)展容量,如果我們只是writerIndex+length的值作為緩沖區(qū)的新容量,那么再以后進(jìn)行寫操作的時(shí)候,每次都需要進(jìn)行容量擴(kuò)展,容量擴(kuò)展的過程需要進(jìn)行內(nèi)存復(fù)制,過多內(nèi)存復(fù)制會(huì)導(dǎo)致系統(tǒng)的性能下降,之所以是倍增再部長(zhǎng),在最初空間比較小的時(shí)候,倍增操作并不會(huì)帶來太多的內(nèi)存浪費(fèi),但是內(nèi)存增長(zhǎng)到一定的時(shí)候,再進(jìn)行倍增的時(shí)候,就會(huì)對(duì)內(nèi)存造成浪費(fèi),因此,需要設(shè)定一個(gè)閥值,到達(dá)閥值之后就通過步長(zhǎng)的方法進(jìn)行平滑的增長(zhǎng)。
public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
    ensureWritable(length);
    setBytes(writerIndex, src, srcIndex, length);
    writerIndex += length;
    return this;
}
public ByteBuf ensureWritable(int minWritableBytes) {
    if (minWritableBytes < 0) {
        throw new IllegalArgumentException(String.format(
                "minWritableBytes: %d (expected: >= 0)", minWritableBytes));
    }

    if (minWritableBytes <= writableBytes()) {
        return this;
    }

    if (minWritableBytes > maxCapacity - writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                writerIndex, minWritableBytes, maxCapacity, this));
    }

    // Normalize the current capacity to the power of 2.
    int newCapacity = calculateNewCapacity(writerIndex + minWritableBytes);

    // Adjust to the new capacity.
    capacity(newCapacity);
    return this;
}
private int calculateNewCapacity(int minNewCapacity) {
    final int maxCapacity = this.maxCapacity;
    final int threshold = 1048576 * 4; // 4 MiB page

    if (minNewCapacity == threshold) {
        return threshold;
    }

    // If over threshold, do not double but just increase by threshold.
    if (minNewCapacity > threshold) {
        int newCapacity = minNewCapacity / threshold * threshold;
        if (newCapacity > maxCapacity - threshold) {
            newCapacity = maxCapacity;
        } else {
            newCapacity += threshold;
        }
        return newCapacity;
    }

    // Not over threshold. Double up to 4 MiB, starting from 64.
    int newCapacity = 64;
    while (newCapacity < minNewCapacity) {
        newCapacity <<= 1;
    }

    return Math.min(newCapacity, maxCapacity);
}
//UnpooledHeapByteBuf的capacity實(shí)現(xiàn)
public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}
  • Clear操作
    clear操作只是把readerIndex和writerIndex設(shè)置為0,不會(huì)對(duì)存儲(chǔ)的數(shù)據(jù)進(jìn)行修改。
public ByteBuf clear() {
    readerIndex = writerIndex = 0;
    return this;
}
  • 索引操作

    • 讀寫位置索引設(shè)置:主要是對(duì)邊界條件進(jìn)行校驗(yàn),設(shè)置readerIndex的時(shí)候,newReaderIndex不能小于0跟大于writerIndex;設(shè)置writerIndex的時(shí)候,newWriterIndex必須大于readerIndex和小于當(dāng)前的capacity。如果不能通過校驗(yàn)的話,就會(huì)拋出IndexOutOfBoundsException異常。
    • mark和reset操作:由于有readerIndex和writerIndex,因此進(jìn)行mark或者reset需要指定相應(yīng)的操作位置索引,mark操作會(huì)把當(dāng)前的readerIndex或者writerIndex設(shè)置為markedReaderIndex或者markedWriterIndex;reset操作的話,它是參入對(duì)應(yīng)的mark值調(diào)用對(duì)應(yīng)readerIndex()或者writerIndex();
  • 緩沖區(qū)重用
    可以通過discardReadByte方法去重用已經(jīng)讀取過的緩沖區(qū)。
    首先對(duì)readerIndex進(jìn)行判斷:

    • 如果readerIndex等于0,就說明沒有讀取數(shù)據(jù),沒有可以用來重用的空間,直接返回;
    • 如果readerIndex大于0且不等于writerIndex的話,說明有進(jìn)行數(shù)據(jù)讀取被丟棄的緩沖區(qū),也有還沒有被讀取的緩沖區(qū)。調(diào)用setBytes方法進(jìn)行字節(jié)數(shù)組的復(fù)制,將沒被讀取的數(shù)據(jù)移動(dòng)到緩沖區(qū)的起始位置,重新去設(shè)置readerIndex和writerIndex,readerIndex為0,writerIndex為原writerIndex-readerIndex;同時(shí),也需要對(duì)mark進(jìn)行重新設(shè)置。
      • 首先對(duì)markedReaderIndex進(jìn)行備份然后跟decrement進(jìn)行比較,如果markedReaderIndex比decrement小的話,markedReaderIndex設(shè)置為0,再用markedWriterIndex跟decrement比較,如果小于的話,markedWriterIndex也設(shè)置為0,否則的話markedWriterIndex較少decrement;
      • 如果markedReaderIndex比decrement大的話,markedReaderIndex和markedReaderIndex都減去decrement就可以了。
    • 如果readerIndex等于writerIndex的話,說明沒有可以進(jìn)行重用的緩沖區(qū),直接對(duì)mark重新設(shè)置就可以了,不需要內(nèi)存復(fù)制。
public ByteBuf discardReadBytes() {
    ensureAccessible();
    if (readerIndex == 0) {
        return this;
    }

    if (readerIndex != writerIndex) {
        setBytes(0, this, readerIndex, writerIndex - readerIndex);
        writerIndex -= readerIndex;
        adjustMarkers(readerIndex);
        readerIndex = 0;
    } else {
        adjustMarkers(readerIndex);
        writerIndex = readerIndex = 0;
    }
    return this;
}
protected final void adjustMarkers(int decrement) {
    int markedReaderIndex = this.markedReaderIndex;
    if (markedReaderIndex <= decrement) {
        this.markedReaderIndex = 0;
        int markedWriterIndex = this.markedWriterIndex;
        if (markedWriterIndex <= decrement) {
            this.markedWriterIndex = 0;
        } else {
            this.markedWriterIndex = markedWriterIndex - decrement;
        }
    } else {
        this.markedReaderIndex = markedReaderIndex - decrement;
        markedWriterIndex -= decrement;
    }
}
  • skipBytes

當(dāng)我們需要跳過某些不需要的字節(jié)的時(shí)候,可以調(diào)用skipBytes方法來跳過指定長(zhǎng)度的字節(jié)來讀取后面的數(shù)據(jù)。
首先對(duì)跳躍長(zhǎng)度進(jìn)行判斷,如果跳躍長(zhǎng)度小于0的話,會(huì)拋出IllegalArgumentException異常,或者跳躍長(zhǎng)度大于當(dāng)前緩沖區(qū)可讀長(zhǎng)度的話,會(huì)拋出IndexOutOfBoundsException異常。如果校驗(yàn)通過,新的readerindex為原readerIndex+length,如果新的readerIndex大于writerIndex的話,會(huì)拋出IndexOutOfBoundsException異常,否則就更新readerIndex。

public ByteBuf skipBytes(int length) {
    checkReadableBytes(length);
    int newReaderIndex = readerIndex + length;
    if (newReaderIndex > writerIndex) {
        throw new IndexOutOfBoundsException(String.format(
                "length: %d (expected: readerIndex(%d) + length <= writerIndex(%d))",
                length, readerIndex, writerIndex));
    }
    readerIndex = newReaderIndex;
    return this;
}

ByteBuf源碼分析

[圖片上傳失敗...(image-54964b-1539838427542)]

AbstractReferenceCountedByteBuf

AbstractReferenceCountedByteBuf是ByteBuf實(shí)現(xiàn)對(duì)引用進(jìn)行計(jì)數(shù)的基類,用來跟蹤對(duì)象的分配和銷毀,實(shí)現(xiàn)自動(dòng)內(nèi)存回收。

  • 成員變量
    • refCntUpdater refCntUpdater是一個(gè)AtomicIntegerFieldUpdater類型的成員變量,它可以對(duì)成員變量進(jìn)行原子性更新操作,達(dá)到線程安全。
    • REFCNT_FIELD_OFFSET REFCNT_FIELD_OFFSET是標(biāo)識(shí)refCnt字段在AbstractReferenceCountedByteBuf的內(nèi)存地址,在UnpooledDirectByteBuf和PooledDirectByteBuf兩個(gè)子類中都會(huì)使用到這個(gè)偏移量。
    • refCnt volatile修飾保證變量的線程可見性,用來跟蹤對(duì)象的引用次數(shù)
  • 對(duì)象引用計(jì)數(shù)器
    每調(diào)用retain方法一次,引用計(jì)數(shù)器就會(huì)加一。retain方法通過自旋對(duì)引用計(jì)數(shù)器進(jìn)行加一操作,引用計(jì)數(shù)器的初始值為1,只要程序是正確執(zhí)行的話,它的最小值應(yīng)該為1,當(dāng)申請(qǐng)和釋放次數(shù)相等的時(shí)候,對(duì)應(yīng)的ByteBuf就會(huì)被回收。當(dāng)次數(shù)為0時(shí),表明對(duì)象被錯(cuò)誤的引用,就會(huì)拋出IllegalReferenceCountException異常,如果次數(shù)等于Integer類型的最大值,就會(huì)拋出
    IllegalReferenceCountException異常。retain通過refCntUpdater的compareAndSet方法進(jìn)行原子操作更新,compareAndSet會(huì)使用獲取的值與期望值進(jìn)行比較,如果在比較器件,有其他線程對(duì)變量進(jìn)行修改,那么比較失敗,會(huì)再次自旋,獲取引用計(jì)數(shù)器的值再次進(jìn)行比較,否則的話,就會(huì)進(jìn)行加一操作,退出自旋。
    release方法的話與retain方法類似,也是通過自旋循環(huán)進(jìn)行判斷和更新,不過當(dāng)refCnt的值等于1的時(shí)候,表明引用計(jì)數(shù)器的申請(qǐng)跟釋放次數(shù)一樣,對(duì)象引用已經(jīng)不可達(dá)了,對(duì)象應(yīng)該要被垃圾收集回收掉了,調(diào)用deallocate方法釋放ByteBuf對(duì)象
public ByteBuf retain() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, 1);
        }
        if (refCnt == Integer.MAX_VALUE) {
            throw new IllegalReferenceCountException(Integer.MAX_VALUE, 1);
        }
        if (refCntUpdater.compareAndSet(this, refCnt, refCnt + 1)) {
            break;
        }
    }
    return this;
}

public final boolean release() {
    for (;;) {
        int refCnt = this.refCnt;
        if (refCnt == 0) {
            throw new IllegalReferenceCountException(0, -1);
        }

        if (refCntUpdater.compareAndSet(this, refCnt, refCnt - 1)) {
            if (refCnt == 1) {
                deallocate();
                return true;
            }
            return false;
        }
    }
}

UnpooledHeapByteBuf

UnpooledHeapByteBuf是一個(gè)非線程池實(shí)現(xiàn)的在堆內(nèi)存進(jìn)行內(nèi)存分配的字節(jié)緩沖區(qū),在每次IO操作的都會(huì)去創(chuàng)建一個(gè)UnpooledHeapByteBuf對(duì)象,如果頻繁地對(duì)內(nèi)存進(jìn)行分配或者釋放會(huì)對(duì)性能造成影響。

  • 成員變量
    • ByteBufAllocator 用于內(nèi)存分配
    • array 字節(jié)數(shù)組作為緩沖區(qū),用于存儲(chǔ)字節(jié)數(shù)據(jù)
    • ByteBuffer 用來實(shí)現(xiàn)Netty ByteBuf 到Nio ByteBuffer的變換
  • 動(dòng)態(tài)擴(kuò)展緩沖區(qū)
    調(diào)用capacity方法動(dòng)態(tài)擴(kuò)展緩沖區(qū),首先要對(duì)擴(kuò)展容量進(jìn)行校驗(yàn),如果新容量的大小小于0或者大于最大可擴(kuò)展容量maxCapacity的話,拋出IllegalArgumentException異常。
    通過校驗(yàn)之后,如果新擴(kuò)展容量比原來大的話,則創(chuàng)建一個(gè)新的容量為新擴(kuò)展容量的字節(jié)數(shù)組緩沖區(qū),然后調(diào)用System.arraycopy進(jìn)行內(nèi)存復(fù)制,將舊的數(shù)據(jù)復(fù)制到新數(shù)組中去,然后用setArray進(jìn)行數(shù)組替換。動(dòng)態(tài)擴(kuò)展之后需要原來的視圖tmpNioBuffer設(shè)置為控。
    如果新的容量小于當(dāng)前緩沖區(qū)容量的話,不需要進(jìn)行動(dòng)態(tài)擴(kuò)展,但是需要截取部分?jǐn)?shù)據(jù)作為子緩沖區(qū)。
    • 首先對(duì)當(dāng)前的readerIndex是否小于newCapacity,如果小于的話繼續(xù)對(duì)writerIndex跟newCapacity進(jìn)行比較,如果writerIndex大于newCapacity的話,就將writerIndex設(shè)置為newCapacity,更新完索引之后就通過System.arrayCopy內(nèi)存復(fù)制將當(dāng)前可讀的數(shù)據(jù)復(fù)制到新的緩沖區(qū)字節(jié)數(shù)組中。
    • 如果newCapacity小于readerIndex的話,說明沒有新的可讀數(shù)據(jù)要復(fù)制到新的字節(jié)數(shù)組緩沖區(qū)中,只需要把writerIndex跟readerIndex都更新為newCapacity既可,最后調(diào)用setArray更換字節(jié)數(shù)組。
 public ByteBuf capacity(int newCapacity) {
    ensureAccessible();
    if (newCapacity < 0 || newCapacity > maxCapacity()) {
        throw new IllegalArgumentException("newCapacity: " + newCapacity);
    }

    int oldCapacity = array.length;
    if (newCapacity > oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        System.arraycopy(array, 0, newArray, 0, array.length);
        setArray(newArray);
    } else if (newCapacity < oldCapacity) {
        byte[] newArray = new byte[newCapacity];
        int readerIndex = readerIndex();
        if (readerIndex < newCapacity) {
            int writerIndex = writerIndex();
            if (writerIndex > newCapacity) {
                writerIndex(writerIndex = newCapacity);
            }
            System.arraycopy(array, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
        } else {
            setIndex(newCapacity, newCapacity);
        }
        setArray(newArray);
    }
    return this;
}

  • setBytes
    字節(jié)數(shù)組復(fù)制,首先對(duì)數(shù)據(jù)進(jìn)行合法性檢驗(yàn),如果srcIndex或者index的值小于0,就會(huì)拋出IllegalArgumentException,如果index+length的值大于capacity的值或者srcIndex+length的值大于src.length的話,就會(huì)拋出IndexOutOfBoundsException異常。通過校驗(yàn)之后,就調(diào)用System.arraycopy進(jìn)行字節(jié)數(shù)組復(fù)制。
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
    checkSrcIndex(index, length, srcIndex, src.length);
    System.arraycopy(src, srcIndex, array, index, length);
    return this;
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
    checkIndex(index, length);
    if (srcIndex < 0 || srcIndex > srcCapacity - length) {
        throw new IndexOutOfBoundsException(String.format(
                "srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
    }
}

  • Netty ByteBuf與Nio ByteBuffer轉(zhuǎn)換
    要將Netty的ByteBuf轉(zhuǎn)化為Nio ByteBuffer,在ByteBuffer中有wrap靜態(tài)方法,只需要傳入對(duì)應(yīng)的字節(jié)數(shù)組即可創(chuàng)建轉(zhuǎn)化為ByteBuffer,在nioBuffer方法還調(diào)用了slice方法,它可以創(chuàng)建一個(gè)從原ByteBuffer的position開始緩沖區(qū),與原緩沖區(qū)共享同一段數(shù)據(jù)元素。nioBuffer方法不會(huì)重用緩沖區(qū),只能保證writerIndex跟readerIndex的獨(dú)立性。
public ByteBuffer nioBuffer(int index, int length) {
    ensureAccessible();
    return ByteBuffer.wrap(array, index, length).slice();
}

PooledByteBuf

在Netty4之后加入內(nèi)存池管理,通過內(nèi)存池管理比之前ByteBuf的創(chuàng)建性能得到了極大提高。

  • PoolChunk
    • Page 可以用來分配的最小內(nèi)存塊單位
    • Chunk page的集合

PoolChunk主要負(fù)責(zé)內(nèi)存塊的分配及釋放,chunk中的page會(huì)構(gòu)建成一顆二叉樹,默認(rèn)情況下page的大小是8K,chunk的大小是2^11 page,即16M,構(gòu)成了11層的二叉樹,最下面一層的葉子節(jié)點(diǎn)有8192個(gè),與page的數(shù)目一樣,每一次內(nèi)存的分配必須保證連續(xù)性,方便內(nèi)存操作。每個(gè)節(jié)點(diǎn)會(huì)記錄自己在Memory Area的偏移地址,當(dāng)一個(gè)節(jié)點(diǎn)表示的內(nèi)存區(qū)域被分配之后,那么該節(jié)點(diǎn)會(huì)被標(biāo)志為已分配,該節(jié)點(diǎn)的所有子節(jié)點(diǎn)的內(nèi)存請(qǐng)求都會(huì)忽略。每次內(nèi)存分配的都是8k(2n)大小的內(nèi)存塊,當(dāng)需要分配大小為chunkSize/(2k)的內(nèi)存端時(shí),為了找到可用的內(nèi)存段,會(huì)從第K層左邊開始尋找可用節(jié)點(diǎn)。

  • PoolArena

在內(nèi)存分配中,為了能夠集中管理內(nèi)存的分配及釋放,同時(shí)提供分配和釋放內(nèi)存的性能,一般都是會(huì)先預(yù)先分配一大塊連續(xù)的內(nèi)存,不需要重復(fù)頻繁地進(jìn)行內(nèi)存操作,那一大塊連續(xù)的內(nèi)存就叫做memory Arena,而PoolArena是Netty的內(nèi)存池實(shí)現(xiàn)類。
在Netty中,PoolArena是由多個(gè)Chunk組成的,而每個(gè)Chunk則由多個(gè)Page組成。PoolArena是由Chunk和Page共同組織和管理的。

  • PoolSubpage

當(dāng)對(duì)于小于一個(gè)Page的內(nèi)存分配的時(shí)候,每個(gè)Page會(huì)被劃分為大小相等的內(nèi)存塊,它的大小是根據(jù)第一次申請(qǐng)內(nèi)存分配的內(nèi)存塊大小來決定的。一個(gè)Page只能分配與第一次內(nèi)存內(nèi)存的內(nèi)存塊的大小相等的內(nèi)存塊,如果想要想要申請(qǐng)大小不想等的內(nèi)存塊,只能在新的Page上申請(qǐng)內(nèi)存分配了。
Page中的存儲(chǔ)區(qū)域的使用情況是通過一個(gè)long數(shù)組bitmap來維護(hù)的,每一位表示一個(gè)區(qū)域的占用情況。

PooledDirectByteBuf

  • 創(chuàng)建字節(jié)緩沖區(qū)
    由于內(nèi)存池實(shí)現(xiàn),每次創(chuàng)建字節(jié)緩沖區(qū)的時(shí)候,不是直接new,而是從內(nèi)存池中去獲取,然后設(shè)置引用計(jì)數(shù)器跟讀寫Index,跟緩沖區(qū)最大容量返回。
static PooledHeapByteBuf newInstance(int maxCapacity) {
    PooledHeapByteBuf buf = RECYCLER.get();
    buf.reuse(maxCapacity);
    return buf;
}
final void reuse(int maxCapacity) {
    maxCapacity(maxCapacity);
    setRefCnt(1);
    setIndex0(0, 0);
    discardMarks();
}
  • 復(fù)制字節(jié)緩沖區(qū)實(shí)例
    copy方法可以復(fù)制一個(gè)字節(jié)緩沖區(qū)實(shí)例,與原緩沖區(qū)獨(dú)立。
    首先要對(duì)index和length進(jìn)行合法性判斷,然后調(diào)用PooledByteBufAllocator的directBuffer方法分配一個(gè)新的緩沖區(qū)。newDirectBuffer方法是一個(gè)抽象方法,對(duì)于不同的子類有不同的實(shí)現(xiàn)。如果是unpooled的話,會(huì)直接創(chuàng)建一個(gè)新的緩沖區(qū),如果是pooled的話,它會(huì)從內(nèi)存池中獲取一個(gè)可用的緩沖區(qū)。
public ByteBuf copy(int index, int length) {
    checkIndex(index, length);
    ByteBuf copy = alloc().directBuffer(length, maxCapacity());
    copy.writeBytes(this, index, length);
    return copy;
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
    if (initialCapacity == 0 && maxCapacity == 0) {
        return emptyBuf;
    }
    validate(initialCapacity, maxCapacity);
    return newDirectBuffer(initialCapacity, maxCapacity);
}
// PooledByteBufAllocator 
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    PoolThreadCache cache = threadCache.get();
    PoolArena<ByteBuffer> directArena = cache.directArena;

    ByteBuf buf;
    if (directArena != null) {
        buf = directArena.allocate(cache, initialCapacity, maxCapacity);
    } else {
        if (PlatformDependent.hasUnsafe()) {
            buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
        } else {
            buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
        }
    }

    return toLeakAwareBuffer(buf);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
    ByteBuf buf;
    if (PlatformDependent.hasUnsafe()) {
        buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
    } else {
        buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
    }

    return toLeakAwareBuffer(buf);
}

ByteBuf輔助類分析

ByteBufHolder

ByteBufHolder是ByteBuf的一個(gè)容器,它可以更方便地訪問ByteBuf中的數(shù)據(jù),在使用不同的協(xié)議進(jìn)行數(shù)據(jù)傳輸?shù)臅r(shí)候,不同的協(xié)議消息體包含的數(shù)據(jù)格式和字段不一樣,所以抽象一個(gè)ByteBufHolder對(duì)ByteBuf進(jìn)行包裝,不同的子類有不同的實(shí)現(xiàn),使用者可以根據(jù)自己的需要進(jìn)行實(shí)現(xiàn)。Netty提供了一個(gè)默認(rèn)實(shí)現(xiàn)DefaultByteBufHolder。

ByteBufAllocator

ByteBufAllocator是字節(jié)緩沖區(qū)分配器,根據(jù)Netty字節(jié)緩沖區(qū)的實(shí)現(xiàn)不同,分為兩種不同的分配器PooledByteBufAllocator和UnpooledByteBufAllocator。他們提供了不同ByteBuf的分配方法。

CompositeByteBuf

CompositeByteBuf是一個(gè)虛擬的Buffer,它可以將多個(gè)ByteBuf組裝為一個(gè)ByteBuf視圖。
在Java NIO中,我們有兩種實(shí)現(xiàn)的方法

  • 將其他ByteBuffer的數(shù)據(jù)復(fù)制到一個(gè)ByteBuffer中,或者重新創(chuàng)建一個(gè)新的ByteBuffer,將其他的ByteBuffer復(fù)制到新建的ByteBuffer中。
  • 通過容器將多個(gè)ByteBuffer存儲(chǔ)在一起,進(jìn)行統(tǒng)一的管理和維護(hù)。

在Netty中,CompositeByByteBuf中維護(hù)了一個(gè)Component類型的集合。Component是ByteBuf的包裝類,它聚合了ByteBuf.維護(hù)在集合中的位置偏移量等信息。一般情況下,我們應(yīng)該使用ByteBufAllocator.compositeBuffer()和Unpooled.wrappedBuffer(ByteBuf...)方法來創(chuàng)建CompositeByteBuf,而不是直接通過構(gòu)造函數(shù)去實(shí)例化一個(gè)CompositeByteBuf對(duì)象。

private int addComponent0(int cIndex, ByteBuf buffer) {
    checkComponentIndex(cIndex);
    if (buffer == null) {
        throw new NullPointerException("buffer");
    }

    int readableBytes = buffer.readableBytes();

    // No need to consolidate - just add a component to the list.
    Component c = new Component(buffer.order(ByteOrder.BIG_ENDIAN).slice());
    if (cIndex == components.size()) {
        components.add(c);
        if (cIndex == 0) {
            c.endOffset = readableBytes;
        } else {
            Component prev = components.get(cIndex - 1);
            c.offset = prev.endOffset;
            c.endOffset = c.offset + readableBytes;
        }
    } else {
        components.add(cIndex, c);
        if (readableBytes != 0) {
            updateComponentOffsets(cIndex);
        }
    }
    return cIndex;
}
private void consolidateIfNeeded() {
    final int numComponents = components.size();
    if (numComponents > maxNumComponents) {
        final int capacity = components.get(numComponents - 1).endOffset;

        ByteBuf consolidated = allocBuffer(capacity);

        for (int i = 0; i < numComponents; i ++) {
            Component c = components.get(i);
            ByteBuf b = c.buf;
            consolidated.writeBytes(b);
            c.freeIfNecessary();
        }
        Component c = new Component(consolidated);
        c.endOffset = c.length;
        components.clear();
        components.add(c);
    }
}

public CompositeByteBuf removeComponent(int cIndex) {
    checkComponentIndex(cIndex);
    Component comp = components.remove(cIndex);
    comp.freeIfNecessary();
    if (comp.length > 0) {
        updateComponentOffsets(cIndex);
    }
    return this;
}

private static final class Component {
    final ByteBuf buf;
    final int length;
    int offset;
    int endOffset;

    Component(ByteBuf buf) {
        this.buf = buf;
        length = buf.readableBytes();
    }

    void freeIfNecessary() {
        buf.release(); // We should not get a NPE here. If so, it must be a bug.
    }
}

ByteBufUtil

ByteBufUtil是ByteBuf的工具類,它提供了一系列的靜態(tài)方法來操作ByteBuf。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容