組件間通信方案(五):事件總線EventBus源碼解析

getDefault 方法

我們先從 EventBus 的入口,getDefalut 方法入手:


public static EventBus getDefault() {

    if (defaultInstance == null) {

        synchronized (EventBus.class) {

            if (defaultInstance == null) {

                defaultInstance = new EventBus();

            }

        }

    }

    return defaultInstance;

}

從 getDefault 方法可以看出,EventBus 類是一個(gè)采用了 Double Check 的單例類。

我們接下來看到它的構(gòu)造函數(shù),它的無參構(gòu)造函數(shù) EventBus() 實(shí)際上是調(diào)用了 EventBus(EventBusBuilder) 這個(gè)有參構(gòu)造函數(shù)的,傳入的參數(shù)是 DEFAULT_BUILDER,而 DEFAULT_BUILDER 則是一個(gè)調(diào)用了 EventBusBuilder 默認(rèn)構(gòu)造器的對(duì)象??梢钥闯?,這里用到了 Builder 模式來支持用 EventBusBuilder 進(jìn)行一些配置。

下面我們看到 EventBus(EventBusBuilder):


EventBus(EventBusBuilder builder) {

    logger = builder.getLogger();

    subscriptionsByEventType = new HashMap<>();

    typesBySubscriber = new HashMap<>();

    stickyEvents = new ConcurrentHashMap<>();

    mainThreadSupport = builder.getMainThreadSupport();

    mainThreadPoster = mainThreadSupport != null ? mainThreadSupport.createPoster(this) : null;

    backgroundPoster = new BackgroundPoster(this);

    asyncPoster = new AsyncPoster(this);

    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;

    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,

            builder.strictMethodVerification, builder.ignoreGeneratedIndex);

    logSubscriberExceptions = builder.logSubscriberExceptions;

    logNoSubscriberMessages = builder.logNoSubscriberMessages;

    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;

    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;

    throwSubscriberException = builder.throwSubscriberException;

    eventInheritance = builder.eventInheritance;

    executorService = builder.executorService;

}

這個(gè)構(gòu)造方法中主要進(jìn)行的是一些容器的初始化以及將一些配置參數(shù)從 Builder 中取出。

register 方法

下面我們?cè)購?register 方法的角度進(jìn)行分析,看看 EventBus 在我們進(jìn)行 register 時(shí)做了一些什么事:


public void register(Object subscriber) {

    Class<?> subscriberClass = subscriber.getClass();

    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);

    synchronized (this) {

        for (SubscriberMethod subscriberMethod : subscriberMethods) {

            subscribe(subscriber, subscriberMethod);

        }

    }

}

register 傳入的 register 是一個(gè) Object 類型,也就是任意類都可以向 EventBus 進(jìn)行 register。它首先獲取到了 subscriber 的類型信息,然后將其傳遞給了 subscriberMethodFinder 的 findSubscriberMethods() 方法。

通過名稱可以很容易看出,SubscriberMethodFinder 類是一個(gè)專門用來搜尋 subscriber 中含有 @Subscribe 注解的方法的類。這里進(jìn)行的操作就是將所有被 @Subscribe 標(biāo)記的方法都加入到 List 中。

而在找到了這些方法后,則一個(gè)個(gè)進(jìn)行遍歷,并將其執(zhí)行 subscribe操作。

也就是說 register 可以分為兩部分來看——搜尋訂閱

搜尋過程

我們先進(jìn)入 findSubscriberMethods() 看看它的搜尋過程


List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {

    List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);

    if (subscriberMethods != null) {

        return subscriberMethods;

    }

    if (ignoreGeneratedIndex) { // 1

        subscriberMethods = findUsingReflection(subscriberClass);

    } else {

        subscriberMethods = findUsingInfo(subscriberClass);

    }

    if (subscriberMethods.isEmpty()) {

        throw new EventBusException("Subscriber " + subscriberClass

                + " and its super classes have no public methods with the @Subscribe annotation");

    } else {

        METHOD_CACHE.put(subscriberClass, subscriberMethods);

        return subscriberMethods;

    }

}

