nio和netty詳解

一、概述
Netty是一個Java的開源框架。提供異步的、事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
Netty是一個NIO客戶端,服務(wù)端框架。允許快速簡單的開發(fā)網(wǎng)絡(luò)應(yīng)用程序。例如:服務(wù)端和客戶端之間的協(xié)議,它簡化了網(wǎng)絡(luò)編程規(guī)范。

二、NIO開發(fā)的問題
1、NIO類庫和API復(fù)雜,使用麻煩。
2、需要具備Java多線程編程能力(涉及到Reactor模式)。
3、客戶端斷線重連、網(wǎng)絡(luò)不穩(wěn)定、半包讀寫、失敗緩存、網(wǎng)絡(luò)阻塞和異常碼流等問題處理難度非常大
4、存在部分BUG

NIO進行服務(wù)器開發(fā)的步驟:
1、創(chuàng)建ServerSocketChannel,配置為非阻塞模式;
2、綁定監(jiān)聽,配置TCP參數(shù);
3、創(chuàng)建一個獨立的IO線程,用于輪詢多路復(fù)用器Selector;
4、創(chuàng)建Selector,將之前創(chuàng)建的ServerSocketChannel注冊到Selector上,監(jiān)聽Accept事件;
5、啟動IO線程,在循環(huán)中執(zhí)行Select.select()方法,輪詢就緒的Channel;
6、當輪詢到處于就緒狀態(tài)的Channel時,需要對其進行判斷,如果是OP_ACCEPT狀態(tài),說明有新的客戶端接入,則調(diào)用ServerSocketChannel.accept()方法接受新的客戶端;
7、設(shè)置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置TCP參數(shù);
8、將SocketChannel注冊到Selector上,監(jiān)聽READ事件;
9、如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的準備就緒的數(shù)據(jù)包需要讀取,則構(gòu)造ByteBuffer對象,讀取數(shù)據(jù)包;
10、如果輪詢的Channel為OP_WRITE,則說明還有數(shù)據(jù)沒有發(fā)送完成,需要繼續(xù)發(fā)送。

三、Netty的優(yōu)點
1、API使用簡單,開發(fā)門檻低;
2、功能強大,預(yù)置了多種編解碼功能,支持多種主流協(xié)議;
3、定制功能強,可以通過ChannelHandler對通信框架進行靈活的擴展;
4、性能高,通過與其他業(yè)界主流的NIO框架對比,Netty綜合性能最優(yōu);
5、成熟、穩(wěn)定,Netty修復(fù)了已經(jīng)發(fā)現(xiàn)的NIO所有BUG;
6、社區(qū)活躍;
7、經(jīng)歷了很多商用項目的考驗。

粘包/拆包問題
TCP是一個“流”協(xié)議,所謂流,就是沒有界限的一串數(shù)據(jù)??梢韵胂鬄楹恿髦械乃]有分界線。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會根據(jù)TCP緩沖區(qū)的實際情況進行包的劃分,所以在業(yè)務(wù)上認為,一個完整的包可能會被TCP拆分成多個包進行發(fā)送,也有可能把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。
TCP粘包拆包問題示例圖:

假設(shè)客戶端分別發(fā)送了兩個數(shù)據(jù)包D1和D2給服務(wù)端,由于服務(wù)端一次讀取到的字節(jié)數(shù)是不確定的,可能存在以下4中情況。
    1、服務(wù)端分兩次讀取到了兩個獨立的數(shù)據(jù)包,分別是D1和D2,沒有粘包和拆包;
    2、服務(wù)端一次接收到了兩個數(shù)據(jù)包,D1和D2粘合在一起,被稱為TCP粘包;
    3、服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包,第一次讀取到了完整的D1包和D2包的部分內(nèi)容,第二次讀取到了D2包的剩余部分內(nèi)容,這被稱為TCP拆包;
    4、服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包,第一次讀取到了D1包的部分內(nèi)容D1_1,第二次讀取到了D1包的剩余內(nèi)容D1_1和D2包的完整內(nèi)容;
如果此時服務(wù)器TCP接收滑窗非常小,而數(shù)據(jù)包D1和D2比較大,很有可能發(fā)生第五種情況,既服務(wù)端分多次才能將D1和D2包接收完全,期間發(fā)生多次拆包;

