Selector: the IO Multiplexer

Unix systems have five IO models: blocking IO, non-blocking IO, IO multiplexing, signal-driven IO (SIGIO) and asynchronous IO. IO multiplexing is usually implemented with select, poll, epoll, kqueue or a similar mechanism. A Selector is a multiplexer that obtains IO events through one of these mechanisms, and the JDK's NIO (New IO) package is built on the IO multiplexing model.

select, poll and epoll

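With blocking IO, when the application calls an IO function and no data is ready, the call blocks until data arrives. Blocking IO does not consume much CPU, but under this model a thread can only handle IO on one file descriptor at a time. With non-blocking IO, if the data is not ready the call immediately returns an error (EAGAIN/EWOULDBLOCK) instead of blocking, so one thread can service many file descriptors; however, it has to poll them continuously to discover IO events, which wastes a great deal of CPU. In both models, each IO call examines only a single file descriptor.
select, poll and epoll are the three most common IO multiplexing mechanisms. Each of them lets a single call watch many file descriptors at once. Their characteristics and differences are as follows: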
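As an illustration in Java terms (an example added here, not part of the original walkthrough): a channel in non-blocking mode returns 0 from read() when no data is ready, which is Java's counterpart to the EAGAIN/EWOULDBLOCK error at the system-call level. A Pipe is used only so the sketch needs no network; the class name NonBlockingReadDemo is made up.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;

class NonBlockingReadDemo {
    // Reads from a pipe that has no data yet, in non-blocking mode.
    static int readFromEmptyPipe() throws IOException {
        Pipe pipe = Pipe.open();
        try (Pipe.SourceChannel source = pipe.source();
             Pipe.SinkChannel sink = pipe.sink()) {
            source.configureBlocking(false); // non-blocking: read() must not wait
            ByteBuffer buf = ByteBuffer.allocate(16);
            // Nothing has been written to the sink, so instead of blocking,
            // read() returns 0 bytes immediately. The caller would have to
            // retry later (i.e. busy-poll) to find out when data arrives.
            return source.read(buf);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("bytes read: " + readFromEmptyPipe());
    }
}
```

A blocking channel would instead park the thread inside read(), which is exactly why one thread per descriptor is needed in that model.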

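  • select watches a set of file descriptors for IO events within a given timeout. The caller passes in the descriptor sets to monitor; on return the kernel has overwritten those sets and select only reports how many descriptors are ready, so the application must loop over every descriptor to find the ones that had an event, which is inefficient. In addition, select can monitor at most FD_SETSIZE descriptors (1024 by default on Linux), because the descriptors are kept in a fixed-size fd_set (a bitmap on Linux, an array of sockets on Windows); raising the limit means changing FD_SETSIZE and recompiling.
  • poll works like select, but the caller describes the monitored descriptors with an array of pollfd structures of any length (kept in a linked list inside the kernel), so it is not bound by the 1024 limit. Like select, it still has to scan every descriptor on each call.
  • epoll can monitor as many descriptors as the process is allowed to open. Unlike select and poll, it does not discover events by scanning: a callback is registered for each descriptor, and when the callback fires the kernel puts the descriptor on a ready list. As a result, epoll's efficiency does not drop noticeably as the number of monitored descriptors grows. Also, whereas select and poll hand the whole set of descriptors back to the application to inspect, epoll returns only the ready ones (those with pending events).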
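The common idea of all three mechanisms, one thread waiting on many descriptors and learning which ones are ready, is what java.nio.channels.Selector exposes. The sketch below (added for illustration; SelectorDemo is a made-up name) registers two pipes on one Selector, makes only one of them readable, and checks that only that channel is reported. Whether select, poll or epoll runs underneath depends on the platform, as the rest of the article explains.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectorDemo {
    // One thread, one Selector, two channels: only the channel that has data
    // is reported as ready. Returns select()'s result, or -1 if the wrong key was selected.
    static int readyCountAfterOneWrite() throws IOException {
        Pipe a = Pipe.open();
        Pipe b = Pipe.open();
        try (Selector selector = Selector.open()) {
            a.source().configureBlocking(false); // only non-blocking channels can be registered
            b.source().configureBlocking(false);
            SelectionKey keyA = a.source().register(selector, SelectionKey.OP_READ);
            b.source().register(selector, SelectionKey.OP_READ);

            a.sink().write(ByteBuffer.wrap(new byte[] {42})); // make only pipe a readable

            int ready = selector.select(1000); // block up to 1 s until some channel is ready
            boolean onlyA = selector.selectedKeys().size() == 1
                    && selector.selectedKeys().contains(keyA);
            return onlyA ? ready : -1;
        } finally {
            a.source().close();
            a.sink().close();
            b.source().close();
            b.sink().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("ready channels: " + readyCountAfterOneWrite());
    }
}
```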

The Selectors in JDK NIO

In the JDK, Selector is an abstract class. A Selector is normally created like this:

    /**
     * Snippet 1: creating a Selector
     */
    Selector selector = Selector.open();

Here is how it is implemented:

    /**
     * Snippet 2: Selector.open and SelectorProvider.provider
     */
    // Selector.open delegates to SelectorProvider.openSelector
    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

    // Obtains the SelectorProvider. Unless one is configured through a system property
    // or a service-loader entry, this ends up calling sun.nio.ch.DefaultSelectorProvider.create(),
    // which is implemented differently on each platform
    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                        if (loadProviderFromProperty())
                            return provider;
                        if (loadProviderAsService())
                            return provider;
                        provider = sun.nio.ch.DefaultSelectorProvider.create();
                        return provider;
                    }
                });
        }
    }

The SelectorProvider implementation, and therefore the kind of Selector it creates, differs from one operating system to another.
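To see which implementation a given JVM picks, the public API is enough: SelectorProvider.provider() returns the cached default provider, and Selector.open() should produce a Selector whose provider() is that same object. The sketch below (ProviderInfo is a hypothetical name) prints both class names. The names depend on the OS and JDK version, so no particular output is claimed here. The default can also be overridden with the java.nio.channels.spi.SelectorProvider system property, which is what loadProviderFromProperty() checks.

```java
import java.io.IOException;
import java.nio.channels.Selector;
import java.nio.channels.spi.SelectorProvider;

class ProviderInfo {
    // Returns {provider class name, selector class name} for the running JVM.
    static String[] describe() throws IOException {
        // System-wide default provider: created once, then cached (see provider() above).
        SelectorProvider provider = SelectorProvider.provider();
        try (Selector selector = Selector.open()) { // same as provider.openSelector()
            if (selector.provider() != provider) {
                throw new IllegalStateException("Selector.open() used a different provider");
            }
            return new String[] { provider.getClass().getName(), selector.getClass().getName() };
        }
    }

    public static void main(String[] args) throws IOException {
        String[] names = describe();
        System.out.println("provider: " + names[0]);
        System.out.println("selector: " + names[1]);
    }
}
```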

Multiplexing on Windows

The Windows JDK has only one concrete SelectorProvider implementation, WindowsSelectorProvider, so sun.nio.ch.DefaultSelectorProvider.create() unsurprisingly returns a WindowsSelectorProvider:

    /**
     * Snippet 3: sun.nio.ch.DefaultSelectorProvider.create() in the Windows JDK
     */
    public static SelectorProvider create() {
        return new WindowsSelectorProvider();
    }

WindowsSelectorProvider.openSelector returns a WindowsSelectorImpl object; WindowsSelectorImpl extends the abstract class SelectorImpl:

    /**
     * Snippet 4: WindowsSelectorProvider.openSelector
     */
    public AbstractSelector openSelector() throws IOException {
        return new WindowsSelectorImpl(this);
    }

The pollWrapper field of WindowsSelectorImpl is a PollArrayWrapper. In the OpenJDK sources, the PollArrayWrapper class carries this Javadoc comment:


/**
* Snippet 5: Javadoc comment of PollArrayWrapper on Windows
*/
/**
 * Manipulates a native array of structs corresponding to (fd, events) pairs.
 *
 * typedef struct pollfd {
 *    SOCKET fd;            // 4 bytes
 *    short events;         // 2 bytes
 * } pollfd_t;
 *
 * @author Konstantin Kladko
 * @author Mike McCloskey
 */

PollArrayWrapper manipulates a native array of structs corresponding to (fd, events) pairs; each element has the layout of the struct in the comment above. It does so through its field pollArray, of type AllocatedNativeObject. AllocatedNativeObject extends NativeObject, a proxy for an object that lives in native memory, which provides methods to store and load every primitive type except boolean in off-heap memory. For byte, for example, the accessors are:

    /**
     * Snippet 6: NativeObject.getByte and NativeObject.putByte
     */
    /**
     * Reads a byte starting at the given offset from base of this native
     * object.
     *
     * @param  offset
     *         The offset at which to read the byte
     *
     * @return The byte value read
     */
    final byte getByte(int offset) {
        return unsafe.getByte(offset + address);
    }

    /**
     * Writes a byte at the specified offset from this native object's
     * base address.
     *
     * @param  offset
     *         The offset at which to write the byte
     *
     * @param  value
     *         The byte value to be written
     */
    final void putByte(int offset, byte value) {
        unsafe.putByte(offset + address,  value);
    }

PollArrayWrapper provides methods for storing descriptors and events, all of which read or write ints and shorts through pollArray. These methods and the PollArrayWrapper constructor are:

    /**
     * Snippet 7: the PollArrayWrapper constructor and the descriptor/event accessors
     */
    PollArrayWrapper(int newSize) {
        int allocationSize = newSize * SIZE_POLLFD;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();
        this.size = newSize;
    }
    
    // Access methods for fd structures
    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
    }

    int getEventOps(int i) {
        return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET);
    }

    int getDescriptor(int i) {
       return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET);
    }

In the pollfd struct, fd occupies 4 bytes and events 2 bytes, matching the sizes of int and short. FD_OFFSET, EVENT_OFFSET and SIZE_POLLFD are final int constants equal to 0, 4 and 8 respectively, so PollArrayWrapper uses 8 bytes per (fd, event) pair, and constructing a PollArrayWrapper allocates newSize * 8 bytes of off-heap memory. The i-th fd is the int starting at byte 8 * i, and the i-th event is the short starting at byte 8 * i + 4. A PollArrayWrapper of size newSize can therefore hold newSize (fd, event) pairs laid out contiguously in memory (the last 2 bytes of each 8-byte slot are unused), which makes it an array.
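The offset arithmetic can be reproduced with public APIs. The sketch below is a hypothetical stand-in for PollArrayWrapper: a direct (off-heap) ByteBuffer in native byte order replaces the JDK-internal AllocatedNativeObject and Unsafe, and the class name PollfdArraySketch and the helpers capacityInBytes/rawIntAt are made up for the example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

// Hypothetical stand-in for the Windows PollArrayWrapper: same offsets, but backed
// by a direct ByteBuffer instead of the JDK-internal AllocatedNativeObject.
class PollfdArraySketch {
    static final int SIZE_POLLFD = 8;  // bytes per (fd, events) slot
    static final int FD_OFFSET = 0;    // 4-byte fd at the start of the slot
    static final int EVENT_OFFSET = 4; // 2-byte events after fd; bytes 6-7 stay unused

    private final ByteBuffer pollArray;

    PollfdArraySketch(int newSize) {
        // newSize * 8 bytes of off-heap memory, zero-initialized; native order
        // because native code would read these structs directly.
        pollArray = ByteBuffer.allocateDirect(newSize * SIZE_POLLFD)
                              .order(ByteOrder.nativeOrder());
    }

    void putDescriptor(int i, int fd)  { pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd); }
    void putEventOps(int i, int event) { pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short) event); }
    int getDescriptor(int i)           { return pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET); }
    int getEventOps(int i)             { return pollArray.getShort(SIZE_POLLFD * i + EVENT_OFFSET); }

    int capacityInBytes()              { return pollArray.capacity(); }
    int rawIntAt(int byteOffset)       { return pollArray.getInt(byteOffset); } // raw view for checking offsets
}
```

For example, slot 2 starts at byte 16, so after putDescriptor(2, 7) the raw int at offset 16 is 7.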

The doSelect method is where a Selector actually works with the file descriptors. WindowsSelectorImpl.doSelect looks like this:

   /**
    * Snippet 8: WindowsSelectorImpl.doSelect and the methods it calls
    * (poll and poll0 of the inner class SubSelector)
    */
   protected int doSelect(long timeout) throws IOException {
       if (channelArray == null)
           throw new ClosedSelectorException();
       this.timeout = timeout; // set selector timeout
       processDeregisterQueue();
       if (interruptTriggered) {
           resetWakeupSocket();
           return 0;
       }
       // Calculate the number of helper threads needed for polling. If necessary,
       // threads are created here and start waiting on startLock
       adjustThreadsCount();
       // reset finishLock
       finishLock.reset();
       // Wake up the helper threads waiting on startLock so they start polling.
       // Redundant threads will exit here after wakeup.
       startLock.startThreads();
       // Poll in the main thread. The main thread is responsible for the first
       // MAX_SELECTABLE_FDS (1024 by default) (fd, event) pairs in pollArray.
       try {
           begin();
           try {
               subSelector.poll();
           } catch (IOException e) {
               // save the exception
               finishLock.setException(e);
           }
           // The main thread is out of poll(). Wake up the other threads and wait for them
           if (threads.size() > 0)
               finishLock.waitForHelperThreads();
       } finally {
           end();
       }
       finishLock.checkForException();
       processDeregisterQueue();
       // Update the ops of the corresponding channels and add ready keys to the selected-key set
       int updated = updateSelectedKeys();
       // Done with poll(). Set wakeupSocket to non-signaled for the next run.
       resetWakeupSocket();
       return updated;
   }

   // SubSelector (inner class of WindowsSelectorImpl): poll
   private int poll() throws IOException { // poll for the main thread
       return poll0(pollWrapper.pollArrayAddress,
                    Math.min(totalChannels, MAX_SELECTABLE_FDS),
                    readFds, writeFds, exceptFds, timeout);
   }
   // SubSelector (inner class of WindowsSelectorImpl): poll0
   private native int poll0(long pollAddress, int numfds,
        int[] readFds, int[] writeFds, int[] exceptFds, long timeout);
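The comments in doSelect describe a division of labour: the main thread polls the first MAX_SELECTABLE_FDS entries of pollArray and each helper thread takes the next block. The sketch below is a simplified model of that split only, under the assumption of equal 1024-entry blocks; it ignores bookkeeping such as the wakeup socket, and PollPartitionSketch, partition and helperThreads are hypothetical names, not JDK methods.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of how pollArray is split among threads:
// range 0 belongs to the main thread, range k (k >= 1) to helper thread k.
class PollPartitionSketch {
    static final int MAX_SELECTABLE_FDS = 1024;

    // Returns {start, count} pairs, one per polling thread.
    static List<int[]> partition(int totalChannels) {
        List<int[]> ranges = new ArrayList<>();
        for (int start = 0; start < totalChannels; start += MAX_SELECTABLE_FDS) {
            ranges.add(new int[] { start, Math.min(MAX_SELECTABLE_FDS, totalChannels - start) });
        }
        return ranges;
    }

    // Threads needed besides the main thread.
    static int helperThreads(int totalChannels) {
        return Math.max(0, partition(totalChannels).size() - 1);
    }

    public static void main(String[] args) {
        for (int[] r : partition(2500)) {
            System.out.println("start=" + r[0] + " count=" + r[1]);
        }
    }
}
```

The first range matches the numfds argument Math.min(totalChannels, MAX_SELECTABLE_FDS) that the main thread's poll() passes to poll0.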

The C source of poll0:

/**
* Snippet 9: C source of SubSelector.poll0 (inner class of WindowsSelectorImpl)
*/
JNIEXPORT jint JNICALL
Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(JNIEnv *env, jobject this,
                                   jlong pollAddress, jint numfds,
                                   jintArray returnReadFds, jintArray returnWriteFds,
                                   jintArray returnExceptFds, jlong timeout)
{
    ... // some code omitted
    /* Call select */
    if ((result = select(0 , &readfds, &writefds, &exceptfds, tv)) // call the system select function
                                                             == SOCKET_ERROR) {
        /* Bad error - this should not happen frequently */
        /* Iterate over sockets and call select() on each separately */
        FD_SET errreadfds, errwritefds, errexceptfds;
        readfds.fd_count = 0;
        writefds.fd_count = 0;
        exceptfds.fd_count = 0;
        for (i = 0; i < numfds; i++) {
            /* prepare select structures for the i-th socket */
            errreadfds.fd_count = 0;
            errwritefds.fd_count = 0;
            if (fds[i].events & POLLIN) {
               errreadfds.fd_array[0] = fds[i].fd;
               errreadfds.fd_count = 1;
            }
            if (fds[i].events & (POLLOUT | POLLCONN))
            {
                errwritefds.fd_array[0] = fds[i].fd;
                errwritefds.fd_count = 1;
            }
            errexceptfds.fd_array[0] = fds[i].fd;
            errexceptfds.fd_count = 1;

            /* call select on the i-th socket */
            if (select(0, &errreadfds, &errwritefds, &errexceptfds, &zerotime) // call the system select function
                                                             == SOCKET_ERROR) {
                /* This socket causes an error. Add it to exceptfds set */
                exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
                exceptfds.fd_count++;
            } else {
                /* This socket does not cause an error. Process result */
                if (errreadfds.fd_count == 1) {
                    readfds.fd_array[readfds.fd_count] = fds[i].fd;
                    readfds.fd_count++;
                }
                if (errwritefds.fd_count == 1) {
                    writefds.fd_array[writefds.fd_count] = fds[i].fd;
                    writefds.fd_count++;
                }
                if (errexceptfds.fd_count == 1) {
                    exceptfds.fd_array[exceptfds.fd_count] = fds[i].fd;
                    exceptfds.fd_count++;
                }
            }
        }
    }
    ... // some code omitted
}

So on Windows, JDK NIO performs multiplexing through the system select function.
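The error path in poll0 is worth a second look: when select fails for the whole batch, the code calls select on each socket separately with a zero timeout and puts the sockets that fail on their own into exceptfds, so one bad socket does not fail the entire selection. The sketch below models only that isolation step in Java. The predicate selectOk is a hypothetical stand-in for "select on these descriptors succeeds"; it is not a real JDK or Winsock API, and the read/write bookkeeping of the C code is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of poll0's fallback: try the whole batch first,
// then probe descriptors one by one to find the ones that cause the error.
class SelectFallbackSketch {
    static List<Integer> isolateFailures(int[] fds, Predicate<int[]> selectOk) {
        List<Integer> exceptional = new ArrayList<>();
        if (selectOk.test(fds)) {
            return exceptional;                  // normal path: the batch call succeeded
        }
        for (int fd : fds) {
            if (!selectOk.test(new int[] { fd })) {
                exceptional.add(fd);             // this descriptor alone makes the call fail
            }
        }
        return exceptional;
    }
}
```

The cost is one extra call per descriptor, which is acceptable because, as the C comment says, this path should not happen frequently.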

Multiplexing on Linux

The Linux JDK has two concrete Selector subclasses: PollSelectorImpl and EPollSelectorImpl.

PollSelectorImpl

As the name suggests, PollSelectorImpl uses poll for multiplexing. It extends AbstractPollSelectorImpl, which also maintains a PollArrayWrapper to store (fd, event) pairs, but the Linux PollArrayWrapper is implemented differently from the Windows one. First, here is PollSelectorImpl.doSelect:

    /**
     * Snippet 10: PollSelectorImpl.doSelect
     */
    protected int doSelect(long timeout)
        throws IOException
    {
        if (channelArray == null)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(totalChannels, 0, timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        // Copy the information in the pollfd structs into the ops of the corresponding channels, and add ready keys to the selected-key set
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.getReventOps(0) != 0) {
            // Clear the wakeup pipe
            pollWrapper.putReventOps(0, 0);
            synchronized (interruptLock) {
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }
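The block at the end of doSelect handles the wakeup pipe: entry 0 of pollArray holds fd0, the read end of that pipe, so a nonzero revents at index 0 means someone called wakeup(), and the pipe is drained. From the public API this shows up as Selector.wakeup() making the next select() return at once. The sketch below (WakeupDemo is a made-up name) checks that behaviour; it does not touch the internal pipe.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class WakeupDemo {
    // Calls wakeup() before select(): the next select() returns immediately
    // with no selected keys instead of waiting for its timeout.
    // Returns {number of selected keys, elapsed milliseconds}.
    static long[] selectAfterWakeup() throws IOException {
        try (Selector selector = Selector.open()) {
            selector.wakeup();                // arms the wakeup before we block
            long t0 = System.nanoTime();
            int n = selector.select(10_000);  // would otherwise wait up to 10 s
            long elapsedMs = (System.nanoTime() - t0) / 1_000_000;
            return new long[] { n, elapsedMs };
        }
    }

    public static void main(String[] args) throws IOException {
        long[] r = selectAfterWakeup();
        System.out.println("keys=" + r[0] + " elapsedMs=" + r[1]);
    }
}
```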

doSelect calls PollArrayWrapper.poll. The Linux PollArrayWrapper differs somewhat from the Windows version; its Javadoc comment reads:

/**
 * Snippet 11: Javadoc comment of PollArrayWrapper on Linux
 */
 /**
  * Manipulates a native array of pollfd structs on Solaris:
  *
  * typedef struct pollfd {
  *    int fd;
  *    short events;
  *    short revents;
  * } pollfd_t;
  *
  * @author Mike McCloskey
  * @since 1.4
  */

Compared with the Windows version, the struct handled by the Linux PollArrayWrapper has an extra revents field (the events that actually occurred). The Linux PollArrayWrapper extends the abstract class AbstractPollArrayWrapper, which defines the accessors for descriptors and events:

    /**
     * Snippet 12: final constants and descriptor/event accessors defined in AbstractPollArrayWrapper
     */
    static final short SIZE_POLLFD   = 8;
    static final short FD_OFFSET     = 0;
    static final short EVENT_OFFSET  = 4;
    static final short REVENT_OFFSET = 6;
    protected AllocatedNativeObject pollArray;
     // Access methods for fd structures
     int getEventOps(int i) {
         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
         return pollArray.getShort(offset);
     }
 
     int getReventOps(int i) {
         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
         return pollArray.getShort(offset);
     }
 
     int getDescriptor(int i) {
         int offset = SIZE_POLLFD * i + FD_OFFSET;
         return pollArray.getInt(offset);
     }
 
     void putEventOps(int i, int event) {
         int offset = SIZE_POLLFD * i + EVENT_OFFSET;
         pollArray.putShort(offset, (short)event);
     }
 
     void putReventOps(int i, int revent) {
         int offset = SIZE_POLLFD * i + REVENT_OFFSET;
         pollArray.putShort(offset, (short)revent);
     }
 
     void putDescriptor(int i, int fd) {
         int offset = SIZE_POLLFD * i + FD_OFFSET;
         pollArray.putInt(offset, fd);
     }
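With revents at offset 6, the kernel reports results inside the same array the caller passed in, and the caller then has to walk every slot to find the ready ones; this is the linear scan that the select/poll comparison at the start of the article refers to. The sketch below mirrors the Linux layout in a direct ByteBuffer. PollScanSketch and its methods are hypothetical, revents is set by hand to simulate the kernel, and POLLIN = 0x0001 is its value on Linux.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the Linux pollfd array {int fd; short events; short revents;}
// and of the scan the caller must perform after poll returns.
class PollScanSketch {
    static final int SIZE_POLLFD = 8, FD_OFFSET = 0, EVENT_OFFSET = 4, REVENT_OFFSET = 6;
    static final short POLLIN = 0x0001; // POLLIN on Linux

    final ByteBuffer pollArray;
    final int size;

    PollScanSketch(int size) {
        this.size = size;
        pollArray = ByteBuffer.allocateDirect(size * SIZE_POLLFD).order(ByteOrder.nativeOrder());
    }

    void put(int i, int fd, int events) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short) events);
        pollArray.putShort(SIZE_POLLFD * i + REVENT_OFFSET, (short) 0);
    }

    // In real code the kernel fills revents; here we simulate it.
    void putReventOps(int i, int revents) {
        pollArray.putShort(SIZE_POLLFD * i + REVENT_OFFSET, (short) revents);
    }

    // Every slot must be visited, even if only one descriptor is ready: O(n).
    List<Integer> readyDescriptors() {
        List<Integer> ready = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            if (pollArray.getShort(SIZE_POLLFD * i + REVENT_OFFSET) != 0) {
                ready.add(pollArray.getInt(SIZE_POLLFD * i + FD_OFFSET));
            }
        }
        return ready;
    }
}
```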

So in the Linux PollArrayWrapper the last two bytes of each 8-byte slot are not left empty: they hold the 2-byte revents. PollArrayWrapper.poll looks like this:

    /**
     * Snippet 13: PollArrayWrapper.poll
     */
    int poll(int numfds, int offset, long timeout) {
        return poll0(pollArrayAddress + (offset * SIZE_POLLFD),
                     numfds, timeout);
    }
    
    private native int poll0(long pollAddress, int numfds, long timeout);

The C source of poll0:

/**
 * Snippet 14: C source of PollArrayWrapper.poll0
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_PollArrayWrapper_poll0(JNIEnv *env, jobject this,
                                       jlong address, jint numfds,
                                       jlong timeout)
{
    struct pollfd *a;
    int err = 0;

    a = (struct pollfd *) jlong_to_ptr(address);

    if (timeout <= 0) {           /* Indefinite or no wait */
    // timeout <= 0: call the system poll function directly (RESTARTABLE retries it if interrupted by a signal)
        RESTARTABLE (poll(a, numfds, timeout), err);
    } else {                     /* Bounded wait; bounded restarts */
    // timeout > 0: ipoll calls poll and, if interrupted by a signal, calls it again with the remaining time
        err = ipoll(a, numfds, timeout);
    }

    if (err < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "Poll failed");
    }
    return (jint)err;
}
static int ipoll(struct pollfd fds[], unsigned int nfds, int timeout)
{
    jlong start, now;
    int remaining = timeout;
    struct timeval t;
    int diff;

    gettimeofday(&t, NULL);
    start = t.tv_sec * 1000 + t.tv_usec / 1000;

    for (;;) {
        // Call poll with the remaining timeout. Normally this runs once; the loop only restarts poll after it is interrupted by a signal (EINTR), subtracting the time already spent
        int res = poll(fds, nfds, remaining);
        if (res < 0 && errno == EINTR) {
            if (remaining >= 0) {
                gettimeofday(&t, NULL);
                now = t.tv_sec * 1000 + t.tv_usec / 1000;
                diff = now - start;
                remaining -= diff;
                if (diff < 0 || remaining <= 0) {
                    return 0;
                }
                start = now;
            }
        } else {
            return res;
        }
    }
}

This confirms that PollSelectorImpl multiplexes with the system poll function.
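The retry logic of ipoll translates almost line for line into Java. In the sketch below (hypothetical names throughout), pollOnce stands in for the poll() system call and returns INTERRUPTED to simulate "-1 with errno == EINTR", and the clock is injected so the behaviour can be checked deterministically. It is a model of the loop, not code from the JDK.

```java
import java.util.function.IntUnaryOperator;
import java.util.function.LongSupplier;

// Hypothetical Java rendering of ipoll's restart loop.
class RestartablePollSketch {
    static final int INTERRUPTED = Integer.MIN_VALUE; // stand-in for -1 with errno == EINTR

    // pollOnce receives the remaining timeout in ms; clockMs returns the current time in ms.
    static int ipoll(IntUnaryOperator pollOnce, int timeoutMs, LongSupplier clockMs) {
        long start = clockMs.getAsLong();
        int remaining = timeoutMs;
        for (;;) {
            int res = pollOnce.applyAsInt(remaining);
            if (res != INTERRUPTED) {
                return res;                   // real result or a non-EINTR error goes straight back
            }
            long now = clockMs.getAsLong();
            long diff = now - start;
            remaining -= (int) diff;          // charge the time already spent waiting
            if (diff < 0 || remaining <= 0) {
                return 0;                     // clock went backwards or time is up: report a timeout
            }
            start = now;
        }
    }
}
```

Restarting with the reduced timeout, rather than the original one, is what keeps repeated signals from stretching the total wait beyond what the caller asked for.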

EPollSelectorImpl

EPollSelectorImpl uses EPollArrayWrapper to manage descriptors and events. Here is the Javadoc in EPollArrayWrapper describing the epoll event structs:

/**
 * Snippet 15: Javadoc comment of EPollArrayWrapper
 */
/**
 * Manipulates a native array of epoll_event structs on Linux:
 *
 * typedef union epoll_data {
 *     void *ptr;
 *     int fd;
 *     __uint32_t u32;
 *     __uint64_t u64;
 *  } epoll_data_t;
 *
 * struct epoll_event {
 *     __uint32_t events;
 *     epoll_data_t data;
 * };
 *
 * The system call to wait for I/O events is epoll_wait(2). It populates an
 * array of epoll_event structures that are passed to the call. The data
 * member of the epoll_event structure contains the same data as was set
 * when the file descriptor was registered to epoll via epoll_ctl(2). In
 * this implementation we set data.fd to be the file descriptor that we
 * register. That way, we have the file descriptor available when we
 * process the events.
 */

In short, epoll_wait fills the epoll_event array passed to it, and the data member of each event carries whatever was stored when the descriptor was registered with epoll_ctl. Because the JDK stores the descriptor itself in data.fd, each returned event directly identifies the file descriptor it belongs to.

The struct EPollSelectorImpl works with is evidently larger than the one PollSelectorImpl uses; we won't go through it field by field. Unlike select and poll, epoll involves three system calls, epoll_create, epoll_ctl and epoll_wait, and JDK NIO does use all three. When an EPollArrayWrapper is constructed, it calls epollCreate:

    /**
     * Snippet 16: the EPollArrayWrapper constructor and the epollCreate method it calls
     */
    EPollArrayWrapper() throws IOException {
        // creates the epoll file descriptor
        epfd = epollCreate();

        // the epoll_event array passed to epoll_wait
        int allocationSize = NUM_EPOLLEVENTS * SIZE_EPOLLEVENT;
        pollArray = new AllocatedNativeObject(allocationSize, true);
        pollArrayAddress = pollArray.address();

        // eventHigh needed when using file descriptors > 64k
        if (OPEN_MAX > MAX_UPDATE_ARRAY_SIZE)
            eventsHigh = new HashMap<>();
    }
    
    private native int epollCreate();

epollCreate simply makes the epoll_create system call to create an epoll instance. Its C source:

/**
 * Snippet 17: C source of epollCreate
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCreate(JNIEnv *env, jobject this)
{
    /*
     * epoll_create expects a size as a hint to the kernel about how to
     * dimension internal structures. We can't predict the size in advance.
     */
    // make the epoll_create system call
    int epfd = epoll_create(256);
    if (epfd < 0) {
       JNU_ThrowIOExceptionWithLastError(env, "epoll_create failed");
    }
    return epfd;
}

After the EPollSelectorImpl constructor creates an EPollArrayWrapper, it calls that instance's initInterrupt method, which in turn calls epollCtl:

    /**
     * Snippet 18: the EPollSelectorImpl constructor, the EPollArrayWrapper.initInterrupt
     * method it calls, and the epollCtl method called by initInterrupt
     */
    /**
     * Package private constructor called by factory method in
     * the abstract superclass Selector.
     */
    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        pollWrapper = new EPollArrayWrapper();
        // call initInterrupt
        pollWrapper.initInterrupt(fd0, fd1);
        fdToKey = new HashMap<>();
    }

    void initInterrupt(int fd0, int fd1) {
        outgoingInterruptFD = fd1;
        incomingInterruptFD = fd0;
        // call epollCtl
        epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }
    
    private native void epollCtl(int epfd, int opcode, int fd, int events);

epollCtl makes the epoll_ctl system call, adding the descriptor to be monitored to the epoll instance just created. Its C source:

/**
 * Snippet 19: C source of epollCtl
 */
JNIEXPORT void JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollCtl(JNIEnv *env, jobject this, jint epfd,
                                           jint opcode, jint fd, jint events)
{
    struct epoll_event event;
    int res;

    event.events = events;
    event.data.fd = fd;

    // call epoll_ctl
    RESTARTABLE(epoll_ctl(epfd, (int)opcode, (int)fd, &event), res);

    /*
     * A channel may be registered with several Selectors. When each Selector
     * is polled a EPOLL_CTL_DEL op will be inserted into its pending update
     * list to remove the file descriptor from epoll. The "last" Selector will
     * close the file descriptor which automatically unregisters it from each
     * epoll descriptor. To avoid costly synchronization between Selectors we
     * allow pending updates to be processed, ignoring errors. The errors are
     * harmless as the last update for the file descriptor is guaranteed to
     * be EPOLL_CTL_DEL.
     */
    if (res < 0 && errno != EBADF && errno != ENOENT && errno != EPERM) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_ctl failed");
    }
}
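The comment above refers to a pending update list: interest changes are not pushed to the kernel one by one as they happen, but queued and applied before the next wait (updateRegistrations() in the poll method shown next). The sketch below models that idea under simplifying assumptions: one pending mask per descriptor, last write wins, and a log instead of real epoll_ctl calls. PendingUpdatesSketch and its members are hypothetical and do not reproduce EPollArrayWrapper's actual data structures; the EPOLL_CTL_* values are the Linux ones.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of deferred epoll_ctl updates applied in one pass before waiting.
class PendingUpdatesSketch {
    static final int EPOLL_CTL_ADD = 1, EPOLL_CTL_DEL = 2, EPOLL_CTL_MOD = 3; // Linux values

    private final Map<Integer, Integer> pending = new LinkedHashMap<>();    // fd -> requested events (0 = remove)
    private final Map<Integer, Integer> registered = new LinkedHashMap<>(); // fd -> events currently in "epoll"
    final List<String> ctlLog = new ArrayList<>(); // "op:fd:events" for each epoll_ctl we would issue

    void setInterest(int fd, int events) { pending.put(fd, events); } // only the last request per fd survives

    void updateRegistrations() {
        for (Map.Entry<Integer, Integer> e : pending.entrySet()) {
            int fd = e.getKey();
            int events = e.getValue();
            Integer current = registered.get(fd);
            if (events == 0) {
                if (current != null) { ctl(EPOLL_CTL_DEL, fd, 0); registered.remove(fd); }
            } else if (current == null) {
                ctl(EPOLL_CTL_ADD, fd, events); registered.put(fd, events);
            } else if (current != events) {
                ctl(EPOLL_CTL_MOD, fd, events); registered.put(fd, events);
            } // unchanged interest: no system call at all
        }
        pending.clear();
    }

    private void ctl(int op, int fd, int events) { ctlLog.add(op + ":" + fd + ":" + events); }
}
```

Batching this way means that toggling interest several times between two selects costs at most one epoll_ctl per descriptor.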

EPollSelectorImpl.doSelect calls EPollArrayWrapper.poll, which in turn calls epollWait:

    /**
     * Snippet 20: EPollSelectorImpl.doSelect, the EPollArrayWrapper.poll method it calls,
     * and the epollWait method called by poll
     */
    protected int doSelect(long timeout) throws IOException {
        if (closed)
            throw new ClosedSelectorException();
        processDeregisterQueue();
        try {
            begin();
            pollWrapper.poll(timeout);
        } finally {
            end();
        }
        processDeregisterQueue();
        int numKeysUpdated = updateSelectedKeys();
        if (pollWrapper.interrupted()) {
            // Clear the wakeup pipe
            pollWrapper.putEventOps(pollWrapper.interruptedIndex(), 0);
            synchronized (interruptLock) {
                pollWrapper.clearInterrupted();
                IOUtil.drain(fd0);
                interruptTriggered = false;
            }
        }
        return numKeysUpdated;
    }

    int poll(long timeout) throws IOException {
        // Apply pending registration updates: if the monitored events changed, call epoll_ctl to add, modify or remove them in the epoll instance
        updateRegistrations();
        // call epollWait
        updated = epollWait(pollArrayAddress, NUM_EPOLLEVENTS, timeout, epfd);
        for (int i=0; i<updated; i++) {
            if (getDescriptor(i) == incomingInterruptFD) {
                interruptedIndex = i;
                interrupted = true;
                break;
            }
        }
        return updated;
    }
    
    private native int epollWait(long pollAddress, int numfds, long timeout,
                                 int epfd) throws IOException;
    

epollWait makes the epoll_wait system call: the calling thread is suspended until the kernel delivers I/O events or the timeout expires. Its C source:

/**
 * Snippet 21: C source of epollWait
 */
JNIEXPORT jint JNICALL
Java_sun_nio_ch_EPollArrayWrapper_epollWait(JNIEnv *env, jobject this,
                                            jlong address, jint numfds,
                                            jlong timeout, jint epfd)
{
    struct epoll_event *events = jlong_to_ptr(address);
    int res;

    if (timeout <= 0) {           /* Indefinite or no wait */
    // timeout <= 0: call the system epoll_wait function directly (RESTARTABLE retries it if interrupted by a signal)
        RESTARTABLE(epoll_wait(epfd, events, numfds, timeout), res);
    } else {                      /* Bounded wait; bounded restarts */
    // timeout > 0: iepoll calls epoll_wait and, if interrupted by a signal, calls it again with the remaining time
        res = iepoll(epfd, events, numfds, timeout);
    }

    if (res < 0) {
        JNU_ThrowIOExceptionWithLastError(env, "epoll_wait failed");
    }
    return res;
}
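Putting the epoll pieces together from the application's side: the sketch below drives the public Selector API through open, register, selectNow and cancel, with comments giving the rough epoll counterpart on an epoll-based selector such as the EPollSelectorImpl walked through above. The mapping in the comments is approximate (registrations are applied lazily, as poll() shows), and SelectorLifecycle is a made-up name.

```java
import java.io.IOException;
import java.nio.channels.Pipe;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;

class SelectorLifecycle {
    // Drives the public Selector API through register -> select -> cancel and
    // returns how many keys remain registered afterwards.
    static int registerSelectCancel() throws IOException {
        Pipe pipe = Pipe.open();
        try (Selector selector = Selector.open()) {            // ~ epoll_create
            pipe.source().configureBlocking(false);
            SelectionKey key =
                pipe.source().register(selector, SelectionKey.OP_READ); // ~ epoll_ctl(ADD), queued
            selector.selectNow();                              // ~ apply pending updates, epoll_wait(..., 0)
            key.cancel();                                      // ~ epoll_ctl(DEL) once deregistered
            selector.selectNow();                              // processes the cancelled-key set
            return selector.keys().size();                     // the cancelled key is gone
        } finally {
            pipe.source().close();
            pipe.sink().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("keys left: " + registerSelectCancel());
    }
}
```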

Summary

This article has covered the three IO multiplexing mechanisms and how the JDK uses them. On Windows, JDK NIO has a single concrete Selector implementation, WindowsSelectorImpl, which multiplexes with select; on Linux there are two, PollSelectorImpl and EPollSelectorImpl, which use poll and epoll respectively. We also walked through the implementation of each of these Selectors in detail. Corrections are welcome.
