[深入RxBus]:支持Sticky事件

RxBus、EventBus因?yàn)榻怦钐珡氐?,濫用的話,項(xiàng)目可維護(hù)性會(huì)越來越低;一些簡單場(chǎng)景更推薦用回調(diào)、Subject來代替事件總線。

實(shí)際使用場(chǎng)景,如果RxBus,EventBus二選一,我更傾向于使用EventBus, RxJava專注工作流,EventBus專注事件總線,職責(zé)更清晰

有段時(shí)間沒更了,幾個(gè)月前,我寫過一篇實(shí)現(xiàn)簡單的RxBus文章: 用RxJava實(shí)現(xiàn)事件總線。

在實(shí)際環(huán)境中,你會(huì)發(fā)現(xiàn)RxBus還是有一些問題的。

  • 你需要RxBus支持Sticky功能。
  • 你會(huì)發(fā)現(xiàn)在你訂閱了某個(gè)事件后,在后續(xù)接收到該事件時(shí),處理的過程中發(fā)生了異常,你可能會(huì)發(fā)現(xiàn)后續(xù)的事件都接收不到了!

我將分2篇文章分別給出其方案,這篇介紹如何實(shí)現(xiàn)Sticky,另外一篇介紹RxBus中的異常處理方案:
深入RxBus:[異常處理]


什么是Sticky事件?

在Android開發(fā)中,Sticky事件只指事件消費(fèi)者在事件發(fā)布之后才注冊(cè)的也能接收到該事件的特殊類型。Android中就有這樣的實(shí)例,也就是Sticky Broadcast,即粘性廣播。正常情況下如果發(fā)送者發(fā)送了某個(gè)廣播,而接收者在這個(gè)廣播發(fā)送后才注冊(cè)自己的Receiver,這時(shí)接收者便無法接收到剛才的廣播,為此Android引入了StickyBroadcast,在廣播發(fā)送結(jié)束后會(huì)保存剛剛發(fā)送的廣播(Intent),這樣當(dāng)接收者注冊(cè)完Receiver后就可以接收到剛才已經(jīng)發(fā)布的廣播。這就使得我們可以預(yù)先處理一些事件,讓有消費(fèi)者時(shí)再把這些事件投遞給消費(fèi)者。

Subject

我們?cè)趯?shí)現(xiàn)簡單的RxBus時(shí)使用了PublishSubject,其實(shí)RxJava提供給開發(fā)者4種Subject
PublishSubject,BehaviorSubject ,BehaviorSubject,AsyncSubject。

  1. PublishSubject只會(huì)給在訂閱者訂閱的時(shí)間點(diǎn)之后的數(shù)據(jù)發(fā)送給觀察者。
  • BehaviorSubject在訂閱者訂閱時(shí),會(huì)發(fā)送其最近發(fā)送的數(shù)據(jù)(如果此時(shí)還沒有收到任何數(shù)據(jù),它會(huì)發(fā)送一個(gè)默認(rèn)值)。

  • ReplaySubject在訂閱者訂閱時(shí),會(huì)發(fā)送所有的數(shù)據(jù)給訂閱者,無論它們是何時(shí)訂閱的。

  • AsyncSubject只在原Observable事件序列完成后,發(fā)送最后一個(gè)數(shù)據(jù),后續(xù)如果還有訂閱者繼續(xù)訂閱該Subject, 則可以直接接收到最后一個(gè)值。

Subject

從上圖來看,似乎BehaviorSubjectReplaySubject具備Sticky的特性。

BehaviorSubject方案

BehaviorSubject似乎完全符合Sticky的定義,但是你發(fā)現(xiàn)了它只能保存最近的那個(gè)事件。

有這樣的場(chǎng)景:如果訂閱者A訂閱了Event1,訂閱者B訂閱了Event2,而此時(shí)BehaviorSubject事件隊(duì)列里是[..., Event2, Event1],當(dāng)訂閱者訂閱時(shí),因?yàn)楸4娴氖亲罱氖录杭碋vent1,所以訂閱者B是接收不到Event2的。

