Reactor反應(yīng)器模式是高性能網(wǎng)絡(luò)編程在設(shè)計(jì)和架構(gòu)層面的基礎(chǔ)模式。為什么呢?只有徹底了解反應(yīng)器的原理,才能真正構(gòu)建好高性能的網(wǎng)絡(luò)應(yīng)用,才能輕松地學(xué)習(xí)和掌握Netty框架。同時(shí),反應(yīng)器模式也是BAT級別大公司必不可少的面試題。
4.1 Reactor 反應(yīng)器模式為何如此重要
在詳細(xì)介紹什么是Reactor反應(yīng)器模式之前,首先說明一下它的重要性。
到目前為止,高性能網(wǎng)絡(luò)編程都繞不開反應(yīng)器模式。很多著名的服務(wù)器軟件或者中間件都是基于反應(yīng)器模式實(shí)現(xiàn)的。
比如說,“ 全宇宙最有名的、最高性能”的Web服務(wù)器Nginx,就是基于反應(yīng)器模式的;如雷貫耳的Redis,作為最高性能的緩存服務(wù)器之一, 也是基于反應(yīng)器模式的;目前火得“一塌糊涂”、在開源項(xiàng)目中應(yīng)用極為廣泛的高性能通信中間件Netty,更是基于反應(yīng)器模式的。
從開發(fā)的角度來說,如果要完成和勝任高性能的服務(wù)器開發(fā),反應(yīng)器模式是必須學(xué)會和掌握的。從學(xué)習(xí)的角度來說,反應(yīng)器模式相當(dāng)于高性能、高并發(fā)的一項(xiàng)非常重要的基礎(chǔ)知識,只有掌握了它,才能真正掌握Nginx、Redis、 Netty 等這些大名鼎鼎的中間件技術(shù)。正因?yàn)槿绱?,在大的互?lián)網(wǎng)公司如阿里、騰訊、京東的面試過程中,反應(yīng)器模式相關(guān)的問題是經(jīng)常出現(xiàn)的面試問題。
總之,反應(yīng)器模式是高性能網(wǎng)絡(luò)編程的必知、必會的模式。
4.1.1 為什么首先學(xué)習(xí)Reactor反應(yīng)器模式
本書的目標(biāo),是學(xué)習(xí)基于Netty的開發(fā)高性能通信服務(wù)器。為什么在學(xué)習(xí)Netty之前,首先要學(xué)習(xí)Reactor反應(yīng)器模式呢?
寫多了代碼的程序員都知道,Java程序不是按照順序執(zhí)行的邏輯來組織的。代碼中所用到的設(shè)計(jì)模式,在一定程度上已經(jīng)演變成了代碼的組織方式。越是高水平的Java代碼,抽象的層次越高,到處都是高度抽象和面向接口的調(diào)用,大量用到繼承、多態(tài)的設(shè)計(jì)模式。
在閱讀別人的源代碼時(shí),如果不了解代碼所使用的設(shè)計(jì)模式,往往會暈頭轉(zhuǎn)向,不知身在何處,很難讀懂別人的代碼,對代碼跟蹤很成問題。反過來,如果先了解代碼的設(shè)計(jì)模式,再去看代碼,就會閱讀得很輕松,不會那么難懂了。
當(dāng)然,在寫代碼時(shí),不了解設(shè)計(jì)模式,也很難寫出高水平的Java代碼。
本書的重要使命之一,就是幫助大家學(xué)習(xí)和掌握Netty。Netty本身很抽象,大量應(yīng)用了設(shè)計(jì)模式。學(xué)習(xí)像Netty這樣的“精品中的精品”,肯定也是需要先從設(shè)計(jì)模式入手的。而Netty的整體架構(gòu),就是基于這個著名反應(yīng)器模式。
總之,反應(yīng)器模式非常重要。首先學(xué)習(xí)和掌握反應(yīng)器模式,對于學(xué)習(xí)Netty的人來說,一定是磨刀不誤砍柴工。
4.1.2 Reactor反應(yīng)器模式簡介
什么是Reactor反應(yīng)器模式呢?本文站在巨人的肩膀上,引用一下Doug Lea(那是一位讓人無限景仰的大師,Java中Concurrent并發(fā)包的重要作者之一)在文章《Scalable IO in Java》中對反應(yīng)器模式的定義,具體如下:
反應(yīng)器模式由Reactor反應(yīng)器線程、Handlers處理器兩大角色組成:
- Reactor反應(yīng)器線程的職責(zé):負(fù)責(zé)響應(yīng)IO事件,并且分發(fā)到Handlers處理器。
- Handlers處理器的職責(zé):非阻塞的執(zhí)行業(yè)務(wù)處理邏輯。
在這里,為了方便大家學(xué)習(xí),將Doug Lea著名的文章《Scalable IO in Java》的鏈接地址貼出來:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf,建議大家去閱讀一下,提升自己的基礎(chǔ)知識,開闊下眼界。
從上面的反應(yīng)器模式定義,看不出這種模式有什么神奇的地方。當(dāng)然,從簡單到復(fù)雜,反應(yīng)器模式也有很多版本。根據(jù)前面的定義,僅僅是最為簡單的一個版本。
如果需要徹底了解反應(yīng)器模式,還得從最原始的OIO編程開始講起。
4.1.3 多線程OIO的致命缺陷
在Java的OIO編程中,最初和最原始的網(wǎng)絡(luò)服務(wù)器程序,是用一個while循環(huán),不斷地監(jiān)聽端口是否有新的連接。如果有,那么就調(diào)用一個處理函數(shù)來處理,示例代碼如下:
while(true){
socket = accept(); //阻塞,接收連接
handle(socket) ; //讀取數(shù)據(jù)、業(yè)務(wù)處理、寫入結(jié)果
}
這種方法的最大問題是:如果前一個網(wǎng)絡(luò)連接的handle(socket)沒有處理完,那么后面的連接請求沒法被接收,于是后面的請求通通會被阻塞住,服務(wù)器的吞吐量就太低了。對于服務(wù)器來說,這是一個嚴(yán)重的問題。
為了解決這個嚴(yán)重的連接阻塞問題,出現(xiàn)了一個極為經(jīng)典模式:Connection Per Thread(一個線程處理一個連接)模式。示例代碼如下:
//...省略: 導(dǎo)入的Java類
class ConnectionPerThread implements Runnable {
public void run() {
try {
//服務(wù)器監(jiān)聽socket
ServerSocketserverSocket =
new ServerSocket(NioDemoConfig.SOCKET_SERVER_PORT);
while (!Thread.interrupted()) {
Socket socket = serverSocket.accept();
//接收一個連接后,為socket連接,新建一個專屬的處理器對象
Handler handler = new Handler(socket);
//創(chuàng)建新線程,專門負(fù)責(zé)一個連接的處理
new Thread(handler).start();
}
} catch (IOException ex) { /* 處理異常 */ }
}
//處理器對象
static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) {
socket = s;
}
public void run() {
while (true) {
try {
byte[] input = new byte[NioDemoConfig.SERVER_BUFFER_SIZE];
/* 讀取數(shù)據(jù) */
socket.getInputStream().read(input);
/* 處理業(yè)務(wù)邏輯,獲取處理結(jié)果*/
byte[] output =null;
/* 寫入結(jié)果 */
socket.getOutputStream().write(output);
} catch (IOException ex) { /*處理異常*/ }
}
}
}
}
對于每一個新的網(wǎng)絡(luò)連接都分配給一個線程。每個線程都獨(dú)自處理自己負(fù)責(zé)的輸入和輸出。當(dāng)然,服務(wù)器的監(jiān)聽線程也是獨(dú)立的,任何的socket連接的輸入和輸出處理,不會阻塞到后面新socket連接的監(jiān)聽和建立。早期版本的Tomcat服務(wù)器,就是這樣實(shí)現(xiàn)的。
Connection Per Thread模式(一個線程處理一個連接)的優(yōu)點(diǎn)是:解決了前面的新連接被嚴(yán)重阻塞的問題,在一定程度上,極大地提高了服務(wù)器的吞吐量。
這里有個問題:如果一個線程同時(shí)負(fù)責(zé)處理多個socket連接的輸入和輸入,行不行呢?
看上去,沒有什么不可以。但是,實(shí)際上沒有用。為什么?傳統(tǒng)OIO編程中每一個socket的IO讀寫操作,都是阻塞的。在同一時(shí)刻,一個線程里只能處理一個socket,前一個socket被阻塞了,后面連接的IO操作是無法被并發(fā)執(zhí)行的。所以,不論怎么處理,OIO中一個線程也只能是處理一個連接的IO操作。
Connection Per Thread模式的缺點(diǎn)是:對應(yīng)于大量的連接,需要耗費(fèi)大量的線程資源,對線程資源要求太高。在系統(tǒng)中,線程是比較昂貴的系統(tǒng)資源。如果線程數(shù)太多,系統(tǒng)無法承受。而且,線程的反復(fù)創(chuàng)建、銷毀、線程的切換也需要代價(jià)。因此,在高并發(fā)的應(yīng)用場景下,多線程OIO的缺陷是致命的。
如何解決Connection Per Thread模式的巨大缺陷呢?一個有效路徑是:使用Reactor反應(yīng)器模式。用反應(yīng)器模式對線程的數(shù)量進(jìn)行控制,做到一個線程處理大量的連接。它是如何做到呢?首先來看簡單的版本——單線程的Reactor反應(yīng)器模式。
4.2 單線程Reactor反應(yīng)器模式
總體來說,Reactor反應(yīng)器模式有點(diǎn)兒類似事件驅(qū)動模式。
在事件驅(qū)動模式中,當(dāng)有事件觸發(fā)時(shí),事件源會將事件dispatch分發(fā)到handler處理器進(jìn)行事件處理。反應(yīng)器模式中的反應(yīng)器角色,類似于事件驅(qū)動模式中的dispatcher事件分發(fā)器角色。
前面已經(jīng)提到,在反應(yīng)器模式中,有Reactor反應(yīng)器和Handler處理器兩個重要的組件:
- Reactor反應(yīng)器:負(fù)責(zé)查詢IO事件,當(dāng)檢測到一個IO事件,將其發(fā)送給相應(yīng)的Handler處理器去處理。這里的IO事件,就是NIO中選擇器監(jiān)控的通道IO事件。
- Handler處理器:與IO事件(或者選擇鍵)綁定,負(fù)責(zé)IO事件的處理。完成真正的連接建立、通道的讀取、處理業(yè)務(wù)邏輯、負(fù)責(zé)將結(jié)果寫出到通道等。
4.2.1 什么是單線程Reactor反應(yīng)器
什么是單線程版本的Reactor反應(yīng)器模式呢?簡單地說,Reactor反應(yīng)器和Handers處理器處于一個線程中執(zhí)行。它是最簡單的反應(yīng)器模型,如圖4-1所示。

