網(wǎng)絡(luò)編程N(yùn)etty之ByteBuf詳解

Netty中的ByteBuf優(yōu)勢

NIO使用的ByteBuffer有哪些缺點

1: 無法動態(tài)擴(kuò)容,ByteBuffer的長度是固定的,是初始指定的值,不能夠再進(jìn)行擴(kuò)容了,當(dāng)寫入的內(nèi)容大于ByteBuffer的容量時,會報越界異常

2.: API使用復(fù)雜,當(dāng)要讀取數(shù)據(jù)時,需要調(diào)用buffer.flip()方法,轉(zhuǎn)換為讀取模式,如果稍微不注意就可能出現(xiàn)錯誤,讀取不到數(shù)據(jù)或者讀取的數(shù)據(jù)是錯誤的

ByteBuf的優(yōu)勢和做了哪些增強(qiáng)

1: API操作起來更加的方便,可以直接寫或者直接讀

2:支持動態(tài)擴(kuò)容,當(dāng)寫入的數(shù)據(jù)大于ByteBuf的容量時,會動態(tài)擴(kuò)容,不會報錯

3:提供了多種ByteBuf的實現(xiàn),可以更加靈活的使用

4:提供了高效的零拷貝機(jī)制

5:ByteBuf可以內(nèi)存復(fù)用

ByteBuf操作示例
ByteBuf操作

ByteBuf中有三個重要的屬性:
1:capacity容量,初始指定的ByteBuf的大小

2:readIndex讀取位置,順序讀的時候,記錄讀取數(shù)據(jù)的索引值

3:writeIndex寫入位置,順序?qū)懙臅r候,記錄寫入數(shù)據(jù)的索引值

ByteBuf常用的方法:
1:getByte和setByte,獲取指定索引處的數(shù)據(jù),是隨機(jī)獲取的,不會改變readIndex和writeIndex的值

2:read*,順序讀,會改變readIndex的值

3:write*,順序?qū)?,會改變writeIndex的值

4:discardReadBytes,清除讀過的內(nèi)容

5:clear,清除緩沖區(qū)

6:搜索操作

7:標(biāo)記和重置

8:引用計數(shù)和釋放

簡單的Demo示例
/**
 * ByteBuf的使用示例
 */
public class ByteBufDemo {

