最近需要使用事件驅(qū)動(dòng),打算使用EventBus管理事件的注冊(cè)和分發(fā)。于是仔細(xì)閱讀了下Guava的EventBus實(shí)現(xiàn),并在此做了些整理。
EventBus是基于設(shè)計(jì)模式中的Observer模式的實(shí)現(xiàn)。Observer模式是非常常用的設(shè)計(jì)模式之一,jdk中的EventObject、EventListener、Observable、Observer都是為觀(guān)察者模式服務(wù)的。但隨著業(yè)務(wù)場(chǎng)景復(fù)雜度的不斷提高,我們希望能在管理事件的同時(shí)提供更多的擴(kuò)展。所以我們通過(guò)EventBus來(lái)優(yōu)雅的實(shí)現(xiàn)這些。
Observer模式
我們先簡(jiǎn)單回顧下Observer模式:

定義對(duì)象之間的一對(duì)多依賴(lài)關(guān)系,以便當(dāng)一個(gè)對(duì)象更改狀態(tài)時(shí),它的所有依賴(lài)關(guān)系都會(huì)被通知并自動(dòng)更新。
Observer Pattern: Define a one-to-many dependency between objects so that when one object changes state, all its dependents are notified and updated automatically.
一些文中的“發(fā)布-訂閱(Publish/Subscribe)模式”其實(shí)就是Observer模式,他所做的事情和我們將要做的事情一樣:豐富Subject的功能。
EventBus
首先,我們先來(lái)看一下EventBus模塊的類(lèi):

