NIO技術(shù)概覽

NIO(Non-blocking I/O,在Java領(lǐng)域,也稱為New I/O),是一種同步非阻塞的I/O模型,也是I/O多路復(fù)用的基礎(chǔ),已經(jīng)被越來越多地應(yīng)用到大型應(yīng)用服務(wù)器,成為解決高并發(fā)與大量連接、I/O處理問題的有效方式。

IO模型的分類

按照《Unix網(wǎng)絡(luò)編程》的劃分,I/O模型可以分為:阻塞I/O模型、非阻塞I/O模型、I/O復(fù)用模型、信號驅(qū)動式I/O模型和異步I/O模型,按照POSIX標(biāo)準(zhǔn)來劃分只分為兩類:同步I/O和異步I/O。

如何區(qū)分呢?首先一個(gè)I/O操作其實(shí)分成了兩個(gè)步驟:發(fā)起IO請求和實(shí)際的IO操作。同步I/O和異步I/O的區(qū)別就在于第二個(gè)步驟是否阻塞,如果實(shí)際的I/O讀寫阻塞請求進(jìn)程,那么就是同步I/O,因此阻塞I/O、非阻塞I/O、I/O復(fù)用、信號驅(qū)動I/O都是同步I/O,如果不阻塞,而是操作系統(tǒng)幫你做完I/O操作再將結(jié)果返回給你,那么就是異步I/O。

阻塞I/O和非阻塞I/O的區(qū)別在于第一步,發(fā)起I/O請求是否會被阻塞,如果阻塞直到完成那么就是傳統(tǒng)的阻塞I/O,如果不阻塞,那么就是非阻塞I/O。

  • 阻塞I/O模型 :在linux中,默認(rèn)情況下所有的socket都是blocking,一個(gè)典型的讀操作流程大概是這樣:
  • 非阻塞I/O模型:linux下,可以通過設(shè)置socket使其變?yōu)閚on-blocking。當(dāng)對一個(gè)non-blocking socket執(zhí)行讀操作時(shí),流程是這個(gè)樣子:
  • I/O復(fù)用模型:我們可以調(diào)用selectpoll,阻塞在這兩個(gè)系統(tǒng)調(diào)用中的某一個(gè)之上,而不是真正的IO系統(tǒng)調(diào)用上:
  • 信號驅(qū)動式I/O模型:我們可以用信號,讓內(nèi)核在描述符就緒時(shí)發(fā)送SIGIO信號通知我們:
  • 異步I/O模型:用戶進(jìn)程發(fā)起read操作之后,立刻就可以開始去做其它的事。而另一方面,從內(nèi)核的角度,當(dāng)它受到一個(gè)asynchronousread之后,首先它會立刻返回,所以不會對用戶進(jìn)程產(chǎn)生任何block。然后,內(nèi)核會等待數(shù)據(jù)準(zhǔn)備完成,然后將數(shù)據(jù)拷貝到用戶內(nèi)存,當(dāng)這一切都完成之后,內(nèi)核會給用戶進(jìn)程發(fā)送一個(gè)signal,告訴它read操作完成了:

以上參考自:《UNIX網(wǎng)絡(luò)編程》

從前面 I/O 模型的分類中,我們可以看出 AIO 的動機(jī)。阻塞模型需要在 I/O 操作開始時(shí)阻塞應(yīng)用程序。這意味著不可能同時(shí)重疊進(jìn)行處理和 I/O 操作。非阻塞模型允許處理和 I/O 操作重疊進(jìn)行,但是這需要應(yīng)用程序來檢查 I/O 操作的狀態(tài)。對于異步I/O ,它允許處理和 I/O 操作重疊進(jìn)行,包括 I/O 操作完成的通知。除了需要阻塞之外,select 函數(shù)所提供的功能(異步阻塞 I/O)與 AIO 類似。不過,它是對通知事件進(jìn)行阻塞,而不是對 I/O 調(diào)用進(jìn)行阻塞。

參考下知乎上的回答:

  • 同步與異步:同步和異步關(guān)注的是消息通信機(jī)制 (synchronous communication/ asynchronous communication)。所謂同步,就是在發(fā)出一個(gè)調(diào)用時(shí),在沒有得到結(jié)果之前,該調(diào)用就不返回。但是一旦調(diào)用返回,就得到返回值了。換句話說,就是由調(diào)用者主動等待這個(gè)調(diào)用的結(jié)果;
  • 阻塞與非阻塞:阻塞和非阻塞關(guān)注的是程序在等待調(diào)用結(jié)果(消息,返回值)時(shí)的狀態(tài)。阻塞調(diào)用是指調(diào)用結(jié)果返回之前,當(dāng)前線程會被掛起。調(diào)用線程只有在得到結(jié)果之后才會返回;而非阻塞調(diào)用指在不能立刻得到結(jié)果之前,該調(diào)用不會阻塞當(dāng)前線程。

鏈接:https://www.zhihu.com/question/19732473/answer/20851256

兩種IO多路復(fù)用方案:Reactor和Proactor

一般地,I/O多路復(fù)用機(jī)制都依賴于一個(gè)事件多路分離器(Event Demultiplexer)。分離器對象可將來自事件源的I/O事件分離出來,并分發(fā)到對應(yīng)的read/write事件處理器(Event Handler)。開發(fā)人員預(yù)先注冊需要處理的事件及其事件處理器(或回調(diào)函數(shù));事件分離器負(fù)責(zé)將請求事件傳遞給事件處理器。

