上一篇關(guān)于訂閱和取消訂閱的分析:http://www.itdecent.cn/p/3f08a23c4544
上一篇對(duì)訂閱和取消訂閱進(jìn)行了一個(gè)源碼分析,簡(jiǎn)單來(lái)講就是我們?cè)陬愔姓{(diào)用@Subscribe所訂閱事件的方法在訂閱過(guò)程被封裝成了subscriberMethod對(duì)象并被逐一添加到subscriptionsByEventType和typesBySubscriber這兩個(gè)map中去,取消訂閱則是分別從這兩個(gè)map中移除相關(guān)的映射關(guān)系。
注冊(cè)訂閱事件后,接下來(lái)看一下是如何發(fā)送訂閱事件的,發(fā)送訂閱事件使用的是:
EventBus.getDefault().post(TestEvent())
點(diǎn)擊進(jìn)去看一下post方法:
private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
@Override
protected PostingThreadState initialValue() {
return new PostingThreadState();
}
};
/** Posts the given event to the event bus. */
public void post(Object event) {
//分裝成PostingThreadState 對(duì)象
PostingThreadState postingState = currentPostingThreadState.get();
//從postingState獲取事件隊(duì)列
List<Object> eventQueue = postingState.eventQueue;
// 將當(dāng)前要發(fā)送的事件加入到隊(duì)列中
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
//while 不斷輪詢發(fā)送事件
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
/** For ThreadLocal, much faster to set (and get multiple values). */
final static class PostingThreadState {
final List<Object> eventQueue = new ArrayList<>();
boolean isPosting;
boolean isMainThread;
Subscription subscription;
Object event;
boolean canceled;
}
currentPostingThreadState是一個(gè)ThreadLocal類型的變量(ThreadLocal的作用:ThreadLocal是解決線程安全問(wèn)題一個(gè)很好的思路,它通過(guò)為每個(gè)線程提供一個(gè)獨(dú)立的變量副本解決了變量并發(fā)訪問(wèn)的沖突問(wèn)題。在很多情況下,ThreadLocal比直接使用synchronized同步機(jī)制解決線程安全問(wèn)題更簡(jiǎn)單,更方便,且結(jié)果程序擁有更高的并發(fā)性。)currentPostingThreadState中存儲(chǔ)了當(dāng)前線程對(duì)應(yīng)的事件列表和線程的狀態(tài)信息等,上述主要調(diào)用了輪詢調(diào)用postSingleEvent方法,看一下postSingleEvent方法:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
//獲取該事件對(duì)象類型
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
//是否支持事件繼承,默認(rèn)為true,如果訂閱了父類型,當(dāng)發(fā)送子類型事件實(shí)也會(huì)調(diào)用其相關(guān)訂閱方法
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
// 找不到該事件的異常處理
if (!subscriptionFound) {
if (logNoSubscriberMessages) {
logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
}
if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
eventClass != SubscriberExceptionEvent.class) {
post(new NoSubscriberEvent(this, event));
}
}
}
上述eventInheritance默認(rèn)為true,表示如果訂閱了父類型,當(dāng)發(fā)送子類型事件實(shí)也會(huì)調(diào)用其相關(guān)訂閱方法,最終是調(diào)用postSingleEventForEventType進(jìn)行分發(fā),看一下postSingleEventForEventType:
private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
//CopyOnWriteArrayList是一個(gè)線程安全的list,寫入時(shí)會(huì)復(fù)制一份數(shù)據(jù)出來(lái),之后再賦值回去
CopyOnWriteArrayList<Subscription> subscriptions;
synchronized (this) {
//從subscriptionsByEventType map中獲取該eventType下的所有訂閱對(duì)象,subscriptionsByEventType會(huì)不會(huì)有點(diǎn)熟悉??
subscriptions = subscriptionsByEventType.get(eventClass);
}
if (subscriptions != null && !subscriptions.isEmpty()) {
// 遍歷該eventType對(duì)應(yīng)下的訂閱對(duì)象,并調(diào)用postToSubscription執(zhí)行分發(fā)操作
for (Subscription subscription : subscriptions) {
postingState.event = event;
postingState.subscription = subscription;
boolean aborted;
try {
postToSubscription(subscription, event, postingState.isMainThread);
aborted = postingState.canceled;
} finally {
postingState.event = null;
postingState.subscription = null;
postingState.canceled = false;
}
if (aborted) {
break;
}
}
return true;
}
return false;
}
subscriptionsByEventType會(huì)不會(huì)覺(jué)得有點(diǎn)熟悉??這個(gè)便是我們上一篇訂閱分析所提到的,這個(gè)map key為某一事件類型,value為該事件類型下的所有訂閱,我們從subscriptionsByEventType中獲取該eventType下的所有訂閱后對(duì)其進(jìn)行遍歷,并逐一調(diào)用postToSubscription()把事件分發(fā)到每一個(gè)訂閱對(duì)象中去,繼續(xù)看postToSubscription,這里我們會(huì)根據(jù)訂閱方法指定的threadMode信息來(lái)執(zhí)行不同的發(fā)布策略:
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
threadMode總共有以下幾種類型
POSTING:執(zhí)行invokeSubscriber()方法,就是直接反射調(diào)用;
MAIN:首先去判斷當(dāng)前是否在UI線程,如果是的話則直接反射調(diào)用,否則調(diào)用mainThreadPoster#enqueue(),即把當(dāng)前的方法加入到隊(duì)列之中,然后通過(guò)handler去發(fā)送一個(gè)消息,在handler的handleMessage中去執(zhí)行方法。具體邏輯在HandlerPoster.java中;
MAIN_ORDERED:與上面邏輯類似,順序執(zhí)行我們的方法;
BACKGROUND:判斷當(dāng)前是否在UI線程,如果不是的話直接反射調(diào)用,是的話通過(guò)backgroundPoster.enqueue()將方法加入到后臺(tái)的一個(gè)隊(duì)列,最后通過(guò)線程池去執(zhí)行;
ASYNC:與BACKGROUND的邏輯類似,將任務(wù)加入到后臺(tái)的一個(gè)隊(duì)列,最終由Eventbus中的一個(gè)線程池去調(diào)用,這里的線程池與BACKGROUND邏輯中的線程池用的是同一個(gè)。
這里先取一個(gè)分支來(lái)看,假設(shè)我們指定事件監(jiān)聽最后是回到主線程,也即是平常常使用的 @Subscribe(threadMode = ThreadMode.MAIN),那么這里將會(huì)來(lái)到 case MAIN:分支,第一步先判斷發(fā)送事件的時(shí)候(即調(diào)用event post)是不是在主線程,是的話直接執(zhí)行invokeSubscriber()使用反射執(zhí)行方法
void invokeSubscriber(Subscription subscription, Object event) {
try {
//使用反射執(zhí)行
subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
} catch (InvocationTargetException e) {
handleSubscriberException(subscription, event, e.getCause());
} catch (IllegalAccessException e) {
throw new IllegalStateException("Unexpected exception", e);
}
}
如果發(fā)送事件的時(shí)候(即調(diào)用event post)不是在主線程,則執(zhí)行mainThreadPoster.enqueue(subscription, event)方法,那這個(gè)mainThreadPoster是什么呢?
//簡(jiǎn)化代碼
private final Poster mainThreadPoster;
....
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
....
MainThreadSupport getMainThreadSupport() {
if (mainThreadSupport != null) {
return mainThreadSupport;
} else if (AndroidLogger.isAndroidLogAvailable()) {
Object looperOrNull = getAndroidMainLooperOrNull();
return looperOrNull == null ? null :
new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
} else {
return null;
}
}
....
public interface MainThreadSupport {
boolean isMainThread();
Poster createPoster(EventBus eventBus);
class AndroidHandlerMainThreadSupport implements MainThreadSupport {
private final Looper looper;
public AndroidHandlerMainThreadSupport(Looper looper) {
this.looper = looper;
}
@Override
public boolean isMainThread() {
return looper == Looper.myLooper();
}
@Override
public Poster createPoster(EventBus eventBus) {
return new HandlerPoster(eventBus, looper, 10);
}
}
}
可以看到,mainThreadPoster內(nèi)部創(chuàng)建了一個(gè)主線程Looper,并最終new了一個(gè)HandlerPoster,HandlerPoster是mainThreadPoster的實(shí)現(xiàn)類,這里大概可以猜到mainThreadPoster其實(shí)就是主線程的handler,看一下HandlerPoster如何實(shí)現(xiàn),先看一下enqueue方法:
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
private final int maxMillisInsideHandleMessage;
private final EventBus eventBus;
private boolean handlerActive;
protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
super(looper);
this.eventBus = eventBus;
this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
//構(gòu)建一個(gè)PendingPost對(duì)象
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
//添加進(jìn)PendingPost隊(duì)列
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
//調(diào)用sendMessage發(fā)送消息,從而觸發(fā)handleMessage
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
......省略
PendingPostQueue是一個(gè)簡(jiǎn)單實(shí)現(xiàn)的鏈表,內(nèi)部保存了兩個(gè)PendingPost對(duì)象 ,一頭一尾,尾部插入,頭部移除,調(diào)用enqueue()從鏈表尾部插入,調(diào)用poll()從鏈表頭部移除,每次插入后調(diào)用sendMessage從而回調(diào)到handleMessage,看一下handleMessage是如何從這個(gè)PendingPostQueue取出這個(gè)訂閱事件的:
......省略
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
//記錄開始時(shí)間
long started = SystemClock.uptimeMillis();
while (true) {
//while循環(huán)一直從PendingPostQueue取出PendingPost
PendingPost pendingPost = queue.poll();
//如果隊(duì)列為空則取消
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
//這里同樣使用反射調(diào)用了方法
eventBus.invokeSubscriber(pendingPost);
//判斷執(zhí)行是否超過(guò)了指定時(shí)間,是的話重新調(diào)用sendMessage方法
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
} finally {
handlerActive = rescheduled;
}
}
可以看出,其實(shí)mainThreadPoster主要是幫我們回調(diào)到主線程,其內(nèi)部本質(zhì)上還是調(diào)用了反射去執(zhí)行方法,關(guān)于handleMessage,第一步是用while循環(huán)不斷輪詢?nèi)〕鲫?duì)列中的PendingPost,當(dāng)隊(duì)列為空則停止輪詢,當(dāng)執(zhí)行處理方法的時(shí)長(zhǎng)過(guò)長(zhǎng)時(shí)則重新調(diào)用sendMessage,從而繼續(xù)回調(diào)到handleMessage,這是為了防止執(zhí)行時(shí)間過(guò)長(zhǎng),導(dǎo)致while循環(huán)阻塞主線程造成卡頓。
本篇博文到此結(jié)束,關(guān)于其他類型的threadMode執(zhí)行操作,可自行在研究,差不多。