NIO三大核心
Buffer緩沖區(qū)
緩沖區(qū)本質(zhì)上是一個(gè)可以寫(xiě)入數(shù)據(jù)的內(nèi)存塊(類似數(shù)組),然后可以再次讀取。此內(nèi)存塊包含在NIO Buffer對(duì)象中,該對(duì)象提供了一組方法,可以更輕松地使用內(nèi)存塊。相比較直接對(duì)數(shù)組的操作,Buffer API更加容易操作和管理
使用Buffer進(jìn)行數(shù)據(jù)寫(xiě)入與讀取,需要進(jìn)行如下四個(gè)步驟:
- 將數(shù)據(jù)寫(xiě)入緩沖區(qū)
- 調(diào)用buffer.flip(),轉(zhuǎn)換為讀取模式,如果不調(diào)用,則position還在之前寫(xiě)模式寫(xiě)到的位置,讀取數(shù)據(jù)肯定不全面
- 緩沖區(qū)讀取數(shù)據(jù)
- 調(diào)用buffer.clear()清除緩沖區(qū),否則寫(xiě)的數(shù)據(jù)超出的話,則會(huì)越界異常
- buffer.compact()僅清除已閱讀的數(shù)據(jù)轉(zhuǎn)為寫(xiě)入模式,否則寫(xiě)的數(shù)據(jù)超出的話,則會(huì)越界異常
Buffer三個(gè)重要屬性
- capacity容量:作為一個(gè)內(nèi)存塊,Buffer具有一定的固定大小,也稱為"容量"
- position位置:寫(xiě)入模式時(shí)代表寫(xiě)數(shù)據(jù)的位置。讀取模式時(shí)代表讀取數(shù)據(jù)的位置
-
limit限制:寫(xiě)入模式,限制等于buffer的容量。讀取模式下,limit等于寫(xiě)入的數(shù)據(jù)量(可讀量)
1.png
補(bǔ)充:
- rewind() 重置position為0
- mark() 標(biāo)記position的位置
- reset() 重置position為上次mark()標(biāo)記的位置
ByteBuffer內(nèi)存類型
ByteBuffer為性能關(guān)鍵型代碼提供了直接內(nèi)存(direct堆外)和非直接內(nèi)存(heap堆)兩種實(shí)現(xiàn)。堆外內(nèi)存獲取的方式:ByteBuffer directByteBuffer=ByteBuffer.allocateDirect(noBytes);
-
好處
- 進(jìn)行網(wǎng)絡(luò)IO或者文件IO時(shí)比heapBuffer少一次拷貝。(file/socker----OS memory ---- jvm heap)。GC會(huì)移動(dòng)對(duì)象內(nèi)存,在寫(xiě)file或者socket的過(guò)程中,JVM的實(shí)現(xiàn)中,會(huì)先把數(shù)據(jù)復(fù)制到堆外,再進(jìn)行寫(xiě)入。
- GC范圍之外,降低GC壓力,但實(shí)現(xiàn)了自動(dòng)管理。DirectByteBuffer中有一個(gè)Cleaner對(duì)象(PhantomReference),Cleaner被GC前會(huì)執(zhí)行clean方法,觸發(fā)DirectByteBuffer中定義的Deallocator
解釋:為什么JVM需要拷貝一次數(shù)據(jù)到系統(tǒng)內(nèi)存,因?yàn)閖vm本身有g(shù)c,所以如果不拷貝,則在移動(dòng)過(guò)程中很可能出現(xiàn)地址變化了的問(wèn)題(被回收),地址一旦變化,則file/socket操作自然而然不知道哪里是哪里;就類似于交易,如果用股票抵消債務(wù),股票實(shí)時(shí)變動(dòng),很可能走手續(xù)過(guò)程中價(jià)值變化,所以需要先賣出股票然后進(jìn)行相關(guān)操作。
-
建議
- 性能確實(shí)可觀的時(shí)候才去使用,分配給大型、長(zhǎng)壽命:(網(wǎng)絡(luò)傳輸、文件讀寫(xiě)場(chǎng)景)
- 通過(guò)虛擬機(jī)參數(shù)MaxDirectMemorySize限制大小,防止耗盡整個(gè)機(jī)器的內(nèi)存;
Channel通道

SocketChannel

ServerSocketChannel