    public static void main(String[] args) {
        //分配非池化,10個字節(jié)的ByteBuf
        ByteBuf buf = Unpooled.buffer(10);

        //看下ByteBuf
        System.out.println("------------------------原始的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //寫入內(nèi)容到ByteBuf
        byte[] bytes = {1, 2, 3, 4, 5};
        buf.writeBytes(bytes);
        System.out.println("------------------------寫入內(nèi)容后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //從ByteBuf中讀取內(nèi)容
        buf.readByte();
        buf.readByte();
        System.out.println("------------------------讀取一些內(nèi)容后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //清除讀過的內(nèi)容
        //把讀過的數(shù)據(jù)清除后,readIndex變?yōu)?,writeIndex變?yōu)?
        //后面尚未讀取的內(nèi)容,會復(fù)制到前面去,把原來的值覆蓋掉
        //再次寫入時,3,4,5后面的4,5會被覆蓋掉
        buf.discardReadBytes();
        System.out.println("------------------------清除讀過的數(shù)據(jù)后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //再次寫入內(nèi)容到ByteBuf
        byte[] bytesO = {6};
        buf.writeBytes(bytesO);
        System.out.println("------------------------再次寫入內(nèi)容后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //清空讀和寫的索引值
        //readIndex和writeIndex會重置為0,ByteBuf中的內(nèi)容并不會重置
        buf.clear();
        System.out.println("------------------------清空讀和寫的索引值后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //再次寫入內(nèi)容到ByteBuf
        byte[] bytes2 = {1, 2, 3};
        buf.writeBytes(bytes2);
        System.out.println("------------------------再次寫入內(nèi)容后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //清空ByteBuf的內(nèi)容
        //不會重置readIndex和writeIndex
        buf.setZero(0, buf.capacity());
        System.out.println("------------------------清空ByteBuf的內(nèi)容后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");

        //再次寫入超出指定容量的數(shù)據(jù)到ByteBuf
        //會進(jìn)行擴(kuò)容
        byte[] bytes3 = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
        buf.writeBytes(bytes3);
        System.out.println("------------------------再次寫入超出指定容量的數(shù)據(jù)后的ByteBuf-------------------------------");
        System.out.println("ByteBuf參數(shù):" + buf.toString());
        System.out.println("ByteBuf中的內(nèi)容:" + Arrays.toString(buf.array()) + "\n");
    }
}

輸出結(jié)果:



上面的例子是使用堆內(nèi)的ByteBuf,下面看下堆外的ByteBuf例子:

 //分配非池化,10個字節(jié)的directBuffer
ByteBuf buf = Unpooled.directBuffer(10);

//看下ByteBuf
System.out.println("------------------------原始的ByteBuf-------------------------------");
System.out.println("ByteBuf參數(shù):" + buf.toString());

directBuffer不能夠使用array方法,否則會報錯:java.lang.UnsupportedOperationException: direct buffer;而且使用ByteBuf是用它底層的分配器分配的,不是new一個出來,下面會具體說下。


上圖中,可以看到,readIndex和writeIndex把緩沖區(qū)分成了三塊,readIndex會小于或者等于writeIndex,這個應(yīng)該好理解,還沒有寫到那里,就去讀取了,能讀取到什么呢。

堆內(nèi)和堆外內(nèi)存

socket是操作系統(tǒng)底層提供給上層應(yīng)用使用的網(wǎng)絡(luò)通信API,當(dāng)要去讀取或者寫入的數(shù)據(jù)在JVM的堆中,那么就先需要把JVM堆中需要讀取的數(shù)據(jù)拷貝一份到操作系統(tǒng)中,然后socket再去讀取,而直接內(nèi)存的好處是socket可以直接讀取,少了拷貝這一步操作。

ByteBuf動態(tài)擴(kuò)容
下面以堆內(nèi)的ByteBuf為例,查看源碼,分析ByteBuf的動態(tài)擴(kuò)容:
動態(tài)擴(kuò)容肯定是寫入數(shù)據(jù)的時候,ByteBuf的容量不夠了,才去擴(kuò)容的,所以需要跟蹤下面的代碼:

buf.writeBytes(bytes);

跟蹤上面的writeBytes,首先進(jìn)入了ByteBuf這個抽象類中,進(jìn)入了下面這個抽象方法:

public abstract ByteBuf writeBytes(byte[] src);

它的實現(xiàn)類如下:


進(jìn)入第一個AbstractByteBuf的方法:

 @Override
 public ByteBuf writeBytes(byte[] src) {
      writeBytes(src, 0, src.length);
      return this;
  }

再次調(diào)用了下面的方法:

 @Override
 public ByteBuf writeBytes(byte[] src, int srcIndex, int length) {
      //檢查是否可以寫入
      ensureWritable(length);
      setBytes(writerIndex, src, srcIndex, length);
      //把當(dāng)前的寫入位置加上寫入數(shù)據(jù)的長度
      writerIndex += length;
      return this;
  }

src是需要寫入的數(shù)據(jù),length是寫入數(shù)據(jù)的長度
然后會進(jìn)入ensureWritable方法,傳入的參數(shù)是:寫入數(shù)據(jù)的長度

@Override
public ByteBuf ensureWritable(int minWritableBytes) {
    //參數(shù)校驗
    checkPositiveOrZero(minWritableBytes, "minWritableBytes");
    //檢查容量是否可以寫入這么多數(shù)據(jù)
    ensureWritable0(minWritableBytes);
    return this;
}

//檢查參數(shù)是否小于0
public static int checkPositiveOrZero(int i, String name) {
    if (i < 0) {
         throw new IllegalArgumentException(name + ": " + i + " (expected: >= 0)");
     }
     return i;
 }

參數(shù)校驗完成后會進(jìn)入ensureWritable0方法:

final void ensureWritable0(int minWritableBytes) {
        //確保緩沖區(qū)可以訪問
        ensureAccessible();
        //如果寫入的數(shù)據(jù)長度小于等于剩余可寫數(shù)據(jù)的容量,就直接返回
        //就是說,容量足夠?qū)懭?,不需要擴(kuò)容
        if (minWritableBytes <= writableBytes()) {
            return;
        }
        if (checkBounds) {
            //maxCapacity是int的最大值
            //檢查寫入的數(shù)據(jù)長度是否比可以寫入的最大容量還要大
            //是的話就拋異常
            if (minWritableBytes > maxCapacity - writerIndex) {
                throw new IndexOutOfBoundsException(String.format(
                        "writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
                        writerIndex, minWritableBytes, maxCapacity, this));
            }
        }

        //正式的擴(kuò)容方法
        int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);

        //把擴(kuò)容后的新容量設(shè)置進(jìn)去
        capacity(newCapacity);
}

進(jìn)入AbstractByteBufAllocator類的擴(kuò)容方法:

//常量 4M
static final int CALCULATE_THRESHOLD = 1048576 * 4; // 4 MiB page

 @Override
 public int calculateNewCapacity(int minNewCapacity, int maxCapacity) {
        //校驗參數(shù)
        checkPositiveOrZero(minNewCapacity, "minNewCapacity");
        //minNewCapacity = writerIndex + minWritableBytes
        //已經(jīng)寫入的數(shù)據(jù)索引加上當(dāng)前寫入的數(shù)據(jù)長度,就是需要的最小的容量
        //判斷是否比最大容量還大,是的話就拋異常
        if (minNewCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "minNewCapacity: %d (expected: not greater than maxCapacity(%d)",
                    minNewCapacity, maxCapacity));
        }
        final int threshold = CALCULATE_THRESHOLD; // 4 MiB page
        //如果需要的最小容量等于4M,就直接返回4M,作為擴(kuò)容后的容量
        if (minNewCapacity == threshold) {
            return threshold;
        }

        //如果需要的最小容量大于4M,就按照下面的擴(kuò)容方式擴(kuò)容
        if (minNewCapacity > threshold) {
            //newCapacity = 15 / 4194304 * 4194304 
            int newCapacity = minNewCapacity / threshold * threshold;
            //如果計算出的容量大于最大容量減去4M,就把最大容量賦值給新的容量
            if (newCapacity > maxCapacity - threshold) {
                newCapacity = maxCapacity;
            } else {
                newCapacity += threshold;
            }
            return newCapacity;
        }

        //如果需要的最小容量小于4M,就按照下面的方式擴(kuò)容
        int newCapacity = 64;
        while (newCapacity < minNewCapacity) {
            newCapacity <<= 1;
        }

        return Math.min(newCapacity, maxCapacity);
}

再看下capacity方法:
下面的把擴(kuò)容后的容量放到ByteBuf,就是使用了arraycopy方法

 @Override
    public ByteBuf capacity(int newCapacity) {
        checkNewCapacity(newCapacity);

        int oldCapacity = array.length;
        byte[] oldArray = array;
        if (newCapacity > oldCapacity) {
            byte[] newArray = allocateArray(newCapacity);
            System.arraycopy(oldArray, 0, newArray, 0, oldArray.length);
            setArray(newArray);
            freeArray(oldArray);
        } else if (newCapacity < oldCapacity) {
            byte[] newArray = allocateArray(newCapacity);
            int readerIndex = readerIndex();
            if (readerIndex < newCapacity) {
                int writerIndex = writerIndex();
                if (writerIndex > newCapacity) {
                    writerIndex(writerIndex = newCapacity);
                }
                System.arraycopy(oldArray, readerIndex, newArray, readerIndex, writerIndex - readerIndex);
            } else {
                setIndex(newCapacity, newCapacity);
            }
            setArray(newArray);
            freeArray(oldArray);
        }
        return this;
    }

下面是跟蹤的代碼步驟:


總結(jié)下動態(tài)擴(kuò)容機(jī)制:
1:write*方法調(diào)用的時候,會通過ensureWritable0方法檢查
2:calculateNewCapacity方法是用來計算容量的方法

擴(kuò)容計算方法:
1:需要的容量沒有超過4M,會從64字節(jié)開始擴(kuò)容,每次增加一倍,直到計算出來的容量滿足需要的最小容量,假如,當(dāng)前大小是256,已經(jīng)寫入了200字節(jié),再次寫入60字節(jié),需要的最小容量是260字節(jié),那么擴(kuò)容后的容量是64 * 2 * 2 * 2=512
2:需要的容量超過4M,擴(kuò)容計算方法為:新容量 = 新容量的最小要求 / 4M * 4M + 4M,假如當(dāng)前大小是3M,已經(jīng)寫了2M,再寫入3M,需要的最小容量是5M,那么擴(kuò)容后的容量是 5 / 4 * 4 + 4 = 8M

圖示1:需要的容量小于4M:


圖示2:需要的容量大于4M:


ByteBuf有哪些實現(xiàn)
ByteBuf從3個維度,有8種實現(xiàn)方式:


ByteBuf類圖


//堆內(nèi)
ByteBuf buf = Unpooled.buffer(10);
//堆外
ByteBuf buf = Unpooled.directBuffer(10);

ByteBuf提供了Unpooled非池化的類,可以直接使用,沒有提供Pool池化的類,下面追蹤源碼看下ByteBuf是怎樣分配的:

Unpooled.buffer分配方式

首先進(jìn)入Unpooled類:

private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;
//使用默認(rèn)的分配器分配堆內(nèi)buffer
public static ByteBuf buffer(int initialCapacity) {
  return ALLOC.heapBuffer(initialCapacity);
}

下面會進(jìn)入接口類ByteBufAllocator:

//分配一個指定容量的堆內(nèi)buf
ByteBuf heapBuffer(int initialCapacity);

然后進(jìn)入AbstractByteBufAllocator抽象類:

//如果沒有指定初始容量,默認(rèn)的初始容量大小是256
static final int DEFAULT_INITIAL_CAPACITY = 256;
//最大容量,為int的最大值
static final int DEFAULT_MAX_CAPACITY = Integer.MAX_VALUE;

@Override
public ByteBuf heapBuffer(int initialCapacity) {
   return heapBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
}

 @Override
 public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) {
     //如果初始化的容量是0,最大的容量也是0,就返回一個空的Buf
     if (initialCapacity == 0 && maxCapacity == 0) {
         return emptyBuf;
     }
     validate(initialCapacity, maxCapacity);
     return newHeapBuffer(initialCapacity, maxCapacity);
 }

//校驗參數(shù)
private static void validate(int initialCapacity, int maxCapacity) {
   //檢查參數(shù)
   checkPositiveOrZero(initialCapacity, "initialCapacity");
   //如果初始化的容量大于最大容量,就拋異常
    if (initialCapacity > maxCapacity) {
        throw new IllegalArgumentException(String.format(
                "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
                initialCapacity, maxCapacity));
    }
}

然后是newHeapBuffer抽象方法:

protected abstract ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity);

因為這里初始化的是非池化的,所以會進(jìn)入UnpooledByteBufAllocator類:

@Override
protected ByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
     //PlatformDependent.hasUnsafe()是檢查當(dāng)前操作系統(tǒng)是否支持unsafe操作
     //根據(jù)支持與否,進(jìn)入不同的類
     return PlatformDependent.hasUnsafe() ?
             new InstrumentedUnpooledUnsafeHeapByteBuf(this, initialCapacity, maxCapacity) :
             new InstrumentedUnpooledHeapByteBuf(this, initialCapacity, maxCapacity);
 }

支持Unsafe操作的進(jìn)入下面:

 InstrumentedUnpooledUnsafeHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
         super(alloc, initialCapacity, maxCapacity);
 }

不支持Unsafe的進(jìn)入下面這個:

InstrumentedUnpooledHeapByteBuf(UnpooledByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
         super(alloc, initialCapacity, maxCapacity);
}

現(xiàn)在以支持Unsafe操作往下面走,進(jìn)入UnpooledUnsafeHeapByteBuf類:

 UnpooledUnsafeHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
}

再次調(diào)用了父類UnpooledHeapByteBuf:

//分配器
private final ByteBufAllocator alloc;
//byte數(shù)組,ByteBuf數(shù)據(jù)底層就是使用這個存儲
byte[] array;

public UnpooledHeapByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);

        //檢查分配器是否為空
        checkNotNull(alloc, "alloc");
        //如果初始化的容量大于最大容量,就拋異常
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        //設(shè)置當(dāng)前的數(shù)組是分配之后的數(shù)組
        setArray(allocateArray(initialCapacity));
        //初始化readIndex和writeIndex
        setIndex(0, 0);
 }

//分配數(shù)組
protected byte[] allocateArray(int initialCapacity) {
     //返回一個具有initialCapacity容量大小的byte數(shù)組
     return new byte[initialCapacity];
}

//set數(shù)組
 private void setArray(byte[] initialArray) {
        array = initialArray;
        tmpNioBuf = null;
}

