【Getty】Java NIO框架設(shè)計(jì)與實(shí)現(xiàn)

前言

Getty是我為了學(xué)習(xí) Java NIO 所寫的一個(gè) NIO 框架,實(shí)現(xiàn)過(guò)程中參考了 Netty 的設(shè)計(jì),同時(shí)使用 Groovy 來(lái)實(shí)現(xiàn)。雖然只是玩具,但是麻雀雖小,五臟俱全,在實(shí)現(xiàn)過(guò)程中,不僅熟悉了 NIO 的使用,還借鑒了很多 Netty 的設(shè)計(jì)思想,提升了自己的編碼和設(shè)計(jì)能力。

至于為什么用 Groovy 來(lái)寫,因?yàn)槲覄倢W(xué)了 Groovy,正好拿來(lái)練手,加上 Groovy 是兼容 Java 的,所以只是語(yǔ)法上的差別,底層實(shí)現(xiàn)還是基于 Java API的。

Getty 的核心代碼行數(shù)不超過(guò) 500 行,一方面得益于 Groovy 簡(jiǎn)潔的語(yǔ)法,另一方面是因?yàn)槲抑粚?shí)現(xiàn)了核心的邏輯,最復(fù)雜的其實(shí)是解碼器實(shí)現(xiàn)。腳手架容易搭,摩天大樓哪有那么容易蓋,但用來(lái)學(xué)習(xí) NIO 足以。

線程模型

Getty 使用的是 Reactor 多線程模型

  1. 有專門一個(gè) NIO 線程- Acceptor 線程用于監(jiān)聽服務(wù)端,接收客戶端的 TCP 連接請(qǐng)求,然后將連接分配給工作線程,由工作線程來(lái)監(jiān)聽讀寫事件。
  2. 網(wǎng)絡(luò) IO 操作-讀/寫等由多個(gè)工作線程負(fù)責(zé),由這些工作線程負(fù)責(zé)消息的讀取、解碼、編碼和發(fā)送。
  3. 1 個(gè)工作線程可以同時(shí)處理N條鏈路,但是 1 個(gè)鏈路只對(duì)應(yīng) 1 個(gè)工作線程,防止發(fā)生并發(fā)操作問(wèn)題。

事件驅(qū)動(dòng)模型

整個(gè)服務(wù)端的流程處理,建立于事件機(jī)制上。在 [接受連接->讀->業(yè)務(wù)處理->寫 ->關(guān)閉連接 ]這個(gè)過(guò)程中,觸發(fā)器將觸發(fā)相應(yīng)事件,由事件處理器對(duì)相應(yīng)事件分別響應(yīng),完成服務(wù)器端的業(yè)務(wù)處理。

事件定義

  1. onRead:當(dāng)客戶端發(fā)來(lái)數(shù)據(jù),并已被工作線程正確讀取時(shí),觸發(fā)該事件 。該事件通知各事件處理器可以對(duì)客戶端發(fā)來(lái)的數(shù)據(jù)進(jìn)行實(shí)際處理了。
  2. onWrite:當(dāng)客戶端可以開始接受服務(wù)端發(fā)送數(shù)據(jù)時(shí)觸發(fā)該事件,通過(guò)該事件,我們可以向客戶端發(fā)送響應(yīng)數(shù)據(jù)。(當(dāng)前的實(shí)現(xiàn)中并未使用寫事件)
  3. onClosed:當(dāng)客戶端與服務(wù)器斷開連接時(shí)觸發(fā)該事件。

事件回調(diào)機(jī)制的實(shí)現(xiàn)

在這個(gè)模型中,事件采用廣播方式,也就是所有注冊(cè)的事件處理器都能獲得事件通知。這樣可以將不同性質(zhì)的業(yè)務(wù)處理,分別用不同的處理器實(shí)現(xiàn),使每個(gè)處理器的功能盡可能單一。

