「高并發(fā)通信框架Netty4 源碼解讀(八)」NIO應(yīng)用——聊天案例及Reactor線程模式

前面對NIO原理進(jìn)行了大篇幅的分析,最后我們舉幾個(gè)案例,教大家如何更好的使用NIO。

基于NIO編寫的聊天DEMO

服務(wù)端

package NIO.luban.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;

//聊天室服務(wù)端
public class ChatServer {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private long timeout=2000;

    public ChatServer(){
        try {
            //服務(wù)端channel
            serverSocketChannel=ServerSocketChannel.open();
            //選擇器對象,底層就是IO多路復(fù)用
            selector=Selector.open();
            //綁定端口
            serverSocketChannel.bind(new InetSocketAddress(9090));
            //設(shè)置非阻塞式
            serverSocketChannel.configureBlocking(false);
            //注冊"監(jiān)聽連接"給Selector
            SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服務(wù)端準(zhǔn)備就緒");
            start();
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void start() throws Exception{
        int count=0;
        long start=System.nanoTime();

        while (true){
            //等待感興趣的事件,沒有事件就會阻塞2秒鐘,2秒鐘沒有感興趣事件發(fā)生,程序繼續(xù)往下執(zhí)行
            selector.select(timeout);
//            System.out.println("2秒了");
            long end=System.nanoTime();
            if(end-start>= TimeUnit.MILLISECONDS.toNanos(timeout)){
                count=1;
            }else{
                count++;//記錄空輪詢的次數(shù)
            }
            //空輪詢次數(shù)太多的話,重新建立連接
            if(count>=10){
                System.out.println("有可能發(fā)生空輪詢"+count+"次");
                rebuildSelector();
                count=0;
                selector.selectNow();
                continue;
            }
            //得到所有就緒的SelectionKey對象
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();
            //遍歷就緒事件,并判斷就緒的事件類型
            while (iterator.hasNext()){
                SelectionKey selectionKey=iterator.next();
                //連接事件
                if(selectionKey.isAcceptable()){
                    //獲取網(wǎng)絡(luò)通道,有客戶端來鏈接啦
                    SocketChannel accept = serverSocketChannel.accept();
                    //設(shè)置非阻塞式
                    accept.configureBlocking(false);
                    //連接上了  注冊讀取事件
                    accept.register(selector,SelectionKey.OP_READ);
                    System.out.println(accept.getRemoteAddress().toString()+"上線了");
                }
                //讀事件
                if(selectionKey.isReadable()){     //讀取客戶端數(shù)據(jù)事件
                    //讀取客戶端發(fā)來的數(shù)據(jù)
                    readClientData(selectionKey);
                }
                //手動從當(dāng)前集合將本次運(yùn)行完的對象刪除,事件處理完了就要刪除
                iterator.remove();
            }
        }
    }

    //重新建立鏈接
    private void rebuildSelector() throws IOException {
        Selector newSelector=Selector.open();
        Selector oldSelect=selector;
        for (SelectionKey selectionKey : oldSelect.keys()) {
            //感興趣事件對應(yīng)的數(shù)值
            int i = selectionKey.interestOps();
            //取消舊的鍵
            selectionKey.cancel();
            //將channel注冊到新的選擇器上
            selectionKey.channel().register(newSelector,i);
        }
        selector=newSelector;
        oldSelect.close();//關(guān)閉舊的
    }

    //讀取客戶端發(fā)來的數(shù)據(jù)
    private void readClientData(SelectionKey selectionKey) throws IOException {
        //獲取跟客戶端連接的通道
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        //生成緩沖區(qū),用于接收客戶端傳輸進(jìn)來的數(shù)據(jù)
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        //讀取數(shù)據(jù)到緩沖區(qū),返回實(shí)際讀取到的字節(jié)數(shù),沒有數(shù)據(jù)返回-1
        int read = socketChannel.read(byteBuffer);
        //讀之前,將緩沖區(qū)設(shè)置為讀狀態(tài)
        byteBuffer.flip();
        if(read>0){//判斷確實(shí)讀到數(shù)據(jù)了
            //創(chuàng)建臨時(shí)發(fā)送字節(jié)數(shù)組
            byte[] bytes=new byte[read];
            //將緩沖區(qū)數(shù)據(jù)寫到臨時(shí)數(shù)組
            byteBuffer.get(bytes,0,read);
            //讀取了數(shù)據(jù)  廣播
            String s = new String(bytes,"utf-8");
            //將此數(shù)據(jù)發(fā)送到其他客戶端
            writeClientData(socketChannel,s);
        }
    }

    //廣播  將讀取的數(shù)據(jù)群發(fā)
    private void writeClientData(SocketChannel socketChannel,String s) throws IOException {
        //獲取到所有的注冊事件,不管有沒有就緒
        Set<SelectionKey> keys = selector.keys();
        //遍歷事件
        for (SelectionKey key : keys) {
            //判斷事件是否還有效
            if(key.isValid()){
                //獲取事件對應(yīng)的channel
                SelectableChannel channel = key.channel();
                //注意,我們只需要將信息發(fā)送給客戶端
                if(channel instanceof  SocketChannel){
                    SocketChannel socketChannel1= (SocketChannel) channel;
                    //不需要發(fā)送給自己了
                    if(channel!=socketChannel){
                        ByteBuffer wrap = ByteBuffer.wrap(s.getBytes());
                        socketChannel1.write(wrap);
                    }
                }
            }
        }
    }


    public static void main(String[] args) throws Exception {
        new ChatServer().start();
    }
}

客戶端

package NIO.luban.chat;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

public class ChatClient implements  Runnable{

    private SocketChannel socketChannel;

    private Selector selector;

    public ChatClient(){
        try {
            //得到一個(gè)網(wǎng)絡(luò)通道
            socketChannel=SocketChannel.open();
            //打開一個(gè)選擇器
            selector=Selector.open();
            //設(shè)置非阻塞式
            socketChannel.configureBlocking(false);
        }catch (Exception e){
            e.printStackTrace();
        }
    }



    public void doCon(){
        //提供服務(wù)器ip與端口
        InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1",9090);
        //連接服務(wù)器端
        try {
            //連接服務(wù)器,如果成功了
            if(socketChannel.connect(inetSocketAddress)){
                //注冊讀事件
                socketChannel.register(selector,SelectionKey.OP_READ);
                //寫數(shù)據(jù)
                writeData(socketChannel);
            }else{
                //注冊連接事件
                socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果連接不上
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void writeData(SocketChannel socketChannel) throws IOException {
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    while (true){
                        //等待你的輸入
                        Scanner scanner=new Scanner(System.in);
                        String str = scanner.nextLine();
                        if(str.equals("by")){
                            socketChannel.close();
                            return;
                        }
                        //將你的輸入包裝成緩沖區(qū)
                        ByteBuffer byteBuffer=ByteBuffer.wrap((socketChannel.getLocalAddress().toString()+"說:"+str).getBytes());
                        //發(fā)送你的數(shù)據(jù)
                        socketChannel.write(byteBuffer);
                    }
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }).start();
    }

    //讀數(shù)據(jù)
    public void readData() throws IOException {
        ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
        int read = socketChannel.read(byteBuffer);
        if(read>0){
            byte[] array = byteBuffer.array();
            System.out.println(new String(array,"utf-8"));
        }
    }


    public static void main(String[] args) throws IOException {
        new Thread(new ChatClient()).start();
    }

    @Override
    public void run() {
        doCon();
        try {
            while (true){
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    if(selectionKey.isValid()){
                        if(selectionKey.isConnectable()){
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            if (channel.finishConnect()){
                                channel.register(selector,SelectionKey.OP_READ);
                                System.out.println("bbbbbbbbbbbbb");
                                //寫數(shù)據(jù)
                                writeData(channel);
                            }else{
                                System.exit(1);
                            }
                        }
                        if(selectionKey.isReadable()){
                            readData();
                        }
                    }
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}


NIO的Reactor單線程模型

上面的聊天案例無論是服務(wù)端還是客戶端,都是單線程的,所有的鏈接及讀寫都是在一個(gè)main方法所在的主線程內(nèi)運(yùn)行。

拿服務(wù)器代碼來說,一個(gè)main線程,要做以下工作

  1. 接收客戶端連接
  2. 讀取已連接上的客戶端發(fā)來的數(shù)據(jù)
  3. 讀到數(shù)據(jù)后要解碼,處理業(yè)務(wù)邏輯
  4. 編碼,響應(yīng)客戶端,向客戶端寫回?cái)?shù)據(jù)

一個(gè)線程,在同一時(shí)刻只能做上面的一件事情,如果線程在讀取數(shù)據(jù)的時(shí)候阻塞了,那其他三件事都不能做,新的客戶端也無法鏈接成功。我們可以讓服務(wù)器端只處理鏈接,讀和寫交給另一個(gè)線程處理。如下圖所示:


服務(wù)端,主線程處理鏈接,讀寫交給其他線程

    // Reactor線程  
    package NIO.luban.oneReactor;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.Iterator;
    import java.util.Set;

    public class TCPReactor implements Runnable {  
        private final ServerSocketChannel ssc;  
        private final Selector selector;  
        public TCPReactor(int port) throws IOException {
            //打開選擇器進(jìn)行IO多路復(fù)用
            selector = Selector.open();
            //打開服務(wù)器通道
            ssc = ServerSocketChannel.open();  
            InetSocketAddress addr = new InetSocketAddress(port);
            //綁定端口
            ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口
            //設(shè)置ServerSocketChannel為非阻塞
            ssc.configureBlocking(false);
            //注冊鏈接事件
            SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT);
            //將時(shí)間綁定一個(gè)處理器,事件發(fā)生后由這個(gè)處理器完成后續(xù)操作
            sk.attach(new Acceptor(selector, ssc));
        }  
      
        @Override  
        public void run() {  
            while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行  
                System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");  
                try {
                    // 若沒有事件就緒則不往下執(zhí)行
                    if (selector.select() == 0)
                        continue;  
                } catch (IOException e) {  
                    e.printStackTrace();
                }
                // 取得所有已就緒事件的key集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                //遍歷事件
                Iterator<SelectionKey> it = selectedKeys.iterator();  
                while (it.hasNext()) {
                    //調(diào)度事件,在這里我們開啟另一個(gè)線程進(jìn)行讀寫操作
                    dispatch((it.next()));
                    it.remove();  
                }  
            }  
        }  
      
        /* 
         * name: dispatch(SelectionKey key) 
         * description: 調(diào)度方法,根據(jù)事件綁定的對象開新線程 
         */  
        private void dispatch(SelectionKey key) {  
            Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對象開新線程  
            if (r != null)  {
                r.run();
            }
        }
      
    }  

鏈接處理器

    public class Acceptor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector;  
          
        public Acceptor(Selector selector, ServerSocketChannel ssc) {  
            this.ssc=ssc;  
            this.selector=selector;  
        }  
          
        @Override  
        public void run() {  
            try {
                // 接受client鏈接請求
                SocketChannel sc= ssc.accept();
                System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");  
                if(sc!=null) {
                    // 設(shè)置為非阻塞  
                    sc.configureBlocking(false);
                    // SocketChannel向selector註冊一個(gè)讀事件,然後返回該通道的key  
                    SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
                    // 使一個(gè)阻塞住的selector操作立即返回
                    selector.wakeup();
                    // 給定key一個(gè)附加的TCPHandler對象,用來處理后續(xù)讀寫操作
                    sk.attach(new TCPHandler(sk, sc));
                }
            } catch (IOException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }  
          
    }  

讀寫處理器

    // Handler線程  
    package NIO.luban.oneReactor;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.SocketChannel;

    public class TCPHandler implements Runnable {  
      
        private final SelectionKey sk;  
        private final SocketChannel sc;  
      
        int state;   
      
        public TCPHandler(SelectionKey sk, SocketChannel sc) {  
            this.sk = sk;  
            this.sc = sc;  
            state = 0; // 初始狀態(tài)設(shè)定為READING ,第一次肯定是先讀客戶端數(shù)據(jù)
        }  
      
        @Override  
        public void run() {  
            try {  
                if (state == 0)  
                    read(); // 讀取數(shù)據(jù)
                else  
                    send(); // 發(fā)送
      
            } catch (IOException e) {  
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
            }  
        }  
          
   
      
        private synchronized void read() throws IOException {  
            byte[] arr = new byte[1024];  
            ByteBuffer buf = ByteBuffer.wrap(arr);  
              
            int numBytes = sc.read(buf); // 讀取字符串  
            if(numBytes == -1)  
            {  
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
                return;  
            }  
            String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)換字符串  
            if ((str != null) && !str.equals(" ")) { 
                //處理數(shù)據(jù)
                process(str); // 
                System.out.println(sc.socket().getRemoteSocketAddress().toString()  
                        + " > " + str);  
                //在這個(gè)通道讀完了后,下一步往這個(gè)通道寫數(shù)據(jù)
                //改成寫狀態(tài)
                state = 1; 
                sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件  
                sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回  
            }  
        }  
      
        private void send() throws IOException  {  
            // get message from message queue  
              
            String str = "Your message has sent to "  
                    + sc.socket().getLocalSocketAddress().toString() + "\r\n";  
            ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設(shè)為0,所以不需要再flip()  
      
            while (buf.hasRemaining()) {  
                sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容  
            }  
              
            state = 0; // 改變狀態(tài)  
            sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件  
            sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回  
        }  
          
        void process(String str) {  
            // do process(decode, logically process, encode)..
            // ..
            try {
                //等待6秒,模擬數(shù)據(jù)處理
                Thread.sleep(60000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        //關(guān)閉通道
        private void closeChannel() {
            try {
                sk.cancel();
                sc.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }  

啟動服務(wù)器

public class Main {
   public static void main(String[] args) {
       System.out.println(Main.class.getName());
       // TODO Auto-generated method stub
       try {
           TCPReactor reactor = new TCPReactor(1333);
           reactor.run();
       } catch (IOException e) {
           // TODO Auto-generated catch block
           e.printStackTrace();
       }
   }
}

客戶端

public class Client {

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        String hostname="127.0.0.1";
        int port = 9999;
        //String hostname="127.0.0.1";
        //int port=1333;
        try {
            Socket client = new Socket(hostname, port); // 連接至目的地
            System.out.println("連接至目的地:"+ hostname);
            PrintWriter out = new PrintWriter(client.getOutputStream());
            BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
            BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
            String input;

            while((input=stdIn.readLine()) != null) { // 讀取輸入
                out.println(input); // 發(fā)送輸入的字符串
                out.flush(); // 強(qiáng)制將緩衝區(qū)內(nèi)的數(shù)據(jù)輸出
                if(input.equals("exit"))
                {
                    break;
                }
                System.out.println("server: "+in.readLine());
            }
            client.close();
            System.out.println("client stop.");
        } catch (UnknownHostException e) {
            // TODO Auto-generated catch block
            System.err.println("Don't know about host: " + hostname);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            System.err.println("Couldn't get I/O for the socket connection");
        }

    }

}

NIO的Reactor多線程模型

上面的單線程模型有個(gè)缺點(diǎn),就是每一個(gè)連接都要開啟一個(gè)線程,如果有10000個(gè)請求,服務(wù)器需要開啟10000個(gè)線程,顯然是不合理的,我們可以使用線程池技術(shù)來實(shí)現(xiàn)多線程模型。
首先,編寫服務(wù)端ServerSocketChannel對應(yīng)的Selector

    public TCPReactor(int port) throws IOException {
        //打開一個(gè)selector IO多路復(fù)用器
        selector = Selector.open();
        //打開服務(wù)端通道
        ssc = ServerSocketChannel.open();  
        InetSocketAddress addr = new InetSocketAddress(port);
        //綁定端口
        ssc.socket().bind(addr);
        ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞
        //注冊連接請求事件
        SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個(gè)OP_ACCEPT事件,然後返回該通道的key
        //綁定連接處理器,連接進(jìn)來后用Acceptor做后續(xù)處理
        sk.attach(new Acceptor(selector, ssc));
    }  
    
    @Override  
    public void run() {  
        while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行  
            System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");  
            try {
                //輪詢查看是否有事件就緒, 若沒有事件就緒則不往下執(zhí)行
                if (selector.select() == 0)
                    continue;  
            } catch (IOException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }
            //程序執(zhí)行到這里說明有連接事件發(fā)生了,也就是說有客戶端請求連接了
            //獲取所有的連接事件,遍歷處理
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectedKeys.iterator();  
            while (it.hasNext()) {
                //連接請求轉(zhuǎn)發(fā)
                dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度
                //刪除事件,表示已經(jīng)處理完了,下次循環(huán)不再處理已經(jīng)處理過的連接
                it.remove();
            }  
        }  
    }  

    //獲取事件的處Acceptor理器,開啟一個(gè)線程進(jìn)行處理
    private void dispatch(SelectionKey key) {
        Runnable r = (Runnable) (key.attachment());  
        if (r != null)  
            r.run();  
    }}  

下一步,我們看Acceptor處理器是如何處理的:

    public class Acceptor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector;  
          
        public Acceptor(Selector selector, ServerSocketChannel ssc) {  
            this.ssc=ssc;  
            this.selector=selector;  
        }  
          
        @Override  
        public void run() {  
            try {
                // 接受client連接請求
                SocketChannel sc= ssc.accept();
                System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");  
                  
                if(sc!=null) {  
                    sc.configureBlocking(false); // 設(shè)置為非阻塞
                    //注冊讀事件
                    SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個(gè)OP_READ事件,然後返回該通道的key  
//                    System.out.println(sk.selector()==selector);
                    selector.wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
                    // 將讀事件交給TCPHandler進(jìn)行處理
                    sk.attach(new TCPHandler(sk, sc));
                }  
                  
            } catch (IOException e) {  
                // TODO Auto-generated catch block  
                e.printStackTrace();  
            }  
        }        
    } 

Acceptor的處理很簡單,就是接收請求,然后注冊讀事件,同事讀事件的后續(xù)處理交給處理器TCPHandler處理,我們看一下TCPHandler如何處理的:

    public class TCPHandler implements Runnable {  
      
        private final SelectionKey sk;  
        private final SocketChannel sc;  
        private static final int THREAD_COUNTING = 10;
        //讀寫事件交給線程池處理
        private static ThreadPoolExecutor pool = new ThreadPoolExecutor(  
                THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,  
                new LinkedBlockingQueue<Runnable>()); // 線程池  

        //讀寫狀態(tài)處理器
        HandlerState state;
      
        public TCPHandler(SelectionKey sk, SocketChannel sc) {  
            this.sk = sk;  
            this.sc = sc;
            // 初始狀態(tài)設(shè)置為讀狀態(tài)
            state = new ReadState();
            pool.setMaximumPoolSize(32); // 設(shè)置線程池最大線程數(shù)  
        }  
      
        @Override  
        public void run() {  
            try {
                //利用線程池處理讀寫
                state.handle(this, sk, sc, pool);  
            } catch (IOException e) {
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
            }  
        }  
          
        public void closeChannel() {  
            try {  
                sk.cancel();  
                sc.close();  
            } catch (IOException e1) {  
                e1.printStackTrace();  
            }  
        }  

        //讀寫狀態(tài)的更改,讀事件處理完改為寫狀態(tài),寫狀態(tài)處理完改為讀狀態(tài)
        public void setState(HandlerState state) {  
            this.state = state;  
        }  
    }  

TCPHandler 處理器維護(hù)一個(gè)線程池,用于處理真正的讀寫事件,客戶端連接服務(wù)器后初始時(shí)處理讀事件,讀事件處理完后處理寫事件,寫事件處理完后繼續(xù)處理讀事件,來回反復(fù)處理。我們看一下讀事件是如何處理的

public class ReadState implements HandlerState{

    private SelectionKey sk;

    public ReadState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new WorkState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                       ThreadPoolExecutor pool) throws IOException { // read()
        this.sk = sk;
        // non-blocking下不可用Readers,因?yàn)镽eaders不支援non-blocking
        byte[] arr = new byte[1024];
        ByteBuffer buf = ByteBuffer.wrap(arr);

        int numBytes = sc.read(buf); // 讀取字符串
        if(numBytes == -1)
        {
            System.out.println("[Warning!] A client has been closed.");
            h.closeChannel();
            return;
        }
        String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)為字符串型態(tài)
        if ((str != null) && !str.equals(" ")) {
            h.setState(new WorkState()); // 改變狀態(tài)(READING->WORKING)
            pool.execute(new WorkerThread(h, str)); // do process in worker thread
            System.out.println(sc.socket().getRemoteSocketAddress().toString()
                    + " > " + str);
        }

    }

