4. Reactor反應(yīng)器模式

Reactor反應(yīng)器模式是高性能網(wǎng)絡(luò)編程在設(shè)計(jì)和架構(gòu)層面的基礎(chǔ)模式。為什么呢?只有徹底了解反應(yīng)器的原理,才能真正構(gòu)建好高性能的網(wǎng)絡(luò)應(yīng)用,才能輕松地學(xué)習(xí)和掌握Netty框架。同時(shí),反應(yīng)器模式也是BAT級別大公司必不可少的面試題。

4.1 Reactor 反應(yīng)器模式為何如此重要

在詳細(xì)介紹什么是Reactor反應(yīng)器模式之前,首先說明一下它的重要性。

到目前為止,高性能網(wǎng)絡(luò)編程都繞不開反應(yīng)器模式。很多著名的服務(wù)器軟件或者中間件都是基于反應(yīng)器模式實(shí)現(xiàn)的。

比如說,“ 全宇宙最有名的、最高性能”的Web服務(wù)器Nginx,就是基于反應(yīng)器模式的;如雷貫耳的Redis,作為最高性能的緩存服務(wù)器之一, 也是基于反應(yīng)器模式的;目前火得“一塌糊涂”、在開源項(xiàng)目中應(yīng)用極為廣泛的高性能通信中間件Netty,更是基于反應(yīng)器模式的。

從開發(fā)的角度來說,如果要完成和勝任高性能的服務(wù)器開發(fā),反應(yīng)器模式是必須學(xué)會和掌握的。從學(xué)習(xí)的角度來說,反應(yīng)器模式相當(dāng)于高性能、高并發(fā)的一項(xiàng)非常重要的基礎(chǔ)知識,只有掌握了它,才能真正掌握Nginx、Redis、 Netty 等這些大名鼎鼎的中間件技術(shù)。正因?yàn)槿绱?,在大的互?lián)網(wǎng)公司如阿里、騰訊、京東的面試過程中,反應(yīng)器模式相關(guān)的問題是經(jīng)常出現(xiàn)的面試問題。

總之,反應(yīng)器模式是高性能網(wǎng)絡(luò)編程的必知、必會的模式。

4.1.1 為什么首先學(xué)習(xí)Reactor反應(yīng)器模式

本書的目標(biāo),是學(xué)習(xí)基于Netty的開發(fā)高性能通信服務(wù)器。為什么在學(xué)習(xí)Netty之前,首先要學(xué)習(xí)Reactor反應(yīng)器模式呢?

寫多了代碼的程序員都知道,Java程序不是按照順序執(zhí)行的邏輯來組織的。代碼中所用到的設(shè)計(jì)模式,在一定程度上已經(jīng)演變成了代碼的組織方式。越是高水平的Java代碼,抽象的層次越高,到處都是高度抽象和面向接口的調(diào)用,大量用到繼承、多態(tài)的設(shè)計(jì)模式。

在閱讀別人的源代碼時(shí),如果不了解代碼所使用的設(shè)計(jì)模式,往往會暈頭轉(zhuǎn)向,不知身在何處,很難讀懂別人的代碼,對代碼跟蹤很成問題。反過來,如果先了解代碼的設(shè)計(jì)模式,再去看代碼,就會閱讀得很輕松,不會那么難懂了。

當(dāng)然,在寫代碼時(shí),不了解設(shè)計(jì)模式,也很難寫出高水平的Java代碼。

本書的重要使命之一,就是幫助大家學(xué)習(xí)和掌握Netty。Netty本身很抽象,大量應(yīng)用了設(shè)計(jì)模式。學(xué)習(xí)像Netty這樣的“精品中的精品”,肯定也是需要先從設(shè)計(jì)模式入手的。而Netty的整體架構(gòu),就是基于這個著名反應(yīng)器模式。

總之,反應(yīng)器模式非常重要。首先學(xué)習(xí)和掌握反應(yīng)器模式,對于學(xué)習(xí)Netty的人來說,一定是磨刀不誤砍柴工。

4.1.2 Reactor反應(yīng)器模式簡介

什么是Reactor反應(yīng)器模式呢?本文站在巨人的肩膀上,引用一下Doug Lea(那是一位讓人無限景仰的大師,Java中Concurrent并發(fā)包的重要作者之一)在文章《Scalable IO in Java》中對反應(yīng)器模式的定義,具體如下:

反應(yīng)器模式由Reactor反應(yīng)器線程、Handlers處理器兩大角色組成:

  1. Reactor反應(yīng)器線程的職責(zé):負(fù)責(zé)響應(yīng)IO事件,并且分發(fā)到Handlers處理器。
  2. Handlers處理器的職責(zé):非阻塞的執(zhí)行業(yè)務(wù)處理邏輯。

