三方庫-EventBus源碼解析(二)

EventBus源碼解析(二)

源碼版本:

  • EventBus:3.3.1

導航:

EventBus的post

EventBus --> post()

public void post(Object event) {
    // 1、獲取當前線程的PostingThreadState,currentPostingThreadState是一個ThreadLocal對象,保證每個線程都有一個PostingThreadState對象,保證了線程安全。
    PostingThreadState postingState = currentPostingThreadState.get();
    // 2、獲取當前線程的事件隊列
    List<Object> eventQueue = postingState.eventQueue;
    // 3、將要發(fā)送的事件加入到當前線程的事件隊列中
    eventQueue.add(event);

    // 4、判斷是否正在發(fā)送事件,如果正在發(fā)送中,則不再執(zhí)行里面的邏輯,因為事件已經(jīng)添加到eventQueue中,它會繼續(xù)走里面while循環(huán)的判斷邏輯。
    if (!postingState.isPosting) {
        // 5、事件未被發(fā)送中,則初始化PostingThreadState,遍歷事件隊列發(fā)送。
        // 6、判斷當前線程是否是主線程
        postingState.isMainThread = isMainThread();
        // 7、標記當前線程正在發(fā)送中
        postingState.isPosting = true;
        // 8、如果當前線程已經(jīng)取消,則拋出異常。
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {eventQueue.remove(0)
            // 9、遍歷事件隊列,只要有事件,就一直發(fā)送。
            while (!eventQueue.isEmpty()) {
                // 10、發(fā)送事件,eventQueue.remove(0)說明post進來的事件,會依次發(fā)送。
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            // 11、當前線程的所有事件完成,還原狀態(tài)。
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}

post()方法,為事件的發(fā)送,兼容了多線程多發(fā)送多事件(多事件后面介紹)的情況。它使用ThreadLocal對象,保證每個線程都有一個PostingThreadState對象,保證了線程安全,它通過postingState.isMainThread記錄了發(fā)送線程是否是主線程,然后它會依次進行發(fā)送。

說明:

  1. postingState.isMainThread,記錄了發(fā)送線程是否是主線程。
  2. 步驟10eventQueue.remove(0),說明post()進來的事件,會依次發(fā)送。

我們再來看一下currentPostingThreadState屬性和PostingThreadState類。

private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {
    @Override
    protected PostingThreadState initialValue() {
        return new PostingThreadState();
    }
};

final static class PostingThreadState {
    final List<Object> eventQueue = new ArrayList<>(); // 發(fā)送線程的事件隊列
    boolean isPosting; // 發(fā)送線程是否正在發(fā)送中
    boolean isMainThread; // 發(fā)送線程是否在主線程
    Subscription subscription; // 發(fā)送線程正在發(fā)布的Subscription
    Object event; // 發(fā)送線程正在發(fā)布的Event
    boolean canceled; // 發(fā)送線程是否已取消
}

PostingThreadState類為發(fā)送線程狀態(tài)類,狀態(tài)信息看上面注釋。currentPostingThreadState是一個ThreadLocal對象,保證每個線程都有一個PostingThreadState對象,保證了線程安全。

EventBus --> postSingleEvent

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    // 11、獲取發(fā)送事件的Class
    Class<?> eventClass = event.getClass();
    // 12、記錄是否找到Subscription
    boolean subscriptionFound = false;
    if (eventInheritance) { // eventInheritance:事件是否有繼承性,默認為true。
        // 13、事件有繼承性,則發(fā)送[發(fā)送Event的所有父類以及所有父接口]的Event。
        // 14、獲取到[發(fā)送Event的所有父類以及所有父接口]。
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        // 15、遍歷集合,發(fā)送單個事件。
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            // 16、發(fā)送單個事件,并判斷是否找到Subscription(只要有一個有,就代表找到)。
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        // 17、事件沒有繼承性,則發(fā)送[發(fā)送Event]的Event。
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
        // 18、沒有找到,判斷是否打印Log,或者發(fā)送NoSubscriberEvent。
        if (logNoSubscriberMessages) {
            // 19、沒有訂閱者時,打印異常信息。
            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);
        }
        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                eventClass != SubscriberExceptionEvent.class) {
            // 20、沒有訂閱者時,發(fā)送NoSubscriberEvent事件。
            post(new NoSubscriberEvent(this, event));
        }
    }
}

postSingleEvent()方法,為事件的發(fā)送,兼容了多線程單發(fā)送多事件的情況。如果事件有繼承性,則發(fā)送[發(fā)送Event的所有父類以及所有父接口]的Event(此為多事件),否則則發(fā)送[發(fā)送Event]的Event,并判斷沒有找到,是否打印Log,或者發(fā)送NoSubscriberEvent。

