EventBus源碼分析(二)

上一篇關(guān)于訂閱和取消訂閱的分析:http://www.itdecent.cn/p/3f08a23c4544
上一篇對(duì)訂閱和取消訂閱進(jìn)行了一個(gè)源碼分析,簡(jiǎn)單來(lái)講就是我們?cè)陬愔姓{(diào)用@Subscribe所訂閱事件的方法在訂閱過(guò)程被封裝成了subscriberMethod對(duì)象并被逐一添加到subscriptionsByEventType和typesBySubscriber這兩個(gè)map中去,取消訂閱則是分別從這兩個(gè)map中移除相關(guān)的映射關(guān)系。
注冊(cè)訂閱事件后,接下來(lái)看一下是如何發(fā)送訂閱事件的,發(fā)送訂閱事件使用的是:

  EventBus.getDefault().post(TestEvent())

點(diǎn)擊進(jìn)去看一下post方法:

    private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
        @Override
        protected PostingThreadState initialValue() {
            return new PostingThreadState();
        }
    };
    /** Posts the given event to the event bus. */
    public void post(Object event) {
        //分裝成PostingThreadState 對(duì)象
        PostingThreadState postingState = currentPostingThreadState.get();
        //從postingState獲取事件隊(duì)列
        List<Object> eventQueue = postingState.eventQueue;
        // 將當(dāng)前要發(fā)送的事件加入到隊(duì)列中
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = isMainThread();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                //while 不斷輪詢發(fā)送事件
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

    /** For ThreadLocal, much faster to set (and get multiple values). */
    final static class PostingThreadState {
        final List<Object> eventQueue = new ArrayList<>();
        boolean isPosting;
        boolean isMainThread;
        Subscription subscription;
        Object event;
        boolean canceled;
    }

currentPostingThreadState是一個(gè)ThreadLocal類型的變量(ThreadLocal的作用:ThreadLocal是解決線程安全問(wèn)題一個(gè)很好的思路,它通過(guò)為每個(gè)線程提供一個(gè)獨(dú)立的變量副本解決了變量并發(fā)訪問(wèn)的沖突問(wèn)題。在很多情況下,ThreadLocal比直接使用synchronized同步機(jī)制解決線程安全問(wèn)題更簡(jiǎn)單,更方便,且結(jié)果程序擁有更高的并發(fā)性。)currentPostingThreadState中存儲(chǔ)了當(dāng)前線程對(duì)應(yīng)的事件列表和線程的狀態(tài)信息等,上述主要調(diào)用了輪詢調(diào)用postSingleEvent方法,看一下postSingleEvent方法:

    private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        //獲取該事件對(duì)象類型
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            //是否支持事件繼承,默認(rèn)為true,如果訂閱了父類型,當(dāng)發(fā)送子類型事件實(shí)也會(huì)調(diào)用其相關(guān)訂閱方法
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
         // 找不到該事件的異常處理
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

上述eventInheritance默認(rèn)為true,表示如果訂閱了父類型,當(dāng)發(fā)送子類型事件實(shí)也會(huì)調(diào)用其相關(guān)訂閱方法,最終是調(diào)用postSingleEventForEventType進(jìn)行分發(fā),看一下postSingleEventForEventType:

    private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
        //CopyOnWriteArrayList是一個(gè)線程安全的list,寫入時(shí)會(huì)復(fù)制一份數(shù)據(jù)出來(lái),之后再賦值回去
        CopyOnWriteArrayList<Subscription> subscriptions;
        synchronized (this) {
            //從subscriptionsByEventType map中獲取該eventType下的所有訂閱對(duì)象,subscriptionsByEventType會(huì)不會(huì)有點(diǎn)熟悉??
            subscriptions = subscriptionsByEventType.get(eventClass);
        }
        if (subscriptions != null && !subscriptions.isEmpty()) {
             // 遍歷該eventType對(duì)應(yīng)下的訂閱對(duì)象,并調(diào)用postToSubscription執(zhí)行分發(fā)操作
            for (Subscription subscription : subscriptions) {
                postingState.event = event;
                postingState.subscription = subscription;
                boolean aborted;
                try {
                    postToSubscription(subscription, event, postingState.isMainThread);
                    aborted = postingState.canceled;
                } finally {
                    postingState.event = null;
                    postingState.subscription = null;
                    postingState.canceled = false;
                }
                if (aborted) {
                    break;
                }
            }
            return true;
        }
        return false;
    }