兩個(gè)與事件分離器有關(guān)的模式是Reactor和Proactor。Reactor模式采用同步I/O,而Proactor采用異步I/O。在Reactor中,事件分離器負(fù)責(zé)等待文件描述符或socket為讀寫操作準(zhǔn)備就緒,然后將就緒事件傳遞給對應(yīng)的處理器,最后由處理器負(fù)責(zé)完成實(shí)際的讀寫工作。

而在Proactor模式中,處理器或者兼任處理器的事件分離器,只負(fù)責(zé)發(fā)起異步讀寫操作。I/O操作本身由操作系統(tǒng)來完成。傳遞給操作系統(tǒng)的參數(shù)需要包括用戶定義的數(shù)據(jù)緩沖區(qū)地址和數(shù)據(jù)大小,操作系統(tǒng)才能從中得到寫出操作所需數(shù)據(jù),或?qū)懭霃膕ocket讀到的數(shù)據(jù)。事件分離器捕獲I/O操作完成事件,然后將事件傳遞給對應(yīng)處理器。比如,在windows上,處理器發(fā)起一個(gè)異步I/O操作,再由事件分離器等待IOCompletion事件。典型的異步模式實(shí)現(xiàn),都建立在操作系統(tǒng)支持異步API的基礎(chǔ)之上,我們將這種實(shí)現(xiàn)稱為“系統(tǒng)級”異步或“真”異步,因?yàn)閼?yīng)用程序完全依賴操作系統(tǒng)執(zhí)行真正的I/O工作。

舉個(gè)例子,將有助于理解Reactor與Proactor二者的差異,以讀操作為例(寫操作類似)。

在Reactor中實(shí)現(xiàn)讀:

  • 注冊讀就緒事件和相應(yīng)的事件處理器;
  • 事件分離器等待事件;
  • 事件到來,激活分離器,分離器調(diào)用事件對應(yīng)的處理器;
  • 事件處理器完成實(shí)際的讀操作,處理讀到的數(shù)據(jù),注冊新的事件,然后返還控制權(quán)。

在Proactor中實(shí)現(xiàn)讀:

  • 處理器發(fā)起異步讀操作(注意:操作系統(tǒng)必須支持異步I/O)。在這種情況下,處理器無視I/O就緒事件,它關(guān)注的是完成事件;
  • 事件分離器等待操作完成事件;
  • 在分離器等待過程中,操作系統(tǒng)利用并行的內(nèi)核線程執(zhí)行實(shí)際的讀操作,并將結(jié)果數(shù)據(jù)存入用戶自定義緩沖區(qū),最后通知事件分離器讀操作完成;
  • 事件分離器呼喚處理器;
  • 事件處理器處理用戶自定義緩沖區(qū)中的數(shù)據(jù),然后啟動一個(gè)新的異步操作,并將控制權(quán)返回事件分離器。

可以看出,兩個(gè)模式的相同點(diǎn),都是對某個(gè)I/O事件的事件通知(即告訴某個(gè)模塊,這個(gè)I/O操作可以進(jìn)行或已經(jīng)完成)。在結(jié)構(gòu)上,兩者的相同點(diǎn)和不同點(diǎn)如下:

  • 相同點(diǎn):demultiplexor負(fù)責(zé)提交I/O操作(異步)、查詢設(shè)備是否可操作(同步),然后當(dāng)條件滿足時(shí),就回調(diào)handler;
  • 不同點(diǎn):異步情況下(Proactor),當(dāng)回調(diào)handler時(shí),表示I/O操作已經(jīng)完成;同步情況下(Reactor),回調(diào)handler時(shí),表示I/O設(shè)備可以進(jìn)行某個(gè)操作(can read or can write)。

參考自:https://www.zhihu.com/question/26943938/answer/68773398

傳統(tǒng)BIO模型

BIO是同步阻塞式IO,通常在while循環(huán)中服務(wù)端會調(diào)用accept方法等待接收客戶端的連接請求,一旦接收到一個(gè)連接請求,就可以建立通信套接字在這個(gè)通信套接字上進(jìn)行讀寫操作,此時(shí)不能再接收其他客戶端連接請求,只能等待同當(dāng)前連接的客戶端的操作執(zhí)行完成。

如果BIO要能夠同時(shí)處理多個(gè)客戶端請求,就必須使用多線程,即每次accept阻塞等待來自客戶端請求,一旦受到連接請求就建立通信套接字同時(shí)開啟一個(gè)新的線程來處理這個(gè)套接字的數(shù)據(jù)讀寫請求,然后立刻又繼續(xù)accept等待其他客戶端連接請求,即為每一個(gè)客戶端連接請求都創(chuàng)建一個(gè)線程來單獨(dú)處理。

我們看下傳統(tǒng)的BIO方式下的編程模型大致如下:

public class BIODemo {

    public static void main(String[] args) throws IOException {

        ExecutorService executor = Executors.newFixedThreadPool(128);

        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(new InetSocketAddress(1234));
        // 循環(huán)等待新連接
        while (true) {
            Socket socket = serverSocket.accept();
            // 為新的連接創(chuàng)建線程執(zhí)行任務(wù)
            executor.submit(new ConnectionTask(socket));
        }
    }

}

class ConnectionTask extends Thread {
    private Socket socket;

    public ConnectionTask(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        while (true) {
            InputStream inputStream = null;
            OutputStream outputStream = null;
            try {
                inputStream = socket.getInputStream();
                
                // read from socket...
                
                inputStream.read();
                
                outputStream = socket.getOutputStream();

                // write to socket...

                outputStream.write();

            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                // 關(guān)閉資源...

            }

        }
    }
}

