Selector詳解

Selector簡述

A multiplexor of {@link SelectableChannel} objects.

參照Java doc中Selector描述的第一句話,Selector的作用是Java NIO中管理一組多路復用的SelectableChannel對象,并能夠識別通道是否為諸如讀寫事件做好準備的組件

image.png

Selector的創(chuàng)建過程如下:

// 1.創(chuàng)建Selector
Selector selector = Selector.open();

// 2.將Channel注冊到選擇器中
// ....... new channel的過程 ....

//Notes:channel要注冊到Selector上就必須是非阻塞的,所以FileChannel是不可以使用Selector的,因為FileChannel是阻塞的
channel.configureBlocking(false);

// 第二個參數(shù)指定了我們對 Channel 的什么類型的事件感興趣
SelectionKey key = channel.register(selector , SelectionKey.OP_READ);

// 也可以使用或運算|來組合多個事件,例如
SelectionKey key = channel.register(selector , SelectionKey.OP_READ | SelectionKey.OP_WRITE);

// 不過值得注意的是,一個 Channel 僅僅可以被注冊到一個 Selector 一次, 如果將 Channel 注冊到 Selector 多次, 那么其實就是相當于更新 SelectionKey 的 interest set.

一個Channel在Selector注冊其代表的是一個SelectionKey事件,SelectionKey的類型包括:

  • OP_READ:可讀事件;值為:1<<0
  • OP_WRITE:可寫事件;值為:1<<2
  • OP_CONNECT:客戶端連接服務端的事件(tcp連接),一般為創(chuàng)建SocketChannel客戶端channel;值為:1<<3
  • OP_ACCEPT:服務端接收客戶端連接的事件,一般為創(chuàng)建ServerSocketChannel服務端channel;值為:1<<4

一個Selector內(nèi)部維護了三組keys:

  1. key set:當前channel注冊在Selector上所有的key;可調(diào)用keys()獲取
  2. selected-key set:當前channel就緒的事件;可調(diào)用selectedKeys()獲取
  3. cancelled-key:主動觸發(fā)SelectionKey#cancel()方法會放在該集合,前提條件是該channel沒有被取消注冊;不可通過外部方法調(diào)用

Selector類中總共包含以下10個方法:

  • open():創(chuàng)建一個Selector對象
  • isOpen():是否是open狀態(tài),如果調(diào)用了close()方法則會返回false
  • provider():獲取當前Selector的Provider
  • keys():如上文所述,獲取當前channel注冊在Selector上所有的key
  • selectedKeys():獲取當前channel就緒的事件列表
  • selectNow():獲取當前是否有事件就緒,該方法立即返回結果,不會阻塞;如果返回值>0,則代表存在一個或多個
  • select(long timeout):selectNow的阻塞超時方法,超時時間內(nèi),有事件就緒時才會返回;否則超過時間也會返回
  • select():selectNow的阻塞方法,直到有事件就緒時才會返回
  • wakeup():調(diào)用該方法會時,阻塞在select()處的線程會立馬返回;(ps:下面一句劃重點)即使當前不存在線程阻塞在select()處,那么下一個執(zhí)行select()方法的線程也會立即返回結果,相當于執(zhí)行了一次selectNow()方法
  • close(): 用完Selector后調(diào)用其close()方法會關閉該Selector,且使注冊到該Selector上的所有SelectionKey實例無效。channel本身并不會關閉。

關于SelectionKey

談到Selector就不得不提SelectionKey,兩者是緊密關聯(lián),配合使用的;如上文所示,往Channel注冊Selector會返回一個SelectionKey對象,
這個對象包含了如下內(nèi)容:

  • interest set,當前Channel感興趣的事件集,即在調(diào)用register方法設置的interes set
  • ready set
  • channel
  • selector
  • attached object,可選的附加對象

interest set
可以通過SelectionKey類中的方法來獲取和設置interes set

// 返回當前感興趣的事件列表
int interestSet = key.interestOps();

// 也可通過interestSet判斷其中包含的事件
boolean isInterestedInAccept  = interestSet & SelectionKey.OP_ACCEPT;
boolean isInterestedInConnect = interestSet & SelectionKey.OP_CONNECT;
boolean isInterestedInRead    = interestSet & SelectionKey.OP_READ;
boolean isInterestedInWrite   = interestSet & SelectionKey.OP_WRITE;    

// 可以通過interestOps(int ops)方法修改事件列表
key.interestOps(interestSet | SelectionKey.OP_WRITE);

ready set
當前Channel就緒的事件列表

int readySet = key.readyOps();

// 也可通過四個方法來分別判斷不同事件是否就緒
key.isReadable();    //讀事件是否就緒
key.isWritable();    //寫事件是否就緒
key.isConnectable(); //客戶端連接事件是否就緒
key.isAcceptable();  //服務端連接事件是否就緒