問題的解決策略
由于底層的TCP無法理解上層的業(yè)務(wù)數(shù)據(jù),所以在底層是無法保證數(shù)據(jù)包不被拆分和重組的,這個問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計來解決,根據(jù)業(yè)界的主流協(xié)議的解決方案可歸納如下:
1、消息定長,例如每個報文的大小為固定長度200字節(jié),如果不夠,空位補空格;
2、在包尾增加回車換行符進行分割,例如FTP協(xié)議;
3、將消息分為消息頭和消息體,消息頭中包含消息總長度(或消息體總長度)的字段,通常設(shè)計思路為消息頭的第一個字段使用int32來表示消息的總程度;
4、更復(fù)雜的應(yīng)用層協(xié)議;

LineBasedFrameDecoder
為了解決TCP粘包/拆包導(dǎo)致的半包讀寫問題,Netty默認提供了多種編解碼器用于處理半包。
LinkeBasedFrameDecoder的工作原理是它一次遍歷ByteBuf中的可讀字節(jié),判斷看是否有“\n”、“\r\n”,如果有,就一次位置為結(jié)束位置,從可讀索引到結(jié)束位置區(qū)間的字節(jié)就組成一行。它是以換行符為結(jié)束標志的編解碼,支持攜帶結(jié)束符或者不攜帶結(jié)束符兩種解碼方式,同事支持配置單行的最大長度。如果連續(xù)讀取到最大長度后任然沒有發(fā)現(xiàn)換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。

DelimiterBasedFrameDecoder
實現(xiàn)自定義分隔符作為消息的結(jié)束標志,完成解碼。

FixedLengthFrameDecoder
是固定長度解碼器,能夠按照指定的長度對消息進行自動解碼,開發(fā)者不需要考慮TCP的粘包/拆包問題。

Netty高性能之道
1、異步非阻塞通信
在IO編程過程中,當需要同時處理多個客戶端接入請求時,可以利用多線程或者IO多路復(fù)用技術(shù)進行處理。IO多路復(fù)用技術(shù)通過把多個IO的阻塞復(fù)用到同一個Selector的阻塞上,從而使得系統(tǒng)在單線程的情況下可以同時處理多個客戶端請求。與傳統(tǒng)的多線程/多進程模型相比,IO多路復(fù)用的最大優(yōu)勢是系統(tǒng)開銷小,系統(tǒng)不需要創(chuàng)建新的額外進程或者線程,也不需要維護這些進程和線程的運行,降低了系統(tǒng)的維護工作量,節(jié)省了系統(tǒng)資源。
Netty的IO線程NioEventLoop由于聚合了多路復(fù)用器Selector,可以同時并發(fā)處理成百上千個客戶端SocketChannel。由于讀寫操作都是非阻塞的,這就可以充分提升IO線程的運行效率,避免由頻繁的IO阻塞導(dǎo)致的線程掛起。另外,由于Netty采用了異步通信模式,一個IO線程可以并發(fā)處理N個客戶端連接和讀寫操作,這從根本上解決了傳統(tǒng)同步阻塞IO中 一連接一線程模型,架構(gòu)的性能、彈性伸縮能力和可靠性都得到了極大的提升。

2、高效的Reactor線程模型
常用的Reactor線程模型有三種,分別如下:
Reactor單線程模型;

