Guava之EventBus源碼

最近需要使用事件驅(qū)動(dòng),打算使用EventBus管理事件的注冊(cè)和分發(fā)。于是仔細(xì)閱讀了下Guava的EventBus實(shí)現(xiàn),并在此做了些整理。
EventBus是基于設(shè)計(jì)模式中的Observer模式的實(shí)現(xiàn)。Observer模式是非常常用的設(shè)計(jì)模式之一,jdk中的EventObject、EventListener、Observable、Observer都是為觀(guān)察者模式服務(wù)的。但隨著業(yè)務(wù)場(chǎng)景復(fù)雜度的不斷提高,我們希望能在管理事件的同時(shí)提供更多的擴(kuò)展。所以我們通過(guò)EventBus來(lái)優(yōu)雅的實(shí)現(xiàn)這些。


Observer模式

我們先簡(jiǎn)單回顧下Observer模式:


觀(guān)察者模式.png

定義對(duì)象之間的一對(duì)多依賴(lài)關(guān)系,以便當(dāng)一個(gè)對(duì)象更改狀態(tài)時(shí),它的所有依賴(lài)關(guān)系都會(huì)被通知并自動(dòng)更新。

Observer Pattern: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.

一些文中的“發(fā)布-訂閱(Publish/Subscribe)模式”其實(shí)就是Observer模式,他所做的事情和我們將要做的事情一樣:豐富Subject的功能。


EventBus

首先,我們先來(lái)看一下EventBus模塊的類(lèi):


EventBus.jpg

EventBus.class

EventBus.class:它對(duì)應(yīng)于Subject類(lèi),是整個(gè)模塊的核心,也是功能擴(kuò)展的中心點(diǎn)。
首先看下EventBus.class包含的以下幾個(gè)變量:

  private final String identifier;
  private final Executor executor;
  private final SubscriberExceptionHandler exceptionHandler;
  private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
  private final Dispatcher dispatcher;
  • identifier:定義了EventBus的名稱(chēng),用來(lái)區(qū)分項(xiàng)目中的多個(gè)EventBus,在之后的Exception等日志輸出,和線(xiàn)程命名時(shí)都會(huì)有使用。
private static Logger logger(SubscriberExceptionContext context) {
      return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
    }
  • executor:java的異步執(zhí)行的實(shí)現(xiàn)類(lèi)。用來(lái)執(zhí)行訂閱者處理Event的方法。值得注意的是在'EventBus'中對(duì)Executor變量賦值的構(gòu)造器是私有的,也就是說(shuō)我們只能使用它所指定的Executor:'DirectExecutor.class'。但'AsyncEventBus'的Executor是被允許傳入的。這也是這兩者的區(qū)別所在之一。
  public EventBus(String identifier) {
    this(identifier, MoreExecutors.directExecutor(),
        Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE);
  }
  EventBus(String identifier, Executor executor, Dispatcher dispatcher,
      SubscriberExceptionHandler exceptionHandler) {...}
  • exceptionHandler:SubscriberExceptionHandler的實(shí)現(xiàn)類(lèi),用于處理過(guò)程中產(chǎn)生的異常。
  • subscribers:創(chuàng)建了一個(gè)SubscriberRegistry用來(lái)維護(hù)Subscriber與Event的對(duì)應(yīng)關(guān)系。
  • dispatcher:Dispatcher的實(shí)現(xiàn)類(lèi),他是一個(gè)Event的分發(fā)器,所有Event都會(huì)經(jīng)過(guò)dispatcher傳遞給Subscriber。和executor一樣,它也不能被從外部傳入,在'EventBus'中默認(rèn)使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默認(rèn)使用'LegacyAsyncDispatcher'。這是兩個(gè)類(lèi)的唯二區(qū)別了。

然后我們看下EventBus的方法,作為一個(gè)核心類(lèi),它一共只有三個(gè)public方法:

  • register:注冊(cè)event。
  • unregister:取消event注冊(cè)。
  • post:發(fā)布event。
 public void register(Object object) {
    subscribers.register(object);
  }
public void unregister(Object object) {
    subscribers.unregister(object);
  }
public void post(Object event) {
  ...
      dispatcher.dispatch(event, eventSubscribers);
  ...
  }

僅有的三個(gè)方法也都異常的簡(jiǎn)單,'register'和'unregister'都調(diào)用了SubscriberRegistry類(lèi),'post'交給了Dispatcher類(lèi)。而多線(xiàn)程的控制也通過(guò)'executor'交給了Subscriber,異常的處理不在自身管理同樣傳遞給了Subscriber,作為中心的EventBus只做了功能的定義和分配,事件的轉(zhuǎn)發(fā),完美的實(shí)現(xiàn)了功能的解耦,做到了職責(zé)單一原則。

AsyncEventBus.class

AsyncEventBus.class是EventBus.class的異步多線(xiàn)程的子類(lèi),上面也有提到過(guò),二者之間只在構(gòu)造器中有兩處區(qū)別:

  1. Executor:EventBus默認(rèn)使用'DirectExecutor.class',他是一個(gè)線(xiàn)程執(zhí)行器,簡(jiǎn)單的直接執(zhí)行傳入的Runnable。AsyncEventBus正好相反,它的Executor必須是傳入的。
 private enum DirectExecutor implements Executor {
    INSTANCE;
    @Override public void execute(Runnable command) {
      command.run();
    }
}
  1. Dispatcher:在'EventBus'中默認(rèn)使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默認(rèn)使用'LegacyAsyncDispatcher'。前者是單線(xiàn)程同步,后者是多線(xiàn)程同步。兩者的具體區(qū)別在下面介紹。

