Java的BIO,NIO,AIO

1. 前言

有一些概念總是Java I/O一塊出現(xiàn),比如同步與異步,阻塞與非阻塞,這些概念往往也是非常難以區(qū)分。在介紹Java I/O之前,本文先通俗地介紹一下這兩組概念的區(qū)別:

  • 同步和異步: 同步和異步的區(qū)分點(diǎn)在消息的通知機(jī)制。
    如果是程序主動(dòng)獲取消息,為同步,程序被動(dòng)獲取消息,為異步。例如燒水,如果我們時(shí)不時(shí)去看看水是否燒開,則為同步。而如果水壺是會(huì)響笛的水壺,我們聽見響笛則認(rèn)為水燒開了,則為異步。而對(duì)于程序,如果是程序輪詢結(jié)果或者直接等待結(jié)果,為同步。如果程序調(diào)用了然后立刻返回,結(jié)果等待被調(diào)用方通知,或者回調(diào),則為異步。重點(diǎn)在獲取調(diào)用的結(jié)果的方式。
  • 阻塞和非阻塞: 區(qū)分點(diǎn)則在等待程序調(diào)用結(jié)果時(shí),程序所處的狀態(tài)。
    例如燒水,如果在燒水的過程中,我們一直等著,啥事都不干,則為阻塞。如果我們?cè)跓倪^程中,繼續(xù)干著別的事,則為非阻塞。重點(diǎn)在獲取程序調(diào)用結(jié)果的,程序所處于的狀態(tài)。

總的來說,對(duì)于一個(gè)程序中調(diào)用過程來說,獲取調(diào)用結(jié)果的方式,決定了程序是同步(主動(dòng))還是異步(被動(dòng))。而在獲取調(diào)用結(jié)果的過程中,程序所處的狀態(tài),決定了程序是阻塞(掛起)還是非阻塞(處理其他的事情)

2. Java I/O的發(fā)展歷程

Java I/O的發(fā)展一般來說主要是分為三個(gè)階段:

  • 第一個(gè)階段:在JDK 1.0到JDK 1.3中,Java的I/O類庫是非常簡單的,很多UNIX網(wǎng)絡(luò)編程中的概念或者接口在Java I/O類庫中都沒有體現(xiàn)。通常,我們這種類型的I/O為BIO,即Blocking I/O。
  • 第二個(gè)階段:在JDK 1.4中,java 新增加了java.nio包,正式引入了NIO(Non-blocking I/O),提供了異步開發(fā)I/O的API和類庫。Java NIO主要由Selector,ByteBuffer和Channel三個(gè)核心部分組成。
  • 第三個(gè)階段:JDK1.7正式發(fā)布,java對(duì)NIO進(jìn)行了升級(jí),被稱為NIO2.0,也稱為AIO,支持文件的異步I/O以及網(wǎng)絡(luò)的異步操作。
3. BIO、NIO以及AIO

網(wǎng)絡(luò)Socket編程的一般類型是Server/client類型的,即兩個(gè)進(jìn)程之間的通信。Server端通過綁定端口號(hào)建立Socket監(jiān)聽連接,而Client端通過指定Ip地址和端口號(hào)通過三次握手建立雙方的連接,如果連接成功,雙方就通過Socket進(jìn)行通信。
在第2部分,簡單的介紹了BIO,NIO以及AIO的概念,在本部分主要通過實(shí)例代碼來展示三個(gè)的關(guān)鍵點(diǎn)。

1.BIO

當(dāng)Server和Client端采用BIO形式,雙方通過輸入流和輸出流通過同步阻塞的方式進(jìn)行通信。
采用BIO通信方式的Server端,一般由一個(gè)單獨(dú)的Acceptor線程來監(jiān)聽客戶端的連接請(qǐng)求,Server接收到Client端的連接后,就為每一個(gè)client建立新的線程,進(jìn)行鏈路處理,通過輸入流發(fā)送響應(yīng)到客戶端,銷毀線程,整個(gè)socket通信流程結(jié)束。這是典型的一請(qǐng)求一應(yīng)答模式的。