我們再來看一下lookupAllEventTypes方法。
EventBus --> lookupAllEventTypes

private static List<Class<?>> lookupAllEventTypes(Class<?> eventClass) {
    // 使用了同步并且鎖唯一,保證了線程有序執(zhí)行。
    synchronized (eventTypesCache) {
        // 從緩存中獲取當前Event的已維護的所有事件(父類、父接口)集合。
        List<Class<?>> eventTypes = eventTypesCache.get(eventClass);
        if (eventTypes == null) {
            // 集合為空,說明是第一次獲取,則創(chuàng)建集合,并維護集合,并將其添加到緩存中。
            eventTypes = new ArrayList<>();
            // 當前clazz默認為eventClass
            Class<?> clazz = eventClass;
            // 遍歷當前clazz,只要不為空,就繼續(xù)遍歷。
            while (clazz != null) {
                // 添加當前clazz,第一次會把eventClass添加進去。
                eventTypes.add(clazz);
                // 添加當前clazz的所有接口class
                addInterfaces(eventTypes, clazz.getInterfaces());
                // 獲取當前clazz的父類,以繼續(xù)添加。
                clazz = clazz.getSuperclass();
            }
            // 將維護好的集合,添加到緩存。
            eventTypesCache.put(eventClass, eventTypes);
        }
        // 最后返回此維護好的所有事件(父類、父接口)集合。
        return eventTypes;
    }
}

// 循環(huán)遞歸添加當前類的所有的接口class到eventTypes。
// 舉例:當前類,實現(xiàn)A、B兩個接口,A實現(xiàn)C、D接口,B實現(xiàn)E接口,C、D、E未實現(xiàn)任何接口,最終會把A、B、C、D、E接口全部添加到eventTypes中。
static void addInterfaces(List<Class<?>> eventTypes, Class<?>[] interfaces) {
    // 遍歷所有接口
    for (Class<?> interfaceClass : interfaces) {
        if (!eventTypes.contains(interfaceClass)) {
            // 此接口未被包含,則進行添加。
            eventTypes.add(interfaceClass);
            // 繼續(xù)遞歸增加父接口的所有接口,使用遞歸以完成其所有父接口。
            addInterfaces(eventTypes, interfaceClass.getInterfaces());
        }
    }
}

lookupAllEventTypes()方法,為獲取到[發(fā)送Event的所有父類以及所有父接口]。

EventBus --> postSingleEventForEventType

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        // 21、根據(jù)事件獲取所有訂閱它的訂閱者,在同步方法內(nèi)執(zhí)行,保證了HashMap的get方法線程安全。
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    if (subscriptions != null && !subscriptions.isEmpty()) {
        // 22、有訂閱者,遍歷集合進行發(fā)送,返回true(代表已經(jīng)找到)。
        // 23、遍歷所有此類型的訂閱者進行發(fā)送
        for (Subscription subscription : subscriptions) {
            // 24、記錄發(fā)送線程正在發(fā)布的Event
            postingState.event = event;
            // 25、記錄發(fā)送線程正在發(fā)布的Subscription
            postingState.subscription = subscription;
            // 26、是否是中斷的
            boolean aborted;
            try {
                // 27、將事件發(fā)送給訂閱者,postingState.isMainThread為post方法記錄的。
                postToSubscription(subscription, event, postingState.isMainThread);
                // 28、事件是否被取消
                aborted = postingState.canceled;
            } finally {
                // 29、重置postingState
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                // 30、事件被取消,則取消此類型的全部發(fā)布。
                break;
            }
        }
        return true;
    }
    // 31、無訂閱者,不進行發(fā)送,返回false(代表沒有找到)。
    return false;
}

postSingleEventForEventType()方法,為事件的發(fā)送,兼容了多線程單發(fā)送單事件的情況。根據(jù)事件獲取所有訂閱它的Subscription(持有訂閱者對象、訂閱方法對象),然后進行遍歷依次發(fā)送。

