
前言
在之前的文章 Android 注解系列之APT工具(三) 中,我們介紹了 APT 技術(shù)的及其使用方式,也提到了一些知名的開源框架如 Dagger2、ButterKnife、EventBus 都使用了該技術(shù)。為了讓大家更好的了解 APT 技術(shù)的使用,在接下來的文章中我將會著重帶領(lǐng)大家來了解 EventBus 中 APT 技術(shù)的使用,在了解該知識之前,需要我們對 EventBus 內(nèi)部原理較為熟悉,如果你已經(jīng)熟悉其內(nèi)部機制了,可以跳過該篇文章,直接閱讀 Android 注解系列之EventBus3 “加速引擎“(五)。
閱讀該篇文章,我們能夠?qū)W到如下知識點:
- EventBus3 內(nèi)部原理
- EventBus3 訂閱與發(fā)送消息原理
- EventBus3 線程切換的原理
- EventBus3 粘性事件的處理
整篇文章結(jié)合 EventBus 3.1.1 版本進(jìn)行講解。
EventBus 簡介
EventBus 對于 Android 程序員來說應(yīng)該不是很陌生,它是基于觀察者模式的事件發(fā)布/訂閱框架,我們常常用它來實現(xiàn)不同組件的通訊,后臺線程通信等。

雖然 EventBus 非常簡單好用,但是還是會因為 EventBus 滿天飛,使程序代碼結(jié)構(gòu)非常混亂,難以測試和追蹤。即使 EventBus 有很多詬病,但仍然不影響我們?nèi)W(xué)習(xí)其中的原理與編程思想~
大概流程
在了解 EventBus 內(nèi)部原理之前,我們先了解一下 EventBus 框架的一個大概流程。如下圖所示:

上圖中
綠色為訂閱流程,紅色為發(fā)送事件流程,大家可以結(jié)合上圖,來理解源碼。
在上圖中我們在 A.java 中訂閱了事件 AEvent,在 B.java 中訂閱了事件 AEvent 與 BEvent,下面我們來分析 EventBus 中注冊與事件發(fā)送的兩個流程,在介紹兩個流程之前,先介紹一下 Subscription 與 SubscriberMethod 中所包含的內(nèi)容。
Subscription 類中包含以下內(nèi)容:
- 當(dāng)前注冊對象
- 對應(yīng)訂閱方法的封裝對象 SubscriberMethod
SubscriberMethod 類中包含以下內(nèi)容:
- 包含
@Subscribe注解的方法的 Method (java.lang.reflect包下的對象)。 -
@Subscribe注解中設(shè)置的線程模式 ThreadMode - 方法的注冊的事件類型的 Class 對象
-
@Subscribe中設(shè)置的優(yōu)先級 priority -
@Subscribe中設(shè)置事件是否是粘性事件 sticky
注冊流程
當(dāng)我們通過調(diào)用 EventBus.register() 注冊 A、B 兩個對象時,EventBus 會做以下幾件事件:
- 通過內(nèi)部的
SubscriberMethodFinder來獲取 A、B類中含有@Subscribe注解的方法,并將該注解中的內(nèi)容與對應(yīng)方法封裝為SubscriberMethod對象。然后再將當(dāng)前訂閱對象與對應(yīng)的SubscriberMethod再封裝為Subscription對象。 - 將所有的
Subscription放在名為subscriptionsByEventType類型為Map<Class<?>, CopyOnWriteArrayList<Subscription>>數(shù)據(jù)結(jié)構(gòu)(key 為事件類型的 Class 對象) 中,因為Subscription對象內(nèi)部包含SubscriberMethod, 那么就能知道訂閱的事件類型,所以我們可以根據(jù)事件類型來區(qū)分Subscription,又因為相同事件可以被不同訂閱者中的方法來訂閱,所以相同類型的事件也就以對應(yīng)不同的Subscription。 - 將訂閱者中的所有訂閱的事件都封裝在名為
typesBySubscriber類型為Map<Object, List<Class<?>>>數(shù)據(jù)結(jié)構(gòu)(key 為訂閱對象,value 為該對象訂閱的事件類型 Class 對象)。該集合主要用于取消訂閱,在下文中我們會進(jìn)行介紹。
在整個注冊流程中,最主要的流程就是 EventBus 通過 SubscriberMethodFinder 去獲取類中包含 @Subscribe 注解的訂閱方法。在 EventBus 3.0 之前該流程一直都是通過反射的方式去獲取。在 3.0 及以后版本,EventBus 采用了 APT 技術(shù),對 SubscriberMethodFinder 查找訂閱方法流程進(jìn)行了優(yōu)化,使其能在 EventBus.register() 方法調(diào)用之前就能知道相關(guān)訂閱事件的方法,這樣就減少了程序在運行期間使用反射遍歷獲取方法所帶來的時間消耗。在下文中我們也會指出具體的優(yōu)化點。
事件發(fā)送流程
知道了 EventBus 的注冊過程,再來了解事件的發(fā)送流程就非常簡單了。因為我們已經(jīng)通過 subscriptionsByEventType 存儲事件對應(yīng)的 Subscription,只要找到了 Subscription ,那么我們就能從 Subscription 拿到訂閱事件的對象 subscriber ,以及對應(yīng)的訂閱方法 Method (java.lang.reflect 包下的對象)。然后通過反射調(diào)用:
Subscription 內(nèi)部包含訂閱者及 SubscriberMethod(內(nèi)部包含訂閱方法 Method )
method.invoke(subscription.subscriber, event)
通過上述方法,就能將對應(yīng)事件發(fā)送到相關(guān)訂閱者了。當(dāng)然這里只是簡單的介紹了事件是如何發(fā)送到相關(guān)訂閱者的。關(guān)于 EventBus 中粘性事件的處理,線程如何切換。會在下文中進(jìn)行詳細(xì)介紹。
源碼分析
在了解了 EventBus 的內(nèi)部大概流程后,現(xiàn)在我們通過源碼來更深層次的了解其內(nèi)部實現(xiàn)。還是從訂閱過程與事件的發(fā)送兩個過程進(jìn)行講解。
訂閱過程源碼分析
EventBus 的訂閱入口為 register() 方法,如下所示:
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
//流程1:獲取對應(yīng)類中所有的訂閱方法
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
//流程2:實際訂閱
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
在該方法中,主要涉及到 SubscriberMethodFinder 查找方法與實際訂閱兩個流程,下面我們會對這兩個流程進(jìn)行介紹。
SubscriberMethodFinder 查找方法流程
在該流程中,主要通過 SubscriberMethodFinder 去獲取訂閱者中所有的 SubscriberMethod ,我們先看 findSubscriberMethods() 方法:
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
//從緩存中獲取訂閱者中的訂閱方法,如果有則讀緩存,如果沒有進(jìn)行查找
List<SubscriberMethod> subscriberMethods = (List)METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
} else {
if (this.ignoreGeneratedIndex) {//如果忽略索引類,則使用反射。
subscriberMethods = this.findUsingReflection(subscriberClass);
} else {//否則使用索引類
subscriberMethods = this.findUsingInfo(subscriberClass);
}
//如果訂閱者沒有訂閱方法,則拋出異常
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass + " and its super classes have no public methods with the @Subscribe annotation”);
} else {
//將對應(yīng)類中的訂閱方法,添加到緩存中,提高效率,方便下次查找
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
}
該方法的邏輯也非常簡單,為如下幾個步驟:
- 步驟1:先從緩存(
METHOD_CACHE)中獲取訂閱者對應(yīng)的SubscriberMethod(訂閱方法),如果有則從緩存中取。 - 步驟2:如果緩存中沒有,則通過布爾變量
ignoreGeneratedIndex,來判斷是直接使用反射獲取訂閱方法,還是通過索引類(EventBus 3.0 使用APT 增加的類)來獲取。因為ignoreGeneratedIndex默認(rèn)值為 false ,則默認(rèn)會走findUsingInfo()方法 - 步驟3:將步驟2中獲得的訂閱方法集合,存儲到緩存中,方便下一次獲取,提高效率。
因為默認(rèn)會走 findUsingInfo() 方法,我們繼續(xù)查看該方法:
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
//步驟1:構(gòu)建了查詢狀態(tài)緩存池,最多緩存4個類的查詢狀態(tài)
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
//步驟2,獲取查找狀態(tài)對應(yīng)的訂閱信息,??這里EventBus 3.0 使用了索引類,
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
//將訂閱者的所有的訂閱方法添加到FindState的集合中
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {//步驟3:如果訂閱信息為null,則通過反射來獲取類中所有的方法
findUsingReflectionInSingleClass(findState);
}// 繼續(xù)查找父類的方法
findState.moveToSuperclass();
}
//步驟4,獲取findState中的所有方法,并清空對象池
return getMethodsAndRelease(findState);
}
- 步驟1:創(chuàng)建與訂閱者相關(guān)的 FindState 對象。會從 FinState 對象緩存池(最大為4個)中獲取,一個訂閱者對象對應(yīng)一個FindState,一個訂閱者對象對應(yīng)一個或多個訂閱方法。
- 步驟2:通過 FindState 對象 調(diào)用
getSubscriberInfo()方法去獲取訂閱者相關(guān)的訂閱方法信息。該方法使用了 APT 技術(shù),構(gòu)建了EventBus的索引類。關(guān)于具體的優(yōu)化,會在下篇文章中 Android 注解系列之EventBus3 “加速引擎“(五)進(jìn)行描述,大家這里有個印象就好了。 - 步驟3:如果通過步驟2獲取不到訂閱方法信息,則通過
反射來獲取類中的所有的訂閱方法。并將獲取的方法,封裝到 FindState 中的 subscriberMethods 集合中去。 - 步驟4:將 FindState 對象中的 subscriberMethods 集合返回。
在上述方法中,我們需要注意的是,如果當(dāng)前訂閱著沒有相關(guān)的訂閱方法,那么會依次遍歷其父類的訂閱方法。還有一個知識點,就是該方法中 FindState 使用了 對象緩存池,不會每次注冊一個訂閱者就創(chuàng)建 一個FindState 對象。這樣就節(jié)約了內(nèi)存的使用。
關(guān)于索引類的知識點,會在下篇文章中進(jìn)行介紹,這里我們直接查看 findUsingReflectionInSingleClass() 方法:
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
//獲取當(dāng)前訂閱者中的所有的方法
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
//獲取該類的所有public 方法 包括繼承的公有方法
methods = findState.clazz.getMethods();
findState.skipSuperClasses = true;
}
//循環(huán)遍歷所有的方法,通過相關(guān)注解找到相應(yīng)的訂閱方法。
for (Method method : methods) {
int modifiers = method.getModifiers();
//滿足修飾符為 public 并且非抽象、非靜態(tài)
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
//找到參數(shù)為1,且該方法包含Subscrile注解的方法
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
// 創(chuàng)建訂閱方法對象,并將對應(yīng)方法對象,事件類型,線程模式,優(yōu)先級,粘性事件封裝到SubscriberMethod對象中。
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract”);
}
}
}
該方法的邏輯也非常簡單,通過獲取 FindState 中的訂閱者的 Class 對象,然后通過反射獲取所有包含 @Subscribe 注解且參數(shù)為 1 的 Method 對象,并讀取到該參數(shù)的類型EventType,接著讀取注解中的 thredMode、priority、sticy,最后將這些數(shù)據(jù)都統(tǒng)一分裝到新建的SubscriberMethod 對象中,最后將該對象添加到 FindState 中的 subscriberMethods 集合中去。
實際訂閱方法 subscribe
當(dāng)找到訂閱者所有的方法集合后,最終會遍歷調(diào)用 subscribe() 方法,查看該方法:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
//步驟1,將每個訂閱方法和訂閱者封裝成Subscription
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
//步驟2,獲取對應(yīng)事件中所有的 Subscription,判斷是否重復(fù)添加
CopyOnWriteArrayList<Subscription> subscriptions = (CopyOnWriteArrayList)this.subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList();
this.subscriptionsByEventType.put(eventType, subscriptions);
} else if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event " + eventType);
}
//步驟3,根據(jù)優(yōu)先級,將當(dāng)前新封裝的Subscription對象添加到subscriptionsByEventType中去
int size = subscriptions.size();
for(int i = 0; i <= size; ++i) {
if (i == size || subscriberMethod.priority > ((Subscription)subscriptions.get(i)).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
//步驟4,將當(dāng)前訂閱者中與當(dāng)前訂閱者所訂閱的事件類型,添加到typesBySubscriber中去
List<Class<?>> subscribedEvents = (List)this.typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList();
this.typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
//步驟5,如果該方法有訂閱了粘性事件,則從stickyEvents中獲取相應(yīng)粘性事件,并發(fā)送
if (subscriberMethod.sticky) {
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = this.stickyEvents.get(eventType);
this.checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
在上述方法中主要流程如下:
- 步驟1,將每個訂閱方法和訂閱者封裝成 Subscription。
- 步驟2,獲取對應(yīng)事件中所有的 Subscription ,判斷是否重復(fù)添加。
- 步驟3,根據(jù)
優(yōu)先級,將當(dāng)前新封裝的 Subscription 對象添加到 subscriptionsByEventType 中去。(設(shè)置了優(yōu)先級后,EvenBus 就可以按照優(yōu)先級順序,將事件發(fā)送給訂閱者) - 步驟4,將當(dāng)前訂閱者中與當(dāng)前訂閱者所訂閱的事件類型,添加到 typesBySubscriber 中去。
- 步驟5,如果該方法有訂閱了粘性事件,則從 stickyEvents 中獲取相應(yīng)粘性事件,并發(fā)送。
再結(jié)合我們最開始所畫的 EventBus 大致流程,該方法其實就做了下圖紅色虛線框中的事:

關(guān)于粘性事件的知識點,需要我們了解事件的發(fā)送流程,我們會在下文進(jìn)行詳細(xì)介紹。
事件發(fā)送流程源碼分析
事件的發(fā)送,主要分為簡單事件與粘性事件,分別對應(yīng)方法為 post() 與 postSticky() 兩個方法。這里我們先看簡單事件的發(fā)送,代碼如下:
簡單事件的發(fā)送
public void post(Object event) {
//步驟1,獲取當(dāng)前線程中獨立擁有的PostingThreadState,并從中獲取事件隊列(eventQueue),將發(fā)送的事件添加到該隊列中
EventBus.PostingThreadState postingState = (EventBus.PostingThreadState)this.currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
//步驟2:判斷當(dāng)前線程是否正在分發(fā)事件,如果不是,則循環(huán)遍歷事件隊列中的事件,并將事件分發(fā)出去,直到當(dāng)前事件隊列空為止
if (!postingState.isPosting) {
postingState.isMainThread = this.isMainThread();
postingState.isPosting = true;
//如果當(dāng)前分發(fā)事件狀態(tài)為取消,則拋出異常
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset”);
}
//循環(huán)遍歷事件隊列,并將消息發(fā)送出去
try {
while(!eventQueue.isEmpty()) {
this.postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
在 EventBus 中會為個每調(diào)用 post() 方法的線程都會創(chuàng)建一個唯一的 PostingThreadState 對象,用于記錄當(dāng)前線程存儲發(fā)送消息與發(fā)送的狀態(tài),其內(nèi)部結(jié)構(gòu)如下所示:

PostingThreadState 使用了 ThreadLocal 不熟悉 ThreadLocal 的小伙伴,可以查看該篇文章:Android Handler機制之ThreadLocal
也就是說當(dāng)我們調(diào)用 EventBus.post() 方法,其實是從 EventQueue 隊列中取出消息,然后通過調(diào)用 postSingleEvent()方法 來實際發(fā)送消息,該方法代碼如下所示:
private void postSingleEvent(Object event, EventBus.PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
//步驟1:??判斷否事件傳遞發(fā)送
if (this.eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for(int h = 0; h < countTypes; ++h) {
Class<?> clazz = (Class)eventTypes.get(h);
//??循環(huán)遍歷遍歷事件并發(fā)送
subscriptionFound |= this.postSingleEventForEventType(event, postingState, clazz);
}
} else {
//步驟2:??如果不支持事件的傳遞,那么這里開始發(fā)送事件。
subscriptionFound = this.postSingleEventForEventType(event, postingState, eventClass);
}
//步驟3:如果沒有找到訂閱的方式,提示用戶
if (!subscriptionFound) {
if (this.logNoSubscriberMessages) {
this.logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (this.sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class && eventClass != SubscriberExceptionEvent.class) {
this.post(new NoSubscriberEvent(this, event));
}
}
}
該方法主要為如下三個步驟:
- 步驟1:通過布爾變量
eventInheritance判斷是否支持事件是否傳遞發(fā)送,如果支持,那么通過lookupAllEventTypes()方法獲得發(fā)送事件祖先類及其接口。然后通過postSingleEventForEventType()方法,將它們都發(fā)送出去, - 步驟2:步驟1返回 false 那么就直接使用
postSingleEventForEventType()方法發(fā)送事件。 - 步驟3:如果沒有找到相關(guān)的訂閱方法,那么就提示用戶沒有相關(guān)的訂閱方法。
布爾變量
eventInheritance默認(rèn)為false,我們可以通過 EventBusBuilder 來配置該變量的值。
那什么是事件的傳遞發(fā)送呢?我們來查看 lookupAllEventTypes()方法:
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
synchronized (eventTypesCache) {
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
eventTypes = new ArrayList<>();
Class<?> clazz = eventClass;
//??獲取該類所有祖先類及其接口
while (clazz != null) {
eventTypes.add(clazz);
addInterfaces(eventTypes, clazz.getInterfaces());
clazz = clazz.getSuperclass();
}
eventTypesCache.put(eventClass, eventTypes);
}
return eventTypes;
}
}
//將接口添加到集合中
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
eventTypes.add(interfaceClass);
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
在該方法中,會獲取發(fā)送事件的所有的祖先類及其接口,最后將他們以集合的方式返回,在 postSingleEvent 方法中拿到這個集合之后,那么就會將集合中所有的數(shù)據(jù)都發(fā)送出去。這樣做會造成什么效果呢?如果當(dāng)前我們的繼承體系為 Aevent -> Bevent -> Cevent ( -> 表示繼承),那么通過發(fā)送 Aevent,那么其他所有訂閱過 Bevent 及 Cevent 的訂閱者都會收到消息。
我們繼續(xù)查看 postSingleEventForEventType() 方法,代碼如下所示:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
//??從緩存中拿取之前存取的 Subscription
synchronized (this) {
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted = false;
try {
//??這里找到相應(yīng)的方法后,開始切換線程了。
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
該方法的邏輯非常簡單,就是從我們之前的 subscriptionsByEventType 集合中拿到存儲的 Subscription,并根據(jù)當(dāng)前線程狀態(tài)設(shè)置關(guān)聯(lián)的 PostingState 中 canceled 、subscription 、isMainThread 等屬性值,然后通過 postToSubscription() 方法來真正的執(zhí)行事件的傳遞。
到目前為止整個流程如下所示:

postToSubscription()
postToSubscription() 方法是真正實際將事件傳遞到訂閱者的代碼。查看該方法:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
從上述方法中,我們拿到 Subscription 中成員變量 SubscriberMethod 中的線程模式 threadMode 來判斷訂閱方法需要執(zhí)行的線程。如果當(dāng)前線程模式是 POSTING ,那么默認(rèn)就直接調(diào)用 invokeSubscriber() 方法。具體代碼如下所示:
void invokeSubscriber(Subscription subscription, Object event) {
try {
//??直接通過反射調(diào)用訂閱方法。
subscription.subscriberMethod.method.
invoke(subscription.subscriber, event);
}
//省略部分代碼
}
如果為其他模式,那么會根據(jù)相應(yīng)的 poster 調(diào)用 enqueue() 方法來控制執(zhí)行訂閱方法所在的線程。在 EventBus 中提供了如下三個 Poster 來控制訂閱方法的所運行的線程。
- HandlerPoster (切換到主線程)
- BackgroundPoster (切換到后臺線程)
- AsyncPoster (切換到后臺線程)
以上三個 Poster 都實現(xiàn)了 Poster 接口,且內(nèi)部都維護(hù)了一個名為 PendingPostQueue 的隊列,該隊列以 PendingPost 為存儲單元,其中 PendingPost 中存儲內(nèi)容為我們根據(jù)當(dāng)前事件所找到的 Subscription 與當(dāng)前所發(fā)生的事件。
那么結(jié)合整個流程,我們能得到下圖:

針對上圖,再進(jìn)行一下簡單的說明。
- 當(dāng)我們調(diào)用
EventBus.post()發(fā)送簡單事件時,會將該事件放入與線程相關(guān)的PostingThreadState的EventQueue中。 - 接著會從之前在
subscriptionsByEventType集合中找到與該事件相關(guān)的Subscription。 - 接著將找到的
Subscription與當(dāng)前所發(fā)送的事件都封裝為PendingPost并添加到對應(yīng)Poster中的PendingPostQueue隊列中。 - 最后對應(yīng)的
Poster從隊列中取出相應(yīng)的PendingPost,通過反射調(diào)用訂閱者的訂閱方法。
其中訂閱方法執(zhí)行線程的規(guī)則,如下所示:

線程的切換
在上節(jié)中,訂閱者的訂閱方法執(zhí)行的所在線程,是由 EventBus 中內(nèi)部的三個 Poster來實現(xiàn)的。那下面我們就來看看這三個 Poster 的實現(xiàn)。
HandlerPoster
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
//默認(rèn)會傳遞主線程的Looper
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//??這里將PedingPost放入PendingPostQueue中,然后發(fā)送消息
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message”);
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
while (true) {
//??從隊列中取出最近的PendingPost
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//??直接通過反射,調(diào)用訂閱者的訂閱方法。
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message”);
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
HanderPoster 中的邏輯非常容易理解,繼承 Handler,并在初始化的時候默認(rèn)會關(guān)聯(lián) 主線程 的 Looper,這樣該 Handler 所發(fā)送的消息將會在主線程中被處理。
分析一下 HanderPoster 中主要的步驟:
- 在調(diào)用
enqueue()方法時,會將之前我們封裝好的PendingPost放入PendingPostQueue隊列中,同時發(fā)送消息。 - 在
handleMessage()方法中,從PendingPostQueue隊列中取出最近的PendingPost,然后直接通過eventBus.invokeSubscriber()反射執(zhí)行訂閱者的訂閱方法。
BackgroundPoster
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
//使用線程池來提交任務(wù),該方法是線程安全的。
synchronized (this) {
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
eventBus.getExecutorService().execute(this);
}
}
}
@Override
public void run() {
try {
try {
while (true) {
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
executorRunning = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
BackgroundPoster 與 HandlerPoster 最大的不同是其內(nèi)部使用了線程池,并且該類也實現(xiàn)了 Runnable 接口。
在 BackgroundPoster 中的 enqueue() 方法中,默認(rèn)會使用 EventBus 中默認(rèn)的線程池 DEFAULT_EXECUTOR_SERVICE來提交任務(wù) ,該線程池的聲明如下:
private final static ExecutorService DEFAULT_EXECUTOR_SERVICE = Executors.newCachedThreadPool();
CachedThreadPool 適用于大量的且耗時較少的任務(wù)
同樣的,BackgroundPoster 也就是通過反射調(diào)用訂閱者的訂閱方法,只不過不同的是它是放入線程池中的非主線程中進(jìn)行執(zhí)行。
需要注意的是不管是在任何線程中發(fā)送消息,EventBus 總是線程安全的。從 BackgroundPoster 的代碼中我們就可以看出。
AsyncPoster
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available”);
}
eventBus.invokeSubscriber(pendingPost);
}
}
這里就不對 AsyncPoster 進(jìn)行講解了,相信大家根據(jù)之前的內(nèi)容也能理解。
粘性事件的發(fā)送
現(xiàn)在我們還剩最后一個知識點了,就是粘性事件的發(fā)送。在 EventBus 中發(fā)送粘性事件,我們需要調(diào)用方法 postSticky() 方法,代碼如下所示:
public void postSticky(Object event) {
synchronized (stickyEvents) {
stickyEvents.put(event.getClass(), event);
}
post(event);
}
從代碼中,我們不難看出,粘性的事件發(fā)送與簡單事件的發(fā)送唯一的區(qū)別就是將發(fā)送的事件添加到 stickyEvents 集合中去了。那為什么要這么做呢?在了解具體的原因之前,我們需要了解粘性事件的概念。
粘性事件的概念:當(dāng)訂閱者還沒有訂閱相關(guān)事件 A 時,程序已經(jīng)發(fā)送了一些事件 A,按照正常的邏輯,當(dāng)訂閱者開始訂閱事件 A 時,是接受不到程序已經(jīng)發(fā)送過的事件 A ,但是我們希望接受到那些已經(jīng)發(fā)送過的消息。這種已經(jīng)過時,但又被重新接受的事件,我們稱之為粘性事件。
那么根據(jù)粘性事件的思想,我們需要將已經(jīng)發(fā)送的事件存儲下來,并在粘性事件的訂閱的過程中進(jìn)行特別的處理,也就是在 EventBus.register() 方法中進(jìn)行處理。還記得之前注冊過程中的 subscribe() 方法嗎?該方法內(nèi)部對粘性事件進(jìn)行了特殊的處理,代碼如下所示:
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
//省略部分代碼
//判斷是否是粘性事件
if (subscriberMethod.sticky) {
//??支持事件傳遞的粘性事件
if (eventInheritance) {
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
//??開始執(zhí)行訂閱方法。
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
在上述邏輯中,會從 stickyEvents 中獲取之前發(fā)送的事件,然后調(diào)用 checkPostStickyEventToSubscription()。該方法代碼如下所示:
private void checkPostStickyEventToSubscription(Subscription newSubscription, Object stickyEvent) {
if (stickyEvent != null) {
postToSubscription(newSubscription, stickyEvent, isMainThread());
}
}
又因為checkPostStickyEventToSubscription() 方法內(nèi)部會調(diào)用 postToSubscription() 方法。那么最終訂閱者就能接受到之前發(fā)送的事件,并執(zhí)行相應(yīng)的訂閱方法啦。
最后
EventBus 主要的流程到現(xiàn)在已經(jīng)講完了。從實際的代碼中,我們不僅能看到其良好的代碼規(guī)范以及封裝思想。還能看到該框架對性能的優(yōu)化,尤其是添加了一些必要的緩存。我相信以上的這些點,都是值得我們借鑒與參考的。在接下來的文章中我們會講解 EventBus 中的 “加速引擎" 索引類。有興趣的小伙伴可以繼續(xù)關(guān)注。