//會(huì)導(dǎo)致只能接收一個(gè)客戶端連接的代碼示例
public class NIOServer {
public static void main(String[] args) throws IOException {
//創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//設(shè)置為非阻塞模式
serverSocketChannel.socket().bind(new InetSocketAddress(8080));//綁定端口
System.out.println("啟動(dòng)成功");
while (true) {
SocketChannel socketChannel=serverSocketChannel.accept();//獲取新tcp連接通道
if (socketChannel != null) {
System.out.println("收到新連接"+socketChannel.getLocalAddress());
socketChannel.configureBlocking(false);//默認(rèn)阻塞的,一定要設(shè)置為非阻塞
ByteBuffer requestBuffer=ByteBuffer.allocate(1024);
//注意此處,會(huì)導(dǎo)致只能接收一個(gè)連接在這個(gè)連接沒(méi)有發(fā)送數(shù)據(jù)之前,當(dāng)前邏輯會(huì)導(dǎo)致在此處死等
while (socketChannel.isOpen()&&socketChannel.read(requestBuffer)!=-1) {
//長(zhǎng)連接情況下,需要手動(dòng)改判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束(此處做一個(gè)簡(jiǎn)單的判斷,超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position()>0) {
break;
}
}
if (requestBuffer.position()==0) continue;//如果沒(méi)有數(shù)據(jù)了,則不繼續(xù)后面的處理
requestBuffer.flip();//轉(zhuǎn)為讀模式
byte[] content=new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來(lái)自:"+socketChannel.getRemoteAddress());
String response="HTTP/1.1 200OK\r\n"+
"Content-Length: 11\r\n\r\n"+
"Hello World";
ByteBuffer buffer=ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);//非阻塞
}
}
}
}
}
public class NIOClient {
public static void main(String[] args) throws IOException {
SocketChannel socketChannel=SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("127.0.0.1",8080));
while (!socketChannel.finishConnect()) {
//沒(méi)鏈接上,則一直等待
Thread.yield();
}
Scanner scanner=new Scanner(System.in);
System.out.println("請(qǐng)輸入:");
String msg=scanner.nextLine();
ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
}
//讀取響應(yīng)
System.out.println("收到服務(wù)端響應(yīng)");
ByteBuffer requestBuffer=ByteBuffer.allocate(1024);
while (socketChannel.isOpen()&&socketChannel.read(requestBuffer)!=-1) {
if (requestBuffer.position()>0) {
break;
}
}
requestBuffer.flip();
byte[] content=new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
scanner.close();
socketChannel.close();
}
}
Selector選擇器
Selector是一個(gè)Java NIO組件,可以檢查一個(gè)或多個(gè)NIO通道,并確定哪些通道已準(zhǔn)備好進(jìn)行讀取或?qū)懭搿?shí)現(xiàn)單個(gè)線程可以管理多個(gè)通過(guò),從而管理多個(gè)網(wǎng)絡(luò)連接。如果不使用選擇器,則必須自己手動(dòng)實(shí)現(xiàn)while循環(huán)類似的代碼,才能確保不停的接收連接和正常的數(shù)據(jù)傳輸

//client和上面的一樣,只是用選擇器改進(jìn)了server端
public class NIOServer {
public static void main(String[] args) throws IOException {
//創(chuàng)建網(wǎng)絡(luò)服務(wù)端
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);//設(shè)置為非阻塞模式
//構(gòu)建一個(gè)Selector選擇器,并且將channel注冊(cè)上去
Selector selector = Selector.open();
SelectionKey selectionKey = serverSocketChannel.register(selector, 0, serverSocketChannel);
selectionKey.interestOps(SelectionKey.OP_ACCEPT);//對(duì)serverSocketChannel上面的accept事件感興趣(serverSocketChannel只能支持accept操作)
serverSocketChannel.socket().bind(new InetSocketAddress(8080));//綁定端口
System.out.println("啟動(dòng)成功");
while (true) {
//不在輪詢通道,改用下面輪詢事件的方式 select方法有阻塞效果,直到有事件通知才會(huì)有返回
selector.select();
//獲取事件
Set<SelectionKey> selectionKeys = selector.selectedKeys();
//遍歷查詢結(jié)果
Iterator<SelectionKey> iter = selectionKeys.iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
//關(guān)注Read和Accept兩個(gè)事件
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.attachment();
//將拿到的客戶端連接通道,注冊(cè)到selector上面
SocketChannel clientSocketChannel = server.accept();
clientSocketChannel.configureBlocking(false);
//接收到客戶端連接之后,注冊(cè)讀取事件,這樣下面的key.isReadable()才能進(jìn)入
clientSocketChannel.register(selector, SelectionKey.OP_READ, clientSocketChannel);
System.out.println("收到新連接:" + clientSocketChannel.getRemoteAddress());
}
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.attachment();
ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
while (socketChannel.isOpen() && socketChannel.read(requestBuffer) != -1) {
//長(zhǎng)連接情況下,需要手動(dòng)改判斷數(shù)據(jù)有沒(méi)有讀取結(jié)束(此處做一個(gè)簡(jiǎn)單的判斷,超過(guò)0字節(jié)就認(rèn)為請(qǐng)求結(jié)束了)
if (requestBuffer.position() > 0) {
break;
}
}
if (requestBuffer.position() == 0) continue;//如果沒(méi)有數(shù)據(jù)了,則不繼續(xù)后面的處理
requestBuffer.flip();//轉(zhuǎn)為讀模式
byte[] content = new byte[requestBuffer.limit()];
requestBuffer.get(content);
System.out.println(new String(content));
System.out.println("收到數(shù)據(jù),來(lái)自:" + socketChannel.getRemoteAddress());
String response = "HTTP/1.1 200OK\r\n" +
"Content-Length: 11\r\n\r\n" +
"Hello World";
ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
while (buffer.hasRemaining()) {
socketChannel.write(buffer);//非阻塞
}
}
}
selector.selectNow();
}
}
}
實(shí)現(xiàn)一個(gè)線程處理多個(gè)通道的核心概念理解:事件驅(qū)動(dòng)機(jī)制
非阻塞的網(wǎng)絡(luò)通道下,開(kāi)發(fā)者通過(guò)Selector注冊(cè)對(duì)于通道感興趣的事件類型,線程通過(guò)監(jiān)聽(tīng)事件來(lái)觸發(fā)響應(yīng)的代碼執(zhí)行。(更底層是操作系統(tǒng)的多路復(fù)用機(jī)制)

BIO和NIO對(duì)比

NIO與多線程結(jié)合的改進(jìn)方案
當(dāng)并發(fā)高的時(shí)候,單線程處理連接很容易遇到性能瓶頸,則此時(shí)可以NIO和多線程結(jié)合使用