EventBus --> postToSubscription

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    // 32、根據(jù)訂閱方法ThreadMode調(diào)用訂閱方法
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            // 33、發(fā)布線程(默認)
            // 34、調(diào)用反射直接通知將事件
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            // 35、主線程
            if (isMainThread) {
                // 36、在Android上并且是主線程,或者不在Android上,調(diào)用反射直接通知。
                invokeSubscriber(subscription, event);
            } else {
                // 37、在Android上并且不是主線程,將事件添加到主線程隊列中,在主線程調(diào)用。
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case MAIN_ORDERED:
            // 38、主線程(有序)
            if (mainThreadPoster != null) {
                // 39、在Android上,始終將事件添加到主線程隊列中,在主線程調(diào)用。
                mainThreadPoster.enqueue(subscription, event);
            } else {
                // 40、不在Android上,直接通過反射調(diào)用。
                invokeSubscriber(subscription, event);
            }
            break;
        case BACKGROUND:
            // 41、后臺線程
            if (isMainThread) {
                // 42、在Android上并且是主線程,或者不在Android上,則將事件添加到后臺線程隊列中,在后臺線程調(diào)用。
                backgroundPoster.enqueue(subscription, event);
            } else {
                // 43、在Android上并且不是主線程,直接通過反射調(diào)用。
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            // 44、異步線程
            // 45、將事件添加到異步線程隊列中,在異步線程調(diào)用。
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

postToSubscription()方法,為事件的通知,兼容了多線程單發(fā)送單事件的情況。根據(jù)訂閱方法ThreadMode調(diào)用訂閱方法,如果需要線程切換,則切換線程進行調(diào)用;否則,直接調(diào)用。

說明:

  1. mainThreadPoster(即HandlerPoster)、backgroundPosterasyncPoster這3個Poster,最終都會調(diào)用invokeSubscriber()方法進行通知,詳細介紹看后面的-Poster。
  2. EventBus的注冊,如果是粘性方法并且已經(jīng)發(fā)送了此事件,也會調(diào)用postToSubscription()方法進行通知。

EventBus --> invokeSubscriber

void invokeSubscriber(Subscription subscription, Object event) {
    try {
        // 46、反射調(diào)用訂閱者對象的訂閱方法,并傳入event參數(shù)。
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        // 47、處理調(diào)用訂閱方法異常
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}

private void handleSubscriberException(Subscription subscription, Object event, Throwable cause) {
    if (event instanceof SubscriberExceptionEvent) {
        // SubscriberExceptionEvent事件,判斷調(diào)用訂閱方法異常時是否打印Log(不發(fā)送事件防止遞歸)。
        if (logSubscriberExceptions) {
            // 調(diào)用訂閱方法異常時,打印異常信息。
            // Don't send another SubscriberExceptionEvent to avoid infinite event recursion, just log
            logger.log(Level.SEVERE, "SubscriberExceptionEvent subscriber " + subscription.subscriber.getClass()
                    + " threw an exception", cause);
            SubscriberExceptionEvent exEvent = (SubscriberExceptionEvent) event;
            logger.log(Level.SEVERE, "Initial event " + exEvent.causingEvent + " caused exception in "
                    + exEvent.causingSubscriber, exEvent.throwable);
        }
    } else {
        // 普通事件,判斷調(diào)用訂閱方法異常時是否拋出異常、打印Log、發(fā)送SubscriberExceptionEvent。
        if (throwSubscriberException) {
            // 調(diào)用訂閱方法異常時,拋出SubscriberException異常。
            throw new EventBusException("Invoking subscriber failed", cause);
        }
        if (logSubscriberExceptions) {
            // 調(diào)用訂閱方法異常時,打印異常信息。
            logger.log(Level.SEVERE, "Could not dispatch event: " + event.getClass() + " to subscribing class "
                    + subscription.subscriber.getClass(), cause);
        }
        if (sendSubscriberExceptionEvent) {
            // 調(diào)用訂閱方法異常時,發(fā)送SubscriberExceptionEvent事件。
            SubscriberExceptionEvent exEvent = new SubscriberExceptionEvent(this, cause, event,
                    subscription.subscriber);
            post(exEvent);
        }
    }
}

invokeSubscriber()方法,為反射調(diào)用訂閱者對象訂閱方法,并傳入event參數(shù)。handleSubscriberException()方法,為處理調(diào)用訂閱方法異常,判斷是否拋出異常打印Log、發(fā)送SubscriberExceptionEvent

線程模型

POSTING(發(fā)布線程)

事件的訂閱和事件的發(fā)布處于同一線程步驟34)。這是默認設(shè)置。事件傳遞意味著最少的開銷,因為它完全避免了線程切換。因此,對于已知在非常短的時間內(nèi)完成而不需要主線程的簡單任務(wù),這是推薦的模式。使用此模式的事件處理程序必須快速返回,以避免阻塞可能是主線程發(fā)布線程。

說明:

  1. 事件的發(fā)布(可能會阻塞Android主線程)
  • 事件的接收發(fā)布同一個線程,由于沒有使用線程池,所以發(fā)布一定阻塞線程
  1. 事件的接收(可能會阻塞Android主線程)
  • 事件的接收發(fā)布同一個線程,由于可能是主線程發(fā)送,所以接收可能阻塞主線程,所以接收不推薦執(zhí)行耗時操作。

MAIN(主線程)

Android上,訂閱者將在Android主線程(UI線程)中被調(diào)用。如果發(fā)布線程主線程,則將直接調(diào)用訂閱方法,從而阻塞發(fā)布線程步驟36)。否則,事件將排隊等待傳遞(非阻塞)(步驟37)。使用此模式的訂閱者必須快速返回以避免阻塞主線程。如果不是在Android上,則行為與POSTING相同(步驟36)。

說明:

  1. 事件的發(fā)布(一定會阻塞Android主線程)
  • Android上并且是主線程,或者不在Android上,則行為與POSTING相同(步驟36),所以發(fā)布一定阻塞線程。
  • Android上并且不是主線程,將事件插入到主線程隊列中(步驟37),由于使用了線程池,所以發(fā)布一定不會阻塞線程
  1. 事件的接收(一定會阻塞Android主線程)
  • 事件的接收主線程,所以接收一定阻塞主線程,所以接收不能執(zhí)行耗時操作。
  1. 事件的有序性(可能不是有序的)
  • Android上,如果先非主線程發(fā)布(排隊等待,可能會等待很長時間才會執(zhí)行到自己),后主線程發(fā)布(直接調(diào)用),所以在Android上事件它可能不是有序的。
  • 在非Android上,由于是直接調(diào)用,所以事件一定有序的。

MAIN_ORDERED(主線程-有序)

Android上,訂閱者將在Android主線程(UI線程)中被調(diào)用。與MAIN不同的是,事件總是排隊等待傳遞(步驟39)。這確保了post調(diào)用是非阻塞的。

說明:

  1. 事件的發(fā)布(一定不會阻塞Android主線程)
  • Android上,始終將事件添加到主線程隊列中,在主線程調(diào)用,由于使用Handler發(fā)布,所以發(fā)布一定不會阻塞線程
  • 不在Android上,則行為與POSTING相同,所以發(fā)布一定阻塞線程
  1. 事件的接收(一定會阻塞Android主線程)
  • 事件的接收主線程,所以接收一定阻塞主線程,所以接收不能執(zhí)行耗時操作(同MAIN)。
  1. 事件的有序性(一定是有序的)
  • Android上,始終添加到主線程隊列,所以事件一定有序的(步驟39)。
  • 在非Android上,由于是直接調(diào)用,所以事件一定有序的(步驟40)。

BACKGROUND(后臺)

Android上,訂閱者將在后臺線程中被調(diào)用。如果發(fā)布線程不是主線程,則將在發(fā)布線程中直接調(diào)用訂閱方法(步驟43)。如果發(fā)布線程主線程,則EventBus使用一個單獨后臺線程,它將按順序傳遞其所有事件(步驟42)。使用此模式的訂閱者應(yīng)該嘗試快速返回,以避免阻塞后臺線程。如果不在Android上,則始終使用后臺線程步驟42)。