解決辦法就是:
每個(gè)Event類型都各自創(chuàng)建一個(gè)對(duì)應(yīng)的BehaviorSubject,這樣的話資源開銷比較大,并且該Sticky事件總線和普通的RxBus事件總線不能共享,即:普通事件和Sticky事件是獨(dú)立的,因?yàn)槠胀ㄊ录腔?code>PublishSubject的, 暫時(shí)放棄該方案!

ReplaySubject方案

ReplaySubject可以保存發(fā)送過的所有數(shù)據(jù)事件。

因?yàn)楸4媪怂械臄?shù)據(jù)事件,所以不管什么類型的Event,我們只要過濾該類型,并讓其發(fā)送最近的那個(gè)Event即可滿足Sticky事件了。但是獲取最近的對(duì)應(yīng)事件是個(gè)難點(diǎn),因?yàn)樽罘闲枨蟮牟僮鞣?code>takeLast()僅在訂閱事件結(jié)束時(shí)(即:onCompleted())才會(huì)發(fā)送最后的那個(gè)數(shù)據(jù)事件,而我們的RxBus正常情況下應(yīng)該是盡量避免其訂閱事件結(jié)束的。(我沒能找到合適的操作符,如果你知道,請(qǐng)告知我)

所以BehaviorSubject也是比較難實(shí)現(xiàn)Sticky特性的。

并且,不管是BehaviorSubject還是ReplaySubject,它們還有一個(gè)棘手的問題:它們實(shí)現(xiàn)的EventBus和普通的RxBus(基于PublishSubject)之間的數(shù)據(jù)是相互獨(dú)立的!

總結(jié):BehaviorSubjectBehaviorSubject都不能天然適合Sticky事件......

使用Map實(shí)現(xiàn)Sticky

該方法思路是在原來PublishSubject實(shí)現(xiàn)的RxBus基礎(chǔ)上,使用ConcurrentHashMap<事件類型,事件>保存每個(gè)事件的最近事件,不僅能實(shí)現(xiàn)Sticky特性,最重要的是可以和普通RxBus的事件數(shù)據(jù)共享,不獨(dú)立。

因?yàn)槲覀兊腞xBus是基于PublishSubject的,而RxJava又有4種Subject,而且其中的BehaviorSubjectReplaySubject看起來又符合Sticky特性,所以我們可能會(huì)鉆這個(gè)牛角尖,理所當(dāng)然的認(rèn)為實(shí)現(xiàn)Sticky需要通過其他類型的Subject.... (好吧,我鉆進(jìn)去了...)

這個(gè)方案的思路我是根據(jù)EventBus的實(shí)現(xiàn)想到的,下面是大致流程:

  1. Map的初始化:
   private final Map<Class<?>, Object> mStickyEventMap;

    public RxBus() {
        mBus = new SerializedSubject<>(PublishSubject.create());
        mStickyEventMap = new ConcurrentHashMap<>();
    }

ConcurrentHashMap是一個(gè)線程安全的HashMap, 采用stripping lock(分離鎖),效率比HashTable高很多。

  • 在我們postSticky(Event)時(shí),存入Map中:
public void postSticky(Object event) {
     synchronized (mStickyEventMap) {
         mStickyEventMap.put(event.getClass(), event);
     } 
     post(event); 
}
  • 訂閱時(shí)toObservableSticky(Class<T> eventType),先從Map中尋找是否包含該類型的事件,如果沒有,則說明沒有Sticky事件要發(fā)送,直接訂閱Subject(此時(shí)作為被觀察者Observable);如果有,則說明有Sticky事件需要發(fā)送,訂閱merge(Subject 和 Sticky事件)。
  public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            Observable<T> observable = mBus.ofType(eventType);
            final Object event = mStickyEventMap.get(eventType);

            if (event != null) {
                return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        subscriber.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }

merge操作符:可以將多個(gè)Observables合并,就好像它們是單個(gè)的Observable一樣。

這樣,Sticky的核心功能就完成了,使用上和普通RxBus一樣,通過postSticky()發(fā)送事件,toObservableSticky()訂閱事件。

除此之外,我還提供了getStickyEvent(Class<T> eventType),removeStickyEvent(Class<T> eventType),removeAllStickyEvents()方法,供查找、移除對(duì)應(yīng)事件類型的事件、移除全部Sticky事件。