AbstractByteBuf類下的setIndex方法:

//初始化readerIndex和writerIndex
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
    if (checkBounds) {
        checkIndexBounds(readerIndex, writerIndex, capacity());
    }
    setIndex0(readerIndex, writerIndex);
    return this;
}

final void setIndex0(int readerIndex, int writerIndex) {
      this.readerIndex = readerIndex;
      this.writerIndex = writerIndex;
}

上面走到AbstractByteBuf后,就分配完了一個非池化、堆內(nèi)的ByteBuf,下面是追蹤的代碼:


總結(jié):
可以看到,分配一個非池化、堆內(nèi)的ByteBuf,它的底層就是byte數(shù)組

Unpooled.directBuffer分配方式

首先進(jìn)入的也是Unpooled類:

public static ByteBuf directBuffer(int initialCapacity) {
     return ALLOC.directBuffer(initialCapacity);
}

然后進(jìn)入ByteBufAllocator抽象類:

ByteBuf directBuffer(int initialCapacity);

然后到AbstractByteBufAllocator類:

@Override
public ByteBuf directBuffer(int initialCapacity) {
     return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
 }
 
@Override
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
     //如果初始化的容量和最大容量都是0,就返回一個空的Buf
     if (initialCapacity == 0 && maxCapacity == 0) {
         return emptyBuf;
     }
     //校驗參數(shù)
     validate(initialCapacity, maxCapacity);
     return newDirectBuffer(initialCapacity, maxCapacity);
 }
 