說明:

  1. 事件的發(fā)布(一定不會阻塞Android主線程)
  • Android上并且是主線程,或者不在Android上,將事件插入到后臺線程隊列中(步驟42),由于使用了線程池,所以發(fā)布一定不會阻塞線程。
  • Android上并且不是主線程,則行為與POSTING相同(步驟43),所以發(fā)布一定阻塞線程。
  1. 事件的接收(可能會阻塞線程池線程)
  • Android上并且是主線程,使用backgroundPoster執(zhí)行(步驟42),由于backgroundPoster里判斷了如果正在執(zhí)行中,則不再創(chuàng)建線程執(zhí)行,而是用上次線程一塊執(zhí)行(詳細看下面的-Poster-backgroundPoster),所以接收可能阻塞后臺線程,所以接收不推薦執(zhí)行耗時操作。

ASYNC(異步)

訂閱者將在單獨線程中調(diào)用。這始終獨立發(fā)布線程主線程。使用此模式發(fā)布事件從不等待訂閱方法。如果訂閱方法的執(zhí)行可能需要一些時間(例如,用于網(wǎng)絡(luò)訪問),則應(yīng)使用此模式。避免同時觸發(fā)大量長時間運行的異步訂閱方法以限制并發(fā)線程的數(shù)量。EventBus使用線程池高效地重用已完成異步訂閱者通知中的線程。