重要的事

在使用Sticky特性時(shí),在不需要某Sticky事件時(shí), 通過removeStickyEvent(Class<T> eventType)移除它,最保險(xiǎn)的做法是:在主Activity的onDestroyremoveAllStickyEvents()
因?yàn)槲覀兊腞xBus是個(gè)單例靜態(tài)對(duì)象,再正常退出app時(shí),該對(duì)象依然會(huì)存在于JVM,除非進(jìn)程被殺死,這樣的話導(dǎo)致StickyMap里的數(shù)據(jù)依然存在,為了避免該問題,需要在app退出時(shí),清理StickyMap。

// 主Activity(一般是棧底Activity)
@Override
protected void onDestroy() {
    super.onDestroy();
    // 移除所有Sticky事件
    RxBus.getDefault().removeAllStickyEvents();
}

完整代碼

下面是支持Sticky的完整RxBus代碼:

/**
 * PublishSubject: 只會(huì)把在訂閱發(fā)生的時(shí)間點(diǎn)之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
 * <p>
 * Created by YoKeyword on 2015/6/17.
 */
public class RxBus {
    private static volatile RxBus mDefaultInstance;
    private final Subject<Object, Object> mBus;

    private final Map<Class<?>, Object> mStickyEventMap;

    public RxBus() {
        mBus = new SerializedSubject<>(PublishSubject.create());
        mStickyEventMap = new ConcurrentHashMap<>();
    }

    public static RxBus getDefault() {
        if (mDefaultInstance == null) {
            synchronized (RxBus.class) {
                if (mDefaultInstance == null) {
                    mDefaultInstance = new RxBus();
                }
            }
        }
        return mDefaultInstance;
    }

    /**
     * 發(fā)送事件
     */
    public void post(Object event) {
        mBus.onNext(event);
    }

    /**
     * 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
     */
    public <T> Observable<T> toObservable(Class<T> eventType) {
        return mBus.ofType(eventType);
    }

    /**
     * 判斷是否有訂閱者
     */
    public boolean hasObservers() {
        return mBus.hasObservers();
    }

    public void reset() {
        mDefaultInstance = null;
    }

    /**
     * Stciky 相關(guān)
     */

    /**
     * 發(fā)送一個(gè)新Sticky事件
     */
    public void postSticky(Object event) {
        synchronized (mStickyEventMap) {
            mStickyEventMap.put(event.getClass(), event);
        }
        post(event);
    }

    /**
     * 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
     */
    public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
        synchronized (mStickyEventMap) {
            Observable<T> observable = mBus.ofType(eventType);
            final Object event = mStickyEventMap.get(eventType);

            if (event != null) {
                 return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
                    @Override
                    public void call(Subscriber<? super T> subscriber) {
                        subscriber.onNext(eventType.cast(event));
                    }
                }));
            } else {
                return observable;
            }
        }
    }

    /**
     * 根據(jù)eventType獲取Sticky事件
     */
    public <T> T getStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.get(eventType));
        }
    }

    /**
     * 移除指定eventType的Sticky事件
     */
    public <T> T removeStickyEvent(Class<T> eventType) {
        synchronized (mStickyEventMap) {
            return eventType.cast(mStickyEventMap.remove(eventType));
        }
    }

    /**
     * 移除所有的Sticky事件
     */
    public void removeAllStickyEvents() {
        synchronized (mStickyEventMap) {
            mStickyEventMap.clear();
        }
    }
}

雖然使用了線程安全的ConCurrentHashMap,但是依然大量使用了synchronized,可以發(fā)現(xiàn)鎖住的是mStickyEventMap對(duì)象,這是為了保證對(duì)于讀、寫、查、刪同步,即讀時(shí)不能寫,寫時(shí)不能讀...

最后

附上一個(gè)Demo:

  • 提供了使用Sticky特性的示例
  • 異常處理的示例:讓其在發(fā)生異常后,仍能正確接收到后續(xù)的Event。

參考源碼,傳送門

Demo

參考:

ReactiveX: http://reactivex.io/
EventBus: https://github.com/greenrobot/EventBus

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容