JavaNIO源碼分析

首先給大家分享一個(gè)Linux下的OpenJDK1.8源碼,這個(gè)源碼里包含了sun包源碼 ,自己要去找挺難找的,下面的源碼分析就用到了。
OpenJDK1.8 提取碼:xbae

一、NIO簡(jiǎn)介

Java NIO主要由三個(gè)部分組成,Channel、Buffer和Selector,Channel。
借用賽哥的一句話:NIO的本質(zhì)模型就是等待消息的到來(lái),處理到來(lái)的消息。
等待消息的到來(lái)這一部分由Selector去監(jiān)聽(tīng)。



處理到來(lái)的消息由Channel和Buffer處理,通道(Channel)類似于Java中的流(IO Stream),但是通道是雙向的(流是單向),通道中的數(shù)據(jù)必須先讀入到Buffer中,在從Buffer中進(jìn)行讀取,或者先把數(shù)據(jù)寫(xiě)入到Buffer中,在把Buffer中的 數(shù)據(jù)寫(xiě)入到通道。


二、Channel

Java NIO中提供以下4種Channel:

FileChannel:從文件中讀寫(xiě)數(shù)據(jù)
DatagramChannel:通過(guò)UDP協(xié)議讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)
SocketChannel:通過(guò)TCP協(xié)議讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)
ServerSocketChannel:在服務(wù)器端可以監(jiān)聽(tīng)新進(jìn)來(lái)的TCP連接,像WEB服務(wù)器那樣,對(duì)每一個(gè)新進(jìn)來(lái)的請(qǐng)求創(chuàng)建一個(gè)SocketChannel

三、Buffer

Java NIO 有以下Buffer類型

ByteBuffer
MappedByteBuffer
CharBuffer
DoubleBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer

我們主要是通過(guò)對(duì)Buffer進(jìn)行讀寫(xiě)操作,將數(shù)據(jù)寫(xiě)入Channel中。通過(guò)研究Buffer源碼可以發(fā)現(xiàn),Buffer其實(shí)是數(shù)組,有以下幾個(gè)屬性。

索引 說(shuō)明
capacity 緩沖區(qū)數(shù)組的總長(zhǎng)度
position 下一個(gè)要操作的數(shù)據(jù)元素的位置
limit 緩沖區(qū)數(shù)組中不可操作的下一個(gè)元素的位置:limit<=capacity
mark 用于記錄當(dāng)前position的前一個(gè)位置或者默認(rèn)是-1

Buffer的設(shè)計(jì)非常簡(jiǎn)單,通過(guò)以上幾個(gè)簡(jiǎn)單的屬性就可以完成讀寫(xiě)操作,當(dāng)然,簡(jiǎn)單帶來(lái)壞處就是使用起來(lái)有點(diǎn)麻煩。不過(guò)熟練后還是使用起來(lái)還是很簡(jiǎn)單的。

我們看一個(gè)非常簡(jiǎn)單的例子:

初始狀態(tài)的一個(gè)ByteBuffer(總長(zhǎng)度為10):



向Buffer中寫(xiě)入5個(gè)字節(jié):



讀取的時(shí)候 ,調(diào)用Buffer.filp(),此時(shí)postion變?yōu)?,limit變?yōu)?5,也就是能從0讀取到4:

Buffer.compact()方法將所有未讀的數(shù)據(jù)拷貝到Buffer起始處。然后將position設(shè)到最后一個(gè)未讀元素正后面。
Buffer.rewind()方法將position設(shè)回0
Buffer.mark()方法,可以標(biāo)記Buffer中的一個(gè)特定的position,之后可以通過(guò)調(diào)用Buffer.reset()方法恢復(fù)到這個(gè)position
Buffer.rewind()方法將position設(shè)回0,所以你可以重讀Buffer中的所有數(shù)據(jù)。limit保持不變,仍然表示能從Buffer中讀取多少個(gè)元素。
四、NIO的一個(gè)小Demo
首先是服務(wù)端的代碼:

public class ServerConnect {
    private static final int BUF_SIZE = 1024;
    private static final int PORT = 8080;
    private static final int TIMEOUT = 3000;
 
