引子
在學(xué)習(xí)一個(gè)新東西之前,都需要先知道為什么需要這個(gè)東西。NIO也是如此,它和之前的IO有什么區(qū)別,為了加深理解,我先從NIO涉及到的網(wǎng)絡(luò)方面來(lái)切入學(xué)習(xí)。
java 網(wǎng)絡(luò)基礎(chǔ)知識(shí)
java傳統(tǒng)的網(wǎng)絡(luò)通信分為客戶端程序和服務(wù)端程序兩個(gè)部分。
客戶端程序大致步驟為:
Socket socket = new Socket(...); // 打開(kāi)套接字, 負(fù)責(zé)進(jìn)行網(wǎng)絡(luò)通信
InputStream is = socket.getInputStream(); // 獲取流
in.nextLine(); // 讀取數(shù)據(jù)
服務(wù)端程序大致步驟為:
ServerSocket serverSocket = new ServerSocket(...); // 創(chuàng)建服務(wù)端套接字
Socket incoming = serverSocket.accept(...); // 監(jiān)聽(tīng)端口,等待客戶端連接
InputStream inputStream = incoming.getInputStream(); // 獲取流
// 處理邏輯,返回?cái)?shù)據(jù)給客戶端...
服務(wù)端這種寫法的問(wèn)題在于使用ServerSocket的accept()方法時(shí)會(huì)導(dǎo)致當(dāng)前線程阻塞,而且,getInputStream()讀方法同樣也會(huì)導(dǎo)致線程阻塞。只能支持一個(gè)客戶端連接服務(wù)器,為了支持多個(gè)客戶端同時(shí)連接服務(wù)器,需要修改服務(wù)端程序如下:
while(true){
Socket income =s .accpet(); // 等待連接
Thread t = new Thread(...); // 創(chuàng)建線程,將accept后的socket實(shí)例在線程內(nèi)部進(jìn)行相關(guān)的邏輯處理
t.start(); // 啟動(dòng)線程
}
這樣寫的好處在于,監(jiān)聽(tīng)到一個(gè)新的客戶端連接后,就創(chuàng)建一個(gè)新的線程,在新的線程內(nèi)部進(jìn)行讀寫操作,這樣就不會(huì)阻塞其他線程,就不會(huì)因?yàn)楫?dāng)前線程的讀寫操作影響到客戶端的連接了。當(dāng)然,accpet方法導(dǎo)致的阻塞問(wèn)題還是沒(méi)有解決,所以還是不能滿足高性能服務(wù)器的要求,為實(shí)現(xiàn)服務(wù)器更高的吞吐量,我們需要學(xué)習(xí)java的NIO。事實(shí)上,說(shuō)到這才剛開(kāi)始引出我今天要分享的主題:NIO。
NIO
NIO中涉及一些新的概念,上網(wǎng)閱讀了一些文章幫助理解,發(fā)現(xiàn)有篇文章寫得挺好,分享一下:
http://www.iteye.com/magazines/132-Java-NIO,有需要可以先讀一下這篇文章:
Java NIO 核心部分如下:
- Channels
- Buffers
- Selectors
剛開(kāi)始學(xué),我就簡(jiǎn)單的把Channels理解為連接,對(duì)標(biāo)Socket;把Buffers理解為緩沖區(qū),是InputStream流和OutInputStream的集合,所以網(wǎng)絡(luò)數(shù)據(jù)傳輸就是Channels和Buffers之間的數(shù)據(jù)傳遞。
Channel和Buffer有好幾種類型。下面是JAVA NIO中的一些主要Channel的實(shí)現(xiàn):
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
Selector允許單線程處理多個(gè) Channel,這是在一個(gè)單線程中使用一個(gè)Selector處理3個(gè)Channel的圖示:
要使用Selector,得向Selector注冊(cè)Channel,然后調(diào)用它的select()方法。這個(gè)方法會(huì)一直阻塞到某個(gè)注冊(cè)的通道有事件就緒。一旦這個(gè)方法返回,線程就可以處理這些事件,事件的例子有如新連接進(jìn)來(lái),數(shù)據(jù)接收等。
NIO和IO的區(qū)別
NIO和IO最大的區(qū)別就是IO是面向流的,NIO面向緩沖區(qū)的。IO的各種流是阻塞的,這意味著,當(dāng)一個(gè)線程調(diào)用read和writer方法時(shí),該線程將被阻塞,直到有一些數(shù)據(jù)被讀取,或者數(shù)據(jù)完全被寫入,該線程在此期間不能再干任何事。NIO是非阻塞模式,使一個(gè)線程從某通道發(fā)送請(qǐng)求讀取數(shù)據(jù),但是它僅能得到目前可用數(shù)據(jù),如果目前沒(méi)有數(shù)據(jù)可用,就什么都不會(huì)獲取,而不是保持線程阻塞,所以在變得數(shù)據(jù)可以獲取之前,線程可以先繼續(xù)做其他事情,非阻塞寫也是如此,一個(gè)線程請(qǐng)求寫入一些數(shù)據(jù)到某通道,但不需要等待它完全寫完,這個(gè)線程同時(shí)也可以去做其他事情,線程通常將非阻塞IO的空閑時(shí)間用于其他通道上執(zhí)行IO操作,所以一個(gè)單獨(dú)的線程現(xiàn)在可以管理多個(gè)輸入和輸出通道(channel)。IO是單線程的,而NIO是用選擇器來(lái)模擬多線程的。
SocketChannel
SocketChannel是Channel的一種,本篇文章主要也是要討論網(wǎng)絡(luò)中涉及到的NIO,所以接下來(lái)來(lái)著重討論下SocketChannel。下面部分內(nèi)容參考https://blog.csdn.net/u011381576/article/details/79876754這篇文章,有需要的同學(xué)也可以看下這篇問(wèn)題。
NIO的channel抽象的一個(gè)重要特征就是可以通過(guò)配置它的阻塞行為,以實(shí)現(xiàn)非阻塞式的信道。
channel.configureBlocking(false)
在非阻塞式信道上調(diào)用一個(gè)方法總是會(huì)立即返回。這種調(diào)用的返回值指示了所請(qǐng)求的操作完成的程度。例如,在一個(gè)非阻塞式ServerSocketChannel上調(diào)用accept()方法,如果有連接請(qǐng)求來(lái)了,則返回客戶端SocketChannel,否則返回null。
使用了SocketChannel后,客戶端程序可以寫作如下:
ByteBuffer buffer = ByteBuffer.allocate(1024);
SocketChannel socketChannel = null;
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.connect(new InetSocketAddress("10.10.195.115",8080));
if(socketChannel.finishConnect())
{
int i=0;
while(true)
{
TimeUnit.SECONDS.sleep(1);
String info = "I'm "+i+++"-th information from client";
buffer.clear();
buffer.put(info.getBytes());
buffer.flip();
while(buffer.hasRemaining()){
System.out.println(buffer);
socketChannel.write(buffer);
}
}
}
if(socketChannel!=null){
socketChannel.close();
}
}
服務(wù)端程序用到了selector選擇器,代碼如下:
public class ServerConnect
{
private static final int BUF_SIZE=1024;
private static final int PORT = 8080;
private static final int TIMEOUT = 3000;
public static void main(String[] args)
{
selector();
}
public static void selector() {
Selector selector = null;
ServerSocketChannel ssc = null;
try{
selector = Selector.open();
ssc= ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(PORT));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
if(selector.select(TIMEOUT) == 0){
System.out.println("==");
continue;
}
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while(iter.hasNext()){
SelectionKey key = iter.next();
if(key.isAcceptable()){
handleAccept(key);
}
if(key.isReadable()){
handleRead(key);
}
if(key.isWritable() && key.isValid()){
handleWrite(key);
}
if(key.isConnectable()){
System.out.println("isConnectable = true");
}
iter.remove();
}
}
}catch(IOException e){
e.printStackTrace();
}finally{
try{
if(selector!=null){
selector.close();
}
if(ssc!=null){
ssc.close();
}
}catch(IOException e){
e.printStackTrace();
}
}
}
public static void handleAccept(SelectionKey key) throws IOException{
ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
SocketChannel sc = ssChannel.accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ,ByteBuffer.allocateDirect(BUF_SIZE));
}
public static void handleRead(SelectionKey key) throws IOException{
SocketChannel sc = (SocketChannel)key.channel();
ByteBuffer buf = (ByteBuffer)key.attachment();
long bytesRead = sc.read(buf);
while(bytesRead>0){
buf.flip();
while(buf.hasRemaining()){
System.out.print((char)buf.get());
}
System.out.println();
buf.clear();
bytesRead = sc.read(buf);
}
if(bytesRead == -1){
sc.close();
}
}
public static void handleWrite(SelectionKey key) throws IOException{
ByteBuffer buf = (ByteBuffer)key.attachment();
buf.flip();
SocketChannel sc = (SocketChannel) key.channel();
while(buf.hasRemaining()){
sc.write(buf);
}
buf.compact();
}
}
注意:
- ServerSocketChannel可以設(shè)置成非阻塞模式。在非阻塞模式下,accept() 方法會(huì)立刻返回,如果還沒(méi)有新進(jìn)來(lái)的連接,返回的將是null。 因此,需要檢查返回的SocketChannel是否是null。
Selector
Selector的創(chuàng)建:
Selector selector = Selector.open();
為了將Channel和Selector配合使用,必須將Channel注冊(cè)到Selector上,通過(guò)
SelectableChannel.register() //SelectableChannel是ServerSocketChannel的父類
方法來(lái)實(shí)現(xiàn)。
與Selector一起使用時(shí),Channel必須處于非阻塞模式下。這意味著不能將FileChannel與Selector一起使用,因?yàn)镕ileChannel不能切換到非阻塞模式。而套接字通道都可以。
register()方法的第二個(gè)參數(shù)。這是一個(gè)“interest集合”,意思是在通過(guò)Selector監(jiān)聽(tīng)Channel時(shí)對(duì)什么事件感興趣??梢员O(jiān)聽(tīng)四種不同類型的事件:Connect,Accept, Read,Write。通道觸發(fā)了一個(gè)事件意思是該事件已經(jīng)就緒。所以,某個(gè)channel成功連接到另一個(gè)服務(wù)器稱為“連接就緒”。一個(gè)server socket channel準(zhǔn)備好接收新進(jìn)入的連接稱為“接收就緒”。一個(gè)有數(shù)據(jù)可讀的通道可以說(shuō)是“讀就緒”。等待寫數(shù)據(jù)的通道可以說(shuō)是“寫就緒”。這四種事件用SelectionKey的四個(gè)常量來(lái)表示:
1. SelectionKey.OP_CONNECT
2. SelectionKey.OP_ACCEPT
3. SelectionKey.OP_READ
4. SelectionKey.OP_WRITE
SelectionKey
當(dāng)向Selector注冊(cè)Channel時(shí),register()方法會(huì)返回一個(gè)SelectionKey對(duì)象。這個(gè)對(duì)象包含了一些你感興趣的屬性:
- interest集合
- ready集合
- Channel
- Selector
- 附加的對(duì)象(可選)
interest集合:就像向Selector注冊(cè)通道一節(jié)中所描述的,interest集合是你所選擇的感興趣的事件集合??梢酝ㄟ^(guò)SelectionKey讀寫interest集合。
ready 集合是通道已經(jīng)準(zhǔn)備就緒的操作的集合。在一次選擇(Selection)之后,你會(huì)首先訪問(wèn)這個(gè)ready set。Selection將在下一小節(jié)進(jìn)行解釋。可以這樣訪問(wèn)ready集合:
int readySet = selectionKey.readyOps();
可以用像檢測(cè)interest集合那樣的方法,來(lái)檢測(cè)channel中什么事件或操作已經(jīng)就緒。但是,也可以使用以下四個(gè)方法,它們都會(huì)返回一個(gè)布爾類型:
selectionKey.isAcceptable();
selectionKey.isConnectable();
selectionKey.isReadable();
selectionKey.isWritable();
從SelectionKey訪問(wèn)Channel和Selector很簡(jiǎn)單。如下:
Channel channel = selectionKey.channel();
Selector selector = selectionKey.selector();
通過(guò)Selector選擇通道
一旦向Selector注冊(cè)了一或多個(gè)通道,就可以調(diào)用幾個(gè)重載的select()方法。這些方法返回你所感興趣的事件(如連接、接受、讀或?qū)懀┮呀?jīng)準(zhǔn)備就緒的那些通道。換句話說(shuō),如果你對(duì)“讀就緒”的通道感興趣,select()方法會(huì)返回讀事件已經(jīng)就緒的那些通道。
下面是select()方法:
int select()
int select(long timeout)
int selectNow()
select()阻塞到至少有一個(gè)通道在你注冊(cè)的事件上就緒了。
select(long timeout)和select()一樣,除了最長(zhǎng)會(huì)阻塞timeout毫秒(參數(shù))。
selectNow()不會(huì)阻塞,不管什么通道就緒都立刻返回(譯者注:此方法執(zhí)行非阻塞的選擇操作。如果自從前一次選擇操作后,沒(méi)有通道變成可選擇的,則此方法直接返回零。)。
select()方法返回的int值表示有多少通道已經(jīng)就緒。亦即,自上次調(diào)用select()方法后有多少通道變成就緒狀態(tài)。如果調(diào)用select()方法,因?yàn)橛幸粋€(gè)通道變成就緒狀態(tài),返回了1,若再次調(diào)用select()方法,如果另一個(gè)通道就緒了,它會(huì)再次返回1。如果對(duì)第一個(gè)就緒的channel沒(méi)有做任何操作,現(xiàn)在就有兩個(gè)就緒的通道,但在每次select()方法調(diào)用之間,只有一個(gè)通道就緒了。
一旦調(diào)用了select()方法,并且返回值表明有一個(gè)或更多個(gè)通道就緒了,然后可以通過(guò)調(diào)用selector的selectedKeys()方法,訪問(wèn)“已選擇鍵集(selected key set)”中的就緒通道。如下所示:
Set selectedKeys = selector.selectedKeys();
當(dāng)向Selector注冊(cè)Channel時(shí),Channel.register()方法會(huì)返回一個(gè)SelectionKey 對(duì)象。這個(gè)對(duì)象代表了注冊(cè)到該Selector的通道。
注意每次迭代末尾的keyIterator.remove()調(diào)用。Selector不會(huì)自己從已選擇鍵集中移除SelectionKey實(shí)例。必須在處理完通道時(shí)自己移除。下次該通道變成就緒時(shí),Selector會(huì)再次將其放入已選擇鍵集中。
SelectionKey.channel()方法返回的通道需要轉(zhuǎn)型成你要處理的類型,如ServerSocketChannel或SocketChannel等。