以下示例的socket通信流程,模擬客戶端發(fā)送http請(qǐng)求給服務(wù)器端,如果請(qǐng)求的路徑為登錄地址,則返回已經(jīng)登錄,如果請(qǐng)求路徑不為登錄地址,則返回還未登錄。

Server端監(jiān)聽8080端口通過輪詢的方式不斷監(jiān)聽client的連接請(qǐng)求,等待client的連接(serverSocket.accept())。當(dāng)沒有client連接server的時(shí)候,線程阻塞在accept()處。當(dāng)Server接收到client的連接請(qǐng)求,新建一個(gè)線程進(jìn)行鏈路業(yè)務(wù)處理。

服務(wù)器端Acceptor線程(主線程)代碼如下:

public class InfoServer {
    public static void main(String args[]){
        try{
            ServerSocket serverSocket = new ServerSocket(8080);
            Socket socket = null;
            while(true){
                System.out.println("socket listening");
                socket = serverSocket.accept(); //線程阻塞在此處
                new Thread(new LoginCheckThread(socket)).start();
                System.out.println("socket accepted");
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

在鏈路處理線程中,代碼流程為:

  • 首先根據(jù)socket的輸入流產(chǎn)生字符流BufferReader對(duì)象;
  • 通過BufferReader對(duì)象的readline()方法,讀取client端傳過來的數(shù)據(jù)。readline()方法當(dāng)讀取到換行符'\n'或'\r'時(shí)才返回。
  • 最后,從socket的輸出流中產(chǎn)生了字符流PrintWriter對(duì)象,通過PrintWriter對(duì)象發(fā)送響應(yīng)內(nèi)容。

鏈路處理線程為:

public class LoginCheckThread implements Runnable {

    private Socket socket = null;
    public LoginCheckThread(Socket socket){
        this.socket = socket;
    }

    public void run(){
        try{
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);
            String content = bufferedReader.readLine();
            System.out.println(Thread.currentThread().getName()+"   "+content);
            //獲取訪問的域名,如果是登錄請(qǐng)求,則返回已經(jīng)登錄,否則提示沒有登錄
            if(content.split(" ")[1].equals("/a/login")){
                printWriter.println("you are not login in this system!");
            }else{
                printWriter.println("you have login in this system!");
            }
            socket.close();
        }catch(Exception e){
            e.printStackTrace();
        }

    }
}

客戶端的代碼:

客戶端通過指定IP地址和端口,嘗試連接server端口,連接上Server端后,通過PrinterWriter對(duì)象想Server端發(fā)送請(qǐng)求數(shù)據(jù)。發(fā)送完請(qǐng)求數(shù)據(jù)之后,client通過BufferReader的read(Char[])方法來讀取Server端的響應(yīng)數(shù)據(jù)。需要注意到是read(char[])會(huì)產(chǎn)生阻塞,read(char[])方法只有在以下三種情況下才會(huì)返回:

  • 讀取到足夠多的字節(jié);
  • 讀取輸入流的終止符;
  • 發(fā)生IO異常
public class Client {

    public static void main(String args[]){
        try{
            Socket socket = new Socket("127.0.0.1",8080);
            PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
            printWriter.println();
            printWriter.flush();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            char[] reponseChar = new char[1024];
            //read方法是阻塞方法,程序在此處會(huì)產(chǎn)生阻塞
            bufferedReader.read(reponseChar); 
            System.out.println("repose:"+new String(reponseChar));
            socket.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

2.NIO

NIO即非阻塞I/O(Non-blocking I/O),NIO類庫提供了 對(duì)應(yīng)的ServerSocketChannel和SocketChannel兩種不同套接字的通道實(shí)現(xiàn),ServerSocketChannel和Socketchannel分別于BIO中的ServerSocket和Socket對(duì)應(yīng)。這兩種新增的通道都支持阻塞和非阻塞模式,阻塞模式使用起來更加的簡單,但是性能和可靠性上都不好,非阻塞模式卻正好相反。

Java NIO由Channel、Buffer、Selector三個(gè)核心部分組成。Channel和Buffer與操作系統(tǒng)的IO方式更加接近,所以性能上會(huì)比傳統(tǒng)的AIO要好。

在NIO中,基本上所有的IO中都是從一個(gè)Channel開始。Channel譯為通道,與流不同的是,通道同時(shí)讀和寫。數(shù)據(jù)可以從Channel中讀到Buffer中,也可以從Buffer中寫到Channel中。Selector(選擇器)是能夠檢測(cè)一個(gè)到多個(gè)NIO通道,并能夠知曉通道是否為諸如讀寫事件做好準(zhǔn)備的組件。通過Selector,一個(gè)單獨(dú)的線程可以管理多個(gè)Channel,從而管理多個(gè)網(wǎng)絡(luò)連接。

如何使用NIO來進(jìn)行網(wǎng)絡(luò)編程:

  • 創(chuàng)建Selector:通過Selector.open()可以創(chuàng)建Selector對(duì)象;
  • 創(chuàng)建Channel:Channel分為ServerSocketChannel和SocketChannel。在Server端,通過ServerSocketChannel.open()可以創(chuàng)建Server端監(jiān)聽通道ServerSocketChannel,在Client端可以通過SocketChannel.open()可以打開連接通道SocketChannel對(duì)象;
  • 向Selector中注冊(cè)Channel及感興趣的事件(OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT);
    • ServerSocketChannel可以向Selector注冊(cè)O(shè)P_ACCEPT,而SocketChannel可以向Seletor注冊(cè)O(shè)P_READ,OP_WRTIE,OP_CONNECT;
  • 輪詢Selector,獲取就緒的Channel(通過Selector.select()及其他重載方法)
  • 針對(duì)特定的Channel進(jìn)行業(yè)務(wù)上的處理。

下面通過具體的代碼來說明如何進(jìn)行NIO編程

InfoServer類為Server端的啟動(dòng)類,通過新建線程的形式啟動(dòng)Server端的監(jiān)聽線程。

public class InfoServer {
    public static void main(String args[]){
        new Thread(new LoginCheckTask(8080)).start();
    }
}

LoginCheckTask類實(shí)現(xiàn)了網(wǎng)絡(luò)監(jiān)聽、網(wǎng)絡(luò)連接及請(qǐng)求處理的操作。在構(gòu)造函數(shù)對(duì)Server端進(jìn)行了網(wǎng)絡(luò)初始化,包括獲得Selector對(duì)象、ServersocketChannel對(duì)象、設(shè)置Socket參數(shù),最后還向Selector注冊(cè)了當(dāng)前ServerSocketChannel通道的OP_ACCERT事件。

在NIO中,通道Channel要么從緩沖器獲得數(shù)據(jù),要么向緩沖器發(fā)送數(shù)據(jù)。唯一直接與通道交互的緩沖器是ByteBuffer。

當(dāng)server端通道中監(jiān)聽到client端的連接后,建立與Client端連接的SocketChannel,并向該Socketchannel中注冊(cè)O(shè)P_READ事件。
當(dāng)多路復(fù)用器Selector檢測(cè)到OP_READ事件就緒后,就從該SocketChannel的的緩沖器ByteBuffer中讀取請(qǐng)求內(nèi)容。當(dāng)Server端有數(shù)據(jù)需要向client端發(fā)送響應(yīng)時(shí),首先需要將響應(yīng)的字節(jié)數(shù)據(jù)寫入到ByteBuffer中,然后通過SocketChannel的write方法向client端發(fā)送響應(yīng)的。

public class LoginCheckTask implements Runnable{
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;
    private volatile  boolean stop;

    public LoginCheckTask(int port){
        try{
            //獲取Selector對(duì)象
            selector = Selector.open();
            //獲取ServerSocketChannel對(duì)象
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            //設(shè)置監(jiān)聽參數(shù)
            serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
            //向selector注冊(cè)O(shè)P_ACCEEPT事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println(" server listening");
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void stop(){
        this.stop = true;
    }

    public void run(){
        while (!stop){
            try{
                //獲取通道就緒的網(wǎng)絡(luò)事件,該方法會(huì)阻塞1s
                selector.select(1000);
                Set<SelectionKey> selectionKeySet = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeySet.iterator();
                SelectionKey key = null;
                //分別處理就緒的網(wǎng)絡(luò)事件
                while(it.hasNext()){
                    key = it.next();
                    it.remove();
                    try{
                        if(key.isValid()){
                            //處理OP_ACCEPT事件
                            if(key.isAcceptable()){
                                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                                SocketChannel socketChannel = serverSocketChannel.accept();
                                socketChannel.configureBlocking(false);
                               //將與client端建立的socketChannel,向Selector注冊(cè)O(shè)P_READ事件;
                               socketChannel.register(selector,SelectionKey.OP_READ);
                                System.out.println("server accepted");
                            }
                            //處理OP_READ事件
                            if(key.isReadable()){
                                System.out.println("client is readable");
                                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                                SocketChannel sc = (SocketChannel) key.channel();
                                int readBytes = sc.read(readBuffer);
                                if(readBytes > 0){
                                    readBuffer.flip();
                                    byte[] bytes = new byte[readBytes];
                                    readBuffer.get(bytes);
                                    String body = new String(bytes,"UTF-8");
                                    String response;
                                    //獲取訪問的域名,如果是登錄請(qǐng)求,則返回已經(jīng)登錄,否則提示沒有登錄
                                    if(body.split(" ")[1].equals("/a/login")){
                                        response = "you are not login in this system!/n";
                                    }else{
                                        response = "you have login in this system!/n";
                                    }
                                    byte[] reponseBytes = response.getBytes();
                                    ByteBuffer reposeByteBuffer = ByteBuffer.allocate(1024);
                                    reposeByteBuffer.put(reponseBytes);
                                    reposeByteBuffer.flip();
                                    Thread.currentThread().sleep(1000);
                                    sc.write(reposeByteBuffer);
                                }
                            }

                        }
                    }catch(Exception e){
                        e.printStackTrace();
                        key.cancel();
                        if(key.channel()!= null){
                            key.channel().close();
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
        if(selector != null){
            try {
                selector.close();
            }catch(Exception e){
                e.printStackTrace();;
            }
        }

    }
}

在NIO的Client端,程序的基本執(zhí)行流程為:

  • 初始化時(shí)需要產(chǎn)生Selector對(duì)象和SocketChannel,配置SocketChannel的阻塞模式為非阻塞模式。
  • client嘗試與Server建立的Socket連接,如果直接連接成功,則注冊(cè)O(shè)P_READ事件并發(fā)送請(qǐng)求數(shù)據(jù),否則注冊(cè)O(shè)P_CONNECT事件。
  • 與Server端類似,client也是通過循環(huán)不斷探測(cè)多路復(fù)用器Selector的就緒事件
    • 如果事件是連接成功事件,則注冊(cè)O(shè)P_READ事件,并向Server服務(wù)端發(fā)送請(qǐng)求數(shù)據(jù)
    • 如果事件為可讀時(shí)間,則通過緩沖器ByteBuffer從SocketChannel中讀取Server端的響應(yīng)數(shù)據(jù)。
public class Client {

    private Selector selector;
    private SocketChannel socketChannel;

    public static void main(String args[]){
        new Client().connect();
    }
    public void connect(){
        try{
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        }catch(Exception e){
            e.printStackTrace();
        }
        try {
            //非阻塞模式,如果直接連接成功,則注冊(cè)讀,否則注冊(cè)連接事件
            if(socketChannel.connect(new InetSocketAddress("127.0.0.1",8080))){
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWrite(socketChannel);
            }else{
                socketChannel.register(selector,SelectionKey.OP_CONNECT);
            }
        }catch(Exception e){
            e.printStackTrace();
        }
        boolean loop= true;
        while(loop){
            try{
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while(iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if(key.isValid()){
                        if(key.isConnectable()){
                            if(socketChannel.finishConnect()){
                                socketChannel.register(selector, SelectionKey.OP_READ);
                                doWrite(socketChannel);
                            }else{
                                System.exit(1);
                            }
                        }
                        //讀取服務(wù)器端返回?cái)?shù)據(jù)
                        if(key.isReadable()){
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            int readBytes = socketChannel.read(byteBuffer);
                            if(readBytes > 0){
                                byteBuffer.flip();
                                byte[] responseByte = new byte[byteBuffer.remaining()];
                                byteBuffer.get(responseByte);
                                System.out.println(new String(responseByte,"UTF-8"));
                                loop= false;
                            }
                        }
                    }
                }
            }catch(Exception e){
                e.printStackTrace();
            }
        }
    }

    /**
     * 如果連接成功,則向服務(wù)器發(fā)送數(shù)據(jù)
     * @param sc
     */
    public void doWrite(SocketChannel sc)throws Exception{
        String request = "GET /a/index HTTP/1.1 ";
        byte[] requestBytes = request.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        byteBuffer.put(requestBytes);
        byteBuffer.flip();
        sc.write(byteBuffer);
    }
}

在整個(gè)NIOSocket通信流程中,只有在Selector.select(1000)處阻塞1s,其他的讀寫操作由于都是通過緩沖器來操作Channel,所以均為非阻塞操作。

3.AIO

NIO 2.0(即AIO)引入了新的異步通道的概念,并提供了異步文件通道和異步套接字通道的實(shí)現(xiàn)。異步通道提供以下兩種方式獲取操作結(jié)果:

  • 通過java.util.concurrent.Future類表示異步操作的結(jié)果;
  • 在執(zhí)行異步的時(shí)候傳入一個(gè)java.nio.channels;

CompletionHandler的實(shí)現(xiàn)類作為操作完成的回調(diào)。

NIO2.0的異步套接字通道是真正的異步非阻塞I/O,對(duì)應(yīng)于unix網(wǎng)絡(luò)編程的事件驅(qū)動(dòng)I/O。它不需要通過多路復(fù)用器Selector對(duì)注冊(cè)的通道進(jìn)行輪詢,即可實(shí)現(xiàn)異步讀寫,從而簡化了NIO的編程模型。

Server端代碼:

public class InfoServer {

    public static void main(String args[]){
        new Thread(new LoginCheckHandler(8080)).start();
    }
}

和之前類似,在構(gòu)造函數(shù)中完成Server端Socket的初始化,主要完成獲取AsynchronousServerSocketChannel對(duì)象,監(jiān)聽端口設(shè)置。
在run方法中,我們通過AsynchronousServerSocketChannel的異步方法accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)來接收client端的連接,并指定了連接完成后的回調(diào)函數(shù)AcceptCompletionHandler對(duì)象(AcceptCompletionHandler是ComplettionHandler的實(shí)現(xiàn)類)

public class LoginCheckHandler implements Runnable {
    private int port;
    CountDownLatch latch;
    AsynchronousServerSocketChannel asynchronousServerSocketChannel;

    public LoginCheckHandler(int port){
        try{
            //打開異步通道
            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
            //監(jiān)聽8080端口
            asynchronousServerSocketChannel.bind(new InetSocketAddress(8080));
            System.out.println("服務(wù)器正在監(jiān)聽8080端口中");
        }catch(Exception e){
            e.printStackTrace();
        }

    }
    public void run(){
        //latch的作用是在完成一組正在執(zhí)行的操作前之前,允許當(dāng)前的線程一直阻塞。在這里我們是為了防止服務(wù)器執(zhí)行完成退出。
        latch = new CountDownLatch(1);
        //異步ServerSocketchannel
        asynchronousServerSocketChannel.accept(this,new AcceptCompletionHandler());
        try{
            latch.await();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

AcceptCompletionHandler是Server端與Client端完成Socket通信過程建立后的回調(diào)函數(shù),當(dāng)通信過程成功建立,則調(diào)用completed()方法,否則調(diào)用failed()方法。


需要特別注意,在completed方法中,還需要再次調(diào)用AsyncrhonousSocketChannel的accept方法,因?yàn)橐粋€(gè)Server端可以接收多個(gè)client端的連接,所以需要繼續(xù)調(diào)用accept方法繼續(xù)接收其他client的連接,最終形成一個(gè)循環(huán)。每當(dāng)一個(gè)client端連接進(jìn)來后,再異步接收新的連接。
在completed方法中,通過asynchronousSocketChannel的read方法來異步讀取客戶端的請(qǐng)求內(nèi)容,讀操作也是異步的。ReadCompletedHandler類是讀操作完成后的回調(diào)函數(shù)。

public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,LoginCheckHandler> {
    @Override
    public void completed(AsynchronousSocketChannel asynchronousSocketChannel, LoginCheckHandler attachment){
        System.out.println("服務(wù)器接收到連接");
        attachment.asynchronousServerSocketChannel.accept(attachment,this);
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        asynchronousSocketChannel.read(buffer,buffer,new ReadCompletedHandler(asynchronousSocketChannel));
    }
    @Override
    public void failed(Throwable t,LoginCheckHandler attachment){
        t.printStackTrace();
        attachment.latch.countDown();
    }
}

ReadCompletedHandler也是CompletionHandler的實(shí)現(xiàn)類,如果是讀操作的回調(diào)函數(shù),java類庫已經(jīng)明確規(guī)定了,其泛型的參數(shù)類型為

CompletionHandler<Integer,ByteBuffer>

其中Integer主要是為了記錄讀取client端的請(qǐng)求數(shù)據(jù)的大小。
在completed方法內(nèi)部,通過讀取緩沖區(qū)ByteBuffer獲取請(qǐng)求數(shù)據(jù),并且完成業(yè)務(wù)處理,返回響應(yīng)內(nèi)容。從doWrite方法內(nèi)部,可以看到寫操作也是異步的。通過匿名內(nèi)部類的方式來指定了寫操作完成后的回調(diào)函數(shù)。

public class ReadCompletedHandler implements CompletionHandler<Integer,ByteBuffer>{
    private AsynchronousSocketChannel asynchronousSocketChannel;
    public ReadCompletedHandler(AsynchronousSocketChannel channel){
        if( this.asynchronousSocketChannel == null){
            this.asynchronousSocketChannel = channel;
        }
    }
    public void completed(Integer result,ByteBuffer attachment){
        attachment.flip();
        byte[] reponse = new byte[attachment.remaining()];
        attachment.get(reponse);
        try{
            String req = new String(reponse,"UTF-8");
            System.out.println("the server received: "+req);
            //獲取訪問的域名,如果是登錄請(qǐng)求,則返回已經(jīng)登錄,否則提示沒有登錄
            if(req.split(" ")[1].equals("/a/login")){
                doWrite("you are not login in this system!");
            }else{
                doWrite("you have login in this system!");
            }
        }catch(Exception e){
            e.printStackTrace();
        }

    }
    public void doWrite(String response){
        try{
            Thread.currentThread().sleep(5000);
            System.out.println("讀取到數(shù)據(jù),等待5s");
        }catch(Exception e){
            e.printStackTrace();
        }
        byte[] bytes = response.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer attachment) {
                //只要有剩余的內(nèi)容沒有發(fā)送完就繼續(xù)發(fā)送數(shù)據(jù)
                if(attachment.hasRemaining()){
                    asynchronousSocketChannel.write(attachment,attachment,this);
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try{
                    asynchronousSocketChannel.close();
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        });
    }
    public void failed(Throwable t,ByteBuffer attachment){
        try{
            this.asynchronousSocketChannel.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

AIO client端代碼:

public class Client {
    public static void main(String args[]){
        new Thread(new ClientHandler("localhost",8080)).start();
    }
}

對(duì)于Client端的代碼,特別要注意AsynchronousSocketChannel的connect方法已經(jīng)指定了連接成功后的回調(diào)函數(shù)的泛型為

CompletionHandler<Void,ClientHandler>

在ClientHandler內(nèi)部,完成了連接成功后的回調(diào)函數(shù)(ClientHandler),寫操作完成后的回調(diào)函數(shù)(匿名內(nèi)部類),讀操作完成后的回調(diào)函數(shù)(匿名內(nèi)部類)。

public class ClientHandler implements CompletionHandler<Void,ClientHandler>,Runnable {

    private AsynchronousSocketChannel asynchronousSocketChannel;
    private String host;
    private int port;
    private CountDownLatch countDownLatch;
    public ClientHandler(String host,int port){
        this.host = host;
        this.port = port;
        try{
            asynchronousSocketChannel = AsynchronousSocketChannel.open();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public void run(){
        countDownLatch = new CountDownLatch(1);
        asynchronousSocketChannel.connect(new InetSocketAddress(host,port),this,this);
        try{
            countDownLatch.await();
        }catch(Exception e){
            e.printStackTrace();
        }
    }

    @Override
    public void completed(Void result, ClientHandler attachment) {
        byte[] req = "GET /a/index HTTP/1.1 ".getBytes();
        final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
        writeBuffer.put(req);
        writeBuffer.flip();
        asynchronousSocketChannel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer,ByteBuffer>() {
            @Override
            public void completed(Integer result,ByteBuffer attachment){
                if(attachment.hasRemaining()){
                    asynchronousSocketChannel.write(attachment,attachment,this);
                }else{
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                        @Override
                        public void completed(Integer result, ByteBuffer attachment) {
                            attachment.flip();
                            byte[] bytes = new byte[attachment.remaining()];
                            attachment.get(bytes);
                            String body;
                            try{
                                body = new String(bytes,"UTF-8");
                                System.out.println(body);
                                countDownLatch.countDown();
                            }catch(Exception e){
                                e.printStackTrace();
                            }
                        }

                        @Override
                        public void failed(Throwable exc, ByteBuffer attachment) {
                            try {
                                asynchronousSocketChannel.close();
                                countDownLatch.countDown();
                            }catch(Exception e){
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
            public void failed(Throwable t,ByteBuffer attachment){
                try{
                    asynchronousSocketChannel.close();
                }catch(Exception e){
                    e.printStackTrace();
                }
            }
        });
    }

    @Override
    public void failed(Throwable exc, ClientHandler attachment) {
        try{
            asynchronousSocketChannel.close();
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

異步SocketChannel為被動(dòng)執(zhí)行對(duì)象,編程人員不需要像NIO一樣,編寫?yīng)毩⒌腎/O線程來處理讀寫操作。對(duì)于AsynchrousServerSocketChannel和AsynchrousSocketChannel,它們都由JDK底層的線程池負(fù)責(zé)回調(diào)并驅(qū)動(dòng)讀寫操作。也正是因?yàn)槿绱薃IO編程模型比NIO編程模型更加簡單。

4.BIO、NIO、AIO對(duì)比
類型 同步阻塞BIO 同步非阻塞NIO 異步非阻塞AIO
客戶端個(gè)數(shù):(I/O 線程) 1:1 M:1(一個(gè)I/O線程處理多個(gè)客戶端連接) M:0(不需要額外的啟動(dòng)線程,被動(dòng)回調(diào))
I/O類型(阻塞) 阻塞I/O 非阻塞I/O 非阻塞I/O
I/O類型(同步) 同步I/O 同步I/O 異步I/O
API使用難度 簡單 非常復(fù)雜 復(fù)雜
調(diào)試難度 簡單 復(fù)雜 復(fù)雜
可靠性 非常差
吞吐量

上表對(duì)于三種類型的I/O模型進(jìn)行了對(duì)比,具體應(yīng)該選用哪個(gè)編程模型,完全基于實(shí)際的業(yè)務(wù)場景進(jìn)行技術(shù)選型。一般情況低負(fù)載、低并發(fā)的應(yīng)用程序可以選用阻塞IO,以降低編程的復(fù)雜度。而高負(fù)載、高并發(fā)的網(wǎng)絡(luò)應(yīng)用可以采用NIO的非阻塞模式,提供應(yīng)用程序的性能。

參考:李林鋒《netty權(quán)威指南》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容