EventBus.class
EventBus.class:它對(duì)應(yīng)于Subject類(lèi),是整個(gè)模塊的核心,也是功能擴(kuò)展的中心點(diǎn)。
首先看下EventBus.class包含的以下幾個(gè)變量:
private final String identifier;
private final Executor executor;
private final SubscriberExceptionHandler exceptionHandler;
private final SubscriberRegistry subscribers = new SubscriberRegistry(this);
private final Dispatcher dispatcher;
- identifier:定義了EventBus的名稱(chēng),用來(lái)區(qū)分項(xiàng)目中的多個(gè)EventBus,在之后的Exception等日志輸出,和線(xiàn)程命名時(shí)都會(huì)有使用。
private static Logger logger(SubscriberExceptionContext context) {
return Logger.getLogger(EventBus.class.getName() + "." + context.getEventBus().identifier());
}
- executor:java的異步執(zhí)行的實(shí)現(xiàn)類(lèi)。用來(lái)執(zhí)行訂閱者處理Event的方法。值得注意的是在'EventBus'中對(duì)Executor變量賦值的構(gòu)造器是私有的,也就是說(shuō)我們只能使用它所指定的Executor:'DirectExecutor.class'。但'AsyncEventBus'的Executor是被允許傳入的。這也是這兩者的區(qū)別所在之一。
public EventBus(String identifier) {
this(identifier, MoreExecutors.directExecutor(),
Dispatcher.perThreadDispatchQueue(), LoggingHandler.INSTANCE);
}
EventBus(String identifier, Executor executor, Dispatcher dispatcher,
SubscriberExceptionHandler exceptionHandler) {...}
- exceptionHandler:SubscriberExceptionHandler的實(shí)現(xiàn)類(lèi),用于處理過(guò)程中產(chǎn)生的異常。
- subscribers:創(chuàng)建了一個(gè)SubscriberRegistry用來(lái)維護(hù)Subscriber與Event的對(duì)應(yīng)關(guān)系。
- dispatcher:Dispatcher的實(shí)現(xiàn)類(lèi),他是一個(gè)Event的分發(fā)器,所有Event都會(huì)經(jīng)過(guò)dispatcher傳遞給Subscriber。和executor一樣,它也不能被從外部傳入,在'EventBus'中默認(rèn)使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默認(rèn)使用'LegacyAsyncDispatcher'。這是兩個(gè)類(lèi)的唯二區(qū)別了。
然后我們看下EventBus的方法,作為一個(gè)核心類(lèi),它一共只有三個(gè)public方法:
- register:注冊(cè)event。
- unregister:取消event注冊(cè)。
- post:發(fā)布event。
public void register(Object object) {
subscribers.register(object);
}
public void unregister(Object object) {
subscribers.unregister(object);
}
public void post(Object event) {
...
dispatcher.dispatch(event, eventSubscribers);
...
}
僅有的三個(gè)方法也都異常的簡(jiǎn)單,'register'和'unregister'都調(diào)用了SubscriberRegistry類(lèi),'post'交給了Dispatcher類(lèi)。而多線(xiàn)程的控制也通過(guò)'executor'交給了Subscriber,異常的處理不在自身管理同樣傳遞給了Subscriber,作為中心的EventBus只做了功能的定義和分配,事件的轉(zhuǎn)發(fā),完美的實(shí)現(xiàn)了功能的解耦,做到了職責(zé)單一原則。
AsyncEventBus.class
AsyncEventBus.class是EventBus.class的異步多線(xiàn)程的子類(lèi),上面也有提到過(guò),二者之間只在構(gòu)造器中有兩處區(qū)別:
- Executor:EventBus默認(rèn)使用'DirectExecutor.class',他是一個(gè)線(xiàn)程執(zhí)行器,簡(jiǎn)單的直接執(zhí)行傳入的Runnable。AsyncEventBus正好相反,它的Executor必須是傳入的。
private enum DirectExecutor implements Executor {
INSTANCE;
@Override public void execute(Runnable command) {
command.run();
}
}
- Dispatcher:在'EventBus'中默認(rèn)使用了'PerThreadQueuedDispatcher',在'AsyncEventBus'中默認(rèn)使用'LegacyAsyncDispatcher'。前者是單線(xiàn)程同步,后者是多線(xiàn)程同步。兩者的具體區(qū)別在下面介紹。
通過(guò)上面的描述,兩者并不能通過(guò)他們類(lèi)名簡(jiǎn)單的區(qū)別為一個(gè)單線(xiàn)程,一個(gè)多線(xiàn)程。他們的區(qū)別同樣可以總結(jié)為兩點(diǎn):
- Subscriber中都是多線(xiàn)程調(diào)用方法執(zhí)行event,區(qū)別是'EventBus'只簡(jiǎn)單的run()了線(xiàn)程,而'AsyncEventBus'能過(guò)定義線(xiàn)程池。
- Dispatcher中都是同步分發(fā),區(qū)別是'EventBus'使用了ThreadLocal實(shí)現(xiàn)了單線(xiàn)程同步,而'AsyncEventBus'通過(guò)ConcurrentLinkedQueue使多線(xiàn)程同步分發(fā)。
Dispatcher.class
Dispatcher是一個(gè)抽象類(lèi),它本身是default的,因此無(wú)法被外部繼承,EventBus也沒(méi)有可以傳入Dispatcher的構(gòu)造器,所以對(duì)于Dispatcher我們是無(wú)法正常擴(kuò)展的。
Dispatcher中只有一個(gè)抽象方法:來(lái)實(shí)現(xiàn)消息的分發(fā)。
abstract void dispatch(Object event, Iterator<Subscriber> subscribers);
還有三個(gè)靜態(tài)方法來(lái)創(chuàng)建它的三個(gè)實(shí)現(xiàn)類(lèi):
- PerThreadQueuedDispatcher:將收到的Event保存在了ThreadLoacl中,意味著多線(xiàn)程中即使使用了同一個(gè)Dispatcher實(shí)現(xiàn)收到的event都會(huì)分開(kāi)保存互不影響。
private final ThreadLocal<Queue<Event>> queue =
new ThreadLocal<Queue<Event>>() {...};
private final ThreadLocal<Boolean> dispatching =
new ThreadLocal<Boolean>() {...};
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
...
Queue<Event> queueForThread = queue.get();
queueForThread.offer(new Event(event, subscribers));
if (!dispatching.get()) {
dispatching.set(true);
try {
Event nextEvent;
while ((nextEvent = queueForThread.poll()) != null) {
while (nextEvent.subscribers.hasNext()) {
nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
}
}
} finally {
dispatching.remove();
queue.remove();
}
}
}
里面的dispatching用于避免重入事件分派,例如循環(huán)發(fā)起Event的場(chǎng)景。
- LegacyAsyncDispatcher:創(chuàng)建了一個(gè)ConcurrentLinkedQueue來(lái)保存收到的Event。多個(gè)線(xiàn)程中使用同一個(gè)LegacyAsyncDispatcher實(shí)現(xiàn)的話(huà),線(xiàn)程收到的Event會(huì)保存在一起,并共同完成所有Event的分發(fā)。
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
Queues.newConcurrentLinkedQueue();
@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
while (subscribers.hasNext()) {
queue.add(new EventWithSubscriber(event, subscribers.next()));
}
EventWithSubscriber e;
while ((e = queue.poll()) != null) {
e.subscriber.dispatchEvent(e.event);
}
}
值得注意的是由于分發(fā)也是多線(xiàn)程共同完成,這使得它將無(wú)法保證Event的順序性。
- ImmediateDispatcher:該dispatcher在事件發(fā)布時(shí)立即將事件分發(fā)給訂閱者不使用中間隊(duì)列。
Subscriber.class
對(duì)應(yīng)于Observer的抽象類(lèi),但它更像是一種封裝,Subscriber自身提供了靜態(tài)創(chuàng)建方法,將真正的Observer實(shí)現(xiàn)類(lèi)和執(zhí)行Event的方法都與EventBus封裝在了一起,通過(guò)反射實(shí)現(xiàn)了對(duì)應(yīng)于不同Observer的抽象。
static Subscriber create(EventBus bus, Object listener, Method method) {
return isDeclaredThreadSafe(method)
? new Subscriber(bus, listener, method)
: new SynchronizedSubscriber(bus, listener, method);
}
final void dispatchEvent(final Object event) {
executor.execute(new Runnable() {
@Override
public void run() {
try {
method.invoke(target, checkNotNull(event));
} catch (InvocationTargetException e) {
bus.handleSubscriberException(e.getCause(), context(event));
}
}
});
}
SynchronizedSubscriber:在dispatchEvent()方法上加了synchronized同步鎖,如果正在的Observer方法是線(xiàn)程不安全的話(huà)就需要用到此類(lèi)。他是通過(guò)@AllowConcurrentEvents注解來(lái)判斷的,這里就不多講了。
SubscriberRegistry.class
維護(hù)了Subscriber與Event的對(duì)應(yīng)關(guān)系,對(duì)EventBus進(jìn)行了解耦,使EventBus職責(zé)單一。
private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
Maps.newConcurrentMap();
- register/unregister/getSubscribers:這三個(gè)都是維護(hù)Subscriber與Event的對(duì)應(yīng)關(guān)系的基本方法,這里就不多講了,他們只會(huì)在EventBus中被調(diào)用。
- findAllSubscribers:這是一個(gè)private方法,他在register/unregister中被調(diào)用,之所以單獨(dú)拿出來(lái),主要是想說(shuō)明一下,他也是基于Annotation來(lái)實(shí)現(xiàn)的。在register時(shí),我們只會(huì)傳入Observer類(lèi),但Observer類(lèi)需要訂閱哪個(gè)Event,Event到底又需要調(diào)用哪個(gè)方法,都是在這個(gè)方法中能通過(guò)對(duì)@Subscriber找個(gè)Annotation的讀取和method的反射。
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
Class<?> clazz = listener.getClass();
for (Method method : getAnnotatedMethods(clazz)) {
Class<?>[] parameterTypes = method.getParameterTypes();
Class<?> eventType = parameterTypes[0];
methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
}
return methodsInListener;
}
最后多提一句,@Subscriber標(biāo)注的那些Method都是事先通過(guò)'getAnnotatedMethodsNotCached' 方法獲取,保存在了一個(gè)LoadingCache中的。由于和EventBus的機(jī)制沒(méi)有太大關(guān)系,這里就不展開(kāi)了。
private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
CacheBuilder.newBuilder().weakKeys()
.build(new CacheLoader<Class<?>, ImmutableList<Method>>() {
@Override
public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
return getAnnotatedMethodsNotCached(concreteClass);
}
});
總結(jié)
Guava的EventBus以EventBus類(lèi)為中心,對(duì)于Event的發(fā)布、訂閱者的管理、異常的處理都提供了專(zhuān)門(mén)的實(shí)現(xiàn)類(lèi),流程非常清楚。而且基于Annotation掃描綁定的方式會(huì)使代碼非常的簡(jiǎn)潔。但由于這種方式,在EventBus中對(duì)于事件類(lèi)型和事件參數(shù)等等并不能提供很好的支撐,而且由于基本所有的類(lèi)都是default權(quán)限的,這使得擴(kuò)展異常的艱難T~T
Classfier擴(kuò)展
待續(xù)...