    /*
     * 執(zhí)行邏輯處理之函數(shù)
     */
    synchronized void process(TCPHandler h, String str) {
//             do process(decode, logically process, encode)..
        // ..
        h.setState(new WriteState()); // 改變狀態(tài)(WORKING->SENDING)
        this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
        this.sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
    }

    /*
     * 工作者線程
     */
    class WorkerThread implements Runnable {

        TCPHandler h;
        String str;

        public WorkerThread(TCPHandler h, String str) {
            this.h = h;
            this.str=str;
        }

        @Override
        public void run() {
            process(h, str);
        }

    }
}

讀完后,將寫事件注冊。寫一次輪詢到讀事件后,交由WriteState處理器處理

public class WriteState implements HandlerState{

    public WriteState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new ReadState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                       ThreadPoolExecutor pool) throws IOException { // send()
        // get message from message queue

        String str = "Your message has sent to "
                + sc.socket().getLocalSocketAddress().toString() + "\r\n";
        ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設(shè)為0,所以不需要再flip()

        while (buf.hasRemaining()) {
            sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容
        }

        h.setState(new ReadState()); // 改變狀態(tài)(SENDING->READING)
        sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
        sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
    }
}

WriteState 寫處理器和ReadState讀處理器都繼承了HandlerState接口,

