Java多線程N(yùn)IO學(xué)習(xí)

原文:https://juejin.im/post/5cb2f8d7e51d456e500f7cf3

阻塞IO 如果數(shù)據(jù)沒(méi)有準(zhǔn)備就緒,就一直等待,直到數(shù)據(jù)準(zhǔn)備就緒;整個(gè)進(jìn)程會(huì)被阻塞。

非阻塞IO 需不斷詢問(wèn)內(nèi)核是否已經(jīng)準(zhǔn)備好數(shù)據(jù),非阻塞雖然不用等待但是一直占用CPU。

多路復(fù)用IO NIO 多路復(fù)用IO,會(huì)有一個(gè)線程不斷地去輪詢多個(gè)socket的狀態(tài),當(dāng)socket有讀寫(xiě)事件的時(shí)候才會(huì)調(diào)用IO讀寫(xiě)操作。 用一個(gè)線程管理多個(gè)socket,是通過(guò)selector.select()查詢每個(gè)通道是否有事件到達(dá),如果沒(méi)有事件到達(dá),則會(huì)一直阻塞在那里,因此也會(huì)帶來(lái)線程阻塞問(wèn)題。

信號(hào)驅(qū)動(dòng)IO模型 在信號(hào)驅(qū)動(dòng)IO模型中,當(dāng)用戶發(fā)起一個(gè)IO請(qǐng)求操作時(shí),會(huì)給對(duì)應(yīng)的socket注冊(cè)一個(gè)信號(hào)函數(shù),線程會(huì)繼續(xù)執(zhí)行,當(dāng)數(shù)據(jù)準(zhǔn)備就緒的時(shí)候會(huì)給線程發(fā)送一個(gè)信號(hào),線程接受到信號(hào)時(shí),會(huì)在信號(hào)函數(shù)中進(jìn)行IO操作。 非阻塞IO、多路復(fù)用IO、信號(hào)驅(qū)動(dòng)IO都不會(huì)造成IO操作的第一步,查看數(shù)據(jù)是否準(zhǔn)備就緒而帶來(lái)的線程阻塞,但是在第二步,對(duì)數(shù)據(jù)進(jìn)行拷貝都會(huì)使線程阻塞。

異步IO jdk7AIO 異步IO是最理想的IO模型,當(dāng)線程發(fā)出一個(gè)IO請(qǐng)求操作時(shí),接著就去做自己的事情了,內(nèi)核去查看數(shù)據(jù)是否準(zhǔn)備就緒和準(zhǔn)備就緒后對(duì)數(shù)據(jù)的拷貝,拷貝完以后內(nèi)核會(huì)給線程發(fā)送一個(gè)通知說(shuō)整個(gè)IO操作已經(jīng)完成了,數(shù)據(jù)可以直接使用了。 同步的IO操作在第二個(gè)階段,對(duì)數(shù)據(jù)的拷貝階段,都會(huì)造成線程的阻塞,異步IO則不會(huì)。

異步IO在IO操作的兩個(gè)階段,都不會(huì)使線程阻塞。 Java 的 I/O 依賴于操作系統(tǒng)的實(shí)現(xiàn)。

Java NIO的工作原理

由一個(gè)專門(mén)的線程(Selector)來(lái)處理所有的IO事件,并負(fù)責(zé)分發(fā)。

事件驅(qū)動(dòng)機(jī)制:事件到的時(shí)候觸發(fā),而不是同步的去監(jiān)視事件。

線程通訊:線程之間通過(guò) wait,notify 等方式通訊。保證每次上下文切換都是有意義的。減少無(wú)謂的線程切換。

三大基本組件

Channel

FileChannel, 從文件中讀寫(xiě)數(shù)據(jù)。

DatagramChannel,通過(guò)UDP讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)。

SocketChannel,通過(guò)TCP讀寫(xiě)網(wǎng)絡(luò)中的數(shù)據(jù)。

ServerSocketChannel,可以監(jiān)聽(tīng)新進(jìn)來(lái)的TCP連接,對(duì)每一個(gè)新進(jìn)來(lái)的連接都會(huì)創(chuàng)建一個(gè)SocketChannel。

Java NIO 的通道類似流,但又有些不同:

既可以從通道中讀取數(shù)據(jù),又可以寫(xiě)數(shù)據(jù)到通道。但流的讀寫(xiě)通常是單向的。

通道可以異步地讀寫(xiě)。

通道中的數(shù)據(jù)總是要先讀到一個(gè) Buffer,或者總是要從一個(gè) Buffer 中寫(xiě)入。

