JDK NIO解析

學習Netty一般都是使用其中的NIO,因此就必須要了解JDK NIO的一些知識,包括 SocketChannel,ServerSocketChannel,Buffer, Selector, SelectionKey等等,只有對NIO有足夠了解,學習Netty起來才會事半功倍。在本文中,會結合源碼對JDK的NIO實現進行解析, 源碼中加入了我個人的理解的注釋,不足之處希望能在評論中不吝指出。

下面是一段JDK的NIO的服務端代碼:

/**
 * 代碼片段1
 * @author zhangc
 * @version 1.0
 * @date 2019/12/2
 */
public class NioServer {
    public static void main(String[] args) throws IOException {
        Selector selector1 = Selector.open();
        Selector selector2 = Selector.open();

        new Thread(() -> {
            try {
                // 啟動服務端
                ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                //綁定1214端口
                serverSocketChannel.socket().bind(new InetSocketAddress(1214));
                //設置為非阻塞模式
                serverSocketChannel.configureBlocking(false);
                //注冊通道serverSocketChannel的OP_ACCEPT(服務端建立連接就緒)事件到serverSelector上
                serverSocketChannel.register(selector1, SelectionKey.OP_ACCEPT);

                while (true) {
                    // 監(jiān)測1毫秒內是否有新的連接
                    if (selector1.select(1) > 0) {
                        //獲取感興趣的事件
                        Set<SelectionKey> set = selector1.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();

                            if (key.isAcceptable()) {
                                try {
                                    // 獲取此次OP_ACCEPT事件建的通道
                                    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
                                    // 設置非阻塞模型
                                    clientChannel.configureBlocking(false);
                                    // 給這個通道綁定OP_READ(讀)事件
                                    clientChannel.register(selector2, SelectionKey.OP_READ);
                                } finally {
                                    keyIterator.remove();
                                }
                            }

                        }
                    }
                }
            } catch (IOException e) {
            }
        }).start();
        
        new Thread(() -> {
            try {
                while (true) {
                    // 監(jiān)測1毫秒內是否有新的可讀數據
                    if (selector2.select(1) > 0) {
                        //獲取感興趣的事件
                        Set<SelectionKey> set = selector2.selectedKeys();
                        Iterator<SelectionKey> keyIterator = set.iterator();

                        while (keyIterator.hasNext()) {
                            SelectionKey key = keyIterator.next();

                            if (key.isReadable()) {
                                try {
                                    //獲取此次讀事件的通道
                                    SocketChannel clientChannel = (SocketChannel) key.channel();
                                    //批量讀取數據
                                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                                    clientChannel.read(byteBuffer);
                                    //byteBuffer轉變?yōu)?                                    byteBuffer.flip();
                                    System.out.println(Charset.defaultCharset().newDecoder().decode(byteBuffer)
                                            .toString());
                                } finally {
                                    keyIterator.remove();
                                }
                            }

                        }
                    }
                }
            } catch (IOException e) {
            }
        }).start();
    }
}

Selector是多路復用器(實現可以是select,poll,epoll),相當于一個listener(個人看法),每一個Selector,都需要一個獨立的線程去處理。一個Selector中可以綁定多個SocketChannel/ServerSocketChannel,當綁定的SocketChannel/ServerSocketChannel中發(fā)生Selector感興趣的事件(SelectionKey)時,Selector中的publicSelectedKeys(Set)中會增加一個SelectionKey(這次的事件)對象。SelectionKey有四種類型,分別是OP_ACCEPT、OP_CONNECT、OP_READ、OP_WRITE,其中ServerSocketChannel的有效事件(validOps)為OP_ACCEPT,其余三種為SocketChannel的有效事件。上面的代碼一個線程中在selector1上給ServerSocketChannel注冊了OP_ACCEPT事件,selector1只負責監(jiān)聽新的連接,每當有新的連接建立時,selector1會新建一個此次連接的TCP通道——SocketChannel,然后在selector2上為這個SocketChannel 注冊OP_READ事件。在另一個線程中,selector2中綁定的SocketChannel每當有OP_READ事件時,讀取通道中的數據并打印在控制臺上。

關于SelectionKey

SelectionKey是一個抽象類,它有且僅有1個子類AbstractSelectionKey,AbstractSelectionKey有且僅有一個子類是SelectionKeyImpl。SelectionKey定義了4個常量:

    /**
     * 代碼片段2
     */
    public static final int OP_READ = 1 << 0;
    public static final int OP_WRITE = 1 << 2;
    public static final int OP_CONNECT = 1 << 3;
    public static final int OP_ACCEPT = 1 << 4;

