前言
Getty是我為了學(xué)習(xí) Java NIO 所寫的一個(gè) NIO 框架,實(shí)現(xiàn)過(guò)程中參考了 Netty 的設(shè)計(jì),同時(shí)使用 Groovy 來(lái)實(shí)現(xiàn)。雖然只是玩具,但是麻雀雖小,五臟俱全,在實(shí)現(xiàn)過(guò)程中,不僅熟悉了 NIO 的使用,還借鑒了很多 Netty 的設(shè)計(jì)思想,提升了自己的編碼和設(shè)計(jì)能力。
至于為什么用 Groovy 來(lái)寫,因?yàn)槲覄倢W(xué)了 Groovy,正好拿來(lái)練手,加上 Groovy 是兼容 Java 的,所以只是語(yǔ)法上的差別,底層實(shí)現(xiàn)還是基于 Java API的。
Getty 的核心代碼行數(shù)不超過(guò) 500 行,一方面得益于 Groovy 簡(jiǎn)潔的語(yǔ)法,另一方面是因?yàn)槲抑粚?shí)現(xiàn)了核心的邏輯,最復(fù)雜的其實(shí)是解碼器實(shí)現(xiàn)。腳手架容易搭,摩天大樓哪有那么容易蓋,但用來(lái)學(xué)習(xí) NIO 足以。
線程模型
- 有專門一個(gè) NIO 線程- Acceptor 線程用于監(jiān)聽服務(wù)端,接收客戶端的 TCP 連接請(qǐng)求,然后將連接分配給工作線程,由工作線程來(lái)監(jiān)聽讀寫事件。
- 網(wǎng)絡(luò) IO 操作-讀/寫等由多個(gè)工作線程負(fù)責(zé),由這些工作線程負(fù)責(zé)消息的讀取、解碼、編碼和發(fā)送。
- 1 個(gè)工作線程可以同時(shí)處理N條鏈路,但是 1 個(gè)鏈路只對(duì)應(yīng) 1 個(gè)工作線程,防止發(fā)生并發(fā)操作問(wèn)題。
事件驅(qū)動(dòng)模型
整個(gè)服務(wù)端的流程處理,建立于事件機(jī)制上。在 [接受連接->讀->業(yè)務(wù)處理->寫 ->關(guān)閉連接 ]這個(gè)過(guò)程中,觸發(fā)器將觸發(fā)相應(yīng)事件,由事件處理器對(duì)相應(yīng)事件分別響應(yīng),完成服務(wù)器端的業(yè)務(wù)處理。
事件定義
-
onRead:當(dāng)客戶端發(fā)來(lái)數(shù)據(jù),并已被工作線程正確讀取時(shí),觸發(fā)該事件 。該事件通知各事件處理器可以對(duì)客戶端發(fā)來(lái)的數(shù)據(jù)進(jìn)行實(shí)際處理了。 -
onWrite:當(dāng)客戶端可以開始接受服務(wù)端發(fā)送數(shù)據(jù)時(shí)觸發(fā)該事件,通過(guò)該事件,我們可以向客戶端發(fā)送響應(yīng)數(shù)據(jù)。(當(dāng)前的實(shí)現(xiàn)中并未使用寫事件) -
onClosed:當(dāng)客戶端與服務(wù)器斷開連接時(shí)觸發(fā)該事件。
事件回調(diào)機(jī)制的實(shí)現(xiàn)
在這個(gè)模型中,事件采用廣播方式,也就是所有注冊(cè)的事件處理器都能獲得事件通知。這樣可以將不同性質(zhì)的業(yè)務(wù)處理,分別用不同的處理器實(shí)現(xiàn),使每個(gè)處理器的功能盡可能單一。
如下圖:整個(gè)事件模型由監(jiān)聽器、事件適配器、事件觸發(fā)器(HandlerChain,PipeLine)、事件處理器組成。
-
ServerListener:這是一個(gè)事件接口,定義需監(jiān)聽的服務(wù)器事件interface ServerListener extends Serializable{ /** * 可讀事件回調(diào) * @param request */ void onRead(ctx) /** * 可寫事件回調(diào) * @param request * @param response */ void onWrite(ctx) /** * 連接關(guān)閉回調(diào) * @param request */ void onClosed(ctx) } -
EventAdapter:對(duì) Serverlistener 接口實(shí)現(xiàn)一個(gè)適配器 (EventAdapter),這樣的好處是最終的事件處理器可以只處理所關(guān)心的事件。class EventAdapter implements ServerListener { //下個(gè)處理器的引用 protected next void onRead(Object ctx) { } void onWrite(Object ctx) { } void onClosed(Object ctx) { } } -
Notifier:用于在適當(dāng)?shù)臅r(shí)候通過(guò)觸發(fā)服務(wù)器事件,通知在冊(cè)的事件處理器對(duì)事件做出響應(yīng)。interface Notifier extends Serializable{ /** * 觸發(fā)所有可讀事件回調(diào) */ void fireOnRead(ctx) /** * 觸發(fā)所有可寫事件回調(diào) */ void fireOnWrite(ctx) /** * 觸發(fā)所有連接關(guān)閉事件回調(diào) */ void fireOnClosed(ctx) } -
HandlerChain:實(shí)現(xiàn)了Notifier接口,維持有序的事件處理器鏈條,每次從第一個(gè)處理器開始觸發(fā)。class HandlerChain implements Notifier{ EventAdapter head EventAdapter tail /** * 添加處理器到執(zhí)行鏈的最后 * @param handler */ void addLast(handler) { if (tail != null) { tail.next = handler tail = tail.next } else { head = handler tail = head } } void fireOnRead(ctx) { head.onRead(ctx) } void fireOnWrite(ctx) { head.onWrite(ctx) } void fireOnClosed(ctx) { head.onClosed(ctx) } } -
PipeLine:實(shí)現(xiàn)了Notifier接口,作為事件總線,維持一個(gè)事件鏈的列表。class PipeLine implements Notifier{ static logger = LoggerFactory.getLogger(PipeLine.name) //監(jiān)聽器隊(duì)列 def listOfChain = [] PipeLine(){} /** * 添加監(jiān)聽器到監(jiān)聽隊(duì)列中 * @param chain */ void addChain(chain) { synchronized (listOfChain) { if (!listOfChain.contains(chain)) { listOfChain.add(chain) } } } /** * 觸發(fā)所有可讀事件回調(diào) */ void fireOnRead(ctx) { logger.debug("fireOnRead") listOfChain.each { chain -> chain.fireOnRead(ctx) } } /** * 觸發(fā)所有可寫事件回調(diào) */ void fireOnWrite(ctx) { listOfChain.each { chain -> chain.fireOnWrite(ctx) } } /** * 觸發(fā)所有連接關(guān)閉事件回調(diào) */ void fireOnClosed(ctx) { listOfChain.each { chain -> chain.fireOnClosed(ctx) } } }
事件處理流程
事件處理采用職責(zé)鏈模式,每個(gè)處理器處理完數(shù)據(jù)之后會(huì)決定是否繼續(xù)執(zhí)行下一個(gè)處理器。如果處理器不將任務(wù)交給線程池處理,那么整個(gè)處理流程都在同一個(gè)線程中處理。而且每個(gè)連接都有單獨(dú)的
PipeLine,工作線程可以在多個(gè)連接上下文切換,但是一個(gè)連接上下文只會(huì)被一個(gè)線程處理。
核心類
ConnectionCtx
連接上下文ConnectionCtx
class ConnectionCtx {
/**socket連接*/
SocketChannel channel
/**用于攜帶額外參數(shù)*/
Object attachment
/**處理當(dāng)前連接的工作線程*/
Worker worker
/**連接超時(shí)時(shí)間*/
Long timeout
/**每個(gè)連接擁有自己的pipeline*/
PipeLine pipeLine
}
NioServer
主線程負(fù)責(zé)監(jiān)聽端口,持有工作線程的引用(使用輪轉(zhuǎn)法分配連接),每次有連接到來(lái)時(shí),將連接放入工作線程的連接隊(duì)列,并喚醒線程selector.wakeup()(線程可能阻塞在selector上)。
class NioServer extends Thread {
/**服務(wù)端的套接字通道*/
ServerSocketChannel ssc
/**選擇器*/
Selector selector
/**事件總線*/
PipeLine pipeLine
/**工作線程列表*/
def workers = []
/**當(dāng)前工作線程索引*/
int index
}
Worker
工作線程,負(fù)責(zé)注冊(cè)server傳遞過(guò)來(lái)的socket連接。主要監(jiān)聽讀事件,管理socket,處理寫操作。
class Worker extends Thread {
/**選擇器*/
Selector selector
/**讀緩沖區(qū)*/
ByteBuffer buffer
/**主線程分配的連接隊(duì)列*/
def queue = []
/**存儲(chǔ)按超時(shí)時(shí)間從小到大的連接*/
TreeMap<Long, ConnectionCtx> ctxTreeMap
void run() {
while (true) {
selector.select()
//注冊(cè)主線程發(fā)送過(guò)來(lái)的連接
registerCtx()
//關(guān)閉超時(shí)的連接
closeTimeoutCtx()
//處理事件
dispatchEvent()
}
}
}
運(yùn)行一個(gè)簡(jiǎn)單的 Web 服務(wù)器
我實(shí)現(xiàn)了一系列處理HTTP請(qǐng)求的處理器,具體實(shí)現(xiàn)看代碼。
-
LineBasedDecoder:行解碼器,按行解析數(shù)據(jù) -
HttpRequestDecoder:HTTP請(qǐng)求解析,目前只支持GET請(qǐng)求 -
HttpRequestHandler:Http 請(qǐng)求處理器,目前只支持GET方法 -
HttpResponseHandler:Http響應(yīng)處理器
下面是寫在test中的例子
class WebServerTest {
static void main(args) {
def pipeLine = new PipeLine()
def readChain = new HandlerChain()
readChain.addLast(new LineBasedDecoder())
readChain.addLast(new HttpRequestDecoder())
readChain.addLast(new HttpRequestHandler())
readChain.addLast(new HttpResponseHandler())
def closeChain = new HandlerChain()
closeChain.addLast(new ClosedHandler())
pipeLine.addChain(readChain)
pipeLine.addChain(closeChain)
NioServer nioServer = new NioServer(pipeLine)
nioServer.start()
}
}
另外,還可以使用配置文件getty.properties設(shè)置程序的運(yùn)行參數(shù)。
#用于拼接消息時(shí)使用的二進(jìn)制數(shù)組的緩存區(qū)
common_buffer_size=1024
#工作線程讀取tcp數(shù)據(jù)的緩存大小
worker_rcv_buffer_size=1024
#監(jiān)聽的端口
port=4399
#工作線程的數(shù)量
worker_num=1
#連接超時(shí)自動(dòng)斷開時(shí)間
timeout=900
#根目錄
root=.
總結(jié)
Getty是我造的第二個(gè)小輪子,第一個(gè)是RedisHttpSession。都說(shuō)不要重復(fù)造輪子。這話我是認(rèn)同的,但是掌握一門技術(shù)最好的方法就是實(shí)踐,在沒(méi)有合適項(xiàng)目可以使用新技術(shù)的時(shí)候,造一個(gè)簡(jiǎn)單的輪子是不錯(cuò)的實(shí)踐手段。
Getty 的缺點(diǎn)或者說(shuō)還可以優(yōu)化的點(diǎn):
- 線程的使用直接用了
Thread類,看起來(lái)有點(diǎn)low。等以后水平提升了再來(lái)抽象一下。 - 目前只有讀事件是異步的,寫事件是同步的。未來(lái)將寫事件也改為異步的。