在這里,為了方便大家學(xué)習(xí),將Doug Lea著名的文章《Scalable IO in Java》的鏈接地址貼出來:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf,建議大家去閱讀一下,提升自己的基礎(chǔ)知識,開闊下眼界。

從上面的反應(yīng)器模式定義,看不出這種模式有什么神奇的地方。當(dāng)然,從簡單到復(fù)雜,反應(yīng)器模式也有很多版本。根據(jù)前面的定義,僅僅是最為簡單的一個版本。

如果需要徹底了解反應(yīng)器模式,還得從最原始的OIO編程開始講起。

4.1.3 多線程OIO的致命缺陷

在Java的OIO編程中,最初和最原始的網(wǎng)絡(luò)服務(wù)器程序,是用一個while循環(huán),不斷地監(jiān)聽端口是否有新的連接。如果有,那么就調(diào)用一個處理函數(shù)來處理,示例代碼如下:

while(true){
    socket = accept(); //阻塞,接收連接
    handle(socket) ;   //讀取數(shù)據(jù)、業(yè)務(wù)處理、寫入結(jié)果
}

這種方法的最大問題是:如果前一個網(wǎng)絡(luò)連接的handle(socket)沒有處理完,那么后面的連接請求沒法被接收,于是后面的請求通通會被阻塞住,服務(wù)器的吞吐量就太低了。對于服務(wù)器來說,這是一個嚴(yán)重的問題。

為了解決這個嚴(yán)重的連接阻塞問題,出現(xiàn)了一個極為經(jīng)典模式:Connection Per Thread(一個線程處理一個連接)模式。示例代碼如下:

//...省略: 導(dǎo)入的Java類
class ConnectionPerThread implements Runnable {
    public void run() {
        try {
            //服務(wù)器監(jiān)聽socket
            ServerSocketserverSocket =
                    new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
            while (!Thread.interrupted()) {
                Socket socket = serverSocket.accept();
                //接收一個連接后,為socket連接,新建一個專屬的處理器對象
                Handler handler = new Handler(socket);
                //創(chuàng)建新線程,專門負(fù)責(zé)一個連接的處理
                new Thread(handler).start();
            }
        } catch (IOException ex) { /* 處理異常 */ }
    }
    //處理器對象
    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) {
            socket = s;
        }
        public void run() {
            while (true) {
                try {
                    byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
                    /* 讀取數(shù)據(jù) */
                    socket.getInputStream().read(input);
                    /* 處理業(yè)務(wù)邏輯,獲取處理結(jié)果*/
                    byte[] output =null;
                    /* 寫入結(jié)果 */
                    socket.getOutputStream().write(output);
                } catch (IOException ex) { /*處理異常*/ }
            }
        }
    }
}

對于每一個新的網(wǎng)絡(luò)連接都分配給一個線程。每個線程都獨(dú)自處理自己負(fù)責(zé)的輸入和輸出。當(dāng)然,服務(wù)器的監(jiān)聽線程也是獨(dú)立的,任何的socket連接的輸入和輸出處理,不會阻塞到后面新socket連接的監(jiān)聽和建立。早期版本的Tomcat服務(wù)器,就是這樣實(shí)現(xiàn)的。

Connection Per Thread模式(一個線程處理一個連接)的優(yōu)點(diǎn)是:解決了前面的新連接被嚴(yán)重阻塞的問題,在一定程度上,極大地提高了服務(wù)器的吞吐量。

這里有個問題:如果一個線程同時(shí)負(fù)責(zé)處理多個socket連接的輸入和輸入,行不行呢?

看上去,沒有什么不可以。但是,實(shí)際上沒有用。為什么?傳統(tǒng)OIO編程中每一個socket的IO讀寫操作,都是阻塞的。在同一時(shí)刻,一個線程里只能處理一個socket,前一個socket被阻塞了,后面連接的IO操作是無法被并發(fā)執(zhí)行的。所以,不論怎么處理,OIO中一個線程也只能是處理一個連接的IO操作。

Connection Per Thread模式的缺點(diǎn)是:對應(yīng)于大量的連接,需要耗費(fèi)大量的線程資源,對線程資源要求太高。在系統(tǒng)中,線程是比較昂貴的系統(tǒng)資源。如果線程數(shù)太多,系統(tǒng)無法承受。而且,線程的反復(fù)創(chuàng)建、銷毀、線程的切換也需要代價(jià)。因此,在高并發(fā)的應(yīng)用場景下,多線程OIO的缺陷是致命的。

如何解決Connection Per Thread模式的巨大缺陷呢?一個有效路徑是:使用Reactor反應(yīng)器模式。用反應(yīng)器模式對線程的數(shù)量進(jìn)行控制,做到一個線程處理大量的連接。它是如何做到呢?首先來看簡單的版本——單線程的Reactor反應(yīng)器模式。

4.2 單線程Reactor反應(yīng)器模式