這里之所以使用多線程,是因?yàn)閟ocket.accept()、inputStream.read()、outputStream.write()都是同步阻塞的,當(dāng)一個(gè)連接在處理I/O的時(shí)候,系統(tǒng)是阻塞的,如果是單線程的話在阻塞的期間不能接受任何請求。所以,使用多線程,就可以讓CPU去處理更多的事情。其實(shí)這也是所有使用多線程的本質(zhì):

  • 利用多核。
  • 當(dāng)I/O阻塞系統(tǒng),但CPU空閑的時(shí)候,可以利用多線程使用CPU資源。

使用線程池能夠讓線程的創(chuàng)建和回收成本相對較低。在活動連接數(shù)不是特別高(小于單機(jī)1000)的情況下,這種模型是比較不錯的,可以讓每一個(gè)連接專注于自己的I/O并且編程模型簡單,也不用過多考慮系統(tǒng)的過載、限流等問題。線程池可以緩沖一些過多的連接或請求。

但這個(gè)模型最本質(zhì)的問題在于,嚴(yán)重依賴于線程。但線程是很"貴"的資源,主要表現(xiàn)在:

  1. 線程的創(chuàng)建和銷毀成本很高,在Linux這樣的操作系統(tǒng)中,線程本質(zhì)上就是一個(gè)進(jìn)程。創(chuàng)建和銷毀都是重量級的系統(tǒng)函數(shù);
  2. 線程本身占用較大內(nèi)存,像Java的線程棧,一般至少分配512K~1M的空間,如果系統(tǒng)中的線程數(shù)過千,恐怕整個(gè)JVM的內(nèi)存都會被吃掉一半;
  3. 線程的切換成本是很高的。操作系統(tǒng)發(fā)生線程切換的時(shí)候,需要保留線程的上下文,然后執(zhí)行系統(tǒng)調(diào)用。如果線程數(shù)過高,可能執(zhí)行線程切換的時(shí)間甚至?xí)笥诰€程執(zhí)行的時(shí)間,這時(shí)候帶來的表現(xiàn)往往是系統(tǒng)load偏高、CPU sy使用率特別高(超過20%以上),導(dǎo)致系統(tǒng)幾乎陷入不可用的狀態(tài);
  4. 容易造成鋸齒狀的系統(tǒng)負(fù)載。因?yàn)橄到y(tǒng)負(fù)載是用活動線程數(shù)或CPU核心數(shù),一旦線程數(shù)量高但外部網(wǎng)絡(luò)環(huán)境不是很穩(wěn)定,就很容易造成大量請求的結(jié)果同時(shí)返回,激活大量阻塞線程從而使系統(tǒng)負(fù)載壓力過大。

所以,當(dāng)面對十萬甚至百萬級連接的時(shí)候,傳統(tǒng)的BIO模型是無能為力的。隨著移動端應(yīng)用的興起和各種網(wǎng)絡(luò)游戲的盛行,百萬級長連接日趨普遍,此時(shí),必然需要一種更高效的I/O處理模型。

NIO的實(shí)現(xiàn)原理

NIO本身是基于事件驅(qū)動思想來完成的,其主要想解決的是BIO的大并發(fā)問題,即在使用同步I/O的網(wǎng)絡(luò)應(yīng)用中,如果要同時(shí)處理多個(gè)客戶端請求,或是在客戶端要同時(shí)和多個(gè)服務(wù)器進(jìn)行通訊,就必須使用多線程來處理。也就是說,將每一個(gè)客戶端請求分配給一個(gè)線程來單獨(dú)處理。這樣做雖然可以達(dá)到我們的要求,但同時(shí)又會帶來另外一個(gè)問題。由于每創(chuàng)建一個(gè)線程,就要為這個(gè)線程分配一定的內(nèi)存空間(也叫工作存儲器),而且操作系統(tǒng)本身也對線程的總數(shù)有一定的限制。如果客戶端的請求過多,服務(wù)端程序可能會因?yàn)椴豢爸刎?fù)而拒絕客戶端的請求,甚至服務(wù)器可能會因此而癱瘓。

NIO基于Reactor,當(dāng)socket有流可讀或可寫入socket時(shí),操作系統(tǒng)會相應(yīng)的通知應(yīng)用程序進(jìn)行處理,應(yīng)用再將流讀取到緩沖區(qū)或?qū)懭氩僮飨到y(tǒng)。

也就是說,這個(gè)時(shí)候,已經(jīng)不是一個(gè)連接就要對應(yīng)一個(gè)處理線程了,而是有效的請求,對應(yīng)一個(gè)線程,當(dāng)連接沒有數(shù)據(jù)時(shí),是沒有工作線程來處理的。

下面看下代碼的實(shí)現(xiàn):

NIO服務(wù)端代碼(新建連接):

//獲取一個(gè)ServerSocket通道
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(port));
//獲取通道管理器
selector = Selector.open();
//將通道管理器與通道綁定,并為該通道注冊SelectionKey.OP_ACCEPT事件,
serverChannel.register(selector, SelectionKey.OP_ACCEPT);

NIO服務(wù)端代碼(監(jiān)聽):

