Scalable IO in Java

經典的網絡編程

一般網絡編程都具有以下幾個步驟:

  • 讀取請求 Read request
  • 解碼請求 Decode request
  • 處理服務 Process services
  • 加密回應 Encode reply
  • 發(fā)送回應 Send reply

但是每一步的處理的內容和成本都不一樣。xml、json、file等等

image.png

每種類型的處理程序都需要在各自都線程中來進行,用代碼表示就是如下

class Server implements Runnable {
    public void run() {
        try {
            ServerSocket ss = new ServerSocket(PORT);
            while (!Thread.interrupted())
                new Thread(new Handler(ss.accept())).start();
            // or, single-threaded, or a thread pool
        } catch (IOException ex) { /* ... */ }
    }

    static class Handler implements Runnable {
        final Socket socket;
        Handler(Socket s) { socket = s; }
        public void run() {
            try {
                byte[] input = new byte[MAX_INPUT];
                socket.getInputStream().read(input);
                byte[] output = process(input);
                socket.getOutputStream().write(output);
            } catch (IOException ex) { /* ... */ }
        }
        private byte[] process(byte[] cmd) { /* ... */ }
    }
}

如果當前運行線程沒有被中斷就一直循環(huán)創(chuàng)建一個線程或者線程池用來處理ServerSocket里面的Socket請求。

注意:Thread.interrupted()和Thread.isInterrupted()

這樣會造成我們需要為每一個socket請求創(chuàng)建一個線程來處理對應的數(shù)據。一旦用戶過多或者處理程序時間較長就會造成各種各樣的問題。無法并發(fā),前面的活沒干完后面的需要等著,負載等等

優(yōu)化方向

  • 增加負載

  • 增加硬件 (CPU, memory, disk, bandwidth)

  • 同時滿足可用性和性能目標

  • 減短延遲

  • 滿足高峰需求

  • 提高服務質量

  • 通常來說Divide-and-conquer(分而治之)是實現(xiàn)任何可擴展性目標的最佳方法

Divide-and-conquer(分而治之)

  • 將整體任務切割成小任務。每個小任務只執(zhí)行單一任務,并且不會阻塞其他小任務的運行

  • 用IO事件來觸發(fā)每個小任務的啟動

  • java.nio 中支持的基本機制

  • 非阻塞讀取和寫入

  • 用感測到的IO事件來調度相關的任務

  • 事件驅動設計中可能出現(xiàn)的無盡變化

Event-driven Designs 事件驅動設計

  • 比較有效的方法

  • 占用更少的資源,每個客戶端不一定需要單獨創(chuàng)建一個線程

  • 減少開銷。減少Context的切換可以相應的減少鎖定

  • 調度可能會更慢,所以必須手動將動作綁定到事件

  • 更難的編程

  • 將動作分解為簡單非阻塞的

  • 類似于GUI事件驅動的動作

  • 無法消除所有阻塞。比如:GC,頁面錯誤等

  • 必須跟蹤服務的邏輯狀態(tài)

AWT事件機制

IO事件驅動使用相似的想法,但設計不同

image.png

java.awt是一個軟件包,包含用于創(chuàng)建用戶界面和繪制圖形圖像的所有分類

Reactor Pattern(反應堆模式)

  • Reactor通過調度來響應IO事件。如:AWT thread
  • Handler執(zhí)行非阻塞動作。如:AWT ActionListeners
  • Manager將Handler綁定到事件上。如:AWT addActionListener

預先使用Manager將Handler綁定到指定的事件上,如onClick

用戶點擊按鈕的時候,Reactor獲取到事件,并調度事先綁定好的處理程序

經典的Reactor設計

單線程版本

image.png

java.nio 支持

  • Channels

  • 支持非阻塞的讀取文件和socket連接

  • Buffers

  • Channels通過Buffers可以直接讀取或者寫入對象

  • Selectors

  • 通知一組Channels觸發(fā)了哪些IO事件

  • SelectionKeys

  • 維護IO事件的狀態(tài)和綁定

Reactor 實現(xiàn)

Setup

class Reactor implements Runnable {
        //Selector選擇器
        final Selector selector;
        //Socket服務通道
        final ServerSocketChannel serverSocket;

        Reactor(int port) throws IOException {
            //創(chuàng)建一個Selector
            selector = Selector.open();
            //創(chuàng)建一個Socket Channel
            serverSocket = ServerSocketChannel.open();
            //將Socket Channel綁定到指定端口
            serverSocket.socket().bind(
                    new InetSocketAddress(port));
            //設置Socket Channel為非阻塞
            serverSocket.configureBlocking(false);
            //將Selector和Socket Channel注冊到SelectionKey
            SelectionKey sk =
                    serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            //將SelectionKey附加到接受者
            sk.attach(new Acceptor());
        }

