RxBus、EventBus因?yàn)榻怦钐珡氐?,濫用的話,項(xiàng)目可維護(hù)性會(huì)越來越低;一些簡單場(chǎng)景更推薦用回調(diào)、Subject來代替事件總線。
實(shí)際使用場(chǎng)景,如果RxBus,EventBus二選一,我更傾向于使用EventBus, RxJava專注工作流,EventBus專注事件總線,職責(zé)更清晰
有段時(shí)間沒更了,幾個(gè)月前,我寫過一篇實(shí)現(xiàn)簡單的RxBus文章: 用RxJava實(shí)現(xiàn)事件總線。
在實(shí)際環(huán)境中,你會(huì)發(fā)現(xiàn)RxBus還是有一些問題的。
- 你需要RxBus支持Sticky功能。
- 你會(huì)發(fā)現(xiàn)在你訂閱了某個(gè)事件后,在后續(xù)接收到該事件時(shí),處理的過程中發(fā)生了異常,你可能會(huì)發(fā)現(xiàn)后續(xù)的事件都接收不到了!
我將分2篇文章分別給出其方案,這篇介紹如何實(shí)現(xiàn)Sticky,另外一篇介紹RxBus中的異常處理方案:
深入RxBus:[異常處理]
什么是Sticky事件?
在Android開發(fā)中,Sticky事件只指事件消費(fèi)者在事件發(fā)布之后才注冊(cè)的也能接收到該事件的特殊類型。Android中就有這樣的實(shí)例,也就是Sticky Broadcast,即粘性廣播。正常情況下如果發(fā)送者發(fā)送了某個(gè)廣播,而接收者在這個(gè)廣播發(fā)送后才注冊(cè)自己的Receiver,這時(shí)接收者便無法接收到剛才的廣播,為此Android引入了StickyBroadcast,在廣播發(fā)送結(jié)束后會(huì)保存剛剛發(fā)送的廣播(Intent),這樣當(dāng)接收者注冊(cè)完Receiver后就可以接收到剛才已經(jīng)發(fā)布的廣播。這就使得我們可以預(yù)先處理一些事件,讓有消費(fèi)者時(shí)再把這些事件投遞給消費(fèi)者。
Subject
我們?cè)趯?shí)現(xiàn)簡單的RxBus時(shí)使用了PublishSubject,其實(shí)RxJava提供給開發(fā)者4種Subject:
PublishSubject,BehaviorSubject ,BehaviorSubject,AsyncSubject。
-
PublishSubject只會(huì)給在訂閱者訂閱的時(shí)間點(diǎn)之后的數(shù)據(jù)發(fā)送給觀察者。
BehaviorSubject在訂閱者訂閱時(shí),會(huì)發(fā)送其最近發(fā)送的數(shù)據(jù)(如果此時(shí)還沒有收到任何數(shù)據(jù),它會(huì)發(fā)送一個(gè)默認(rèn)值)。ReplaySubject在訂閱者訂閱時(shí),會(huì)發(fā)送所有的數(shù)據(jù)給訂閱者,無論它們是何時(shí)訂閱的。AsyncSubject只在原Observable事件序列完成后,發(fā)送最后一個(gè)數(shù)據(jù),后續(xù)如果還有訂閱者繼續(xù)訂閱該Subject, 則可以直接接收到最后一個(gè)值。

