Reactor模式泛談

1 什么是Reactor模式

wki上對reactor模式的定義是

The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

翻譯成中文: Reactor模式是一種事件處理模式。在這種模式中,存在一個或多個并發(fā)的輸入,它們將事件提交到事件處理服務(wù),事件處理服務(wù)以多路復用的方式同步的將它們分發(fā)到請求處理器。
在Reactor模式中,有以下幾個核心的模塊

  • Handles
    資源句柄:如文件,網(wǎng)絡(luò)連接,同步事件分解器可以從資源句柄中等待事件。
  • Synchronous Event Demultiplexer
    同步事件分解器: 以阻塞的形式從資源句柄集合中等待事件,當資源集合中的某個資源已經(jīng)就緒,允許對其執(zhí)行某個操作時,同步事件分解器即可返回。典型的實現(xiàn)有l(wèi)inux下的eselectepoll。
  • Initiation Dispatcher
    初始分發(fā)器:定義一個統(tǒng)一的接口用于事件處理器的注冊,移除,和事件分發(fā)。
  • Event Handler
    事件處理器:定義一個統(tǒng)一的接口用于處理事件。
  • Concrete Event Handler
    具體事件處理器:實現(xiàn)了事件處理器,用于處理不同類型的事件。
    這些模塊以如下方式互動:
    應(yīng)用將具體事件處理器注冊到事件分發(fā)器上,當同步事件處理器等待到某個事件發(fā)生時,它調(diào)用初始事件分發(fā)器,由應(yīng)用注冊的具體的事件處理器對事件進行處理。
    通常而言,事件是指接受連接(connection accept),數(shù)據(jù)輸入輸出(data input and output),超時(timeout)等。

Reactor pattern的類圖如下:


Reactor pattern

根據(jù)此類圖的實現(xiàn)代碼和測試代碼如下:

package scaiz.pattern;

import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

public class ReactorPattern {

  public static class Event {

    String type;
    String input;

    Event(String type, String input) {
      this.type = type;
      this.input = input;
    }

    public String toString() {
      return "[type: " + type + ", input: " + input + " ]";
    }
  }


  public static class Demultiplexer {

    private BlockingQueue<Handle> blockingQueue = new LinkedBlockingDeque<>();

    Handle select() {
      try {
        return blockingQueue.take();
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  }

  public static class Handle { // Event Producer

    private Event event;

    Demultiplexer demultiplexer;

    Handle(Demultiplexer demultiplexer) {
      this.demultiplexer = demultiplexer;
    }

    Event getEvent() {
      Event e = this.event;
      this.event = null;
      return e;
    }

    void putEvent(Event event) {
      this.event = event;
      this.demultiplexer.blockingQueue.add(this);
    }
  }

  public interface EventHandler {

    void handle(Event event);
  }

  public static class ConcreteEventHandler implements EventHandler {

    private final String type;

    ConcreteEventHandler(String type) {
      this.type = type;
    }

    @Override
    public void handle(Event event) {
      if (Objects.equals(this.type, event.type)) {
        System.out.println("Event " + event + " handled by " + Thread.currentThread().getName());
      }
    }
  }


  public static class InitiationDispatcher {

    private List<EventHandler> handlers = new LinkedList<>();

    void handle(Event event) {
      for (EventHandler handler : handlers) {
        handler.handle(event);
      }
    }

    void registerHandler(EventHandler handler) {
      handlers.add(handler);
    }

    void removeHandler(EventHandler handler) {
      handlers.remove(handler);
    }
  }


  public static void main(String args[]) {
    Demultiplexer demultiplexer = new Demultiplexer();
    Handle handle1 = new Handle(demultiplexer);
    Handle handle2 = new Handle(demultiplexer);

    new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        try {
          TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        handle1.putEvent(new Event("accept",
            UUID.randomUUID().toString()));
      }

    }).start();

    new Thread(() -> {
      for (int i = 0; i < 10; i++) {
        try {
          TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
        handle1.putEvent(new Event("input",
            UUID.randomUUID().toString()));
      }
    }).start();

    new Thread(() -> handle2.putEvent(new Event("input",
        UUID.randomUUID().toString()))).start();

    InitiationDispatcher dispatcher = new InitiationDispatcher();
    dispatcher.registerHandler(new ConcreteEventHandler("accept"));
    dispatcher.registerHandler(new ConcreteEventHandler("input"));

    new Thread(() -> {
      Handle handle;
      do {
        handle = demultiplexer.select();
        if (handle != null) {
          dispatcher.handle(handle.getEvent());
        }
      } while (handle != null);

    }).start();
  }
}

2 Reactor模式示例

3 為什么使用Reactor模式

4 Reactor模式應(yīng)用

5 Reactor模式相關(guān)

a. C10k Problem

b. Java 線程代價

6 相關(guān)資料

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容