只看2進制的后5位(前27位為0),OP_READ為00001,OP_WRITE為00100,OP_CONNECT為01000,OP_ACCEPT為10000,也就是分別是右邊第1,3,4,5位為1,其余位為0?;诖?,SocketChannel和ServerSocketChannel中獲取有效事件的方法是這樣的:

    /**
     * 代碼片段3
     * NIO代碼示例
     */
    /*
    * SocketChannel獲取有效事件 返回的二進制前27位為0,后5位為01101
    */
    public final int validOps() {
        return (SelectionKey.OP_READ
                | SelectionKey.OP_WRITE
                | SelectionKey.OP_CONNECT);
    }
    
    /*
     * ServerSocketChannel獲取有效事件 返回的二進制前27位為0,后5位為10000
     */
    public final int validOps() {
        return SelectionKey.OP_ACCEPT;
    }

下面我們看serverSocketChannel.register(selector1, SelectionKey.OP_ACCEPT)這個方法的實現:

    /**
     * 代碼片段4
     * AbstractSelectableChannel代碼(ServerSocketChanner和SocketChannel的父類)代碼
     */
   public final SelectionKey register(Selector sel, int ops)
        throws ClosedChannelException
    {
        return register(sel, ops, null);
    }
    
    public final SelectionKey register(Selector sel, int ops, Object att)
        throws ClosedChannelException
    {
        synchronized (regLock) {
            if (!isOpen())
                throw new ClosedChannelException();
            //判斷注冊的事件是否有效 validOps()獲取int類型表示的有效事件v,
            //v的32位2進制數為1的位,ops對應的位可以1,v為0的位ops對應的位只能為0,否則就無效
            if ((ops & ~validOps()) != 0) 
                throw new IllegalArgumentException();
            if (blocking)
                throw new IllegalBlockingModeException();
            //看這個channel是否已經綁定了這個Selector,如果已經綁定了這個Selector就重新設置這個Selector的interestOps
            //如果沒有綁定該Selector就會將其綁定 
            SelectionKey k = findKey(sel);
            if (k != null) {
                k.interestOps(ops);
                k.attach(att);
            }
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }

SelectionKey中維護了對應的Selector,在Channel中維護了一個SelectionKey的數組,在將Selector上為Channel注冊事件時,如果Channel中沒有綁定對應的Selector,那么就會新建一個SelectionKey(即上面代碼片段4中的k = ((AbstractSelector)sel).register(this, ops, att);),具體代碼如下:

    /**
     * 代碼片段5
     * AbstractSelectableChannel
     */
    protected final SelectionKey register(AbstractSelectableChannel var1, int var2, Object var3) {
        if (!(var1 instanceof SelChImpl)) {
            throw new IllegalSelectorException();
        } else {
            //這個this就是當前的Selector,將Selector作為構造參數構造一個SelectionKey對象
            SelectionKeyImpl var4 = new SelectionKeyImpl((SelChImpl)var1, this);
            var4.attach(var3);
            Set var5 = this.publicKeys;
            //將構造的SelectionKey對象也綁定到Selector中
            synchronized(this.publicKeys) {
                this.implRegister(var4);
            }
            //設置SelectionKey的感興趣事件
            var4.interestOps(var2);
            return var4;
        }
    }

構造完成后,會將這個SelectionKey放在Channel的SelectionKey類型數組中,而在Channel中判斷是否綁定事件,就是判斷這個數組中每一個SelectionKey中維護的Selector是否是這個Selector。這樣我們大致的了解了為Channel在Selector上注冊事件的流程:

  • 首先判斷是否是有效事件,不同的Channnel中維護的有效事件不同,ServerSocketChannel中的有效事件為OP_ACCEPT,SocketChannel中的有效事件為OP_CONNECT、OP_READ、OP_WRITE
  • 然后在Channel中的SelectionKey數組中判斷這些SelectionKey中維護的Selector是否包含當前的Selector,如果不包含就用這個Selector構造一個SelectionKey并放入數組
  • 如果包含這個Selector就將對應的SelectionKey的感興趣事件修改

注冊多個感興趣的事件只需要將多個事件對應的int進行或操作即可:

    /**
     * 代碼片段6
     * 注冊多個事件
     */
    channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE | SelectionKey.OP_CONNECT);

