EventBus源碼解析(二)
源碼版本:
- EventBus:3.3.1
導航:
- 三方庫-EventBus源碼解析(一)
- 三方庫-EventBus源碼解析(二)
- 更多的文章看這里:主頁
EventBus的post
EventBus --> post()
public void post(Object event) {
// 1、獲取當前線程的PostingThreadState,currentPostingThreadState是一個ThreadLocal對象,保證每個線程都有一個PostingThreadState對象,保證了線程安全。
PostingThreadState postingState = currentPostingThreadState.get();
// 2、獲取當前線程的事件隊列
List<Object> eventQueue = postingState.eventQueue;
// 3、將要發(fā)送的事件加入到當前線程的事件隊列中
eventQueue.add(event);
// 4、判斷是否正在發(fā)送事件,如果正在發(fā)送中,則不再執(zhí)行里面的邏輯,因為事件已經(jīng)添加到eventQueue中,它會繼續(xù)走里面while循環(huán)的判斷邏輯。
if (!postingState.isPosting) {
// 5、事件未被發(fā)送中,則初始化PostingThreadState,遍歷事件隊列發(fā)送。
// 6、判斷當前線程是否是主線程
postingState.isMainThread = isMainThread();
// 7、標記當前線程正在發(fā)送中
postingState.isPosting = true;
// 8、如果當前線程已經(jīng)取消,則拋出異常。
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {eventQueue.remove(0)
// 9、遍歷事件隊列,只要有事件,就一直發(fā)送。
while (!eventQueue.isEmpty()) {
// 10、發(fā)送事件,eventQueue.remove(0)說明post進來的事件,會依次發(fā)送。
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
// 11、當前線程的所有事件完成,還原狀態(tài)。
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
post()方法,為事件的發(fā)送,兼容了多線程多發(fā)送多事件(多事件后面介紹)的情況。它使用ThreadLocal對象,保證每個線程都有一個PostingThreadState對象,保證了線程安全,它通過postingState.isMainThread記錄了發(fā)送線程是否是主線程,然后它會依次進行發(fā)送。
說明:
postingState.isMainThread,記錄了發(fā)送線程是否是主線程。- 步驟10,
eventQueue.remove(0),說明post()進來的事件,會依次發(fā)送。
我們再來看一下currentPostingThreadState屬性和PostingThreadState類。
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>(); // 發(fā)送線程的事件隊列
boolean isPosting; // 發(fā)送線程是否正在發(fā)送中
boolean isMainThread; // 發(fā)送線程是否在主線程
Subscription subscription; // 發(fā)送線程正在發(fā)布的Subscription
Object event; // 發(fā)送線程正在發(fā)布的Event
boolean canceled; // 發(fā)送線程是否已取消
}
PostingThreadState類為發(fā)送線程的狀態(tài)類,狀態(tài)信息看上面注釋。currentPostingThreadState是一個ThreadLocal對象,保證每個線程都有一個PostingThreadState對象,保證了線程安全。
EventBus --> postSingleEvent
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
// 11、獲取發(fā)送事件的Class
Class<?> eventClass = event.getClass();
// 12、記錄是否找到Subscription
boolean subscriptionFound = false;
if (eventInheritance) { // eventInheritance:事件是否有繼承性,默認為true。
// 13、事件有繼承性,則發(fā)送[發(fā)送Event的所有父類以及所有父接口]的Event。
// 14、獲取到[發(fā)送Event的所有父類以及所有父接口]。
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
// 15、遍歷集合,發(fā)送單個事件。
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
// 16、發(fā)送單個事件,并判斷是否找到Subscription(只要有一個有,就代表找到)。
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
// 17、事件沒有繼承性,則發(fā)送[發(fā)送Event]的Event。
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
if (!subscriptionFound) {
// 18、沒有找到,判斷是否打印Log,或者發(fā)送NoSubscriberEvent。
if (logNoSubscriberMessages) {
// 19、沒有訂閱者時,打印異常信息。
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
// 20、沒有訂閱者時,發(fā)送NoSubscriberEvent事件。
post(new NoSubscriberEvent(this, event));
}
}
}
postSingleEvent()方法,為事件的發(fā)送,兼容了多線程單發(fā)送多事件的情況。如果事件有繼承性,則發(fā)送[發(fā)送Event的所有父類以及所有父接口]的Event(此為多事件),否則則發(fā)送[發(fā)送Event]的Event,并判斷沒有找到,是否打印Log,或者發(fā)送NoSubscriberEvent。
我們再來看一下lookupAllEventTypes方法。
EventBus --> lookupAllEventTypes
private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
// 使用了同步并且鎖唯一,保證了線程有序執(zhí)行。
synchronized (eventTypesCache) {
// 從緩存中獲取當前Event的已維護的所有事件(父類、父接口)集合。
List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
if (eventTypes == null) {
// 集合為空,說明是第一次獲取,則創(chuàng)建集合,并維護集合,并將其添加到緩存中。
eventTypes = new ArrayList<>();
// 當前clazz默認為eventClass
Class<?> clazz = eventClass;
// 遍歷當前clazz,只要不為空,就繼續(xù)遍歷。
while (clazz != null) {
// 添加當前clazz,第一次會把eventClass添加進去。
eventTypes.add(clazz);
// 添加當前clazz的所有接口class
addInterfaces(eventTypes, clazz.getInterfaces());
// 獲取當前clazz的父類,以繼續(xù)添加。
clazz = clazz.getSuperclass();
}
// 將維護好的集合,添加到緩存。
eventTypesCache.put(eventClass, eventTypes);
}
// 最后返回此維護好的所有事件(父類、父接口)集合。
return eventTypes;
}
}
// 循環(huán)遞歸添加當前類的所有的接口class到eventTypes。
// 舉例:當前類,實現(xiàn)A、B兩個接口,A實現(xiàn)C、D接口,B實現(xiàn)E接口,C、D、E未實現(xiàn)任何接口,最終會把A、B、C、D、E接口全部添加到eventTypes中。
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
// 遍歷所有接口
for (Class<?> interfaceClass : interfaces) {
if (!eventTypes.contains(interfaceClass)) {
// 此接口未被包含,則進行添加。
eventTypes.add(interfaceClass);
// 繼續(xù)遞歸增加父接口的所有接口,使用遞歸以完成其所有父接口。
addInterfaces(eventTypes, interfaceClass.getInterfaces());
}
}
}
lookupAllEventTypes()方法,為獲取到[發(fā)送Event的所有父類以及所有父接口]。
EventBus --> postSingleEventForEventType
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
// 21、根據(jù)事件獲取所有訂閱它的訂閱者,在同步方法內(nèi)執(zhí)行,保證了HashMap的get方法線程安全。
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 22、有訂閱者,遍歷集合進行發(fā)送,返回true(代表已經(jīng)找到)。
// 23、遍歷所有此類型的訂閱者進行發(fā)送
for (Subscription subscription : subscriptions) {
// 24、記錄發(fā)送線程正在發(fā)布的Event
postingState.event = event;
// 25、記錄發(fā)送線程正在發(fā)布的Subscription
postingState.subscription = subscription;
// 26、是否是中斷的
boolean aborted;
try {
// 27、將事件發(fā)送給訂閱者,postingState.isMainThread為post方法記錄的。
postToSubscription(subscription, event, postingState.isMainThread);
// 28、事件是否被取消
aborted = postingState.canceled;
} finally {
// 29、重置postingState
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
// 30、事件被取消,則取消此類型的全部發(fā)布。
break;
}
}
return true;
}
// 31、無訂閱者,不進行發(fā)送,返回false(代表沒有找到)。
return false;
}
postSingleEventForEventType()方法,為事件的發(fā)送,兼容了多線程單發(fā)送單事件的情況。根據(jù)事件獲取所有訂閱它的Subscription(持有訂閱者對象、訂閱方法對象),然后進行遍歷依次發(fā)送。
EventBus --> postToSubscription
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
// 32、根據(jù)訂閱方法ThreadMode調(diào)用訂閱方法
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
// 33、發(fā)布線程(默認)
// 34、調(diào)用反射直接通知將事件
invokeSubscriber(subscription, event);
break;
case MAIN:
// 35、主線程
if (isMainThread) {
// 36、在Android上并且是主線程,或者不在Android上,調(diào)用反射直接通知。
invokeSubscriber(subscription, event);
} else {
// 37、在Android上并且不是主線程,將事件添加到主線程隊列中,在主線程調(diào)用。
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
// 38、主線程(有序)
if (mainThreadPoster != null) {
// 39、在Android上,始終將事件添加到主線程隊列中,在主線程調(diào)用。
mainThreadPoster.enqueue(subscription, event);
} else {
// 40、不在Android上,直接通過反射調(diào)用。
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
// 41、后臺線程
if (isMainThread) {
// 42、在Android上并且是主線程,或者不在Android上,則將事件添加到后臺線程隊列中,在后臺線程調(diào)用。
backgroundPoster.enqueue(subscription, event);
} else {
// 43、在Android上并且不是主線程,直接通過反射調(diào)用。
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
// 44、異步線程
// 45、將事件添加到異步線程隊列中,在異步線程調(diào)用。
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
postToSubscription()方法,為事件的通知,兼容了多線程單發(fā)送單事件的情況。根據(jù)訂閱方法的ThreadMode調(diào)用訂閱方法,如果需要線程切換,則切換線程進行調(diào)用;否則,直接調(diào)用。
說明:
mainThreadPoster(即HandlerPoster)、backgroundPoster、asyncPoster這3個Poster,最終都會調(diào)用invokeSubscriber()方法進行通知,詳細介紹看后面的-Poster。EventBus的注冊,如果是粘性方法并且已經(jīng)發(fā)送了此事件,也會調(diào)用postToSubscription()方法進行通知。
EventBus --> invokeSubscriber
void invokeSubscriber(Subscription subscription, Object event) {
try {
// 46、反射調(diào)用訂閱者對象的訂閱方法,并傳入event參數(shù)。
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
// 47、處理調(diào)用訂閱方法異常
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
if (event instanceof SubscriberExceptionEvent) {
// SubscriberExceptionEvent事件,判斷調(diào)用訂閱方法異常時是否打印Log(不發(fā)送事件防止遞歸)。
if (logSubscriberExceptions) {
// 調(diào)用訂閱方法異常時,打印異常信息。
// Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log
logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass()
+ " threw an exception", cause);
SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in "
+ exEvent.causingSubscriber, exEvent.throwable);
}
} else {
// 普通事件,判斷調(diào)用訂閱方法異常時是否拋出異常、打印Log、發(fā)送SubscriberExceptionEvent。
if (throwSubscriberException) {
// 調(diào)用訂閱方法異常時,拋出SubscriberException異常。
throw new EventBusException("Invoking subscriber failed", cause);
}
if (logSubscriberExceptions) {
// 調(diào)用訂閱方法異常時,打印異常信息。
logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class "
+ subscription.subscriber.getClass(), cause);
}
if (sendSubscriberExceptionEvent) {
// 調(diào)用訂閱方法異常時,發(fā)送SubscriberExceptionEvent事件。
SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event,
subscription.subscriber);
post(exEvent);
}
}
}
invokeSubscriber()方法,為反射調(diào)用訂閱者對象的訂閱方法,并傳入event參數(shù)。handleSubscriberException()方法,為處理調(diào)用訂閱方法異常,判斷是否拋出異常、打印Log、發(fā)送SubscriberExceptionEvent。
線程模型
POSTING(發(fā)布線程)
事件的訂閱和事件的發(fā)布處于同一線程(步驟34)。這是默認設(shè)置。事件傳遞意味著最少的開銷,因為它完全避免了線程切換。因此,對于已知在非常短的時間內(nèi)完成而不需要主線程的簡單任務(wù),這是推薦的模式。使用此模式的事件處理程序必須快速返回,以避免阻塞可能是主線程的發(fā)布線程。
說明:
- 事件的發(fā)布(可能會阻塞Android主線程)
- 事件的接收和發(fā)布在同一個線程,由于沒有使用線程池,所以發(fā)布一定會阻塞線程。
- 事件的接收(可能會阻塞Android主線程)
- 事件的接收和發(fā)布在同一個線程,由于可能是主線程發(fā)送,所以接收可能會阻塞主線程,所以接收里不推薦執(zhí)行耗時操作。
MAIN(主線程)
在Android上,訂閱者將在Android的主線程(UI線程)中被調(diào)用。如果發(fā)布線程是主線程,則將直接調(diào)用訂閱方法,從而阻塞發(fā)布線程(步驟36)。否則,事件將排隊等待傳遞(非阻塞)(步驟37)。使用此模式的訂閱者必須快速返回以避免阻塞主線程。如果不是在Android上,則行為與POSTING相同(步驟36)。
說明:
- 事件的發(fā)布(一定會阻塞Android主線程)
- 在
Android上并且是主線程,或者不在Android上,則行為與POSTING相同(步驟36),所以發(fā)布一定會阻塞線程。- 在
Android上并且不是主線程,將事件插入到主線程隊列中(步驟37),由于使用了線程池,所以發(fā)布一定不會阻塞線程。
- 事件的接收(一定會阻塞Android主線程)
- 事件的接收在主線程,所以接收一定會阻塞主線程,所以接收里不能執(zhí)行耗時操作。
- 事件的有序性(可能不是有序的)
- 在
Android上,如果先非主線程發(fā)布(排隊等待,可能會等待很長時間才會執(zhí)行到自己),后主線程發(fā)布(直接調(diào)用),所以在Android上事件它可能是不是有序的。- 在非
Android上,由于是直接調(diào)用,所以事件一定是有序的。
MAIN_ORDERED(主線程-有序)
在Android上,訂閱者將在Android的主線程(UI線程)中被調(diào)用。與MAIN不同的是,事件總是排隊等待傳遞(步驟39)。這確保了post調(diào)用是非阻塞的。
說明:
- 事件的發(fā)布(一定不會阻塞Android主線程)
- 在
Android上,始終將事件添加到主線程隊列中,在主線程調(diào)用,由于使用Handler發(fā)布,所以發(fā)布一定不會阻塞線程。- 不在
Android上,則行為與POSTING相同,所以發(fā)布一定會阻塞線程。
- 事件的接收(一定會阻塞Android主線程)
- 事件的接收在主線程,所以接收一定會阻塞主線程,所以接收里不能執(zhí)行耗時操作(同
MAIN)。
- 事件的有序性(一定是有序的)
- 在
Android上,始終添加到主線程隊列,所以事件一定是有序的(步驟39)。- 在非
Android上,由于是直接調(diào)用,所以事件一定是有序的(步驟40)。
BACKGROUND(后臺)
在Android上,訂閱者將在后臺線程中被調(diào)用。如果發(fā)布線程不是主線程,則將在發(fā)布線程中直接調(diào)用訂閱方法(步驟43)。如果發(fā)布線程是主線程,則EventBus使用一個單獨的后臺線程,它將按順序傳遞其所有事件(步驟42)。使用此模式的訂閱者應(yīng)該嘗試快速返回,以避免阻塞后臺線程。如果不在Android上,則始終使用后臺線程(步驟42)。
說明:
- 事件的發(fā)布(一定不會阻塞Android主線程)
- 在
Android上并且是主線程,或者不在Android上,將事件插入到后臺線程隊列中(步驟42),由于使用了線程池,所以發(fā)布一定不會阻塞線程。- 在
Android上并且不是主線程,則行為與POSTING相同(步驟43),所以發(fā)布一定會阻塞線程。
- 事件的接收(可能會阻塞線程池線程)
- 在
Android上并且是主線程,使用backgroundPoster執(zhí)行(步驟42),由于backgroundPoster里判斷了如果正在執(zhí)行中,則不再創(chuàng)建線程執(zhí)行,而是用上次線程一塊執(zhí)行(詳細看下面的-Poster-backgroundPoster),所以接收可能會阻塞后臺線程,所以接收里不推薦執(zhí)行耗時操作。
ASYNC(異步)
訂閱者將在單獨的線程中調(diào)用。這始終獨立于發(fā)布線程和主線程。使用此模式發(fā)布事件從不等待訂閱方法。如果訂閱方法的執(zhí)行可能需要一些時間(例如,用于網(wǎng)絡(luò)訪問),則應(yīng)使用此模式。避免同時觸發(fā)大量長時間運行的異步訂閱方法以限制并發(fā)線程的數(shù)量。EventBus使用線程池高效地重用已完成異步訂閱者通知中的線程。
說明:
- 事件的發(fā)布(一定不會阻塞Android主線程)
- 訂閱者將在單獨的線程中調(diào)用(
步驟45),由于使用了線程池,所以發(fā)布一定不會阻塞線程。
- 事件的接收(不會阻塞線程池線程)
- 訂閱者將在單獨的線程中調(diào)用,使用
asyncPoster執(zhí)行(步驟45),由于asyncPoster里執(zhí)行每次都在線程池里面使用新的線程執(zhí)行(詳細看下面的-Poster-asyncPoster),所以接收里能執(zhí)行耗時操作。
小結(jié)
EventBus的post(),獲取當前線程的事件隊列,將要發(fā)送的事件加入到其中。- 遍歷事件隊列,只要有事件,就一直發(fā)送。
- 判斷事件是否有繼承性,如果有則根據(jù)事件類型,找到當前
Event的所有父類以及所有父接口的class集合,遍歷這個集合,依次發(fā)送單個事件;否則,則只發(fā)送當前Event一個事件。- 根據(jù)事件獲取所有訂閱它的
Subscription集合,遍歷集合,將事件發(fā)送給訂閱者的訂閱方法。- 發(fā)送給訂閱者時,根據(jù)訂閱方法的線程模型調(diào)用訂閱方法,如果需要線程切換,則切換線程進行調(diào)用;否則,直接調(diào)用。
EventBus的postSticky
EventBus --> postSticky()
public void postSticky(Object event) {
synchronized (stickyEvents) {
// 1、將事件添加到粘性事件集合中
stickyEvents.put(event.getClass(), event);
}
// 2、發(fā)送事件
post(event);
}
說明:
EventBus的post(),只會發(fā)送給已經(jīng)注冊的訂閱者。EventBus的postSticky(),會發(fā)送給已經(jīng)注冊的訂閱者、之后要注冊的訂閱者的粘性訂閱方法。
小結(jié)
EventBus的postSticky(),將事件加入到stickyEvents集合中,當有新的訂閱者注冊后,如果該訂閱者訂閱了該事件的粘性訂閱方法,則會發(fā)送給此方法。- 將事件發(fā)送給已經(jīng)注冊的訂閱者。
EventBus的注銷
EventBus --> unregister()
public synchronized void unregister(Object subscriber) {
// 1、獲取訂閱者訂閱的所有事件
List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
if (subscribedTypes != null) {
// 2、遍歷此訂閱者訂閱的所有事件集合
for (Class<?> eventType : subscribedTypes) {
// 3、通過eventType,將此事件的Subscription集合中含有該訂閱者的Subscription,從集合中移除。
unsubscribeByEventType(subscriber, eventType);
}
// 4、將此訂閱者從typesBySubscriber中移除
typesBySubscriber.remove(subscriber);
} else {
// 5、未注冊,調(diào)用了注銷,Log打印警告。
logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
}
}
unregister()方法,為注銷訂閱者。從typesBySubscriber中移除自己,從subscriptionsByEventType中移除subscriber內(nèi)的多個事件類型。
EventBus --> unsubscribeByEventType()
private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
// 6、獲取該事件的所有訂閱者
List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions != null) {
int size = subscriptions.size();
// 7、遍歷上面的集合,找到此訂閱者的Subscription,然后從此集合中移除。
for (int i = 0; i < size; i++) {
Subscription subscription = subscriptions.get(i);
if (subscription.subscriber == subscriber) {
// 8、找到,從此集合中移除。
subscription.active = false;
subscriptions.remove(i);
i--;
size--;
}
}
}
}
unsubscribeByEventType()方法,為從subscriptionsByEventType中移除subscriber內(nèi)的單個事件類型。
說明:
- 可以先遍歷
subscriptionsByEventType,然后再遍歷subscriptionsByEventType的值,最后匹配subscriber,通過這種方式可以找到要移除的Subscription,但是這種實現(xiàn)要比上面實現(xiàn)效率低。
小結(jié)
EventBus的注銷,獲取訂閱者訂閱的所有事件,遍歷所有事件,通過事件從subscriptionsByEventType中,獲取該事件的所有Subscription,遍歷Subscription集合,找到此訂閱者的Subscription,然后從此集合中移除。- 將此訂閱者從
typesBySubscriber中移除。
Poster
Poster
public interface Poster {
void enqueue(Subscription subscription, Object event);
}
Poster接口,它有一個enqueue方法,將subscription、event入隊執(zhí)行。它有3個實現(xiàn)類,為HandlerPoster、BackgroundPoster、AsyncPoster,我們來分別看一下。
mainThreadPoster
HandlerPoster
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
public HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
// 這里傳入的是主線程的Looper對象,所以這個Handler對象是主線程的Handler
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
// 創(chuàng)建一個事件隊列
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 根據(jù)傳入的參數(shù)封裝一個PendingPost對象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 將PendingPost加入到隊列中
// 發(fā)一個,執(zhí)行一個,再發(fā)送,如果已經(jīng)正在執(zhí)行中,則不再立即執(zhí)行,而是和上次一塊執(zhí)行
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
// 調(diào)用Handler的sendMessage,發(fā)送事件回到主線程,最終會調(diào)用下面的handleMessage方法
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
long started = SystemClock.uptimeMillis();
// 無限循環(huán)
while (true) {
PendingPost pendingPost = queue.poll(); // 獲取并移除第一個
// 進行兩次檢查,確保pendingPost不為null,如果為null直接跳出循環(huán)
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// 一直循環(huán)到queue結(jié)束,才返回
handlerActive = false;
return;
}
}
}
// 使用反射的方法調(diào)用訂閱者的訂閱方法。
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
}
HandlerPoster類繼承Handler,并實現(xiàn)Poster接口,enqueue方法將PendingPost加入到queue隊列,然后發(fā)送空消息,最終走到handleMessage方法內(nèi),其內(nèi)部判斷,只要queue隊列有數(shù)據(jù)就執(zhí)行,最后使用反射調(diào)用訂閱者的訂閱方法。
backgroundPoster
BackgroundPoster
final class BackgroundPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
private volatile boolean executorRunning;
BackgroundPoster(EventBus eventBus) {
this.eventBus = eventBus;
// 初始化隊列
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
// 封裝PendingPost對象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
// 將PendingPost對象加入到隊列中
// 發(fā)一個,執(zhí)行一個,再發(fā)送,如果已經(jīng)正在執(zhí)行中,則不再立即執(zhí)行,而是和上次一塊執(zhí)行
queue.enqueue(pendingPost);
if (!executorRunning) {
executorRunning = true;
// 這里使用到之前初始化的線程池
eventBus.getExecutorService().execute(this);
}
}
}
// 線程池的執(zhí)行回調(diào)
@Override
public void run() {
// 實現(xiàn)同HandlerPoster
try {
try {
// 無限循環(huán)
while (true) {
// 獲取隊列中的PendingPost,進行雙重檢查,如果為null直接返回,結(jié)束循環(huán)
PendingPost pendingPost = queue.poll(1000);
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
// 一直循環(huán)到queue結(jié)束,才返回
executorRunning = false;
return;
}
}
}
//使用反射的方式調(diào)用訂閱者的訂閱方法
eventBus.invokeSubscriber(pendingPost);
}
} catch (InterruptedException e) {
eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
}
} finally {
executorRunning = false;
}
}
}
BackgroundPoster類實現(xiàn)Runnable接口,并實現(xiàn)Poster接口,enqueue方法將PendingPost加入到queue隊列,然后使用線程池執(zhí)行此Runnable,最終走到run方法內(nèi),其內(nèi)部判斷,只要queue隊列有數(shù)據(jù)就執(zhí)行,最后使用反射調(diào)用訂閱者的訂閱方法。
asyncPoster
AsyncPoster
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
// 發(fā)一個,執(zhí)行一個
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
AsyncPoster類實現(xiàn)Runnable接口,并實現(xiàn)Poster接口,enqueue方法將PendingPost加入到queue隊列,然后使用線程池執(zhí)行此Runnable,最終走到run方法內(nèi),其內(nèi)部,獲取queue隊列最新數(shù)據(jù)然后執(zhí)行,最后使用反射調(diào)用訂閱者的訂閱方法。
說明:
BackgroundPoster類和AsyncPoster類線程執(zhí)行區(qū)別:BackgroundPoster類,如果正在執(zhí)行中,則用上一次的線程執(zhí)行,否則用新線程執(zhí)行。AsyncPoster類,始終用新的線程執(zhí)行。
小結(jié)
Poster接口,它有3個實現(xiàn)類,為HandlerPoster、BackgroundPoster、AsyncPoster。- 這幾個類,
enqueue方法將PendingPost(持有subscription對象、event對象)加入到queue隊列,然后切換到指定的線程執(zhí)行,最后使用反射調(diào)用訂閱者的訂閱方法。
SubscriberMethodFinder
SubscriberMethodFinder
class SubscriberMethodFinder {
private static final int BRIDGE = 0x40;
private static final int SYNTHETIC = 0x1000;
private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
private List<SubscriberInfoIndex> subscriberInfoIndexes;
private final boolean strictMethodVerification;
private final boolean ignoreGeneratedIndex;
private static final int POOL_SIZE = 4;
private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];
SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
boolean ignoreGeneratedIndex) {
this.subscriberInfoIndexes = subscriberInfoIndexes;
this.strictMethodVerification = strictMethodVerification;
this.ignoreGeneratedIndex = ignoreGeneratedIndex;
}
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
// 1、先從之前緩存的集合中獲取
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
// 2、如果之前緩存了,直接返回
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
// 忽略注解處理器生成的索引,所以直接用反射獲取,ignoreGeneratedIndex默認為false。
subscriberMethods = findUsingReflection(subscriberClass);
} else {
// 3、獲取所有訂閱方法集合
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
// 4、放入緩存集合中
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
// 5、從數(shù)組中獲取FindState對象,如果有直接返回,如果沒有創(chuàng)建一個新的FindState對象。
FindState findState = prepareFindState();
// 6、根據(jù)事件訂閱者初始化findState
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 7、獲取subscriberInfo,初始化為null。
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
// 通過注解處理器處理
// 獲取到所有訂閱者方法
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
// 通過檢查,給findState.subscriberMethods增加訂閱者方法
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
// 8、通過反射的方式獲取訂閱者中的Method
findUsingReflectionInSingleClass(findState);
}
// 移動到父類,以便后續(xù)操作。
findState.moveToSuperclass();
}
// 釋放
return getMethodsAndRelease(findState);
}
// 從findState中獲取訂閱者所有方法并釋放
private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
// 獲取訂閱者所有訂閱方法集合
List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
// findState進行回收
findState.recycle();
// 將回收后的findState,恢復到FIND_STATE_POOL中。
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
if (FIND_STATE_POOL[i] == null) {
FIND_STATE_POOL[i] = findState;
break;
}
}
}
// 返回集合
return subscriberMethods;
}
// 準備FindState對象,先從緩存中獲取,如果有則獲取并在緩存中移除;如果沒有則進行創(chuàng)建并返回。
private FindState prepareFindState() {
synchronized (FIND_STATE_POOL) {
for (int i = 0; i < POOL_SIZE; i++) {
FindState state = FIND_STATE_POOL[i];
if (state != null) {
FIND_STATE_POOL[i] = null;
return state;
}
}
}
return new FindState();
}
// 獲取訂閱者信息
private SubscriberInfo getSubscriberInfo(FindState findState) {
// 處理SubscriberInfoIndex添加手動創(chuàng)建的AbstractSubscriberInfo(一般不會手動創(chuàng)建)。
// -findState.subscriberInfo != null:說明調(diào)用者調(diào)用findState.subscriberInfo = getSubscriberInfo(findState)的getSubscriberInfo(findState)返回的不為null,即subscriberInfoIndexes有值。
// -findState.subscriberInfo.getSuperSubscriberInfo() != null:說明手動創(chuàng)建的AbstractSubscriberInfo,而不是注解處理器生成添加的SimpleSubscriberInfo。
if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
if (findState.clazz == superclassInfo.getSubscriberClass()) {
// 要獲取的Class等于父類的Class,則返回父類的Class。
return superclassInfo;
}
}
// 處理addIndex()添加的SubscriberInfoIndex。
if (subscriberInfoIndexes != null) {
for (SubscriberInfoIndex index : subscriberInfoIndexes) {
SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
if (info != null) {
return info;
}
}
}
return null;
}
// 通過反射獲取訂閱者方法
private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
// 獲取FindState,有復用。
FindState findState = prepareFindState();
// 初始化FindState,因為有復用,所以得初始化。
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
// 反射獲取,如果找到則存到findState.subscriberMethods里。
findUsingReflectionInSingleClass(findState);
// 移動到父類,會更改findState.clazz
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// 9、訂閱者中所有聲明的方法,放入數(shù)組中。
// getDeclaredMethods,獲取的是當前類的所有方法。
// getMethods,獲取的是當前類及其所有父類的所有公共方法。
methods = findState.clazz.getDeclaredMethods();
// 因為getDeclaredMethods,獲取的是當前類的所有方法,所以findState.skipSuperClasses為false,為不跳過父類,即繼續(xù)查找父類。
} catch (Throwable th) {
// Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
try {
// 10、獲取訂閱者中聲明的public方法,設(shè)置跳過父類
// 因為getMethods,獲取的是當前類及其所有父類的所有公共方法,所以findState.skipSuperClasses為true,為跳過父類,即不查找父類。
methods = findState.clazz.getMethods();
} catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
String msg = "Could not inspect methods of " + findState.clazz.getName();
if (ignoreGeneratedIndex) {
// 忽略注解生成器生成的索引,提示:請考慮使用EventBus注釋處理器來避免反射。
msg += ". Please consider using EventBus annotation processor to avoid reflection.";
} else {
// 忽略注解生成器生成的索引,提示:請使這個類對EventBus注釋處理器可見,以避免反射。
msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
}
throw new EventBusException(msg, error);
}
findState.skipSuperClasses = true;
}
// 遍歷這些方法
for (Method method : methods) {
// 11、獲取方法的修飾符:public、private等等
int modifiers = method.getModifiers();
// 12、訂閱方法為public同時不是abstract、static、bridge、synthetic。
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
// 13、方法參數(shù)類型數(shù)組
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
// 14、獲取方法的注解
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
// 15、如果有注解
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
// 16、將method和eventType放入到findState進行檢查
if (findState.checkAdd(method, eventType)) {
// 17、獲取注解中的threadMode對象
ThreadMode threadMode = subscribeAnnotation.threadMode();
// 18、新建一個SubscriberMethod對象,同時加入到findState中
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 非1個參數(shù),并且是嚴格模式,并且方法上有Subscribe注解,則拋異常警告。
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
// 方法的修飾符不符合,并且是嚴格模式,并且方法上有Subscribe注解,則拋異常警告。
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
}
SubscriberMethodFinder類findSubscriberMethods方法,找到此訂閱者的所有訂閱方法。它先從緩存中獲取,如果緩存中有,直接返回;如果緩存中沒有,通過反射或注解處理器的方式獲取到此訂閱者的所有訂閱方法,并放入到集合中進行返回。
總結(jié)
以上就是全面的EventBus源碼了!之后會出其它三方庫源碼系列,請及時關(guān)注。如果你有什么問題,大家評論區(qū)見!
最后推薦一下我的網(wǎng)站,開發(fā)者的技術(shù)博客: devbolg.cn ,目前包含android相關(guān)的技術(shù),之后會面向全部開發(fā)者,歡迎大家來體驗!