總體來說,Reactor反應(yīng)器模式有點(diǎn)兒類似事件驅(qū)動模式。

在事件驅(qū)動模式中,當(dāng)有事件觸發(fā)時(shí),事件源會將事件dispatch分發(fā)到handler處理器進(jìn)行事件處理。反應(yīng)器模式中的反應(yīng)器角色,類似于事件驅(qū)動模式中的dispatcher事件分發(fā)器角色。

前面已經(jīng)提到,在反應(yīng)器模式中,有Reactor反應(yīng)器和Handler處理器兩個重要的組件:

  1. Reactor反應(yīng)器:負(fù)責(zé)查詢IO事件,當(dāng)檢測到一個IO事件,將其發(fā)送給相應(yīng)的Handler處理器去處理。這里的IO事件,就是NIO中選擇器監(jiān)控的通道IO事件。
  2. Handler處理器:與IO事件(或者選擇鍵)綁定,負(fù)責(zé)IO事件的處理。完成真正的連接建立、通道的讀取、處理業(yè)務(wù)邏輯、負(fù)責(zé)將結(jié)果寫出到通道等。

4.2.1 什么是單線程Reactor反應(yīng)器

什么是單線程版本的Reactor反應(yīng)器模式呢?簡單地說,Reactor反應(yīng)器和Handers處理器處于一個線程中執(zhí)行。它是最簡單的反應(yīng)器模型,如圖4-1所示。

圖4-1 單線程Reactor反應(yīng)器模式

基于Java NIO,如何實(shí)現(xiàn)簡單的單線程版本的反應(yīng)器模式呢?需要用到SelectionKey選擇鍵的幾個重要的成員方法:

方法一:void attach(Object o)

此方法可以將任何的Java POJO對象,作為附件添加到SelectionKey實(shí)例,相當(dāng)于附件屬性的setter方法。這方法非常重要,因?yàn)樵趩尉€程版本的反應(yīng)器模式中,需要將Handler處理器實(shí)例,作為附件添加到SelectionKey實(shí)例。

方法二:Object attachment()

此方法的作用是取出之前通過attach(Object o)添加到SelectionKey選擇鍵實(shí)例的附件,相當(dāng)于附件屬性的getter方法,與attach(Object o)配套使用。

這個方法同樣非常重要,當(dāng)IO事件發(fā)生,選擇鍵被select方法選到,可以直接將事件的附件取出,也就是之前綁定的Handler處理器實(shí)例,通過該Handler,完成相應(yīng)的處理。

總之,在反應(yīng)器模式中,需要進(jìn)行attach和attachment結(jié)合使用:在選擇鍵注冊完成之后,調(diào)用attach方法,將Handler處理器綁定到選擇鍵;當(dāng)事件發(fā)生時(shí),調(diào)用attachment方法,可以從選擇鍵取出Handler處理器,將事件分發(fā)到Handler處理器中,完成業(yè)務(wù)處理。

4.2.2 單線程Reactor反應(yīng)器的參考代碼

Doug Lea在《Scalable IO in Java》中,實(shí)現(xiàn)了一個單線程Reactor反應(yīng)器模式的參考代碼。這里,我們站在巨人的肩膀上,借鑒Doug Lea的實(shí)現(xiàn),對其進(jìn)行介紹。為了方便說明,對Doug Lea的參考代碼進(jìn)行一些適當(dāng)?shù)男薷?。具體的參考代碼如下:

//...
class Reactor implements Runnable {
    Selector selector;
    ServerSocketChannelserverSocket;
    EchoServerReactor() throws IOException {
        //....省略:打開選擇器、serverSocket連接監(jiān)聽通道
        //注冊serverSocket的accept事件
        SelectionKeysk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
        //將新連接處理器作為附件,綁定到sk選擇鍵
        sk.attach(new AcceptorHandler());
    }
    public void run() {
        //選擇器輪詢
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set selected = selector.selectedKeys();
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                //反應(yīng)器負(fù)責(zé)dispatch收到的事件
                    SelectionKeysk=it.next();
                    dispatch(sk);
                }
                selected.clear();
            }
        } catch (IOException ex) { ex.printStackTrace(); }
    }
    //反應(yīng)器的分發(fā)方法
    void dispatch(SelectionKey k) {
        Runnable handler = (Runnable) (k.attachment());
        //調(diào)用之前綁定到選擇鍵的handler處理器對象
        if (handler != null) {
            handler.run();
        }
    }
    // 新連接處理器
    class AcceptorHandler implements Runnable {
        public void run() {
            //接受新連接
            //需要為新連接,創(chuàng)建一個輸入輸出的handler處理器
        }
    }
    //….
}

在上面的代碼中,設(shè)計(jì)了一個Handler處理器,叫作AcceptorHandler處理器,它是一個內(nèi)部類。在注冊serverSocket服務(wù)監(jiān)聽連接的接受事件之后,創(chuàng)建一個AcceptorHandler新連接處理器的實(shí)例,作為附件,被設(shè)置(attach)到了SelectionKey中。