對SelectionKey注冊多個感興趣的時間也一樣,如果要增加SelectionKey的感興趣的事件,也只需進行或操作:

    /**
     * 代碼片段7
     * 增加SelectionKey的感興趣的事件
     */
    key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

關于Buffer

在Buffer類的文檔注釋中是這樣描述的:一個特定的基本類型的數據容器,一個buffer(緩沖區(qū),即創(chuàng)建的Buffer對象)是一個線性的、有限的特定基本類型的元素序列,除了它的內容,它本質的屬性是它的capacity,limit和position。capacity:buffer包含的元素數量,不為負數且不可改變;limit:第一個不能被讀或寫的元素的索引,不為負數且不能大于capacity;position:下一個可以被讀或寫的元素的索引,不為負數且不大于limit。對于每一個非boolean類型的基本類型,jdk都提供了一個Buffer的子類。

數據傳輸

Buffer的每個子類都定義了2種get和put操作:

  • 相對操作——從當前position開始讀或者寫一個或多個元素,并根據傳輸的元素增加position的值,如果需要傳輸的數據超過了limit,相對get操作會拋出BufferUnderflowException異常,相對put操作會拋出BufferOverflowException,在這兩種情況下,數據不會被傳輸
  • 絕對操作——使用一個明確的元素索引,并不需要改變position,如果這個索引超出了limit,絕對操作會拋出IndexOutOfBoundsException

關于Buffer類為什么不直接定義get和put方法,我認為原因應該是:java的泛型不支持基本數據類型,如果要定義泛型的get和put方法就要轉化成包裝類,而在IO操作中頻繁地進行拆箱裝箱會影響性能。因此,jdk為除了boolean之外的每個基本數據類型定義了Buffer的子類,在這些子抽象類中定義了每種數據類型的get和put方法。

mark和reset

Buffer中的mark是當reset方法被執(zhí)行時position被重置的索引,mark值并不總是被定義(沒有定義的話,初始值為-1),但當mark被定義了,它不為負數且不大于position。如果定義了mark,當position被調整到小于mark時,mark被丟棄(置為-1)。如果mark沒有被定義時執(zhí)行reset方法,會拋出InvalidMarkException。

不變性

mark, position, limit和capacity的值存在以下不變性:

mark <= position <= limit <= capacity

新創(chuàng)建的buffer的position總是0,且mark是未定義的。初始化的limit值可能為0,也可能為其他值,取決于buffer的類型和它構造的方式,新分配的buffer的每個元素都初始化為零。

clear,flip和rewind

除了獲取position,limit和capacity的方法、獲取mark的方法和reset方法,Buffer類還定義了以下基于緩沖區(qū)的操作:

  • clear——讓buffer為一個新的通道讀取序列或相對put操作準備就緒,設置limit為capacity、position為0;
  • flip——讓buffer一個新的通道寫入序列或者相對get操作準備就緒,設置limit為當前的position、position為0;
  • rewind——讓buffer為重新讀取它已經包含的數據準備就緒,設置limit不變,position為0。
只讀的buffer

所有buffer都是可讀的,但不是所有buffer都是可寫的。修改buffer的方法對于每個Buffer的子類來說是可選的,當在只讀的buffer上調用這些修改buffer的方法時,會拋出ReadOnlyBufferException。只讀的buffer不允許它的數據內容被改變,但它的mark,position和limit是可變的。
一個buffer是不是只讀buffer取決于它的isReadOnly方法。

線程安全性

buffer不是多線程安全的。如果一個buffer用來被超過一個線程獲取,那么它應該被合適的同步操作進行控制。

HeapByteBuffer和DirectByteBuffer

HeapByteBuffer和DirectByteBuffer都是MappedByteBuffer的子類,MappedByteBuffer繼承了ByteBuffer類,顧名思義,HeapByteBuffer和DirectByteBuffer都是字節(jié)緩存區(qū),其他基本類型也都對應有HeapXXXBuffer和DirectXXXBuffer類。以HeapByteBuffer和DirectByteBuffer為例,它們的區(qū)別如下:

  • HeapByteBuffer是分配在JVM堆(Heap)內存上的,遵循JVM的內存管理機制(GC也由JVM負責)。HeapByteBuffer中維護了一個final修飾的byte數組hb,來存儲緩沖區(qū)的內容。
  • DirectByteBuffer是從堆外申請的內存(直接內存),這個內存大小不受-Xmx最大堆內存參數的限制。相比HeapByteBuffer,DirectByteBuffer減少了數據拷貝的次數,但創(chuàng)建和釋放的代價更高。DirectByteBuffer中維護了一個名為cleaner的虛引用跟蹤堆外內存以及一個long類型的address字段存儲內存地址,當DirectByteBuffer將要被GC時,cleaner這個指向直接內存的虛引用會被放入引用隊列,在cleaner指向的內存被回首之前,該DirectByteBuffer不會被徹底銷毀。(此處建議了解DMA、java的虛引用、操作系統(tǒng)的用戶態(tài)和內核態(tài)、Zero-Copy零拷貝的知識)