可以分別看到 1 和 2 處的 if 語句,在 ignoreGeneratedIndex 為 true 時(shí),調(diào)用了 findUsingReflection 來使用反射搜尋方法。反之則調(diào)用了 findUsingInfo 方法進(jìn)行搜尋。ignoreGenereatedIndex 是用于標(biāo)記是否忽略由 Builder 傳入的 SubscriberInfoIndex。

直接搜尋


private List<SubscriberMethod> findUsingReflection(Class<?> subscriberClass) {

    FindState findState = prepareFindState();

    findState.initForSubscriber(subscriberClass);

    while (findState.clazz != null) {

        findUsingReflectionInSingleClass(findState);

        findState.moveToSuperclass();

    }

    return getMethodsAndRelease(findState);

}

可以看出,搜尋過程的結(jié)果是用了一個(gè)名為 FindState 的類進(jìn)行存儲(chǔ)的,它是 SubscriberMethodFinder 的一個(gè)內(nèi)部類。我們重點(diǎn)關(guān)注的是這里的 while 循環(huán),它先執(zhí)行了 findUsingReflectionInSingleClass 方法通過反射找到所有被 @Subscribe 標(biāo)注的方法,再通過 moveToSuperclass 向這個(gè)類的父類進(jìn)行搜尋。

查找

我們看到 findUsingReflectionInSingleClass 的實(shí)現(xiàn):


private void findUsingReflectionInSingleClass(FindState findState) {

    Method[] methods;

    try {

        // This is faster than getMethods, especially when subscribers are fat classes like Activities

        methods = findState.clazz.getDeclaredMethods();

    } catch (Throwable th) {

        // Workaround for java.lang.NoClassDefFoundError, see https://github.com/greenrobot/EventBus/issues/149

        methods = findState.clazz.getMethods();

        findState.skipSuperClasses = true;

    }

    for (Method method : methods) {

        int modifiers = method.getModifiers();

        if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {

            Class<?>[] parameterTypes = method.getParameterTypes();

            if (parameterTypes.length == 1) {

                Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);

                if (subscribeAnnotation != null) {

                    Class<?> eventType = parameterTypes[0];

                    if (findState.checkAdd(method, eventType)) {

                        ThreadMode threadMode = subscribeAnnotation.threadMode();

                        findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,

                                subscribeAnnotation.priority(), subscribeAnnotation.sticky()));

                    }

                }

            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {

                String methodName = method.getDeclaringClass().getName() + "." + method.getName();

                throw new EventBusException("@Subscribe method " + methodName +

                        "must have exactly 1 parameter but has " + parameterTypes.length);

            }

        } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {

            String methodName = method.getDeclaringClass().getName() + "." + method.getName();

            throw new EventBusException(methodName +

                    " is a illegal @Subscribe method: must be public, non-static, and non-abstract");

        }

    }

}

這里的代碼比較長,我們慢慢分析。

首先進(jìn)行的是 Method 列表的獲取。這里通過注釋可以看出,使用 getDeclaredMethods() 方法其實(shí)是比 getMethods() 方法的效率更高的,但有時(shí)會(huì)導(dǎo)致 NoClassDefFoundError,此時(shí)采取備用方案,使用 getMethods() 進(jìn)行獲取。

之后遍歷 Method 數(shù)組,其中進(jìn)行了兩次校驗(yàn):第一次校驗(yàn)用于檢查被 @Subscribe 修飾的方法是否是 public、non-static、non-abstract 的。第二次檢查則是檢查其參數(shù)個(gè)數(shù)是否符合 1 的要求。當(dāng)不滿足時(shí)會(huì)拋出對(duì)應(yīng)異常,而滿足要求則會(huì)進(jìn)行 @Subscribe 注解的搜索。

當(dāng)找到了對(duì)應(yīng)注解后會(huì)進(jìn)行一次 checkAdd,在這個(gè)方法中會(huì)將 方法及其對(duì)應(yīng)的 Event 放入一個(gè) HashMap anyMethodByEventType,同時(shí)還會(huì)將方法的簽名(形式為 方法名>Event 類型名)及對(duì)應(yīng)方法放入另一個(gè) HashMap subscriberClassByMethodKey。