Buffer

關(guān)鍵的Buffer實(shí)現(xiàn) ByteBuffer,CharBuffer,DoubleBuffer,F(xiàn)loatBuffer,IntBuffer,LongBuffer,ShortBuffer

Buffer兩種模式、三個(gè)屬性:

capacity

作為一個(gè)內(nèi)存塊,Buffer有一個(gè)固定的大小值,也叫“capacity”.你只能往里寫(xiě)capacity個(gè)byte、long,char等類型。一旦Buffer滿了,需要將其清空(通過(guò)讀數(shù)據(jù)或者清除數(shù)據(jù))才能繼續(xù)寫(xiě)數(shù)據(jù)往里寫(xiě)數(shù)據(jù)。

position

當(dāng)你寫(xiě)數(shù)據(jù)到Buffer中時(shí),position表示當(dāng)前的位置。初始的position值為0.當(dāng)一個(gè)byte、long等數(shù)據(jù)寫(xiě)到Buffer后, position會(huì)向前移動(dòng)到下一個(gè)可插入數(shù)據(jù)的Buffer單元。position最大可為capacity – 1. 當(dāng)讀取數(shù)據(jù)時(shí),也是從某個(gè)特定位置讀。當(dāng)將Buffer從寫(xiě)模式切換到讀模式,position會(huì)被重置為0. 當(dāng)從Buffer的position處讀取數(shù)據(jù)時(shí),position向前移動(dòng)到下一個(gè)可讀的位置。

limit

在寫(xiě)模式下,Buffer的limit表示你最多能往Buffer里寫(xiě)多少數(shù)據(jù)。 寫(xiě)模式下,limit等于Buffer的capacity。 當(dāng)切換Buffer到讀模式時(shí), limit表示你最多能讀到多少數(shù)據(jù)。因此,當(dāng)切換Buffer到讀模式時(shí),limit會(huì)被設(shè)置成寫(xiě)模式下的position值。換句話說(shuō),你能讀到之前寫(xiě)入的所有數(shù)據(jù)(limit被設(shè)置成已寫(xiě)數(shù)據(jù)的數(shù)量,這個(gè)值在寫(xiě)模式下就是position)

參考鏈接:Buffer原理?www.cnblogs.com/chenpi/p/64…

Selector

Selector(選擇器)是Java NIO中能夠檢測(cè)一到多個(gè)NIO通道,并能夠知曉通道是否為諸如讀寫(xiě)事件做好準(zhǔn)備的組件。這樣,一個(gè)單獨(dú)的線程可以管理多個(gè)channel,從而管理多個(gè)網(wǎng)絡(luò)連接。

監(jiān)聽(tīng)四種事件

SelectionKey.OP_CONNECT

SelectionKey.OP_ACCEPT

SelectionKey.OP_READ

SelectionKey.OP_WRITE

select()方法

select()阻塞到至少有一個(gè)通道在你注冊(cè)的事件上就緒了。 select(long timeout)和select()一樣,除了最長(zhǎng)會(huì)阻塞timeout毫秒(參數(shù))。

selectedKeys()方法

調(diào)用selector的selectedKeys()方法,訪問(wèn)“已選擇鍵集(selected key set)”中的就緒通道。

參考鏈接:操作系統(tǒng)層面分析Selector原理?zhhphappy.iteye.com/blog/203289…

NIO實(shí)現(xiàn)

服務(wù)端

