緊跟前文,先把前面缺的例子補全,接著一步一步分析。
這里的內(nèi)容比較少,直接以注釋的形式補充在每一行上,有必要的地方會在底下補上圖文。
public void dispatch(SelectionKey key) throws IOException, InterruptedException {
if (key.isAcceptable()) {
//前文中已經(jīng)輪訓(xùn)過一遍updateList,通過
//ski.channel.translateAndSetReadyOps已經(jīng)將準(zhǔn)備好的操作set到key中,
//這里的key有4種操作OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT,
//這里只有ServerSocketChannel支持accept操作,對應(yīng)的操作如最底下
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//accept操作必定是ServerSocketChannel
// 接受一個連接. 底下詳細講
SocketChannel sc = ssc.accept();
// 對新的連接的channel注冊read事件.
sc.configureBlocking(false);
//這里的selector是另一個了
sc.register(readBell.getSelector(), SelectionKey.OP_READ);
// 如果讀取線程還沒有啟動,那就啟動一個讀取線程.
synchronized(NioServer.this) {
if (!NioServer.this.isReadBellRunning) {
NioServer.this.isReadBellRunning = true;
new Thread(readBell).start();
}
}
} else if (key.isReadable()) {
// 這是一個read事件,并且這個事件是注冊在socketchannel上的.
SocketChannel sc = (SocketChannel) key.channel(); //這個channel就是 ssc.accept()對應(yīng)的那個
// 寫數(shù)據(jù)到buffer
int count = sc.read(temp);
if (count < 0) {
// 客戶端已經(jīng)斷開連接.
key.cancel();
sc.close();
return;
}
// 切換buffer到讀狀態(tài),內(nèi)部指針歸位.
temp.flip();
String msg = Charset.forName("UTF-8").decode(temp).toString();
System.out.println("Server received ["+msg+"] from client address:" + sc.getRemoteAddress());
Thread.sleep(1000);
// echo back.
sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
// 清空buffer
temp.clear();
}
}
}
核心語句 ssc.accept()
public SocketChannel accept() throws IOException {
synchronized (lock) {//加鎖
if (!isOpen())
throw new ClosedChannelException();
if (!isBound())
throw new NotYetBoundException();
SocketChannel sc = null;
int n = 0;
FileDescriptor newfd = new FileDescriptor();
InetSocketAddress[] isaa = new InetSocketAddress[1];
try {
begin();//和下面的end()配對,用于線程被中斷時關(guān)閉channel,這里另開一篇再講
if (!isOpen())
return null;
thread = NativeThread.current();
for (;;) {
n = accept0(this.fd, newfd, isaa);//native函數(shù),解釋:
// Accepts a new connection, setting the given file descriptor to refer to
// the new socket and setting isaa[0] to the socket's remote address.
// Returns 1 on success, or IOStatus.UNAVAILABLE (if non-blocking and no
// connections are pending) or IOStatus.INTERRUPTED.
if ((n == IOStatus.INTERRUPTED) && isOpen())//如果狀態(tài)是INTERRUPTED且channel還是open的則繼續(xù)嘗試
continue;
break;
}
} finally {
thread = 0;
end(n > 0);
assert IOStatus.check(n);
}
if (n < 1)
return null;
IOUtil.configureBlocking(newfd, true);
InetSocketAddress isa = isaa[0];
//新建一個socketchannel供read/write操作使用
sc = new SocketChannelImpl(provider(), newfd, isa);
//IP端口權(quán)限檢測
SecurityManager sm = System.getSecurityManager();
if (sm != null) {
try {
sm.checkAccept(isa.getAddress().getHostAddress(),
isa.getPort());
} catch (SecurityException x) {
sc.close();
throw x;
}
}
return sc;
}
}
就緒操作與通道對應(yīng)關(guān)系:
| OP_ACCEPT | OP_CONNECT | OP_WRITE | OP_READ | |
|---|---|---|---|---|
| 客戶端 SocketChannel | Y | Y | Y | |
| 服務(wù)端 ServerSocketChannel | Y | |||
| 服務(wù)端 SocketChannel | Y | Y |