從上圖來看,似乎BehaviorSubject和ReplaySubject具備Sticky的特性。
BehaviorSubject方案
BehaviorSubject似乎完全符合Sticky的定義,但是你發(fā)現(xiàn)了它只能保存最近的那個(gè)事件。
有這樣的場(chǎng)景:如果訂閱者A訂閱了Event1,訂閱者B訂閱了Event2,而此時(shí)BehaviorSubject事件隊(duì)列里是[..., Event2, Event1],當(dāng)訂閱者訂閱時(shí),因?yàn)楸4娴氖亲罱氖录杭碋vent1,所以訂閱者B是接收不到Event2的。
解決辦法就是:
每個(gè)Event類型都各自創(chuàng)建一個(gè)對(duì)應(yīng)的BehaviorSubject,這樣的話資源開銷比較大,并且該Sticky事件總線和普通的RxBus事件總線不能共享,即:普通事件和Sticky事件是獨(dú)立的,因?yàn)槠胀ㄊ录腔?code>PublishSubject的, 暫時(shí)放棄該方案!
ReplaySubject方案
ReplaySubject可以保存發(fā)送過的所有數(shù)據(jù)事件。
因?yàn)楸4媪怂械臄?shù)據(jù)事件,所以不管什么類型的Event,我們只要過濾該類型,并讓其發(fā)送最近的那個(gè)Event即可滿足Sticky事件了。但是獲取最近的對(duì)應(yīng)事件是個(gè)難點(diǎn),因?yàn)樽罘闲枨蟮牟僮鞣?code>takeLast()僅在訂閱事件結(jié)束時(shí)(即:onCompleted())才會(huì)發(fā)送最后的那個(gè)數(shù)據(jù)事件,而我們的RxBus正常情況下應(yīng)該是盡量避免其訂閱事件結(jié)束的。(我沒能找到合適的操作符,如果你知道,請(qǐng)告知我)
所以BehaviorSubject也是比較難實(shí)現(xiàn)Sticky特性的。
并且,不管是BehaviorSubject還是ReplaySubject,它們還有一個(gè)棘手的問題:它們實(shí)現(xiàn)的EventBus和普通的RxBus(基于PublishSubject)之間的數(shù)據(jù)是相互獨(dú)立的!
總結(jié):BehaviorSubject和BehaviorSubject都不能天然適合Sticky事件......
使用Map實(shí)現(xiàn)Sticky
該方法思路是在原來PublishSubject實(shí)現(xiàn)的RxBus基礎(chǔ)上,使用ConcurrentHashMap<事件類型,事件>保存每個(gè)事件的最近事件,不僅能實(shí)現(xiàn)Sticky特性,最重要的是可以和普通RxBus的事件數(shù)據(jù)共享,不獨(dú)立。
因?yàn)槲覀兊腞xBus是基于
PublishSubject的,而RxJava又有4種Subject,而且其中的BehaviorSubject和ReplaySubject看起來又符合Sticky特性,所以我們可能會(huì)鉆這個(gè)牛角尖,理所當(dāng)然的認(rèn)為實(shí)現(xiàn)Sticky需要通過其他類型的Subject.... (好吧,我鉆進(jìn)去了...)
這個(gè)方案的思路我是根據(jù)EventBus的實(shí)現(xiàn)想到的,下面是大致流程:
- Map的初始化:
private final Map<Class<?>, Object> mStickyEventMap;
public RxBus() {
mBus = new SerializedSubject<>(PublishSubject.create());
mStickyEventMap = new ConcurrentHashMap<>();
}
ConcurrentHashMap是一個(gè)線程安全的HashMap, 采用stripping lock(分離鎖),效率比HashTable高很多。
- 在我們
postSticky(Event)時(shí),存入Map中:
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
- 訂閱時(shí)
toObservableSticky(Class<T> eventType),先從Map中尋找是否包含該類型的事件,如果沒有,則說明沒有Sticky事件要發(fā)送,直接訂閱Subject(此時(shí)作為被觀察者Observable);如果有,則說明有Sticky事件需要發(fā)送,訂閱merge(Subject 和 Sticky事件)。
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (mStickyEventMap) {
Observable<T> observable = mBus.ofType(eventType);
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.onNext(eventType.cast(event));
}
}));
} else {
return observable;
}
}
}
merge操作符:可以將多個(gè)Observables合并,就好像它們是單個(gè)的Observable一樣。
這樣,Sticky的核心功能就完成了,使用上和普通RxBus一樣,通過postSticky()發(fā)送事件,toObservableSticky()訂閱事件。
除此之外,我還提供了getStickyEvent(Class<T> eventType),removeStickyEvent(Class<T> eventType),removeAllStickyEvents()方法,供查找、移除對(duì)應(yīng)事件類型的事件、移除全部Sticky事件。
重要的事
在使用Sticky特性時(shí),在不需要某Sticky事件時(shí), 通過removeStickyEvent(Class<T> eventType)移除它,最保險(xiǎn)的做法是:在主Activity的onDestroy里removeAllStickyEvents()。
因?yàn)槲覀兊腞xBus是個(gè)單例靜態(tài)對(duì)象,再正常退出app時(shí),該對(duì)象依然會(huì)存在于JVM,除非進(jìn)程被殺死,這樣的話導(dǎo)致StickyMap里的數(shù)據(jù)依然存在,為了避免該問題,需要在app退出時(shí),清理StickyMap。
// 主Activity(一般是棧底Activity)
@Override
protected void onDestroy() {
super.onDestroy();
// 移除所有Sticky事件
RxBus.getDefault().removeAllStickyEvents();
}
完整代碼
下面是支持Sticky的完整RxBus代碼:
/**
* PublishSubject: 只會(huì)把在訂閱發(fā)生的時(shí)間點(diǎn)之后來自原始Observable的數(shù)據(jù)發(fā)射給觀察者
* <p>
* Created by YoKeyword on 2015/6/17.
*/
public class RxBus {
private static volatile RxBus mDefaultInstance;
private final Subject<Object, Object> mBus;
private final Map<Class<?>, Object> mStickyEventMap;
public RxBus() {
mBus = new SerializedSubject<>(PublishSubject.create());
mStickyEventMap = new ConcurrentHashMap<>();
}
public static RxBus getDefault() {
if (mDefaultInstance == null) {
synchronized (RxBus.class) {
if (mDefaultInstance == null) {
mDefaultInstance = new RxBus();
}
}
}
return mDefaultInstance;
}
/**
* 發(fā)送事件
*/
public void post(Object event) {
mBus.onNext(event);
}
/**
* 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
*/
public <T> Observable<T> toObservable(Class<T> eventType) {
return mBus.ofType(eventType);
}
/**
* 判斷是否有訂閱者
*/
public boolean hasObservers() {
return mBus.hasObservers();
}
public void reset() {
mDefaultInstance = null;
}
/**
* Stciky 相關(guān)
*/
/**
* 發(fā)送一個(gè)新Sticky事件
*/
public void postSticky(Object event) {
synchronized (mStickyEventMap) {
mStickyEventMap.put(event.getClass(), event);
}
post(event);
}
/**
* 根據(jù)傳遞的 eventType 類型返回特定類型(eventType)的 被觀察者
*/
public <T> Observable<T> toObservableSticky(final Class<T> eventType) {
synchronized (mStickyEventMap) {
Observable<T> observable = mBus.ofType(eventType);
final Object event = mStickyEventMap.get(eventType);
if (event != null) {
return observable.mergeWith(Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(Subscriber<? super T> subscriber) {
subscriber.onNext(eventType.cast(event));
}
}));
} else {
return observable;
}
}
}
/**
* 根據(jù)eventType獲取Sticky事件
*/
public <T> T getStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.get(eventType));
}
}
/**
* 移除指定eventType的Sticky事件
*/
public <T> T removeStickyEvent(Class<T> eventType) {
synchronized (mStickyEventMap) {
return eventType.cast(mStickyEventMap.remove(eventType));
}
}
/**
* 移除所有的Sticky事件
*/
public void removeAllStickyEvents() {
synchronized (mStickyEventMap) {
mStickyEventMap.clear();
}
}
}
雖然使用了線程安全的ConCurrentHashMap,但是依然大量使用了synchronized,可以發(fā)現(xiàn)鎖住的是mStickyEventMap對(duì)象,這是為了保證對(duì)于讀、寫、查、刪同步,即讀時(shí)不能寫,寫時(shí)不能讀...
最后
附上一個(gè)Demo:
- 提供了使用Sticky特性的示例
- 異常處理的示例:讓其在發(fā)生異常后,仍能正確接收到后續(xù)的Event。

參考:
ReactiveX: http://reactivex.io/
EventBus: https://github.com/greenrobot/EventBus