RxBus的簡單和強大是基于RxJava技術(shù)的,RxJava天生就是類似sub/pub的觀察者模式,而且很容易處理線程切換。這就使得RxBus天然的支持事件總線。本文基于對RxJava的理解,自定義RxBus,并在目前一個相對輕量級的應(yīng)用中得到了驗證。
自定義RxBus
RxJava中有一種實體對象叫做Subject,官方的解釋是“A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.” 這句話的意思就是Subject既是observer(觀察者) 也是 Observable(訂閱者),作為observer,它可以訂閱多個Observable并觀察之;作為Observable,它可以被訂閱,并拋出事件。
按照不同的需求,Subject被設(shè)計有四種,分別為:AsyncSubject、BehaviorSubject、PublishSubject和ReplaySubject。這四種Subject的說明可查閱 http://reactivex.io/documentation/subject.html。
這里主要講述PublishSubject。
PublishSubject可以實現(xiàn)從哪里訂閱就從哪里開始發(fā)送數(shù)據(jù),這就保證了事件的訂閱和接收的有序。示意圖如下:


public class RxBus {
private static final String TAG = "RxBus";
public RxBus() {
}
private ConcurrentHashMap<Object, List<Subject>> subjectMapper = new ConcurrentHashMap<>();
public <T> Observable<T> register(@NonNull Object tag) {
List<Subject> subjectList = subjectMapper.get(tag);
if (null == subjectList) {
subjectList = new ArrayList<>();
subjectMapper.put(tag, subjectList);
}
Subject<T, T> subject = PublishSubject.create();
subjectList.add(subject);
Logger.t(TAG).i("[register]subjectMapper: %s", subjectMapper);
return subject;
}
@SuppressWarnings("unchecked")
public void unregister(@NonNull Object tag, @NonNull Observable observable) {
List<Subject> subjects = subjectMapper.get(tag);
if (null != subjects) {
subjects.remove(observable);
if (subjects.size() == 0) {
subjectMapper.remove(tag);
}
}
Logger.t(TAG).i("[unregister]subjectMapper: %s", subjectMapper);
}
@SuppressWarnings("unchecked")
public void post(@NonNull Object tag, @NonNull Object content) {
List<Subject> subjectList = subjectMapper.get(tag);
if (subjectList != null) {
for (Subject subject : subjectList) {
subject.onNext(content);
}
}
Logger.t(TAG).i("[send]subjectMapper: %s", subjectMapper);
}
}
代碼解讀:ConcurrentHashMap<Object, List<Subject>> subjectMapper 保存了所有的regist的 observer,每次regist就會PublishSubject.create()并存到該Map中;post事件之后,check subjectMapper是否有對應(yīng)的observer,有則取出執(zhí)行onNext()方法。
使用:
public class TestFragment extends BaseFragment {
@Inject
RxBus rxBus;
private Observable<Object> observable;
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
component().inject(this);
// 注冊事件
observable = rxBus.register(GlobalConfig.RXBUS_TAG_VERIFY_SUBMIT_SUC);
// 訂閱事件 observable.observeOn(AndroidSchedulers.mainThread())
.subscribe(new HttpResponseObserver<Object>() {
@Override
public void onCompleted() {
}
@Override
public void onNext(Object o) {
if (null != getActivity()) {
getActivity().finish();
}
}
});
}
}
發(fā)送事件: rxBus.post(GlobalConfig.RXBUS_TAG_VERIFY_SUBMIT_SUC, new Object());
EventBus與RxBus討論
一個完美的事件總線應(yīng)該具備哪些功能?
- 容易訂閱事件:事件訂閱者只要聲明自己就好了,當(dāng)事件發(fā)生時自然會被調(diào)到。訂閱和取消可以方便綁定到Activity和Fragment的生命周期上。
- 容易發(fā)送事件:事件發(fā)送者直接發(fā)送就好了,其他的事都不管。
- 方便的切換線程:有些事必須主線程干,有些事必須非主線程干,所以這個還是要說清楚。
- 性能:隨著應(yīng)用的成長,總線可能會被重度使用,性能一定要好。
EventBus在以上幾點做到了極致,其使用和性能都已經(jīng)被廣大開發(fā)者所驗證。其優(yōu)點是全面并且性能和分發(fā)效率都很好。
RxBus由于有RxJava的支持,在易用性和性能上都非常好。但是其在大型項目中,事件非常多的情況下,其分發(fā)效率和可靠性沒有得到足夠驗證,這一點不如EventBus。