channel和selector
我們可以通過SelectionKey來獲取當前的channel和selector

// 返回當前事件關聯(lián)的通道,可轉(zhuǎn)換的選項包括:`ServerSocketChannel`和`SocketChannel`
Channel channel = key.channel();

//返回當前事件所關聯(lián)的Selector對象
Selector selector = key.selector();

attached object
我們可以在selectionKey中附加一個對象:

key.attach(theObject);
Object attachedObj = key.attachment();

或者在注冊時直接附加:

SelectionKey key = channel.register(selector, SelectionKey.OP_READ, theObject);

一個Selector完整的例子

一個Selector的基本使用流程包括(讀者不放試著按照這個流程自己實現(xiàn)一波):

  1. 創(chuàng)建一個Selector
  2. 將Channel注冊到Selector中,并設置監(jiān)聽的interest set
  3. loop
    • 執(zhí)行select()方法
    • 調(diào)用selector.selectedKeys()獲取當前就緒的key
    • 迭代selectedKeys
      • 從key中獲取對應的Channel和附加信息(if exist)
      • 判斷是哪些 IO 事件已經(jīng)就緒了, 然后處理它們. 如果是 OP_ACCEPT 事件, 則調(diào)用 "SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept()" 獲取 SocketChannel, 并將它設置為 非阻塞的, 然后將這個 Channel 注冊到 Selector 中.
      • 根據(jù)需要更改 selected key 的監(jiān)聽事件.
      • 將已經(jīng)處理過的 key 從 selected keys 集合中刪除.
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

/**
 * Created by locoder on 2019/2/28.
 */
public class SelectorDemo {

    public static void main(String[] args) throws IOException {
        // create a Selector
        Selector selector = Selector.open();

        // new Server Channel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // config async
        ssc.configureBlocking(false);

        ssc.socket().bind(new InetSocketAddress(8080));

        // register to selector
        // Notes:這里只能注冊OP_ACCEPT事件,否則將會拋出IllegalArgumentException,詳見AbstractSelectableChannel#register方法
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        // loop
        for (; ; ) {
            int nKeys = selector.select();

            if (nKeys > 0) {
                Set<SelectionKey> keys = selector.selectedKeys();

                for (Iterator<SelectionKey> it = keys.iterator(); it.hasNext(); ) {
                    SelectionKey key = it.next();

                    // 處理客戶端連接事件
                    if (key.isAcceptable()) {
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

                        SocketChannel clientChannel = serverSocketChannel.accept();

                        clientChannel.configureBlocking(false);

                        clientChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024 * 1024));

                    } else if (key.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();

                        ByteBuffer buf = (ByteBuffer) key.attachment();

                        int readBytes = 0;
                        int ret = 0;

                        try {
                            while ((ret = socketChannel.read(buf)) > 0) {
                                readBytes += ret;
                            }

                            if (readBytes > 0) {
                                String message = decode(buf);
                                System.out.println(message);

                                // 這里注冊寫事件,因為寫事件基本都處于就緒狀態(tài);
                                // 從處理邏輯來看,一般接收到客戶端讀事件時也會伴隨著寫,類似HttpServletRequest和HttpServletResponse
                                key.interestOps(key.interestOps() | SelectionKey.OP_WRITE);

                            }
                        } finally {
                            // 將緩沖區(qū)切換為待讀取狀態(tài)
                            buf.flip();
                        }

                    } else if (key.isValid() && key.isWritable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();

                        ByteBuffer buf = (ByteBuffer) key.attachment();

                        if (buf.hasRemaining()) {
                            socketChannel.write(buf);
                        } else {
                            // 取消寫事件,否則寫事件內(nèi)的代碼會不斷執(zhí)行
                            // 因為寫事件就緒的條件是判斷緩沖區(qū)是否有空閑空間,絕大多時候緩存區(qū)都是有空閑空間的
                            key.interestOps(key.interestOps() & (~SelectionKey.OP_WRITE));
                        }

                        // 丟棄本次內(nèi)容
                        buf.compact();
                    }
                    // 注意, 在每次迭代時, 我們都調(diào)用 "it.remove()" 將這個 key 從迭代器中刪除,
                    // 因為 select() 方法僅僅是簡單地將就緒的 IO 操作放到 selectedKeys 集合中,
                    // 因此如果我們從 selectedKeys 獲取到一個 key, 但是沒有將它刪除, 那么下一次 select 時, 這個 key 所對應的 IO 事件還在 selectedKeys 中.
                    it.remove();
                }
            }

        }
    }

    /**
     * 將ByteBuffer轉(zhuǎn)換為String
     *
     * @param in
     * @return
     * @throws UnsupportedEncodingException
     */
    private static String decode(ByteBuffer in) throws UnsupportedEncodingException {
        String receiveText = new String(in.array(), 0, in.capacity(), Charset.defaultCharset());
        int index = -1;
        if ((index = receiveText.lastIndexOf("\r\n")) != -1) {
            receiveText = receiveText.substring(0, index);
        }
        return receiveText;
    }

}