        /*
        也可以使用SPI提供接口:
        SelectorProvider p = SelectorProvider.provider();
        selector = p.openSelector();
        serverSocket = p.openServerSocketChannel();
         */
    }

Dispatch Loop

// class Reactor continued
        public void run() { //通常在新線程中執(zhí)行
            try {
                //如果當前線程沒有中斷就循環(huán)執(zhí)行
                while (!Thread.interrupted()) {
                    //查詢選擇器中獲取已經準備好的并且注冊過的操作
                    selector.select();
                    //獲取所有已經準備好的并且注冊過的操作
                    Set selected = selector.selectedKeys();
                    //循環(huán)遍歷
                    for (Object o : selected) {
                        //調度任務并處理事件操作
                        dispatch((SelectionKey) o);
                    }
                    //移除選擇器
                    selected.clear();
                }
            } catch (IOException ex) { /* ... */ }
        }

        //處理事件操作
        void dispatch(SelectionKey k) {
            //獲取SelectionKey中綁定的處理程序,如果不為空就執(zhí)行
            Runnable r = (Runnable) (k.attachment());
            if (r != null)
                r.run();
        }

Acceptor

// class Reactor continued
        // 創(chuàng)建接收器
        class Acceptor implements Runnable {
            public void run() {
                try {
                    //獲取連接成功到客戶端連接
                    SocketChannel c = serverSocket.accept();
                    if (c != null) {
                        //如果不為空就處理客戶端連接以及selector
                        new Handler(selector, c);
                    }
                } catch (IOException ex) { /* ... */ }
            }
        }

Handler setup

//處理程序
        final class Handler implements Runnable {
            //指定最大輸入bytes
            private static final int MAXIN = 1024;
            //指定最大輸出bytes
            private static final int MAXOUT = 1024;
            //客戶端連接
            final SocketChannel socket;
            final SelectionKey sk;
            ByteBuffer input = ByteBuffer.allocate(MAXIN);
            ByteBuffer output = ByteBuffer.allocate(MAXOUT);
            static final int READING = 0, SENDING = 1;
            int state = READING;

            Handler(Selector sel, SocketChannel c) throws IOException {
                socket = c;
                //配置非阻塞模式
                c.configureBlocking(false);
                //將客戶端連接和讀注冊到SelectionKey
                sk = socket.register(sel, SelectionKey.OP_READ);
                //將SelectionKey附加到當前線程的run
                sk.attach(this);
                //將SelectionKey的操作設置為讀取
                sk.interestOps(SelectionKey.OP_READ);
                //喚醒Selector
                sel.wakeup();
            }
        }

Request handling

// class Handler continued

            //輸入處理完成
            boolean inputIsComplete() { /* ... */
                return true;
            }

            //輸出處理完成
            boolean outputIsComplete() { /* ... */
                return true;
            }

            //處理過程中
            void process() { /* ... */ }

            public void run() {
                try {
                    //根據不同的狀態(tài)進行不同的處理程序
                    if (state == READING) read();
                    else if (state == SENDING) send();
                } catch (IOException ex) { /* ... */ }
            }

            //讀取數(shù)據
            void read() throws IOException {
                //從客戶端獲取數(shù)據
                socket.read(input);
                //如果讀取完成
                if (inputIsComplete()) {
                    //處理數(shù)據
                    process();
                    //標記為發(fā)送狀態(tài)
                    state = SENDING;
                    // 將SelectionKey的操作設置為寫入
                    sk.interestOps(SelectionKey.OP_WRITE);
                }
            }

            //發(fā)送數(shù)據
            void send() throws IOException {
                //將數(shù)據寫入客戶端連接
                socket.write(output);
                //發(fā)送完成后將SelectionKey中的綁定取消
                if (outputIsComplete()) sk.cancel();
            }
        }

Per-State Handlers

GoF State-Object pattern 狀態(tài)模式,針對狀態(tài)重新綁定對應的處理程序

//處理程序
        class Handler {
            // 初始化為讀取狀態(tài)
            public void run() { 
                //客戶端讀取數(shù)據
                socket.read(input);
                //讀取完成
                if (inputIsComplete()) {
                    //處理數(shù)據
                    process();
                    //附加新的處理程序Sender
                    sk.attach(new Sender());
                    //標記狀態(tài)為寫入
                    sk.interest(SelectionKey.OP_WRITE);
                    //喚醒SelectionKey中綁定的Selector
                    sk.selector().wakeup();
                }
            }
            
            //處理程序Sender
            class Sender implements Runnable {
                public void run(){ // ...
                    //寫入數(shù)據
                    socket.write(output);
                    //寫入完成之后將SelectionKey中的綁定取消
                    if (outputIsComplete()) sk.cancel();
                }
            }
        }

