1. 前言
有一些概念總是Java I/O一塊出現(xiàn),比如同步與異步,阻塞與非阻塞,這些概念往往也是非常難以區(qū)分。在介紹Java I/O之前,本文先通俗地介紹一下這兩組概念的區(qū)別:
- 同步和異步: 同步和異步的區(qū)分點(diǎn)在消息的通知機(jī)制。
如果是程序主動(dòng)獲取消息,為同步,程序被動(dòng)獲取消息,為異步。例如燒水,如果我們時(shí)不時(shí)去看看水是否燒開,則為同步。而如果水壺是會(huì)響笛的水壺,我們聽見響笛則認(rèn)為水燒開了,則為異步。而對(duì)于程序,如果是程序輪詢結(jié)果或者直接等待結(jié)果,為同步。如果程序調(diào)用了然后立刻返回,結(jié)果等待被調(diào)用方通知,或者回調(diào),則為異步。重點(diǎn)在獲取調(diào)用的結(jié)果的方式。 - 阻塞和非阻塞: 區(qū)分點(diǎn)則在等待程序調(diào)用結(jié)果時(shí),程序所處的狀態(tài)。
例如燒水,如果在燒水的過程中,我們一直等著,啥事都不干,則為阻塞。如果我們?cè)跓倪^程中,繼續(xù)干著別的事,則為非阻塞。重點(diǎn)在獲取程序調(diào)用結(jié)果的,程序所處于的狀態(tài)。
總的來說,對(duì)于一個(gè)程序中調(diào)用過程來說,獲取調(diào)用結(jié)果的方式,決定了程序是同步(主動(dòng))還是異步(被動(dòng))。而在獲取調(diào)用結(jié)果的過程中,程序所處的狀態(tài),決定了程序是阻塞(掛起)還是非阻塞(處理其他的事情)
2. Java I/O的發(fā)展歷程
Java I/O的發(fā)展一般來說主要是分為三個(gè)階段:
- 第一個(gè)階段:在JDK 1.0到JDK 1.3中,Java的I/O類庫是非常簡單的,很多UNIX網(wǎng)絡(luò)編程中的概念或者接口在Java I/O類庫中都沒有體現(xiàn)。通常,我們這種類型的I/O為BIO,即Blocking I/O。
- 第二個(gè)階段:在JDK 1.4中,java 新增加了java.nio包,正式引入了NIO(Non-blocking I/O),提供了異步開發(fā)I/O的API和類庫。Java NIO主要由Selector,ByteBuffer和Channel三個(gè)核心部分組成。
- 第三個(gè)階段:JDK1.7正式發(fā)布,java對(duì)NIO進(jìn)行了升級(jí),被稱為NIO2.0,也稱為AIO,支持文件的異步I/O以及網(wǎng)絡(luò)的異步操作。
3. BIO、NIO以及AIO
網(wǎng)絡(luò)Socket編程的一般類型是Server/client類型的,即兩個(gè)進(jìn)程之間的通信。Server端通過綁定端口號(hào)建立Socket監(jiān)聽連接,而Client端通過指定Ip地址和端口號(hào)通過三次握手建立雙方的連接,如果連接成功,雙方就通過Socket進(jìn)行通信。
在第2部分,簡單的介紹了BIO,NIO以及AIO的概念,在本部分主要通過實(shí)例代碼來展示三個(gè)的關(guān)鍵點(diǎn)。
1.BIO
當(dāng)Server和Client端采用BIO形式,雙方通過輸入流和輸出流通過同步阻塞的方式進(jìn)行通信。
采用BIO通信方式的Server端,一般由一個(gè)單獨(dú)的Acceptor線程來監(jiān)聽客戶端的連接請(qǐng)求,Server接收到Client端的連接后,就為每一個(gè)client建立新的線程,進(jìn)行鏈路處理,通過輸入流發(fā)送響應(yīng)到客戶端,銷毀線程,整個(gè)socket通信流程結(jié)束。這是典型的一請(qǐng)求一應(yīng)答模式的。
以下示例的socket通信流程,模擬客戶端發(fā)送http請(qǐng)求給服務(wù)器端,如果請(qǐng)求的路徑為登錄地址,則返回已經(jīng)登錄,如果請(qǐng)求路徑不為登錄地址,則返回還未登錄。
Server端監(jiān)聽8080端口通過輪詢的方式不斷監(jiān)聽client的連接請(qǐng)求,等待client的連接(serverSocket.accept())。當(dāng)沒有client連接server的時(shí)候,線程阻塞在accept()處。當(dāng)Server接收到client的連接請(qǐng)求,新建一個(gè)線程進(jìn)行鏈路業(yè)務(wù)處理。
服務(wù)器端Acceptor線程(主線程)代碼如下:
public class InfoServer {
public static void main(String args[]){
try{
ServerSocket serverSocket = new ServerSocket(8080);
Socket socket = null;
while(true){
System.out.println("socket listening");
socket = serverSocket.accept(); //線程阻塞在此處
new Thread(new LoginCheckThread(socket)).start();
System.out.println("socket accepted");
}
}catch(Exception e){
e.printStackTrace();
}
}
}
在鏈路處理線程中,代碼流程為:
- 首先根據(jù)socket的輸入流產(chǎn)生字符流BufferReader對(duì)象;
- 通過BufferReader對(duì)象的readline()方法,讀取client端傳過來的數(shù)據(jù)。readline()方法當(dāng)讀取到換行符'\n'或'\r'時(shí)才返回。
- 最后,從socket的輸出流中產(chǎn)生了字符流PrintWriter對(duì)象,通過PrintWriter對(duì)象發(fā)送響應(yīng)內(nèi)容。
鏈路處理線程為:
public class LoginCheckThread implements Runnable {
private Socket socket = null;
public LoginCheckThread(Socket socket){
this.socket = socket;
}
public void run(){
try{
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
PrintWriter printWriter = new PrintWriter(socket.getOutputStream(),true);
String content = bufferedReader.readLine();
System.out.println(Thread.currentThread().getName()+" "+content);
//獲取訪問的域名,如果是登錄請(qǐng)求,則返回已經(jīng)登錄,否則提示沒有登錄
if(content.split(" ")[1].equals("/a/login")){
printWriter.println("you are not login in this system!");
}else{
printWriter.println("you have login in this system!");
}
socket.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
客戶端的代碼:
客戶端通過指定IP地址和端口,嘗試連接server端口,連接上Server端后,通過PrinterWriter對(duì)象想Server端發(fā)送請(qǐng)求數(shù)據(jù)。發(fā)送完請(qǐng)求數(shù)據(jù)之后,client通過BufferReader的read(Char[])方法來讀取Server端的響應(yīng)數(shù)據(jù)。需要注意到是read(char[])會(huì)產(chǎn)生阻塞,read(char[])方法只有在以下三種情況下才會(huì)返回:
- 讀取到足夠多的字節(jié);
- 讀取輸入流的終止符;
- 發(fā)生IO異常
public class Client {
public static void main(String args[]){
try{
Socket socket = new Socket("127.0.0.1",8080);
PrintWriter printWriter = new PrintWriter(socket.getOutputStream());
printWriter.println();
printWriter.flush();
BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
char[] reponseChar = new char[1024];
//read方法是阻塞方法,程序在此處會(huì)產(chǎn)生阻塞
bufferedReader.read(reponseChar);
System.out.println("repose:"+new String(reponseChar));
socket.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
2.NIO
NIO即非阻塞I/O(Non-blocking I/O),NIO類庫提供了 對(duì)應(yīng)的ServerSocketChannel和SocketChannel兩種不同套接字的通道實(shí)現(xiàn),ServerSocketChannel和Socketchannel分別于BIO中的ServerSocket和Socket對(duì)應(yīng)。這兩種新增的通道都支持阻塞和非阻塞模式,阻塞模式使用起來更加的簡單,但是性能和可靠性上都不好,非阻塞模式卻正好相反。
Java NIO由Channel、Buffer、Selector三個(gè)核心部分組成。Channel和Buffer與操作系統(tǒng)的IO方式更加接近,所以性能上會(huì)比傳統(tǒng)的AIO要好。
在NIO中,基本上所有的IO中都是從一個(gè)Channel開始。Channel譯為通道,與流不同的是,通道同時(shí)讀和寫。數(shù)據(jù)可以從Channel中讀到Buffer中,也可以從Buffer中寫到Channel中。Selector(選擇器)是能夠檢測(cè)一個(gè)到多個(gè)NIO通道,并能夠知曉通道是否為諸如讀寫事件做好準(zhǔn)備的組件。通過Selector,一個(gè)單獨(dú)的線程可以管理多個(gè)Channel,從而管理多個(gè)網(wǎng)絡(luò)連接。
如何使用NIO來進(jìn)行網(wǎng)絡(luò)編程:
- 創(chuàng)建Selector:通過Selector.open()可以創(chuàng)建Selector對(duì)象;
- 創(chuàng)建Channel:Channel分為ServerSocketChannel和SocketChannel。在Server端,通過ServerSocketChannel.open()可以創(chuàng)建Server端監(jiān)聽通道ServerSocketChannel,在Client端可以通過SocketChannel.open()可以打開連接通道SocketChannel對(duì)象;
-
向Selector中注冊(cè)Channel及感興趣的事件(OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT);
- ServerSocketChannel可以向Selector注冊(cè)O(shè)P_ACCEPT,而SocketChannel可以向Seletor注冊(cè)O(shè)P_READ,OP_WRTIE,OP_CONNECT;
- 輪詢Selector,獲取就緒的Channel(通過Selector.select()及其他重載方法)
- 針對(duì)特定的Channel進(jìn)行業(yè)務(wù)上的處理。
下面通過具體的代碼來說明如何進(jìn)行NIO編程
InfoServer類為Server端的啟動(dòng)類,通過新建線程的形式啟動(dòng)Server端的監(jiān)聽線程。
public class InfoServer {
public static void main(String args[]){
new Thread(new LoginCheckTask(8080)).start();
}
}
LoginCheckTask類實(shí)現(xiàn)了網(wǎng)絡(luò)監(jiān)聽、網(wǎng)絡(luò)連接及請(qǐng)求處理的操作。在構(gòu)造函數(shù)對(duì)Server端進(jìn)行了網(wǎng)絡(luò)初始化,包括獲得Selector對(duì)象、ServersocketChannel對(duì)象、設(shè)置Socket參數(shù),最后還向Selector注冊(cè)了當(dāng)前ServerSocketChannel通道的OP_ACCERT事件。
在NIO中,通道Channel要么從緩沖器獲得數(shù)據(jù),要么向緩沖器發(fā)送數(shù)據(jù)。唯一直接與通道交互的緩沖器是ByteBuffer。
當(dāng)server端通道中監(jiān)聽到client端的連接后,建立與Client端連接的SocketChannel,并向該Socketchannel中注冊(cè)O(shè)P_READ事件。
當(dāng)多路復(fù)用器Selector檢測(cè)到OP_READ事件就緒后,就從該SocketChannel的的緩沖器ByteBuffer中讀取請(qǐng)求內(nèi)容。當(dāng)Server端有數(shù)據(jù)需要向client端發(fā)送響應(yīng)時(shí),首先需要將響應(yīng)的字節(jié)數(shù)據(jù)寫入到ByteBuffer中,然后通過SocketChannel的write方法向client端發(fā)送響應(yīng)的。
public class LoginCheckTask implements Runnable{
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private volatile boolean stop;
public LoginCheckTask(int port){
try{
//獲取Selector對(duì)象
selector = Selector.open();
//獲取ServerSocketChannel對(duì)象
serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
//設(shè)置監(jiān)聽參數(shù)
serverSocketChannel.socket().bind(new InetSocketAddress(port),1024);
//向selector注冊(cè)O(shè)P_ACCEEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println(" server listening");
}catch (Exception e){
e.printStackTrace();
}
}
public void stop(){
this.stop = true;
}
public void run(){
while (!stop){
try{
//獲取通道就緒的網(wǎng)絡(luò)事件,該方法會(huì)阻塞1s
selector.select(1000);
Set<SelectionKey> selectionKeySet = selector.selectedKeys();
Iterator<SelectionKey> it = selectionKeySet.iterator();
SelectionKey key = null;
//分別處理就緒的網(wǎng)絡(luò)事件
while(it.hasNext()){
key = it.next();
it.remove();
try{
if(key.isValid()){
//處理OP_ACCEPT事件
if(key.isAcceptable()){
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
SocketChannel socketChannel = serverSocketChannel.accept();
socketChannel.configureBlocking(false);
//將與client端建立的socketChannel,向Selector注冊(cè)O(shè)P_READ事件;
socketChannel.register(selector,SelectionKey.OP_READ);
System.out.println("server accepted");
}
//處理OP_READ事件
if(key.isReadable()){
System.out.println("client is readable");
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
SocketChannel sc = (SocketChannel) key.channel();
int readBytes = sc.read(readBuffer);
if(readBytes > 0){
readBuffer.flip();
byte[] bytes = new byte[readBytes];
readBuffer.get(bytes);
String body = new String(bytes,"UTF-8");
String response;
//獲取訪問的域名,如果是登錄請(qǐng)求,則返回已經(jīng)登錄,否則提示沒有登錄
if(body.split(" ")[1].equals("/a/login")){
response = "you are not login in this system!/n";
}else{
response = "you have login in this system!/n";
}
byte[] reponseBytes = response.getBytes();
ByteBuffer reposeByteBuffer = ByteBuffer.allocate(1024);
reposeByteBuffer.put(reponseBytes);
reposeByteBuffer.flip();
Thread.currentThread().sleep(1000);
sc.write(reposeByteBuffer);
}
}
}
}catch(Exception e){
e.printStackTrace();
key.cancel();
if(key.channel()!= null){
key.channel().close();
}
}
}
}catch(Exception e){
e.printStackTrace();
}
}
if(selector != null){
try {
selector.close();
}catch(Exception e){
e.printStackTrace();;
}
}
}
}
在NIO的Client端,程序的基本執(zhí)行流程為:
- 初始化時(shí)需要產(chǎn)生Selector對(duì)象和SocketChannel,配置SocketChannel的阻塞模式為非阻塞模式。
- client嘗試與Server建立的Socket連接,如果直接連接成功,則注冊(cè)O(shè)P_READ事件并發(fā)送請(qǐng)求數(shù)據(jù),否則注冊(cè)O(shè)P_CONNECT事件。
- 與Server端類似,client也是通過循環(huán)不斷探測(cè)多路復(fù)用器Selector的就緒事件
- 如果事件是連接成功事件,則注冊(cè)O(shè)P_READ事件,并向Server服務(wù)端發(fā)送請(qǐng)求數(shù)據(jù)
- 如果事件為可讀時(shí)間,則通過緩沖器ByteBuffer從SocketChannel中讀取Server端的響應(yīng)數(shù)據(jù)。
public class Client {
private Selector selector;
private SocketChannel socketChannel;
public static void main(String args[]){
new Client().connect();
}
public void connect(){
try{
selector = Selector.open();
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
}catch(Exception e){
e.printStackTrace();
}
try {
//非阻塞模式,如果直接連接成功,則注冊(cè)讀,否則注冊(cè)連接事件
if(socketChannel.connect(new InetSocketAddress("127.0.0.1",8080))){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
socketChannel.register(selector,SelectionKey.OP_CONNECT);
}
}catch(Exception e){
e.printStackTrace();
}
boolean loop= true;
while(loop){
try{
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while(iterator.hasNext()){
SelectionKey key = iterator.next();
iterator.remove();
if(key.isValid()){
if(key.isConnectable()){
if(socketChannel.finishConnect()){
socketChannel.register(selector, SelectionKey.OP_READ);
doWrite(socketChannel);
}else{
System.exit(1);
}
}
//讀取服務(wù)器端返回?cái)?shù)據(jù)
if(key.isReadable()){
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int readBytes = socketChannel.read(byteBuffer);
if(readBytes > 0){
byteBuffer.flip();
byte[] responseByte = new byte[byteBuffer.remaining()];
byteBuffer.get(responseByte);
System.out.println(new String(responseByte,"UTF-8"));
loop= false;
}
}
}
}
}catch(Exception e){
e.printStackTrace();
}
}
}
/**
* 如果連接成功,則向服務(wù)器發(fā)送數(shù)據(jù)
* @param sc
*/
public void doWrite(SocketChannel sc)throws Exception{
String request = "GET /a/index HTTP/1.1 ";
byte[] requestBytes = request.getBytes();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
byteBuffer.put(requestBytes);
byteBuffer.flip();
sc.write(byteBuffer);
}
}
在整個(gè)NIOSocket通信流程中,只有在Selector.select(1000)處阻塞1s,其他的讀寫操作由于都是通過緩沖器來操作Channel,所以均為非阻塞操作。
3.AIO
NIO 2.0(即AIO)引入了新的異步通道的概念,并提供了異步文件通道和異步套接字通道的實(shí)現(xiàn)。異步通道提供以下兩種方式獲取操作結(jié)果:
- 通過java.util.concurrent.Future類表示異步操作的結(jié)果;
- 在執(zhí)行異步的時(shí)候傳入一個(gè)java.nio.channels;
CompletionHandler的實(shí)現(xiàn)類作為操作完成的回調(diào)。
NIO2.0的異步套接字通道是真正的異步非阻塞I/O,對(duì)應(yīng)于unix網(wǎng)絡(luò)編程的事件驅(qū)動(dòng)I/O。它不需要通過多路復(fù)用器Selector對(duì)注冊(cè)的通道進(jìn)行輪詢,即可實(shí)現(xiàn)異步讀寫,從而簡化了NIO的編程模型。
Server端代碼:
public class InfoServer {
public static void main(String args[]){
new Thread(new LoginCheckHandler(8080)).start();
}
}
和之前類似,在構(gòu)造函數(shù)中完成Server端Socket的初始化,主要完成獲取AsynchronousServerSocketChannel對(duì)象,監(jiān)聽端口設(shè)置。
在run方法中,我們通過AsynchronousServerSocketChannel的異步方法accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler)來接收client端的連接,并指定了連接完成后的回調(diào)函數(shù)AcceptCompletionHandler對(duì)象(AcceptCompletionHandler是ComplettionHandler的實(shí)現(xiàn)類)
public class LoginCheckHandler implements Runnable {
private int port;
CountDownLatch latch;
AsynchronousServerSocketChannel asynchronousServerSocketChannel;
public LoginCheckHandler(int port){
try{
//打開異步通道
asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
//監(jiān)聽8080端口
asynchronousServerSocketChannel.bind(new InetSocketAddress(8080));
System.out.println("服務(wù)器正在監(jiān)聽8080端口中");
}catch(Exception e){
e.printStackTrace();
}
}
public void run(){
//latch的作用是在完成一組正在執(zhí)行的操作前之前,允許當(dāng)前的線程一直阻塞。在這里我們是為了防止服務(wù)器執(zhí)行完成退出。
latch = new CountDownLatch(1);
//異步ServerSocketchannel
asynchronousServerSocketChannel.accept(this,new AcceptCompletionHandler());
try{
latch.await();
}catch(Exception e){
e.printStackTrace();
}
}
}
AcceptCompletionHandler是Server端與Client端完成Socket通信過程建立后的回調(diào)函數(shù),當(dāng)通信過程成功建立,則調(diào)用completed()方法,否則調(diào)用failed()方法。
需要特別注意,在completed方法中,還需要再次調(diào)用AsyncrhonousSocketChannel的accept方法,因?yàn)橐粋€(gè)Server端可以接收多個(gè)client端的連接,所以需要繼續(xù)調(diào)用accept方法繼續(xù)接收其他client的連接,最終形成一個(gè)循環(huán)。每當(dāng)一個(gè)client端連接進(jìn)來后,再異步接收新的連接。
在completed方法中,通過asynchronousSocketChannel的read方法來異步讀取客戶端的請(qǐng)求內(nèi)容,讀操作也是異步的。ReadCompletedHandler類是讀操作完成后的回調(diào)函數(shù)。
public class AcceptCompletionHandler implements CompletionHandler<AsynchronousSocketChannel,LoginCheckHandler> {
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, LoginCheckHandler attachment){
System.out.println("服務(wù)器接收到連接");
attachment.asynchronousServerSocketChannel.accept(attachment,this);
ByteBuffer buffer = ByteBuffer.allocate(1024);
asynchronousSocketChannel.read(buffer,buffer,new ReadCompletedHandler(asynchronousSocketChannel));
}
@Override
public void failed(Throwable t,LoginCheckHandler attachment){
t.printStackTrace();
attachment.latch.countDown();
}
}
ReadCompletedHandler也是CompletionHandler的實(shí)現(xiàn)類,如果是讀操作的回調(diào)函數(shù),java類庫已經(jīng)明確規(guī)定了,其泛型的參數(shù)類型為
CompletionHandler<Integer,ByteBuffer>
其中Integer主要是為了記錄讀取client端的請(qǐng)求數(shù)據(jù)的大小。
在completed方法內(nèi)部,通過讀取緩沖區(qū)ByteBuffer獲取請(qǐng)求數(shù)據(jù),并且完成業(yè)務(wù)處理,返回響應(yīng)內(nèi)容。從doWrite方法內(nèi)部,可以看到寫操作也是異步的。通過匿名內(nèi)部類的方式來指定了寫操作完成后的回調(diào)函數(shù)。
public class ReadCompletedHandler implements CompletionHandler<Integer,ByteBuffer>{
private AsynchronousSocketChannel asynchronousSocketChannel;
public ReadCompletedHandler(AsynchronousSocketChannel channel){
if( this.asynchronousSocketChannel == null){
this.asynchronousSocketChannel = channel;
}
}
public void completed(Integer result,ByteBuffer attachment){
attachment.flip();
byte[] reponse = new byte[attachment.remaining()];
attachment.get(reponse);
try{
String req = new String(reponse,"UTF-8");
System.out.println("the server received: "+req);
//獲取訪問的域名,如果是登錄請(qǐng)求,則返回已經(jīng)登錄,否則提示沒有登錄
if(req.split(" ")[1].equals("/a/login")){
doWrite("you are not login in this system!");
}else{
doWrite("you have login in this system!");
}
}catch(Exception e){
e.printStackTrace();
}
}
public void doWrite(String response){
try{
Thread.currentThread().sleep(5000);
System.out.println("讀取到數(shù)據(jù),等待5s");
}catch(Exception e){
e.printStackTrace();
}
byte[] bytes = response.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(bytes);
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
//只要有剩余的內(nèi)容沒有發(fā)送完就繼續(xù)發(fā)送數(shù)據(jù)
if(attachment.hasRemaining()){
asynchronousSocketChannel.write(attachment,attachment,this);
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try{
asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
public void failed(Throwable t,ByteBuffer attachment){
try{
this.asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
AIO client端代碼:
public class Client {
public static void main(String args[]){
new Thread(new ClientHandler("localhost",8080)).start();
}
}
對(duì)于Client端的代碼,特別要注意AsynchronousSocketChannel的connect方法已經(jīng)指定了連接成功后的回調(diào)函數(shù)的泛型為
CompletionHandler<Void,ClientHandler>
在ClientHandler內(nèi)部,完成了連接成功后的回調(diào)函數(shù)(ClientHandler),寫操作完成后的回調(diào)函數(shù)(匿名內(nèi)部類),讀操作完成后的回調(diào)函數(shù)(匿名內(nèi)部類)。
public class ClientHandler implements CompletionHandler<Void,ClientHandler>,Runnable {
private AsynchronousSocketChannel asynchronousSocketChannel;
private String host;
private int port;
private CountDownLatch countDownLatch;
public ClientHandler(String host,int port){
this.host = host;
this.port = port;
try{
asynchronousSocketChannel = AsynchronousSocketChannel.open();
}catch(Exception e){
e.printStackTrace();
}
}
public void run(){
countDownLatch = new CountDownLatch(1);
asynchronousSocketChannel.connect(new InetSocketAddress(host,port),this,this);
try{
countDownLatch.await();
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void completed(Void result, ClientHandler attachment) {
byte[] req = "GET /a/index HTTP/1.1 ".getBytes();
final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
writeBuffer.put(req);
writeBuffer.flip();
asynchronousSocketChannel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer,ByteBuffer>() {
@Override
public void completed(Integer result,ByteBuffer attachment){
if(attachment.hasRemaining()){
asynchronousSocketChannel.write(attachment,attachment,this);
}else{
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
asynchronousSocketChannel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) {
attachment.flip();
byte[] bytes = new byte[attachment.remaining()];
attachment.get(bytes);
String body;
try{
body = new String(bytes,"UTF-8");
System.out.println(body);
countDownLatch.countDown();
}catch(Exception e){
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
asynchronousSocketChannel.close();
countDownLatch.countDown();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
}
public void failed(Throwable t,ByteBuffer attachment){
try{
asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
});
}
@Override
public void failed(Throwable exc, ClientHandler attachment) {
try{
asynchronousSocketChannel.close();
}catch(Exception e){
e.printStackTrace();
}
}
}
異步SocketChannel為被動(dòng)執(zhí)行對(duì)象,編程人員不需要像NIO一樣,編寫?yīng)毩⒌腎/O線程來處理讀寫操作。對(duì)于AsynchrousServerSocketChannel和AsynchrousSocketChannel,它們都由JDK底層的線程池負(fù)責(zé)回調(diào)并驅(qū)動(dòng)讀寫操作。也正是因?yàn)槿绱薃IO編程模型比NIO編程模型更加簡單。
4.BIO、NIO、AIO對(duì)比
| 類型 | 同步阻塞BIO | 同步非阻塞NIO | 異步非阻塞AIO |
|---|---|---|---|
| 客戶端個(gè)數(shù):(I/O 線程) | 1:1 | M:1(一個(gè)I/O線程處理多個(gè)客戶端連接) | M:0(不需要額外的啟動(dòng)線程,被動(dòng)回調(diào)) |
| I/O類型(阻塞) | 阻塞I/O | 非阻塞I/O | 非阻塞I/O |
| I/O類型(同步) | 同步I/O | 同步I/O | 異步I/O |
| API使用難度 | 簡單 | 非常復(fù)雜 | 復(fù)雜 |
| 調(diào)試難度 | 簡單 | 復(fù)雜 | 復(fù)雜 |
| 可靠性 | 非常差 | 高 | 高 |
| 吞吐量 | 低 | 高 | 高 |
上表對(duì)于三種類型的I/O模型進(jìn)行了對(duì)比,具體應(yīng)該選用哪個(gè)編程模型,完全基于實(shí)際的業(yè)務(wù)場景進(jìn)行技術(shù)選型。一般情況低負(fù)載、低并發(fā)的應(yīng)用程序可以選用阻塞IO,以降低編程的復(fù)雜度。而高負(fù)載、高并發(fā)的網(wǎng)絡(luò)應(yīng)用可以采用NIO的非阻塞模式,提供應(yīng)用程序的性能。
參考:李林鋒《netty權(quán)威指南》