//注冊serverSocket的接受(accept)事件
SelectionKeysk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//將新連接處理器作為附件,綁定到sk選擇鍵
sk.attach(new AcceptorHandler());

當(dāng)新連接事件發(fā)生后,取出了之前attach到SelectionKey中的Handler業(yè)務(wù)處理器,進(jìn)行socket的各種IO處理

void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment());
        //調(diào)用之前綁定到選擇鍵的處理器對象
        if (r != null) {
           r.run();
        }
    }

AcceptorHandler處理器的兩大職責(zé):一是接受新連接,二是在為新連接創(chuàng)建一個輸入輸出的Handler處理器,稱之為IOHandler。

// 新連接處理器
    class AcceptorHandler implements Runnable {
        public void run() {
            // 接受新連接
            // 需要為新連接創(chuàng)建一個輸入輸出的handler處理器
        }
    }

IOHandler,顧名思義,就是負(fù)責(zé)socket的數(shù)據(jù)輸入、業(yè)務(wù)處理、結(jié)果輸出。示例代碼如下:

//...
class IOHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKeysk;
    IOHandler (Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        //僅僅取得選擇鍵,稍候設(shè)置感興趣的IO事件
        sk = channel.register(selector, 0);
        //將Handler處理器作為選擇鍵的附件
        sk.attach(this);
        //注冊讀寫就緒事件
        sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
    }
    public void run()  {
    //...處理輸入和輸出
   }
}

在IOHandler的構(gòu)造器中,有兩點(diǎn)比較重要:

  1. 將新的SocketChannel傳輸通道,注冊到了反應(yīng)器Reactor類的同一個選擇器中。這樣保證了Reactor類和Handler類在同一個線程中執(zhí)行。
  2. Channel傳輸通道注冊完成后,將IOHandler自身作為附件,attach到了選擇鍵中。這樣,在Reactor類分發(fā)事件(選擇鍵)時(shí),能執(zhí)行到IOHandler的run方法。

如果上面的示例代碼比較繞口,不要緊。為了徹底地理解個中妙處,自己動手開發(fā)一個可以執(zhí)行的實(shí)例。下面基于反應(yīng)器模式,實(shí)現(xiàn)了一個EchoServer回顯服務(wù)器實(shí)例。仔細(xì)閱讀和運(yùn)行這個實(shí)例,就可以明白上面這段繞口的程序代碼的真正含義了。

4.2.3 一個Reactor反應(yīng)器版本的EchoServer實(shí)踐案例

EchoServer回顯服務(wù)器的功能很簡單:讀取客戶端的輸入,回顯到客戶端,所以也叫回顯服務(wù)器?;赗eactor反應(yīng)器模式來實(shí)現(xiàn),設(shè)計(jì)3個重要的類:

  1. 設(shè)計(jì)一個反應(yīng)器類:EchoServerReactor類。
  2. 設(shè)計(jì)兩個處理器類:AcceptorHandler新連接處理器、EchoHandler回顯處理器。

反應(yīng)器類EchoServerReactor的實(shí)現(xiàn)思路和前面的示例代碼基本上相同,具體如下:

//.....
//反應(yīng)器
class EchoServerReactor implements Runnable {
    Selector selector;
    ServerSocketChannel serverSocket;
    EchoServerReactor() throws IOException {
         //...獲取選擇器、開啟serverSocket服務(wù)監(jiān)聽通道
            //...綁定AcceptorHandler新連接處理器到selectKey
    }
    //輪詢和分發(fā)事件
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select();
                Set<SelectionKey> selected = selector.selectedKeys();
                Iterator<SelectionKey> it = selected.iterator();
                while (it.hasNext()) {
                    //反應(yīng)器負(fù)責(zé)dispatch收到的事件
                    SelectionKey sk = it.next();
                    dispatch(sk);
                }
                selected.clear();
            }
        } catch (IOException ex) {
            ex.printStackTrace();
          }
    }
    void dispatch(SelectionKeysk) {
        Runnable handler = (Runnable) sk.attachment();
        //調(diào)用之前attach綁定到選擇鍵的handler處理器對象
        if (handler != null) {
            handler.run();
        }
    }
    // Handler:新連接處理器
    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new EchoHandler(selector, channel);
            } catch (IOException e) {
                e.printStackTrace();
             }
        }
    }
    public static void main(String[] args) throws IOException {
        new Thread(new EchoServerReactor()).start();
    }
}

EchoHandler回顯處理器,主要是完成客戶端的內(nèi)容讀取和回顯,具體如下:

//...
class EchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKeysk;
    final ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    EchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        //取得選擇鍵,再設(shè)置感興趣的IO事件
        sk = channel.register(selector, 0);
        //將Handler自身作為選擇鍵的附件
        sk.attach(this);
        //注冊Read就緒事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    public void run() {
        try {
            if (state == SENDING) {
                //寫入通道
                channel.write(byteBuffer);
                //寫完后,準(zhǔn)備開始從通道讀,byteBuffer切換成寫入模式
                byteBuffer.clear();
                //寫完后,注冊read就緒事件
                sk.interestOps(SelectionKey.OP_READ);
                //寫完后,進(jìn)入接收的狀態(tài)
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //從通道讀
                int length = 0;
                while ((length = channel.read(byteBuffer)) &gt; 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //讀完后,準(zhǔn)備開始寫入通道,byteBuffer切換成讀取模式
                byteBuffer.flip();
                //讀完后,注冊write就緒事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //讀完后,進(jìn)入發(fā)送的狀態(tài)
                state = SENDING;
            }
            //處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
          }
    }
}

以上兩個類,是一個基于反應(yīng)器模式的EchoServer回顯服務(wù)器的完整實(shí)現(xiàn)。它是一個單線程版本的反應(yīng)器模式,Reactor反應(yīng)器和所有的Handler處理器,都執(zhí)行在同一條線程中。

運(yùn)行EchoServerReactor類中的main方法,可以啟動回顯服務(wù)器。如果要看到回顯輸出,還需要啟動客戶端??蛻舳说拇a,在同一個包下,類名為EchoClient,負(fù)責(zé)數(shù)據(jù)的發(fā)送。代碼如下:

public class EchoClient {

    public void start() throws IOException {

        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
                        NioDemoConfig.SOCKET_SERVER_PORT);

        // 1、獲取通道(channel)
        SocketChannel socketChannel = SocketChannel.open(address);
        // 2、切換成非阻塞模式
        socketChannel.configureBlocking(false);
        //不斷的自旋、等待連接完成,或者做一些其他的事情
        while (!socketChannel.finishConnect()) {

        }
        Print.tcfo("客戶端啟動成功!");

        //啟動接受線程
        Processer processer = new Processer(socketChannel);
        new Thread(processer).start();

    }

    static class Processer implements Runnable {
        final Selector selector;
        final SocketChannel channel;

        Processer(SocketChannel channel) throws IOException {
            //Reactor初始化
            selector = Selector.open();
            this.channel = channel;
            channel.register(selector,
                    SelectionKey.OP_READ | SelectionKey.OP_WRITE);
        }

        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        SelectionKey sk = it.next();
                        if (sk.isWritable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);

                            Scanner scanner = new Scanner(System.in);
                            Print.tcfo("請輸入發(fā)送內(nèi)容:");
                            if (scanner.hasNext()) {
                                SocketChannel socketChannel = (SocketChannel) sk.channel();
                                String next = scanner.next();
                                buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
                                buffer.flip();
                                // 操作三:通過DatagramChannel數(shù)據(jù)報(bào)通道發(fā)送數(shù)據(jù)
                                socketChannel.write(buffer);
                                buffer.clear();
                            }

                        }
                        if (sk.isReadable()) {
                            // 若選擇鍵的IO事件是“可讀”事件,讀取數(shù)據(jù)
                            SocketChannel socketChannel = (SocketChannel) sk.channel();

                            //讀取數(shù)據(jù)
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int length = 0;
                            while ((length = socketChannel.read(byteBuffer)) > 0) {
                                byteBuffer.flip();
                                Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
                                byteBuffer.clear();
                            }

                        }
                        //處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
                        //selectionKey.cancel();
                    }
                    selected.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException {
        new EchoClient().start();
    }
}

代碼測試,client類運(yùn)行結(jié)果如下:

[main|EchoClient.start]:客戶端啟動成功!
[Thread-0|EchoClient$Processer.run]:請輸入發(fā)送內(nèi)容:
hello

server類運(yùn)行結(jié)果如下:

[Thread-0|EchoHandler.run] |>  2019-11-19 02:39:51 >>hello 

4.2.4 單線程Reactor反應(yīng)器模式的缺點(diǎn)

單線程Reactor反應(yīng)器模式,是基于Java的NIO實(shí)現(xiàn)的。相對于傳統(tǒng)的多線程OIO,反應(yīng)器模式不再需要啟動成千上萬條線程,效率自然是大大提升了。

在單線程反應(yīng)器模式中,Reactor反應(yīng)器和Handler處理器,都執(zhí)行在同一條線程上。這樣,帶來了一個問題:當(dāng)其中某個Handler阻塞時(shí),會導(dǎo)致其他所有的Handler都得不到執(zhí)行。在這種場景下,如果被阻塞的Handler不僅僅負(fù)責(zé)輸入和輸出處理的業(yè)務(wù),還包括負(fù)責(zé)連接監(jiān)聽的AcceptorHandler處理器。這個是非常嚴(yán)重的問題。

