前面對NIO原理進(jìn)行了大篇幅的分析,最后我們舉幾個(gè)案例,教大家如何更好的使用NIO。
基于NIO編寫的聊天DEMO
服務(wù)端
package NIO.luban.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
//聊天室服務(wù)端
public class ChatServer {
private Selector selector;
private ServerSocketChannel serverSocketChannel;
private long timeout=2000;
public ChatServer(){
try {
//服務(wù)端channel
serverSocketChannel=ServerSocketChannel.open();
//選擇器對象,底層就是IO多路復(fù)用
selector=Selector.open();
//綁定端口
serverSocketChannel.bind(new InetSocketAddress(9090));
//設(shè)置非阻塞式
serverSocketChannel.configureBlocking(false);
//注冊"監(jiān)聽連接"給Selector
SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("服務(wù)端準(zhǔn)備就緒");
start();
}catch (Exception e){
e.printStackTrace();
}
}
public void start() throws Exception{
int count=0;
long start=System.nanoTime();
while (true){
//等待感興趣的事件,沒有事件就會阻塞2秒鐘,2秒鐘沒有感興趣事件發(fā)生,程序繼續(xù)往下執(zhí)行
selector.select(timeout);
// System.out.println("2秒了");
long end=System.nanoTime();
if(end-start>= TimeUnit.MILLISECONDS.toNanos(timeout)){
count=1;
}else{
count++;//記錄空輪詢的次數(shù)
}
//空輪詢次數(shù)太多的話,重新建立連接
if(count>=10){
System.out.println("有可能發(fā)生空輪詢"+count+"次");
rebuildSelector();
count=0;
selector.selectNow();
continue;
}
//得到所有就緒的SelectionKey對象
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
//遍歷就緒事件,并判斷就緒的事件類型
while (iterator.hasNext()){
SelectionKey selectionKey=iterator.next();
//連接事件
if(selectionKey.isAcceptable()){
//獲取網(wǎng)絡(luò)通道,有客戶端來鏈接啦
SocketChannel accept = serverSocketChannel.accept();
//設(shè)置非阻塞式
accept.configureBlocking(false);
//連接上了 注冊讀取事件
accept.register(selector,SelectionKey.OP_READ);
System.out.println(accept.getRemoteAddress().toString()+"上線了");
}
//讀事件
if(selectionKey.isReadable()){ //讀取客戶端數(shù)據(jù)事件
//讀取客戶端發(fā)來的數(shù)據(jù)
readClientData(selectionKey);
}
//手動從當(dāng)前集合將本次運(yùn)行完的對象刪除,事件處理完了就要刪除
iterator.remove();
}
}
}
//重新建立鏈接
private void rebuildSelector() throws IOException {
Selector newSelector=Selector.open();
Selector oldSelect=selector;
for (SelectionKey selectionKey : oldSelect.keys()) {
//感興趣事件對應(yīng)的數(shù)值
int i = selectionKey.interestOps();
//取消舊的鍵
selectionKey.cancel();
//將channel注冊到新的選擇器上
selectionKey.channel().register(newSelector,i);
}
selector=newSelector;
oldSelect.close();//關(guān)閉舊的
}
//讀取客戶端發(fā)來的數(shù)據(jù)
private void readClientData(SelectionKey selectionKey) throws IOException {
//獲取跟客戶端連接的通道
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//生成緩沖區(qū),用于接收客戶端傳輸進(jìn)來的數(shù)據(jù)
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
//讀取數(shù)據(jù)到緩沖區(qū),返回實(shí)際讀取到的字節(jié)數(shù),沒有數(shù)據(jù)返回-1
int read = socketChannel.read(byteBuffer);
//讀之前,將緩沖區(qū)設(shè)置為讀狀態(tài)
byteBuffer.flip();
if(read>0){//判斷確實(shí)讀到數(shù)據(jù)了
//創(chuàng)建臨時(shí)發(fā)送字節(jié)數(shù)組
byte[] bytes=new byte[read];
//將緩沖區(qū)數(shù)據(jù)寫到臨時(shí)數(shù)組
byteBuffer.get(bytes,0,read);
//讀取了數(shù)據(jù) 廣播
String s = new String(bytes,"utf-8");
//將此數(shù)據(jù)發(fā)送到其他客戶端
writeClientData(socketChannel,s);
}
}
//廣播 將讀取的數(shù)據(jù)群發(fā)
private void writeClientData(SocketChannel socketChannel,String s) throws IOException {
//獲取到所有的注冊事件,不管有沒有就緒
Set<SelectionKey> keys = selector.keys();
//遍歷事件
for (SelectionKey key : keys) {
//判斷事件是否還有效
if(key.isValid()){
//獲取事件對應(yīng)的channel
SelectableChannel channel = key.channel();
//注意,我們只需要將信息發(fā)送給客戶端
if(channel instanceof SocketChannel){
SocketChannel socketChannel1= (SocketChannel) channel;
//不需要發(fā)送給自己了
if(channel!=socketChannel){
ByteBuffer wrap = ByteBuffer.wrap(s.getBytes());
socketChannel1.write(wrap);
}
}
}
}
}
public static void main(String[] args) throws Exception {
new ChatServer().start();
}
}
客戶端
package NIO.luban.chat;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
public class ChatClient implements Runnable{
private SocketChannel socketChannel;
private Selector selector;
public ChatClient(){
try {
//得到一個(gè)網(wǎng)絡(luò)通道
socketChannel=SocketChannel.open();
//打開一個(gè)選擇器
selector=Selector.open();
//設(shè)置非阻塞式
socketChannel.configureBlocking(false);
}catch (Exception e){
e.printStackTrace();
}
}
public void doCon(){
//提供服務(wù)器ip與端口
InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1",9090);
//連接服務(wù)器端
try {
//連接服務(wù)器,如果成功了
if(socketChannel.connect(inetSocketAddress)){
//注冊讀事件
socketChannel.register(selector,SelectionKey.OP_READ);
//寫數(shù)據(jù)
writeData(socketChannel);
}else{
//注冊連接事件
socketChannel.register(selector, SelectionKey.OP_CONNECT);//如果連接不上
}
} catch (IOException e) {
e.printStackTrace();
}
}
public void writeData(SocketChannel socketChannel) throws IOException {
new Thread(new Runnable() {
@Override
public void run() {
try {
while (true){
//等待你的輸入
Scanner scanner=new Scanner(System.in);
String str = scanner.nextLine();
if(str.equals("by")){
socketChannel.close();
return;
}
//將你的輸入包裝成緩沖區(qū)
ByteBuffer byteBuffer=ByteBuffer.wrap((socketChannel.getLocalAddress().toString()+"說:"+str).getBytes());
//發(fā)送你的數(shù)據(jù)
socketChannel.write(byteBuffer);
}
}catch (Exception e){
e.printStackTrace();
}
}
}).start();
}
//讀數(shù)據(jù)
public void readData() throws IOException {
ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
int read = socketChannel.read(byteBuffer);
if(read>0){
byte[] array = byteBuffer.array();
System.out.println(new String(array,"utf-8"));
}
}
public static void main(String[] args) throws IOException {
new Thread(new ChatClient()).start();
}
@Override
public void run() {
doCon();
try {
while (true){
selector.select(1000);
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()){
SelectionKey selectionKey = iterator.next();
if(selectionKey.isValid()){
if(selectionKey.isConnectable()){
SocketChannel channel = (SocketChannel) selectionKey.channel();
if (channel.finishConnect()){
channel.register(selector,SelectionKey.OP_READ);
System.out.println("bbbbbbbbbbbbb");
//寫數(shù)據(jù)
writeData(channel);
}else{
System.exit(1);
}
}
if(selectionKey.isReadable()){
readData();
}
}
iterator.remove();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
NIO的Reactor單線程模型
上面的聊天案例無論是服務(wù)端還是客戶端,都是單線程的,所有的鏈接及讀寫都是在一個(gè)main方法所在的主線程內(nèi)運(yùn)行。

拿服務(wù)器代碼來說,一個(gè)main線程,要做以下工作
- 接收客戶端連接
- 讀取已連接上的客戶端發(fā)來的數(shù)據(jù)
- 讀到數(shù)據(jù)后要解碼,處理業(yè)務(wù)邏輯
- 編碼,響應(yīng)客戶端,向客戶端寫回?cái)?shù)據(jù)
一個(gè)線程,在同一時(shí)刻只能做上面的一件事情,如果線程在讀取數(shù)據(jù)的時(shí)候阻塞了,那其他三件事都不能做,新的客戶端也無法鏈接成功。我們可以讓服務(wù)器端只處理鏈接,讀和寫交給另一個(gè)線程處理。如下圖所示:

服務(wù)端,主線程處理鏈接,讀寫交給其他線程
// Reactor線程
package NIO.luban.oneReactor;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;
import java.util.Set;
public class TCPReactor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public TCPReactor(int port) throws IOException {
//打開選擇器進(jìn)行IO多路復(fù)用
selector = Selector.open();
//打開服務(wù)器通道
ssc = ServerSocketChannel.open();
InetSocketAddress addr = new InetSocketAddress(port);
//綁定端口
ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口
//設(shè)置ServerSocketChannel為非阻塞
ssc.configureBlocking(false);
//注冊鏈接事件
SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT);
//將時(shí)間綁定一個(gè)處理器,事件發(fā)生后由這個(gè)處理器完成后續(xù)操作
sk.attach(new Acceptor(selector, ssc));
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行
System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
try {
// 若沒有事件就緒則不往下執(zhí)行
if (selector.select() == 0)
continue;
} catch (IOException e) {
e.printStackTrace();
}
// 取得所有已就緒事件的key集合
Set<SelectionKey> selectedKeys = selector.selectedKeys();
//遍歷事件
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//調(diào)度事件,在這里我們開啟另一個(gè)線程進(jìn)行讀寫操作
dispatch((it.next()));
it.remove();
}
}
}
/*
* name: dispatch(SelectionKey key)
* description: 調(diào)度方法,根據(jù)事件綁定的對象開新線程
*/
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對象開新線程
if (r != null) {
r.run();
}
}
}
鏈接處理器
public class Acceptor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public Acceptor(Selector selector, ServerSocketChannel ssc) {
this.ssc=ssc;
this.selector=selector;
}
@Override
public void run() {
try {
// 接受client鏈接請求
SocketChannel sc= ssc.accept();
System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
if(sc!=null) {
// 設(shè)置為非阻塞
sc.configureBlocking(false);
// SocketChannel向selector註冊一個(gè)讀事件,然後返回該通道的key
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ);
// 使一個(gè)阻塞住的selector操作立即返回
selector.wakeup();
// 給定key一個(gè)附加的TCPHandler對象,用來處理后續(xù)讀寫操作
sk.attach(new TCPHandler(sk, sc));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
讀寫處理器
// Handler線程
package NIO.luban.oneReactor;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
public class TCPHandler implements Runnable {
private final SelectionKey sk;
private final SocketChannel sc;
int state;
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
state = 0; // 初始狀態(tài)設(shè)定為READING ,第一次肯定是先讀客戶端數(shù)據(jù)
}
@Override
public void run() {
try {
if (state == 0)
read(); // 讀取數(shù)據(jù)
else
send(); // 發(fā)送
} catch (IOException e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
private synchronized void read() throws IOException {
byte[] arr = new byte[1024];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("[Warning!] A client has been closed.");
closeChannel();
return;
}
String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)換字符串
if ((str != null) && !str.equals(" ")) {
//處理數(shù)據(jù)
process(str); //
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
//在這個(gè)通道讀完了后,下一步往這個(gè)通道寫數(shù)據(jù)
//改成寫狀態(tài)
state = 1;
sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
}
private void send() throws IOException {
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設(shè)為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容
}
state = 0; // 改變狀態(tài)
sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
void process(String str) {
// do process(decode, logically process, encode)..
// ..
try {
//等待6秒,模擬數(shù)據(jù)處理
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//關(guān)閉通道
private void closeChannel() {
try {
sk.cancel();
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
啟動服務(wù)器
public class Main {
public static void main(String[] args) {
System.out.println(Main.class.getName());
// TODO Auto-generated method stub
try {
TCPReactor reactor = new TCPReactor(1333);
reactor.run();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
客戶端
public class Client {
/**
* @param args
*/
public static void main(String[] args) {
// TODO Auto-generated method stub
String hostname="127.0.0.1";
int port = 9999;
//String hostname="127.0.0.1";
//int port=1333;
try {
Socket client = new Socket(hostname, port); // 連接至目的地
System.out.println("連接至目的地:"+ hostname);
PrintWriter out = new PrintWriter(client.getOutputStream());
BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));
BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));
String input;
while((input=stdIn.readLine()) != null) { // 讀取輸入
out.println(input); // 發(fā)送輸入的字符串
out.flush(); // 強(qiáng)制將緩衝區(qū)內(nèi)的數(shù)據(jù)輸出
if(input.equals("exit"))
{
break;
}
System.out.println("server: "+in.readLine());
}
client.close();
System.out.println("client stop.");
} catch (UnknownHostException e) {
// TODO Auto-generated catch block
System.err.println("Don't know about host: " + hostname);
} catch (IOException e) {
// TODO Auto-generated catch block
System.err.println("Couldn't get I/O for the socket connection");
}
}
}
NIO的Reactor多線程模型
上面的單線程模型有個(gè)缺點(diǎn),就是每一個(gè)連接都要開啟一個(gè)線程,如果有10000個(gè)請求,服務(wù)器需要開啟10000個(gè)線程,顯然是不合理的,我們可以使用線程池技術(shù)來實(shí)現(xiàn)多線程模型。
首先,編寫服務(wù)端ServerSocketChannel對應(yīng)的Selector
public TCPReactor(int port) throws IOException {
//打開一個(gè)selector IO多路復(fù)用器
selector = Selector.open();
//打開服務(wù)端通道
ssc = ServerSocketChannel.open();
InetSocketAddress addr = new InetSocketAddress(port);
//綁定端口
ssc.socket().bind(addr);
ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞
//注冊連接請求事件
SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個(gè)OP_ACCEPT事件,然後返回該通道的key
//綁定連接處理器,連接進(jìn)來后用Acceptor做后續(xù)處理
sk.attach(new Acceptor(selector, ssc));
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行
System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");
try {
//輪詢查看是否有事件就緒, 若沒有事件就緒則不往下執(zhí)行
if (selector.select() == 0)
continue;
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
//程序執(zhí)行到這里說明有連接事件發(fā)生了,也就是說有客戶端請求連接了
//獲取所有的連接事件,遍歷處理
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
//連接請求轉(zhuǎn)發(fā)
dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度
//刪除事件,表示已經(jīng)處理完了,下次循環(huán)不再處理已經(jīng)處理過的連接
it.remove();
}
}
}
//獲取事件的處Acceptor理器,開啟一個(gè)線程進(jìn)行處理
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment());
if (r != null)
r.run();
}}
下一步,我們看Acceptor處理器是如何處理的:
public class Acceptor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector;
public Acceptor(Selector selector, ServerSocketChannel ssc) {
this.ssc=ssc;
this.selector=selector;
}
@Override
public void run() {
try {
// 接受client連接請求
SocketChannel sc= ssc.accept();
System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");
if(sc!=null) {
sc.configureBlocking(false); // 設(shè)置為非阻塞
//注冊讀事件
SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個(gè)OP_READ事件,然後返回該通道的key
// System.out.println(sk.selector()==selector);
selector.wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
// 將讀事件交給TCPHandler進(jìn)行處理
sk.attach(new TCPHandler(sk, sc));
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Acceptor的處理很簡單,就是接收請求,然后注冊讀事件,同事讀事件的后續(xù)處理交給處理器TCPHandler處理,我們看一下TCPHandler如何處理的:
public class TCPHandler implements Runnable {
private final SelectionKey sk;
private final SocketChannel sc;
private static final int THREAD_COUNTING = 10;
//讀寫事件交給線程池處理
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); // 線程池
//讀寫狀態(tài)處理器
HandlerState state;
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
// 初始狀態(tài)設(shè)置為讀狀態(tài)
state = new ReadState();
pool.setMaximumPoolSize(32); // 設(shè)置線程池最大線程數(shù)
}
@Override
public void run() {
try {
//利用線程池處理讀寫
state.handle(this, sk, sc, pool);
} catch (IOException e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
public void closeChannel() {
try {
sk.cancel();
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
//讀寫狀態(tài)的更改,讀事件處理完改為寫狀態(tài),寫狀態(tài)處理完改為讀狀態(tài)
public void setState(HandlerState state) {
this.state = state;
}
}
TCPHandler 處理器維護(hù)一個(gè)線程池,用于處理真正的讀寫事件,客戶端連接服務(wù)器后初始時(shí)處理讀事件,讀事件處理完后處理寫事件,寫事件處理完后繼續(xù)處理讀事件,來回反復(fù)處理。我們看一下讀事件是如何處理的
public class ReadState implements HandlerState{
private SelectionKey sk;
public ReadState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new WorkState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // read()
this.sk = sk;
// non-blocking下不可用Readers,因?yàn)镽eaders不支援non-blocking
byte[] arr = new byte[1024];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("[Warning!] A client has been closed.");
h.closeChannel();
return;
}
String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)為字符串型態(tài)
if ((str != null) && !str.equals(" ")) {
h.setState(new WorkState()); // 改變狀態(tài)(READING->WORKING)
pool.execute(new WorkerThread(h, str)); // do process in worker thread
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
}
}
/*
* 執(zhí)行邏輯處理之函數(shù)
*/
synchronized void process(TCPHandler h, String str) {
// do process(decode, logically process, encode)..
// ..
h.setState(new WriteState()); // 改變狀態(tài)(WORKING->SENDING)
this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
this.sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
/*
* 工作者線程
*/
class WorkerThread implements Runnable {
TCPHandler h;
String str;
public WorkerThread(TCPHandler h, String str) {
this.h = h;
this.str=str;
}
@Override
public void run() {
process(h, str);
}
}
}
讀完后,將寫事件注冊。寫一次輪詢到讀事件后,交由WriteState處理器處理
public class WriteState implements HandlerState{
public WriteState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new ReadState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // send()
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設(shè)為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容
}
h.setState(new ReadState()); // 改變狀態(tài)(SENDING->READING)
sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
}
WriteState 寫處理器和ReadState讀處理器都繼承了HandlerState接口,
public interface HandlerState {
void changeState(TCPHandler h);
void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException ;
}
上面的工作狀態(tài)轉(zhuǎn)換有WorkState完成
public class WorkState implements HandlerState {
public WorkState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new WriteState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException {
// TODO Auto-generated method stub
}
}
編寫測試類
public class Main {
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
TCPReactor reactor = new TCPReactor(1333);
// new Thread(reactor).start();
reactor.run();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
客戶端用單線程模式的就可以啦。
NIO主從Reactor模型
上面的代碼,有沒有發(fā)現(xiàn)所有的事件都注冊在同一個(gè)selector上,selector表示好累!講道理,ServerSocketChannel只是用來處理鏈接就可以了,它不需要處理讀事件和寫事件。讀事件和寫事件完全可以交給另一個(gè)選擇器。這就是NIO的主從Reactor模型。

主線程只負(fù)責(zé)接收客戶端連接,然后交其他從線程,使當(dāng)有客戶端連接時(shí),可以很快的受到處理。同時(shí),從線程專門負(fù)責(zé)讀取注冊到自己selector上面的客戶端數(shù)據(jù)。并發(fā)讀寫能力得到了大大的提高。當(dāng)然,如果,每一個(gè)SocketChannel的讀寫事件都注冊到單獨(dú)的selector上顯然是浪費(fèi)資源的,我們可以用一個(gè)selecort管理N個(gè)SocketChannel,也就是說對selector進(jìn)行了分組。比如,用戶管理模塊注冊一個(gè)selector,權(quán)限模塊注冊一個(gè)selector,日志模塊注冊一個(gè)selector,這樣模塊間的讀寫互不影響。selector數(shù)量取決你電腦CPU的核數(shù),一般來說selecor數(shù)量為cpu核數(shù)2。也就是說,我們的主selector有1個(gè),從selector有cpu2個(gè)。
OK!下面我們看這種主從Reactor模式的代碼如何編寫。
首先編寫服務(wù)端
public class TCPReactor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector; // mainReactor用的selector
public TCPReactor(int port) throws IOException {
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞
Acceptor acceptor = new Acceptor(ssc);
SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個(gè)OP_ACCEPT事件,然後返回該通道的key
sk.attach(acceptor); // 給定key一個(gè)附加的Acceptor對象
InetSocketAddress addr = new InetSocketAddress(port);
ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行
System.out.println("mainReactor waiting for new event on port: "
+ ssc.socket().getLocalPort() + "...");
try {
if (selector.select() == 0) // 若沒有事件就緒則不往下執(zhí)行
continue;
} catch (IOException e) {
e.printStackTrace();
}
Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度
it.remove();
}
}
}
/*
* name: dispatch(SelectionKey key)
* description: 調(diào)度方法,根據(jù)事件綁定的對象開新線程
*/
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對象開新線程
if (r != null)
r.run();
}
}
代碼跟多線層模式基本一樣,不解釋了。
再來看Acceptor處理器
public class Acceptor implements Runnable {
private final ServerSocketChannel ssc; // mainReactor監(jiān)聽的socket通道
private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數(shù)
private final Selector[] selectors = new Selector[cores]; // 創(chuàng)建核心數(shù)個(gè)selector給subReactor用
private int selIdx = 0; // 當(dāng)前可使用的subReactor索引
private TCPSubReactor[] r = new TCPSubReactor[cores]; // subReactor線程
private Thread[] t = new Thread[cores]; // subReactor線程
public Acceptor(ServerSocketChannel ssc) throws IOException {
this.ssc = ssc;
// 創(chuàng)建多個(gè)selector以及多個(gè)subReactor線程
for (int i = 0; i < cores; i++) {
selectors[i] = Selector.open();
r[i] = new TCPSubReactor(selectors[i], ssc, i);
t[i] = new Thread(r[i]);
t[i].start();
}
}
@Override
public synchronized void run() {
try {
SocketChannel sc = ssc.accept(); // 接受client連線請求
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " is connected.");
if (sc != null) {
sc.configureBlocking(false); // 設(shè)置為非阻塞
r[selIdx].setRestart(true); // 暫停線程
selectors[selIdx].wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
SelectionKey sk = sc.register(selectors[selIdx],
SelectionKey.OP_READ); // SocketChannel向selector[selIdx]註冊一個(gè)OP_READ事件,然後返回該通道的key
selectors[selIdx].wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
r[selIdx].setRestart(false); // 重啟線程
sk.attach(new TCPHandler(sk, sc)); // 給定key一個(gè)附加的TCPHandler對象
if (++selIdx == selectors.length)
selIdx = 0;
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
此時(shí),我們將讀寫事件注冊到其他selector中,讀寫事件輪詢注冊到不同的子selector上,實(shí)現(xiàn)高并發(fā)處理。
private final int cores = Runtime.getRuntime().availableProcessors(); // 取得CPU核心數(shù)
private final Selector[] selectors = new Selector[cores]; // 創(chuàng)建核心數(shù)個(gè)selector給subReactor用
子selector
public class TCPReactor implements Runnable {
private final ServerSocketChannel ssc;
private final Selector selector; // mainReactor用的selector
public TCPReactor(int port) throws IOException {
selector = Selector.open();
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false); // 設(shè)置ServerSocketChannel為非阻塞
Acceptor acceptor = new Acceptor(ssc);
SelectionKey sk = ssc.register(selector,SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個(gè)OP_ACCEPT事件,然後返回該通道的key
sk.attach(acceptor); // 給定key一個(gè)附加的Acceptor對象
InetSocketAddress addr = new InetSocketAddress(port);
ssc.socket().bind(addr); // 在ServerSocketChannel綁定監(jiān)聽端口
}
@Override
public void run() {
while (!Thread.interrupted()) { // 在線程被中斷前持續(xù)運(yùn)行
System.out.println("mainReactor waiting for new event on port: "
+ ssc.socket().getLocalPort() + "...");
try {
if (selector.select() == 0) // 若沒有事件就緒則不往下執(zhí)行
continue;
} catch (IOException e) {
e.printStackTrace();
}
Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合
Iterator<SelectionKey> it = selectedKeys.iterator();
while (it.hasNext()) {
dispatch((SelectionKey) (it.next())); // 根據(jù)事件的key進(jìn)行調(diào)度
it.remove();
}
}
}
/*
* name: dispatch(SelectionKey key)
* description: 調(diào)度方法,根據(jù)事件綁定的對象開新線程
*/
private void dispatch(SelectionKey key) {
Runnable r = (Runnable) (key.attachment()); // 根據(jù)事件之key綁定的對象開新線程
if (r != null)
r.run();
}
}
讀寫處理器
public class TCPHandler implements Runnable {
private final SelectionKey sk;
private final SocketChannel sc;
private static final int THREAD_COUNTING = 10;
private static ThreadPoolExecutor pool = new ThreadPoolExecutor(
THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()); // 線程池
HandlerState state; // 以狀態(tài)模式實(shí)現(xiàn)Handler
public TCPHandler(SelectionKey sk, SocketChannel sc) {
this.sk = sk;
this.sc = sc;
state = new ReadState(); // 初始狀態(tài)設(shè)定為READING
pool.setMaximumPoolSize(32); // 設(shè)置線程池最大線程數(shù)
}
@Override
public void run() {
try {
state.handle(this, sk, sc, pool);
} catch (IOException e) {
System.out.println("[Warning!] A client has been closed.");
closeChannel();
}
}
public void closeChannel() {
try {
sk.cancel();
sc.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
public void setState(HandlerState state) {
this.state = state;
}
}
真正的讀
public class ReadState implements HandlerState {
private SelectionKey sk;
public ReadState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new WorkState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // read()
this.sk = sk;
// non-blocking下不可用Readers,因?yàn)镽eaders不支援non-blocking
byte[] arr = new byte[1024];
ByteBuffer buf = ByteBuffer.wrap(arr);
int numBytes = sc.read(buf); // 讀取字符串
if(numBytes == -1)
{
System.out.println("[Warning!] A client has been closed.");
h.closeChannel();
return;
}
String str = new String(arr); // 將讀取到的byte內(nèi)容轉(zhuǎn)為字符串型態(tài)
if ((str != null) && !str.equals(" ")) {
h.setState(new WorkState()); // 改變狀態(tài)(READING->WORKING)
pool.execute(new WorkerThread(h, str)); // do process in worker thread
System.out.println(sc.socket().getRemoteSocketAddress().toString()
+ " > " + str);
}
}
/*
* 執(zhí)行邏輯處理之函數(shù)
*/
synchronized void process(TCPHandler h, String str) {
// do process(decode, logically process, encode)..
// ..
h.setState(new WriteState()); // 改變狀態(tài)(WORKING->SENDING)
this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件
this.sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
/*
* 工作者線程
*/
class WorkerThread implements Runnable {
TCPHandler h;
String str;
public WorkerThread(TCPHandler h, String str) {
this.h = h;
this.str=str;
}
@Override
public void run() {
process(h, str);
}
}
}
真正的寫
public class WriteState implements HandlerState {
public WriteState() {
}
@Override
public void changeState(TCPHandler h) {
// TODO Auto-generated method stub
h.setState(new ReadState());
}
@Override
public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,
ThreadPoolExecutor pool) throws IOException { // send()
// get message from message queue
String str = "Your message has sent to "
+ sc.socket().getLocalSocketAddress().toString() + "\r\n";
ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設(shè)為0,所以不需要再flip()
while (buf.hasRemaining()) {
sc.write(buf); // 回傳給client回應(yīng)字符串,發(fā)送buf的position位置 到limit位置為止之間的內(nèi)容
}
h.setState(new ReadState()); // 改變狀態(tài)(SENDING->READING)
sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件
sk.selector().wakeup(); // 使一個(gè)阻塞住的selector操作立即返回
}
}
編寫客戶端
public class Main {
public static void main(String[] args) {
// TODO Auto-generated method stub
try {
TCPReactor reactor = new TCPReactor(1333);
// reactor.run();
Thread thread = new Thread(reactor);
thread.start();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
如果第一次接觸NIO,上面的代碼讀起來比較費(fèi)勁,NIO編程確實(shí)麻煩,而且很容易出錯,現(xiàn)實(shí)開發(fā)中不會用原生NIO庫,小編都是用netty這個(gè)NIO框架進(jìn)行編程,簡單 高效 穩(wěn)定,所以看不懂上面的代碼沒關(guān)系,只要理解上面的三幅圖即可,這三幅圖是netty最最核心的。下篇開始講netty應(yīng)用及源碼。