java 網(wǎng)絡(luò)通信 NlO(non-blocking i/o 或者 new i/o) channel

  • channel 是雙向的,同時(shí)讀取和寫入,流是單向的

  • channel 和多路復(fù)用器結(jié)合之后,有多種狀態(tài)位,方便多路復(fù)用器(輪詢)去識(shí)別(連接狀態(tài),阻塞狀態(tài),可讀狀態(tài),可寫狀態(tài))

  • channel 分為倆大類,網(wǎng)絡(luò)讀寫的SelectableChannel,文件操作的FileChannel

  • 網(wǎng)絡(luò)讀寫的SocketChannel和ServerSocketChannel是SelectableChannel的子類

  • SocketChannel和ServerSocketChannel 依賴于多路復(fù)用器(Selector),Selector是NIO編程的基礎(chǔ),提供選擇已經(jīng)就緒的任務(wù)的能力。簡(jiǎn)單來說,Selector會(huì)不斷地輪詢注冊(cè)在其上的Channel,如果某個(gè)Channel上面發(fā)生讀或者寫事件,這個(gè)Channel就處于就緒狀態(tài),會(huì)被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進(jìn)行后續(xù)的I/O操作

  • 一個(gè)多路復(fù)用器可以負(fù)責(zé)成千上萬個(gè)Channel通道,沒有上限,這也是JDK使用了epoll代替了傳統(tǒng)的select實(shí)現(xiàn),獲得連接句柄沒有限制。這樣意味著我們只要一個(gè)線程負(fù)責(zé)Selector的輪詢,就可以接入成千上萬個(gè)客戶端,這是JDK NIO 的巨大進(jìn)步。

  • Selector線程就類似一個(gè)管理者M(jìn)aster,管理成千上萬個(gè)Channel,然后輪詢哪個(gè)管道的數(shù)據(jù)已經(jīng)準(zhǔn)備好,通知CPU執(zhí)行IO的讀取或?qū)懭氩僮鳌?/p>

  • Selector模式:當(dāng)IO事件(管道)注冊(cè)到選擇器后,Selector會(huì)分配給每個(gè)管道一個(gè)key值,相當(dāng)于標(biāo)簽。Selector選擇器是以輪詢的方式進(jìn)行查找注冊(cè)所有的IO事件。當(dāng)我們的IO事件(管道)準(zhǔn)備就緒后,select就會(huì)識(shí)別,會(huì)通過key值找到相應(yīng)的管道,進(jìn)行相關(guān)的數(shù)據(jù)處理操作(從管道里讀或?qū)憯?shù)據(jù),寫到我們的數(shù)據(jù)緩沖區(qū)Buffer去)

  • 每個(gè)管道都會(huì)對(duì)選擇器進(jìn)行注冊(cè)到不同的事件狀態(tài),以便選擇器查找:SelectionKey.OP_CONNECT SelectionKey.OP_ACCEPT SelectionKey.OP_READ SelectionKey.OP_WRITE

  • NIO、AIO學(xué)習(xí)歷程

  • NIO入門

  • io與nio比較:

  1. io當(dāng)客戶端多時(shí),會(huì)創(chuàng)建大量的處理線程。且每個(gè)線程都要占用??臻g和一些CPU時(shí)間
  2. io阻塞可能帶來頻繁的上下文切換,且大部分上下文切換可能是無意義的。
  3. NIO的本質(zhì)是原始的tcp建立連接使用3次握手的操作,減少連接的開銷
  4. IO是面向流(Stream)的,NIO是面向塊(buffer)的
  5. Java NIO的selectors允許一條線程去監(jiān)控多個(gè)channels的輸入,你可以向一個(gè)selector上注冊(cè)多個(gè)channel,然后調(diào)用selector的select()方法判斷是否有新的連接進(jìn)來或者已經(jīng)在selector上注冊(cè)時(shí)channel是否有數(shù)據(jù)進(jìn)入。selector的機(jī)制讓一個(gè)線程管理多個(gè)channel變得簡(jiǎn)單。(不再使用多線程處理連接)
  6. NIO允許你用一個(gè)單獨(dú)的線程或幾個(gè)線程管理很多個(gè)channels(網(wǎng)絡(luò)的或者文件的),代價(jià)是程序的處理和處理IO相比更加復(fù)雜
