一、概述
Netty是一個Java的開源框架。提供異步的、事件驅(qū)動的網(wǎng)絡(luò)應(yīng)用程序框架和工具,用以快速開發(fā)高性能、高可靠性的網(wǎng)絡(luò)服務(wù)器和客戶端程序。
Netty是一個NIO客戶端,服務(wù)端框架。允許快速簡單的開發(fā)網(wǎng)絡(luò)應(yīng)用程序。例如:服務(wù)端和客戶端之間的協(xié)議,它簡化了網(wǎng)絡(luò)編程規(guī)范。
二、NIO開發(fā)的問題
1、NIO類庫和API復(fù)雜,使用麻煩。
2、需要具備Java多線程編程能力(涉及到Reactor模式)。
3、客戶端斷線重連、網(wǎng)絡(luò)不穩(wěn)定、半包讀寫、失敗緩存、網(wǎng)絡(luò)阻塞和異常碼流等問題處理難度非常大
4、存在部分BUG
NIO進行服務(wù)器開發(fā)的步驟:
1、創(chuàng)建ServerSocketChannel,配置為非阻塞模式;
2、綁定監(jiān)聽,配置TCP參數(shù);
3、創(chuàng)建一個獨立的IO線程,用于輪詢多路復(fù)用器Selector;
4、創(chuàng)建Selector,將之前創(chuàng)建的ServerSocketChannel注冊到Selector上,監(jiān)聽Accept事件;
5、啟動IO線程,在循環(huán)中執(zhí)行Select.select()方法,輪詢就緒的Channel;
6、當輪詢到處于就緒狀態(tài)的Channel時,需要對其進行判斷,如果是OP_ACCEPT狀態(tài),說明有新的客戶端接入,則調(diào)用ServerSocketChannel.accept()方法接受新的客戶端;
7、設(shè)置新接入的客戶端鏈路SocketChannel為非阻塞模式,配置TCP參數(shù);
8、將SocketChannel注冊到Selector上,監(jiān)聽READ事件;
9、如果輪詢的Channel為OP_READ,則說明SocketChannel中有新的準備就緒的數(shù)據(jù)包需要讀取,則構(gòu)造ByteBuffer對象,讀取數(shù)據(jù)包;
10、如果輪詢的Channel為OP_WRITE,則說明還有數(shù)據(jù)沒有發(fā)送完成,需要繼續(xù)發(fā)送。
三、Netty的優(yōu)點
1、API使用簡單,開發(fā)門檻低;
2、功能強大,預(yù)置了多種編解碼功能,支持多種主流協(xié)議;
3、定制功能強,可以通過ChannelHandler對通信框架進行靈活的擴展;
4、性能高,通過與其他業(yè)界主流的NIO框架對比,Netty綜合性能最優(yōu);
5、成熟、穩(wěn)定,Netty修復(fù)了已經(jīng)發(fā)現(xiàn)的NIO所有BUG;
6、社區(qū)活躍;
7、經(jīng)歷了很多商用項目的考驗。
粘包/拆包問題
TCP是一個“流”協(xié)議,所謂流,就是沒有界限的一串數(shù)據(jù)??梢韵胂鬄楹恿髦械乃]有分界線。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會根據(jù)TCP緩沖區(qū)的實際情況進行包的劃分,所以在業(yè)務(wù)上認為,一個完整的包可能會被TCP拆分成多個包進行發(fā)送,也有可能把多個小的包封裝成一個大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問題。
TCP粘包拆包問題示例圖:
假設(shè)客戶端分別發(fā)送了兩個數(shù)據(jù)包D1和D2給服務(wù)端,由于服務(wù)端一次讀取到的字節(jié)數(shù)是不確定的,可能存在以下4中情況。
1、服務(wù)端分兩次讀取到了兩個獨立的數(shù)據(jù)包,分別是D1和D2,沒有粘包和拆包;
2、服務(wù)端一次接收到了兩個數(shù)據(jù)包,D1和D2粘合在一起,被稱為TCP粘包;
3、服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包,第一次讀取到了完整的D1包和D2包的部分內(nèi)容,第二次讀取到了D2包的剩余部分內(nèi)容,這被稱為TCP拆包;
4、服務(wù)端分兩次讀取到了兩個數(shù)據(jù)包,第一次讀取到了D1包的部分內(nèi)容D1_1,第二次讀取到了D1包的剩余內(nèi)容D1_1和D2包的完整內(nèi)容;
如果此時服務(wù)器TCP接收滑窗非常小,而數(shù)據(jù)包D1和D2比較大,很有可能發(fā)生第五種情況,既服務(wù)端分多次才能將D1和D2包接收完全,期間發(fā)生多次拆包;
問題的解決策略
由于底層的TCP無法理解上層的業(yè)務(wù)數(shù)據(jù),所以在底層是無法保證數(shù)據(jù)包不被拆分和重組的,這個問題只能通過上層的應(yīng)用協(xié)議棧設(shè)計來解決,根據(jù)業(yè)界的主流協(xié)議的解決方案可歸納如下:
1、消息定長,例如每個報文的大小為固定長度200字節(jié),如果不夠,空位補空格;
2、在包尾增加回車換行符進行分割,例如FTP協(xié)議;
3、將消息分為消息頭和消息體,消息頭中包含消息總長度(或消息體總長度)的字段,通常設(shè)計思路為消息頭的第一個字段使用int32來表示消息的總程度;
4、更復(fù)雜的應(yīng)用層協(xié)議;
LineBasedFrameDecoder
為了解決TCP粘包/拆包導(dǎo)致的半包讀寫問題,Netty默認提供了多種編解碼器用于處理半包。
LinkeBasedFrameDecoder的工作原理是它一次遍歷ByteBuf中的可讀字節(jié),判斷看是否有“\n”、“\r\n”,如果有,就一次位置為結(jié)束位置,從可讀索引到結(jié)束位置區(qū)間的字節(jié)就組成一行。它是以換行符為結(jié)束標志的編解碼,支持攜帶結(jié)束符或者不攜帶結(jié)束符兩種解碼方式,同事支持配置單行的最大長度。如果連續(xù)讀取到最大長度后任然沒有發(fā)現(xiàn)換行符,就會拋出異常,同時忽略掉之前讀到的異常碼流。
DelimiterBasedFrameDecoder
實現(xiàn)自定義分隔符作為消息的結(jié)束標志,完成解碼。
FixedLengthFrameDecoder
是固定長度解碼器,能夠按照指定的長度對消息進行自動解碼,開發(fā)者不需要考慮TCP的粘包/拆包問題。
Netty高性能之道
1、異步非阻塞通信
在IO編程過程中,當需要同時處理多個客戶端接入請求時,可以利用多線程或者IO多路復(fù)用技術(shù)進行處理。IO多路復(fù)用技術(shù)通過把多個IO的阻塞復(fù)用到同一個Selector的阻塞上,從而使得系統(tǒng)在單線程的情況下可以同時處理多個客戶端請求。與傳統(tǒng)的多線程/多進程模型相比,IO多路復(fù)用的最大優(yōu)勢是系統(tǒng)開銷小,系統(tǒng)不需要創(chuàng)建新的額外進程或者線程,也不需要維護這些進程和線程的運行,降低了系統(tǒng)的維護工作量,節(jié)省了系統(tǒng)資源。
Netty的IO線程NioEventLoop由于聚合了多路復(fù)用器Selector,可以同時并發(fā)處理成百上千個客戶端SocketChannel。由于讀寫操作都是非阻塞的,這就可以充分提升IO線程的運行效率,避免由頻繁的IO阻塞導(dǎo)致的線程掛起。另外,由于Netty采用了異步通信模式,一個IO線程可以并發(fā)處理N個客戶端連接和讀寫操作,這從根本上解決了傳統(tǒng)同步阻塞IO中 一連接一線程模型,架構(gòu)的性能、彈性伸縮能力和可靠性都得到了極大的提升。
2、高效的Reactor線程模型
常用的Reactor線程模型有三種,分別如下:
Reactor單線程模型;
Reactor多線程模型;
3、主從Reactor多線程模型;
Reactor單線程模型,指的是所有的IO操作都在同一個NIO線程上面完成,NIO線程職責如下:
1、作為NIO服務(wù)端,接收客戶端的TCP連接;
2、作為NIO客戶端,向服務(wù)端發(fā)起TCP連接;
3、讀取通信對端的請求或者應(yīng)答消息;
4、向通信對端發(fā)送請求消息或者應(yīng)答消息;
由于Reactor模式使用的是異步非阻塞IO,所有的IO操作都不會導(dǎo)致阻塞,理論上一個線程可以獨立處理所有IO相關(guān)操作。從架構(gòu)層面看,一個NIO線程確實可以完成其承擔的職責。例如,通過Acceptor接收客戶端的TCP連接請求消息,鏈路建立成功之后,通過Dispatch將對應(yīng)的ByteBuffer派發(fā)到指定的Handler上進行消息編碼。用戶Handler可以通過NIO線程將消息發(fā)送給客戶端。
對于一些小容量應(yīng)用場景,可以使用單線程模型,但是對于高負載、大并發(fā)的應(yīng)用卻不合適,主要原因如下:
1、一個NIO線程同時處理成百上千的鏈路,性能上無法支撐。幾遍NIO線程的CPU負荷達到100%,也無法滿足海量消息的編碼、解碼、讀取和發(fā)送;
2、當NIO線程負載過重后,處理速度將變慢,這會導(dǎo)致大量客戶端連接超時,超時之后往往會進行重發(fā),這更加重了NIO線程的負載,最終會導(dǎo)致大量消息積壓和處理超時,NIO線程會成為系統(tǒng)的性能瓶頸;
3、可靠性問題。一旦NIO線程意外進入死循環(huán),會導(dǎo)致整個系統(tǒng)通信模塊不可用,不能接收和處理外部消息,造成節(jié)點故障。
為了解決這些問題,從而演進出了Reactor多線程模型。
Reactor多線程模型與單線程模型最大的區(qū)別就是有一組NIO線程處理IO操作,特點如下:
1、有一個專門的NIO線程——Acceptor線程用于監(jiān)聽服務(wù)端,接收客戶端TCP連接請求;
2、網(wǎng)絡(luò)IO操作——讀、寫等由一個NIO線程池負責,線程池可以采用標準的JDK線程池實現(xiàn),它包含一個任務(wù)隊列和N個可用的線程,由這些NIO線程負責消息的讀取、編碼、解碼和發(fā)送;
3、1個NIO線程可以同時處理N條鏈路,但是1個鏈路只對應(yīng)1個NIO線程,防止發(fā)生并發(fā)操作問題。
在絕大多數(shù)場景下,Reactor多線程模型都可以滿足性能需求;但是,在極特殊應(yīng)用場景中,一個NIO線程負責監(jiān)聽和處理所有的客戶端連接可能會存在性能問題。例如百萬客戶端并發(fā)連接,或者服務(wù)端需要對客戶端的握手消息進行安全認證,認證本身非常損耗性能。在這類場景下,單獨一個Acceptor線程可能會存在性能不足問題,為了解決性能問題,產(chǎn)生了第三種Reactor線程模型——主從Reactor多線程模型。
主從Reactor線程模型的特定是:服務(wù)端用于接收客戶端連接的不再是一個單獨的NIO線程,而是一個獨立的NIO線程池。Acceptor接收到客戶端TCP連接請求處理完成后(可能包含接入認證等),將新創(chuàng)建的SocketChannel注冊到IO線程池(subReactor線程池)的某個IO線程上,由它負責SocketChannel的讀寫和編解碼工作。Acceptor線程池只用于客戶端的登錄、握手和安全認證,一旦鏈路建立成功,就將鏈路注冊到后端subReactor線程池的IO線程上,由IO線程負責后續(xù)的IO操作。
利用主從NIO線程模型,可以解決1個服務(wù)端監(jiān)聽線程無法有效處理所有客戶端連接的性能不足問題。Netty官方推薦使用該線程模型。它的工作流程總結(jié)如下:
1、從主線程池中隨機選擇一個Reactor線程作為Acceptor線程,用于綁定監(jiān)聽端口,接收客戶端連接;
2、Acceptor線程接收客戶端連接請求之后,創(chuàng)建新的SocketChannel,將其注冊到主線程池的其他Reactor線程上,由其負責接入認證、IP黑白名單過濾、握手等操作;
3、然后也業(yè)務(wù)層的鏈路正式建立成功,將SocketChannel從主線程池的Reactor線程的多路復(fù)用器上摘除,重新注冊到Sub線程池的線程上,用于處理IO的讀寫操作。
3、無鎖化的串行設(shè)計
在大多數(shù)場景下,并行多線程處理可以提升系統(tǒng)的并發(fā)性能。但是,如果對于共享資源的并發(fā)訪問處理不當,會帶來嚴重的鎖競爭,這最終會導(dǎo)致性能的下降。為了盡可能地避免鎖競爭帶來的性能損耗,可以通過串行化設(shè)計,既消息的處理盡可能在同一個線程內(nèi)完成,期間不進行線程切換,這樣就避免了多線程競爭和同步鎖。
為了盡可能提升性能,Netty采用了串行無鎖化設(shè)計,在IO線程內(nèi)部進行串行操作,避免多線程競爭導(dǎo)致的性能下降。表面上看,串行化設(shè)計似乎CPU利用率不高,并發(fā)程度不夠。但是,通過調(diào)整NIO線程池的線程參數(shù),可以同時啟動多個串行化的線程并行運行,這種局部無鎖化的串行線程設(shè)計相比一個隊列——多個工作線程模型性能更優(yōu)。
Netty串行化設(shè)計工作原理圖如下:
Netty的NioEventLoop讀取到消息后,直接調(diào)用ChannelPipeline的fireChannelRead(Object msg),只要用戶不主動切換線程,一直會由NioEventLoop調(diào)用到用戶的Handler,期間不進行線程切換。這種串行化處理方式避免了多線程導(dǎo)致的鎖競爭,從性能角度看是最優(yōu)的。
4、高效的并發(fā)編程
Netty中高效并發(fā)編程主要體現(xiàn):
1、volatile的大量、正確使用;
2、CAS和原子類的廣泛使用;
3、線程安全容器的使用;
4、通過讀寫鎖提升并發(fā)性能。
5、高性能的序列化框架
影響序列化性能的關(guān)鍵因素總結(jié)如下:
1、序列化后的碼流大小(網(wǎng)絡(luò)寬帶的占用);
2、序列化與反序列化的性能(CPU資源占用);
3、是否支持跨語言(異構(gòu)系統(tǒng)的對接和開發(fā)語言切換)。
Netty默認提供了對GoogleProtobuf的支持,通過擴展Netty的編解碼接口,用戶可以實現(xiàn)其他的高性能序列化框架
6、零拷貝
Netty的“零拷貝”主要體現(xiàn)在三個方面:
1)、Netty的接收和發(fā)送ByteBuffer采用DIRECT BUFFERS,使用堆外直接內(nèi)存進行Socket讀寫,不需要進行字節(jié)緩沖區(qū)的二次拷貝。如果使用傳統(tǒng)的堆內(nèi)存(HEAP BUFFERS)進行Socket讀寫,JVM會將堆內(nèi)存Buffer拷貝一份到直接內(nèi)存中,然后才寫入Socket中。相比于堆外直接內(nèi)存,消息在發(fā)送過程中多了一次緩沖區(qū)的內(nèi)存拷貝。
2)、第二種“零拷貝 ”的實現(xiàn)CompositeByteBuf,它對外將多個ByteBuf封裝成一個ByteBuf,對外提供統(tǒng)一封裝后的ByteBuf接口。
3)、第三種“零拷貝”就是文件傳輸,Netty文件傳輸類DefaultFileRegion通過transferTo方法將文件發(fā)送到目標Channel中。很多操作系統(tǒng)直接將文件緩沖區(qū)的內(nèi)容發(fā)送到目標Channel中,而不需要通過循環(huán)拷貝的方式,這是一種更加高效的傳輸方式,提升了傳輸性能,降低了CPU和內(nèi)存占用,實現(xiàn)了文件傳輸?shù)摹傲憧截悺薄?/p>
7、內(nèi)存池
隨著JVM虛擬機和JIT即時編譯技術(shù)的發(fā)展,對象的分配和回收是個非常輕量級的工作。但是對于緩沖區(qū)Buffer,情況卻稍有不同,特別是對于堆外直接內(nèi)存的分配和回收,是一件耗時的操作。為了盡量重用緩沖區(qū),Netty提供了基于內(nèi)存池的緩沖區(qū)重用機制。
8、靈活的TCP參數(shù)配置能力
Netty在啟動輔助類中可以靈活的配置TCP參數(shù),滿足不同的用戶場景。合理設(shè)置TCP參數(shù)在某些場景下對于性能的提升可以起到的顯著的效果,總結(jié)一下對性能影響比較大的幾個配置項:
1)、SO_RCVBUF和SO_SNDBUF:通常建議值為128KB或者256KB;
2)、SO_TCPNODELAY:NAGLE算法通過將緩沖區(qū)內(nèi)的小封包自動相連,組成較大的封包,阻止大量小封包的發(fā)送阻塞網(wǎng)絡(luò),從而提高網(wǎng)絡(luò)應(yīng)用效率。但是對于時延敏感的應(yīng)用場景需要關(guān)閉該優(yōu)化算法;
3)、軟中斷:如果Linux內(nèi)核版本支持RPS(2.6.35以上版本),開啟RPS后可以實現(xiàn)軟中斷,提升網(wǎng)絡(luò)吞吐量。RPS根據(jù)數(shù)據(jù)包的源地址,目的地址以及目的和源端口,計算出一個hash值,然后根據(jù)這個hash值來選擇軟中斷運行的CPU。從上層來看,也就是說將每個連接和CPU綁定,并通過這個hash值,來均衡軟中斷在多個CPU上,提升網(wǎng)絡(luò)并行處理性能。
Nio中的重要概念
FileChannel:
從文件中讀寫數(shù)據(jù),無法設(shè)置為非阻塞
public class Demo {
public static void main(String[] args) {
ByteBuffer buffer = ByteBuffer.allocate(2048);
byte[] bb = new byte[2048];
try {
FileInputStream fis = new FileInputStream("D:\\vipkid\\bak\\pangolin-service\\src\\main\\resources\\data\\file.txt");
FileChannel fc = fis.getChannel();
long timeStar = System.currentTimeMillis();
fc.read(buffer);
buffer.flip();
byte[] chars=buffer.array();
String txt=new String(chars);
System.out.println(txt);
long endTime = System.currentTimeMillis();
System.out.println("read time:" + (endTime - timeStar) + "ms");
/// 或者下面這種方法
RandomAccessFile file = new RandomAccessFile("D:\\vipkid\\bak\\pangolin-service\\src\\main\\resources\\data\\file.txt", "rw");
FileChannel channel = file.getChannel();
ByteBuffer byteBuffer=ByteBuffer.allocate(2048);
channel.read(buffer);
String msg=new String(buffer.array());
System.out.println(msg);
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
DatagramChannel:
通過udp讀寫網(wǎng)絡(luò)中的數(shù)據(jù)
SocketChannel:
通過tcp讀取網(wǎng)絡(luò)中的數(shù)據(jù),可以設(shè)置為非阻塞
打開方式
SocketChannel socket=SocketChannel.open();
socket.connect(new InetSocketAddress("localhost",8080));
從SocketChannel上讀取數(shù)據(jù):
ByteBuffer buffer=ByteBuffer.allocate(2048);
buffer.clear();
int count=socket.read(buffer);//count標識讀了多少字節(jié)
寫數(shù)據(jù):
String txt="vipkid has many bugs";
buffer.clear();
while(buffer.hasRemaining()){
socket.write(buffer);
}
非阻塞模式+connect
socket.configureBlocking(false);
socket.connect(new InetSocketAddress("127.0.0.1",8080));
if(socket.isConnectable()){
if(socket.isConnectionPending()){
if(socket.finishConnect()){
//do stm
}
}
}
ServerSocketChannel:
監(jiān)聽網(wǎng)絡(luò)中的一個連接請求,為請求生成一個SocketChannel;
打開:ServerSocketChannel serverChannel=ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(9999));
serverChannel.configureBlocking(false);//非阻塞模式
while(true){
serverChannel.accept();//阻塞直到一個連接請求到來
}
緩沖區(qū)類型:
ByteBuffer
CharBuffer、IntBuffer、LongBuffer、ShortBuffer
常用方法:
get() 從緩沖區(qū)中獲取數(shù)據(jù)
put() 往緩沖區(qū)中添加數(shù)據(jù)
read() 從channel中讀取數(shù)據(jù)到buffer
write() 往channel中寫入數(shù)據(jù)
capacity - 緩沖區(qū)大小,無論是讀模式還是寫模式,此屬性值不會變;
position - 寫數(shù)據(jù)時,position表示當前寫的位置,每寫一個數(shù)據(jù),會向下移動一個數(shù)據(jù)單元,初始為0;最大為capacity - 1
切換到讀模式時,position會被置為0,表示當前讀的位置
limit - 寫模式下,limit 相當于capacity 表示最多可以寫多少數(shù)據(jù),切換到讀模式時,limit 等于原先的position,表示最多可以讀多少數(shù)據(jù)。
MappedByteBuffer
內(nèi)存映射文件和之前說的 標準IO操作最大的不同之處就在于它雖然最終也是要從磁盤讀取數(shù)據(jù),
但是它并不需要將數(shù)據(jù)讀取到OS內(nèi)核緩沖區(qū),而是直接將進程的用戶私有地址空間中的一部分區(qū)
域與文件對象建立起映射關(guān)系,就好像直接從內(nèi)存中讀、寫文件一樣,速度當然快了(省去了把數(shù)據(jù)拷貝到OS內(nèi)核緩沖區(qū))
MappedByteBuffer 將文件直接映射到內(nèi)存(這里的內(nèi)存指的是虛擬內(nèi)存,并不是物理內(nèi)存)
代碼示例:
public class MappedByteBufferDemo {
public static void main(String[] args) {
try {
RandomAccessFile file=new RandomAccessFile("/Users/penny/code/vip/from.txt","rw");
FileChannel channel=file.getChannel();
MappedByteBuffer buffer=channel.map(FileChannel.MapMode.READ_WRITE,0,file.length());
byte[] bytes=new byte[(int)file.length()];
buffer.get(bytes);
System.out.println(new String(bytes));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}
下面為基于NIO實現(xiàn)的客戶端代碼:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class Client implements Runnable{
private BlockingQueue<String> words;
private Random random;
public static void main(String[] args) {
//種多個線程發(fā)起Socket客戶端連接請求
for(int i=0; i<1; i++){
Client c = new Client();
c.init();
new Thread(c).start();
}
}
@Override
public void run() {
SocketChannel channel = null;
Selector selector = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
//請求連接
channel.connect(new InetSocketAddress("localhost", 8383));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
boolean isOver = false;
while(! isOver){
selector.select();
Iterator ite = selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey) ite.next();
ite.remove();
if(key.isConnectable()){
//連接操作是否在該通道上執(zhí)行
if(channel.isConnectionPending()){
if(channel.finishConnect()){
//只有當連接成功后才能注冊O(shè)P_READ事件
key.interestOps(SelectionKey.OP_READ);
channel.write(CharsetHelper.encode(CharBuffer.wrap(getWord())));
sleep();
}
else{
key.cancel();
}
}
}
else if(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(128);
channel.read(byteBuffer);
byteBuffer.flip();
CharBuffer charBuffer = CharsetHelper.decode(byteBuffer);
String answer = charBuffer.toString();
System.out.println(Thread.currentThread().getId() + "---" + answer);
String word = getWord();
if(word != null){
channel.write(CharsetHelper.encode(CharBuffer.wrap(word)));
}
else{
isOver = true;
}
sleep();
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
finally{
if(channel != null){
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
private void init() {
words = new ArrayBlockingQueue<String>(5);
try {
words.put("hi");
words.put("who");
words.put("what");
words.put("where");
words.put("bye");
} catch (InterruptedException e) {
e.printStackTrace();
}
random = new Random();
}
private String getWord(){
return words.poll();
}
private void sleep() {
try {
TimeUnit.SECONDS.sleep(random.nextInt(3));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
private void sleep(long l) {
try {
TimeUnit.SECONDS.sleep(l);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
服務(wù)端代碼:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class XiaoNa {
private ByteBuffer readBuffer;
private Selector selector;
public static void main(String[] args){
XiaoNa xiaona = new XiaoNa();
xiaona.init();
xiaona.listen();
}
private void init(){
readBuffer = ByteBuffer.allocate(1024);
ServerSocketChannel servSocketChannel;
try {
servSocketChannel = ServerSocketChannel.open();
servSocketChannel.configureBlocking(false);
//綁定端口
servSocketChannel.socket().bind(new InetSocketAddress(8383));
selector = Selector.open();
servSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
private void listen() {
while(true){
try{
selector.select();
Iterator ite = selector.selectedKeys().iterator();
while(ite.hasNext()){
SelectionKey key = (SelectionKey) ite.next();
ite.remove();//確保不重復(fù)處理
handleKey(key);
}
}
catch(Throwable t){
t.printStackTrace();
}
}
}
private void handleKey(SelectionKey key)
throws IOException, ClosedChannelException {
SocketChannel channel = null;
try{
if(key.isAcceptable()){
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
channel = serverChannel.accept();//接受連接請求
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
else if(key.isReadable()){
channel = (SocketChannel) key.channel();
readBuffer.clear();
/*當客戶端channel關(guān)閉后,會不斷收到read事件,但沒有消息,即read方法返回-1
* 所以這時服務(wù)器端也需要關(guān)閉channel,避免無限無效的處理*/
int count = channel.read(readBuffer);
if(count > 0){
//一定需要調(diào)用flip函數(shù),否則讀取錯誤數(shù)據(jù)
readBuffer.flip();
/*使用CharBuffer配合取出正確的數(shù)據(jù)
String question = new String(readBuffer.array());
可能會出錯,因為前面readBuffer.clear();并未真正清理數(shù)據(jù)
只是重置緩沖區(qū)的position, limit, mark,
而readBuffer.array()會返回整個緩沖區(qū)的內(nèi)容。
decode方法只取readBuffer的position到limit數(shù)據(jù)。
例如,上一次讀取到緩沖區(qū)的是"where", clear后position為0,limit為 1024,
再次讀取“bye"到緩沖區(qū)后,position為3,limit不變,
flip后position為0,limit為3,前三個字符被覆蓋了,但"re"還存在緩沖區(qū)中,
所以 new String(readBuffer.array()) 返回 "byere",
而decode(readBuffer)返回"bye"。
*/
CharBuffer charBuffer = CharsetHelper.decode(readBuffer);
String question = charBuffer.toString();
System.out.println("question:"+question);
String answer = getAnswer(question);
channel.write(CharsetHelper.encode(CharBuffer.wrap(answer)));
}
else{
//這里關(guān)閉channel,因為客戶端已經(jīng)關(guān)閉channel或者異常了
channel.close();
}
}
}
catch(Throwable t){
t.printStackTrace();
if(channel != null){
channel.close();
}
}
}
private String getAnswer(String question){
String answer = null;
switch(question){
case "who":
answer = "我是小娜\n";
break;
case "what":
answer = "我是來幫你解悶的\n";
break;
case "where":
answer = "我來自外太空\n";
break;
case "hi":
answer = "hello\n";
break;
case "bye":
answer = "88\n";
break;
default:
answer = "請輸入 who, 或者what, 或者where";
}
return answer;
}
}