為什么?一旦AcceptorHandler處理器阻塞,會導(dǎo)致整個服務(wù)不能接收新的連接,使得服務(wù)器變得不可用。因?yàn)檫@個缺陷,因此單線程反應(yīng)器模型用得比較少。

另外,目前的服務(wù)器都是多核的,單線程反應(yīng)器模式模型不能充分利用多核資源??傊诟咝阅芊?wù)器應(yīng)用場景中,單線程反應(yīng)器模式實(shí)際使用的很少。

4.3 多線程的Reactor反應(yīng)器模式

既然Reactor反應(yīng)器和Handler處理器,擠在一個線程會造成非常嚴(yán)重的性能缺陷。那么,可以使用多線程,對基礎(chǔ)的反應(yīng)器模式進(jìn)行改造和演進(jìn)。

4.3.1 多線程池Reactor反應(yīng)器演進(jìn)

多線程池Reactor反應(yīng)器的演進(jìn),分為兩個方面:

  1. 首先是升級Handler處理器。既要使用多線程,又要盡可能的高效率,則可以考慮使用線程池。
  2. 其次是升級Reactor反應(yīng)器??梢钥紤]引入多個Selector選擇器,提升選擇大量通道的能力。

總體來說,多線程池反應(yīng)器的模式,大致如下:

  1. 將負(fù)責(zé)輸入輸出處理的IOHandler處理器的執(zhí)行,放入獨(dú)立的線程池中。這樣,業(yè)務(wù)處理線程與負(fù)責(zé)服務(wù)監(jiān)聽和IO事件查詢的反應(yīng)器線程相隔離,避免服務(wù)器的連接監(jiān)聽受到阻塞。
  2. 如果服務(wù)器為多核的CPU,可以將反應(yīng)器線程拆分為多個子反應(yīng)器(SubReactor)線程;同時(shí),引入多個選擇器,每一個SubReactor子線程負(fù)責(zé)一個選擇器。這樣,充分釋放了系統(tǒng)資源的能力;也提高了反應(yīng)器管理大量連接,提升選擇大量通道的能力。

4.3.2 多線程Reactor反應(yīng)器的實(shí)踐案例

在前面的“回顯服務(wù)器”(EchoServer)的基礎(chǔ)上,完成多線程Reactor反應(yīng)器的升級。多線程反應(yīng)器的實(shí)踐案例設(shè)計(jì)如下:

  1. 引入多個選擇器。
  2. 設(shè)計(jì)一個新的子反應(yīng)器(SubReactor)類,一個子反應(yīng)器負(fù)責(zé)查詢一個選擇器。
  3. 開啟多個反應(yīng)器的處理線程,一個線程負(fù)責(zé)執(zhí)行一個子反應(yīng)器(SubReactor)。

為了提升效率,建議SubReactor的數(shù)量和選擇器的數(shù)量一致。避免多個線程負(fù)責(zé)一個選擇器,導(dǎo)致需要進(jìn)行線程同步,引起的效率降低。這個實(shí)踐案例的代碼如下:

//....反應(yīng)器
class MultiThreadEchoServerReactor {
    ServerSocketChannelserverSocket;
    AtomicInteger next = new AtomicInteger(0);
    //選擇器集合,引入多個選擇器
    Selector[] selectors = new Selector[2];
    //引入多個子反應(yīng)器
    SubReactor[] subReactors = null;
    MultiThreadEchoServerReactor() throws IOException {
        //初始化多個選擇器
        selectors[0] = Selector.open();
        selectors[1] = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address =
                new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
        NioDemoConfig.SOCKET_SERVER_PORT);
        serverSocket.socket().bind(address);
        //非阻塞
        serverSocket.configureBlocking(false);
        //第一個選擇器,負(fù)責(zé)監(jiān)控新連接事件
        SelectionKeysk =
                serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
        //綁定Handler:attach新連接監(jiān)控handler處理器到SelectionKey(選擇鍵)
        sk.attach(new AcceptorHandler());
        //第一個子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個選擇器
        SubReactor subReactor1 = new SubReactor(selectors[0]);
        //第二個子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個選擇器
        SubReactor subReactor2 = new SubReactor(selectors[1]);
        subReactors = new SubReactor[]{subReactor1, subReactor2};
    }
    private void startService() {
        // 一子反應(yīng)器對應(yīng)一個線程
        new Thread(subReactors[0]).start();
        new Thread(subReactors[1]).start();
    }
    //子反應(yīng)器
    class SubReactor implements Runnable {
        //每個線程負(fù)責(zé)一個選擇器的查詢和選擇
        final Selector selector;
        public SubReactor(Selector selector) {
            this.selector = selector;
        }
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select();
                    Set<SelectionKey> keySet = selector.selectedKeys();
                    Iterator<SelectionKey> it = keySet.iterator();
                    while (it.hasNext()) {
                        //反應(yīng)器負(fù)責(zé)dispatch收到的事件
                        SelectionKeysk = it.next();
                        dispatch(sk);
                    }
                keySet.clear();
                }
            } catch (IOException ex) {
                ex.printStackTrace();
              }
        }
        void dispatch(SelectionKeysk) {
            Runnable handler = (Runnable) sk.attachment();
            //調(diào)用之前attach綁定到選擇鍵的handler處理器對象
            if (handler != null) {
                handler.run();
            }
        }
    }
    // Handler:新連接處理器
    class AcceptorHandler implements Runnable {
        public void run() {
            try {
                SocketChannel channel = serverSocket.accept();
                if (channel != null)
                    new MultiThreadEchoHandler(selectors[next.get()], channel);
            } catch (IOException e) {
                e.printStackTrace();
              }
            if (next.incrementAndGet() == selectors.length) {
                next.set(0);
            }
        }
    }
    public static void main(String[] args) throws IOException {
        MultiThreadEchoServerReactor server =
                   new MultiThreadEchoServerReactor();
        server.startService();
    }
}