public interface HandlerState {

     void changeState(TCPHandler h);

     void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                 ThreadPoolExecutor pool) throws IOException ;
}

上面的工作狀態(tài)轉(zhuǎn)換有WorkState完成

public class WorkState implements HandlerState {

    public WorkState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new WriteState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
                       ThreadPoolExecutor pool) throws IOException {
        // TODO Auto-generated method stub

    }
}

編寫測試類

public class Main {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        try {
            TCPReactor reactor = new TCPReactor(1333);
//                new Thread(reactor).start();
            reactor.run();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

客戶端用單線程模式的就可以啦。

NIO主從Reactor模型

上面的代碼,有沒有發(fā)現(xiàn)所有的事件都注冊在同一個(gè)selector上,selector表示好累!講道理,ServerSocketChannel只是用來處理鏈接就可以了,它不需要處理讀事件和寫事件。讀事件和寫事件完全可以交給另一個(gè)選擇器。這就是NIO的主從Reactor模型。

主線程只負(fù)責(zé)接收客戶端連接,然后交其他從線程,使當(dāng)有客戶端連接時(shí),可以很快的受到處理。同時(shí),從線程專門負(fù)責(zé)讀取注冊到自己selector上面的客戶端數(shù)據(jù)。并發(fā)讀寫能力得到了大大的提高。當(dāng)然,如果,每一個(gè)SocketChannel的讀寫事件都注冊到單獨(dú)的selector上顯然是浪費(fèi)資源的,我們可以用一個(gè)selecort管理N個(gè)SocketChannel,也就是說對selector進(jìn)行了分組。比如,用戶管理模塊注冊一個(gè)selector,權(quán)限模塊注冊一個(gè)selector,日志模塊注冊一個(gè)selector,這樣模塊間的讀寫互不影響。selector數(shù)量取決你電腦CPU的核數(shù),一般來說selecor數(shù)量為cpu核數(shù)2。也就是說,我們的主selector有1個(gè),從selector有cpu2個(gè)。

OK!下面我們看這種主從Reactor模式的代碼如何編寫。
首先編寫服務(wù)端