當(dāng) checkAdd 檢查沒有放入過這個(gè)方法及 Event 后,就會(huì)將方法的信息包裝為一個(gè) SubscriberMethod 類,然后放入我們需要的結(jié)果列表。

向上查找

當(dāng)查找完成后,會(huì)向調(diào)用 moveToSuperclass 方法向其父類進(jìn)行查詢,直到遇到了系統(tǒng)提供的庫。通過這種向上查找的方式可以使得 EventBus 支持繼承這一 OOP 特性。


void moveToSuperclass() {

    if (skipSuperClasses) {

        clazz = null;

    } else {

        clazz = clazz.getSuperclass();

        String clazzName = clazz.getName();

        /** Skip system classes, this just degrades performance. */

        if (clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.")) {

            clazz = null;

        }

    }

}

非直接搜尋

下面我們看到搜尋過程的另一種實(shí)現(xiàn):


private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {

    FindState findState = prepareFindState();

    findState.initForSubscriber(subscriberClass);

    while (findState.clazz != null) {

        findState.subscriberInfo = getSubscriberInfo(findState);

        if (findState.subscriberInfo != null) {

            SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();

            for (SubscriberMethod subscriberMethod : array) {

                if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {

                    findState.subscriberMethods.add(subscriberMethod);

                }

            }

        } else {

            findUsingReflectionInSingleClass(findState);

        }

        findState.moveToSuperclass();

    }

    return getMethodsAndRelease(findState);

}

這里可以看到,先通過 getSubscriberInfo 獲取到了由 Builder 過程傳入的 SubscriberInfoIndex,從中獲取需要的信息。當(dāng)其值為 null 時(shí),再使用反射搜尋的方式進(jìn)行搜尋。

而具體添加過程,則與直接搜尋中的方式類似,這里不再贅述。

FindState 復(fù)用

這里要注意,無論是直接搜索還是非直接搜索,它們所使用的 FindState 都是通過 prepareFindState() 來進(jìn)行獲取,同時(shí),它們的結(jié)果最后都會(huì)通過 getMethodsAndRelease(findState) 來進(jìn)行返回。其實(shí) SubscriberMethodFinder 類中維護(hù)了一個(gè) FindState 池,是一個(gè)默認(rèn)大小為 4 的數(shù)組,在 prepareFindState 中會(huì)遍歷數(shù)組找到非 null 的 FindState 進(jìn)行返回。而在 getMethodsAndRelease(findState) 中則是將搜尋的結(jié)果取出后,對(duì) FindState 進(jìn)行 recycle,之后再將其放回 FindState 池中。這種池的復(fù)用思想非常值得我們?cè)谠O(shè)計(jì)庫的時(shí)候?qū)W習(xí)。

訂閱過程

我們回到 register 方法中來:


public void register(Object subscriber) {

    Class<?> subscriberClass = subscriber.getClass();

    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);

    synchronized (this) {

        for (SubscriberMethod subscriberMethod : subscriberMethods) {

            subscribe(subscriber, subscriberMethod);

        }

    }

}

可以看到 EventBus 為方法的訂閱過程進(jìn)行了加鎖,保證了線程安全。然后遍歷了每一個(gè)被標(biāo)記的方法,一一將其訂閱。

下面我們看看具體的訂閱流程,進(jìn)入 subscribe 方法:


private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {

    Class<?> eventType = subscriberMethod.eventType;

    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);

    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

    if (subscriptions == null) {

        subscriptions = new CopyOnWriteArrayList<>();

        subscriptionsByEventType.put(eventType, subscriptions);

    } else {

        if (subscriptions.contains(newSubscription)) {

            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "

                    + eventType);

        }

    }

    int size = subscriptions.size();

    for (int i = 0; i <= size; i++) {

        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {

            subscriptions.add(i, newSubscription);

            break;

        }

    }

    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);

    if (subscribedEvents == null) {

        subscribedEvents = new ArrayList<>();

        typesBySubscriber.put(subscriber, subscribedEvents);

    }

    subscribedEvents.add(eventType);

    if (subscriberMethod.sticky) {

        if (eventInheritance) {

            // Existing sticky events of all subclasses of eventType have to be considered.

            // Note: Iterating over all events may be inefficient with lots of sticky events,

            // thus data structure should be changed to allow a more efficient lookup

            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).

            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();

            for (Map.Entry<Class<?>, Object> entry : entries) {

                Class<?> candidateEventType = entry.getKey();

                if (eventType.isAssignableFrom(candidateEventType)) {

                    Object stickyEvent = entry.getValue();

                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);

                }

            }

        } else {

            Object stickyEvent = stickyEvents.get(eventType);

            checkPostStickyEventToSubscription(newSubscription, stickyEvent);

        }

    }

}

這里的代碼也是很長...讓我們慢慢進(jìn)行分析。

先看看 Subcription,它就是一個(gè)將 Subcriber 及 SubcriberMethod 進(jìn)行包裝的類。

首先,這個(gè)方法進(jìn)行了一個(gè)安全檢查,檢查了同樣的方法是否已經(jīng)被注冊(cè)過,沒有則將其放入 Map 中,否則拋出異常。

之后再將其按照優(yōu)先級(jí)放入以該 Event 為參數(shù)的方法列表中。

然后,又將這個(gè)方法加入了以該 Subcriber 為 key,以它內(nèi)部所有被 @Subscribe 標(biāo)記的方法的列表為 value 的 Map typesBySubscriber中。

最后,進(jìn)行了一個(gè)對(duì) sticky 事件的特殊處理,如果為 sticky 事件則會(huì)在 register 時(shí)進(jìn)行一次對(duì) Method 的調(diào)用。主要邏輯是:首先判斷了 Event 是否子 Event,若是一個(gè)子 Event 則找到其父 Event 作為參數(shù) Event,否則將其作為參數(shù) Event,然后在判 null 的情況下調(diào)用 postToSubscription 方法來執(zhí)行這個(gè)方法。關(guān)于具體的執(zhí)行過程,會(huì)在下文中進(jìn)行講解。

post 方法

post 方法開始,就會(huì)涉及我們的 Event 的執(zhí)行過程了。這里根據(jù) register 的分析過程可以大概猜出方法應(yīng)該是由反射的方式被執(zhí)行的,下面讓我們進(jìn)入 post 方法的源碼看看是否是這樣:


public void post(Object event) {

    PostingThreadState postingState = currentPostingThreadState.get();

    List<Object> eventQueue = postingState.eventQueue;

    eventQueue.add(event);

    if (!postingState.isPosting) {

        postingState.isMainThread = isMainThread();

        postingState.isPosting = true;

        if (postingState.canceled) {

            throw new EventBusException("Internal error. Abort state was not reset");

        }

        try {

            while (!eventQueue.isEmpty()) {

                postSingleEvent(eventQueue.remove(0), postingState);

            }

        } finally {

            postingState.isPosting = false;

            postingState.isMainThread = false;

        }

    }

}

存放線程信息

首先,它獲取到了 currentPostingThreadState 這一個(gè)對(duì)象,它的類型是 PostingThreadState,這個(gè)類的主要用途是記錄事件的發(fā)布者的線程信息。


final static class PostingThreadState {

    final List<Object> eventQueue = new ArrayList<>();

    boolean isPosting;

    boolean isMainThread;

    Subscription subscription;

    Object event;

    boolean canceled;

}

我們看到 currentPostingThreadState 的創(chuàng)建過程:


private final ThreadLocal<PostingThreadState> currentPostingThreadState = new ThreadLocal<PostingThreadState>() {

    @Override

    protected PostingThreadState initialValue() {

        return new PostingThreadState();

    }

};

這里可以看到 currentPostingThreadState 是通過 ThreadLocal 進(jìn)行保存。ThreadLocal 是一個(gè)用于創(chuàng)建線程局部變量的類。它創(chuàng)建的變量只能被當(dāng)前線程訪問,其他線程則無法訪問和修改。