Multithreaded Designs 多線程設計

  • 戰(zhàn)略性的為擴展性增加線程

  • 主要適用于多處理器

  • 工作線程

  • Reactor可以快速的觸發(fā)處理程序

  • 因為處理程序過多或者處理時間過程會減慢Reactor的速度

  • 將非IO處理放到其他的線程

  • 多個Reactor處理線程

  • Reactor線程任務過多會導致IO飽和

  • 分配一些任務給其他Reactor線程

  • 負載均衡以匹配CPU和IO速率

Worker Threads 工作線程設計

  • 將非IO處理放到其他的線程來加快Reactor線程

  • 比計算綁定處理重新處理為事件驅動的形式更簡單

  • 應該仍然是純非阻塞計算

  • 足夠的處理勝過開銷

  • 很難與IO重疊處理

  • 最好能先將所有輸入讀入緩沖區(qū)

  • 使用線程池可以進行調優(yōu)和控制

  • 通常需要的線程數(shù)比客戶端少得多

image.png

Handler with Thread Pool 多線程處理

class Handler implements Runnable {
            // 創(chuàng)建一個線程池 
            static PooledExecutor pool = new PooledExecutor(...);
            //設置處理狀態(tài)
            static final int PROCESSING = 3;
            //讀數(shù)據操作,設計到多線程讀取需要加線程鎖
            synchronized void read() { 
                //讀取數(shù)據
                socket.read(input);
                //讀取完成
                if (inputIsComplete()) {
                    //標記為處理狀態(tài)
                    state = PROCESSING;
                    //將處理過程放到線程池中執(zhí)行
                    pool.execute(new Processer());
                }
            }

            //處理數(shù)據線程
            class Processer implements Runnable {
                public void run() { processAndHandOff(); }
            }
            
            //處理數(shù)據并關閉
            synchronized void processAndHandOff() {
                //處理數(shù)據
                process();
                //標記處理完成并標記發(fā)送狀態(tài)
                state = SENDING; // 或者綁定其他操作
                //將SelectionKey的操作設置為寫入
                sk.interest(SelectionKey.OP_WRITE);
            }
        }

協(xié)調任務Coordinating Tasks

  • Handoffs 傳遞

  • 循環(huán)任務的啟用、觸發(fā)或調用下一個任務

  • 通常是最快的但同時也是脆弱的

  • 給每個處理程序觸發(fā)回調

  • 設置狀態(tài)、附加處理程序等等

  • 狀態(tài)模式

  • Queues 隊列

  • 比如跨階段傳遞buffers

  • Futures

  • 當每個任務產生結果時觸發(fā)

  • 協(xié)調層位于連接或等待/通知之上

Using PooledExecutor 使用線程池執(zhí)行

  • 一個可優(yōu)化的工作線程池

  • 主方法執(zhí)行(Runnable r)

  • 控制

  • 任務隊列的類型(任何通道)

  • 最大線程數(shù)

  • 最小線程數(shù)

  • "Warm" 與按需加載線程

  • 保持活動間隔,直到空閑線程死亡

  • 如有必要,稍后將其替換為新的

  • 飽和策略

  • 阻塞、下降、生產運行等

Multiple Reactor Threads 多個Reactor線程

  • 使用Reactor線程池

  • 用于匹配CPU和IO速率

  • 靜態(tài)或動態(tài)構造

  • 每個Reactor都有自己的選擇器,線程,調度循環(huán)

  • 主接收器分配到專用的Reactor

image.png

Using other java.nio features 使用其他的java.nio特性

  • 一個Reactor對應多個Selectors

  • 將不同的處理程序綁定到不同的IO事件

  • 調度需要仔細處理線程安全

  • 文件傳輸

  • 自動化的文件到網絡或網絡到文件的復制

  • 內存映射文件

  • 通過緩沖區(qū)訪問文件

  • 直接訪問緩沖區(qū)

  • 有可能實現(xiàn)零拷貝傳輸嗎

  • 但是有設置和完成的開銷

  • 最適合長時間連接的應用

Connection-Based Extensions 基礎連接的擴展

  • 不能使用單個服務請求

  • 客戶端連接

  • 客戶端發(fā)送一系列消息/請求

  • 客戶端斷開連接

  • 舉例

  • 數(shù)據庫和事務監(jiān)控器

  • 多人游戲,聊天等

  • 可以擴展基本的網絡服務模式

  • 處理許多相對長連接的客戶

  • 跟蹤客戶端和會話狀態(tài)(包括丟棄)

  • 分布式部署服務

原文:Doug Lea Scalable IO in Java

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容