Unix systems offer five I/O models: blocking I/O, non-blocking I/O, I/O multiplexing, signal-driven I/O (SIGIO), and asynchronous I/O. I/O multiplexing is usually provided by select, poll, epoll, kqueue, and similar mechanisms, and a multiplexer such as Selector relies on them to obtain events. The NIO (New I/O) package in the JDK is built on the I/O multiplexing model.
select, poll, and epoll
With blocking I/O, the application calls an I/O function and, if no data is ready, the call blocks until data arrives. Blocking I/O does not burn CPU, but a thread can only handle the I/O events of one file descriptor at a time. With non-blocking I/O, a call made when no data is ready returns an error immediately instead of blocking, so one thread can serve several file descriptors, but it has to poll them continuously, which wastes a lot of CPU. In both models, a single I/O call can report at most one I/O event.
select, poll, and epoll are the three most common I/O multiplexing mechanisms. All of them can watch for many I/O events at once. Their characteristics and differences are as follows:
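- select monitors the I/O events of multiple file descriptors for up to a given timeout. The caller passes in the sets of file descriptors to watch, and select reports readiness by rewriting those sets in place, so the application has to loop over every descriptor it registered to find out which ones have events. That scan is inefficient. In addition, select can monitor only 1024 file descriptors by default, because the descriptors are stored in a fixed-size fd_set (a bitmap on Linux, an array of sockets in Winsock). The limit is set by FD_SETSIZE and can only be raised by redefining that constant at compile time, where the platform allows it.
- poll is similar to select, but the caller passes an array of pollfd structures whose length it chooses (Linux keeps them internally in a linked list), so poll is not bound by the 1024 limit.
- epoll can monitor as many file descriptors as the process is allowed to open. Unlike select and poll, epoll does not scan the watched descriptors on every call. The kernel attaches a callback to each registered descriptor, and the callback queues the descriptor on a ready list when an event occurs, so performance does not degrade noticeably as the number of watched descriptors grows. Furthermore, select and poll hand the whole set of descriptors back to the application, whereas epoll returns only the ready ones (those with pending events).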
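The "only the ready descriptors come back" behaviour can also be observed from Java through Selector.selectedKeys(). The following is a minimal sketch rather than code from the JDK; the class name ReadySetDemo and its method are invented for illustration. It registers two pipes, writes to only one of them, and counts the keys that select reports as ready.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the JDK.
class ReadySetDemo {
    /** Registers two pipes, makes one of them readable, and returns the number of selected keys. */
    static int countReadyKeys() {
        try (Selector selector = Selector.open()) {
            Pipe quiet = Pipe.open();
            Pipe busy = Pipe.open();
            try {
                // A channel must be non-blocking before it can be registered.
                quiet.source().configureBlocking(false);
                busy.source().configureBlocking(false);
                quiet.source().register(selector, SelectionKey.OP_READ);
                busy.source().register(selector, SelectionKey.OP_READ);

                // Only the "busy" pipe receives data.
                busy.sink().write(ByteBuffer.wrap(new byte[] {1}));

                selector.select(1000);
                // selectedKeys() holds only the keys whose channels are ready.
                return selector.selectedKeys().size();
            } finally {
                quiet.source().close();
                quiet.sink().close();
                busy.source().close();
                busy.sink().close();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("ready keys: " + countReadyKeys());
    }
}
```

Two keys are registered, but only the key of the pipe that was written to should end up in the selected-key set, regardless of which system call backs the Selector on the current platform.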
Selectors in the JDK NIO package
Selector is an abstract class in the JDK. A Selector is usually created as shown in the following code:
/**
* Snippet 1: creating a Selector
*/
Selector selector = Selector.open();
The implementation is as follows:
/**
* Snippet 2: Selector.open() and SelectorProvider.provider()
*/
// Delegates to SelectorProvider.openSelector() to create the Selector
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
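// Creates the SelectorProvider. Unless a provider is configured through the system property or as a service, this ends up calling sun.nio.ch.DefaultSelectorProvider.create(), which is implemented differently on each platform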
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}
SelectorProvider has a different implementation on each operating system, so the Selector it creates differs as well.
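A quick way to see which implementation a given JDK picks is to print the runtime class names. This is a small sketch with an invented class name, SelectorImplProbe. The names it prints depend on the platform and the JDK version, and they also change if a custom provider is configured through the java.nio.channels.spi.SelectorProvider system property.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

// Hypothetical helper class, not part of the JDK.
class SelectorImplProbe {
    /** Name of the system-wide default SelectorProvider implementation. */
    static String providerClassName() {
        return SelectorProvider.provider().getClass().getName();
    }

    /** Name of the concrete Selector class that Selector.open() returns. */
    static String selectorClassName() {
        try (Selector selector = Selector.open()) {
            return selector.getClass().getName();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("provider: " + providerClassName());
        System.out.println("selector: " + selectorClassName());
    }
}
```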
Multiplexing on Windows
The Windows JDK contains only one non-abstract SelectorProvider implementation, WindowsSelectorProvider, so sun.nio.ch.DefaultSelectorProvider.create() naturally returns a WindowsSelectorProvider object:
/**
* Snippet 3: sun.nio.ch.DefaultSelectorProvider.create() in the Windows JDK
*/
public static SelectorProvider create() {
return new WindowsSelectorProvider();
}
WindowsSelectorProvider.openSelector() returns a WindowsSelectorImpl object. WindowsSelectorImpl extends the abstract class SelectorImpl:
/**
* Snippet 4: WindowsSelectorProvider.openSelector()
*/
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
The pollWrapper field of WindowsSelectorImpl is a PollArrayWrapper object. The PollArrayWrapper class carries the following documentation comment in the OpenJDK source:
/**
* Snippet 5: documentation comment of PollArrayWrapper on Windows
*/
/**
* Manipulates a native array of structs corresponding to (fd, events) pairs.
*
* typedef struct pollfd {
* SOCKET fd; // 4 bytes
* short events; // 2 bytes
* } pollfd_t;
*
* @author Konstantin Kladko
* @author Mike McCloskey
*/
PollArrayWrapper manipulates a native array of structures that correspond to (fd, events) pairs, laid out as defined by the struct in the comment above. It reads and writes file descriptors and events through its pollArray field, which is of type AllocatedNativeObject. AllocatedNativeObject extends NativeObject, a proxy for an object that resides in native memory. NativeObject provides methods for storing and loading every primitive type except boolean in off-heap memory. Taking byte as an example, the accessors look like this:
/**
* Snippet 6: getByte and putByte in NativeObject
*/
/**
* Reads a byte starting at the given offset from base of this native
* object.
*
* @param offset
* The offset at which to read the byte
*
* @return The byte value read
*/
final byte getByte(int offset) {
return unsafe.getByte(offset + address);
}
/**
* Writes a byte at the specified offset from this native object's
* base address.
*
* @param offset
* The offset at which to write the byte
*
* @param value
* The byte value to be written
*/
final void putByte(int offset, byte value) {
unsafe.putByte(offset + address, value);
}
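PollArrayWrapper provides methods for storing and loading events and file descriptors. All of them read or write int and short values through pollArray. These methods and the PollArrayWrapper constructor are shown below:
/**
* Snippet 7: PollArrayWrapper constructor and its accessors for file descriptors and events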
*/
PollArrayWrapper(int newSize) {
int allocationSize = newSize * SIZE_POLLFD;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
this.size = newSize;
}
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
void putEventOps(int i, int event) {
pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
}
int getEventOps(int i) {
return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
}
int getDescriptor(int i) {
return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
}
In the pollfd struct, fd takes 4 bytes and events takes 2 bytes, matching the sizes of int and short. FD_OFFSET, EVENT_OFFSET, and SIZE_POLLFD are final int constants equal to 0, 4, and 8. PollArrayWrapper therefore uses 8 bytes for each (fd, event) pair, and constructing a PollArrayWrapper allocates newSize * 8 bytes of off-heap memory. The i-th fd is the int at byte offset 8 * i, and the i-th event is the short at byte offset 8 * i + 4. A PollArrayWrapper of a given size can thus hold size (fd, event) pairs that are contiguous in memory (the last 2 bytes of each slot are unused), which is why it is effectively an array.
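The offset arithmetic can be tried out without touching JDK internals. The sketch below is an assumption-laden stand-in, not the real class: it uses a direct ByteBuffer in place of AllocatedNativeObject, and the class name PollArraySketch and the method sizeInBytes are invented. The constants and accessor names mirror the ones quoted above.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for PollArrayWrapper; a direct ByteBuffer replaces AllocatedNativeObject.
class PollArraySketch {
    static final int SIZE_POLLFD = 8;   // bytes per (fd, event) slot
    static final int FD_OFFSET = 0;     // fd: 4-byte int at the start of the slot
    static final int EVENT_OFFSET = 4;  // events: 2-byte short after the fd

    private final ByteBuffer pollArray;

    PollArraySketch(int newSize) {
        // Off-heap allocation of newSize * 8 bytes, using the platform byte order.
        pollArray = ByteBuffer.allocateDirect(newSize * SIZE_POLLFD)
                              .order(ByteOrder.nativeOrder());
    }

    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short) event);
    }

    int getDescriptor(int i) {
        return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
    }

    int getEventOps(int i) {
        return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
    }

    int sizeInBytes() {
        return pollArray.capacity();
    }

    public static void main(String[] args) {
        PollArraySketch arr = new PollArraySketch(4);
        arr.putDescriptor(3, 42);
        arr.putEventOps(3, 5);
        System.out.println(arr.getDescriptor(3) + " " + arr.getEventOps(3) + " " + arr.sizeInBytes());
    }
}
```

Slot 3 starts at byte 24, so the descriptor lands in bytes 24 to 27 and the event in bytes 28 and 29, leaving bytes 30 and 31 unused.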
The doSelect method is where the file descriptors are actually polled. The doSelect method of WindowsSelectorImpl is as follows:
/**
* Snippet 8: doSelect in WindowsSelectorImpl, together with the poll and poll0 methods of its inner class SubSelector
*/
protected int doSelect(long timeout) throws IOException {
if (channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout; // set selector timeout
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
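// Calculate the number of helper threads needed for polling. If necessary, threads are created here and start waiting on startLock
adjustThreadsCount();
// Reset finishLock
finishLock.reset();
// Wake up the helper threads that are waiting on startLock so that they start polling. Redundant threads exit after being woken up.
startLock.startThreads();
// Poll in the main thread. The main thread handles the first MAX_SELECTABLE_FDS (1024 by default) (fd, event) pairs in pollArray.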
try {
begin();
try {
subSelector.poll();
} catch (IOException e) {
// Save the exception
finishLock.setException(e);
}
// The main thread's poll() call has finished. Wake up the other threads and wait for them
if (threads.size() > 0)
finishLock.waitForHelperThreads();
} finally {
end();
}
finishLock.checkForException();
processDeregisterQueue();
// Update the ops of the corresponding channels. Add the ready keys to the ready queue.
int updated = updateSelectedKeys();
// The poll() calls are done. Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
// poll method of SubSelector, the inner class of WindowsSelectorImpl
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
// poll0 method of SubSelector, the inner class of WindowsSelectorImpl
private native int poll0(long pollAddress, int numfds,
int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
C source of the poll0 method:
/**
* Snippet 9: C source of poll0 in SubSelector, the inner class of WindowsSelectorImpl
*/
JNIEXPORT jint JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
jlong pollAddress, jint numfds,
jintArray returnReadFds, jintArray returnWriteFds,
jintArray returnExceptFds, jlong timeout)
{
... // part of the code omitted
/* Call select */
if ((result = select(0 , &readfds, &writefds, &exceptfds, tv)) // call the system select function
== SOCKET_ERROR) {
/* Bad error - this should not happen frequently */
/* Iterate over sockets and call select() on each separately */
FD_SET errreadfds, errwritefds, errexceptfds;
readfds.fd_count = 0;
writefds.fd_count = 0;
exceptfds.fd_count = 0;
for (i = 0; i < numfds; i++) {
/* prepare select structures for the i-th socket */
errreadfds.fd_count = 0;
errwritefds.fd_count = 0;
if (fds[i].events & POLLIN) {
errreadfds.fd_array[0] = fds[i].fd;
errreadfds.fd_count = 1;
}
if (fds[i].events & (POLLOUT | POLLCONN))
{
errwritefds.fd_array[0] = fds[i].fd;
errwritefds.fd_count = 1;
}
errexceptfds.fd_array[0] = fds[i].fd;
errexceptfds.fd_count = 1;
/* call select on the i-th socket */
if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime) // call the system select function
== SOCKET_ERROR) {
/* This socket causes an error. Add it to exceptfds set */
exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
exceptfds.fd_count++;
} else {
/* This socket does not cause an error. Process result */
if (errreadfds.fd_count == 1) {
readfds.fd_array[readfds.fd_count] = fds[i].fd;
readfds.fd_count++;
}
if (errwritefds.fd_count == 1) {
writefds.fd_array[writefds.fd_count] = fds[i].fd;
writefds.fd_count++;
}
if (errexceptfds.fd_count == 1) {
exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
exceptfds.fd_count++;
}
}
}
}
... // part of the code omitted
}
As the code shows, NIO in the Windows JDK performs multiplexing by calling the system select function.
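Because each select call covers at most MAX_SELECTABLE_FDS entries, the poll array has to be split into slices, with the main thread taking the first slice and helper threads taking the rest. The sketch below only illustrates that slicing arithmetic under simplifying assumptions. The class PollSlices and its methods are invented, and the bookkeeping the real WindowsSelectorImpl does for wakeup sockets and thread lifecycle is ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of the slicing idea; not the JDK's adjustThreadsCount logic.
class PollSlices {
    static final int MAX_SELECTABLE_FDS = 1024;

    /** Returns {startIndex, count} pairs, one per select call needed to cover all entries. */
    static List<int[]> slices(int totalChannels) {
        List<int[]> result = new ArrayList<>();
        for (int start = 0; start < totalChannels; start += MAX_SELECTABLE_FDS) {
            int count = Math.min(MAX_SELECTABLE_FDS, totalChannels - start);
            result.add(new int[] {start, count});
        }
        return result;
    }

    /** The main thread polls the first slice; every further slice needs a helper thread. */
    static int helperThreads(int totalChannels) {
        return Math.max(0, slices(totalChannels).size() - 1);
    }

    public static void main(String[] args) {
        for (int[] slice : slices(2500)) {
            System.out.println("start=" + slice[0] + " count=" + slice[1]);
        }
        System.out.println("helpers=" + helperThreads(2500));
    }
}
```

With 2500 entries this yields slices of 1024, 1024, and 452 entries, which corresponds to one main-thread poll and two helper threads in this simplified model.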
Multiplexing on Linux
The Linux JDK contains two non-abstract Selector subclasses, PollSelectorImpl and EPollSelectorImpl.
PollSelectorImpl
As the name suggests, PollSelectorImpl multiplexes with poll. It extends AbstractPollSelectorImpl, which also maintains a PollArrayWrapper for storing (file descriptor, event) pairs, although the Linux PollArrayWrapper is implemented differently from the Windows one. First, the doSelect method of PollSelectorImpl:
/**
* Snippet 10: doSelect in PollSelectorImpl
*/
protected int doSelect(long timeout)
throws IOException
{
if (channelArray == null)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(totalChannels, 0, timeout);
} finally {
end();
}
processDeregisterQueue();
// Copy the information in the pollfd structs into the ops of the corresponding channels. Add the ready keys to the ready queue.
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.getReventOps(0) != 0) {
// Clear the wakeup pipe
pollWrapper.putReventOps(0, 0);
synchronized (interruptLock) {
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
doSelect calls the poll method of PollArrayWrapper. The Linux PollArrayWrapper differs somewhat from the Windows one. Its documentation comment in the source reads:
/**
* Snippet 11: documentation comment of PollArrayWrapper on Linux
*/
/**
* Manipulates a native array of pollfd structs on Solaris:
*
* typedef struct pollfd {
* int fd;
* short events;
* short revents;
* } pollfd_t;
*
* @author Mike McCloskey
* @since 1.4
*/
Compared with the Windows version, the struct handled by the Linux PollArrayWrapper has an additional revents field (the events that actually occurred). The Linux PollArrayWrapper extends the abstract class AbstractPollArrayWrapper, which defines the accessors for file descriptors and events:
/**
* Snippet 12: final constants and accessors for file descriptors and events defined in AbstractPollArrayWrapper
*/
static final short SIZE_POLLFD = 8;
static final short FD_OFFSET = 0;
static final short EVENT_OFFSET = 4;
static final short REVENT_OFFSET = 6;
protected AllocatedNativeObject pollArray;
// Access methods for fd structures
int getEventOps(int i) {
int offset = SIZE_POLLFD * i + EVENT_OFFSET;
return pollArray.getShort(offset);
}
int getReventOps(int i) {
int offset = SIZE_POLLFD * i + REVENT_OFFSET;
return pollArray.getShort(offset);
}
int getDescriptor(int i) {
int offset = SIZE_POLLFD * i + FD_OFFSET;
return pollArray.getInt(offset);
}
void putEventOps(int i, int event) {
int offset = SIZE_POLLFD * i + EVENT_OFFSET;
pollArray.putShort(offset, (short)event);
}
void putReventOps(int i, int revent) {
int offset = SIZE_POLLFD * i + REVENT_OFFSET;
pollArray.putShort(offset, (short)revent);
}
void putDescriptor(int i, int fd) {
int offset = SIZE_POLLFD * i + FD_OFFSET;
pollArray.putInt(offset, fd);
}
So in the Linux PollArrayWrapper, the last two bytes of each 8-byte slot in pollArray are not padding. They hold the 2-byte revents field. The poll method of PollArrayWrapper is as follows:
/**
* Snippet 13: poll in PollArrayWrapper
*/
int poll(int numfds, int offset, long timeout) {
return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
numfds, timeout);
}
private native int poll0(long pollAddress, int numfds, long timeout);
C source of the poll0 method:
/**
* Snippet 14: C source of poll0 in PollArrayWrapper, with the ipoll helper it calls
*/
JNIEXPORT jint JNICALL
Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout)
{
struct pollfd *a;
int err = 0;
a = (struct pollfd *) jlong_to_ptr(address);
if (timeout <= 0) { /* Indefinite or no wait */
// timeout <= 0: wait indefinitely (negative) or do not wait (zero), so a single restartable call to the system poll function is enough
RESTARTABLE (poll(a, numfds, timeout), err);
} else { /* Bounded wait; bounded restarts */
// timeout > 0: ipoll keeps calling poll until the timeout has elapsed
err = ipoll(a, numfds, timeout);
}
if (err < 0) {
JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
}
return (jint)err;
}
static int ipoll(struct pollfd fds[], unsigned int nfds, int timeout)
{
jlong start, now;
int remaining = timeout;
struct timeval t;
int diff;
gettimeofday(&t, NULL);
start = t.tv_sec * 1000 + t.tv_usec / 1000;
for (;;) {
// Call poll with the remaining timeout. Normally a single call suffices; the loop only restarts poll when a signal interrupts it (EINTR)
int res = poll(fds, nfds, remaining);
if (res < 0 && errno == EINTR) {
if (remaining >= 0) {
gettimeofday(&t, NULL);
now = t.tv_sec * 1000 + t.tv_usec / 1000;
diff = now - start;
remaining -= diff;
if (diff < 0 || remaining <= 0) {
return 0;
}
start = now;
}
} else {
return res;
}
}
}
This confirms that PollSelectorImpl implements multiplexing by calling the system poll function.
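The restart logic in ipoll is easier to follow when it is separated from the system call. The following sketch restates the loop in Java for a positive timeout. It is an illustration, not JDK code: BoundedRetry, Poller, and the INTERRUPTED constant (standing in for "res < 0 && errno == EINTR") are invented, and the clock is injected so that the behaviour can be checked deterministically.

```java
import java.util.function.LongSupplier;

// Hypothetical restatement of the ipoll retry loop; not part of the JDK.
class BoundedRetry {
    /** Stands in for a poll call that failed with EINTR. */
    static final int INTERRUPTED = -1;

    /** A blocking wait that takes the remaining timeout in milliseconds. */
    interface Poller {
        int poll(long timeoutMillis);
    }

    /** Retries an interrupted poll with the time that is left; returns 0 once the budget is used up. */
    static int pollWithDeadline(Poller poller, long timeoutMillis, LongSupplier clockMillis) {
        long start = clockMillis.getAsLong();
        long remaining = timeoutMillis;
        for (;;) {
            int res = poller.poll(remaining);
            if (res != INTERRUPTED) {
                return res;              // events were found, or the wait timed out normally
            }
            long now = clockMillis.getAsLong();
            long diff = now - start;
            remaining -= diff;           // charge the time already spent waiting
            if (diff < 0 || remaining <= 0) {
                return 0;                // clock went backwards or the budget is exhausted
            }
            start = now;
        }
    }

    public static void main(String[] args) {
        long[] clock = {0};
        int[] calls = {0};
        // Fake poller: each call takes 40 ms and the first two calls are interrupted.
        int res = pollWithDeadline(t -> {
            calls[0]++;
            clock[0] += 40;
            return calls[0] < 3 ? INTERRUPTED : 7;
        }, 100, () -> clock[0]);
        System.out.println("result=" + res + " calls=" + calls[0]);
    }
}
```

In the example run the third call is made with 20 ms of the original 100 ms left, which is the same accounting that ipoll performs with gettimeofday.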
EPollSelectorImpl
EPollSelectorImpl uses EPollArrayWrapper to work with file descriptors and events. EPollArrayWrapper documents the epoll event structures as follows:
/**
* Snippet 15: documentation comment of EPollArrayWrapper
*/
/**
* Manipulates a native array of epoll_event structs on Linux:
*
* typedef union epoll_data {
* void *ptr;
* int fd;
* __uint32_t u32;
* __uint64_t u64;
* } epoll_data_t;
*
* struct epoll_event {
* __uint32_t events;
* epoll_data_t data;
* };
*
* The system call to wait for I/O events is epoll_wait(2). It populates an
* array of epoll_event structures that are passed to the call. The data
* member of the epoll_event structure contains the same data as was set
* when the file descriptor was registered to epoll via epoll_ctl(2). In
* this implementation we set data.fd to be the file descriptor that we
* register. That way, we have the file descriptor available when we
* process the events.
*/
In other words, epoll_wait(2) fills the epoll_event array handed to it, and each entry carries back the data that was supplied at registration time through epoll_ctl(2). Because this implementation stores the file descriptor in data.fd, the descriptor is available when the event is processed.
The struct handled by EPollSelectorImpl is clearly larger than the one used by PollSelectorImpl, and it is not examined field by field here. Unlike select and poll, epoll involves three system calls: epoll_create, epoll_ctl, and epoll_wait. The JDK NIO code reflects this. When an EPollArrayWrapper is created, it calls the epollCreate method:
/**
* Snippet 16: EPollArrayWrapper constructor and the epollCreate method it calls
*/
EPollArrayWrapper() throws IOException {
// creates the epoll file descriptor
epfd = epollCreate();
// the epoll_event array passed to epoll_wait
int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
pollArray = new AllocatedNativeObject(allocationSize, true);
pollArrayAddress = pollArray.address();
// eventHigh needed when using file descriptors > 64k
if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
eventsHigh = new HashMap<>();
}
private native int epollCreate();
The epollCreate method performs the epoll_create system call, which creates an epoll instance. Its C source is:
/**
* Snippet 17: C source of epollCreate
*/
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
/*
* epoll_create expects a size as a hint to the kernel about how to
* dimension internal structures. We can't predict the size in advance.
*/
// perform the epoll_create system call
int epfd = epoll_create(256);
if (epfd < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
}
return epfd;
}
After the EPollSelectorImpl constructor has created an EPollArrayWrapper instance, it calls that instance's initInterrupt method, which in turn calls epollCtl:
/**
* Snippet 18: EPollSelectorImpl constructor, the EPollArrayWrapper.initInterrupt method it calls,
* and the epollCtl method called by initInterrupt
*/
/**
* Package private constructor called by factory method in
* the abstract superclass Selector.
*/
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
pollWrapper = new EPollArrayWrapper();
// call initInterrupt
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
}
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
// call epollCtl
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
private native void epollCtl(int epfd, int opcode, int fd, int events);
The epollCtl method performs the epoll_ctl system call, which adds a file descriptor and the events to watch on it to the epoll instance just created. Its C source is:
/**
* Snippet 19: C source of epollCtl
*/
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
jint opcode, jint fd, jint events)
{
struct epoll_event event;
int res;
event.events = events;
event.data.fd = fd;
// call epoll_ctl
RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);
/*
* A channel may be registered with several Selectors. When each Selector
* is polled a EPOLL_CTL_DEL op will be inserted into its pending update
* list to remove the file descriptor from epoll. The "last" Selector will
* close the file descriptor which automatically unregisters it from each
* epoll descriptor. To avoid costly synchronization between Selectors we
* allow pending updates to be processed, ignoring errors. The errors are
* harmless as the last update for the file descriptor is guaranteed to
* be EPOLL_CTL_DEL.
*/
if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
}
}
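The pipe that initInterrupt registers with EPOLLIN appears to be what backs Selector.wakeup(): making fd0 readable forces a pending or upcoming select to return. The effect is visible through the public API alone. The sketch below uses an invented class name, WakeupDemo, and relies only on the documented behaviour that a wakeup issued while no selection is in progress makes the next selection return immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the JDK.
class WakeupDemo {
    /** Calls wakeup() first, then select with a long timeout; returns the number of selected keys. */
    static int selectAfterWakeup() {
        try (Selector selector = Selector.open()) {
            // No selection is in progress, so the wakeup is remembered for the next select.
            selector.wakeup();
            // Returns at once instead of waiting the full 5 seconds; no channel is registered.
            return selector.select(5_000);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long begin = System.nanoTime();
        int keys = selectAfterWakeup();
        long elapsedMillis = (System.nanoTime() - begin) / 1_000_000;
        System.out.println("keys=" + keys + ", elapsed ms=" + elapsedMillis);
    }
}
```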
The doSelect method of EPollSelectorImpl calls the poll method of EPollArrayWrapper, and poll calls epollWait:
/**
* Snippet 20: doSelect in EPollSelectorImpl, the EPollArrayWrapper.poll method it calls,
* and the epollWait method called by poll
*/
protected int doSelect(long timeout) throws IOException {
if (closed)
throw new ClosedSelectorException();
processDeregisterQueue();
try {
begin();
pollWrapper.poll(timeout);
} finally {
end();
}
processDeregisterQueue();
int numKeysUpdated = updateSelectedKeys();
if (pollWrapper.interrupted()) {
// Clear the wakeup pipe
pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
synchronized (interruptLock) {
pollWrapper.clearInterrupted();
IOUtil.drain(fd0);
interruptTriggered = false;
}
}
return numKeysUpdated;
}
int poll(long timeout) throws IOException {
// Update the registrations. If the set of watched events has changed, epoll_ctl is called to add events to or remove events from the epoll instance
updateRegistrations();
// call epollWait
updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
for (int i=0; i<updated; i++) {
if (getDescriptor(i) == incomingInterruptFD) {
interruptedIndex = i;
interrupted = true;
break;
}
}
return updated;
}
private native int epollWait(long pollAddress, int numfds, long timeout,
int epfd) throws IOException;
The epollWait method performs the epoll_wait system call. The calling thread is suspended while it waits for the kernel to deliver I/O events. Its C source is:
/**
* Snippet 21: C source of epollWait
*/
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
jlong address, jint numfds,
jlong timeout, jint epfd)
{
struct epoll_event *events = jlong_to_ptr(address);
int res;
if (timeout <= 0) { /* Indefinite or no wait */
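// timeout <= 0: wait indefinitely (negative) or do not wait (zero), so a single restartable call to epoll_wait is enough
RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
} else { /* Bounded wait; bounded restarts */
// timeout > 0: iepoll keeps calling epoll_wait until the timeout has elapsed; the loop is there to restart the call after an interruption such as EINTR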
res = iepoll(epfd, events, numfds, timeout);
}
if (res < 0) {
JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
}
return res;
}
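From the Java side, the three timeout cases in the C code presumably correspond to the three selection methods of Selector: selectNow() for "no wait", select(timeout) with a positive value for a bounded wait, and select() without arguments for an indefinite wait. The sketch below, with the invented class name SelectTimeouts, exercises the first two on a selector that has no registered channels, so both simply report zero ready keys. The indefinite variant is left out because it would block until a wakeup.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical demo class, not part of the JDK.
class SelectTimeouts {
    /** Non-blocking check: returns immediately with the number of ready keys. */
    static int pollOnce() {
        try (Selector selector = Selector.open()) {
            return selector.selectNow();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Bounded wait: blocks for at most the given number of milliseconds. */
    static int waitAtMost(long millis) {
        try (Selector selector = Selector.open()) {
            return selector.select(millis);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("selectNow: " + pollOnce());
        System.out.println("select(20): " + waitAtMost(20));
    }
}
```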
Summary
This article has walked through three I/O multiplexing mechanisms and how the JDK uses them. On Windows, JDK NIO has a single non-abstract Selector implementation, WindowsSelectorImpl, which multiplexes with select. On Linux there are two implementations, PollSelectorImpl and EPollSelectorImpl, which multiplex with poll and epoll respectively. The article also examined the concrete implementation of each Selector in detail. Corrections are welcome.