subscriptionsByEventType會(huì)不會(huì)覺(jué)得有點(diǎn)熟悉??這個(gè)便是我們上一篇訂閱分析所提到的,這個(gè)map key為某一事件類型,value為該事件類型下的所有訂閱,我們從subscriptionsByEventType中獲取該eventType下的所有訂閱后對(duì)其進(jìn)行遍歷,并逐一調(diào)用postToSubscription()把事件分發(fā)到每一個(gè)訂閱對(duì)象中去,繼續(xù)看postToSubscription,這里我們會(huì)根據(jù)訂閱方法指定的threadMode信息來(lái)執(zhí)行不同的發(fā)布策略:

    private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case MAIN_ORDERED:
                if (mainThreadPoster != null) {
                    mainThreadPoster.enqueue(subscription, event);
                } else {
                    // temporary: technically not correct as poster not decoupled from subscriber
                    invokeSubscriber(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

threadMode總共有以下幾種類型
POSTING:執(zhí)行invokeSubscriber()方法,就是直接反射調(diào)用;
MAIN:首先去判斷當(dāng)前是否在UI線程,如果是的話則直接反射調(diào)用,否則調(diào)用mainThreadPoster#enqueue(),即把當(dāng)前的方法加入到隊(duì)列之中,然后通過(guò)handler去發(fā)送一個(gè)消息,在handler的handleMessage中去執(zhí)行方法。具體邏輯在HandlerPoster.java中;
MAIN_ORDERED:與上面邏輯類似,順序執(zhí)行我們的方法;
BACKGROUND:判斷當(dāng)前是否在UI線程,如果不是的話直接反射調(diào)用,是的話通過(guò)backgroundPoster.enqueue()將方法加入到后臺(tái)的一個(gè)隊(duì)列,最后通過(guò)線程池去執(zhí)行;
ASYNC:與BACKGROUND的邏輯類似,將任務(wù)加入到后臺(tái)的一個(gè)隊(duì)列,最終由Eventbus中的一個(gè)線程池去調(diào)用,這里的線程池與BACKGROUND邏輯中的線程池用的是同一個(gè)。

這里先取一個(gè)分支來(lái)看,假設(shè)我們指定事件監(jiān)聽最后是回到主線程,也即是平常常使用的 @Subscribe(threadMode = ThreadMode.MAIN),那么這里將會(huì)來(lái)到 case MAIN:分支,第一步先判斷發(fā)送事件的時(shí)候(即調(diào)用event post)是不是在主線程,是的話直接執(zhí)行invokeSubscriber()使用反射執(zhí)行方法

    void invokeSubscriber(Subscription subscription, Object event) {
        try {
            //使用反射執(zhí)行
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

如果發(fā)送事件的時(shí)候(即調(diào)用event post)不是在主線程,則執(zhí)行mainThreadPoster.enqueue(subscription, event)方法,那這個(gè)mainThreadPoster是什么呢?

//簡(jiǎn)化代碼
private final Poster mainThreadPoster;
....
mainThreadSupport = builder.getMainThreadSupport();
mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;
....
    MainThreadSupport getMainThreadSupport() {
        if (mainThreadSupport != null) {
            return mainThreadSupport;
        } else if (AndroidLogger.isAndroidLogAvailable()) {
            Object looperOrNull = getAndroidMainLooperOrNull();
            return looperOrNull == null ? null :
                    new MainThreadSupport.AndroidHandlerMainThreadSupport((Looper) looperOrNull);
        } else {
            return null;
        }
    }
....
public interface MainThreadSupport {

    boolean isMainThread();

    Poster createPoster(EventBus eventBus);

    class AndroidHandlerMainThreadSupport implements MainThreadSupport {

        private final Looper looper;

        public AndroidHandlerMainThreadSupport(Looper looper) {
            this.looper = looper;
        }

        @Override
        public boolean isMainThread() {
            return looper == Looper.myLooper();
        }

        @Override
        public Poster createPoster(EventBus eventBus) {
            return new HandlerPoster(eventBus, looper, 10);
        }
    }

}

可以看到,mainThreadPoster內(nèi)部創(chuàng)建了一個(gè)主線程Looper,并最終new了一個(gè)HandlerPoster,HandlerPoster是mainThreadPoster的實(shí)現(xiàn)類,這里大概可以猜到mainThreadPoster其實(shí)就是主線程的handler,看一下HandlerPoster如何實(shí)現(xiàn),先看一下enqueue方法:

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    protected HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        //構(gòu)建一個(gè)PendingPost對(duì)象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            //添加進(jìn)PendingPost隊(duì)列
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                //調(diào)用sendMessage發(fā)送消息,從而觸發(fā)handleMessage
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }
  ......省略

PendingPostQueue是一個(gè)簡(jiǎn)單實(shí)現(xiàn)的鏈表,內(nèi)部保存了兩個(gè)PendingPost對(duì)象 ,一頭一尾,尾部插入,頭部移除,調(diào)用enqueue()從鏈表尾部插入,調(diào)用poll()從鏈表頭部移除,每次插入后調(diào)用sendMessage從而回調(diào)到handleMessage,看一下handleMessage是如何從這個(gè)PendingPostQueue取出這個(gè)訂閱事件的:

  ......省略
    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            //記錄開始時(shí)間
            long started = SystemClock.uptimeMillis();
            while (true) {
                //while循環(huán)一直從PendingPostQueue取出PendingPost
                PendingPost pendingPost = queue.poll();
                //如果隊(duì)列為空則取消
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                //這里同樣使用反射調(diào)用了方法
                eventBus.invokeSubscriber(pendingPost);
                //判斷執(zhí)行是否超過(guò)了指定時(shí)間,是的話重新調(diào)用sendMessage方法
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

可以看出,其實(shí)mainThreadPoster主要是幫我們回調(diào)到主線程,其內(nèi)部本質(zhì)上還是調(diào)用了反射去執(zhí)行方法,關(guān)于handleMessage,第一步是用while循環(huán)不斷輪詢?nèi)〕鲫?duì)列中的PendingPost,當(dāng)隊(duì)列為空則停止輪詢,當(dāng)執(zhí)行處理方法的時(shí)長(zhǎng)過(guò)長(zhǎng)時(shí)則重新調(diào)用sendMessage,從而繼續(xù)回調(diào)到handleMessage,這是為了防止執(zhí)行時(shí)間過(guò)長(zhǎng),導(dǎo)致while循環(huán)阻塞主線程造成卡頓。
本篇博文到此結(jié)束,關(guān)于其他類型的threadMode執(zhí)行操作,可自行在研究,差不多。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容