while(true){
    //當(dāng)有注冊的事件到達(dá)時(shí),方法返回,否則阻塞。
   selector.select();
   for(SelectionKey key : selector.selectedKeys()){
       if(key.isAcceptable()){
           ServerSocketChannel server =
                (ServerSocketChannel)key.channel();
           SocketChannel channel = server.accept();
           channel.write(ByteBuffer.wrap(
            new String("send message to client").getBytes()));
           //在與客戶端連接成功后,為客戶端通道注冊SelectionKey.OP_READ事件。
           channel.register(selector, SelectionKey.OP_READ);
       }else if(key.isReadable()){//有可讀數(shù)據(jù)事件
           SocketChannel channel = (SocketChannel)key.channel();
           ByteBuffer buffer = ByteBuffer.allocate(10);
           int read = channel.read(buffer);
           byte[] data = buffer.array();
           String message = new String(data);
           System.out.println("receive message from client, size:"
               + buffer.position() + " msg: " + message);
       }
   }
}

NIO模型示例如下:

  • Acceptor注冊Selector,監(jiān)聽accept事件;
  • 當(dāng)客戶端連接后,觸發(fā)accept事件;
  • 服務(wù)器構(gòu)建對應(yīng)的Channel,并在其上注冊Selector,監(jiān)聽讀寫事件;
  • 當(dāng)發(fā)生讀寫事件后,進(jìn)行相應(yīng)的讀寫處理。

Reactor模型

有關(guān)Reactor模型結(jié)構(gòu),可以參考Doug Lea在 Scalable IO in Java 中的介紹。這里簡單介紹一下Reactor模式的典型實(shí)現(xiàn):

Reactor單線程模型

這是最簡單的單Reactor單線程模型。Reactor線程負(fù)責(zé)多路分離套接字、accept新連接,并分派請求到處理器鏈中。該模型適用于處理器鏈中業(yè)務(wù)處理組件能快速完成的場景。不過,這種單線程模型不能充分利用多核資源,所以實(shí)際使用的不多。

這個(gè)模型和上面的NIO流程很類似,只是將消息相關(guān)處理獨(dú)立到了Handler中去了。

代碼實(shí)現(xiàn)如下:

public class Reactor implements Runnable {
    final Selector selector;
    final ServerSocketChannel serverSocketChannel;
    public static void main(String[] args) throws IOException {
        new Thread(new Reactor(1234)).start();
    }
    public Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        key.attach(new Acceptor());
    }
    @Override
    public void run() {
        while (!Thread.interrupted()) {
            try {
                selector.select();
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey selectionKey : selectionKeys) {
                    dispatch(selectionKey);
                }
                selectionKeys.clear();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    private void dispatch(SelectionKey selectionKey) {
        Runnable run = (Runnable) selectionKey.attachment();
        if (run != null) {
            run.run();
        }
    }
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel channel = serverSocketChannel.accept();
                if (channel != null) {
                    new Handler(selector, channel);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
class Handler implements Runnable {
    private final static int DEFAULT_SIZE = 1024;
    private final SocketChannel socketChannel;
    private final SelectionKey seletionKey;
    private static final int READING = 0;
    private static final int SENDING = 1;
    private int state = READING;
    ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
    ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
    public Handler(Selector selector, SocketChannel channel) throws IOException {
        this.socketChannel = channel;
        socketChannel.configureBlocking(false);
        this.seletionKey = socketChannel.register(selector, 0);
        seletionKey.attach(this);
        seletionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    @Override
    public void run() {
        if (state == READING) {
            read();
        } else if (state == SENDING) {
            write();
        }
    }
    class Sender implements Runnable {
        @Override
        public void run() {
            try {
                socketChannel.write(outputBuffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (outIsComplete()) {
                seletionKey.cancel();
            }
        }
    }
    private void write() {
        try {
            socketChannel.write(outputBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        while (outIsComplete()) {
            seletionKey.cancel();
        }
    }
    private void read() {
        try {
            socketChannel.read(inputBuffer);
            if (inputIsComplete()) {
                process();
                System.out.println("接收到來自客戶端(" + socketChannel.socket().getInetAddress().getHostAddress()
                        + ")的消息:" + new String(inputBuffer.array()));
                seletionKey.attach(new Sender());
                seletionKey.interestOps(SelectionKey.OP_WRITE);
                seletionKey.selector().wakeup();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public boolean inputIsComplete() {
        return true;
    }
    public boolean outIsComplete() {
        return true;
    }
    public void process() {
        // do something...
    }
}

雖然上面說到NIO一個(gè)線程就可以支持所有的IO處理。但是瓶頸也是顯而易見的。我們看一個(gè)客戶端的情況,如果這個(gè)客戶端多次進(jìn)行請求,如果在Handler中的處理速度較慢,那么后續(xù)的客戶端請求都會被積壓,導(dǎo)致響應(yīng)變慢!所以引入了Reactor多線程模型。

Reactor多線程模型

相比上一種模型,該模型在處理器鏈部分采用了多線程(線程池):

Reactor多線程模型就是將Handler中的IO操作和非IO操作分開,操作IO的線程稱為IO線程,非IO操作的線程稱為工作線程。這樣的話,客戶端的請求會直接被丟到線程池中,客戶端發(fā)送請求就不會堵塞。

可以將Handler做如下修改:

class Handler implements Runnable {
    private final static int DEFAULT_SIZE = 1024;
    private final SocketChannel socketChannel;
    private final SelectionKey seletionKey;
    private static final int READING = 0;
    private static final int SENDING = 1;
    private int state = READING;
    ByteBuffer inputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
    ByteBuffer outputBuffer = ByteBuffer.allocate(DEFAULT_SIZE);
    
    private Selector selector;
    private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime()
            .availableProcessors());
    private static final int PROCESSING = 3;
    public Handler(Selector selector, SocketChannel channel) throws IOException {
        this.selector = selector;
        this.socketChannel = channel;
        socketChannel.configureBlocking(false);
        this.seletionKey = socketChannel.register(selector, 0);
        seletionKey.attach(this);
        seletionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    @Override
    public void run() {
        if (state == READING) {
            read();
        } else if (state == SENDING) {
            write();
        }
    }
    class Sender implements Runnable {
        @Override
        public void run() {
            try {
                socketChannel.write(outputBuffer);
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (outIsComplete()) {
                seletionKey.cancel();
            }
        }
    }
    private void write() {
        try {
            socketChannel.write(outputBuffer);
        } catch (IOException e) {
            e.printStackTrace();
        }
        if (outIsComplete()) {
            seletionKey.cancel();
        }
    }
    private void read() {
        try {
            socketChannel.read(inputBuffer);
            if (inputIsComplete()) {
                process();
                executorService.execute(new Processer());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public boolean inputIsComplete() {
        return true;
    }
    public boolean outIsComplete() {
        return true;
    }
    public void process() {
    }
    synchronized void processAndHandOff() {
        process();
        state = SENDING; // or rebind attachment
        seletionKey.interestOps(SelectionKey.OP_WRITE);
        selector.wakeup();
    }
    class Processer implements Runnable {
        public void run() {
            processAndHandOff();
        }
    }
}

但是當(dāng)用戶進(jìn)一步增加的時(shí)候,Reactor會出現(xiàn)瓶頸!因?yàn)镽eactor既要處理IO操作請求,又要響應(yīng)連接請求。為了分擔(dān)Reactor的負(fù)擔(dān),所以引入了主從Reactor模型。

主從Reactor多線程模型

主從Reactor多線程模型是將Reactor分成兩部分,mainReactor負(fù)責(zé)監(jiān)聽server socket,accept新連接,并將建立的socket分派給subReactor。subReactor負(fù)責(zé)多路分離已連接的socket,讀寫網(wǎng)絡(luò)數(shù)據(jù),對業(yè)務(wù)處理功能,其扔給worker線程池完成。通常,subReactor個(gè)數(shù)上可與CPU個(gè)數(shù)等同:

這時(shí)可以把Reactor做如下修改:

public class Reactor {
    final ServerSocketChannel serverSocketChannel;
    Selector[] selectors; // also create threads
    AtomicInteger next = new AtomicInteger(0);
    ExecutorService sunReactors = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    public static void main(String[] args) throws IOException {
        new Reactor(1234);
    }
    public Reactor(int port) throws IOException {
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);
        selectors = new Selector[4];
        for (int i = 0; i < 4; i++) {
            Selector selector = Selector.open();
            selectors[i] = selector;
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            key.attach(new Acceptor());
            new Thread(() -> {
                while (!Thread.interrupted()) {
                    try {
                        selector.select();
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        for (SelectionKey selectionKey : selectionKeys) {
                            dispatch(selectionKey);
                        }
                        selectionKeys.clear();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }
    }
    private void dispatch(SelectionKey selectionKey) {
        Runnable run = (Runnable) selectionKey.attachment();
        if (run != null) {
            run.run();
        }
    }
    class Acceptor implements Runnable {
        @Override
        public void run() {
            try {
                SocketChannel connection = serverSocketChannel.accept();
                if (connection != null)
                    sunReactors.execute(new Handler(selectors[next.getAndIncrement() % selectors.length], connection));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

可見,主Reactor用于響應(yīng)連接請求,從Reactor用于處理IO操作請求。

AIO

與NIO不同,當(dāng)進(jìn)行讀寫操作時(shí),只須直接調(diào)用API的read或write方法即可。這兩種方法均為異步的,對于讀操作而言,當(dāng)有流可讀取時(shí),操作系統(tǒng)會將可讀的流傳入read方法的緩沖區(qū),并通知應(yīng)用程序;對于寫操作而言,當(dāng)操作系統(tǒng)將write方法傳遞的流寫入完畢時(shí),操作系統(tǒng)主動通知應(yīng)用程序。
即可以理解為,read/write方法都是異步的,完成后會主動調(diào)用回調(diào)函數(shù)。
在JDK1.7中,這部分內(nèi)容被稱作NIO.2,主要在java.nio.channels包下增加了下面四個(gè)異步通道:

  • AsynchronousSocketChannel
  • AsynchronousServerSocketChannel
  • AsynchronousFileChannel
  • AsynchronousDatagramChannel

我們看一下AsynchronousSocketChannel中的幾個(gè)方法:

public abstract class AsynchronousSocketChannel
    implements AsynchronousByteChannel, NetworkChannel
{
    public abstract Future<Integer> read(ByteBuffer dst);
    
    public abstract <A> void read(ByteBuffer[] dsts,
                                  int offset,
                                  int length,
                                  long timeout,
                                  TimeUnit unit,
                                  A attachment,
                                  CompletionHandler<Long,? super A> handler);

    public abstract <A> void write(ByteBuffer src,
                                   long timeout,
                                   TimeUnit unit,
                                   A attachment,
                                   CompletionHandler<Integer,? super A> handler);

    public final <A> void write(ByteBuffer src,
                                A attachment,
                                CompletionHandler<Integer,? super A> handler)

    {
        write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
    }

    public abstract Future<Integer> write(ByteBuffer src);

    public abstract <A> void write(ByteBuffer[] srcs,
                                   int offset,
                                   int length,
                                   long timeout,
                                   TimeUnit unit,
                                   A attachment,
                                   CompletionHandler<Long,? super A> handler);
}

其中的read/write方法,有的會返回一個(gè)Future對象,有的需要傳入一個(gè)CompletionHandler對象,該對象的作用是當(dāng)執(zhí)行完讀取/寫入操作后,直接該對象當(dāng)中的方法進(jìn)行回調(diào)。

對于AsynchronousSocketChannel而言,在windows和linux上的實(shí)現(xiàn)類是不一樣的。

在windows上,AIO的實(shí)現(xiàn)是通過IOCP來完成的,實(shí)現(xiàn)類是:

WindowsAsynchronousSocketChannelImpl

實(shí)現(xiàn)的接口是:

Iocp.OverlappedChannel

而在linux上,實(shí)現(xiàn)類是:

UnixAsynchronousSocketChannelImpl

實(shí)現(xiàn)的接口是:

Port.PollableChannel

可見,最大的區(qū)別就是windows與linux中poll的實(shí)現(xiàn)不同。

AIO是一種接口標(biāo)準(zhǔn),各家操作系統(tǒng)可以實(shí)現(xiàn)也可以不實(shí)現(xiàn)。在不同操作系統(tǒng)上在高并發(fā)情況下最好都采用操作系統(tǒng)推薦的方式。Linux上還沒有真正實(shí)現(xiàn)網(wǎng)絡(luò)方式的AIO。

select和epoll的區(qū)別

當(dāng)需要讀兩個(gè)以上的I/O的時(shí)候,如果使用阻塞式的I/O,那么可能長時(shí)間的阻塞在一個(gè)描述符上面,另外的描述符雖然有數(shù)據(jù)但是不能讀出來,這樣實(shí)時(shí)性不能滿足要求,大概的解決方案有以下幾種:

  1. 使用多進(jìn)程或者多線程,但是這種方法會造成程序的復(fù)雜,而且對與進(jìn)程與線程的創(chuàng)建維護(hù)也需要很多的開銷(Apache服務(wù)器是用的子進(jìn)程的方式,優(yōu)點(diǎn)可以隔離用戶);
  2. 用一個(gè)進(jìn)程,但是使用非阻塞的I/O讀取數(shù)據(jù),當(dāng)一個(gè)I/O不可讀的時(shí)候立刻返回,檢查下一個(gè)是否可讀,這種形式的循環(huán)為輪詢(polling),這種方法比較浪費(fèi)CPU時(shí)間,因?yàn)榇蠖鄶?shù)時(shí)間是不可讀,但是仍花費(fèi)時(shí)間不斷反復(fù)執(zhí)行read系統(tǒng)調(diào)用;
  3. 異步I/O,當(dāng)一個(gè)描述符準(zhǔn)備好的時(shí)候用一個(gè)信號告訴進(jìn)程,但是由于信號個(gè)數(shù)有限,多個(gè)描述符時(shí)不適用;
  4. 一種較好的方式為I/O多路復(fù)用,先構(gòu)造一張有關(guān)描述符的列表(epoll中為隊(duì)列),然后調(diào)用一個(gè)函數(shù),直到這些描述符中的一個(gè)準(zhǔn)備好時(shí)才返回,返回時(shí)告訴進(jìn)程哪些I/O就緒。select和epoll這兩個(gè)機(jī)制都是多路I/O機(jī)制的解決方案,select為POSIX標(biāo)準(zhǔn)中的,而epoll為Linux所特有的。

它們的區(qū)別主要有三點(diǎn):

  1. select的句柄數(shù)目受限,在linux/posix_types.h頭文件有這樣的聲明:#define __FD_SETSIZE 1024表示select最多同時(shí)監(jiān)聽1024個(gè)fd。而epoll沒有,它的限制是最大的打開文件句柄數(shù)目;
  2. epoll的最大好處是不會隨著FD的數(shù)目增長而降低效率,在selec中采用輪詢處理,其中的數(shù)據(jù)結(jié)構(gòu)類似一個(gè)數(shù)組的數(shù)據(jù)結(jié)構(gòu),而epoll是維護(hù)一個(gè)隊(duì)列,直接看隊(duì)列是不是空就可以了。epoll只會對"活躍"的socket進(jìn)行操作---這是因?yàn)樵趦?nèi)核實(shí)現(xiàn)中epoll是根據(jù)每個(gè)fd上面的callback函數(shù)實(shí)現(xiàn)的。那么,只有"活躍"的socket才會主動的去調(diào)用 callback函數(shù)(把這個(gè)句柄加入隊(duì)列),其他idle狀態(tài)句柄則不會,在這點(diǎn)上,epoll實(shí)現(xiàn)了一個(gè)"偽"AIO。但是如果絕大部分的I/O都是“活躍的”,每個(gè)I/O端口使用率很高的話,epoll效率不一定比select高(可能是要維護(hù)隊(duì)列復(fù)雜);
  3. 使用mmap加速內(nèi)核與用戶空間的消息傳遞。無論是select,poll還是epoll都需要內(nèi)核把FD消息通知給用戶空間,如何避免不必要的內(nèi)存拷貝就很重要,在這點(diǎn)上,epoll是通過內(nèi)核于用戶空間mmap同一塊內(nèi)存實(shí)現(xiàn)的。

NIO與epoll

上文說到了select與epoll的區(qū)別,再總結(jié)一下Java NIO與select和epoll:

  • Linux2.6之后支持epoll
  • windows支持select而不支持epoll
  • 不同系統(tǒng)下nio的實(shí)現(xiàn)是不一樣的,包括Sunos linux 和windows
  • select的復(fù)雜度為O(N)
  • select有最大fd限制,默認(rèn)為1024
  • 修改sys/select.h可以改變select的fd數(shù)量限制
  • epoll的事件模型,無fd數(shù)量限制,復(fù)雜度O(1),不需要遍歷fd

以下代碼基于Java 8。

下面看下在NIO中Selector的open方法:

public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}

這里使用了SelectorProvider去創(chuàng)建一個(gè)Selector,看下provider方法的實(shí)現(xiàn):

public static SelectorProvider provider() {
    synchronized (lock) {
        if (provider != null)
            return provider;
        return AccessController.doPrivileged(
            new PrivilegedAction<SelectorProvider>() {
                public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
    }
}

看下sun.nio.ch.DefaultSelectorProvider.create()方法,該方法在不同的操作系統(tǒng)中的代碼是不同的,在windows中的實(shí)現(xiàn)如下:

public static SelectorProvider create() {
    return new WindowsSelectorProvider();
}

在Mac OS中的實(shí)現(xiàn)如下:

public static SelectorProvider create() {
    return new KQueueSelectorProvider();
}

在linux中的實(shí)現(xiàn)如下:

public static SelectorProvider create() {
    String str = (String)AccessController.doPrivileged(new GetPropertyAction("os.name"));
    
    if (str.equals("SunOS"))
        return createProvider("sun.nio.ch.DevPollSelectorProvider");
    if (str.equals("Linux"))
        return createProvider("sun.nio.ch.EPollSelectorProvider");
    return new PollSelectorProvider();
}

我們看到create方法中是通過區(qū)分操作系統(tǒng)來返回不同的Provider的。其中SunOs就是Solaris返回的是DevPollSelectorProvider,對于Linux,返回的Provder是EPollSelectorProvider,其余操作系統(tǒng),返回的是PollSelectorProvider。

Zero Copy

許多web應(yīng)用都會向用戶提供大量的靜態(tài)內(nèi)容,這意味著有很多數(shù)據(jù)從硬盤讀出之后,會原封不動的通過socket傳輸給用戶。

這種操作看起來可能不會怎么消耗CPU,但是實(shí)際上它是低效的:

  1. kernel把從disk讀數(shù)據(jù);
  2. 將數(shù)據(jù)傳輸給application;
  3. application再次把同樣的內(nèi)容再傳回給處于kernel級的socket。

這種場景下,application實(shí)際上只是作為一種低效的中間介質(zhì),用來把磁盤文件的數(shù)據(jù)傳給socket。

數(shù)據(jù)每次傳輸都會經(jīng)過user和kernel空間都會被copy,這會消耗cpu,并且占用RAM的帶寬。

傳統(tǒng)的數(shù)據(jù)傳輸方式

像這種從文件讀取數(shù)據(jù)然后將數(shù)據(jù)通過網(wǎng)絡(luò)傳輸給其他的程序的方式其核心操作就是如下兩個(gè)調(diào)用:

File.read(fileDesc,buf,len);
Socket.send(socket,buf,len);

其上操作看上去只有兩個(gè)簡單的調(diào)用,但是其內(nèi)部過程卻要經(jīng)歷四次用戶態(tài)和內(nèi)核態(tài)的切換以及四次的數(shù)據(jù)復(fù)制操作:

上圖展示了數(shù)據(jù)從文件到socket的內(nèi)部流程。

下面看下用戶態(tài)和內(nèi)核態(tài)的切換過程:

步驟如下:

  1. read()的調(diào)用引起了從用戶態(tài)到內(nèi)核態(tài)的切換(看圖二),內(nèi)部是通過sys_read()(或者類似的方法)發(fā)起對文件數(shù)據(jù)的讀取。數(shù)據(jù)的第一次復(fù)制是通過DMA(直接內(nèi)存訪問)將磁盤上的數(shù)據(jù)復(fù)制到內(nèi)核空間的緩沖區(qū)中;
  2. 數(shù)據(jù)從內(nèi)核空間的緩沖區(qū)復(fù)制到用戶空間的緩沖區(qū)后,read()方法也就返回了。此時(shí)內(nèi)核態(tài)又切換回用戶態(tài),現(xiàn)在數(shù)據(jù)也已經(jīng)復(fù)制到了用戶地址空間的緩存中;
  3. socket的send()方法的調(diào)用又會引起用戶態(tài)到內(nèi)核的切換,第三次數(shù)據(jù)復(fù)制又將數(shù)據(jù)從用戶空間緩沖區(qū)復(fù)制到了內(nèi)核空間的緩沖區(qū),這次數(shù)據(jù)被放在了不同于之前的內(nèi)核緩沖區(qū)中,這個(gè)緩沖區(qū)與數(shù)據(jù)將要被傳輸?shù)降膕ocket關(guān)聯(lián);
  4. send()系統(tǒng)調(diào)用返回后,就產(chǎn)生了第四次用戶態(tài)和內(nèi)核態(tài)的切換。隨著DMA單獨(dú)異步的將數(shù)據(jù)從內(nèi)核態(tài)的緩沖區(qū)中傳輸?shù)絽f(xié)議引擎發(fā)送到網(wǎng)絡(luò)上,有了第四次數(shù)據(jù)復(fù)制。

Zero Copy的數(shù)據(jù)傳輸方式

java.nio.channels.FileChannel中定義了兩個(gè)方法:transferTo( )和 transferFrom( )。

transferTo( )和 transferFrom( )方法允許將一個(gè)通道交叉連接到另一個(gè)通道,而不需要通過一個(gè)中間緩沖區(qū)來傳遞數(shù)據(jù)。只有 FileChannel 類有這兩個(gè)方法,因此 channel-to-channel 傳輸中通道之一必須是 FileChannel。您不能在 socket 通道之間直接傳輸數(shù)據(jù),不過 socket 通道實(shí)現(xiàn) WritableByteChannelReadableByteChannel 接口,因此文件的內(nèi)容可以用 transferTo( ) 方法傳輸給一個(gè) socket 通道,或者也可以用 transferFrom( )方法將數(shù)據(jù)從一個(gè) socket 通道直接讀取到一個(gè)文件中。

下面根據(jù)transferTo() 方法來說明。

根據(jù)上文可知,transferTo() 方法可以把bytes直接從調(diào)用它的channel傳輸?shù)搅硪粋€(gè)WritableByteChannel,中間不經(jīng)過應(yīng)用程序。

下面看下該方法的定義:

public abstract long transferTo(long position, long count,
                                    WritableByteChannel target)
        throws IOException;

下圖展示了通過transferTo實(shí)現(xiàn)數(shù)據(jù)傳輸?shù)穆窂剑?/p>

下圖展示了內(nèi)核態(tài)、用戶態(tài)的切換情況:

使用transferTo()方式所經(jīng)歷的步驟:

  1. transferTo調(diào)用會引起DMA將文件內(nèi)容復(fù)制到讀緩沖區(qū)(內(nèi)核空間的緩沖區(qū)),然后數(shù)據(jù)從這個(gè)緩沖區(qū)復(fù)制到另一個(gè)與socket輸出相關(guān)的內(nèi)核緩沖區(qū)中;
  2. 第三次數(shù)據(jù)復(fù)制就是DMA把socket關(guān)聯(lián)的緩沖區(qū)中的數(shù)據(jù)復(fù)制到協(xié)議引擎上發(fā)送到網(wǎng)絡(luò)上。

這次改善,我們是通過將內(nèi)核、用戶態(tài)切換的次數(shù)從四次減少到兩次,將數(shù)據(jù)的復(fù)制次數(shù)從四次減少到三次(只有一次用到cpu資源)。但這并沒有達(dá)到我們零復(fù)制的目標(biāo)。如果底層網(wǎng)絡(luò)適配器支持收集操作的話,我們可以進(jìn)一步減少內(nèi)核對數(shù)據(jù)的復(fù)制次數(shù)。在內(nèi)核為2.4或者以上版本的linux系統(tǒng)上,socket緩沖區(qū)描述符將被用來滿足這個(gè)需求。這個(gè)方式不僅減少了內(nèi)核用戶態(tài)間的切換,而且也省去了那次需要cpu參與的復(fù)制過程。從用戶角度來看依舊是調(diào)用transferTo()方法,但是其本質(zhì)發(fā)生了變化:

  1. 調(diào)用transferTo方法后數(shù)據(jù)被DMA從文件復(fù)制到了內(nèi)核的一個(gè)緩沖區(qū)中;
  2. 數(shù)據(jù)不再被復(fù)制到socket關(guān)聯(lián)的緩沖區(qū)中了,僅僅是將一個(gè)描述符(包含了數(shù)據(jù)的位置和長度等信息)追加到socket關(guān)聯(lián)的緩沖區(qū)中。DMA直接將內(nèi)核中的緩沖區(qū)中的數(shù)據(jù)傳輸給協(xié)議引擎,消除了僅剩的一次需要cpu周期的數(shù)據(jù)復(fù)制。

NIO存在的問題

使用NIO != 高性能,當(dāng)連接數(shù)<1000,并發(fā)程度不高或者局域網(wǎng)環(huán)境下NIO并沒有顯著的性能優(yōu)勢。

NIO并沒有完全屏蔽平臺差異,它仍然是基于各個(gè)操作系統(tǒng)的I/O系統(tǒng)實(shí)現(xiàn)的,差異仍然存在。使用NIO做網(wǎng)絡(luò)編程構(gòu)建事件驅(qū)動模型并不容易,陷阱重重。

推薦大家使用成熟的NIO框架,如Netty,MINA等。解決了很多NIO的陷阱,并屏蔽了操作系統(tǒng)的差異,有較好的性能和編程模型。

總結(jié)

最后總結(jié)一下NIO有哪些優(yōu)勢:

  • 事件驅(qū)動模型
  • 避免多線程
  • 單線程處理多任務(wù)
  • 非阻塞I/O,I/O讀寫不再阻塞
  • 基于block的傳輸,通常比基于流的傳輸更高效
  • 更高級的IO函數(shù),Zero Copy
  • I/O多路復(fù)用大大提高了Java網(wǎng)絡(luò)應(yīng)用的可伸縮性和實(shí)用性

參考鏈接:
高性能Server---Reactor模型
Java NIO淺析
Scalable IO in Java
Linux AIO
怎樣理解阻塞非阻塞與同步異步的區(qū)別?
也談BIO | NIO | AIO (Java版)
epoll和select區(qū)別
Epoll在Nio中的實(shí)現(xiàn)
通過zero copy來實(shí)現(xiàn)高效的數(shù)據(jù)傳輸

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容