protected abstract ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity);

由于分配的也是一個非池化的,所以newDirectBuffer會進(jìn)入UnpooledByteBufAllocator類中的實現(xiàn)類:

@Override
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
     final ByteBuf buf;
     //同樣的,會判斷是否支持unsafe操作
     if (PlatformDependent.hasUnsafe()) {
         buf = noCleaner ? new InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf(this, initialCapacity, maxCapacity) :
                 new InstrumentedUnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
     } else {
         buf = new InstrumentedUnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
     }
     return disableLeakDetector ? buf : toLeakAwareBuffer(buf);
 }

以InstrumentedUnpooledUnsafeNoCleanerDirectByteBuf為例,后面兩個其實也相差不大,進(jìn)入UnpooledUnsafeNoCleanerDirectByteBuf類的構(gòu)造方法:

 UnpooledUnsafeNoCleanerDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(alloc, initialCapacity, maxCapacity);
    }

再次調(diào)用的父類UnpooledUnsafeDirectByteBuf:

ByteBuffer buffer;

public UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, int initialCapacity, int maxCapacity) {
        super(maxCapacity);
        if (alloc == null) {
            throw new NullPointerException("alloc");
        }
        //校驗參數(shù)
        checkPositiveOrZero(initialCapacity, "initialCapacity");
        checkPositiveOrZero(maxCapacity, "maxCapacity");
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity(%d) > maxCapacity(%d)", initialCapacity, maxCapacity));
        }

        this.alloc = alloc;
        setByteBuffer(allocateDirect(initialCapacity), false);
}

//分配的是一個NIO中的ByteBuffer
protected ByteBuffer allocateDirect(int initialCapacity) {
      return ByteBuffer.allocateDirect(initialCapacity);
}

final void setByteBuffer(ByteBuffer buffer, boolean tryFree) {
        if (tryFree) {
            ByteBuffer oldBuffer = this.buffer;
            if (oldBuffer != null) {
                if (doNotFree) {
                    doNotFree = false;
                } else {
                    freeDirect(oldBuffer);
                }
            }
        }
        this.buffer = buffer;
        memoryAddress = PlatformDependent.directBufferAddress(buffer);
        tmpNioBuf = null;
        capacity = buffer.remaining();
}

ByteBuffer類下面的allocateDirect:

 public static ByteBuffer allocateDirect(int capacity) {
        return new DirectByteBuffer(capacity);
 }

代碼跟蹤圖:


總結(jié):
分配非池化、堆外的ByteBuf,可以看到底層是NIO的DirectByteBuffer實現(xiàn)的

ByteBufAllocator類圖

ByteBuf內(nèi)存復(fù)用

分配池化內(nèi)存

在上面根據(jù)源碼知道了怎么去分配非池化內(nèi)存,那么池化內(nèi)存要怎么分配呢?看下面的圖示:


上面就是分配池化內(nèi)存的步驟,接下來會根據(jù)源碼具體分析

內(nèi)存緩存池

jemalloc內(nèi)存分配機(jī)制

1:內(nèi)存池中有三大區(qū)域,分別是:tiny、small、normal
2:每個區(qū)域分了不同大小的格子,每個格子只能緩存對應(yīng)大小的內(nèi)存塊
3:支持最大的格子內(nèi)存是32kb,超過這個大小的不能被緩存,只能被釋放掉
4:每個類型的格子都有對應(yīng)的數(shù)量:tiny:512個,small:256個,normal:64個,例如tiny區(qū)域的每個大小的格子都有512個,如果滿了就不會被回收,內(nèi)存會被釋放掉

回收池化內(nèi)存

分配池化內(nèi)存的過程
上面分析了分配非池化內(nèi)存,下面看下怎么分配池化內(nèi)存:

 ByteBufAllocator allocator = ByteBufAllocator.DEFAULT;
 //分配的內(nèi)存最大長度為496
 ByteBuf buf1 = allocator.ioBuffer(495);
 System.out.printf("buf1: 0x%X%n", buf1.memoryAddress());
 //此時會被回收到tiny的512b格子中
 buf1.release();

 //從tiny的512b格子去取
 ByteBuf buf2 = allocator.ioBuffer(495);
 System.out.printf("buf2: 0x%X%n", buf2.memoryAddress());
 buf2.release();

