BIO方式使得整個處理過程和連接時綁定,只要連接建立,無論客戶端是否有消息,都要進行等待處理,一定程度上浪費了服務(wù)器端的硬件資源。
Java對于NIO的支持是通過channel和selector方式來實現(xiàn),采用的方法為向channel注冊感興趣的事件,然后通過selector來獲取發(fā)生了事件的Key。為實現(xiàn)TCP/IP+NIO方式的系統(tǒng)間通訊,Java提供了SocketChannel和ServerSocketChannel兩個關(guān)鍵的類,網(wǎng)絡(luò)IO的操作則改為ByteBuffer來實現(xiàn)。
Java NIO通道及通道模型:由一個專門的線程來處理所有的IO事件,并負責(zé)分發(fā)。事件驅(qū)動機制:事件到的時候觸發(fā),而不是同步的去監(jiān)視事件。
線程通訊:線程之間通過wait,notify等方式通訊。保證每次上下文切換都是有意義的。減少無謂的線程切換。
Java NIO采用了雙向通道channel進行數(shù)據(jù)傳輸,而不是單向的流stream,在通道上可以注冊我們感興趣的事件。一共有以下四種事件:
事件名對應(yīng)值
服務(wù)端接收客戶端連接事件
SelectionKey.OP_ACCEPT(16)
客戶端連接服務(wù)端事件
SelectionKey.OP_CONNECT(8)
讀事件
SelectionKey.OP_READ(1)
寫事件
SelectionKey.OP_WRITE(4)
服務(wù)端和客戶端各自維護一個管理通道的對象,我們稱之為Selector,該對象能檢測一個或多個通道channel上的事件。我們以服務(wù)端為例,如果服務(wù)端的selector注冊了事件,某時刻客戶端給服務(wù)端發(fā)送了一些數(shù)據(jù),阻塞I/O這時會調(diào)用read()方法阻塞地讀取數(shù)據(jù),而NIO服務(wù)端會在selector中添加一個讀事件。服務(wù)端的處理線程會輪詢地訪問selector,如果訪問selector時發(fā)現(xiàn)有感興趣的時間到達,則處理這些事件,如果沒有感興趣的事件到達,則處理線程會一直阻塞直到感興趣的事件到達為止。

