要點
- 是一個抽象類,表示selectableChannel在Selector中注冊的標識.每個Channel向Selector注冊時,都將會創(chuàng)建一個selectionKey
- 選擇鍵將Channel與Selector建立了關(guān)系,并維護了channel事件.
- 可以通過cancel方法取消鍵,取消的鍵不會立即從selector中移除,而是添加到cancelledKeys中,在下一次select操作時移除它.所以在調(diào)用某個key時,需要使用isValid進行校驗.
操作集
- interest 集合:當前channel感興趣的操作,此類操作將會在下一次選擇器select操作時被交付,可以通過selectionKey.interestOps(int)進行修改.
- ready 集合:表示此選擇鍵上,已經(jīng)就緒的操作.每次select時,選擇器都會對ready集合進行更新;外部程序無法修改此集合.
操作屬性
- OP_ACCEPT:連接可接受操作,僅ServerSocketChannel支持
- OP_CONNECT:連接操作,Client端支持的一種操作
- OP_READ/OP_WRITE
0表示什么?
- 這些opts都不為0,如果向selector之中register一個為“0”的opts,表示此channel不關(guān)注任何類型的事件。(言外之意,register方法只是獲取一個selectionKey,具體這個Channel對何種事件感興趣,可以在稍后操作)
方法列表
- public abstract SelectableChannel channel():返回此選擇鍵所關(guān)聯(lián)的通道.即使此key已經(jīng)被取消,仍然會返回.
- public abstract Selector selector():返回此選擇鍵所關(guān)聯(lián)的選擇器,即使此鍵已經(jīng)被取消,仍然會返回.
- public abstract boolean isValid():檢測此key是否有效.當key被取消,或者通道被關(guān)閉,或者selector被關(guān)閉,都將導(dǎo)致此key無效.在AbstractSelector.removeKey(key)中,會導(dǎo)致selectionKey被置為無效.
- public abstract void cancel():請求將此鍵取消注冊.一旦返回成功,那么該鍵就是無效的,被添加到selector的cancelledKeys中.cancel操作將key的valid屬性置為false,并執(zhí)行selector.cancel(key)(即將key加入cancelledkey集合)
- public abstract int interesOps():獲得此鍵的interes集合.
- public abstract SelectionKey interestOps(int ops):將此鍵的interst設(shè)置為指定值.此操作會對ops和channel.validOps進行校驗.如果此ops不會當前channel支持,將拋出異常.
- public abstract int readyOps():獲取此鍵上ready操作集合.即在當前通道上已經(jīng)就緒的事件.
- public final boolean isReadable(): 檢測此鍵是否為"read"事件.等效于:k.,readyOps() & OP_READ != 0;還有isWritable(),isConnectable(),isAcceptable()
- public final Object attach(Object ob):將給定的對象作為附件添加到此key上.在key有效期間,附件可以在多個ops事件中傳遞.
- public final Object attachment():獲取附件.一個channel的附件,可以再當前Channel(或者說是SelectionKey)生命周期中共享,但是attachment數(shù)據(jù)不會作為socket數(shù)據(jù)在網(wǎng)絡(luò)中傳輸.
參考代碼
public class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(), port);
serverSocket.socket().bind(address);
serverSocket.configureBlocking(false);
//向selector注冊該channel
SelectionKey sk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("-->Start serverSocket.register!");
//利用sk的attache功能綁定Acceptor 如果有事情,觸發(fā)Acceptor
sk.attach(new Acceptor());
System.out.println("-->attach(new Acceptor()!");
}
public void run() { // normally in a new Thread
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
//Selector如果發(fā)現(xiàn)channel有OP_ACCEPT或READ事件發(fā)生,下列遍歷就會進行。
while (it.hasNext()) {
//來一個事件 第一次觸發(fā)一個accepter線程
//以后觸發(fā)SocketReadHandler
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException ex) {
System.out.println("reactor stop!" + ex);
}
}
//運行Acceptor或SocketReadHandler
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
if (r != null) {
r.run();
}
}
class Acceptor implements Runnable { // inner
public void run() {
try {
System.out.println("-->ready for accept!");
SocketChannel c = serverSocket.accept();
if (c != null)
//調(diào)用Handler來處理channel
new Handler(selector, c);
} catch (IOException ex) {
}
}
}
}
public class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(Integer.MAX_VALUE);
ByteBuffer output = ByteBuffer.allocate(Integer.MAX_VALUE);
static final int READING = 0, SENDING = 1;
int state = READING;
public Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
//設(shè)置為非阻塞模式
c.configureBlocking(false);
//此處的0,表示不關(guān)注任何時間
sk = socket.register(sel, 0);
//將SelectionKey綁定為本Handler 下一步有事件觸發(fā)時,將調(diào)用本類的run方法
sk.attach(this);
//將SelectionKey標記為可讀,以便讀取,不可關(guān)注可寫事件
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() {
return false;
}
boolean outputIsComplete() {
return false;
}
//這里可以通過線程池處理數(shù)據(jù)
void process() {
}
public void run() {
try {
if (state == READING) {
read();
} else if (state == SENDING) {
send();
}
} catch (IOException ex) { /* ... */ }
}
void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// Normally also do first write now
sk.interestOps(SelectionKey.OP_WRITE);
}
}
void send() throws IOException {
socket.write(output);
if (outputIsComplete()) {
//
sk.cancel();
}
}
}
參考來源
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。