Reactor多線程模型;

    3、主從Reactor多線程模型;


    Reactor單線程模型,指的是所有的IO操作都在同一個NIO線程上面完成,NIO線程職責如下:
    1、作為NIO服務(wù)端,接收客戶端的TCP連接;
    2、作為NIO客戶端,向服務(wù)端發(fā)起TCP連接;
    3、讀取通信對端的請求或者應(yīng)答消息;
    4、向通信對端發(fā)送請求消息或者應(yīng)答消息;
    由于Reactor模式使用的是異步非阻塞IO,所有的IO操作都不會導(dǎo)致阻塞,理論上一個線程可以獨立處理所有IO相關(guān)操作。從架構(gòu)層面看,一個NIO線程確實可以完成其承擔的職責。例如,通過Acceptor接收客戶端的TCP連接請求消息,鏈路建立成功之后,通過Dispatch將對應(yīng)的ByteBuffer派發(fā)到指定的Handler上進行消息編碼。用戶Handler可以通過NIO線程將消息發(fā)送給客戶端。
    對于一些小容量應(yīng)用場景,可以使用單線程模型,但是對于高負載、大并發(fā)的應(yīng)用卻不合適,主要原因如下:
    1、一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。幾遍NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發(fā)送;
    2、當NIO線程負載過重后,處理速度將變慢,這會導(dǎo)致大量客戶端連接超時,超時之后往往會進行重發(fā),這更加重了NIO線程的負載,最終會導(dǎo)致大量消息積壓和處理超時,NIO線程會成為系統(tǒng)的性能瓶頸;
    3、可靠性問題。一旦NIO線程意外進入死循環(huán),會導(dǎo)致整個系統(tǒng)通信模塊不可用,不能接收和處理外部消息,造成節(jié)點故障。
    為了解決這些問題,從而演進出了Reactor多線程模型。

    Reactor多線程模型與單線程模型最大的區(qū)別就是有一組NIO線程處理IO操作,特點如下:
    1、有一個專門的NIO線程——Acceptor線程用于監(jiān)聽服務(wù)端,接收客戶端TCP連接請求;
    2、網(wǎng)絡(luò)IO操作——讀、寫等由一個NIO線程池負責,線程池可以采用標準的JDK線程池實現(xiàn),它包含一個任務(wù)隊列和N個可用的線程,由這些NIO線程負責消息的讀取、編碼、解碼和發(fā)送;
    3、1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應(yīng)1個NIO線程,防止發(fā)生并發(fā)操作問題。
    在絕大多數(shù)場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應(yīng)用場景中,一個NIO線程負責監(jiān)聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端并發(fā)連接,或者服務(wù)端需要對客戶端的握手消息進行安全認證,認證本身非常損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產(chǎn)生了第三種Reactor線程模型——主從Reactor多線程模型。

    主從Reactor線程模型的特定是:服務(wù)端用于接收客戶端連接的不再是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等),將新創(chuàng)建的SocketChannel注冊到IO線程池(subReactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor線程池只用于客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的IO線程上,由IO線程負責后續(xù)的IO操作。
    利用主從NIO線程模型,可以解決1個服務(wù)端監(jiān)聽線程無法有效處理所有客戶端連接的性能不足問題。Netty官方推薦使用該線程模型。它的工作流程總結(jié)如下:

1、從主線程池中隨機選擇一個Reactor線程作為Acceptor線程,用于綁定監(jiān)聽端口,接收客戶端連接;
2、Acceptor線程接收客戶端連接請求之后,創(chuàng)建新的SocketChannel,將其注冊到主線程池的其他Reactor線程上,由其負責接入認證、IP黑白名單過濾、握手等操作;
3、然后也業(yè)務(wù)層的鏈路正式建立成功,將SocketChannel從主線程池的Reactor線程的多路復(fù)用器上摘除,重新注冊到Sub線程池的線程上,用于處理IO的讀寫操作。

3、無鎖化的串行設(shè)計
在大多數(shù)場景下,并行多線程處理可以提升系統(tǒng)的并發(fā)性能。但是,如果對于共享資源的并發(fā)訪問處理不當,會帶來嚴重的鎖競爭,這最終會導(dǎo)致性能的下降。為了盡可能地避免鎖競爭帶來的性能損耗,可以通過串行化設(shè)計,既消息的處理盡可能在同一個線程內(nèi)完成,期間不進行線程切換,這樣就避免了多線程競爭和同步鎖。
為了盡可能提升性能,Netty采用了串行無鎖化設(shè)計,在IO線程內(nèi)部進行串行操作,避免多線程競爭導(dǎo)致的性能下降。表面上看,串行化設(shè)計似乎CPU利用率不高,并發(fā)程度不夠。但是,通過調(diào)整NIO線程池的線程參數(shù),可以同時啟動多個串行化的線程并行運行,這種局部無鎖化的串行線程設(shè)計相比一個隊列——多個工作線程模型性能更優(yōu)。
Netty串行化設(shè)計工作原理圖如下:

    Netty的NioEventLoop讀取到消息后,直接調(diào)用ChannelPipeline的fireChannelRead(Object msg),只要用戶不主動切換線程,一直會由NioEventLoop調(diào)用到用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程導(dǎo)致的鎖競爭,從性能角度看是最優(yōu)的。

4、高效的并發(fā)編程
Netty中高效并發(fā)編程主要體現(xiàn):
1、volatile的大量、正確使用;
2、CAS和原子類的廣泛使用;
3、線程安全容器的使用;
4、通過讀寫鎖提升并發(fā)性能。

5、高性能的序列化框架
影響序列化性能的關(guān)鍵因素總結(jié)如下:
1、序列化后的碼流大小(網(wǎng)絡(luò)寬帶的占用);
2、序列化與反序列化的性能(CPU資源占用);
3、是否支持跨語言(異構(gòu)系統(tǒng)的對接和開發(fā)語言切換)。
Netty默認提供了對GoogleProtobuf的支持,通過擴展Netty的編解碼接口,用戶可以實現(xiàn)其他的高性能序列化框架

6、零拷貝
Netty的“零拷貝”主要體現(xiàn)在三個方面:
1)、Netty的接收和發(fā)送ByteBuffer采用DIRECT BUFFERS,使用堆外直接內(nèi)存進行Socket讀寫,不需要進行字節(jié)緩沖區(qū)的二次拷貝。如果使用傳統(tǒng)的堆內(nèi)存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內(nèi)存Buffer拷貝一份到直接內(nèi)存中,然后才寫入Socket中。相比于堆外直接內(nèi)存,消息在發(fā)送過程中多了一次緩沖區(qū)的內(nèi)存拷貝。
2)、第二種“零拷貝 ”的實現(xiàn)CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統(tǒng)一封裝后的ByteBuf接口。
3)、第三種“零拷貝”就是文件傳輸,Netty文件傳輸類DefaultFileRegion通過transferTo方法將文件發(fā)送到目標Channel中。很多操作系統(tǒng)直接將文件緩沖區(qū)的內(nèi)容發(fā)送到目標Channel中,而不需要通過循環(huán)拷貝的方式,這是一種更加高效的傳輸方式,提升了傳輸性能,降低了CPU和內(nèi)存占用,實現(xiàn)了文件傳輸?shù)摹傲憧截悺薄?/p>