深入Selector源碼

下面我們繼續(xù)按照Selector的編碼過程來學習Selector源碼

(1)創(chuàng)建過程


image.png

從上圖上可以比較清晰得看到,openjdk中Selector的實現(xiàn)是SelectorImpl,
然后SelectorImpl又將職責委托給了具體的平臺,比如圖中框出的linux2.6以后才有的EpollSelectorImpl, Windows平臺則是WindowsSelectorImpl, MacOSX平臺是KQueueSelectorImpl.

public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}
// 創(chuàng)建是依賴SelectorProvider.provider()系統(tǒng)級提供
// 我們來看SelectorProvider.provider()方法
// 從系統(tǒng)配置java.nio.channels.spi.SelectorProvider獲取
if (loadProviderFromProperty()) return provider;
// 從ServiceLoader#load
if (loadProviderAsService()) return provider;
// 如果還不存在則使用默認provider,即KQueueSelectorProvider
provider = sun.nio.ch.DefaultSelectorProvider.create();

(2)注冊過程

// AbstractSelectableChannel#register方法
SelectionKey register(Selector sel, int ops,Object att){
        synchronized (regLock) {
            // 判斷當前Channel是否關閉
            if (!isOpen())
                throw new ClosedChannelException();
            // 判斷參數(shù)ops是否只包含OP_ACCEPT
            if ((ops & ~validOps()) != 0)
                throw new IllegalArgumentException();
            // 使用Selector則Channel必須是非阻塞的
            if (blocking)
                throw new IllegalBlockingModeException();
            // 根據(jù)Selector找到SelectionKey,它是可復用的,一個Selector只能有一個SelectionKey,如果存在則直接覆蓋ops和attachedObject
            SelectionKey k = findKey(sel);
            if(key != null) {
                ....
            }
            // 如果不存在則直接實例化一個SelectionKeyImpl對象,并為ops和attachedObject賦值;實際調(diào)用AbstractSelector的register方法
            // 將Selector和SelectionKey綁定
            if (k == null) {
                // New registration
                synchronized (keyLock) {
                    if (!isOpen())
                        throw new ClosedChannelException();
                    k = ((AbstractSelector)sel).register(this, ops, att);
                    addKey(k);
                }
            }
            return k;
        }
    }

(3)select過程
select是Selector模型中最關鍵的一步,下面讓我們來研究一下其過程

// 首先來看select的調(diào)用鏈
// SelectorImpl#select -> SelectorImpl#lockAndDoSelect -> 具體provider提供的Selector中的doSelect方法
// 值得注意的是:在lockAndDoSelect方法中執(zhí)行了`synchronized(this)`操作,故select操作是阻塞的

// open過程中我們知道,Selector有好幾種實現(xiàn),但基本都包含以下操作;感興趣的同學可以具體看看這位大神寫的博客:https://juejin.im/entry/5b51546df265da0f70070b93;這里就不深入寫這部分了,篇幅有點長

int doSelect(long timeout) {
    // close判斷,如果closed,則拋出ClosedSelectorException
    
    // 處理掉被cancel掉的SelectionKey,即`cancelled-key`
    this.processDeregisterQueue();
    
    try {
        // 設置中斷器,實際調(diào)用的是AbstractSelector.this.wakeup();方法
    // 調(diào)用的是方法AbstractInterruptibleChannel.blockedOn(Interruptible);
        this.begin();
        // 從具體的模型中(kqueue、poll、epoll)選擇
        this.pollWrapper.poll(...);
    }finally {
        // 關閉中斷器
        this.end();
    }
    
    // 重新處理被cancel的key
    this.processDeregisterQueue();
    // 更新各個事件的狀態(tài)
    int selectedKeys = this.updateSelectedKeys();
    
    // 可能還有一些操作
    .....
    
    return selectedKeys;
    
    
    
    
}

疑惑點

Q:各事件分別在什么條件下就緒?

  • OP_ACCEPT:客戶端向服務端發(fā)起TCP連接建立【服務端的代碼監(jiān)聽】
  • OP_CONNECT:客戶端與服務端的連接建立成功或失敗【客戶端代碼監(jiān)聽】
  • OP_READ:客戶端向服務端發(fā)請求或服務端向客戶端寫入數(shù)據(jù)時
  • OP_WRITE:判斷緩沖區(qū)是否有空閑空間

FYI

微信搜索公眾號"一只懶懶的coder"可關注我獲取最新動態(tài)哦?。∫部蓲呙柘路降亩S碼哦?。?!

一只懶懶的coder
最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容