抽象類ByteBuffer提供了靜態(tài)方法allocate(int capacity)和allocateDirect(int capacity)分別用來創(chuàng)建HeapByteBuffer和DirectByteBuffer對象。在這個兩個方法中分別調用HeapByteBuffer和DirectByteBuffer的構造方法并返回。HeapByteBuffer的構造方法比較簡單,只是對對象的mark,hb,position,limit,capacity等屬性進行初始化;而DirectByteBuffer則不太一樣,如下所示:

    /**
     * 代碼片段8
     * DirectByteBuffer構造函數
     */
    // Primary constructor
    //
    DirectByteBuffer(int cap) {                   // package-private

        super(-1, 0, cap, cap);
        //內存是否按頁分配對齊
        boolean pa = VM.isDirectMemoryPageAligned();
        //每頁的大小
        int ps = Bits.pageSize();
        //分配內存的大小
        long size = Math.max(1L, (long)cap + (pa ? ps : 0));
        //將分配大小和容量大小加到Bits中的totalCapacity(總容量)和reservedMemory(總大?。﹥蓚€屬性(AtomicLong類型)中
        Bits.reserveMemory(size, cap);

        long base = 0;
        try {
            //在堆外分配內存
            base = unsafe.allocateMemory(size);
        } catch (OutOfMemoryError x) {
            //內存不夠再減去
            Bits.unreserveMemory(size, cap);
            throw x;
        }
        unsafe.setMemory(base, size, (byte) 0);
        //計算堆外內存的首地址
        if (pa && (base % ps != 0)) {
            // Round up to page boundary
            address = base + ps - (base & (ps - 1));
        } else {
            address = base;
        }
        //創(chuàng)建cleaner虛引用綁定這個buffer,cleaner回收時會執(zhí)行clean方法,clean方法會調用Deallocator的run方法
        cleaner = Cleaner.create(this, new Deallocator(base, size, cap));
        att = null;
    }

可以看到,DirectByteBuffer的創(chuàng)建需要使用unsafe去堆外申請內存,然后將地址保存下來,并通過一個Cleanner的虛引用保存該內存地址的首部
和長度。

下面來看HeapByteBuffer和DirectByteBuffer的get、put方法,首先是HeapByteBuffer的get方法:

    /**
     * 代碼片段9
     * HeapByteBuffer的get方法和get方法中調用的部分方法
     */
    
    //獲取當前position實際地址的方法
    //返回i加上偏移量offset 這個offset在創(chuàng)建HeapByteBuffer時指定,是一個final修飾的int類型
    protected int ix(int i) {
        return i + offset;
    }

    //相對操作get position加1 然后返回保存數據的數組中下標為position+offset的字節(jié)
    public byte get() {
        return hb[ix(nextGetIndex())];
    }

    //絕對操作get 返回保存數據的數組中下標為i+offset的字節(jié)
    public byte get(int i) {
        return hb[ix(checkIndex(i))];
    }

    //相對操作get,將保存數據的數組中下標從position+this.offset開始length長度的部分復制到dst數組中下標從offset開始length長度的部分
    //并設置position += length,返回當前的buffer
    public ByteBuffer get(byte[] dst, int offset, int length) {
        checkBounds(offset, length, dst.length);
        if (length > remaining())
            throw new BufferUnderflowException();
        System.arraycopy(hb, ix(position()), dst, offset, length);
        position(position() + length);
        return this;
    }