7、內(nèi)存池
隨著JVM虛擬機和JIT即時編譯技術(shù)的發(fā)展,對象的分配和回收是個非常輕量級的工作。但是對于緩沖區(qū)Buffer,情況卻稍有不同,特別是對于堆外直接內(nèi)存的分配和回收,是一件耗時的操作。為了盡量重用緩沖區(qū),Netty提供了基于內(nèi)存池的緩沖區(qū)重用機制。

8、靈活的TCP參數(shù)配置能力
Netty在啟動輔助類中可以靈活的配置TCP參數(shù),滿足不同的用戶場景。合理設(shè)置TCP參數(shù)在某些場景下對于性能的提升可以起到的顯著的效果,總結(jié)一下對性能影響比較大的幾個配置項:
1)、SO_RCVBUF和SO_SNDBUF:通常建議值為128KB或者256KB;
2)、SO_TCPNODELAY:NAGLE算法通過將緩沖區(qū)內(nèi)的小封包自動相連,組成較大的封包,阻止大量小封包的發(fā)送阻塞網(wǎng)絡(luò),從而提高網(wǎng)絡(luò)應(yīng)用效率。但是對于時延敏感的應(yīng)用場景需要關(guān)閉該優(yōu)化算法;
3)、軟中斷:如果Linux內(nèi)核版本支持RPS(2.6.35以上版本),開啟RPS后可以實現(xiàn)軟中斷,提升網(wǎng)絡(luò)吞吐量。RPS根據(jù)數(shù)據(jù)包的源地址,目的地址以及目的和源端口,計算出一個hash值,然后根據(jù)這個hash值來選擇軟中斷運行的CPU。從上層來看,也就是說將每個連接和CPU綁定,并通過這個hash值,來均衡軟中斷在多個CPU上,提升網(wǎng)絡(luò)并行處理性能。