先來看下ByteBufAllocator類:

//默認(rèn)ByteBuf分配器,在ByteBufUtil中初始化
ByteBufAllocator DEFAULT = ByteBufUtil.DEFAULT_ALLOCATOR;

跟蹤第一次的allocator.ioBuffer(495)代碼,首先進(jìn)入AbstractByteBufAllocator類:

@Override
public ByteBuf ioBuffer(int initialCapacity) {
    //如果支持Unsafe,就分配堆外內(nèi)存
    if (PlatformDependent.hasUnsafe()) {
        return directBuffer(initialCapacity);
    }
    //不支持Unsafe,就分配堆內(nèi)內(nèi)存
    return heapBuffer(initialCapacity);
}

然后調(diào)用了該類下面的directBuffer方法:

@Override
public ByteBuf directBuffer(int initialCapacity) {
     return directBuffer(initialCapacity, DEFAULT_MAX_CAPACITY);
 }

 @Override
 public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
     //如果初始化的容量和最大容量等于0,就返回一個空的ByteBuf
     if (initialCapacity == 0 && maxCapacity == 0) {
         return emptyBuf;
     }
     validate(initialCapacity, maxCapacity);
     return newDirectBuffer(initialCapacity, maxCapacity);
 }
 //校驗參數(shù)
 private static void validate(int initialCapacity, int maxCapacity) {
        checkPositiveOrZero(initialCapacity, "initialCapacity");
        if (initialCapacity > maxCapacity) {
            throw new IllegalArgumentException(String.format(
                    "initialCapacity: %d (expected: not greater than maxCapacity(%d)",
                    initialCapacity, maxCapacity));
        }
}

然后會進(jìn)入池化的ByteBuf分配器PooledByteBufAllocator類,可以實現(xiàn)內(nèi)存的復(fù)用:

// cache sizes  緩存默認(rèn)值
DEFAULT_TINY_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.tinyCacheSize", 512);
DEFAULT_SMALL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.smallCacheSize", 256);
DEFAULT_NORMAL_CACHE_SIZE = SystemPropertyUtil.getInt("io.netty.allocator.normalCacheSize", 64);

 @Override
 protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
     //從當(dāng)前線程中獲取cache對象
     PoolThreadCache cache = threadCache.get();
     //從cache中獲取Arena
     //Arena可以理解為一個netty提供的實際進(jìn)行buf分配和管理的工具
     PoolArena<ByteBuffer> directArena = cache.directArena;

     final ByteBuf buf;
     //如果有directArena就分配池化內(nèi)存
     if (directArena != null) {
         buf = directArena.allocate(cache, initialCapacity, maxCapacity);
     } else { //如果沒有directArena,就使用非池化Unpooled
         buf = PlatformDependent.hasUnsafe() ?
                 UnsafeByteBufUtil.newUnsafeDirectByteBuf(this, initialCapacity, maxCapacity) :
                 new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
     }

     return toLeakAwareBuffer(buf);
 }

再次跟蹤后進(jìn)入PoolArena類:
可以看到下面有三種類型tiny、small、normal

enum SizeClass {
     Tiny,
     Small,
    Normal
}
PooledByteBuf<T> allocate(PoolThreadCache cache, int reqCapacity, int maxCapacity) {
      //獲取一個ByteBuf對象
      PooledByteBuf<T> buf = newByteBuf(maxCapacity);
      //分配內(nèi)存
      allocate(cache, buf, reqCapacity);
      return buf;
}

@Override
protected PooledByteBuf<ByteBuffer> newByteBuf(int maxCapacity) {
    //如果支持Unsafe,就初始化一個PooledUnsafeDirectByteBuf
    if (HAS_UNSAFE) {
        return PooledUnsafeDirectByteBuf.newInstance(maxCapacity);
    } else { //不支持Unsafe,就初始化一個PooledDirectByteBuf
        return PooledDirectByteBuf.newInstance(maxCapacity);
    }
}

下面進(jìn)入PooledUnsafeDirectByteBuf類:
從線程回收棧中獲取一個buf,如果棧中沒有,就會創(chuàng)建一個新的,如果有,就會返回棧中的buf

//調(diào)用RECYCLER.get()時,線程棧中沒有可以復(fù)用的時,會調(diào)用newObject方法,此時創(chuàng)建出來的buf是空的
 private static final Recycler<PooledUnsafeDirectByteBuf> RECYCLER = new Recycler<PooledUnsafeDirectByteBuf>() {
      @Override
       protected PooledUnsafeDirectByteBuf newObject(Handle<PooledUnsafeDirectByteBuf> handle) {
           return new PooledUnsafeDirectByteBuf(handle, 0);
       }
};

