JAVA進階篇(10)—Guava實現(xiàn)的EventBus(調(diào)度算法源碼分析)

1. 使用方式

  1. 引入依賴
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>27.0.1-jre</version>
</dependency>
  1. 定義被觀察者類

由該類觸發(fā)事件通知:

public class TestBus {

    /**
     * EventBus,默認使用PerThreadQueuedDispatcher分發(fā)器(該分發(fā)器內(nèi)部維護的Executor是執(zhí)行執(zhí)行線程run方法,即使用主線程執(zhí)行監(jiān)聽方法)。
     * 該分發(fā)器是每個線程內(nèi)部維護了一個queue。
     * 每個線程互不干擾(都利于本身線程去串行的執(zhí)行觀察者的方法)
     *
     */
    public static void testPerThreadQueuedDispatcher(){
        EventBus eventBus = new EventBus();

        //觀察者1
        DataObserver1 observer1 = new DataObserver1();
        //觀察者2
        DataObserver2 observer2 = new DataObserver2();
        
        eventBus.register(observer2);
        eventBus.register(observer1);

        Thread t1 = new Thread(() -> {
            eventBus.post("信息1;");
            eventBus.post("信息5;");
        });

        Thread t2 = new Thread(() -> {
            eventBus.post("信息2;");
        });

        Thread t3 = new Thread(() -> {
            eventBus.post(123);
        });
        
        t1.start();
        t2.start();
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        t3.start();
    }

}
  1. 定義多個觀察者
@Slf4j
public class DataObserver2 {
    /**
     * post() 不支持自動裝箱功能,只能使用Integer,不能使用int,否則handlersByType的Class會是int而不是Intege
     * 而傳入的int msg參數(shù)在post(int msg)的時候會被包裝成Integer,導致無法匹配到
     */
    @Subscribe
    public void func(Integer msg) {
        log.info("Integer msg: " + msg);
    }
}
@Slf4j
public class DataObserver1 {
    /**
     * 只有通過@Subscribe注解的方法才會被注冊進EventBus
     * 而且方法有且只能有1個參數(shù)
     *
     * @param msg
     */
    @Subscribe
//    @AllowConcurrentEvents
    public void func(String msg) throws InterruptedException {
        log.info("消息開始~:" + msg);
        Thread.sleep(2000);
        log.info("消息結(jié)束~:" + msg);
    }
}

使用原理:觀察者對象注冊到EventBus中,而EventBus會通過反射解析觀察者及其父類對象是否存在@Subscribe注解,若是存在,則維護一個Map(key是對應方法的參數(shù)類型,value是Subscriber對象)。
當被觀察者通過post()方法發(fā)送事件后,會解析事件的類型,找打?qū)?code>Subscriber(消費者對象)。然后循環(huán)通過反射調(diào)用對應的觀察者方法。完成事件通知。

2. EventBus源碼分析

事件總線的配置:

@Beta
public class EventBus {

  private static final Logger logger = Logger.getLogger(EventBus.class.getName());
  //id標識符
  private final String identifier;
  //發(fā)送事件的線程池
  private final Executor executor;
  //訂閱者異常處理器
  private final SubscriberExceptionHandler exceptionHandler;
  //訂閱者解析器
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  //分發(fā)器策略
  private final Dispatcher dispatcher;
  ...
}

查看其構(gòu)造方法:

public class EventBus {

 ...
  /** Creates a new EventBus named "default". */
  public EventBus() {
    this("default");
  }

  public EventBus(String identifier) {
    this(
        identifier,
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        LoggingHandler.INSTANCE);
  }

  public EventBus(SubscriberExceptionHandler exceptionHandler) {
    this(
        "default",
        MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(),
        exceptionHandler);
  }

  EventBus(
      String identifier,
      Executor executor,
      Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {
    this.identifier = checkNotNull(identifier);
    this.executor = checkNotNull(executor);
    this.dispatcher = checkNotNull(dispatcher);
    this.exceptionHandler = checkNotNull(exceptionHandler);
  }
}

可以看到,EventBus對外暴露的構(gòu)造方法,只能去修改identifierexceptionHandler兩個參數(shù)。

  • 發(fā)送事件的線程池executor使用的是MoreExecutors.directExecutor()
  • 消息的轉(zhuǎn)發(fā)器dispatcher使用的是Dispatcher.perThreadDispatchQueue();

executordispatcher兩個參數(shù)決定了什么呢?

public class EventBus {

  public void post(Object event) {
    //通過事件,找到所有的訂閱者。
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    //存在訂閱者
    if (eventSubscribers.hasNext()) {
      //使用dispatcher去分發(fā)消息
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
}

默認:Dispatcher.perThreadDispatchQueue()的作用:

每一個線程內(nèi)部都有一個queue,從而保證單線程中消息的有序性。

  private static final class PerThreadQueuedDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of EventBus.

    /** Per-thread queue of events to dispatch. */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };

    /** Per-thread dispatch state, used to avoid reentrant event dispatching. */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      //放入隊尾【1】重入的線程事件會放入到隊列尾部
      queueForThread.offer(new Event(event, subscribers));
      //【1】線程再次重入后,該方法!dispatching.get()為false,直接結(jié)束
      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          //檢索并刪除此隊列的頭,如果此隊列為空,則返回 null 。
          while ((nextEvent = queueForThread.poll()) != null) {
            //第一個事件通知給所有的訂閱者,才會通知后續(xù)的消息。
            while (nextEvent.subscribers.hasNext()) {
              //當訂閱者中再次使用同一個EventBus發(fā)布消息,線程會沖入【1】
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          //單線程
          dispatching.remove();
          queue.remove();
        }
      }
    }
    //構(gòu)建事件對象(隊列的元素)
    private static final class Event {
      private final Object event;
      private final Iterator<Subscriber> subscribers;

      private Event(Object event, Iterator<Subscriber> subscribers) {
        this.event = event;
        this.subscribers = subscribers;
      }
    }
  }

使用場景:

代碼A中發(fā)布事件(String類型),B訂閱者收到消息后,在B中發(fā)布事件(Integer類型)。

該分發(fā)器會確保A事件通知給所有訂閱者才會執(zhí)行B事件(同一個線程中,訂閱者發(fā)布的事件要排到后面去執(zhí)行)。

默認:MoreExecutors.directExecutor()

class Subscriber {
  final void dispatchEvent(final Object event) {
    //分發(fā)事件,是使用的線程池
    executor.execute(
        new Runnable() {
          @Override
          public void run() {
            try {
              invokeSubscriberMethod(event);
            } catch (InvocationTargetException e) {
              bus.handleSubscriberException(e.getCause(), context(event));
            }
          }
        });
  }
}

而MoreExecutors.directExecutor()使用如下的線程池,即訂閱者使用當前線程同步的處理事件。

enum DirectExecutor implements Executor {
  INSTANCE;

  @Override
  public void execute(Runnable command) {
    command.run();
  }

  @Override
  public String toString() {
    return "MoreExecutors.directExecutor()";
  }
}

訂閱者去使用EventBus的線程去消費消息,可以保證消息的有序性。即先post的事件一定會先執(zhí)行。

總結(jié):

EventBus的特點:

  1. 單個線程上發(fā)布的所有事件都按其發(fā)布的順序被調(diào)度到所有訂閱服務器;
  2. 發(fā)布者和多個訂閱者使用同一個線程處理??赡軙绊懓l(fā)布者的性能,且某個訂閱者耗時,也會影響其他訂閱者;

EventBus特點的場景:

    public static void testPre1(){
        //單例獲取到事件總線
        EventBus eventBus = EventBusCenter.getInstance();
        DataObserver1 observer1 = new DataObserver1();
        DataObserver2 observer2 = new DataObserver2();
        //注冊訂閱者1
        eventBus.register(observer1);
        //注冊訂閱者2
        eventBus.register(observer2);
        //通知訂閱者1
        eventBus.post("發(fā)送事件!");
    }

訂閱者1收到消息后,通知訂閱者2。但是123事件會存儲在ThreadLocal<Queue>中,等待發(fā)送事件!事件通知完所有的訂閱者,才開始通知123事件。

@Slf4j
public class DataObserver1 {
    /**
     * 只有通過@Subscribe注解的方法才會被注冊進EventBus
     * 而且方法有且只能有1個參數(shù)
     *
     * @param msg
     */
    @Subscribe
    public void func(String msg) throws InterruptedException {
        log.info("收到消息:{}", msg);
        EventBus eventBus = EventBusCenter.getInstance();
        eventBus.post(123);
    }
}
@Slf4j
public class DataObserver2 {
    /**
     * post() 不支持自動裝箱功能,只能使用Integer,不能使用int,否則handlersByType的Class會是int而不是Intege
     * 而傳入的int msg參數(shù)在post(int msg)的時候會被包裝成Integer,導致無法匹配到
     */
    @Subscribe
    public void func(Integer msg) {
        log.info("Integer msg: " + msg);
    }
}

3. AsyncEventBus源碼分析