之后在 post 中進(jìn)行的操作就是為 currentPostingThreadState 填充各種線程相關(guān)信息了。


List<Object> eventQueue = postingState.eventQueue;

eventQueue.add(event);

if (!postingState.isPosting) {

    postingState.isMainThread = isMainThread();

    postingState.isPosting = true;

    if (postingState.canceled) {

        throw new EventBusException("Internal error. Abort state was not reset");

    }

    try {

        while (!eventQueue.isEmpty()) {

            postSingleEvent(eventQueue.remove(0), postingState);

        }

    } finally {

        postingState.isPosting = false;

        postingState.isMainThread = false;

    }

}

我們回到 post 中,可以發(fā)現(xiàn),在 currentPostingThreadState 中維護(hù)了一個(gè) eventQueue。

首先,將 Event 插入了 eventQueue 中,之后將 isMainThread 等信息進(jìn)行填充。同時(shí)將 postingState 的 isPosting 置為了 true,使得事件 post 的過程中當(dāng)前線程的其他 post 事件無法被相應(yīng),當(dāng) post 過程結(jié)束后,再將其置為 true。

之后,便開始遍歷 eventQueue, 將事件一個(gè)個(gè)出隊(duì)并執(zhí)行 postSingleEvent 方法,接下來我們看到 postSingleEvent 方法:


private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {

    Class<?> eventClass = event.getClass();

    boolean subscriptionFound = false;

    if (eventInheritance) {

        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);

        int countTypes = eventTypes.size();

        for (int h = 0; h < countTypes; h++) {

            Class<?> clazz = eventTypes.get(h);

            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);

        }

    } else {

        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);

    }

    if (!subscriptionFound) {

        if (logNoSubscriberMessages) {

            logger.log(Level.FINE, "No subscribers registered for event " + eventClass);

        }

        if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&

                eventClass != SubscriberExceptionEvent.class) {

            post(new NoSubscriberEvent(this, event));

        }

    }

}

可以看到,這里通過 postSingleEventForEventType 來進(jìn)行搜尋對(duì)應(yīng) Subscription,如果 Event 是子 Event,則獲取它的所有父 Event 列表,再遍歷列表進(jìn)行搜尋。否則直接調(diào)用 postSingleEventForEventType 進(jìn)行搜尋。

搜尋 Subscription

下面我們看到 postSingleEventForEventType:


private boolean postSingleEventForEventType(Object event, PostingThreadState postingState, Class<?> eventClass) {

    CopyOnWriteArrayList<Subscription> subscriptions;

    synchronized (this) {

        subscriptions = subscriptionsByEventType.get(eventClass);

    }

    if (subscriptions != null && !subscriptions.isEmpty()) {

        for (Subscription subscription : subscriptions) {

            postingState.event = event;

            postingState.subscription = subscription;

            boolean aborted = false;

            try {

                postToSubscription(subscription, event, postingState.isMainThread);

                aborted = postingState.canceled;

            } finally {

                postingState.event = null;

                postingState.subscription = null;

                postingState.canceled = false;

            }

            if (aborted) {

                break;

            }

        }

        return true;

    }

    return false;

}

首先,這里根據(jù) event 找到了所有對(duì)應(yīng)的 Subscription,然后遍歷 subscription 列表調(diào)用 postToSubscription() 方法,這個(gè)方法在之前 register 的分析中針對(duì) sticky 事件的代碼中也有調(diào)用,它的作用是調(diào)用 event 對(duì)應(yīng)的方法。

未找到對(duì)應(yīng) Subscription 的處理

我們?cè)谶@里先不關(guān)心 postToSubscription 的具體實(shí)現(xiàn),先看看在 Subscription 調(diào)用結(jié)束后做了什么事,我們回到 postSingleEvent 方法中:


if (eventInheritance) {

    List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);

    int countTypes = eventTypes.size();

    for (int h = 0; h < countTypes; h++) {

        Class<?> clazz = eventTypes.get(h);

        subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);

    }

} else {

    subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);

}