static PooledUnsafeDirectByteBuf newInstance(int maxCapacity) {
       //RECYCLER,回收機(jī)制
       PooledUnsafeDirectByteBuf buf = RECYCLER.get();
       //取出來的可能是之前的buf,使用之前清理一下
       buf.reuse(maxCapacity);
       return buf;
}

然后再次回到PoolArena類中的allocate方法,分配內(nèi)存:

private void allocate(PoolThreadCache cache, PooledByteBuf<T> buf, final int reqCapacity) {
         //將需要的內(nèi)存大小計算為2^n
        final int normCapacity = normalizeCapacity(reqCapacity);
        //需要分配的內(nèi)存是否是tiny或者small類型
        if (isTinyOrSmall(normCapacity)) { // capacity < pageSize
            int tableIdx;
            PoolSubpage<T>[] table;
            boolean tiny = isTiny(normCapacity);
            if (tiny) { // < 512 //分配一個tiny內(nèi)存
                if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = tinyIdx(normCapacity);
                table = tinySubpagePools;
            } else {
                if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) {
                    // was able to allocate out of the cache so move on
                    return;
                }
                tableIdx = smallIdx(normCapacity);
                table = smallSubpagePools;
            }

            final PoolSubpage<T> head = table[tableIdx];

          
            synchronized (head) {
                final PoolSubpage<T> s = head.next;
                if (s != head) {
                    assert s.doNotDestroy && s.elemSize == normCapacity;
                    long handle = s.allocate();
                    assert handle >= 0;
                    s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
                    incTinySmallAllocation(tiny);
                    return;
                }
            }
            synchronized (this) {
                //分配一塊新的內(nèi)存
                allocateNormal(buf, reqCapacity, normCapacity);
            }

            incTinySmallAllocation(tiny);
            return;
        }
        if (normCapacity <= chunkSize) {
            if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) {
                // was able to allocate out of the cache so move on
                return;
            }
            synchronized (this) {
                allocateNormal(buf, reqCapacity, normCapacity);
                ++allocationsNormal;
            }
        } else {
            // Huge allocations are never served via the cache so just call allocateHuge
            allocateHuge(buf, reqCapacity);
        }
}

PoolThreadCache類下的allocateTiny方法:

boolean allocateTiny(PoolArena<?> area, PooledByteBuf<?> buf, int reqCapacity, int normCapacity) {
    return allocate(cacheForTiny(area, normCapacity), buf, reqCapacity);
}

//從cache中獲取buf
 private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }

根據(jù)需要的容量獲取對應(yīng)的格子,走到PoolArena類下面的tinyIdx方法:

static int tinyIdx(int normCapacity) {
        return normCapacity >>> 4;
}

PoolThreadCache類下的allocate方法,把緩存格子的內(nèi)存分配到buf

private boolean allocate(MemoryRegionCache<?> cache, PooledByteBuf buf, int reqCapacity) {
      if (cache == null) {
          // no cache found so just return false here
          return false;
      }
      boolean allocated = cache.allocate(buf, reqCapacity);
      if (++ allocations >= freeSweepAllocationThreshold) {
          allocations = 0;
          trim();
      }
      return allocated;
  }

下面是具體跟蹤代碼的步驟圖:


上面的源碼是以tiny類型為例,其他兩種類型類似,當(dāng)?shù)谝淮畏峙鋭?chuàng)建了一塊新的內(nèi)存,然后被成功回收到內(nèi)存緩沖池后,再次分配對應(yīng)大小的內(nèi)存,會直接從內(nèi)存緩沖池中取,不會再次分配一塊新的內(nèi)存了

內(nèi)存回收的過程

接下來跟蹤release()方法,看下內(nèi)存回收的過程

buf1.release();

第一次進(jìn)入AbstractReferenceCountedByteBuf類:
Buf的引用計數(shù)器,用于內(nèi)存復(fù)用,有一個計數(shù)器refCnt,retain()計數(shù)器加一,release()計數(shù)器減一,
直到計數(shù)器為0,才調(diào)用deallocate()釋放,deallocate()方法由具體的buf自己實現(xiàn)。

 @Override
 public boolean release() {
     return release0(1);
 }
 private boolean release0(int decrement) {
        int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
        //判斷當(dāng)前buf有沒有被引用了,沒有的話就調(diào)用deallocate
        if (decrement == realCnt) {
            if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
                deallocate();
                return true;
            }
            return retryRelease0(decrement);
        }
        return releaseNonFinal0(decrement, rawCnt, realCnt);
}