    public class TCPReactor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector; // mainReactor用的selector  
      
        public TCPReactor(int port) throws IOException {  
            selector = Selector.open();  
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞

            Acceptor acceptor = new Acceptor(ssc);

            SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個(gè)OP_ACCEPT事件,然後返回該通道的key
            sk.attach(acceptor); // 給定key一個(gè)附加的Acceptor對象

            InetSocketAddress addr = new InetSocketAddress(port);
            ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口
        }  
      
        @Override  
        public void run() {  
            while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行  
                System.out.println("mainReactor waiting for new event on port: "  
                        + ssc.socket().getLocalPort() + "...");  
                try {  
                    if (selector.select() == 0) // 若沒有事件就緒則不往下執(zhí)行  
                        continue;  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
                Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合  
                Iterator<SelectionKey> it = selectedKeys.iterator();  
                while (it.hasNext()) {  
                    dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度  
                    it.remove();  
                }  
            }  
        }  
      
        /* 
         * name: dispatch(SelectionKey key) 
         * description: 調(diào)度方法,根據(jù)事件綁定的對象開新線程 
         */  
        private void dispatch(SelectionKey key) {  
            Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對象開新線程  
            if (r != null)  
                r.run();  
        }  
      
    }  

代碼跟多線層模式基本一樣,不解釋了。
再來看Acceptor處理器

public class Acceptor implements Runnable {  
      