if (!subscriptionFound) {

    if (logNoSubscriberMessages) {

        logger.log(Level.FINE, "No subscribers registered for event " + eventClass);

    }

    if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&

            eventClass != SubscriberExceptionEvent.class) {

        post(new NoSubscriberEvent(this, event));

    }

}

可以看到,如果在沒有找到對(duì)應(yīng)的 subscription,則會(huì)創(chuàng)建一個(gè) NoSubscriberEvent 再調(diào)用 post 請(qǐng)求。這樣,無論 Event 是否有 Subscriber,它都會(huì)進(jìn)行一次檢測。

執(zhí)行對(duì)應(yīng) Subscription

下面我們看到 postToSubscription,看看是如何調(diào)用 Event 對(duì)應(yīng)的方法的:


private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {

    switch (subscription.subscriberMethod.threadMode) {

        case POSTING:

            invokeSubscriber(subscription, event);

            break;

        case MAIN:

            if (isMainThread) {

                invokeSubscriber(subscription, event);

            } else {

                mainThreadPoster.enqueue(subscription, event);

            }

            break;

        case MAIN_ORDERED:

            if (mainThreadPoster != null) {

                mainThreadPoster.enqueue(subscription, event);

            } else {

                // temporary: technically not correct as poster not decoupled from subscriber

                invokeSubscriber(subscription, event);

            }

            break;

        case BACKGROUND:

            if (isMainThread) {

                backgroundPoster.enqueue(subscription, event);

            } else {

                invokeSubscriber(subscription, event);

            }

            break;

        case ASYNC:

            asyncPoster.enqueue(subscription, event);

            break;

        default:

            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);

    }

}

可以看到,這里對(duì)訂閱者的線程進(jìn)行了判斷,采用不同的 Poster 進(jìn)行入隊(duì)操作,采用隊(duì)列的方式一一處理。對(duì)某些特殊情況則直接調(diào)用 invokeSubscriber(subscription, event) 進(jìn)行處理。關(guān)于 Poster 的設(shè)計(jì)我們?cè)谥筮M(jìn)行分析,在 Poster 內(nèi)部調(diào)用的是 invokeSubscriber(PendingPost) 方法。

invokeSubscriber(subscription, event)

我們先看到 invokeSubscriber(subscription, event) 方法:


void invokeSubscriber(Subscription subscription, Object event) {

    try {

        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);

    } catch (InvocationTargetException e) {

        handleSubscriberException(subscription, event, e.getCause());

    } catch (IllegalAccessException e) {

        throw new IllegalStateException("Unexpected exception", e);

    }

}

果然,如我們之前猜測的一樣,它是通過反射來對(duì)方法進(jìn)行調(diào)用的,代碼很簡單,就不再贅述了。

invokeSubscriber(PendingPost)

下面我們看到 invokeSubscriber(PendingPost) 方法:


void invokeSubscriber(PendingPost pendingPost) {

    Object event = pendingPost.event;

    Subscription subscription = pendingPost.subscription;

    PendingPost.releasePendingPost(pendingPost);

    if (subscription.active) {

        invokeSubscriber(subscription, event);

    }

}

可以看到,它在內(nèi)部判斷了 Subscription 的 active 狀態(tài),如果為 active,再調(diào)用 invokeSubscriber(subscription, event) 方法對(duì) Method 進(jìn)行執(zhí)行。

這個(gè) active 狀態(tài)可以使得 unregister 的類中的對(duì)應(yīng) Method 不再被執(zhí)行。

postSticky 方法

這時(shí)你可能會(huì)想到,還有一個(gè)方法 postSticky 呢,我們接下來看到 postSticky 方法:


public void postSticky(Object event) {

    synchronized (stickyEvents) {

        stickyEvents.put(event.getClass(), event);

    }

    // Should be posted after it is putted, in case the subscriber wants to remove immediately

    post(event);

}

這里的邏輯十分簡單,先將 event 加入 stickyEvents 列表,再執(zhí)行 post 請(qǐng)求。

unregister 方法

我們接著看到 unregister 方法:


public synchronized void unregister(Object subscriber) {

    List<Class<?>> subscribedTypes = typesBySubscriber.get(subscriber);

    if (subscribedTypes != null) {

        for (Class<?> eventType : subscribedTypes) {

            unsubscribeByEventType(subscriber, eventType);

        }

        typesBySubscriber.remove(subscriber);

    } else {

        logger.log(Level.WARNING, "Subscriber to unregister was not registered before: " + subscriber.getClass());

    }

}

unregister 方法相對(duì)比較簡單,首先它先判斷了這個(gè) Event 是否還未注冊(cè),若已注冊(cè)則遍歷在 typesBySubscriber 中 Subscriber 所對(duì)應(yīng)的每一個(gè) EventType,并一一對(duì)其執(zhí)行 unsubscribeByEventType 方法取消訂閱,同時(shí) Subcriber 所對(duì)應(yīng)的信息從 typesBySubscriber 這個(gè) Map 中移除。

下面我們看看 unsubscribeByEventType 中具體做了什么:


private void unsubscribeByEventType(Object subscriber, Class<?> eventType) {

    List<Subscription> subscriptions = subscriptionsByEventType.get(eventType);

    if (subscriptions != null) {

        int size = subscriptions.size();

        for (int i = 0; i < size; i++) {

            Subscription subscription = subscriptions.get(i);

            if (subscription.subscriber == subscriber) {

                subscription.active = false;

                subscriptions.remove(i);

                i--;

                size--;

            }

        }

    }

}

可以看到,它是將 subscriptionsByEventType 中與 EventType 對(duì)應(yīng)的 Subscription 列表取出,遍歷并將該 Subcriber 對(duì)應(yīng)的 Subscription 的 active 標(biāo)記為 false 并刪除。

Poster

前面提到了 post 方法中會(huì)用到 Poster 來進(jìn)行方法的按隊(duì)列執(zhí)行,EventBus 中定義了一種 Poster 這個(gè)接口,用于處理 post 后方法執(zhí)行的調(diào)度。


/**

* Posts events.

*

* @author William Ferguson

*/

interface Poster {

    /**

    * Enqueue an event to be posted for a particular subscription

    *

    * @param subscription Subscription which will receive the eve

    * @param event        Event that will be posted to subscriber

    */

    void enqueue(Subscription subscription, Object event);

}

可以看到,它內(nèi)部只有一個(gè)方法 enqueue,用于將 Event 放入待定隊(duì)列。

在 EventBus 中的 Poster 用到了三種,我們分別介紹:

HandlerPoster

HandlerPoster 是默認(rèn)情況下 mainThreadPoster 的類型,它的內(nèi)部是使用 Handler 實(shí)現(xiàn)進(jìn)程的調(diào)度,主要關(guān)注其 handleMessage 的實(shí)現(xiàn):


@Override

public void handleMessage(Message msg) {

    boolean rescheduled = false;

    try {

        long started = SystemClock.uptimeMillis();

        while (true) {

            PendingPost pendingPost = queue.poll();

            if (pendingPost == null) {

                synchronized (this) {

                    // Check again, this time in synchronized

                    pendingPost = queue.poll();

                    if (pendingPost == null) {

                        handlerActive = false;

                        return;

                    }

                }

            }

            eventBus.invokeSubscriber(pendingPost);

            long timeInMethod = SystemClock.uptimeMillis() - started;

            if (timeInMethod >= maxMillisInsideHandleMessage) {

                if (!sendMessage(obtainMessage())) {

                    throw new EventBusException("Could not send handler message");

                }

                rescheduled = true;

                return;

            }

        }

    } finally {

        handlerActive = rescheduled;

    }

}

可以看到,內(nèi)部是通過一個(gè)死循環(huán)遍歷 PendingPost 的隊(duì)列,分別對(duì)其執(zhí)行 invokeSubscriber。

AsyncPoster

AsyncPoster 對(duì)應(yīng)了 EventBus 中的 asyncPoster,下面是它的代碼:


class AsyncPoster implements Runnable, Poster {

    private final PendingPostQueue queue;