說明:

  1. 事件的發(fā)布(一定不會阻塞Android主線程)
  • 訂閱者將在單獨線程中調(diào)用(步驟45),由于使用了線程池,所以發(fā)布一定不會阻塞線程。
  1. 事件的接收(不會阻塞線程池線程)
  • 訂閱者將在單獨線程中調(diào)用,使用asyncPoster執(zhí)行(步驟45),由于asyncPoster里執(zhí)行每次都在線程池里面使用新的線程執(zhí)行(詳細看下面的-Poster-asyncPoster),所以接收執(zhí)行耗時操作。

小結(jié)

  1. EventBuspost(),獲取當前線程事件隊列,將要發(fā)送的事件加入到其中。
  2. 遍歷事件隊列,只要有事件,就一直發(fā)送。
  3. 判斷事件是否有繼承性,如果有則根據(jù)事件類型,找到當前Event所有父類以及所有父接口class集合,遍歷這個集合,依次發(fā)送單個事件;否則,則只發(fā)送當前Event一個事件。
  4. 根據(jù)事件獲取所有訂閱它的Subscription集合,遍歷集合,將事件發(fā)送給訂閱者訂閱方法。
  5. 發(fā)送給訂閱者時,根據(jù)訂閱方法的線程模型調(diào)用訂閱方法,如果需要線程切換,則切換線程進行調(diào)用;否則,直接調(diào)用。

EventBus的postSticky

EventBus --> postSticky()

public void postSticky(Object event) {
    synchronized (stickyEvents) {
        // 1、將事件添加到粘性事件集合中
        stickyEvents.put(event.getClass(), event);
    }
    // 2、發(fā)送事件
    post(event);
}

說明:

  1. EventBuspost(),只會發(fā)送給已經(jīng)注冊訂閱者
  2. EventBuspostSticky(),會發(fā)送給已經(jīng)注冊訂閱者、之后要注冊訂閱者粘性訂閱方法

小結(jié)

  1. EventBuspostSticky(),將事件加入到stickyEvents集合中,當有新的訂閱者注冊后,如果該訂閱者訂閱了該事件粘性訂閱方法,則會發(fā)送此方法。
  2. 事件發(fā)送給已經(jīng)注冊訂閱者

EventBus的注銷

EventBus --> unregister()

public synchronized void unregister(Object subscriber) {
    // 1、獲取訂閱者訂閱的所有事件
    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);
    if (subscribedTypes != null) {
        // 2、遍歷此訂閱者訂閱的所有事件集合
        for (Class<?> eventType : subscribedTypes) {
            // 3、通過eventType,將此事件的Subscription集合中含有該訂閱者的Subscription,從集合中移除。
            unsubscribeByEventType(subscriber, eventType);
        }
        // 4、將此訂閱者從typesBySubscriber中移除
        typesBySubscriber.remove(subscriber);
    } else {
        // 5、未注冊,調(diào)用了注銷,Log打印警告。
        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());
    }
}

unregister()方法,為注銷訂閱者。從typesBySubscriber中移除自己,從subscriptionsByEventType中移除subscriber內(nèi)的多個事件類型。

EventBus --> unsubscribeByEventType()

private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {
    // 6、獲取該事件的所有訂閱者
    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions != null) {
        int size = subscriptions.size();
        // 7、遍歷上面的集合,找到此訂閱者的Subscription,然后從此集合中移除。
        for (int i = 0; i < size; i++) {
            Subscription subscription = subscriptions.get(i);
            if (subscription.subscriber == subscriber) {
                // 8、找到,從此集合中移除。
                subscription.active = false;
                subscriptions.remove(i);
                i--;
                size--;
            }
        }
    }
}

unsubscribeByEventType()方法,為從subscriptionsByEventType中移除subscriber內(nèi)的單個事件類型。

說明:

  1. 可以先遍歷subscriptionsByEventType,然后再遍歷subscriptionsByEventType,最后匹配subscriber,通過這種方式可以找到要移除Subscription,但是這種實現(xiàn)要比上面實現(xiàn)效率低。

小結(jié)

  1. EventBus注銷,獲取訂閱者訂閱的所有事件,遍歷所有事件,通過事件subscriptionsByEventType中,獲取該事件所有Subscription,遍歷Subscription集合,找到此訂閱者Subscription,然后從此集合移除
  2. 將此訂閱者typesBySubscriber移除。

Poster

Poster

public interface Poster {

    void enqueue(Subscription subscription, Object event);
}

Poster接口,它有一個enqueue方法,將subscription、event入隊執(zhí)行。它有3個實現(xiàn)類,為HandlerPoster、BackgroundPosterAsyncPoster,我們來分別看一下。