publicclassNIOServerSocket {//存儲(chǔ)SelectionKey的隊(duì)列privatestaticList writeQueue =newArrayList();privatestaticSelector selector =null;//添加SelectionKey到隊(duì)列publicstaticvoidaddWriteQueue(SelectionKey key){synchronized(writeQueue) {? ? ? ? ? ? writeQueue.add(key);//喚醒主線程selector.wakeup();? ? ? ? }? ? }publicstaticvoidmain(String[] args)throwsIOException {// 1.創(chuàng)建ServerSocketChannelServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 2.綁定端口serverSocketChannel.bind(newInetSocketAddress(60000));// 3.設(shè)置為非阻塞serverSocketChannel.configureBlocking(false);// 4.創(chuàng)建通道選擇器selector = Selector.open();/*

? ? ? ? * 5.注冊(cè)事件類型

? ? ? ? *

? ? ? ? *? sel:通道選擇器

? ? ? ? *? ops:事件類型 ==>SelectionKey:包裝類,包含事件類型和通道本身。四個(gè)常量類型表示四種事件類型

? ? ? ? *? SelectionKey.OP_ACCEPT 獲取報(bào)文? ? ? SelectionKey.OP_CONNECT 連接

? ? ? ? *? SelectionKey.OP_READ 讀? ? ? ? ? SelectionKey.OP_WRITE 寫(xiě)

? ? ? ? */serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);while(true) {? ? ? ? ? ? System.out.println("服務(wù)器端:正在監(jiān)聽(tīng)60000端口");// 6.獲取可用I/O通道,獲得有多少可用的通道intnum = selector.select();if(num >0) {// 判斷是否存在可用的通道// 獲得所有的keysSet selectedKeys = selector.selectedKeys();// 使用iterator遍歷所有的keysIterator iterator = selectedKeys.iterator();// 迭代遍歷當(dāng)前I/O通道while(iterator.hasNext()) {// 獲得當(dāng)前keySelectionKey key = iterator.next();// 調(diào)用iterator的remove()方法,并不是移除當(dāng)前I/O通道,標(biāo)識(shí)當(dāng)前I/O通道已經(jīng)處理。iterator.remove();// 判斷事件類型,做對(duì)應(yīng)的處理if(key.isAcceptable()) {? ? ? ? ? ? ? ? ? ? ? ? ServerSocketChannel ssChannel = (ServerSocketChannel) key.channel();? ? ? ? ? ? ? ? ? ? ? ? SocketChannel socketChannel = ssChannel.accept();? ? ? ? ? ? ? ? ? ? ? ? System.out.println("處理請(qǐng)求:"+ socketChannel.getRemoteAddress());// 獲取客戶端的數(shù)據(jù)// 設(shè)置非阻塞狀態(tài)socketChannel.configureBlocking(false);// 注冊(cè)到selector(通道選擇器)socketChannel.register(selector, SelectionKey.OP_READ);? ? ? ? ? ? ? ? ? ? }elseif(key.isReadable()) {? ? ? ? ? ? ? ? ? ? ? ? System.out.println("讀事件");//取消讀事件的監(jiān)控key.cancel();//調(diào)用讀操作工具類NIOHandler.read(key);? ? ? ? ? ? ? ? ? ? }elseif(key.isWritable()) {? ? ? ? ? ? ? ? ? ? ? ? System.out.println("寫(xiě)事件");//取消讀事件的監(jiān)控key.cancel();//調(diào)用寫(xiě)操作工具類NIOHandler.write(key);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }else{synchronized(writeQueue) {while(writeQueue.size() >0){? ? ? ? ? ? ? ? ? ? ? ? SelectionKey key = writeQueue.remove(0);//注冊(cè)寫(xiě)事件SocketChannel channel = (SocketChannel) key.channel();? ? ? ? ? ? ? ? ? ? ? ? Object attachment = key.attachment();? ? ? ? ? ? ? ? ? ? ? ? channel.register(selector, SelectionKey.OP_WRITE,attachment);? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? }? ? } }復(fù)制代碼

消息處理

publicclassNIOHandler{//構(gòu)造線程池privatestaticExecutorService executorService? = Executors.newFixedThreadPool(10);publicstaticvoidread(finalSelectionKey key){//獲得線程并執(zhí)行executorService.submit(newRunnable() {@Overridepublicvoidrun(){try{? ? ? ? ? ? ? ? ? ? SocketChannel readChannel = (SocketChannel) key.channel();// I/O讀數(shù)據(jù)操作ByteBuffer buffer = ByteBuffer.allocate(1024);? ? ? ? ? ? ? ? ? ? ByteArrayOutputStream baos =newByteArrayOutputStream();intlen =0;while(true) {? ? ? ? ? ? ? ? ? ? ? ? buffer.clear();? ? ? ? ? ? ? ? ? ? ? ? len = readChannel.read(buffer);if(len == -1)break;? ? ? ? ? ? ? ? ? ? ? ? buffer.flip();while(buffer.hasRemaining()) {? ? ? ? ? ? ? ? ? ? ? ? ? ? baos.write(buffer.get());? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? System.out.println("服務(wù)器端接收到的數(shù)據(jù):"+newString(baos.toByteArray()));//將數(shù)據(jù)添加到key中key.attach(baos);//將注冊(cè)寫(xiě)操作添加到隊(duì)列中NIOServerSocket.addWriteQueue(key);? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? });? ? }publicstaticvoidwrite(finalSelectionKey key){//拿到線程并執(zhí)行executorService.submit(newRunnable() {@Overridepublicvoidrun(){try{// 寫(xiě)操作SocketChannel writeChannel = (SocketChannel) key.channel();//拿到客戶端傳遞的數(shù)據(jù)ByteArrayOutputStream attachment = (ByteArrayOutputStream)key.attachment();? ? ? ? ? ? ? ? ? ? System.out.println("客戶端發(fā)送來(lái)的數(shù)據(jù):"+newString(attachment.toByteArray()));? ? ? ? ? ? ? ? ? ? ByteBuffer buffer = ByteBuffer.allocate(1024);? ? ? ? ? ? ? ? ? ? String message ="你好,我是服務(wù)器?。?;? ? ? ? ? ? ? ? ? ? buffer.put(message.getBytes());? ? ? ? ? ? ? ? ? ? buffer.flip();? ? ? ? ? ? ? ? ? ? writeChannel.write(buffer);? ? ? ? ? ? ? ? ? ? writeChannel.close();? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? }? ? ? ? ? ? }? ? ? ? });? ? }}復(fù)制代碼

客戶端

publicclassNIOClientSocket {publicstaticvoidmain(String[] args)throwsIOException {//使用線程模擬用戶 并發(fā)訪問(wèn)for(inti =0; i <1; i++) {newThread(){publicvoidrun() {try{//1.創(chuàng)建SocketChannelSocketChannel socketChannel=SocketChannel.open();//2.連接服務(wù)器socketChannel.connect(newInetSocketAddress("localhost",60000));//寫(xiě)數(shù)據(jù)String msg="我是客戶端"+Thread.currentThread().getId();? ? ? ? ? ? ? ? ? ? ? ? ByteBuffer buffer=ByteBuffer.allocate(1024);? ? ? ? ? ? ? ? ? ? ? ? buffer.put(msg.getBytes());? ? ? ? ? ? ? ? ? ? ? ? buffer.flip();? ? ? ? ? ? ? ? ? ? ? ? socketChannel.write(buffer);? ? ? ? ? ? ? ? ? ? ? ? socketChannel.shutdownOutput();//讀數(shù)據(jù)ByteArrayOutputStream bos =newByteArrayOutputStream();intlen =0;while(true) {? ? ? ? ? ? ? ? ? ? ? ? ? ? buffer.clear();? ? ? ? ? ? ? ? ? ? ? ? ? ? len = socketChannel.read(buffer);if(len == -1)break;? ? ? ? ? ? ? ? ? ? ? ? ? ? buffer.flip();while(buffer.hasRemaining()) {? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? bos.write(buffer.get());? ? ? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? ? ? ? ? System.out.println("客戶端收到:"+newString(bos.toByteArray()));? ? ? ? ? ? ? ? ? ? ? ? socketChannel.close();? ? ? ? ? ? ? ? ? ? }catch(IOException e) {? ? ? ? ? ? ? ? ? ? ? ? e.printStackTrace();? ? ? ? ? ? ? ? ? ? }? ? ? ? ? ? ? ? };? ? ? ? ? ? }.start();? ? ? ? }? ? }}復(fù)制代碼

多線程N(yùn)IO Tips

示例代碼僅供學(xué)習(xí)參考。對(duì)于一個(gè)已經(jīng)被監(jiān)聽(tīng)到的事件,處理前先取消事件(SelectionKey .cancel())監(jiān)控。否則selector.selectedKeys()會(huì)一直獲取到該事件,但該方法比較粗暴,并且后續(xù)register會(huì)產(chǎn)生多個(gè)SelectionKey。推薦使用selectionKey.interestOps()改變感興趣事件。

Selector.select()和Channel.register()需同步。

當(dāng)Channel設(shè)置為非阻塞(Channel.configureBlocking(false))時(shí),SocketChannel.read 沒(méi)讀到數(shù)據(jù)也會(huì)返回,返回參數(shù)等于0。

OP_WRITE事件,寫(xiě)緩沖區(qū)在絕大部分時(shí)候都是有空閑空間的,所以如果你注冊(cè)了寫(xiě)事件,這會(huì)使得寫(xiě)事件一直處于就就緒,選擇處理現(xiàn)場(chǎng)就會(huì)一直占用著CPU資源。參考下面的第二個(gè)鏈接。

粘包問(wèn)題。

參考鏈接:SocketChannel.read?blog.csdn.net/cao47820824…

參考鏈接:NIO坑?www.itdecent.cn/p/1af407c04…

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容