    private final EventBus eventBus;

    AsyncPoster(EventBus eventBus) {

        this.eventBus = eventBus;

        queue = new PendingPostQueue();

    }

    public void enqueue(Subscription subscription, Object event) {

        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);

        queue.enqueue(pendingPost);

        eventBus.getExecutorService().execute(this);

    }

    @Override

    public void run() {

        PendingPost pendingPost = queue.poll();

        if(pendingPost == null) {

            throw new IllegalStateException("No pending post available");

        }

        eventBus.invokeSubscriber(pendingPost);

    }

}

它的代碼很簡單,通過 EventBus 的線程池中取出一個(gè)線程并在該線程中調(diào)用 invokeSubscriber 方法。

BackgroundPoster

BackgroundPoster 對(duì)應(yīng) EventBus 中的 backgroundPoster,下面是它的代碼:


final class BackgroundPoster implements Runnable, Poster {

    private final PendingPostQueue queue;

    private final EventBus eventBus;

    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {

        this.eventBus = eventBus;

        queue = new PendingPostQueue();

    }

    public void enqueue(Subscription subscription, Object event) {

        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);

        synchronized (this) {

            queue.enqueue(pendingPost);

            if (!executorRunning) {

                executorRunning = true;

                eventBus.getExecutorService().execute(this);

            }

        }

    }

    @Override

    public void run() {

        try {

            try {

                while (true) {

                    PendingPost pendingPost = queue.poll(1000);

                    if (pendingPost == null) {

                        synchronized (this) {

                            // Check again, this time in synchronized

                            pendingPost = queue.poll();

                            if (pendingPost == null) {

                                executorRunning = false;

                                return;

                            }

                        }

                    }

                    eventBus.invokeSubscriber(pendingPost);

                }

            } catch (InterruptedException e) {

                eventBus.getLogger().log(Level.WARNING, Thread.currentThread().getName() + " was interruppted", e);

            }

        } finally {

            executorRunning = false;

        }

    }

}

這段代碼比較長,和 AsyncPoster 有些類似,它使用了 synchronized 以保證線程安全,在 run 方法中遍歷了 PendingPost 列表,取出并執(zhí)行 Event。

PendingPost 池

PendingPost 內(nèi)部也是維護(hù)了一個(gè) PendingPost 池的,上面的幾個(gè) Poster 都是調(diào)用 PendingPost.obtainPendingPost 方法從其中取出 PendingPost,它的代碼如下:


final class PendingPost {

    private final static List<PendingPost> pendingPostPool = new ArrayList<PendingPost>();

    Object event;

    Subscription subscription;

    PendingPost next;

    private PendingPost(Object event, Subscription subscription) {

        this.event = event;

        this.subscription = subscription;

    }

    static PendingPost obtainPendingPost(Subscription subscription, Object event) {

        synchronized (pendingPostPool) {

            int size = pendingPostPool.size();

            if (size > 0) {

                PendingPost pendingPost = pendingPostPool.remove(size - 1);

                pendingPost.event = event;

                pendingPost.subscription = subscription;

                pendingPost.next = null;

                return pendingPost;

            }

        }

        return new PendingPost(event, subscription);

    }

    static void releasePendingPost(PendingPost pendingPost) {

        pendingPost.event = null;

        pendingPost.subscription = null;

        pendingPost.next = null;

        synchronized (pendingPostPool) {

            // Don't let the pool grow indefinitely

            if (pendingPostPool.size() < 10000) {

                pendingPostPool.add(pendingPost);

            }

        }

    }

}

原理比較簡單,就不再分析了。

總結(jié)

通過這次源碼解析,收獲了很多,果然還是自己去看源碼才能對(duì)這些庫有更深層的了解。

EventBus 實(shí)際上就是一個(gè)基于反射實(shí)現(xiàn)的『公告牌』,發(fā)布者可以將各種類型的公告放置到公告牌中,而訂閱者可以訂閱某種類型的公告,當(dāng)公告牌上出現(xiàn)了這種類型的公告時(shí),便會(huì)從上面取出并通知訂閱者。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容