然后是HeapByteBuffer中的put方法:

    /**
     * 代碼片段10
     * HeapByteBuffer的put方法和put方法中調用的部分方法
     */
    //相對操作put position加1 將x保存在保存書據的數組的position+offset下標位置
    public ByteBuffer put(byte x) {
        hb[ix(nextPutIndex())] = x;
        return this;
    }

    //絕對操作put position不變 將x保存在保存書據的數組的i+offset下標位置
    public ByteBuffer put(int i, byte x) {
        hb[ix(checkIndex(i))] = x;
        return this;
    }

    //相對操作put 將src中從offset開始的length長度的數據復制到buffer中數組的position+offset開始的length長度的部分
    //并設置position += length,返回當前的buffer
    public ByteBuffer put(byte[] src, int offset, int length) {
        //檢查是否數組越界
        checkBounds(offset, length, src.length);
        if (length > remaining())
            throw new BufferOverflowException();
        System.arraycopy(src, offset, hb, ix(position()), length);
        position(position() + length);
        return this;
    }

    //相對操作put 將另一個ByteBuffer src中從當前position+offset位置開始limit - position長度的數據
    //復制到this的position+offset的位置 并將src和this的position都加n(n為src的剩余可讀/寫長度,limit - position)
    public ByteBuffer put(ByteBuffer src) {
        if (src instanceof HeapByteBuffer) {
            //如果是HeapByteBuffer且不是本身就進行數組復制,并將this和src的position加n
            if (src == this)
                throw new IllegalArgumentException();
            HeapByteBuffer sb = (HeapByteBuffer)src;
            //判斷this的剩余容量夠不夠
            int n = sb.remaining();
            if (n > remaining())
                throw new BufferOverflowException();
            System.arraycopy(sb.hb, sb.ix(sb.position()),
                             hb, ix(position()), n);
            sb.position(sb.position() + n);
            position(position() + n);
        } else if (src.isDirect()) {
            //如果是直接字節(jié)緩沖,就用src的get操作將數據寫到this的保存數據的數組,并將position加n
            int n = src.remaining();
            if (n > remaining())
                throw new BufferOverflowException();
            src.get(hb, ix(position()), n);
            position(position() + n);
        } else {
            //如果既不是HeapByteBuffer,也不是直接字節(jié)緩沖,就調用父類的put方法
            super.put(src);
        }
        return this;
    }

下面是DirectByteBuffer的get方法:

    /**
     * 代碼片段11
     * DirectByteBuffer的get方法和get方法中調用的部分方法
     */
    
    //獲取當前position實際地址的方法
    //返回i加上address 
    private long ix(int i) {
        return address + ((long)i << 0);
    }
    //相對操作get 當前position++ 然后獲取從直接內存中獲取position+addess對應位置的值
    public byte get() {
        return ((unsafe.getByte(ix(nextGetIndex()))));
    }

    //絕對操作get position不變 從直接內存中獲取i+addess對應位置的值
    public byte get(int i) {
        return ((unsafe.getByte(ix(checkIndex(i)))));
    }
    
    //相對操作的get 從內存中將position+address開始、長度為length的數據取出放入數組dst的下標offset開始、length長度的部分
    //并設置position += length,返回當前的buffer
    public ByteBuffer get(byte[] dst, int offset, int length) {
        if (((long)length << 0) > Bits.JNI_COPY_TO_ARRAY_THRESHOLD) {
            //如果length大于Bits.JNI_COPY_TO_ARRAY_THRESHOLD(這個值為6)就直接復制到數組中
            //JNI_COPY_TO_ARRAY_THRESHOLD表示根據經驗確定JNI調用的平均成本逐個元素復制超過整個數組復制的點,這個數字可能會隨著時間改變
            //檢查數組是否越界
            checkBounds(offset, length, dst.length);
            int pos = position();
            int lim = limit();
            assert (pos <= lim);
            int rem = (pos <= lim ? lim - pos : 0);
            if (length > rem)
                throw new BufferUnderflowException();
            Bits.copyToArray(ix(pos), dst, arrayBaseOffset,
                             (long)offset << 0,
                             (long)length << 0);
            position(pos + length);
        } else {
            //如果不大于6,就調用父類的get方法進行逐個元素復制
            super.get(dst, offset, length);
        }
        return this;
    }
    
    //父類ByteBuffer中的get方法 循環(huán)去執(zhí)行無參的相對get方法
    public ByteBuffer get(byte[] dst, int offset, int length) {
        checkBounds(offset, length, dst.length);
        if (length > remaining())
            throw new BufferUnderflowException();
        int end = offset + length;
        for (int i = offset; i < end; i++)
            dst[i] = get();
        return this;
    }