如下圖:整個(gè)事件模型由監(jiān)聽器、事件適配器、事件觸發(fā)器(HandlerChain,PipeLine)、事件處理器組成。

  • ServerListener:這是一個(gè)事件接口,定義需監(jiān)聽的服務(wù)器事件

    interface ServerListener extends Serializable{
        /**
         * 可讀事件回調(diào)
         * @param request
         */
        void onRead(ctx)
        /**
         * 可寫事件回調(diào)
         * @param request
         * @param response
         */
        void onWrite(ctx)
        /**
         * 連接關(guān)閉回調(diào)
         * @param request
         */
        void onClosed(ctx)
    }
    
    
  • EventAdapter:對(duì) Serverlistener 接口實(shí)現(xiàn)一個(gè)適配器 (EventAdapter),這樣的好處是最終的事件處理器可以只處理所關(guān)心的事件。

    class EventAdapter implements ServerListener {
        //下個(gè)處理器的引用
        protected next
        void onRead(Object ctx) {
        }
        void onWrite(Object ctx) {
        }
        void onClosed(Object ctx) {
        }
    }
    
    
  • Notifier:用于在適當(dāng)?shù)臅r(shí)候通過(guò)觸發(fā)服務(wù)器事件,通知在冊(cè)的事件處理器對(duì)事件做出響應(yīng)。

    interface Notifier extends Serializable{
        /**
         * 觸發(fā)所有可讀事件回調(diào)
         */
        void fireOnRead(ctx)
        /**
         * 觸發(fā)所有可寫事件回調(diào)
         */
        void fireOnWrite(ctx)
        /**
         * 觸發(fā)所有連接關(guān)閉事件回調(diào)
         */
        void fireOnClosed(ctx)
    }
    
    
  • HandlerChain:實(shí)現(xiàn)了Notifier接口,維持有序的事件處理器鏈條,每次從第一個(gè)處理器開始觸發(fā)。

    class HandlerChain implements Notifier{
        EventAdapter head
        EventAdapter tail
        /**
         * 添加處理器到執(zhí)行鏈的最后
         * @param handler
         */
        void addLast(handler) {
            if (tail != null) {
                tail.next = handler
                tail = tail.next
            } else {
                head = handler
                tail = head
            }
        }
        void fireOnRead(ctx) {
            head.onRead(ctx)
        }
        void fireOnWrite(ctx) {
            head.onWrite(ctx)
        }
        void fireOnClosed(ctx) {
            head.onClosed(ctx)
        }
    }
    
    
  • PipeLine:實(shí)現(xiàn)了Notifier接口,作為事件總線,維持一個(gè)事件鏈的列表。

    class PipeLine implements Notifier{
        static logger = LoggerFactory.getLogger(PipeLine.name)
        //監(jiān)聽器隊(duì)列
        def listOfChain = []
        PipeLine(){}
        /**
         * 添加監(jiān)聽器到監(jiān)聽隊(duì)列中
         * @param chain
         */
        void addChain(chain) {
            synchronized (listOfChain) {
                if (!listOfChain.contains(chain)) {
                    listOfChain.add(chain)
                }
            }
        }
        /**
         * 觸發(fā)所有可讀事件回調(diào)
         */
        void fireOnRead(ctx) {
            logger.debug("fireOnRead")
            listOfChain.each { chain ->
                chain.fireOnRead(ctx)
            }
        }
        /**
         * 觸發(fā)所有可寫事件回調(diào)
         */
        void fireOnWrite(ctx) {
            listOfChain.each { chain ->
                chain.fireOnWrite(ctx)
            }
        }
        /**
         * 觸發(fā)所有連接關(guān)閉事件回調(diào)
         */
        void fireOnClosed(ctx) {
            listOfChain.each { chain ->
                chain.fireOnClosed(ctx)
            }
        }
    }
    
    

事件處理流程


事件處理采用職責(zé)鏈模式,每個(gè)處理器處理完數(shù)據(jù)之后會(huì)決定是否繼續(xù)執(zhí)行下一個(gè)處理器。如果處理器不將任務(wù)交給線程池處理,那么整個(gè)處理流程都在同一個(gè)線程中處理。而且每個(gè)連接都有單獨(dú)的PipeLine,工作線程可以在多個(gè)連接上下文切換,但是一個(gè)連接上下文只會(huì)被一個(gè)線程處理。

核心類

ConnectionCtx

連接上下文ConnectionCtx

class ConnectionCtx {
    /**socket連接*/
    SocketChannel channel
    /**用于攜帶額外參數(shù)*/
    Object attachment
    /**處理當(dāng)前連接的工作線程*/
    Worker worker
    /**連接超時(shí)時(shí)間*/
    Long timeout
    /**每個(gè)連接擁有自己的pipeline*/
    PipeLine pipeLine
}

NioServer

主線程負(fù)責(zé)監(jiān)聽端口,持有工作線程的引用(使用輪轉(zhuǎn)法分配連接),每次有連接到來(lái)時(shí),將連接放入工作線程的連接隊(duì)列,并喚醒線程selector.wakeup()(線程可能阻塞在selector上)。

class NioServer extends Thread {
    /**服務(wù)端的套接字通道*/
    ServerSocketChannel ssc
    /**選擇器*/
    Selector selector
    /**事件總線*/
    PipeLine pipeLine
    /**工作線程列表*/
    def workers = []
    /**當(dāng)前工作線程索引*/
    int index
}

Worker

工作線程,負(fù)責(zé)注冊(cè)server傳遞過(guò)來(lái)的socket連接。主要監(jiān)聽讀事件,管理socket,處理寫操作。