mainThreadPoster

HandlerPoster

public class HandlerPoster extends Handler implements Poster {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    public HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        // 這里傳入的是主線程的Looper對象,所以這個Handler對象是主線程的Handler
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        // 創(chuàng)建一個事件隊列
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        // 根據(jù)傳入的參數(shù)封裝一個PendingPost對象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 將PendingPost加入到隊列中
            // 發(fā)一個,執(zhí)行一個,再發(fā)送,如果已經(jīng)正在執(zhí)行中,則不再立即執(zhí)行,而是和上次一塊執(zhí)行
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                // 調(diào)用Handler的sendMessage,發(fā)送事件回到主線程,最終會調(diào)用下面的handleMessage方法
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            // 無限循環(huán)
            while (true) {
                PendingPost pendingPost = queue.poll(); // 獲取并移除第一個
                // 進行兩次檢查,確保pendingPost不為null,如果為null直接跳出循環(huán)
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            // 一直循環(huán)到queue結(jié)束,才返回
                            handlerActive = false;
                            return;
                        }
                    }
                }
                // 使用反射的方法調(diào)用訂閱者的訂閱方法。
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}

HandlerPoster類繼承Handler,并實現(xiàn)Poster接口,enqueue方法將PendingPost加入到queue隊列,然后發(fā)送空消息,最終走到handleMessage方法內(nèi),其內(nèi)部判斷,只要queue隊列有數(shù)據(jù)就執(zhí)行,最后使用反射調(diào)用訂閱者訂閱方法。

backgroundPoster

BackgroundPoster

final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        // 初始化隊列
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        // 封裝PendingPost對象
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            // 將PendingPost對象加入到隊列中
            // 發(fā)一個,執(zhí)行一個,再發(fā)送,如果已經(jīng)正在執(zhí)行中,則不再立即執(zhí)行,而是和上次一塊執(zhí)行
            queue.enqueue(pendingPost);
            if (!executorRunning) {
                executorRunning = true;
                // 這里使用到之前初始化的線程池
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    // 線程池的執(zhí)行回調(diào)
    @Override
    public void run() {
        // 實現(xiàn)同HandlerPoster
        try {
            try {
                // 無限循環(huán)
                while (true) {
                    // 獲取隊列中的PendingPost,進行雙重檢查,如果為null直接返回,結(jié)束循環(huán)
                    PendingPost pendingPost = queue.poll(1000);
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                // 一直循環(huán)到queue結(jié)束,才返回
                                executorRunning = false;
                                return;
                            }
                        }
                    }
                    //使用反射的方式調(diào)用訂閱者的訂閱方法
                    eventBus.invokeSubscriber(pendingPost);
                }
            } catch (InterruptedException e) {
                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);
            }
        } finally {
            executorRunning = false;
        }
    }

}

BackgroundPoster類實現(xiàn)Runnable接口,并實現(xiàn)Poster接口,enqueue方法將PendingPost加入到queue隊列,然后使用線程池執(zhí)行此Runnable,最終走到run方法內(nèi),其內(nèi)部判斷,只要queue隊列有數(shù)據(jù)就執(zhí)行,最后使用反射調(diào)用訂閱者訂閱方法。

asyncPoster

AsyncPoster

class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;
    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        // 發(fā)一個,執(zhí)行一個
        queue.enqueue(pendingPost);
        eventBus.getExecutorService().execute(this);
    }

    @Override
    public void run() {
        PendingPost pendingPost = queue.poll();
        if(pendingPost == null) {
            throw new IllegalStateException("No pending post available");
        }
        eventBus.invokeSubscriber(pendingPost);
    }

}

AsyncPoster類實現(xiàn)Runnable接口,并實現(xiàn)Poster接口,enqueue方法將PendingPost加入到queue隊列,然后使用線程池執(zhí)行此Runnable,最終走到run方法內(nèi),其內(nèi)部,獲取queue隊列最新數(shù)據(jù)然后執(zhí)行,最后使用反射調(diào)用訂閱者訂閱方法。

說明:

  1. BackgroundPoster類和AsyncPoster線程執(zhí)行區(qū)別:BackgroundPoster類,如果正在執(zhí)行中,則用上一次的線程執(zhí)行,否則用新線程執(zhí)行。AsyncPoster類,始終用新的線程執(zhí)行。

小結(jié)

  1. Poster接口,它有3個實現(xiàn)類,為HandlerPoster、BackgroundPosterAsyncPoster。
  2. 這幾個類,enqueue方法將PendingPost(持有subscription對象、event對象)加入到queue隊列,然后切換指定的線程執(zhí)行,最后使用反射調(diào)用訂閱者訂閱方法。