Nio中的重要概念
FileChannel:

從文件中讀寫數(shù)據(jù),無法設(shè)置為非阻塞
public class Demo {
   public static void main(String[] args) {
       ByteBuffer buffer = ByteBuffer.allocate(2048);
       byte[] bb = new byte[2048];
       try {
           FileInputStream fis = new FileInputStream("D:\\vipkid\\bak\\pangolin-service\\src\\main\\resources\\data\\file.txt");
           FileChannel fc = fis.getChannel();
           long timeStar = System.currentTimeMillis();
           fc.read(buffer);
           buffer.flip();
           byte[] chars=buffer.array();
           String txt=new String(chars);
           System.out.println(txt);
           long endTime = System.currentTimeMillis();
           System.out.println("read time:" + (endTime - timeStar) + "ms");
/// 或者下面這種方法           
RandomAccessFile file = new RandomAccessFile("D:\\vipkid\\bak\\pangolin-service\\src\\main\\resources\\data\\file.txt", "rw");
           FileChannel channel = file.getChannel();
           ByteBuffer byteBuffer=ByteBuffer.allocate(2048);
           channel.read(buffer);
           String msg=new String(buffer.array());
           System.out.println(msg);
       } catch (FileNotFoundException e) {
           e.printStackTrace();
       } catch (IOException e) {
           e.printStackTrace();
       }
   }
}

DatagramChannel:
通過udp讀寫網(wǎng)絡(luò)中的數(shù)據(jù)
SocketChannel:

通過tcp讀取網(wǎng)絡(luò)中的數(shù)據(jù),可以設(shè)置為非阻塞
打開方式
SocketChannel socket=SocketChannel.open();
socket.connect(new InetSocketAddress("localhost",8080));
從SocketChannel上讀取數(shù)據(jù):
ByteBuffer buffer=ByteBuffer.allocate(2048);
buffer.clear();
int count=socket.read(buffer);//count標識讀了多少字節(jié)
寫數(shù)據(jù):
String txt="vipkid has many bugs";
buffer.clear();
while(buffer.hasRemaining()){
  socket.write(buffer);
}

非阻塞模式+connect
socket.configureBlocking(false);
socket.connect(new InetSocketAddress("127.0.0.1",8080));
if(socket.isConnectable()){
  if(socket.isConnectionPending()){
  if(socket.finishConnect()){
    //do stm
    }
  }
}

ServerSocketChannel:

監(jiān)聽網(wǎng)絡(luò)中的一個連接請求,為請求生成一個SocketChannel;
打開:ServerSocketChannel serverChannel=ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(9999));
serverChannel.configureBlocking(false);//非阻塞模式
while(true){
  serverChannel.accept();//阻塞直到一個連接請求到來
}

緩沖區(qū)類型:

ByteBuffer
CharBuffer、IntBuffer、LongBuffer、ShortBuffer
常用方法:
get() 從緩沖區(qū)中獲取數(shù)據(jù)
put() 往緩沖區(qū)中添加數(shù)據(jù)
read() 從channel中讀取數(shù)據(jù)到buffer
write() 往channel中寫入數(shù)據(jù)
capacity - 緩沖區(qū)大小,無論是讀模式還是寫模式,此屬性值不會變;
position - 寫數(shù)據(jù)時,position表示當前寫的位置,每寫一個數(shù)據(jù),會向下移動一個數(shù)據(jù)單元,初始為0;最大為capacity - 1
切換到讀模式時,position會被置為0,表示當前讀的位置
limit - 寫模式下,limit 相當于capacity 表示最多可以寫多少數(shù)據(jù),切換到讀模式時,limit 等于原先的position,表示最多可以讀多少數(shù)據(jù)。
MappedByteBuffer
內(nèi)存映射文件和之前說的 標準IO操作最大的不同之處就在于它雖然最終也是要從磁盤讀取數(shù)據(jù),
但是它并不需要將數(shù)據(jù)讀取到OS內(nèi)核緩沖區(qū),而是直接將進程的用戶私有地址空間中的一部分區(qū)
域與文件對象建立起映射關(guān)系,就好像直接從內(nèi)存中讀、寫文件一樣,速度當然快了(省去了把數(shù)據(jù)拷貝到OS內(nèi)核緩沖區(qū))
MappedByteBuffer 將文件直接映射到內(nèi)存(這里的內(nèi)存指的是虛擬內(nèi)存,并不是物理內(nèi)存)
代碼示例:
public class MappedByteBufferDemo {
    public static void main(String[] args) {
        try {
            RandomAccessFile file=new RandomAccessFile("/Users/penny/code/vip/from.txt","rw");
            FileChannel channel=file.getChannel();
            MappedByteBuffer buffer=channel.map(FileChannel.MapMode.READ_WRITE,0,file.length());
            byte[] bytes=new byte[(int)file.length()];
            buffer.get(bytes);
            System.out.println(new String(bytes));
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

下面為基于NIO實現(xiàn)的客戶端代碼:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class Client implements Runnable{
    private BlockingQueue<String> words;
    private Random random;
       
    public static void main(String[] args) {        
        //種多個線程發(fā)起Socket客戶端連接請求  
        for(int i=0; i<1; i++){
            Client c = new Client();  
            c.init();  
            new Thread(c).start();  
        }        
    }  
   
    @Override  
    public void run() {       
        SocketChannel channel = null;
        Selector selector = null;
        try {  
            channel = SocketChannel.open();  
            channel.configureBlocking(false);  
            //請求連接  
            channel.connect(new InetSocketAddress("localhost", 8383));
            selector = Selector.open();  
            channel.register(selector, SelectionKey.OP_CONNECT);
            boolean isOver = false;  
               
            while(! isOver){  
                selector.select();  
                Iterator ite = selector.selectedKeys().iterator();
                while(ite.hasNext()){  
                    SelectionKey key = (SelectionKey) ite.next();  
                    ite.remove();  
                       
                    if(key.isConnectable()){  
                        //連接操作是否在該通道上執(zhí)行
                        if(channel.isConnectionPending()){  
                            if(channel.finishConnect()){  
                                //只有當連接成功后才能注冊O(shè)P_READ事件  
                                key.interestOps(SelectionKey.OP_READ);  
                                   
                                channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
                                sleep();  
                            }  
                            else{  
                                key.cancel();  
                            }  
                        }                                                
                    }  
                    else if(key.isReadable()){  
                        ByteBuffer byteBuffer = ByteBuffer.allocate(128);
                        channel.read(byteBuffer);  
                        byteBuffer.flip();  
                        CharBuffer charBuffer = CharsetHelper.decode(byteBuffer);  
                        String answer = charBuffer.toString();   
                        System.out.println(Thread.currentThread().getId() + "---" + answer);  
                           
                        String word = getWord();  
                        if(word != null){  
                            channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));  
                        }  
                        else{  
                            isOver = true;  
                        }  
                        sleep();                         
                    }  
                }  
            }                            
        } catch (IOException e) {
            e.printStackTrace();  
        }  
        finally{  
            if(channel != null){  
                try {  
                    channel.close();  
                } catch (IOException e) {                        
                    e.printStackTrace();  
                }                    
            }  
               
            if(selector != null){  
                try {  
                    selector.close();  
                } catch (IOException e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
    }  
   
    private void init() {  
        words = new ArrayBlockingQueue<String>(5);
        try {  
            words.put("hi");  
            words.put("who");  
            words.put("what");  
            words.put("where");  
            words.put("bye");  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }    
           
        random = new Random();  
    }  
       
    private String getWord(){  
        return words.poll();  
    }  
   
    private void sleep() {  
        try {  
            TimeUnit.SECONDS.sleep(random.nextInt(3));
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }    
       
    private void sleep(long l) {  
        try {  
            TimeUnit.SECONDS.sleep(l);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
}

服務(wù)端代碼:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class XiaoNa {
    private ByteBuffer readBuffer;
    private Selector selector;
     
    public static void main(String[] args){
        XiaoNa xiaona = new XiaoNa();
        xiaona.init();
        xiaona.listen();
    }
     
    private void init(){
        readBuffer = ByteBuffer.allocate(1024);
        ServerSocketChannel servSocketChannel;
         
        try {
            servSocketChannel = ServerSocketChannel.open();
            servSocketChannel.configureBlocking(false);
            //綁定端口
            servSocketChannel.socket().bind(new InetSocketAddress(8383));
             
            selector = Selector.open();
            servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            e.printStackTrace();
        }      
    }
 
    private void listen() {
        while(true){
            try{
                selector.select();             
                Iterator ite = selector.selectedKeys().iterator();
                 
                while(ite.hasNext()){
                    SelectionKey key = (SelectionKey) ite.next();                  
                    ite.remove();//確保不重復(fù)處理
                     
                    handleKey(key);
                }
            }
            catch(Throwable t){
                t.printStackTrace();
            }                          
        }              
    }
 
    private void handleKey(SelectionKey key)
            throws IOException, ClosedChannelException {
        SocketChannel channel = null;
         
        try{
            if(key.isAcceptable()){
                ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                channel = serverChannel.accept();//接受連接請求
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
            }
            else if(key.isReadable()){
                channel = (SocketChannel) key.channel();
                readBuffer.clear();
                /*當客戶端channel關(guān)閉后,會不斷收到read事件,但沒有消息,即read方法返回-1
                 * 所以這時服務(wù)器端也需要關(guān)閉channel,避免無限無效的處理*/              
                int count = channel.read(readBuffer);
                 
                if(count > 0){
                    //一定需要調(diào)用flip函數(shù),否則讀取錯誤數(shù)據(jù)
                    readBuffer.flip();
                    /*使用CharBuffer配合取出正確的數(shù)據(jù)
                    String question = new String(readBuffer.array());  
                    可能會出錯,因為前面readBuffer.clear();并未真正清理數(shù)據(jù)
                    只是重置緩沖區(qū)的position, limit, mark,
                    而readBuffer.array()會返回整個緩沖區(qū)的內(nèi)容。
                    decode方法只取readBuffer的position到limit數(shù)據(jù)。
                    例如,上一次讀取到緩沖區(qū)的是"where", clear后position為0,limit為 1024,
                    再次讀取“bye"到緩沖區(qū)后,position為3,limit不變,
                    flip后position為0,limit為3,前三個字符被覆蓋了,但"re"還存在緩沖區(qū)中,
                    所以 new String(readBuffer.array()) 返回 "byere",
                    而decode(readBuffer)返回"bye"。            
                    */
                    CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
                    String question = charBuffer.toString();
                    System.out.println("question:"+question);
                    String answer = getAnswer(question);
                    channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
                }
                else{
                    //這里關(guān)閉channel,因為客戶端已經(jīng)關(guān)閉channel或者異常了
                    channel.close();               
                }                      
            }
        }
        catch(Throwable t){
            t.printStackTrace();
            if(channel != null){
                channel.close();
            }
        }      
    }
     
    private String getAnswer(String question){
        String answer = null;
         
        switch(question){
        case "who":
            answer = "我是小娜\n";
            break;
        case "what":
            answer = "我是來幫你解悶的\n";
            break;
        case "where":
            answer = "我來自外太空\n";
            break;
        case "hi":
            answer = "hello\n";
            break;
        case "bye":
            answer = "88\n";
            break;
        default:
                answer = "請輸入 who, 或者what, 或者where";
        }
        return answer;
    }
}

【參考博客】
1.https://www.cnblogs.com/lyftest/p/6564547.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API,可以替代標準的Java I...
    編碼前線閱讀 2,344評論 0 5
  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API,可以替代標準的Java I...
    JackChen1024閱讀 7,939評論 1 143
  • 原文 先來回顧一下傳統(tǒng)的IO模式的,將傳統(tǒng)的IO模式的相關(guān)類理清楚(因為IO的類很多)。 但是,發(fā)現(xiàn)在整理的過程已...
    baby_buibui閱讀 1,521評論 2 35
  • Java NIO(New IO)是從Java 1.4版本開始引入的一個新的IO API,可以替代標準的Java I...
    zhisheng_blog閱讀 1,197評論 0 7
  • FLOWERSEAT THE GIRL 故事開始: 滴滴滴~ 睡意中的阿ken被微信消息吵醒。 “大姨媽…” ...
    Gavin是蓋文閱讀 672評論 2 3

友情鏈接更多精彩內(nèi)容