經典的網絡編程
一般網絡編程都具有以下幾個步驟:
- 讀取請求 Read request
- 解碼請求 Decode request
- 處理服務 Process services
- 加密回應 Encode reply
- 發(fā)送回應 Send reply
但是每一步的處理的內容和成本都不一樣。xml、json、file等等
每種類型的處理程序都需要在各自都線程中來進行,用代碼表示就是如下
class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// or, single-threaded, or a thread pool
} catch (IOException ex) { /* ... */ }
}
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}
如果當前運行線程沒有被中斷就一直循環(huán)創(chuàng)建一個線程或者線程池用來處理ServerSocket里面的Socket請求。
注意:Thread.interrupted()和Thread.isInterrupted()
這樣會造成我們需要為每一個socket請求創(chuàng)建一個線程來處理對應的數(shù)據。一旦用戶過多或者處理程序時間較長就會造成各種各樣的問題。無法并發(fā),前面的活沒干完后面的需要等著,負載等等
優(yōu)化方向
增加負載
增加硬件 (CPU, memory, disk, bandwidth)
同時滿足可用性和性能目標
減短延遲
滿足高峰需求
提高服務質量
通常來說Divide-and-conquer(分而治之)是實現(xiàn)任何可擴展性目標的最佳方法
Divide-and-conquer(分而治之)
將整體任務切割成小任務。每個小任務只執(zhí)行單一任務,并且不會阻塞其他小任務的運行
用IO事件來觸發(fā)每個小任務的啟動
java.nio 中支持的基本機制
非阻塞讀取和寫入
用感測到的IO事件來調度相關的任務
事件驅動設計中可能出現(xiàn)的無盡變化
Event-driven Designs 事件驅動設計
比較有效的方法
占用更少的資源,每個客戶端不一定需要單獨創(chuàng)建一個線程
減少開銷。減少Context的切換可以相應的減少鎖定
調度可能會更慢,所以必須手動將動作綁定到事件
更難的編程
將動作分解為簡單非阻塞的
類似于GUI事件驅動的動作
無法消除所有阻塞。比如:GC,頁面錯誤等
必須跟蹤服務的邏輯狀態(tài)
AWT事件機制
IO事件驅動使用相似的想法,但設計不同
java.awt是一個軟件包,包含用于創(chuàng)建用戶界面和繪制圖形圖像的所有分類
Reactor Pattern(反應堆模式)
- Reactor通過調度來響應IO事件。如:AWT thread
- Handler執(zhí)行非阻塞動作。如:AWT ActionListeners
- Manager將Handler綁定到事件上。如:AWT addActionListener
預先使用Manager將Handler綁定到指定的事件上,如onClick
用戶點擊按鈕的時候,Reactor獲取到事件,并調度事先綁定好的處理程序
經典的Reactor設計
單線程版本
java.nio 支持
Channels
支持非阻塞的讀取文件和socket連接
Buffers
Channels通過Buffers可以直接讀取或者寫入對象
Selectors
通知一組Channels觸發(fā)了哪些IO事件
SelectionKeys
維護IO事件的狀態(tài)和綁定
Reactor 實現(xiàn)
Setup
class Reactor implements Runnable {
//Selector選擇器
final Selector selector;
//Socket服務通道
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
//創(chuàng)建一個Selector
selector = Selector.open();
//創(chuàng)建一個Socket Channel
serverSocket = ServerSocketChannel.open();
//將Socket Channel綁定到指定端口
serverSocket.socket().bind(
new InetSocketAddress(port));
//設置Socket Channel為非阻塞
serverSocket.configureBlocking(false);
//將Selector和Socket Channel注冊到SelectionKey
SelectionKey sk =
serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//將SelectionKey附加到接受者
sk.attach(new Acceptor());
}
/*
也可以使用SPI提供接口:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/
}
Dispatch Loop
// class Reactor continued
public void run() { //通常在新線程中執(zhí)行
try {
//如果當前線程沒有中斷就循環(huán)執(zhí)行
while (!Thread.interrupted()) {
//查詢選擇器中獲取已經準備好的并且注冊過的操作
selector.select();
//獲取所有已經準備好的并且注冊過的操作
Set selected = selector.selectedKeys();
//循環(huán)遍歷
for (Object o : selected) {
//調度任務并處理事件操作
dispatch((SelectionKey) o);
}
//移除選擇器
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
//處理事件操作
void dispatch(SelectionKey k) {
//獲取SelectionKey中綁定的處理程序,如果不為空就執(zhí)行
Runnable r = (Runnable) (k.attachment());
if (r != null)
r.run();
}
Acceptor
// class Reactor continued
// 創(chuàng)建接收器
class Acceptor implements Runnable {
public void run() {
try {
//獲取連接成功到客戶端連接
SocketChannel c = serverSocket.accept();
if (c != null) {
//如果不為空就處理客戶端連接以及selector
new Handler(selector, c);
}
} catch (IOException ex) { /* ... */ }
}
}
Handler setup
//處理程序
final class Handler implements Runnable {
//指定最大輸入bytes
private static final int MAXIN = 1024;
//指定最大輸出bytes
private static final int MAXOUT = 1024;
//客戶端連接
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
//配置非阻塞模式
c.configureBlocking(false);
//將客戶端連接和讀注冊到SelectionKey
sk = socket.register(sel, SelectionKey.OP_READ);
//將SelectionKey附加到當前線程的run
sk.attach(this);
//將SelectionKey的操作設置為讀取
sk.interestOps(SelectionKey.OP_READ);
//喚醒Selector
sel.wakeup();
}
}
Request handling
// class Handler continued
//輸入處理完成
boolean inputIsComplete() { /* ... */
return true;
}
//輸出處理完成
boolean outputIsComplete() { /* ... */
return true;
}
//處理過程中
void process() { /* ... */ }
public void run() {
try {
//根據不同的狀態(tài)進行不同的處理程序
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}
//讀取數(shù)據
void read() throws IOException {
//從客戶端獲取數(shù)據
socket.read(input);
//如果讀取完成
if (inputIsComplete()) {
//處理數(shù)據
process();
//標記為發(fā)送狀態(tài)
state = SENDING;
// 將SelectionKey的操作設置為寫入
sk.interestOps(SelectionKey.OP_WRITE);
}
}
//發(fā)送數(shù)據
void send() throws IOException {
//將數(shù)據寫入客戶端連接
socket.write(output);
//發(fā)送完成后將SelectionKey中的綁定取消
if (outputIsComplete()) sk.cancel();
}
}
Per-State Handlers
GoF State-Object pattern 狀態(tài)模式,針對狀態(tài)重新綁定對應的處理程序
//處理程序
class Handler {
// 初始化為讀取狀態(tài)
public void run() {
//客戶端讀取數(shù)據
socket.read(input);
//讀取完成
if (inputIsComplete()) {
//處理數(shù)據
process();
//附加新的處理程序Sender
sk.attach(new Sender());
//標記狀態(tài)為寫入
sk.interest(SelectionKey.OP_WRITE);
//喚醒SelectionKey中綁定的Selector
sk.selector().wakeup();
}
}
//處理程序Sender
class Sender implements Runnable {
public void run(){ // ...
//寫入數(shù)據
socket.write(output);
//寫入完成之后將SelectionKey中的綁定取消
if (outputIsComplete()) sk.cancel();
}
}
}
Multithreaded Designs 多線程設計
戰(zhàn)略性的為擴展性增加線程
主要適用于多處理器
工作線程
Reactor可以快速的觸發(fā)處理程序
因為處理程序過多或者處理時間過程會減慢Reactor的速度
將非IO處理放到其他的線程
多個Reactor處理線程
Reactor線程任務過多會導致IO飽和
分配一些任務給其他Reactor線程
負載均衡以匹配CPU和IO速率
Worker Threads 工作線程設計
將非IO處理放到其他的線程來加快Reactor線程
比計算綁定處理重新處理為事件驅動的形式更簡單
應該仍然是純非阻塞計算
足夠的處理勝過開銷
很難與IO重疊處理
最好能先將所有輸入讀入緩沖區(qū)
使用線程池可以進行調優(yōu)和控制
通常需要的線程數(shù)比客戶端少得多
Handler with Thread Pool 多線程處理
class Handler implements Runnable {
// 創(chuàng)建一個線程池
static PooledExecutor pool = new PooledExecutor(...);
//設置處理狀態(tài)
static final int PROCESSING = 3;
//讀數(shù)據操作,設計到多線程讀取需要加線程鎖
synchronized void read() {
//讀取數(shù)據
socket.read(input);
//讀取完成
if (inputIsComplete()) {
//標記為處理狀態(tài)
state = PROCESSING;
//將處理過程放到線程池中執(zhí)行
pool.execute(new Processer());
}
}
//處理數(shù)據線程
class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
//處理數(shù)據并關閉
synchronized void processAndHandOff() {
//處理數(shù)據
process();
//標記處理完成并標記發(fā)送狀態(tài)
state = SENDING; // 或者綁定其他操作
//將SelectionKey的操作設置為寫入
sk.interest(SelectionKey.OP_WRITE);
}
}
協(xié)調任務Coordinating Tasks
Handoffs 傳遞
循環(huán)任務的啟用、觸發(fā)或調用下一個任務
通常是最快的但同時也是脆弱的
給每個處理程序觸發(fā)回調
設置狀態(tài)、附加處理程序等等
狀態(tài)模式
Queues 隊列
比如跨階段傳遞buffers
Futures
當每個任務產生結果時觸發(fā)
協(xié)調層位于連接或等待/通知之上
Using PooledExecutor 使用線程池執(zhí)行
一個可優(yōu)化的工作線程池
主方法執(zhí)行(Runnable r)
控制
任務隊列的類型(任何通道)
最大線程數(shù)
最小線程數(shù)
"Warm" 與按需加載線程
保持活動間隔,直到空閑線程死亡
如有必要,稍后將其替換為新的
飽和策略
阻塞、下降、生產運行等
Multiple Reactor Threads 多個Reactor線程
使用Reactor線程池
用于匹配CPU和IO速率
靜態(tài)或動態(tài)構造
每個Reactor都有自己的選擇器,線程,調度循環(huán)
主接收器分配到專用的Reactor
Using other java.nio features 使用其他的java.nio特性
一個Reactor對應多個Selectors
將不同的處理程序綁定到不同的IO事件
調度需要仔細處理線程安全
文件傳輸
自動化的文件到網絡或網絡到文件的復制
內存映射文件
通過緩沖區(qū)訪問文件
直接訪問緩沖區(qū)
有可能實現(xiàn)零拷貝傳輸嗎
但是有設置和完成的開銷
最適合長時間連接的應用
Connection-Based Extensions 基礎連接的擴展
不能使用單個服務請求
客戶端連接
客戶端發(fā)送一系列消息/請求
客戶端斷開連接
舉例
數(shù)據庫和事務監(jiān)控器
多人游戲,聊天等
可以擴展基本的網絡服務模式
處理許多相對長連接的客戶
跟蹤客戶端和會話狀態(tài)(包括丟棄)
分布式部署服務
原文:Doug Lea Scalable IO in Java