上面是反應(yīng)器的演進(jìn)代碼,再來看看Handler處理器的多線程演進(jìn)實(shí)踐。

4.3.3 多線程Handler處理器的實(shí)踐案例

基于前面的單線程反應(yīng)器的EchoHandler回顯處理器的程序代碼,予以改進(jìn),新的回顯處理器為:MultiThreadEchoHandler。主要的升級是引入了一個線程池(ThreadPool),業(yè)務(wù)處理的代碼執(zhí)行在自己的線程池中,徹底地做到業(yè)務(wù)處理線程和反應(yīng)器IO事件線程的完全隔離。這個實(shí)踐案例的代碼如下:

//...
class MultiThreadEchoHandler implements Runnable {
    final SocketChannel channel;
    final SelectionKeysk;
    final ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
    static final int RECIEVING = 0, SENDING = 1;
    int state = RECIEVING;
    //引入線程池
    static ExecutorService pool = Executors.newFixedThreadPool(4);
    MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
        channel = c;
        c.configureBlocking(false);
        //取得選擇鍵,、再設(shè)置感興趣的IO事件
        sk = channel.register(selector, 0);
        //將本Handler作為sk選擇鍵的附件,方便事件分發(fā)(dispatch)
        sk.attach(this);
        //向sk選擇鍵注冊Read就緒事件
        sk.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }
    public void run() {
        //異步任務(wù),在獨(dú)立的線程池中執(zhí)行
        pool.execute(new AsyncTask());
    }
    //業(yè)務(wù)處理,不在反應(yīng)器線程中執(zhí)行
    public synchronized void asyncRun() {
        try {
            if (state == SENDING) {
                //寫入通道
                channel.write(byteBuffer);
                //寫完后,準(zhǔn)備開始從通道讀,byteBuffer切換成寫入模式
                byteBuffer.clear();
                //寫完后,注冊read就緒事件
                sk.interestOps(SelectionKey.OP_READ);
                //寫完后,進(jìn)入接收的狀態(tài)
                state = RECIEVING;
            } else if (state == RECIEVING) {
                //從通道讀
                int length = 0;
                while ((length = channel.read(byteBuffer)) &gt; 0) {
                    Logger.info(new String(byteBuffer.array(), 0, length));
                }
                //讀完后,準(zhǔn)備開始寫入通道,byteBuffer切換成讀取模式
                byteBuffer.flip();
                //讀完后,注冊write就緒事件
                sk.interestOps(SelectionKey.OP_WRITE);
                //讀完后,進(jìn)入發(fā)送的狀態(tài)
                state = SENDING;
            }
            //處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
            //sk.cancel();
        } catch (IOException ex) {
            ex.printStackTrace();
          }
    }
    //異步任務(wù)的內(nèi)部類
    class AsyncTask implements Runnable {
        public void run() {
            MultiThreadEchoHandler.this.asyncRun();
        }
    }
}

代碼中設(shè)計(jì)了一個內(nèi)部類AsyncTask,是一個簡單的異步任務(wù)的提交類。它使得異步業(yè)務(wù)asyncRun方法,可以獨(dú)立地提交到線程池中。另外,既然業(yè)務(wù)處理異步執(zhí)行,需要在asyncRun方法的前面加上synchronized同步修飾符。

至此,多線程版本的反應(yīng)器模式,實(shí)踐案例的代碼就演示完了。執(zhí)行新版本的多線程MultiThreadEchoServerReactor服務(wù)器,可以使用之前的EchoClient客戶端與之配置,完成整個回顯(echo)的通信演示。

演示的輸出和之前單線程版本的EchoServer回顯服務(wù)器示例,是一模一樣的。

客戶端演示結(jié)果:

[main|EchoClient.start]:客戶端啟動成功!
[Thread-0|EchoClient$Processer.run]:請輸入發(fā)送內(nèi)容:
Multi helloworld

服務(wù)器端演示結(jié)果:

[pool-1-thread-1|MultiThreadEchoHandler.asyncRun] |>  2019-11-19 03:13:28 >>Multi 

4.4 Reactor反應(yīng)器模式小結(jié)

在總結(jié)反應(yīng)器模式前,首先看看和其他模式的對比,加強(qiáng)一下對它的理解。

  1. 反應(yīng)器模式和生產(chǎn)者消費(fèi)者模式對比

相似之處:在一定程度上,反應(yīng)器模式有點(diǎn)類似生產(chǎn)者消費(fèi)者模式。在生產(chǎn)者消費(fèi)者模式中,一個或多個生產(chǎn)者將事件加入到一個隊(duì)列中,一個或多個消費(fèi)者主動地從這個隊(duì)列中提?。≒ull)事件來處理。

不同之處在于:反應(yīng)器模式是基于查詢的,沒有專門的隊(duì)列去緩沖存儲IO事件,查詢到IO事件之后,反應(yīng)器會根據(jù)不同IO選擇鍵(事件)將其分發(fā)給對應(yīng)的Handler處理器來處理。

  1. 反應(yīng)器模式和觀察者模式(Observer Pattern)對比

相似之處在于:在反應(yīng)器模式中,當(dāng)查詢到IO事件后,服務(wù)處理程序使用單路/多路分發(fā)(Dispatch)策略,同步地分發(fā)這些IO事件。觀察者模式(Observer Pattern)也被稱作發(fā)布/訂閱模式,它定義了一種依賴關(guān)系,讓多個觀察者同時(shí)監(jiān)聽某一個主題(Topic)。這個主題對象在狀態(tài)發(fā)生變化時(shí),會通知所有觀察者,它們能夠執(zhí)行相應(yīng)的處理。

不同之處在于:在反應(yīng)器模式中,Handler處理器實(shí)例和IO事件(選擇鍵)的訂閱關(guān)系,基本上是一個事件綁定到一個Handler處理器;每一個IO事件(選擇鍵)被查詢后,反應(yīng)器會將事件分發(fā)給所綁定的Handler處理器;而在觀察者模式中,同一個時(shí)刻,同一個主題可以被訂閱過的多個觀察者處理。

最后,總結(jié)一下反應(yīng)器模式的優(yōu)點(diǎn)和缺點(diǎn)。作為高性能的IO模式,反應(yīng)器模式的優(yōu)點(diǎn)如下:

  • 響應(yīng)快,雖然同一反應(yīng)器線程本身是同步的,但不會被單個連接的同步IO所阻塞;

  • 編程相對簡單,最大程度避免了復(fù)雜的多線程同步,也避免了多線程的各個進(jìn)程之間切換的開銷;

  • 可擴(kuò)展,可以方便地通過增加反應(yīng)器線程的個數(shù)來充分利用CPU資源。

反應(yīng)器模式的缺點(diǎn)如下:

  • 反應(yīng)器模式增加了一定的復(fù)雜性,因而有一定的門檻,并且不易于調(diào)試。

  • 反應(yīng)器模式需要操作系統(tǒng)底層的IO多路復(fù)用的支持,如Linux中的epoll。如果操作系統(tǒng)的底層不支持IO多路復(fù)用,反應(yīng)器模式不會有那么高效。

  • 同一個Handler業(yè)務(wù)線程中,如果出現(xiàn)一個長時(shí)間的數(shù)據(jù)讀寫,會影響這個反應(yīng)器中其他通道的IO處理。例如在大文件傳輸時(shí),IO操作就會影響其他客戶端(Client)的響應(yīng)時(shí)間。因而對于這種操作,還需要進(jìn)一步對反應(yīng)器模式進(jìn)行改進(jìn)。

4.5 本章小結(jié)

反應(yīng)器(Reactor)模式是高性能網(wǎng)絡(luò)編程在設(shè)計(jì)和架構(gòu)層面的基礎(chǔ)模式。同時(shí),反應(yīng)器模式,也是BAT級別大公司必不可少的面試題。

本章首先從最簡單的Connection Per Thread(一個線程處理一個連接)模式入手,介紹了該模式的嚴(yán)重缺陷,從而引出來了單線程的反應(yīng)器模式。

為了充分利用系統(tǒng)資源,最大限度地減少阻塞,在單線程的反應(yīng)器模式的基礎(chǔ)上,又演進(jìn)出來了多線程的反應(yīng)器模式實(shí)現(xiàn)。

本章的反應(yīng)器模式的實(shí)現(xiàn),僅僅是拋磚引玉,在充分利用系統(tǒng)資源、最大限度地減少阻塞兩個維度,都有很大的提升空間,建議大家自行嘗試。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容