    public static void selector() {
        Selector selector = null;
        ServerSocketChannel ssc = null;
        try {
            // 打開(kāi)一個(gè)Slectore
            selector = Selector.open();
            // 打開(kāi)一個(gè)Channel
            ssc = ServerSocketChannel.open();
            // 將Channel綁定端口
            ssc.socket().bind(new InetSocketAddress(PORT));
            // 設(shè)置Channel為非阻塞,如果設(shè)置為阻塞,其實(shí)和BIO差不多了。
            ssc.configureBlocking(false);
            // 向selector中注冊(cè)Channel和感興趣的事件
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                // selector監(jiān)聽(tīng)事件,select會(huì)被阻塞,直到selector監(jiān)聽(tīng)的channel中有事件發(fā)生或者超時(shí),會(huì)返回一個(gè)事件數(shù)量
                //TIMEOUT就是超時(shí)時(shí)間,selector初始化的時(shí)候會(huì)添加一個(gè)用于主動(dòng)喚醒的pipe,待會(huì)源碼分析會(huì)說(shuō)
                if (selector.select(TIMEOUT) == 0) {
                    System.out.println("==");
                    continue;
                }
                /**
                 * SelectionKey的組成是selector和Channel
                 * 有事件發(fā)生的channel會(huì)被包裝成selectionKey添加到selector的publicSelectedKeys屬性中
                 * publicSelectedKeys是SelectionKey的Set集合
                 *下面這一部分遍歷,就是遍歷有事件的channel
                 */
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    if (key.isAcceptable()) {
                        handleAccept(key);
                    }
                    if (key.isReadable()) {
                        handleRead(key);
                    }
                    if (key.isWritable() && key.isValid()) {
                        handleWrite(key);
                    }
                    if (key.isConnectable()) {
                        System.out.println("isConnectable = true");
                    }
                    //每次使用完,必須將該SelectionKey移除,否則會(huì)一直存儲(chǔ)在publicSelectedKeys中
                    //下一次遍歷又會(huì)重復(fù)處理
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (selector != null) {
                    selector.close();
                }
                if (ssc != null) {
                    ssc.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();
        SocketChannel sc = ssChannel.accept();
        sc.configureBlocking(false);
        sc.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));
    }
    public static void handleRead(SelectionKey key) throws IOException {
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer buf = (ByteBuffer) key.attachment();
        long bytesRead = sc.read(buf);
        while (bytesRead > 0) {
            buf.flip();
            while (buf.hasRemaining()) {
                System.out.print((char) buf.get());
            }
            System.out.println();
            buf.clear();
            bytesRead = sc.read(buf);
        }
        if (bytesRead == -1) {
            sc.close();
        }
    }
    public static void handleWrite(SelectionKey key) throws IOException {
        ByteBuffer buf = (ByteBuffer) key.attachment();
        buf.flip();
        SocketChannel sc = (SocketChannel) key.channel();
        while (buf.hasRemaining()) {
            sc.write(buf);
        }
        buf.compact();
    }
}

客戶端代碼:

public class Client {
    public static void client() {
        // 申請(qǐng)一塊空間
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = null;
        Thread.currentThread().setName("client");
        try {
            // 打開(kāi)一個(gè)Channel
            socketChannel = SocketChannel.open();
            //設(shè)置為非阻塞
            socketChannel.configureBlocking(false);
            //連接IP和端口號(hào)
            socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
            if (socketChannel.finishConnect()) {
                int i = 0;
                while (true) {
                    // 為了不讓消息發(fā)送太快,每發(fā)一條睡1s
                    TimeUnit.SECONDS.sleep(1);
                    String info = Thread.currentThread().getName()+":I'm " + i++ + "-th information from client";
                    //清空Buffer
                    buffer.clear();
                    //寫(xiě)入到Buffer中
                    buffer.put(info.getBytes());
                    //進(jìn)行flip操作,為了下面可以將buffer中數(shù)據(jù)讀取到channel中。
                    buffer.flip();
                    // 將buffer中的數(shù)據(jù)寫(xiě)入到channel中
                    while (buffer.hasRemaining()) {
                        System.out.println(Thread.currentThread().getName()+":"+buffer);
                        int write = socketChannel.write(buffer);
                        System.out.println(Thread.currentThread().getName()+":"+write);
                    }
                }
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        } finally {
            try {
                if (socketChannel != null) {
                    socketChannel.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

五、NIO源碼分析

1.NIO中selector.open()是調(diào)用了SelectorProvider.provider().openSelector(),ServerSocketChannel.open()是調(diào)用SelectorProvider.provider().openServerSocketChannel(),兩個(gè)主要組件的開(kāi)啟都是SelectorProvider.provider()的提供,我們先看一下這個(gè)源碼。

public static SelectorProvider provider() {
    //很明顯provider是單例的
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        //會(huì)根據(jù)不同的操作系統(tǒng)創(chuàng)建不同的provider
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}
/**
 * 根據(jù)不同系統(tǒng)返回不同SelectorProvider.
 */
public static SelectorProvider create() {
    String osname = AccessController
        .doPrivileged(new GetPropertyAction("os.name"));
    if (osname.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (osname.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new sun.nio.ch.PollSelectorProvider();
}

這一部分設(shè)計(jì)的非常好,因?yàn)镹IO需要操作系統(tǒng)底層提供支持,這一部分代碼可以根據(jù)不同的操作系統(tǒng)提供不同的實(shí)現(xiàn),可以讓我們不用關(guān)系底層如何實(shí)現(xiàn)的。
2.ServerSocketChannel.open()最終是new ServerSocketChannelImpl(this);this是SelectorProvider的實(shí)現(xiàn)類的實(shí)例化對(duì)象

ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
    super(sp);
    //獲取文件描述符
    this.fd =  Net.serverSocket(true);
    this.fdVal = IOUtil.fdVal(fd);
    this.state = ST_INUSE;
}

3.selector.open()會(huì)根據(jù)操作系統(tǒng)的不同,得到的selector也會(huì)不同,如果是windows系統(tǒng)獲取的是WindowsSelectorImpl,Linux系統(tǒng)是EPollSelectorImpl,還有一些操作系統(tǒng)會(huì)提供PollSelectorImpl。這里的EPollSelectorImpl和PollSelectorImpl對(duì)應(yīng)這上次NIO基礎(chǔ)原理中的EPoll模型和Poll模型。
首先我們來(lái)分析一下PollSelectorImpl。

PollSelectorImpl(SelectorProvider sp) {
    super(sp, 1, 1);
    // 本地方法,新建一個(gè)pipe,返回以long編碼的管道的兩個(gè)文件描述符。
    //管道的讀端以高32位返回,
    //而寫(xiě)入結(jié)束以低32位返回。
    //這個(gè)pipe主要用途是用來(lái)喚醒selector的
    long pipeFds = IOUtil.makePipe(false);
    //讀文件描述符
    fd0 = (int) (pipeFds >>> 32);
    //寫(xiě)文件描述符
    fd1 = (int) pipeFds;
    try {
        // 新建一個(gè)存fd的數(shù)組
        pollWrapper = new PollArrayWrapper(INIT_CAP);
        // 初始化,將pipe的fd放入數(shù)組中
        pollWrapper.initInterrupt(fd0, fd1);
        // 新建一個(gè)存放SelectionKey的數(shù)組
        channelArray = new SelectionKeyImpl[INIT_CAP];
    } catch (Throwable t) {
        ......
    }
}

再來(lái)看看EPollSelectorImpl有什么不一樣:

EPollSelectorImpl(SelectorProvider sp) throws IOException {
    super(sp);
    long pipeFds = IOUtil.makePipe(false);
    fd0 = (int) (pipeFds >>> 32);
    fd1 = (int) pipeFds;
    try {
        //這里不再是一個(gè)數(shù)組了
        pollWrapper = new EPollArrayWrapper();
        //初始化,添加用于中斷的pipe
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<>();
    } catch (Throwable t) {
        ......
    }
}
EPollArrayWrapper() throws IOException {
    // 創(chuàng)建epoll的文件描述符
    epfd = epollCreate();
    int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
    //開(kāi)辟一個(gè)數(shù)組,存儲(chǔ)來(lái)自epoll_wait的結(jié)果的epoll_event數(shù)組
    pollArray = new AllocatedNativeObject(allocationSize, true);
    pollArrayAddress = pollArray.address();
    //文件描述符> 64k時(shí)需要使用eventHigh
    if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
        eventsHigh = new HashMap<>();
}

4.我們?cè)賮?lái)分析一下如何注冊(cè)事件的。注冊(cè)的時(shí)候,首先把selector和channel封裝成一個(gè)SelectionKeyImpl,最終調(diào)用implRegister(),把fd添加到pollWrapper,把key添加到keys中。因?yàn)閜ollWrapper數(shù)據(jù)結(jié)構(gòu)的不同,所以添加方式也有點(diǎn)區(qū)別。
poll:

protected void implRegister(SelectionKeyImpl ski) {
    synchronized (closeLock) {
        if (closed)
            throw new ClosedSelectorException();
        //檢測(cè)容量是否夠用
        if (channelArray.length == totalChannels) {
            // 新建一個(gè)更大的數(shù)組
            int newSize = pollWrapper.totalChannels * 2;
            SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
            //拷貝
            for (int i=channelOffset; i<totalChannels; i++)
                temp[i] = channelArray[i];
            channelArray = temp;
            //擴(kuò)容存儲(chǔ)fd的數(shù)組
            pollWrapper.grow(newSize);
        }
        channelArray[totalChannels] = ski;
        ski.setIndex(totalChannels);
        pollWrapper.addEntry(ski.channel);
        totalChannels++;
        keys.add(ski);
    }
}

epoll

protected void implRegister(SelectionKeyImpl ski) {
    if (closed)
        throw new ClosedSelectorException();
    SelChImpl ch = ski.channel;
    int fd = Integer.valueOf(ch.getFDVal());
    fdToKey.put(fd, ski);
    pollWrapper.add(fd);
    keys.add(ski);
}

5.初始化準(zhǔn)備好了,我們?cè)賮?lái)分析一下select()是如何監(jiān)聽(tīng)channel的,select()最終是調(diào)用doSelect(long timeout)方法,里面調(diào)用本地方法,本地方法調(diào)用的系統(tǒng)提供的操作,這些操作對(duì)應(yīng)NIO基礎(chǔ)原理中的三個(gè)模型。
首先來(lái)PollSelectorImpl的:

protected int doSelect(long timeout)
    throws IOException
{
    if (channelArray == null)
        throw new ClosedSelectorException();
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(totalChannels, 0, timeout);
    } finally {
        end();
    }
    //清理那些已經(jīng)cancelled的SelectionKey
    processDeregisterQueue();
    //統(tǒng)計(jì)有事件發(fā)生的SelectionKey數(shù)量,并把符合條件發(fā)生事件的SelectionKey添加到selectedKeys哈希表中,提供給后續(xù)使用
    int numKeysUpdated = updateSelectedKeys();
    // 第零個(gè)位置使用來(lái)中斷的,如果不為0,則pipe中寫(xiě)入了數(shù)據(jù),用于中斷,這里進(jìn)行重置
    if (pollWrapper.getReventOps(0) != 0) {
        // Clear the wakeup pipe
        pollWrapper.putReventOps(0, 0);
        synchronized (interruptLock) {
            //將fd0的數(shù)據(jù)全部讀完
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}
/**
 * poll0是一個(gè)本地方法,調(diào)用系統(tǒng)底層的實(shí)現(xiàn)了
 * 對(duì)應(yīng)poll模型
 */
int poll(int numfds, int offset, long timeout) {
    return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
                 numfds, timeout);
}

EPollSelectorImpl:

protected int doSelect(long timeout) throws IOException {
    if (closed)
        throw new ClosedSelectorException();
    //清理那些已經(jīng)cancelled的SelectionKey,底層會(huì)調(diào)用epoll_ctl方法移除被epoll所監(jiān)聽(tīng)的文件描述符
    processDeregisterQueue();
    try {
        begin();
        pollWrapper.poll(timeout);
    } finally {
        end();
    }
    //清理那些已經(jīng)cancelled的SelectionKey,底層會(huì)調(diào)用epoll_ctl方法移除被epoll所監(jiān)聽(tīng)的文件描述符
    processDeregisterQueue();
    //更新epoll已選擇fd的密鑰。 將就緒密鑰添加到就緒隊(duì)列。
    int numKeysUpdated = updateSelectedKeys();
    //判斷是否為中斷,如果中斷了,則清除記錄的中斷位置的內(nèi)容
    if (pollWrapper.interrupted()) {
        // Clear the wakeup pipe
        pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
        synchronized (interruptLock) {
            pollWrapper.clearInterrupted();
            IOUtil.drain(fd0);
            interruptTriggered = false;
        }
    }
    return numKeysUpdated;
}
/**
 *  epollWait也是一個(gè)本地方法
 *  對(duì)應(yīng)epoll模型
 */
int poll(long timeout) throws IOException {
    updateRegistrations();
    // 調(diào)用系統(tǒng)底層的實(shí)現(xiàn),會(huì)將有事件的fd放在pollArray中
    updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
    // 查詢是否存在中斷,并且記錄中斷事件的位置
    for (int i=0; i<updated; i++) {
        if (getDescriptor(i) == incomingInterruptFD) {
            interruptedIndex = i;
            interrupted = true;
            break;
        }
    }
    return updated;
}

6.NIO還有一個(gè)小細(xì)節(jié),就是我們最開(kāi)始建立selelctor的時(shí)候,會(huì)創(chuàng)建一個(gè)pipe,我之前也提到了這個(gè)pipe是用來(lái)喚醒selector,selector調(diào)用select()方法后,會(huì)進(jìn)入阻塞狀態(tài),如果沒(méi)有事件他會(huì)一直阻塞,那么我們?nèi)绾沃鲃?dòng)喚醒呢,于是就用到了這個(gè)pipe。
PollSelectorImpl和EPollSelectorImpl實(shí)現(xiàn)都是如下方式:

public Selector wakeup() {
    synchronized (interruptLock) {
        if (!interruptTriggered) {
            pollWrapper.interrupt();
            interruptTriggered = true;
        }
    }
    return this;
}

我對(duì)Linux下的pipe不太了解,我猜應(yīng)該是使用了中斷指令。
在Windows下,WindowsSelectorImpl的實(shí)現(xiàn)方式:

public Selector wakeup() {
    synchronized(this.interruptLock) {
        if (!this.interruptTriggered) {
            this.setWakeupSocket();
            this.interruptTriggered = true;
        }
        return this;
    }
}

其中setWakeupSocket()方法會(huì)調(diào)用一個(gè)本地方法setWakeupSocket0(),這個(gè)本地方法會(huì)想pipe中發(fā)送一個(gè)字節(jié),selector就能夠監(jiān)聽(tīng)到這個(gè)pipe中有讀事件,然后selector就被喚醒了。
貼一張NIO各個(gè)組件之間的關(guān)系圖,看完源碼后可以仔細(xì)看一下這幅圖,再自己跟著源碼走一遍。


6、總結(jié)和反思

NIO源碼分析到此結(jié)束了,此次閱讀源碼過(guò)程還是有點(diǎn)困難,但從中獲取到了很多的新知識(shí)。

1.最開(kāi)始,我以為大學(xué)學(xué)的操作系統(tǒng)沒(méi)太大用處,現(xiàn)在發(fā)現(xiàn),涉及到底層原理時(shí),離不開(kāi)操作系統(tǒng)的知識(shí)。以后得抽個(gè)時(shí)間把操作系統(tǒng)在好好看一遍。
2.NIO中用到了Reactor設(shè)計(jì)模式,有效的解決基于輪詢方式的效率低的問(wèn)題
3.select、poll和epoll底層數(shù)據(jù)各不相同,poll采用鏈表,解決了fd數(shù)量的限制,epoll底層使用的是紅黑樹(shù),能夠有效的提升效率。
4.NIO并不一定是非常高效的,在連接數(shù)量大,且連接比較短的情況下,NIO效率非常高,但是在連接數(shù)量小,且一次性發(fā)送大量數(shù)據(jù)的情況下,可以選擇BIO加多線程的方式處理。
5.除了NIO,還有一個(gè)AIO,以后有空可以研究研究。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 今天是我第六次去醫(yī)院準(zhǔn)備拔牙,之前的五次大概了解了拔牙需要的條件和主任醫(yī)師的時(shí)間,結(jié)果是成功的拔掉了左側(cè)上下兩顆阻...
    D1930閱讀 173評(píng)論 0 0
  • 準(zhǔn)備 下載genymotion-2.8.0-linux_x64.bin鏈接:https://pan.baidu.c...
    Yet_land閱讀 11,786評(píng)論 8 9
  • 假期陪母親看了一部劇,看哭了她,觸動(dòng)了我。有些情節(jié),至今還記得。我覺(jué)得其中幾個(gè)人講述了幾個(gè)道理,值得借鑒。 程紫月...
    遇琳閱讀 1,792評(píng)論 4 4
  • 曾經(jīng)的樣子,現(xiàn)在的模樣。燦爛的笑容經(jīng)不起歲月的蹉跎。稚嫩的臉龐抵不過(guò)時(shí)光的流逝。小時(shí)候多向往長(zhǎng)大、現(xiàn)在就多懷念青春...
    萍水相逢總是袁閱讀 215評(píng)論 0 0

友情鏈接更多精彩內(nèi)容