SubscriberMethodFinder

SubscriberMethodFinder

class SubscriberMethodFinder {
    private static final int BRIDGE = 0x40;
    private static final int SYNTHETIC = 0x1000;

    private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC | BRIDGE | SYNTHETIC;
    private static final Map<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();

    private List<SubscriberInfoIndex> subscriberInfoIndexes;
    private final boolean strictMethodVerification;
    private final boolean ignoreGeneratedIndex;

    private static final int POOL_SIZE = 4;
    private static final FindState[] FIND_STATE_POOL = new FindState[POOL_SIZE];

    SubscriberMethodFinder(List<SubscriberInfoIndex> subscriberInfoIndexes, boolean strictMethodVerification,
                           boolean ignoreGeneratedIndex) {
        this.subscriberInfoIndexes = subscriberInfoIndexes;
        this.strictMethodVerification = strictMethodVerification;
        this.ignoreGeneratedIndex = ignoreGeneratedIndex;
    }

    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        // 1、先從之前緩存的集合中獲取
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            // 2、如果之前緩存了,直接返回
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            // 忽略注解處理器生成的索引,所以直接用反射獲取,ignoreGeneratedIndex默認為false。
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            // 3、獲取所有訂閱方法集合
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            // 4、放入緩存集合中
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }

    private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
        // 5、從數(shù)組中獲取FindState對象,如果有直接返回,如果沒有創(chuàng)建一個新的FindState對象。
        FindState findState = prepareFindState();
        // 6、根據(jù)事件訂閱者初始化findState
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 7、獲取subscriberInfo,初始化為null。
            findState.subscriberInfo = getSubscriberInfo(findState);
            if (findState.subscriberInfo != null) {
                // 通過注解處理器處理
                // 獲取到所有訂閱者方法
                SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
                for (SubscriberMethod subscriberMethod : array) {
                    if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
                        // 通過檢查,給findState.subscriberMethods增加訂閱者方法
                        findState.subscriberMethods.add(subscriberMethod);
                    }
                }
            } else {
                // 8、通過反射的方式獲取訂閱者中的Method
                findUsingReflectionInSingleClass(findState);
            }
            // 移動到父類,以便后續(xù)操作。
            findState.moveToSuperclass();
        }
        // 釋放
        return getMethodsAndRelease(findState);
    }

    // 從findState中獲取訂閱者所有方法并釋放
    private List<SubscriberMethod> getMethodsAndRelease(FindState findState) {
        // 獲取訂閱者所有訂閱方法集合
        List<SubscriberMethod> subscriberMethods = new ArrayList<>(findState.subscriberMethods);
        // findState進行回收
        findState.recycle();
        // 將回收后的findState,恢復到FIND_STATE_POOL中。
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                if (FIND_STATE_POOL[i] == null) {
                    FIND_STATE_POOL[i] = findState;
                    break;
                }
            }
        }
        // 返回集合
        return subscriberMethods;
    }

    // 準備FindState對象,先從緩存中獲取,如果有則獲取并在緩存中移除;如果沒有則進行創(chuàng)建并返回。
    private FindState prepareFindState() {
        synchronized (FIND_STATE_POOL) {
            for (int i = 0; i < POOL_SIZE; i++) {
                FindState state = FIND_STATE_POOL[i];
                if (state != null) {
                    FIND_STATE_POOL[i] = null;
                    return state;
                }
            }
        }
        return new FindState();
    }

    // 獲取訂閱者信息
    private SubscriberInfo getSubscriberInfo(FindState findState) {
        // 處理SubscriberInfoIndex添加手動創(chuàng)建的AbstractSubscriberInfo(一般不會手動創(chuàng)建)。
        // -findState.subscriberInfo != null:說明調(diào)用者調(diào)用findState.subscriberInfo = getSubscriberInfo(findState)的getSubscriberInfo(findState)返回的不為null,即subscriberInfoIndexes有值。
        // -findState.subscriberInfo.getSuperSubscriberInfo() != null:說明手動創(chuàng)建的AbstractSubscriberInfo,而不是注解處理器生成添加的SimpleSubscriberInfo。
        if (findState.subscriberInfo != null && findState.subscriberInfo.getSuperSubscriberInfo() != null) {
            SubscriberInfo superclassInfo = findState.subscriberInfo.getSuperSubscriberInfo();
            if (findState.clazz == superclassInfo.getSubscriberClass()) {
                // 要獲取的Class等于父類的Class,則返回父類的Class。
                return superclassInfo;
            }
        }
        // 處理addIndex()添加的SubscriberInfoIndex。
        if (subscriberInfoIndexes != null) {
            for (SubscriberInfoIndex index : subscriberInfoIndexes) {
                SubscriberInfo info = index.getSubscriberInfo(findState.clazz);
                if (info != null) {
                    return info;
                }
            }
        }
        return null;
    }

    // 通過反射獲取訂閱者方法
    private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {
        // 獲取FindState,有復用。
        FindState findState = prepareFindState();
        // 初始化FindState,因為有復用,所以得初始化。
        findState.initForSubscriber(subscriberClass);
        while (findState.clazz != null) {
            // 反射獲取,如果找到則存到findState.subscriberMethods里。
            findUsingReflectionInSingleClass(findState);
            // 移動到父類,會更改findState.clazz
            findState.moveToSuperclass();
        }
        return getMethodsAndRelease(findState);
    }

    private void findUsingReflectionInSingleClass(FindState findState) {
        Method[] methods;
        try {
            // 9、訂閱者中所有聲明的方法,放入數(shù)組中。
            // getDeclaredMethods,獲取的是當前類的所有方法。
            // getMethods,獲取的是當前類及其所有父類的所有公共方法。
            methods = findState.clazz.getDeclaredMethods();
            // 因為getDeclaredMethods,獲取的是當前類的所有方法,所以findState.skipSuperClasses為false,為不跳過父類,即繼續(xù)查找父類。
        } catch (Throwable th) {
            // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149
            try {
                // 10、獲取訂閱者中聲明的public方法,設(shè)置跳過父類
                // 因為getMethods,獲取的是當前類及其所有父類的所有公共方法,所以findState.skipSuperClasses為true,為跳過父類,即不查找父類。
                methods = findState.clazz.getMethods();
            } catch (LinkageError error) { // super class of NoClassDefFoundError to be a bit more broad...
                String msg = "Could not inspect methods of " + findState.clazz.getName();
                if (ignoreGeneratedIndex) {
                    // 忽略注解生成器生成的索引,提示:請考慮使用EventBus注釋處理器來避免反射。
                    msg += ". Please consider using EventBus annotation processor to avoid reflection.";
                } else {
                    // 忽略注解生成器生成的索引,提示:請使這個類對EventBus注釋處理器可見,以避免反射。
                    msg += ". Please make this class visible to EventBus annotation processor to avoid reflection.";
                }
                throw new EventBusException(msg, error);
            }
            findState.skipSuperClasses = true;
        }
        // 遍歷這些方法
        for (Method method : methods) {
            // 11、獲取方法的修飾符:public、private等等
            int modifiers = method.getModifiers();
            // 12、訂閱方法為public同時不是abstract、static、bridge、synthetic。
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                // 13、方法參數(shù)類型數(shù)組
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    // 14、獲取方法的注解
                    Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
                    // 15、如果有注解
                    if (subscribeAnnotation != null) {
                        Class<?> eventType = parameterTypes[0];
                        // 16、將method和eventType放入到findState進行檢查
                        if (findState.checkAdd(method, eventType)) {
                            // 17、獲取注解中的threadMode對象
                            ThreadMode threadMode = subscribeAnnotation.threadMode();
                            // 18、新建一個SubscriberMethod對象,同時加入到findState中
                            findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
                                    subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
                        }
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    // 非1個參數(shù),并且是嚴格模式,并且方法上有Subscribe注解,則拋異常警告。
                    String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                    throw new EventBusException("@Subscribe method " + methodName +
                            "must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                // 方法的修飾符不符合,并且是嚴格模式,并且方法上有Subscribe注解,則拋異常警告。
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                throw new EventBusException(methodName +
                        " is a illegal @Subscribe method: must be public, non-static, and non-abstract");
            }
        }
    }
}

SubscriberMethodFinderfindSubscriberMethods方法,找到此訂閱者所有訂閱方法。它先從緩存中獲取,如果緩存中有,直接返回;如果緩存中沒有,通過反射注解處理器的方式獲取到此訂閱者所有訂閱方法,并放入集合中進行返回。

總結(jié)

以上就是全面的EventBus源碼了!之后會出其它三方庫源碼系列,請及時關(guān)注。如果你有什么問題,大家評論區(qū)見!

最后推薦一下我的網(wǎng)站,開發(fā)者的技術(shù)博客: devbolg.cn ,目前包含android相關(guān)的技術(shù),之后會面向全部開發(fā)者,歡迎大家來體驗!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容