然后是DirectByteBuffer的put方法:

    /**
     * 代碼片段12
     * DirectByteBuffer的put方法和put方法中調用的部分方法
     */
    //相對操作put position++ 然后將直接內存中address+position對應位置的設置為x
    public ByteBuffer put(byte x) {
        unsafe.putByte(ix(nextPutIndex()), ((x)));
        return this;
    }

    //絕對操作put position不變 將直接內存中address+i對應位置的設置為x
    public ByteBuffer put(int i, byte x) {
        unsafe.putByte(ix(checkIndex(i)), ((x)));
        return this;
    }

    //相對操作put 然后將直接內存中address+position對應位置開始、src剩余可讀/寫容量長度的內存區(qū)域設置為
    //src的position開始、src剩余可讀/寫容量長度的內容
    //并且src和this的position都加n(n為src剩余可讀/寫容量長度,limit - position )
    public ByteBuffer put(ByteBuffer src) {
        if (src instanceof DirectByteBuffer且不為本身,) {
            //如果src是DirectByteBuffer且不為本身,就在堆外進行內存復制
            if (src == this)
                throw new IllegalArgumentException();
            DirectByteBuffer sb = (DirectByteBuffer)src;

            int spos = sb.position();
            int slim = sb.limit();
            assert (spos <= slim);
            int srem = (spos <= slim ? slim - spos : 0);

            int pos = position();
            int lim = limit();
            assert (pos <= lim);
            int rem = (pos <= lim ? lim - pos : 0);

            //判斷是否有足夠的容量
            if (srem > rem)
                throw new BufferOverflowException();
            //內存拷貝
            unsafe.copyMemory(sb.ix(spos), ix(pos), (long)srem << 0);
            sb.position(spos + srem);
            position(pos + srem);
        } else if (src.hb != null) {
            //如果src不是DirectByteBuffer,且保存數據的數組hb不為空,就講hb作為參數調用下面的另一個重載的put方法
            int spos = src.position();
            int slim = src.limit();
            assert (spos <= slim);
            int srem = (spos <= slim ? slim - spos : 0);

            put(src.hb, src.offset + spos, srem);
            src.position(spos + srem);

        } else {
            super.put(src);
        }
        return this;
    }

    //相對操作的put 從內存中將position+address開始、長度為length的數據存入數組src的下標offset開始、length長度的部分的內容
    //并設置position += length,返回當前的buffer
    public ByteBuffer put(byte[] src, int offset, int length) {
        if (((long)length << 0) > Bits.JNI_COPY_FROM_ARRAY_THRESHOLD) {
            //如果length大于JNI_COPY_FROM_ARRAY_THRESHOLD(值為6) 就將整個數組直接復制到內存
            //表示根據經驗確定JNI調用的平均成本逐個元素復制超過整個數組復制的點,這個數字可能會隨著時間改變
            checkBounds(offset, length, src.length);
            int pos = position();
            int lim = limit();
            assert (pos <= lim);
            int rem = (pos <= lim ? lim - pos : 0);
            if (length > rem)
                throw new BufferOverflowException();
            //直接將數組拷貝到內存
            Bits.copyFromArray(src, arrayBaseOffset,
                               (long)offset << 0,
                               ix(pos),
                               (long)length << 0);
            position(pos + length);
        } else {
            //如果小于6 調用父類的put方法 循環(huán)將數組從offset到offset+length的數據逐個復制
            super.put(src, offset, length);
        }
        return this;
    }

以上的源碼中都清楚地通過注釋闡明了HeapByteBuffer和DirectByteBuffer的put和get操作的工作流程,根據源碼,可以總結出:

  • position就是buffer中當前讀到/寫到的地方的索引,當相對讀完或者相對寫完數據后,position會發(fā)生改變。因此在相對寫之后、相對讀之前,要調用buffer.flip()——設置limit為當前的position(因為position往后就沒有數據了)、position為0(為相對讀做好準備);在相對讀之后、相對寫之前,要調用buffer.clear()——設置limit為capacity(想寫多少寫多少,這里limit限制的是讀)、position為0(為相對寫做好準備)。flip和clear并不會改變數組hb或者堆外內存中數據的內容,只是改變幾個標記值。

總結

JDK的NIO雖然與Netty相比不盡完善,但還是有很多可取之處的。而且,學習JDK的NIO之后也能更方便地閱讀Netty的源碼。如果你想學習Netty,并正在了解JDK NIO的相關知識,希望這篇文章能給你帶來收獲。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容