java.nio包中主要包括以下幾個類或接口:
- Buffer 緩沖區(qū),用來臨時存放輸入或輸出數(shù)據(jù)
- Charset 用來把Unicode字符編碼和其它字符編碼互轉(zhuǎn)
- Channel 數(shù)據(jù)傳輸通道,用來把Buffer中的數(shù)據(jù)寫入到數(shù)據(jù)源,或者把數(shù)據(jù)源中的數(shù)據(jù)讀入到Buffer。
- Selector 用來支持異步I/O操作,也叫非阻塞I/O操作
nio包中主要通過下面兩個方面來提高I/O操作效率:
- 通過Buffer和Channel來提高I/O操作的速度
- 通過Selector來支持非阻塞I/O操作
與Socket和ServerSocket對應(yīng),NIO提供了SocketChannel和ServerSocketChannel對應(yīng),這兩種通道同時支持一般的阻塞模式和更高效的非阻塞模式。
客戶端通過SocketChannel.open()方法打開一個Socket通道,如果此時提供了SocketAddress參數(shù),則會自動開始連接,否則需要主動調(diào)用connect()方法連接,創(chuàng)建連接后,可以像一般的Channel一樣的用Buffer進行讀寫,這都是阻塞模式的。
服務(wù)器端通過ServerSocketChannel.open()創(chuàng)建,并使用bind()方法綁定到一個監(jiān)聽地址上,最后調(diào)用accept()方法阻塞等待客戶端連接。當(dāng)客戶端連接后會返回一個SocketChannel以實現(xiàn)與客戶端的讀寫交互。
多路復(fù)用套接字通道(Selector實現(xiàn)的非阻塞式IO)
套接字通道多路復(fù)用的思想是創(chuàng)建一個Selector,將多個通道對它進行注冊,當(dāng)套接字有關(guān)注的事件發(fā)生時,可以選出這個通道進行操作。
//創(chuàng)建一個選擇器,可用close()關(guān)閉,isOpen()表示是否處于打開狀態(tài),他不隸屬于當(dāng)前線程
Selector selector = Selector.open();
//創(chuàng)建ServerSocketChannel,并把它綁定到指定端口上
ServerSocketChannel server = ServerSocketChannle.open();
server.bind(new InetSocketAddress("127.0.0.1",7777));
//設(shè)置為非阻塞模式,這個非常重要
server.configureBlocking(false);
//在選擇器里面注冊關(guān)注這個服務(wù)器套接字通道的accept事件
//ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ, OP_WRITE用于SocketChannel
server.register(selector,SelectionKey.OP_ACCEPT);
while(true){
//測試等待事件發(fā)生,分為直接返回的selectNow()和阻塞等待的select(),另外也可加一個參數(shù)表示阻塞超時
//停止阻塞的方法有兩種:中斷線程和selector.wakeup(),有事件發(fā)生時,會自動的wakeup()
//方法返回為select出的事件數(shù)
//另外務(wù)必注意一個問題是,當(dāng)selector被select()阻塞時,其他的線程調(diào)用同一個selector的register也會被阻塞到select返回為止
//select操作會把發(fā)生關(guān)注事件的Key加入selectionKeys中(只管加不管減)
if(selector.select()==0){//
continue;
}
//獲取發(fā)生了關(guān)注事件的Key集合,每個SelectionKey對應(yīng)了一個注冊的通道
Set<SelectionKey>keys = selector.selectedKeys();
//多說一句selector.keys()返回所有的SelectionKey(包括沒有發(fā)生事件的)
for(SelectionKey key :keys){
//OP_ACCEPT 這個只有ServerSocketChannel才有可能觸發(fā)
if(key.isAcceptable()){
//得到與客戶端的套接字通信
SocketChannel channel = ((ServerSocketChannel) key.channel()).accept();
//同樣設(shè)置為非阻塞模式
channel.configureBlocking(false);
//同樣將與客戶端的通道在selector上注冊,OP_READ對應(yīng)可讀事件(對方有數(shù)據(jù)寫入),可以通過key獲取關(guān)聯(lián)的選擇器
channel.register(key.selector(),SelectionKey.OP_READ,ByteBuffer.allocate(1024));
}
//OP_READ 有數(shù)據(jù)可讀
if(key.isReadable()){
SocketChannel channel = (SocketChannel) key.channel();
//得到附件,就是上面SocketChannel進行register的時候的第三個參數(shù),可為隨意Object
ByteBuffer buffer = (ByteBuffer)key.attachment();
//讀數(shù)據(jù),這里就簡單寫一下,實際上應(yīng)該還是循環(huán)讀取直到讀不出來為止的
channel.read(buffer);
//改變自身關(guān)注事件,可以用位或操作|組合事件
key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
//OP_WRITE 可寫狀態(tài),這個狀態(tài)通??偸怯|發(fā)的,所以只在需要寫操作時才進行關(guān)注
if(key.isWritable()){
//寫數(shù)據(jù)掠過,可以自建buffer,也可用附件對象(看情況),主義buffer寫入后需要flip
//...
//寫完就把寫狀態(tài)關(guān)注去掉,否則會一直觸發(fā)寫事件
key.interestOps(SelectionKey.OP_READ);
}
//由于select操作只管對selectedKeys進行添加,所以key處理后我們需要從里面把key去掉
keys.remove(key);
}
服務(wù)器端程序
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.nio.channels.SocketChannel;
public class NIOServer {
//標志性數(shù)字
private static int flag = 0;
//定義緩沖區(qū)大小
private static int block = 4096;
//接收緩沖區(qū)
private static ByteBuffer receiveBuffer = ByteBuffer.allocate(block);
//發(fā)送緩沖區(qū)
private static ByteBuffer sendBuffer = ByteBuffer.allocate(block);
//定義Selector
private Selector selector;
public NIOServer(int port)throws IOException{
//打開服務(wù)器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//服務(wù)器配置為非阻塞
serverSocketChannel.configureBlocking(false);
//檢索與此服務(wù)器套接字通道關(guān)聯(lián)的套接字
ServerSocket serverSocket = serverSocketChannel.socket();
//進行服務(wù)的綁定
serverSocket.bind(new InetSocketAddress(port));
//通過open()方法找到Selector
selector = Selector.open();
//注冊到selector
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start ------- 8888:");
}
//監(jiān)聽
public void listen() throws IOException{
while(true){
//監(jiān)控所有注冊的channel,當(dāng)其中注冊的IO操作可以進行時,并將對應(yīng)的SelectionKey加入selected-key set
selector.select();
//Selected-key set 代表了所有通過select()方法監(jiān)測到可以進行IO操作發(fā)的channel,這個集合可以通過selectedKeys()拿到
Set<SelectionKey>selectionKeys=selector.selectedKeys();
Iterator<SelectionKey>iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
handleKey(selectionKey);
iterator.remove();
}
}
}
//處理請求
public void handleKey(SelectionKey selectionKey)throws IOException{
//接受請求
ServerSocketChannel serverSocketChannel = null;
SocketChannel socketChannel = null;
String receiveText;
String sendText;
int count;
//測試此鍵的通道是否準備好接受新的套接字連接
if(selectionKey.isAcceptable()){
//返回創(chuàng)建此鍵的通道
serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
//接受客戶端建立連接的請求,并返回SocketChannel對象
socketChannel = serverSocketChannel.accept();
//配置為非阻塞
socketChannel.configureBlocking(false);
//注冊到selector
socketChannel.register(selector, SelectionKey.OP_READ);
}else if (selectionKey.isReadable()){
//返回為之創(chuàng)建此鍵的通道
socketChannel = (SocketChannel)selectionKey.channel();
//將緩沖區(qū)清空,以備下次讀取
receiveBuffer.clear();
//將發(fā)送來的數(shù)據(jù)讀取到緩沖區(qū)
count = socketChannel.read(receiveBuffer);
if(count>0){
receiveText = new String(receiveBuffer.array(),0,count);
System.out.println("服務(wù)器端接受到的數(shù)據(jù)————"+receiveText);
socketChannel.register(selector, SelectionKey.OP_WRITE);
}
}else if (selectionKey.isWritable()){
//將緩沖區(qū)清空以備下次寫入
sendBuffer.clear();
//返回為之創(chuàng)建此鍵的通道
socketChannel = (SocketChannel)selectionKey.channel();
sendText = "message from server--"+flag++;
//向緩沖區(qū)中輸入數(shù)據(jù)
sendBuffer.put(sendText.getBytes());
//將緩沖區(qū)各標志復(fù)位,因為向里面put了數(shù)據(jù)標志被改變要想從中讀取數(shù)據(jù)發(fā)向服務(wù)器,就要復(fù)位
sendBuffer.flip();
//輸出到通道
socketChannel.write(sendBuffer);
System.out.println("服務(wù)器端向客戶端發(fā)送數(shù)據(jù)--:"+sendText);
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
public static void main(String[] args)throws IOException {
// TODO Auto-generated method stub
int port = 8888;
NIOServer server = new NIOServer(port);
server.listen();
}
}