class Worker extends Thread {
    /**選擇器*/
    Selector selector
    /**讀緩沖區(qū)*/
    ByteBuffer buffer
    /**主線程分配的連接隊(duì)列*/
    def queue = []
    /**存儲(chǔ)按超時(shí)時(shí)間從小到大的連接*/
    TreeMap<Long, ConnectionCtx> ctxTreeMap

    void run() {
        while (true) {
            selector.select()
            //注冊(cè)主線程發(fā)送過(guò)來(lái)的連接
            registerCtx()
            //關(guān)閉超時(shí)的連接
            closeTimeoutCtx()
            //處理事件
            dispatchEvent()
        }
    }
}

運(yùn)行一個(gè)簡(jiǎn)單的 Web 服務(wù)器

我實(shí)現(xiàn)了一系列處理HTTP請(qǐng)求的處理器,具體實(shí)現(xiàn)看代碼。

  • LineBasedDecoder:行解碼器,按行解析數(shù)據(jù)
  • HttpRequestDecoder:HTTP請(qǐng)求解析,目前只支持GET請(qǐng)求
  • HttpRequestHandler:Http 請(qǐng)求處理器,目前只支持GET方法
  • HttpResponseHandler:Http響應(yīng)處理器

下面是寫在test中的例子

class WebServerTest {
    static void main(args) {
        def pipeLine = new PipeLine()

        def readChain = new HandlerChain()
        readChain.addLast(new LineBasedDecoder())
        readChain.addLast(new HttpRequestDecoder())
        readChain.addLast(new HttpRequestHandler())
        readChain.addLast(new HttpResponseHandler())

        def closeChain = new HandlerChain()
        closeChain.addLast(new ClosedHandler())

        pipeLine.addChain(readChain)
        pipeLine.addChain(closeChain)

        NioServer nioServer = new NioServer(pipeLine)
        nioServer.start()
    }
}

另外,還可以使用配置文件getty.properties設(shè)置程序的運(yùn)行參數(shù)。

#用于拼接消息時(shí)使用的二進(jìn)制數(shù)組的緩存區(qū)
common_buffer_size=1024
#工作線程讀取tcp數(shù)據(jù)的緩存大小
worker_rcv_buffer_size=1024
#監(jiān)聽的端口
port=4399
#工作線程的數(shù)量
worker_num=1
#連接超時(shí)自動(dòng)斷開時(shí)間
timeout=900
#根目錄
root=.

總結(jié)

Getty是我造的第二個(gè)小輪子,第一個(gè)是RedisHttpSession。都說(shuō)不要重復(fù)造輪子。這話我是認(rèn)同的,但是掌握一門技術(shù)最好的方法就是實(shí)踐,在沒(méi)有合適項(xiàng)目可以使用新技術(shù)的時(shí)候,造一個(gè)簡(jiǎn)單的輪子是不錯(cuò)的實(shí)踐手段。

Getty 的缺點(diǎn)或者說(shuō)還可以優(yōu)化的點(diǎn):

  1. 線程的使用直接用了Thread類,看起來(lái)有點(diǎn)low。等以后水平提升了再來(lái)抽象一下。
  2. 目前只有讀事件是異步的,寫事件是同步的。未來(lái)將寫事件也改為異步的。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 從三月份找實(shí)習(xí)到現(xiàn)在,面了一些公司,掛了不少,但最終還是拿到小米、百度、阿里、京東、新浪、CVTE、樂(lè)視家的研發(fā)崗...
    時(shí)芥藍(lán)閱讀 42,818評(píng)論 11 349
  • iOS面試題目100道 1.線程和進(jìn)程的區(qū)別。 進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的一個(gè)獨(dú)立單位,線程是進(jìn)程的一個(gè)實(shí)體,...
    有度YouDo閱讀 30,135評(píng)論 8 137
  • Swift1> Swift和OC的區(qū)別1.1> Swift沒(méi)有地址/指針的概念1.2> 泛型1.3> 類型嚴(yán)謹(jǐn) 對(duì)...
    cosWriter閱讀 11,674評(píng)論 1 32
  • 01 阿飛是我之前的同事,因?yàn)樗Z(yǔ)速快,說(shuō)話直,被大家稱為“飛刀嘴”。她喜歡跟大伙兒開玩笑,有時(shí)冷不丁蹦出的一句話...
    愿你與美好不期而遇閱讀 2,035評(píng)論 2 0
  • 昨天沒(méi)來(lái)得及更新,又使用了一張復(fù)活卡,damn it! 昨天單位搞活動(dòng)啊,中午先吃了一頓烤肉,下午去按摩店做按摩。...
    pink922閱讀 208評(píng)論 0 1

友情鏈接更多精彩內(nèi)容