進(jìn)入PooledByteBuf類:

@Override
protected final void deallocate() {
    if (handle >= 0) {
        final long handle = this.handle;
        //表示當(dāng)前的buf不在使用任何一塊內(nèi)存區(qū)域
        this.handle = -1;
        //設(shè)置memory為null
        memory = null;
        //釋放buf的內(nèi)存
        chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
        tmpNioBuf = null;
        chunk = null;
        //把buf對象放入對象回收棧
        recycle();
    }
}

再次進(jìn)入PoolArena類:

void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
        //判斷是否是unpooled
        if (chunk.unpooled) {
            int size = chunk.chunkSize();
            destroyChunk(chunk);
            activeBytesHuge.add(-size);
            deallocationsHuge.increment();
        } else {
            //判斷是哪種類型,tiny、small、normal
            SizeClass sizeClass = sizeClass(normCapacity);
            //放入緩存
            if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
                // cached so not free it.
                return;
            }

            freeChunk(chunk, handle, sizeClass, nioBuffer);
        }
}

//計算內(nèi)存區(qū)域是哪種類型
private SizeClass sizeClass(int normCapacity) {
        if (!isTinyOrSmall(normCapacity)) {
            return SizeClass.Normal;
        }
        return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}

然后到PoolThreadCache類:

boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
                long handle, int normCapacity, SizeClass sizeClass) {
     MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
     if (cache == null) {
         return false;
     }
     //加入到緩存隊列
     return cache.add(chunk, nioBuffer, handle);
}

 private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
        //判斷是哪種類型,然后把內(nèi)存回收到哪一塊
        switch (sizeClass) {
        case Normal:
            return cacheForNormal(area, normCapacity);
        case Small:
            return cacheForSmall(area, normCapacity);
        case Tiny:
            return cacheForTiny(area, normCapacity);
        default:
            throw new Error();
        }
}

  private MemoryRegionCache<?> cacheForTiny(PoolArena<?> area, int normCapacity) {
        int idx = PoolArena.tinyIdx(normCapacity);
        if (area.isDirect()) {
            return cache(tinySubPageDirectCaches, idx);
        }
        return cache(tinySubPageHeapCaches, idx);
    }

上述跟蹤代碼步驟圖:


ByteBuf零拷貝機(jī)制

Netty的零拷貝機(jī)制,是一種應(yīng)用層的實現(xiàn),和底層的JVM、操作系統(tǒng)內(nèi)存機(jī)制沒有過多的關(guān)聯(lián)

幾種示例
一:CompositeByteBuf,將多個ByteBuf合并為一個邏輯上的ByteBuf,避免了各個ByteBuf之間的拷貝
public static void test1() {
       ByteBuf buf1 = Unpooled.buffer(4);
       ByteBuf buf2 = Unpooled.buffer(3);
       byte[] bytes1 = {1,2};
       byte[] bytes2 = {3,4,5};
       buf1.writeBytes(bytes1);
       buf2.writeBytes(bytes2);
       CompositeByteBuf byteBuf = Unpooled.compositeBuffer();
       byteBuf = byteBuf.addComponents(true, buf1, buf2);
       System.out.println("byteBuf: " + byteBuf.toString());
}

上面輸出結(jié)果,ridx是順序讀的讀取位置,widx是順序?qū)懙膶懭胛恢茫琧ap是新的ByteBuf的容量,components是指新的ByteBuf是由幾個ByteBuf組成


二:wrappedBuffer()方法,將byte[]數(shù)組包裝成ByteBuf對象
public static void test2() {
      byte[] bytes = {1,2,3,4,5};
      ByteBuf buf = Unpooled.wrappedBuffer(bytes);
      System.out.println("buf:" + buf.toString());
}

輸出結(jié)果中:ridx是順序讀的讀取位置,widx是順序?qū)懙膶懭胛恢茫琧ap是ByteBuf的容量,新的ByteBuf里存的是數(shù)組的引用地址,實質(zhì)操作的還是原來的數(shù)組


三:slice()方法,將一個ByteBuf對象切分成多個ByteBuf對象
public static void test3() {
     ByteBuf buf = Unpooled.wrappedBuffer("hello".getBytes());
     ByteBuf byteBuf = buf.slice(1,2);
     System.out.println("byteBuf:" + byteBuf.toString());
}

輸出結(jié)果中,可以看到,有兩個ByteBuf,其中一個是原有的,新的ByteBuf中存放了原來的ByteBuf的引用地址,另一個是分割后的ByteBuf的引用地址

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容