ByteBuffer buffer = ByteBuffer.allocate(48);  

int bytesRead = inChannel.read(buffer);

ByteBuffer buffer = ByteBuffer.allocate(48);  
  
int bytesRead = inChannel.read(buffer);  
  
while(! bufferFull(bytesRead) ) {  
    bytesRead = inChannel.read(buffer);  
}  

注意第二行從channel中讀取數(shù)據(jù)到ByteBuffer,當(dāng)這個(gè)方法返回你不知道是否你需要的所有數(shù)據(jù)都被讀到buffer了,你所知道的一切就是有一些數(shù)據(jù)被讀到了buffer中,但是你并不知道具體有多少數(shù)據(jù),這使程序的處理變得稍微有些困難
想象一下,調(diào)用了read(buffer)方法后,只有半行數(shù)據(jù)被讀進(jìn)了buffer,例如:“Name: An”,你能現(xiàn)在就處理數(shù)據(jù)嗎?當(dāng)然不能。你需要等待直到至少一整行數(shù)據(jù)被讀到buffer中,在這之前確保程序不要處理buffer中的數(shù)據(jù)
你如何知道buffer中是否有足夠的數(shù)據(jù)可以被處理呢?你不知道,唯一的方法就是檢查buffer中的數(shù)據(jù)。可能你會(huì)進(jìn)行幾次無效的檢查(檢查了幾次數(shù)據(jù)都不夠進(jìn)行處理),這會(huì)令程序設(shè)計(jì)變得比較混亂復(fù)雜
bufferFull方法負(fù)責(zé)檢查有多少數(shù)據(jù)被讀到了buffer中,根據(jù)返回值是true還是false來判斷數(shù)據(jù)是否夠進(jìn)行處理。bufferFull方法掃描buffer但不能改變buffer的內(nèi)部狀態(tài)

  1. NIO允許你用一個(gè)單獨(dú)的線程或幾個(gè)線程管理很多個(gè)channels(網(wǎng)絡(luò)的或者文件的),代價(jià)是程序的處理和處理IO相比更加復(fù)雜,如果你需要同時(shí)管理成千上萬的連接,但是每個(gè)連接只發(fā)送少量數(shù)據(jù),例如一個(gè)聊天服務(wù)器,用NIO實(shí)現(xiàn)會(huì)更好一些,相似的,如果你需要保持很多個(gè)到其他電腦的連接,例如P2P網(wǎng)絡(luò),用一個(gè)單獨(dú)的線程來管理所有出口連接是比較合適的,如果你只有少量的連接但是每個(gè)連接都占有很高的帶寬,同時(shí)發(fā)送很多數(shù)據(jù),傳統(tǒng)的IO會(huì)更適合
  • NIO實(shí)現(xiàn)步驟
    • 打開多路復(fù)用器
    • 打開服務(wù)器端通道
    • 設(shè)置服務(wù)器通道為非阻塞模式
    • 服務(wù)端通道綁定端口和地址
    • 把服務(wù)器通道注冊(cè)到多路復(fù)用器上,并且監(jiān)聽阻塞事件
    • 多路復(fù)用器輪詢監(jiān)聽
    • 返回多路復(fù)用器已經(jīng)選擇的SelectionKey結(jié)果集
    • 結(jié)果集進(jìn)行遍歷,遍歷時(shí)移除SelectionKey元素,防止重復(fù)處理
    • 對(duì)selectionkey進(jìn)行isValid()有效性校驗(yàn)
    • 如果selectionkey是連接狀態(tài)的,用socketchannel.finishConnect();
    • 如果selectionkey是阻塞狀態(tài)的,用socketchannel.register(seletor, SelectionKey.OP_READ)可讀狀態(tài)
    • 如果selectionkey是可讀狀態(tài)的,進(jìn)行讀取
    • 如果selectionkey是可寫狀態(tài)的,OP_WRITE比較特殊,表示本地的寫緩沖區(qū)可用,一般只有在一次寫沒有把數(shù)據(jù)寫完的情況下需要注冊(cè)O(shè)P_WRITE,寫完后要及時(shí)關(guān)閉,否則每次循環(huán)都有可能被調(diào)用,因?yàn)閷懢彌_區(qū)在大多數(shù)情況下是始終可用的。
  • 雙向通信示例
    服務(wù)器端
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;

