channel 是雙向的,同時(shí)讀取和寫入,流是單向的
channel 和多路復(fù)用器結(jié)合之后,有多種狀態(tài)位,方便多路復(fù)用器(輪詢)去識(shí)別(連接狀態(tài),阻塞狀態(tài),可讀狀態(tài),可寫狀態(tài))
channel 分為倆大類,網(wǎng)絡(luò)讀寫的SelectableChannel,文件操作的FileChannel
網(wǎng)絡(luò)讀寫的SocketChannel和ServerSocketChannel是SelectableChannel的子類
SocketChannel和ServerSocketChannel 依賴于多路復(fù)用器(Selector),Selector是NIO編程的基礎(chǔ),提供選擇已經(jīng)就緒的任務(wù)的能力。簡(jiǎn)單來說,Selector會(huì)不斷地輪詢注冊(cè)在其上的Channel,如果某個(gè)Channel上面發(fā)生讀或者寫事件,這個(gè)Channel就處于就緒狀態(tài),會(huì)被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進(jìn)行后續(xù)的I/O操作
一個(gè)多路復(fù)用器可以負(fù)責(zé)成千上萬個(gè)Channel通道,沒有上限,這也是JDK使用了epoll代替了傳統(tǒng)的select實(shí)現(xiàn),獲得連接句柄沒有限制。這樣意味著我們只要一個(gè)線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬個(gè)客戶端,這是JDK NIO 的巨大進(jìn)步。
Selector線程就類似一個(gè)管理者M(jìn)aster,管理成千上萬個(gè)Channel,然后輪詢哪個(gè)管道的數(shù)據(jù)已經(jīng)準(zhǔn)備好,通知CPU執(zhí)行IO的讀取或?qū)懭氩僮鳌?/p>
Selector模式:當(dāng)IO事件(管道)注冊(cè)到選擇器后,Selector會(huì)分配給每個(gè)管道一個(gè)key值,相當(dāng)于標(biāo)簽。Selector選擇器是以輪詢的方式進(jìn)行查找注冊(cè)所有的IO事件。當(dāng)我們的IO事件(管道)準(zhǔn)備就緒后,select就會(huì)識(shí)別,會(huì)通過key值找到相應(yīng)的管道,進(jìn)行相關(guān)的數(shù)據(jù)處理操作(從管道里讀或?qū)憯?shù)據(jù),寫到我們的數(shù)據(jù)緩沖區(qū)Buffer去)
每個(gè)管道都會(huì)對(duì)選擇器進(jìn)行注冊(cè)到不同的事件狀態(tài),以便選擇器查找:SelectionKey.OP_CONNECT SelectionKey.OP_ACCEPT SelectionKey.OP_READ SelectionKey.OP_WRITE
io與nio比較:
- io當(dāng)客戶端多時(shí),會(huì)創(chuàng)建大量的處理線程。且每個(gè)線程都要占用??臻g和一些CPU時(shí)間
- io阻塞可能帶來頻繁的上下文切換,且大部分上下文切換可能是無意義的。
- NIO的本質(zhì)是原始的tcp建立連接使用3次握手的操作,減少連接的開銷
- IO是面向流(Stream)的,NIO是面向塊(buffer)的
- Java NIO的selectors允許一條線程去監(jiān)控多個(gè)channels的輸入,你可以向一個(gè)selector上注冊(cè)多個(gè)channel,然后調(diào)用selector的select()方法判斷是否有新的連接進(jìn)來或者已經(jīng)在selector上注冊(cè)時(shí)channel是否有數(shù)據(jù)進(jìn)入。selector的機(jī)制讓一個(gè)線程管理多個(gè)channel變得簡(jiǎn)單。(不再使用多線程處理連接)
- NIO允許你用一個(gè)單獨(dú)的線程或幾個(gè)線程管理很多個(gè)channels(網(wǎng)絡(luò)的或者文件的),代價(jià)是程序的處理和處理IO相比更加復(fù)雜
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
ByteBuffer buffer = ByteBuffer.allocate(48);
int bytesRead = inChannel.read(buffer);
while(! bufferFull(bytesRead) ) {
bytesRead = inChannel.read(buffer);
}
注意第二行從channel中讀取數(shù)據(jù)到ByteBuffer,當(dāng)這個(gè)方法返回你不知道是否你需要的所有數(shù)據(jù)都被讀到buffer了,你所知道的一切就是有一些數(shù)據(jù)被讀到了buffer中,但是你并不知道具體有多少數(shù)據(jù),這使程序的處理變得稍微有些困難
想象一下,調(diào)用了read(buffer)方法后,只有半行數(shù)據(jù)被讀進(jìn)了buffer,例如:“Name: An”,你能現(xiàn)在就處理數(shù)據(jù)嗎?當(dāng)然不能。你需要等待直到至少一整行數(shù)據(jù)被讀到buffer中,在這之前確保程序不要處理buffer中的數(shù)據(jù)
你如何知道buffer中是否有足夠的數(shù)據(jù)可以被處理呢?你不知道,唯一的方法就是檢查buffer中的數(shù)據(jù)。可能你會(huì)進(jìn)行幾次無效的檢查(檢查了幾次數(shù)據(jù)都不夠進(jìn)行處理),這會(huì)令程序設(shè)計(jì)變得比較混亂復(fù)雜
bufferFull方法負(fù)責(zé)檢查有多少數(shù)據(jù)被讀到了buffer中,根據(jù)返回值是true還是false來判斷數(shù)據(jù)是否夠進(jìn)行處理。bufferFull方法掃描buffer但不能改變buffer的內(nèi)部狀態(tài)
- NIO允許你用一個(gè)單獨(dú)的線程或幾個(gè)線程管理很多個(gè)channels(網(wǎng)絡(luò)的或者文件的),代價(jià)是程序的處理和處理IO相比更加復(fù)雜,如果你需要同時(shí)管理成千上萬的連接,但是每個(gè)連接只發(fā)送少量數(shù)據(jù),例如一個(gè)聊天服務(wù)器,用NIO實(shí)現(xiàn)會(huì)更好一些,相似的,如果你需要保持很多個(gè)到其他電腦的連接,例如P2P網(wǎng)絡(luò),用一個(gè)單獨(dú)的線程來管理所有出口連接是比較合適的,如果你只有少量的連接但是每個(gè)連接都占有很高的帶寬,同時(shí)發(fā)送很多數(shù)據(jù),傳統(tǒng)的IO會(huì)更適合
- NIO實(shí)現(xiàn)步驟
- 打開多路復(fù)用器
- 打開服務(wù)器端通道
- 設(shè)置服務(wù)器通道為非阻塞模式
- 服務(wù)端通道綁定端口和地址
- 把服務(wù)器通道注冊(cè)到多路復(fù)用器上,并且監(jiān)聽阻塞事件
- 多路復(fù)用器輪詢監(jiān)聽
- 返回多路復(fù)用器已經(jīng)選擇的SelectionKey結(jié)果集
- 結(jié)果集進(jìn)行遍歷,遍歷時(shí)移除SelectionKey元素,防止重復(fù)處理
- 對(duì)selectionkey進(jìn)行isValid()有效性校驗(yàn)
- 如果selectionkey是連接狀態(tài)的,用socketchannel.finishConnect();
- 如果selectionkey是阻塞狀態(tài)的,用socketchannel.register(seletor, SelectionKey.OP_READ)可讀狀態(tài)
- 如果selectionkey是可讀狀態(tài)的,進(jìn)行讀取
- 如果selectionkey是可寫狀態(tài)的,OP_WRITE比較特殊,表示本地的寫緩沖區(qū)可用,一般只有在一次寫沒有把數(shù)據(jù)寫完的情況下需要注冊(cè)O(shè)P_WRITE,寫完后要及時(shí)關(guān)閉,否則每次循環(huán)都有可能被調(diào)用,因?yàn)閷懢彌_區(qū)在大多數(shù)情況下是始終可用的。
- 雙向通信示例
服務(wù)器端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
public class Server implements Runnable{
//1 多路復(fù)用器(管理所有的通道)
private Selector seletor;
//2 建立緩沖區(qū)
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
//3
private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
public Server(int port){
try {
//1 打開路復(fù)用器
this.seletor = Selector.open();
//2 打開服務(wù)器通道
ServerSocketChannel ssc = ServerSocketChannel.open();
//3 設(shè)置服務(wù)器通道為非阻塞模式
ssc.configureBlocking(false);
//4 綁定地址
ssc.bind(new InetSocketAddress(port));
//5 把服務(wù)器通道注冊(cè)到多路復(fù)用器上,并且監(jiān)聽阻塞事件
ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
System.out.println("Server start, port :" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
while(true){
try {
//1 必須要讓多路復(fù)用器開始監(jiān)聽
this.seletor.select();
//2 返回多路復(fù)用器已經(jīng)選擇的結(jié)果集
Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
//3 進(jìn)行遍歷
while(keys.hasNext()){
//4 獲取一個(gè)選擇的元素
SelectionKey key = keys.next();
//5 直接從容器中移除就可以了
keys.remove();
//6 如果是有效的
if(key.isValid()){
//7 如果為阻塞狀態(tài)
if(key.isAcceptable()){
this.accept(key);
}
//8 如果為可讀狀態(tài)
if(key.isReadable()){
this.read(key);
}
//9 寫數(shù)據(jù)
if(key.isWritable()){
}
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void read(SelectionKey key) {
try {
//1 清空緩沖區(qū)舊的數(shù)據(jù)
this.readBuf.clear();
//2 獲取之前注冊(cè)的socket通道對(duì)象
SocketChannel sc = (SocketChannel) key.channel();
//3 讀取數(shù)據(jù)
int count = sc.read(this.readBuf);
//4 如果沒有數(shù)據(jù)
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有數(shù)據(jù)則進(jìn)行讀取 讀取之前需要進(jìn)行復(fù)位方法(把position 和limit進(jìn)行復(fù)位)
this.readBuf.flip();
//6 根據(jù)緩沖區(qū)的數(shù)據(jù)長(zhǎng)度創(chuàng)建相應(yīng)大小的byte數(shù)組,接收緩沖區(qū)的數(shù)據(jù)
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收緩沖區(qū)數(shù)據(jù)
this.readBuf.get(bytes);
//8 打印結(jié)果
String body = new String(bytes).trim();
System.out.println("收到客戶端 : " + body);
// 9..可以寫回給客戶端數(shù)據(jù)
readBuf.flip();
sc.write(readBuf);
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
private void accept(SelectionKey key) {
try {
//1 獲取服務(wù)通道
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//2 執(zhí)行阻塞方法
SocketChannel sc = ssc.accept();
//3 設(shè)置阻塞模式
sc.configureBlocking(false);
//4 注冊(cè)到多路復(fù)用器上,并設(shè)置讀取標(biāo)識(shí)
sc.register(this.seletor, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new Thread(new Server(8765)).start();;
}
}
客戶端:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
public class Client {
//需要一個(gè)Selector
private Selector selector;
private ByteBuffer readBuf = ByteBuffer.allocate(1024);
public Client(){
try {
// 獲得一個(gè)Socket通道
SocketChannel channel = SocketChannel.open();
// 設(shè)置通道為非阻塞
channel.configureBlocking(false);
// 獲得一個(gè)通道管理器
this.selector = Selector.open();
// 客戶端連接服務(wù)器,其實(shí)方法執(zhí)行并沒有實(shí)現(xiàn)連接,需要在listen()方法中調(diào)
//用channel.finishConnect();才能完成連接
channel.connect(new InetSocketAddress("127.0.0.1", 8765));
//將通道管理器和該通道綁定,并為該通道注冊(cè)SelectionKey.OP_CONNECT事件。
channel.register(selector, SelectionKey.OP_CONNECT);
} catch (IOException e) {
e.printStackTrace();
}
}
public void listen() throws IOException {
// 輪詢?cè)L問selector
while (true) {
selector.select();
// 獲得selector中選中的項(xiàng)的迭代器
Iterator ite = this.selector.selectedKeys().iterator();
while (ite.hasNext()) {
SelectionKey key = (SelectionKey) ite.next();
// 刪除已選的key,以防重復(fù)處理
ite.remove();
// 連接事件發(fā)生
if (key.isConnectable()) {
SocketChannel channel = (SocketChannel) key
.channel();
// 如果正在連接,則完成連接
if(channel.isConnectionPending()){
channel.finishConnect();
}
// 設(shè)置成非阻塞
channel.configureBlocking(false);
//在這里可以給服務(wù)端發(fā)送信息哦
channel.write(ByteBuffer.wrap(new String("向服務(wù)端發(fā)送了一條信息").getBytes()));
//在和服務(wù)端連接成功之后,為了可以接收到服務(wù)端的信息,需要給通道設(shè)置讀的權(quán)限。
channel.register(this.selector, SelectionKey.OP_READ);
// 獲得了可讀的事件
} else if (key.isReadable()) {
read(key);
}
}
}
}
private void read(SelectionKey key) {
try {
//1 清空緩沖區(qū)舊的數(shù)據(jù)
this.readBuf.clear();
//2 獲取之前注冊(cè)的socket通道對(duì)象
SocketChannel sc = (SocketChannel) key.channel();
//3 讀取數(shù)據(jù)
int count = sc.read(this.readBuf);
//4 如果沒有數(shù)據(jù)
if(count == -1){
key.channel().close();
key.cancel();
return;
}
//5 有數(shù)據(jù)則進(jìn)行讀取 讀取之前需要進(jìn)行復(fù)位方法(把position 和limit進(jìn)行復(fù)位)
this.readBuf.flip();
//6 根據(jù)緩沖區(qū)的數(shù)據(jù)長(zhǎng)度創(chuàng)建相應(yīng)大小的byte數(shù)組,接收緩沖區(qū)的數(shù)據(jù)
byte[] bytes = new byte[this.readBuf.remaining()];
//7 接收緩沖區(qū)數(shù)據(jù)
this.readBuf.get(bytes);
//8 打印結(jié)果
String body = new String(bytes).trim();
System.out.println("收到Server : " + body);
// 9..可以寫回給客戶端數(shù)據(jù)
// readBuf.flip();
// sc.write(readBuf);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
try {
new Client().listen();
} catch (IOException e) {
e.printStackTrace();
}
}
}
運(yùn)行之后并不停止,服務(wù)器和客戶端還在輪詢