【JAVA IO】JAVA NIO源碼淺析
JAVA NIO 是 JDK在1.4版本中發(fā)布的一套API,這套API采用了不同的方式進(jìn)行了IO操作,主要包括四個部分:
- Buffers,數(shù)據(jù)容器,包路徑一般為java.nio.*
- Charsets 和 相關(guān)的編碼解碼器,用于byte和Unicode字符之間編解碼,包路徑為java.nio.charset.*
- Channels 代表和實體之間的連接,可以進(jìn)行IO操作。包路徑為java.nio.channel.*
- Selectors,和可選擇的通道配合,實現(xiàn)多路復(fù)用和IO非阻塞的能力。包路徑為java.nio.channel.*
這里沒有提到的是還有一個包路徑為 java.nio.file.* ,這個是在文件系統(tǒng)方面提供了一些接口和實現(xiàn),since 1.7 ,實際上這個屬于 NIO2的部分。
一般而言,NIO 狹義上指的是 JSR 51 (www.jcp.org/en/jsr/detail?id=51 )規(guī)范,實現(xiàn)的關(guān)于現(xiàn)代操作系統(tǒng)上IO操作的支持和簡化。在JDK1.4中實現(xiàn),包括Buffers, Charsets,Channels,Selectors,以及Regular expressions 。而NIO2 是指 在JSR 203 (www.jcp.org/en/jsr/detail?id=203 ),克服傳統(tǒng)File類的一些問題,并且支持異步IO和socket通道的功能。在JDK1.7中實現(xiàn),主要包括 增強(qiáng)的文件系統(tǒng)接口,異步IO,完善了Socket 通道功能。
Buffer
Buffer 是存放基本數(shù)據(jù)的容器,Buffer類本身是一個抽象類,具體實現(xiàn)有ByteBuffer, CharBuffer,
DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer 。除了boolean,每個基本類型都有一個實現(xiàn)。還有HeapByteBuffer
Buffer本身是為了讀寫而存在的,有幾個關(guān)鍵變量,position, limit, capacity 和 mark。
pos代表當(dāng)前讀或者寫的位置
limit代表有效數(shù)據(jù)上界,
mark代表標(biāo)記的位置
capacity代表整個容量
并且下面條件始終成立,否則拋出異常
0 <= mark <= pos <= lim <= cap;
有幾個常規(guī)操作,put,get,flip,rewind,reset/mark, clear。
一般寫完了,flip進(jìn)行讀,重讀就rewind,再clear繼續(xù)去寫, 如果沒有讀完,就compact
還有一個特殊的Buffer是DirectByteBuffer ,這個Buffer的類圖如下:

DirectByteBuffer 只有包訪問權(quán)限,可以通過ByteBuffer的靜態(tài)方法allocateDirect工廠方法創(chuàng)建,這個Buffer的特點是不在堆上分配,而是使用直接內(nèi)存,所以是連續(xù)的,而不受垃圾回收的影響,底層操作系統(tǒng)可以使用原生IO操作對這塊Buff直接進(jìn)行填寫和清空,效率非常高。并且在使用channel的時候如果使用的不是Direct的buffer,channel底層會創(chuàng)建一個臨時的direct buffer ,把內(nèi)容拷貝進(jìn)行進(jìn)行操作。
另外MappedBuffer,是可以把底層文件直接映射到內(nèi)存進(jìn)來的,對這一段buff的操作會直接對底層文件生效。
MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 50, 100);
channel
channel對象代表一種連接,包括文件,socket,硬件設(shè)備,應(yīng)用組件等一系列可以進(jìn)行讀寫IO操作的實體。Channel能夠高效的在buffer和這些實體之間傳輸速度。Channel可以很好的對,操作系統(tǒng)的設(shè)施進(jìn)行建模。
channel常見的實現(xiàn)有FileChannel,SocketChannel,DatagramChannel 類圖如下:

InterruptibleChannel 接口支持異步關(guān)閉和IO阻塞可中斷,AbstractInteruptibleChannel給出了初步的實現(xiàn),利用begin和end進(jìn)行組合,代表一個I/O操作。源碼如下:
protected final void begin() {
if (interruptor == null) {
interruptor = new Interruptible() {
public void interrupt(Thread target) {
synchronized (closeLock) {
if (!open)
return;
open = false;
interrupted = target;
try {
(略).this.implCloseChannel();
} catch (IOException x) { }
}
}};
}
blockedOn(interruptor);
Thread me = Thread.currentThread();
if (me.isInterrupted())
interruptor.interrupt(me);
}
protected final void end(boolean completed)
throws AsynchronousCloseException
{
blockedOn(null);
Thread interrupted = this.interrupted;
if (interrupted != null && interrupted == Thread.currentThread()) {
interrupted = null;
throw new ClosedByInterruptException();
}
if (!completed && !open)
throw new AsynchronousCloseException();
}
interruptor相當(dāng)于一個中斷回調(diào),blockedOn是把這個回調(diào)綁定到當(dāng)前線程上,
最后判斷,是不是已經(jīng)被中斷了,被中斷了,就執(zhí)行回調(diào)。
回調(diào)做的事情:關(guān)閉當(dāng)前Channel,并把被中斷的線程記錄到channel對象interrupted域。
而end,先把回調(diào)和本線程割裂,那么本線程就不會執(zhí)行中斷回調(diào),進(jìn)一步在判斷channel是不是已經(jīng)中斷過了,中斷的線程是不是自己,如果發(fā)現(xiàn)自己已經(jīng)被中斷過了,拋出中斷關(guān)閉,否則判斷channel是不是在不完整工作的情況下關(guān)閉了,拋出異步關(guān)閉。
FileChannel可以獲得文件鎖,實際上文件鎖是對應(yīng)到底層文件系統(tǒng)的,可以分段加鎖,選擇是否共享,共享的情況下都可以讀,獨占的情況下,只有一個線程可以進(jìn)行寫操作。
package com.shalk.jio;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
public class ChannelLock {
public static final int QUERY_LOOP = 15000;
public static final int UPDATE_LOOP = 15000;
// 加鎖長度
public final static int lockLen = 16;
public static void update(FileChannel channel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(lockLen);
IntBuffer intBuffer = buffer.asIntBuffer();
int count = 0;
int pos = 0;
for (int i = 0; i < UPDATE_LOOP; i++, pos += lockLen, count++) {
FileLock lock = channel.lock(pos, lockLen, false);
System.out.println("獲得獨占鎖");
try {
intBuffer.clear();
int a = count;
int b = count * 2;
int c = count * 3;
int d = count * 4;
intBuffer.put(a);
intBuffer.put(b);
intBuffer.put(c);
intBuffer.put(d);
System.out.println(String.format("寫入: %d %d %d %d", a, b, c, d));
buffer.clear();
channel.write(buffer, pos);
} finally {
lock.release();
}
}
}
public static void main(String[] args) throws IOException {
boolean writeMode = false;
if (args.length != 0) {
writeMode = true;
}
try (
RandomAccessFile file = new RandomAccessFile("tmp", writeMode ? "rw" : "r");
FileChannel channel = file.getChannel();
) {
if (writeMode) {
System.out.println("寫模式:");
update(channel);
} else {
System.out.println("讀模式:");
query(channel);
}
}
}
private static void query(FileChannel channel) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(lockLen);
IntBuffer intBuffer = buffer.asIntBuffer();
int pos = 0;
int count = 0;
for (int i = 0; i < QUERY_LOOP; i++) {
FileLock lock = channel.lock(pos, lockLen, true);
System.out.println("獲得共享鎖");
try {
buffer.clear();
channel.read(buffer, pos);
int a = intBuffer.get(0);
int b = intBuffer.get(1);
int c = intBuffer.get(2);
int d = intBuffer.get(3);
System.out.println(String.format("讀到: %d %d %d %d", a, b, c, d));
if (a != count || 2 * a != b || 3 * a != c || 4 * a != d) {
System.err.println("數(shù)據(jù)錯誤");
break;
}
count++;
pos += lockLen;
} finally {
lock.release();
}
}
}
}
再看一下Socketchannel 的類圖如下:

前面描述過,InterruptibleChannel主要保證阻塞IO可中斷,那SelectableChannel 和NetworkChannel的作用是什么呢。并且這ServerSocketChannel實現(xiàn)了 ByteChannel, GatheringByteChannel, ScatteringByteChannel,因此具備可讀可寫IO,以及gather、scatter IO的能力。而且這些Channel實現(xiàn)是線程安全的。
SelectableChannel 是允許Channel選擇模式,阻塞模式或者非阻塞模式,非阻塞的時候可以做一些其他的事情,這個在Selector的部分再說。
另外Pipe是NIO關(guān)于管道的實現(xiàn),也可以配置非阻塞,其中有兩個Channel,SourceChannel和SinkChannel因為繼承了SelectableChannel,都可以配置,分別是可讀和可寫的。
Selector
繼承自SelectableChannel的Channel,像PipedChannel和SocketChannel都是可以配置非阻塞的,但是僅僅是非阻塞,無法判斷數(shù)據(jù)是不是到達(dá),例如read方法,在非阻塞下,可能返回0,可能返回數(shù)據(jù),最好是得知數(shù)據(jù)可以讀了再進(jìn)行讀,不然編程模型會非常復(fù)雜,即read的時候要做判斷,并且要處理讀到的數(shù)據(jù)。需要一個判斷可以讀了方法,并且如果在服務(wù)器端,有很多客戶端的SocketChannel,那需要判斷哪些channel沒有阻塞了,可以進(jìn)行處理了,Selector做了一個輪訓(xùn)封裝,并且實現(xiàn)多路服用,即一個線程處理多個channel.
套路如下:
while (true)
{
int numReadyChannels = selector.select();
if (numReadyChannels == 0)
continue; // there are no ready channels to process
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
while (keyIterator.hasNext())
{
SelectionKey key = keyIterator.next();
if (key.isAcceptable())
{
// A connection was accepted by a ServerSocketChannel.
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
if (client == null) // in case accept() returns null
continue;
client.configureBlocking(false); // must be nonblocking
// Register socket channel with selector for read operations.
client.register(selector, SelectionKey.OP_READ);
}
else
if (key.isReadable())
{
// A socket channel is ready for reading.
SocketChannel client = (SocketChannel) key.channel();
// Perform work on the socket channel.
}
else
if (key.isWritable())
{
// A socket channel is ready for writing.
SocketChannel client = (SocketChannel) key.channel();
// Perform work on the socket channel.
}
keyIterator.remove();
}
}
注意事項,處理過的要remove,不然下次狀態(tài)不會被更新,并且始終在selectedKeys集合內(nèi),異常處理的Channel,應(yīng)該把key cancel掉,否則有可能輪訓(xùn)出這些已經(jīng)異常的channel。
正則/Charset/Formater
NIO中不僅對非阻塞方面進(jìn)行了增強(qiáng),還增加了正則、字符集以及Formatter方面。
正則可以參考正則方面的。
字符集主要是對應(yīng)編解碼,并且Charset的decode和encode方法很強(qiáng)大,可以配合Buffer使用。
Formatter就是對應(yīng)C語言中的printf,實現(xiàn)了String.format。
小結(jié)
Buffer提供了緩存的抽象和靈活操作,Channel提供了不同IO實體的連接抽象,并提供了非阻塞的實現(xiàn),Selector進(jìn)一步通過不同操作系統(tǒng)提供了SelectorProvider提供實現(xiàn),近似于select/epoll的非阻塞輪詢模型,實現(xiàn)多路復(fù)用的效果。
參考
https://docs.oracle.com/javase/8/docs/api/java/nio/package-summary.html
https://docs.oracle.com/javase/8/docs/api/java/nio/channels/package-summary.html#multiplex