public class Server implements Runnable{
    //1 多路復(fù)用器(管理所有的通道)
    private Selector seletor;
    //2 建立緩沖區(qū)
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);
    //3 
    private ByteBuffer writeBuf = ByteBuffer.allocate(1024);
    public Server(int port){
        try {
            //1 打開路復(fù)用器
            this.seletor = Selector.open();
            //2 打開服務(wù)器通道
            ServerSocketChannel ssc = ServerSocketChannel.open();
            //3 設(shè)置服務(wù)器通道為非阻塞模式
            ssc.configureBlocking(false);
            //4 綁定地址
            ssc.bind(new InetSocketAddress(port));
            //5 把服務(wù)器通道注冊(cè)到多路復(fù)用器上,并且監(jiān)聽阻塞事件
            ssc.register(this.seletor, SelectionKey.OP_ACCEPT);
            
            System.out.println("Server start, port :" + port);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        while(true){
            try {
                //1 必須要讓多路復(fù)用器開始監(jiān)聽
                this.seletor.select();
                //2 返回多路復(fù)用器已經(jīng)選擇的結(jié)果集
                Iterator<SelectionKey> keys = this.seletor.selectedKeys().iterator();
                //3 進(jìn)行遍歷
                while(keys.hasNext()){
                    //4 獲取一個(gè)選擇的元素
                    SelectionKey key = keys.next();
                    //5 直接從容器中移除就可以了
                    keys.remove();
                    //6 如果是有效的
                    if(key.isValid()){
                        //7 如果為阻塞狀態(tài)
                        if(key.isAcceptable()){
                            this.accept(key);
                        }
                        //8 如果為可讀狀態(tài)
                        if(key.isReadable()){
                            this.read(key);
                        }
                        //9 寫數(shù)據(jù)
                        if(key.isWritable()){
                        }
                    }
                    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    


    private void read(SelectionKey key) {
        try {
            //1 清空緩沖區(qū)舊的數(shù)據(jù)
            this.readBuf.clear();
            //2 獲取之前注冊(cè)的socket通道對(duì)象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 讀取數(shù)據(jù)
            int count = sc.read(this.readBuf);
            //4 如果沒有數(shù)據(jù)
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有數(shù)據(jù)則進(jìn)行讀取 讀取之前需要進(jìn)行復(fù)位方法(把position 和limit進(jìn)行復(fù)位)
            this.readBuf.flip();
            //6 根據(jù)緩沖區(qū)的數(shù)據(jù)長(zhǎng)度創(chuàng)建相應(yīng)大小的byte數(shù)組,接收緩沖區(qū)的數(shù)據(jù)
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收緩沖區(qū)數(shù)據(jù)
            this.readBuf.get(bytes);
            //8 打印結(jié)果
            String body = new String(bytes).trim();
            System.out.println("收到客戶端 : " + body);
            
            // 9..可以寫回給客戶端數(shù)據(jù)
            readBuf.flip();
            sc.write(readBuf);
            sc.register(this.seletor, SelectionKey.OP_READ);
            
        } catch (IOException e) {
            e.printStackTrace();
        }
        
    }

    private void accept(SelectionKey key) {
        try {
            //1 獲取服務(wù)通道
            ServerSocketChannel ssc =  (ServerSocketChannel) key.channel();
            //2 執(zhí)行阻塞方法
            SocketChannel sc = ssc.accept();
            //3 設(shè)置阻塞模式
            sc.configureBlocking(false);
            //4 注冊(cè)到多路復(fù)用器上,并設(shè)置讀取標(biāo)識(shí)
            sc.register(this.seletor, SelectionKey.OP_READ);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        
        new Thread(new Server(8765)).start();;
    }
    
    
}

客戶端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class Client {

    //需要一個(gè)Selector
    private Selector selector;
    private ByteBuffer readBuf = ByteBuffer.allocate(1024);

    public Client(){
        try {
            // 獲得一個(gè)Socket通道
            SocketChannel channel = SocketChannel.open();
            // 設(shè)置通道為非阻塞
            channel.configureBlocking(false);
            // 獲得一個(gè)通道管理器
            this.selector = Selector.open();

            // 客戶端連接服務(wù)器,其實(shí)方法執(zhí)行并沒有實(shí)現(xiàn)連接,需要在listen()方法中調(diào)
            //用channel.finishConnect();才能完成連接
            channel.connect(new InetSocketAddress("127.0.0.1", 8765));
            //將通道管理器和該通道綁定,并為該通道注冊(cè)SelectionKey.OP_CONNECT事件。
            channel.register(selector, SelectionKey.OP_CONNECT);
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public void listen() throws IOException {
        // 輪詢?cè)L問selector
        while (true) {
            selector.select();
            // 獲得selector中選中的項(xiàng)的迭代器
            Iterator ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 刪除已選的key,以防重復(fù)處理
                ite.remove();
                // 連接事件發(fā)生
                if (key.isConnectable()) {
                    SocketChannel channel = (SocketChannel) key
                            .channel();
                    // 如果正在連接,則完成連接
                    if(channel.isConnectionPending()){
                        channel.finishConnect();

                    }
                    // 設(shè)置成非阻塞
                    channel.configureBlocking(false);

                    //在這里可以給服務(wù)端發(fā)送信息哦
                    channel.write(ByteBuffer.wrap(new String("向服務(wù)端發(fā)送了一條信息").getBytes()));
                    //在和服務(wù)端連接成功之后,為了可以接收到服務(wù)端的信息,需要給通道設(shè)置讀的權(quán)限。
                    channel.register(this.selector, SelectionKey.OP_READ);

                    // 獲得了可讀的事件
                } else if (key.isReadable()) {
                    read(key);
                }

            }

        }
    }
    private void read(SelectionKey key) {
        try {
            //1 清空緩沖區(qū)舊的數(shù)據(jù)
            this.readBuf.clear();
            //2 獲取之前注冊(cè)的socket通道對(duì)象
            SocketChannel sc = (SocketChannel) key.channel();
            //3 讀取數(shù)據(jù)
            int count = sc.read(this.readBuf);
            //4 如果沒有數(shù)據(jù)
            if(count == -1){
                key.channel().close();
                key.cancel();
                return;
            }
            //5 有數(shù)據(jù)則進(jìn)行讀取 讀取之前需要進(jìn)行復(fù)位方法(把position 和limit進(jìn)行復(fù)位)
            this.readBuf.flip();
            //6 根據(jù)緩沖區(qū)的數(shù)據(jù)長(zhǎng)度創(chuàng)建相應(yīng)大小的byte數(shù)組,接收緩沖區(qū)的數(shù)據(jù)
            byte[] bytes = new byte[this.readBuf.remaining()];
            //7 接收緩沖區(qū)數(shù)據(jù)
            this.readBuf.get(bytes);
            //8 打印結(jié)果
            String body = new String(bytes).trim();
            System.out.println("收到Server : " + body);

            // 9..可以寫回給客戶端數(shù)據(jù)
//          readBuf.flip();
//          sc.write(readBuf);

        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    public static void main(String[] args) {


        try {
            new Client().listen();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
    
}

運(yùn)行之后并不停止,服務(wù)器和客戶端還在輪詢

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容