        private final ServerSocketChannel ssc; // mainReactor監(jiān)聽的socket通道  
        private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數(shù)
        private final Selector[] selectors = new Selector[cores]; // 創(chuàng)建核心數(shù)個(gè)selector給subReactor用  
        private int selIdx = 0; // 當(dāng)前可使用的subReactor索引  
        private TCPSubReactor[] r = new TCPSubReactor[cores]; // subReactor線程
        private Thread[] t = new Thread[cores]; // subReactor線程  
      
        public Acceptor(ServerSocketChannel ssc) throws IOException {  
            this.ssc = ssc;  
            // 創(chuàng)建多個(gè)selector以及多個(gè)subReactor線程  
            for (int i = 0; i < cores; i++) {  
                selectors[i] = Selector.open();  
                r[i] = new TCPSubReactor(selectors[i], ssc, i);
                t[i] = new Thread(r[i]);  
                t[i].start();
            }
        }
      
        @Override  
        public synchronized void run() {  
            try {  
                SocketChannel sc = ssc.accept(); // 接受client連線請求  
                System.out.println(sc.socket().getRemoteSocketAddress().toString()  
                        + " is connected.");  
      
                if (sc != null) {  
                    sc.configureBlocking(false); // 設(shè)置為非阻塞  
                    r[selIdx].setRestart(true); // 暫停線程  
                    selectors[selIdx].wakeup(); // 使一個(gè)阻塞住的selector操作立即返回  
                    SelectionKey sk = sc.register(selectors[selIdx],  
                            SelectionKey.OP_READ); // SocketChannel向selector[selIdx]註冊一個(gè)OP_READ事件,然後返回該通道的key  
                    selectors[selIdx].wakeup(); // 使一個(gè)阻塞住的selector操作立即返回  
                    r[selIdx].setRestart(false); // 重啟線程  
                    sk.attach(new TCPHandler(sk, sc)); // 給定key一個(gè)附加的TCPHandler對象
                    if (++selIdx == selectors.length)  
                        selIdx = 0;  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
      
    }  

此時(shí),我們將讀寫事件注冊到其他selector中,讀寫事件輪詢注冊到不同的子selector上,實(shí)現(xiàn)高并發(fā)處理。

private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數(shù)
private final Selector[] selectors = new Selector[cores]; // 創(chuàng)建核心數(shù)個(gè)selector給subReactor用  

子selector

    public class TCPReactor implements Runnable {  
      
        private final ServerSocketChannel ssc;  
        private final Selector selector; // mainReactor用的selector  
      
        public TCPReactor(int port) throws IOException {  
            selector = Selector.open();  
            ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞

            Acceptor acceptor = new Acceptor(ssc);

            SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個(gè)OP_ACCEPT事件,然後返回該通道的key
            sk.attach(acceptor); // 給定key一個(gè)附加的Acceptor對象

            InetSocketAddress addr = new InetSocketAddress(port);
            ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口
        }  
      
        @Override  
        public void run() {  
            while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行  
                System.out.println("mainReactor waiting for new event on port: "  
                        + ssc.socket().getLocalPort() + "...");  
                try {  
                    if (selector.select() == 0) // 若沒有事件就緒則不往下執(zhí)行  
                        continue;  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
                Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合  
                Iterator<SelectionKey> it = selectedKeys.iterator();  
                while (it.hasNext()) {  
                    dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度  
                    it.remove();  
                }  
            }  
        }  
      
        /* 
         * name: dispatch(SelectionKey key) 
         * description: 調(diào)度方法,根據(jù)事件綁定的對象開新線程 
         */  
        private void dispatch(SelectionKey key) {  
            Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對象開新線程  
            if (r != null)  
                r.run();  
        }  
      
    }  

讀寫處理器

 public class TCPHandler implements Runnable {  
      
        private final SelectionKey sk;  
        private final SocketChannel sc;  
        private static final int THREAD_COUNTING = 10;  
        private static ThreadPoolExecutor pool = new ThreadPoolExecutor(  
                THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,  
                new LinkedBlockingQueue<Runnable>()); // 線程池  
      
        HandlerState state; // 以狀態(tài)模式實(shí)現(xiàn)Handler  
      
        public TCPHandler(SelectionKey sk, SocketChannel sc) {  
            this.sk = sk;  
            this.sc = sc;  
            state = new ReadState(); // 初始狀態(tài)設(shè)定為READING
            pool.setMaximumPoolSize(32); // 設(shè)置線程池最大線程數(shù)  
        }  
      
        @Override  
        public void run() {  
            try {
                state.handle(this, sk, sc, pool);
            } catch (IOException e) {
                System.out.println("[Warning!] A client has been closed.");  
                closeChannel();  
            }  
        }  
      
        public void closeChannel() {  
            try {  
                sk.cancel();  
                sc.close();  
            } catch (IOException e1) {  
                e1.printStackTrace();  
            }  
        }  
      
        public void setState(HandlerState state) {  
            this.state = state;  
        }  
    }  

真正的讀

public class ReadState implements HandlerState {

    private SelectionKey sk;

    public ReadState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new WorkState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
            ThreadPoolExecutor pool) throws IOException { // read()
        this.sk = sk;
        // non-blocking下不可用Readers,因?yàn)镽eaders不支援non-blocking
        byte[] arr = new byte[1024];
        ByteBuffer buf = ByteBuffer.wrap(arr);

        int numBytes = sc.read(buf); // 讀取字符串
        if(numBytes == -1)
        {
            System.out.println("[Warning!] A client has been closed.");
            h.closeChannel();
            return;
        }
        String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)為字符串型態(tài)
        if ((str != null) && !str.equals(" ")) {
            h.setState(new WorkState()); // 改變狀態(tài)(READING->WORKING)
            pool.execute(new WorkerThread(h, str)); // do process in worker thread
            System.out.println(sc.socket().getRemoteSocketAddress().toString()
                    + " > " + str);
        }

    }

    /*
     * 執(zhí)行邏輯處理之函數(shù)
     */
    synchronized void process(TCPHandler h, String str) {
        // do process(decode, logically process, encode)..
        // ..
        h.setState(new WriteState()); // 改變狀態(tài)(WORKING->SENDING)
        this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
        this.sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
    }

    /*
     * 工作者線程
     */
    class WorkerThread implements Runnable {

        TCPHandler h;
        String str;

        public WorkerThread(TCPHandler h, String str) {
            this.h = h;
            this.str=str;
        }

        @Override
        public void run() {
            process(h, str);
        }

    }
}

真正的寫

public class WriteState implements HandlerState {

