1 什么是Reactor模式
wki上對reactor模式的定義是
The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.
翻譯成中文: Reactor模式是一種事件處理模式。在這種模式中,存在一個或多個并發(fā)的輸入,它們將事件提交到事件處理服務(wù),事件處理服務(wù)以多路復用的方式同步的將它們分發(fā)到請求處理器。
在Reactor模式中,有以下幾個核心的模塊
-
Handles
資源句柄:如文件,網(wǎng)絡(luò)連接,同步事件分解器可以從資源句柄中等待事件。 -
Synchronous Event Demultiplexer
同步事件分解器: 以阻塞的形式從資源句柄集合中等待事件,當資源集合中的某個資源已經(jīng)就緒,允許對其執(zhí)行某個操作時,同步事件分解器即可返回。典型的實現(xiàn)有l(wèi)inux下的eselect和epoll。 -
Initiation Dispatcher
初始分發(fā)器:定義一個統(tǒng)一的接口用于事件處理器的注冊,移除,和事件分發(fā)。 -
Event Handler
事件處理器:定義一個統(tǒng)一的接口用于處理事件。 -
Concrete Event Handler
具體事件處理器:實現(xiàn)了事件處理器,用于處理不同類型的事件。
這些模塊以如下方式互動:
應(yīng)用將具體事件處理器注冊到事件分發(fā)器上,當同步事件處理器等待到某個事件發(fā)生時,它調(diào)用初始事件分發(fā)器,由應(yīng)用注冊的具體的事件處理器對事件進行處理。
通常而言,事件是指接受連接(connection accept),數(shù)據(jù)輸入輸出(data input and output),超時(timeout)等。
Reactor pattern的類圖如下:

Reactor pattern
根據(jù)此類圖的實現(xiàn)代碼和測試代碼如下:
package scaiz.pattern;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
public class ReactorPattern {
public static class Event {
String type;
String input;
Event(String type, String input) {
this.type = type;
this.input = input;
}
public String toString() {
return "[type: " + type + ", input: " + input + " ]";
}
}
public static class Demultiplexer {
private BlockingQueue<Handle> blockingQueue = new LinkedBlockingDeque<>();
Handle select() {
try {
return blockingQueue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static class Handle { // Event Producer
private Event event;
Demultiplexer demultiplexer;
Handle(Demultiplexer demultiplexer) {
this.demultiplexer = demultiplexer;
}
Event getEvent() {
Event e = this.event;
this.event = null;
return e;
}
void putEvent(Event event) {
this.event = event;
this.demultiplexer.blockingQueue.add(this);
}
}
public interface EventHandler {
void handle(Event event);
}
public static class ConcreteEventHandler implements EventHandler {
private final String type;
ConcreteEventHandler(String type) {
this.type = type;
}
@Override
public void handle(Event event) {
if (Objects.equals(this.type, event.type)) {
System.out.println("Event " + event + " handled by " + Thread.currentThread().getName());
}
}
}
public static class InitiationDispatcher {
private List<EventHandler> handlers = new LinkedList<>();
void handle(Event event) {
for (EventHandler handler : handlers) {
handler.handle(event);
}
}
void registerHandler(EventHandler handler) {
handlers.add(handler);
}
void removeHandler(EventHandler handler) {
handlers.remove(handler);
}
}
public static void main(String args[]) {
Demultiplexer demultiplexer = new Demultiplexer();
Handle handle1 = new Handle(demultiplexer);
Handle handle2 = new Handle(demultiplexer);
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
handle1.putEvent(new Event("accept",
UUID.randomUUID().toString()));
}
}).start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
handle1.putEvent(new Event("input",
UUID.randomUUID().toString()));
}
}).start();
new Thread(() -> handle2.putEvent(new Event("input",
UUID.randomUUID().toString()))).start();
InitiationDispatcher dispatcher = new InitiationDispatcher();
dispatcher.registerHandler(new ConcreteEventHandler("accept"));
dispatcher.registerHandler(new ConcreteEventHandler("input"));
new Thread(() -> {
Handle handle;
do {
handle = demultiplexer.select();
if (handle != null) {
dispatcher.handle(handle.getEvent());
}
} while (handle != null);
}).start();
}
}