通過(guò)上面的描述,兩者并不能通過(guò)他們類(lèi)名簡(jiǎn)單的區(qū)別為一個(gè)單線(xiàn)程,一個(gè)多線(xiàn)程。他們的區(qū)別同樣可以總結(jié)為兩點(diǎn):

  • Subscriber中都是多線(xiàn)程調(diào)用方法執(zhí)行event,區(qū)別是'EventBus'只簡(jiǎn)單的run()了線(xiàn)程,而'AsyncEventBus'能過(guò)定義線(xiàn)程池。
  • Dispatcher中都是同步分發(fā),區(qū)別是'EventBus'使用了ThreadLocal實(shí)現(xiàn)了單線(xiàn)程同步,而'AsyncEventBus'通過(guò)ConcurrentLinkedQueue使多線(xiàn)程同步分發(fā)。

Dispatcher.class

Dispatcher是一個(gè)抽象類(lèi),它本身是default的,因此無(wú)法被外部繼承,EventBus也沒(méi)有可以傳入Dispatcher的構(gòu)造器,所以對(duì)于Dispatcher我們是無(wú)法正常擴(kuò)展的。
Dispatcher中只有一個(gè)抽象方法:來(lái)實(shí)現(xiàn)消息的分發(fā)。

abstract void dispatch(Object event, Iterator<Subscriber> subscribers);

還有三個(gè)靜態(tài)方法來(lái)創(chuàng)建它的三個(gè)實(shí)現(xiàn)類(lèi):

  • PerThreadQueuedDispatcher:將收到的Event保存在了ThreadLoacl中,意味著多線(xiàn)程中即使使用了同一個(gè)Dispatcher實(shí)現(xiàn)收到的event都會(huì)分開(kāi)保存互不影響。
private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {...};
private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {...};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
...
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
}

里面的dispatching用于避免重入事件分派,例如循環(huán)發(fā)起Event的場(chǎng)景。

  • LegacyAsyncDispatcher:創(chuàng)建了一個(gè)ConcurrentLinkedQueue來(lái)保存收到的Event。多個(gè)線(xiàn)程中使用同一個(gè)LegacyAsyncDispatcher實(shí)現(xiàn)的話(huà),線(xiàn)程收到的Event會(huì)保存在一起,并共同完成所有Event的分發(fā)。
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();
 @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }
      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

值得注意的是由于分發(fā)也是多線(xiàn)程共同完成,這使得它將無(wú)法保證Event的順序性。

  • ImmediateDispatcher:該dispatcher在事件發(fā)布時(shí)立即將事件分發(fā)給訂閱者不使用中間隊(duì)列。

Subscriber.class

對(duì)應(yīng)于Observer的抽象類(lèi),但它更像是一種封裝,Subscriber自身提供了靜態(tài)創(chuàng)建方法,將真正的Observer實(shí)現(xiàn)類(lèi)和執(zhí)行Event的方法都與EventBus封裝在了一起,通過(guò)反射實(shí)現(xiàn)了對(duì)應(yīng)于不同Observer的抽象。

static Subscriber create(EventBus bus, Object listener, Method method) {
    return isDeclaredThreadSafe(method)
        ? new Subscriber(bus, listener, method)
        : new SynchronizedSubscriber(bus, listener, method);
  }
final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
         method.invoke(target, checkNotNull(event));
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }

SynchronizedSubscriber:在dispatchEvent()方法上加了synchronized同步鎖,如果正在的Observer方法是線(xiàn)程不安全的話(huà)就需要用到此類(lèi)。他是通過(guò)@AllowConcurrentEvents注解來(lái)判斷的,這里就不多講了。

SubscriberRegistry.class

維護(hù)了Subscriber與Event的對(duì)應(yīng)關(guān)系,對(duì)EventBus進(jìn)行了解耦,使EventBus職責(zé)單一。

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();
  • register/unregister/getSubscribers:這三個(gè)都是維護(hù)Subscriber與Event的對(duì)應(yīng)關(guān)系的基本方法,這里就不多講了,他們只會(huì)在EventBus中被調(diào)用。
  • findAllSubscribers:這是一個(gè)private方法,他在register/unregister中被調(diào)用,之所以單獨(dú)拿出來(lái),主要是想說(shuō)明一下,他也是基于Annotation來(lái)實(shí)現(xiàn)的。在register時(shí),我們只會(huì)傳入Observer類(lèi),但Observer類(lèi)需要訂閱哪個(gè)Event,Event到底又需要調(diào)用哪個(gè)方法,都是在這個(gè)方法中能通過(guò)對(duì)@Subscriber找個(gè)Annotation的讀取和method的反射。
  private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

最后多提一句,@Subscriber標(biāo)注的那些Method都是事先通過(guò)'getAnnotatedMethodsNotCached' 方法獲取,保存在了一個(gè)LoadingCache中的。由于和EventBus的機(jī)制沒(méi)有太大關(guān)系,這里就不展開(kāi)了。

  private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
      CacheBuilder.newBuilder().weakKeys()
          .build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
            @Override
            public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
              return getAnnotatedMethodsNotCached(concreteClass);
            }
          });

總結(jié)

Guava的EventBus以EventBus類(lèi)為中心,對(duì)于Event的發(fā)布、訂閱者的管理、異常的處理都提供了專(zhuān)門(mén)的實(shí)現(xiàn)類(lèi),流程非常清楚。而且基于Annotation掃描綁定的方式會(huì)使代碼非常的簡(jiǎn)潔。但由于這種方式,在EventBus中對(duì)于事件類(lèi)型和事件參數(shù)等等并不能提供很好的支撐,而且由于基本所有的類(lèi)都是default權(quán)限的,這使得擴(kuò)展異常的艱難T~T


Classfier擴(kuò)展

待續(xù)...

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容