    public WriteState() {
    }

    @Override
    public void changeState(TCPHandler h) {
        // TODO Auto-generated method stub
        h.setState(new ReadState());
    }

    @Override
    public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
            ThreadPoolExecutor pool) throws IOException { // send()
        // get message from message queue

        String str = "Your message has sent to "
                + sc.socket().getLocalSocketAddress().toString() + "\r\n";
        ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設(shè)為0,所以不需要再flip()

        while (buf.hasRemaining()) {
            sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容
        }

        h.setState(new ReadState()); // 改變狀態(tài)(SENDING->READING)
        sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
        sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
    }
}

編寫客戶端

public class Main {
    public static void main(String[] args) {
        // TODO Auto-generated method stub
        try {
            TCPReactor reactor = new TCPReactor(1333);
//                reactor.run();
            Thread thread = new Thread(reactor);
            thread.start();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

如果第一次接觸NIO,上面的代碼讀起來比較費(fèi)勁,NIO編程確實(shí)麻煩,而且很容易出錯,現(xiàn)實(shí)開發(fā)中不會用原生NIO庫,小編都是用netty這個(gè)NIO框架進(jìn)行編程,簡單 高效 穩(wěn)定,所以看不懂上面的代碼沒關(guān)系,只要理解上面的三幅圖即可,這三幅圖是netty最最核心的。下篇開始講netty應(yīng)用及源碼。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容