基于Java NIO,如何實(shí)現(xiàn)簡單的單線程版本的反應(yīng)器模式呢?需要用到SelectionKey選擇鍵的幾個重要的成員方法:
方法一:void attach(Object o)
此方法可以將任何的Java POJO對象,作為附件添加到SelectionKey實(shí)例,相當(dāng)于附件屬性的setter方法。這方法非常重要,因?yàn)樵趩尉€程版本的反應(yīng)器模式中,需要將Handler處理器實(shí)例,作為附件添加到SelectionKey實(shí)例。
方法二:Object attachment()
此方法的作用是取出之前通過attach(Object o)添加到SelectionKey選擇鍵實(shí)例的附件,相當(dāng)于附件屬性的getter方法,與attach(Object o)配套使用。
這個方法同樣非常重要,當(dāng)IO事件發(fā)生,選擇鍵被select方法選到,可以直接將事件的附件取出,也就是之前綁定的Handler處理器實(shí)例,通過該Handler,完成相應(yīng)的處理。
總之,在反應(yīng)器模式中,需要進(jìn)行attach和attachment結(jié)合使用:在選擇鍵注冊完成之后,調(diào)用attach方法,將Handler處理器綁定到選擇鍵;當(dāng)事件發(fā)生時(shí),調(diào)用attachment方法,可以從選擇鍵取出Handler處理器,將事件分發(fā)到Handler處理器中,完成業(yè)務(wù)處理。
4.2.2 單線程Reactor反應(yīng)器的參考代碼
Doug Lea在《Scalable IO in Java》中,實(shí)現(xiàn)了一個單線程Reactor反應(yīng)器模式的參考代碼。這里,我們站在巨人的肩膀上,借鑒Doug Lea的實(shí)現(xiàn),對其進(jìn)行介紹。為了方便說明,對Doug Lea的參考代碼進(jìn)行一些適當(dāng)?shù)男薷?。具體的參考代碼如下:
//...
class Reactor implements Runnable {
Selector selector;
ServerSocketChannelserverSocket;
EchoServerReactor() throws IOException {
//....省略:打開選擇器、serverSocket連接監(jiān)聽通道
//注冊serverSocket的accept事件
SelectionKeysk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//將新連接處理器作為附件,綁定到sk選擇鍵
sk.attach(new AcceptorHandler());
}
public void run() {
//選擇器輪詢
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext()) {
//反應(yīng)器負(fù)責(zé)dispatch收到的事件
SelectionKeysk=it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) { ex.printStackTrace(); }
}
//反應(yīng)器的分發(fā)方法
void dispatch(SelectionKey k) {
Runnable handler = (Runnable) (k.attachment());
//調(diào)用之前綁定到選擇鍵的handler處理器對象
if (handler != null) {
handler.run();
}
}
// 新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
//接受新連接
//需要為新連接,創(chuàng)建一個輸入輸出的handler處理器
}
}
//….
}
在上面的代碼中,設(shè)計(jì)了一個Handler處理器,叫作AcceptorHandler處理器,它是一個內(nèi)部類。在注冊serverSocket服務(wù)監(jiān)聽連接的接受事件之后,創(chuàng)建一個AcceptorHandler新連接處理器的實(shí)例,作為附件,被設(shè)置(attach)到了SelectionKey中。
//注冊serverSocket的接受(accept)事件
SelectionKeysk = serverSocket.register(selector, SelectionKey.OP_ACCEPT);
//將新連接處理器作為附件,綁定到sk選擇鍵
sk.attach(new AcceptorHandler());
當(dāng)新連接事件發(fā)生后,取出了之前attach到SelectionKey中的Handler業(yè)務(wù)處理器,進(jìn)行socket的各種IO處理
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment());
//調(diào)用之前綁定到選擇鍵的處理器對象
if (r != null) {
r.run();
}
}
AcceptorHandler處理器的兩大職責(zé):一是接受新連接,二是在為新連接創(chuàng)建一個輸入輸出的Handler處理器,稱之為IOHandler。
// 新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
// 接受新連接
// 需要為新連接創(chuàng)建一個輸入輸出的handler處理器
}
}
IOHandler,顧名思義,就是負(fù)責(zé)socket的數(shù)據(jù)輸入、業(yè)務(wù)處理、結(jié)果輸出。示例代碼如下:
//...
class IOHandler implements Runnable {
final SocketChannel channel;
final SelectionKeysk;
IOHandler (Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//僅僅取得選擇鍵,稍候設(shè)置感興趣的IO事件
sk = channel.register(selector, 0);
//將Handler處理器作為選擇鍵的附件
sk.attach(this);
//注冊讀寫就緒事件
sk.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
public void run() {
//...處理輸入和輸出
}
}
在IOHandler的構(gòu)造器中,有兩點(diǎn)比較重要:
- 將新的SocketChannel傳輸通道,注冊到了反應(yīng)器Reactor類的同一個選擇器中。這樣保證了Reactor類和Handler類在同一個線程中執(zhí)行。
- Channel傳輸通道注冊完成后,將IOHandler自身作為附件,attach到了選擇鍵中。這樣,在Reactor類分發(fā)事件(選擇鍵)時(shí),能執(zhí)行到IOHandler的run方法。
如果上面的示例代碼比較繞口,不要緊。為了徹底地理解個中妙處,自己動手開發(fā)一個可以執(zhí)行的實(shí)例。下面基于反應(yīng)器模式,實(shí)現(xiàn)了一個EchoServer回顯服務(wù)器實(shí)例。仔細(xì)閱讀和運(yùn)行這個實(shí)例,就可以明白上面這段繞口的程序代碼的真正含義了。
4.2.3 一個Reactor反應(yīng)器版本的EchoServer實(shí)踐案例
EchoServer回顯服務(wù)器的功能很簡單:讀取客戶端的輸入,回顯到客戶端,所以也叫回顯服務(wù)器?;赗eactor反應(yīng)器模式來實(shí)現(xiàn),設(shè)計(jì)3個重要的類:
- 設(shè)計(jì)一個反應(yīng)器類:EchoServerReactor類。
- 設(shè)計(jì)兩個處理器類:AcceptorHandler新連接處理器、EchoHandler回顯處理器。
反應(yīng)器類EchoServerReactor的實(shí)現(xiàn)思路和前面的示例代碼基本上相同,具體如下:
//.....
//反應(yīng)器
class EchoServerReactor implements Runnable {
Selector selector;
ServerSocketChannel serverSocket;
EchoServerReactor() throws IOException {
//...獲取選擇器、開啟serverSocket服務(wù)監(jiān)聽通道
//...綁定AcceptorHandler新連接處理器到selectKey
}
//輪詢和分發(fā)事件
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
//反應(yīng)器負(fù)責(zé)dispatch收到的事件
SelectionKey sk = it.next();
dispatch(sk);
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKeysk) {
Runnable handler = (Runnable) sk.attachment();
//調(diào)用之前attach綁定到選擇鍵的handler處理器對象
if (handler != null) {
handler.run();
}
}
// Handler:新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new EchoHandler(selector, channel);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new Thread(new EchoServerReactor()).start();
}
}
EchoHandler回顯處理器,主要是完成客戶端的內(nèi)容讀取和回顯,具體如下:
//...
class EchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKeysk;
final ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
EchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//取得選擇鍵,再設(shè)置感興趣的IO事件
sk = channel.register(selector, 0);
//將Handler自身作為選擇鍵的附件
sk.attach(this);
//注冊Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
try {
if (state == SENDING) {
//寫入通道
channel.write(byteBuffer);
//寫完后,準(zhǔn)備開始從通道讀,byteBuffer切換成寫入模式
byteBuffer.clear();
//寫完后,注冊read就緒事件
sk.interestOps(SelectionKey.OP_READ);
//寫完后,進(jìn)入接收的狀態(tài)
state = RECIEVING;
} else if (state == RECIEVING) {
//從通道讀
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//讀完后,準(zhǔn)備開始寫入通道,byteBuffer切換成讀取模式
byteBuffer.flip();
//讀完后,注冊write就緒事件
sk.interestOps(SelectionKey.OP_WRITE);
//讀完后,進(jìn)入發(fā)送的狀態(tài)
state = SENDING;
}
//處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
以上兩個類,是一個基于反應(yīng)器模式的EchoServer回顯服務(wù)器的完整實(shí)現(xiàn)。它是一個單線程版本的反應(yīng)器模式,Reactor反應(yīng)器和所有的Handler處理器,都執(zhí)行在同一條線程中。
運(yùn)行EchoServerReactor類中的main方法,可以啟動回顯服務(wù)器。如果要看到回顯輸出,還需要啟動客戶端??蛻舳说拇a,在同一個包下,類名為EchoClient,負(fù)責(zé)數(shù)據(jù)的發(fā)送。代碼如下:
public class EchoClient {
public void start() throws IOException {
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
// 1、獲取通道(channel)
SocketChannel socketChannel = SocketChannel.open(address);
// 2、切換成非阻塞模式
socketChannel.configureBlocking(false);
//不斷的自旋、等待連接完成,或者做一些其他的事情
while (!socketChannel.finishConnect()) {
}
Print.tcfo("客戶端啟動成功!");
//啟動接受線程
Processer processer = new Processer(socketChannel);
new Thread(processer).start();
}
static class Processer implements Runnable {
final Selector selector;
final SocketChannel channel;
Processer(SocketChannel channel) throws IOException {
//Reactor初始化
selector = Selector.open();
this.channel = channel;
channel.register(selector,
SelectionKey.OP_READ | SelectionKey.OP_WRITE);
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
SelectionKey sk = it.next();
if (sk.isWritable()) {
ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);
Scanner scanner = new Scanner(System.in);
Print.tcfo("請輸入發(fā)送內(nèi)容:");
if (scanner.hasNext()) {
SocketChannel socketChannel = (SocketChannel) sk.channel();
String next = scanner.next();
buffer.put((Dateutil.getNow() + " >>" + next).getBytes());
buffer.flip();
// 操作三:通過DatagramChannel數(shù)據(jù)報(bào)通道發(fā)送數(shù)據(jù)
socketChannel.write(buffer);
buffer.clear();
}
}
if (sk.isReadable()) {
// 若選擇鍵的IO事件是“可讀”事件,讀取數(shù)據(jù)
SocketChannel socketChannel = (SocketChannel) sk.channel();
//讀取數(shù)據(jù)
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
int length = 0;
while ((length = socketChannel.read(byteBuffer)) > 0) {
byteBuffer.flip();
Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));
byteBuffer.clear();
}
}
//處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
//selectionKey.cancel();
}
selected.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException {
new EchoClient().start();
}
}
代碼測試,client類運(yùn)行結(jié)果如下:
[main|EchoClient.start]:客戶端啟動成功!
[Thread-0|EchoClient$Processer.run]:請輸入發(fā)送內(nèi)容:
hello
server類運(yùn)行結(jié)果如下:
[Thread-0|EchoHandler.run] |> 2019-11-19 02:39:51 >>hello
4.2.4 單線程Reactor反應(yīng)器模式的缺點(diǎn)
單線程Reactor反應(yīng)器模式,是基于Java的NIO實(shí)現(xiàn)的。相對于傳統(tǒng)的多線程OIO,反應(yīng)器模式不再需要啟動成千上萬條線程,效率自然是大大提升了。
在單線程反應(yīng)器模式中,Reactor反應(yīng)器和Handler處理器,都執(zhí)行在同一條線程上。這樣,帶來了一個問題:當(dāng)其中某個Handler阻塞時(shí),會導(dǎo)致其他所有的Handler都得不到執(zhí)行。在這種場景下,如果被阻塞的Handler不僅僅負(fù)責(zé)輸入和輸出處理的業(yè)務(wù),還包括負(fù)責(zé)連接監(jiān)聽的AcceptorHandler處理器。這個是非常嚴(yán)重的問題。
為什么?一旦AcceptorHandler處理器阻塞,會導(dǎo)致整個服務(wù)不能接收新的連接,使得服務(wù)器變得不可用。因?yàn)檫@個缺陷,因此單線程反應(yīng)器模型用得比較少。
另外,目前的服務(wù)器都是多核的,單線程反應(yīng)器模式模型不能充分利用多核資源??傊诟咝阅芊?wù)器應(yīng)用場景中,單線程反應(yīng)器模式實(shí)際使用的很少。
4.3 多線程的Reactor反應(yīng)器模式
既然Reactor反應(yīng)器和Handler處理器,擠在一個線程會造成非常嚴(yán)重的性能缺陷。那么,可以使用多線程,對基礎(chǔ)的反應(yīng)器模式進(jìn)行改造和演進(jìn)。
4.3.1 多線程池Reactor反應(yīng)器演進(jìn)
多線程池Reactor反應(yīng)器的演進(jìn),分為兩個方面:
- 首先是升級Handler處理器。既要使用多線程,又要盡可能的高效率,則可以考慮使用線程池。
- 其次是升級Reactor反應(yīng)器??梢钥紤]引入多個Selector選擇器,提升選擇大量通道的能力。
總體來說,多線程池反應(yīng)器的模式,大致如下:
- 將負(fù)責(zé)輸入輸出處理的IOHandler處理器的執(zhí)行,放入獨(dú)立的線程池中。這樣,業(yè)務(wù)處理線程與負(fù)責(zé)服務(wù)監(jiān)聽和IO事件查詢的反應(yīng)器線程相隔離,避免服務(wù)器的連接監(jiān)聽受到阻塞。
- 如果服務(wù)器為多核的CPU,可以將反應(yīng)器線程拆分為多個子反應(yīng)器(SubReactor)線程;同時(shí),引入多個選擇器,每一個SubReactor子線程負(fù)責(zé)一個選擇器。這樣,充分釋放了系統(tǒng)資源的能力;也提高了反應(yīng)器管理大量連接,提升選擇大量通道的能力。
4.3.2 多線程Reactor反應(yīng)器的實(shí)踐案例
在前面的“回顯服務(wù)器”(EchoServer)的基礎(chǔ)上,完成多線程Reactor反應(yīng)器的升級。多線程反應(yīng)器的實(shí)踐案例設(shè)計(jì)如下:
- 引入多個選擇器。
- 設(shè)計(jì)一個新的子反應(yīng)器(SubReactor)類,一個子反應(yīng)器負(fù)責(zé)查詢一個選擇器。
- 開啟多個反應(yīng)器的處理線程,一個線程負(fù)責(zé)執(zhí)行一個子反應(yīng)器(SubReactor)。
為了提升效率,建議SubReactor的數(shù)量和選擇器的數(shù)量一致。避免多個線程負(fù)責(zé)一個選擇器,導(dǎo)致需要進(jìn)行線程同步,引起的效率降低。這個實(shí)踐案例的代碼如下:
//....反應(yīng)器
class MultiThreadEchoServerReactor {
ServerSocketChannelserverSocket;
AtomicInteger next = new AtomicInteger(0);
//選擇器集合,引入多個選擇器
Selector[] selectors = new Selector[2];
//引入多個子反應(yīng)器
SubReactor[] subReactors = null;
MultiThreadEchoServerReactor() throws IOException {
//初始化多個選擇器
selectors[0] = Selector.open();
selectors[1] = Selector.open();
serverSocket = ServerSocketChannel.open();
InetSocketAddress address =
new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,
NioDemoConfig.SOCKET_SERVER_PORT);
serverSocket.socket().bind(address);
//非阻塞
serverSocket.configureBlocking(false);
//第一個選擇器,負(fù)責(zé)監(jiān)控新連接事件
SelectionKeysk =
serverSocket.register(selectors[0], SelectionKey.OP_ACCEPT);
//綁定Handler:attach新連接監(jiān)控handler處理器到SelectionKey(選擇鍵)
sk.attach(new AcceptorHandler());
//第一個子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個選擇器
SubReactor subReactor1 = new SubReactor(selectors[0]);
//第二個子反應(yīng)器,一子反應(yīng)器負(fù)責(zé)一個選擇器
SubReactor subReactor2 = new SubReactor(selectors[1]);
subReactors = new SubReactor[]{subReactor1, subReactor2};
}
private void startService() {
// 一子反應(yīng)器對應(yīng)一個線程
new Thread(subReactors[0]).start();
new Thread(subReactors[1]).start();
}
//子反應(yīng)器
class SubReactor implements Runnable {
//每個線程負(fù)責(zé)一個選擇器的查詢和選擇
final Selector selector;
public SubReactor(Selector selector) {
this.selector = selector;
}
public void run() {
try {
while (!Thread.interrupted()) {
selector.select();
Set<SelectionKey> keySet = selector.selectedKeys();
Iterator<SelectionKey> it = keySet.iterator();
while (it.hasNext()) {
//反應(yīng)器負(fù)責(zé)dispatch收到的事件
SelectionKeysk = it.next();
dispatch(sk);
}
keySet.clear();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
void dispatch(SelectionKeysk) {
Runnable handler = (Runnable) sk.attachment();
//調(diào)用之前attach綁定到選擇鍵的handler處理器對象
if (handler != null) {
handler.run();
}
}
}
// Handler:新連接處理器
class AcceptorHandler implements Runnable {
public void run() {
try {
SocketChannel channel = serverSocket.accept();
if (channel != null)
new MultiThreadEchoHandler(selectors[next.get()], channel);
} catch (IOException e) {
e.printStackTrace();
}
if (next.incrementAndGet() == selectors.length) {
next.set(0);
}
}
}
public static void main(String[] args) throws IOException {
MultiThreadEchoServerReactor server =
new MultiThreadEchoServerReactor();
server.startService();
}
}
上面是反應(yīng)器的演進(jìn)代碼,再來看看Handler處理器的多線程演進(jìn)實(shí)踐。
4.3.3 多線程Handler處理器的實(shí)踐案例
基于前面的單線程反應(yīng)器的EchoHandler回顯處理器的程序代碼,予以改進(jìn),新的回顯處理器為:MultiThreadEchoHandler。主要的升級是引入了一個線程池(ThreadPool),業(yè)務(wù)處理的代碼執(zhí)行在自己的線程池中,徹底地做到業(yè)務(wù)處理線程和反應(yīng)器IO事件線程的完全隔離。這個實(shí)踐案例的代碼如下:
//...
class MultiThreadEchoHandler implements Runnable {
final SocketChannel channel;
final SelectionKeysk;
final ByteBufferbyteBuffer = ByteBuffer.allocate(1024);
static final int RECIEVING = 0, SENDING = 1;
int state = RECIEVING;
//引入線程池
static ExecutorService pool = Executors.newFixedThreadPool(4);
MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {
channel = c;
c.configureBlocking(false);
//取得選擇鍵,、再設(shè)置感興趣的IO事件
sk = channel.register(selector, 0);
//將本Handler作為sk選擇鍵的附件,方便事件分發(fā)(dispatch)
sk.attach(this);
//向sk選擇鍵注冊Read就緒事件
sk.interestOps(SelectionKey.OP_READ);
selector.wakeup();
}
public void run() {
//異步任務(wù),在獨(dú)立的線程池中執(zhí)行
pool.execute(new AsyncTask());
}
//業(yè)務(wù)處理,不在反應(yīng)器線程中執(zhí)行
public synchronized void asyncRun() {
try {
if (state == SENDING) {
//寫入通道
channel.write(byteBuffer);
//寫完后,準(zhǔn)備開始從通道讀,byteBuffer切換成寫入模式
byteBuffer.clear();
//寫完后,注冊read就緒事件
sk.interestOps(SelectionKey.OP_READ);
//寫完后,進(jìn)入接收的狀態(tài)
state = RECIEVING;
} else if (state == RECIEVING) {
//從通道讀
int length = 0;
while ((length = channel.read(byteBuffer)) > 0) {
Logger.info(new String(byteBuffer.array(), 0, length));
}
//讀完后,準(zhǔn)備開始寫入通道,byteBuffer切換成讀取模式
byteBuffer.flip();
//讀完后,注冊write就緒事件
sk.interestOps(SelectionKey.OP_WRITE);
//讀完后,進(jìn)入發(fā)送的狀態(tài)
state = SENDING;
}
//處理結(jié)束了, 這里不能關(guān)閉select key,需要重復(fù)使用
//sk.cancel();
} catch (IOException ex) {
ex.printStackTrace();
}
}
//異步任務(wù)的內(nèi)部類
class AsyncTask implements Runnable {
public void run() {
MultiThreadEchoHandler.this.asyncRun();
}
}
}
代碼中設(shè)計(jì)了一個內(nèi)部類AsyncTask,是一個簡單的異步任務(wù)的提交類。它使得異步業(yè)務(wù)asyncRun方法,可以獨(dú)立地提交到線程池中。另外,既然業(yè)務(wù)處理異步執(zhí)行,需要在asyncRun方法的前面加上synchronized同步修飾符。
至此,多線程版本的反應(yīng)器模式,實(shí)踐案例的代碼就演示完了。執(zhí)行新版本的多線程MultiThreadEchoServerReactor服務(wù)器,可以使用之前的EchoClient客戶端與之配置,完成整個回顯(echo)的通信演示。
演示的輸出和之前單線程版本的EchoServer回顯服務(wù)器示例,是一模一樣的。
客戶端演示結(jié)果:
[main|EchoClient.start]:客戶端啟動成功!
[Thread-0|EchoClient$Processer.run]:請輸入發(fā)送內(nèi)容:
Multi helloworld
服務(wù)器端演示結(jié)果:
[pool-1-thread-1|MultiThreadEchoHandler.asyncRun] |> 2019-11-19 03:13:28 >>Multi
4.4 Reactor反應(yīng)器模式小結(jié)
在總結(jié)反應(yīng)器模式前,首先看看和其他模式的對比,加強(qiáng)一下對它的理解。
- 反應(yīng)器模式和生產(chǎn)者消費(fèi)者模式對比
相似之處:在一定程度上,反應(yīng)器模式有點(diǎn)類似生產(chǎn)者消費(fèi)者模式。在生產(chǎn)者消費(fèi)者模式中,一個或多個生產(chǎn)者將事件加入到一個隊(duì)列中,一個或多個消費(fèi)者主動地從這個隊(duì)列中提?。≒ull)事件來處理。
不同之處在于:反應(yīng)器模式是基于查詢的,沒有專門的隊(duì)列去緩沖存儲IO事件,查詢到IO事件之后,反應(yīng)器會根據(jù)不同IO選擇鍵(事件)將其分發(fā)給對應(yīng)的Handler處理器來處理。
- 反應(yīng)器模式和觀察者模式(Observer Pattern)對比
相似之處在于:在反應(yīng)器模式中,當(dāng)查詢到IO事件后,服務(wù)處理程序使用單路/多路分發(fā)(Dispatch)策略,同步地分發(fā)這些IO事件。觀察者模式(Observer Pattern)也被稱作發(fā)布/訂閱模式,它定義了一種依賴關(guān)系,讓多個觀察者同時(shí)監(jiān)聽某一個主題(Topic)。這個主題對象在狀態(tài)發(fā)生變化時(shí),會通知所有觀察者,它們能夠執(zhí)行相應(yīng)的處理。
不同之處在于:在反應(yīng)器模式中,Handler處理器實(shí)例和IO事件(選擇鍵)的訂閱關(guān)系,基本上是一個事件綁定到一個Handler處理器;每一個IO事件(選擇鍵)被查詢后,反應(yīng)器會將事件分發(fā)給所綁定的Handler處理器;而在觀察者模式中,同一個時(shí)刻,同一個主題可以被訂閱過的多個觀察者處理。
最后,總結(jié)一下反應(yīng)器模式的優(yōu)點(diǎn)和缺點(diǎn)。作為高性能的IO模式,反應(yīng)器模式的優(yōu)點(diǎn)如下:
響應(yīng)快,雖然同一反應(yīng)器線程本身是同步的,但不會被單個連接的同步IO所阻塞;
編程相對簡單,最大程度避免了復(fù)雜的多線程同步,也避免了多線程的各個進(jìn)程之間切換的開銷;
可擴(kuò)展,可以方便地通過增加反應(yīng)器線程的個數(shù)來充分利用CPU資源。
反應(yīng)器模式的缺點(diǎn)如下:
反應(yīng)器模式增加了一定的復(fù)雜性,因而有一定的門檻,并且不易于調(diào)試。
反應(yīng)器模式需要操作系統(tǒng)底層的IO多路復(fù)用的支持,如Linux中的epoll。如果操作系統(tǒng)的底層不支持IO多路復(fù)用,反應(yīng)器模式不會有那么高效。
同一個Handler業(yè)務(wù)線程中,如果出現(xiàn)一個長時(shí)間的數(shù)據(jù)讀寫,會影響這個反應(yīng)器中其他通道的IO處理。例如在大文件傳輸時(shí),IO操作就會影響其他客戶端(Client)的響應(yīng)時(shí)間。因而對于這種操作,還需要進(jìn)一步對反應(yīng)器模式進(jìn)行改進(jìn)。
4.5 本章小結(jié)
反應(yīng)器(Reactor)模式是高性能網(wǎng)絡(luò)編程在設(shè)計(jì)和架構(gòu)層面的基礎(chǔ)模式。同時(shí),反應(yīng)器模式,也是BAT級別大公司必不可少的面試題。
本章首先從最簡單的Connection Per Thread(一個線程處理一個連接)模式入手,介紹了該模式的嚴(yán)重缺陷,從而引出來了單線程的反應(yīng)器模式。
為了充分利用系統(tǒng)資源,最大限度地減少阻塞,在單線程的反應(yīng)器模式的基礎(chǔ)上,又演進(jìn)出來了多線程的反應(yīng)器模式實(shí)現(xiàn)。
本章的反應(yīng)器模式的實(shí)現(xiàn),僅僅是拋磚引玉,在充分利用系統(tǒng)資源、最大限度地減少阻塞兩個維度,都有很大的提升空間,建議大家自行嘗試。