構(gòu)造方法:

  public AsyncEventBus(String identifier, Executor executor) {
    super(identifier, executor, Dispatcher.legacyAsync(), LoggingHandler.INSTANCE);
  }
  1. 可以傳入exector去異步發(fā)布消息。
  2. 只能使用Dispatcher.legacyAsync()去調(diào)度消息。
  private static final class LegacyAsyncDispatcher extends Dispatcher {

    // This dispatcher matches the original dispatch behavior of AsyncEventBus.
    //
    // We can't really make any guarantees about the overall dispatch order for this dispatcher in
    // a multithreaded environment for a couple reasons:
    //
    // 1. Subscribers to events posted on different threads can be interleaved with each other
    //    freely. (A event on one thread, B event on another could yield any of
    //    [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
    // 2. It's possible for subscribers to actually be dispatched to in a different order than they
    //    were added to the queue. It's easily possible for one thread to take the head of the
    //    queue, immediately followed by another thread taking the next element in the queue. That
    //    second thread can then dispatch to the subscriber it took before the first thread does.
    //
    // All this makes me really wonder if there's any value in queueing here at all. A dispatcher
    // that simply loops through the subscribers and dispatches the event to each would actually
    // probably provide a stronger order guarantee, though that order would obviously be different
    // in some cases.

    /** Global event queue. */
    //【注意:】若發(fā)布者產(chǎn)生消息的速度遠遠大于生產(chǎn)者消費消息的速度,此處容易造成OOM
    private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      //先放入隊列中
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      //隊頭取出,并刪除元素
      while ((e = queue.poll()) != null) {
        //使用配置線程池去發(fā)布事件。
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }
  1. 多線程事件先存儲到ConcurrentLinkedQueue中,然后在循環(huán)調(diào)用訂閱者。
  2. 可以自定義線程池可以異步的去處理事件。
  3. (訂閱者發(fā)布的事件一定會在隊尾,但是可能會被別的線程先消費)故只能某些情況下可以保證事件按照發(fā)布的順序被調(diào)度到訂閱服務器;
  4. 因為使用了ConcurrentLinkedQueue,所以可能會造成OOM。
  5. 性能沒有ImmediateDispatcher好。(采用了隊列)

源碼中注釋:(多線程下不能保證順序),所有這些讓我真的懷疑在這里排隊是否有任何價值。LegacyAsyncDispatcher它只是簡單地循環(huán)通過訂閱者并將事件分派給每個訂閱者。在某些情況下,可能會提供更強的順序保證,盡管順序明顯不同。

同一個線程,A發(fā)布事件到訂閱者B,在訂閱者B中再次發(fā)布另一個事件到C。線程會重入到dispatch方法,會將B發(fā)布的事件放到隊列中(排隊)。繼續(xù)從隊列頭開始消費消息。

【注意:該隊列是全局隊列,每一個線程都會消費其消息。】

4. ImmediateDispatcher源碼:

Guava沒有對應的EventBus,但是我們可以繼承EventBus類實現(xiàn)自定義的EventBus。

  /** Implementation of {@link #immediate()}. */
  private static final class ImmediateDispatcher extends Dispatcher {
    private static final ImmediateDispatcher INSTANCE = new ImmediateDispatcher();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      //收到消息后,直接遍歷所有的訂閱者
      while (subscribers.hasNext()) {
        //訂閱者可以使用線程池去執(zhí)行
        subscribers.next().dispatchEvent(event);
      }
    }
  }

特點:

  1. 沒有使用隊列,但凡事件到達后立即使用去處理;
  2. 可以使用線程池異步的去消費消息;
  3. 性能要比LegacyAsyncDispatcher好;

總結(jié)

Guava的EventBus源碼還是比較簡單、清晰的。從源碼來看,它一反常用的Observer的設(shè)計方式,放棄采用統(tǒng)一的接口、統(tǒng)一的事件對象類型。轉(zhuǎn)而采用基于注解掃描的綁定方式。

其實無論是強制實現(xiàn)統(tǒng)一的接口,還是基于注解的實現(xiàn)方式都是在構(gòu)建一種關(guān)聯(lián)關(guān)系(或者說滿足某種契約)。很明顯接口的方式是編譯層面上強制的顯式契約,而注解的方式則是運行時動態(tài)綁定的隱式契約關(guān)系。接口的方式是傳統(tǒng)的方式,編譯時確定觀察者關(guān)系,清晰明了,但通常要求有一致的事件類型、方法簽名。而基于注解實現(xiàn)的機制,剛好相反,編譯時因為沒有接口的語法層面上的依賴關(guān)系,顯得不那么清晰,至少靜態(tài)分析工具很難展示觀察者關(guān)系,但無需一致的方法簽名、事件參數(shù),至于多個訂閱者類之間的繼承關(guān)系,可以繼承接收事件的通知,可以看作既是其優(yōu)點也是其缺點。

  1. EventBus需要注意:發(fā)布者和訂閱者使用同一個線程,可能會影響發(fā)布者的性能。但可以保證單線程中事件的發(fā)布順序和調(diào)度順序保持一致。
  2. AsyncEventBus需要注意的是:發(fā)布者和訂閱者可以使用不同的線程處理;發(fā)布事件時維護了一個LinkedQueue,若訂閱者消費速度慢,可能會造成內(nèi)存溢出;采用全局隊列維護事件順序性,但不能完全保證調(diào)度和發(fā)布的順序;性能不如直接分發(fā)好;
  3. guava的EventBus雖然通過注解的方式更加靈活,但是沒有接口的語法層面的依賴關(guān)系,代碼維護性、可讀性不是特別好。

推薦閱讀

Google-Guava-EventBus源碼解讀

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容