很早之前有看過別人實現(xiàn)的 RxBus , 當初也只是隨意用用而已,沒有想過去研究。今天看到 brucezz 天哥在群里分享了一把,自己也加入了討論,下來還實踐了一把,所以想借此篇進入到源碼層,深刻體驗下 RxBus 這輛 “蘭博基尼” 的設計美感和獨特魅力。
不太清楚簡書怎么生成目錄,帶目錄版本可以在我博客看下。
RxBus
準備
關于簡單的實現(xiàn)和用法,這篇文章已經(jīng)很好的說明了。
推薦先看看 RxBus 的簡單實現(xiàn)和用法。
地址在這里:RxBus 的簡單實現(xiàn)
解剖

讓我們看看這輛車到底用了些什么?
Subject
SerializedSubject
PublishSubject
CompositeSubscription
從 Subject 開始發(fā)車
官方解釋
這是 Subject 的中文解釋:
Subject可以看成是一個橋梁或者代理,在某些ReactiveX實現(xiàn)中(如RxJava),它同時充當了Observer和Observable的角色。因為它是一個Observer,它可以訂閱一個或多個Observable;又因為它是一個Observable,它可以轉(zhuǎn)發(fā)它收到(Observe)的數(shù)據(jù),也可以發(fā)射新的數(shù)據(jù)。
由于一個Subject訂閱一個Observable,它可以觸發(fā)這個Observable開始發(fā)射數(shù)據(jù)(如果那個Observable是"冷"的--就是說,它等待有訂閱才開始發(fā)射數(shù)據(jù))。因此有這樣的效果,Subject可以把原來那個"冷"的Observable變成"熱"的。
Subject 源碼
源碼:
public abstract class Subject<T, R> extends Observable<R> implements Observer<T> {
protected Subject(OnSubscribe<R> onSubscribe) {
super(onSubscribe);
}
public abstract boolean hasObservers();
public final SerializedSubject<T, R> toSerialized() {
if (getClass() == SerializedSubject.class) {
return (SerializedSubject<T, R>)this;
}
return new SerializedSubject<T, R>(this);
}
Subject 只有兩個方法。
hasObservers()方法的解釋是:
Indicates whether the {@link Subject} has {@link Observer Observers} subscribed to it.
判斷 Subject 是否已經(jīng)有 observers 訂閱了 有則返回 ture
toSerialized() 方法的解釋是:
Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.
包裝 Subject 后讓它可以安全的在不同線程中調(diào)用各種方法
為什么這個方法后就可以是線程安全了呢?
我們看到 toSerialized() 返回了 SerializedSubject<T, R> 。我們先到這里打住,稍后我們再看看該類做了什么。
PublishSubject 解釋

在 RxJava 里有一個抽象類 Subject,既是 Observable 又是 Observer,可以把 Subject 理解成一個管道或者轉(zhuǎn)發(fā)器,數(shù)據(jù)從一端輸入,然后從另一端輸出。
Subject 有好幾種,這里可以使用最簡單的 PublishSubject。訂閱之后,一旦數(shù)據(jù)從一端傳入,結(jié)果會里立刻從另一端輸出。
源碼里給了用法例子:
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onCompleted events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onCompleted
subject.subscribe(observer2);
subject.onNext("three");
subject.onCompleted();
串行化
官方文檔推薦我們:
如果你把 Subject 當作一個 Subscriber 使用,注意不要從多個線程中調(diào)用它的onNext方法(包括其它的on系列方法),這可能導致同時(非順序)調(diào)用,這會違反Observable協(xié)議,給Subject的結(jié)果增加了不確定性。
要避免此類問題,你可以將 Subject 轉(zhuǎn)換為一個 SerializedSubject ,類似于這樣:
mySafeSubject = new SerializedSubject( myUnsafeSubject );
所以我們可以看到在 RxBus 初始化的時候我們做了這樣一件事情:
private final Subject<Object, Object> BUS;
private RxBus() {
BUS = new SerializedSubject<>(PublishSubject.create());
}
為了保證多線程的調(diào)用中結(jié)果的確定性,我們按照官方推薦將 Subject 轉(zhuǎn)換成了一個 SerializedSubject 。
SerializedSubject

該類同樣是 Subject 的子類,這里貼出該類的構(gòu)造方法。
private final SerializedObserver<T> observer;
private final Subject<T, R> actual;
public SerializedSubject(final Subject<T, R> actual) {
super(new OnSubscribe<R>() {
@Override
public void call(Subscriber<? super R> child) {
actual.unsafeSubscribe(child);
}
});
this.actual = actual;
this.observer = new SerializedObserver<T>(actual);
}
我們發(fā)現(xiàn),Subject 最后轉(zhuǎn)化成了 SerializedObserver.
SerializedObserver
When multiple threads are emitting and/or notifying they will be serialized by:
Allowing only one thread at a time to emit
Adding notifications to a queue if another thread is already emitting
Not holding any locks or blocking any threads while emitting
一次只會允許一個線程進行發(fā)送事物
如果其他線程已經(jīng)準備就緒,會通知給隊列
在發(fā)送事物中,不會持有任何鎖和阻塞任何線程

通過介紹可以知道是通過 notifications 來進行并發(fā)處理的。
SerializedObserver 類中
private final NotificationLite<T> nl = NotificationLite.instance();
重點看看 nl 在 onNext() 方法里的使用:
@Override
public void onNext(T t) {
// 省略一些代碼
for (;;) {
for (int i = 0; i < MAX_DRAIN_ITERATION; i++) {
FastList list;
synchronized (this) {
list = queue;
if (list == null) {
emitting = false;
return;
}
queue = null;
}
for (Object o : list.array) {
if (o == null) {
break;
}
// 這里的 accept() 方法
try {
if (nl.accept(actual, o)) {
terminated = true;
return;
}
} catch (Throwable e) {
terminated = true;
Exceptions.throwIfFatal(e);
actual.onError(OnErrorThrowable.addValueAsLastCause(e, t));
return;
}
}
}
}
}
NotificationLite
知道哪里具體調(diào)用了之后,我們再仔細看看 NotificationLite 。
先來了解它到底是什么:
For use in internal operators that need something like materialize and dematerialize wholly within the implementation of the operator but don't want to incur the allocation cost of actually creating {@link rx.Notification} objects for every {@link Observer#onNext onNext} and {@link Observer#onCompleted onCompleted}.
It's implemented as a singleton to maintain some semblance of type safety that is completely non-existent.
大致意思是:作為一個單例類保持這種完全不存在的安全類型的表象。

剛我們在 SerializedObserver 的 onNext() 方法中看到 nl.accept(actual, o)
所以我們再深入到 accept() 方法中:
public boolean accept(Observer<? super T> o, Object n) {
if (n == ON_COMPLETED_SENTINEL) {
o.onCompleted();
return true;
} else if (n == ON_NEXT_NULL_SENTINEL) {
o.onNext(null);
return false;
} else if (n != null) {
if (n.getClass() == OnErrorSentinel.class) {
o.onError(((OnErrorSentinel) n).e);
return true;
}
o.onNext((T) n);
return false;
} else {
throw new IllegalArgumentException("The lite notification can not be null");
}
}
Unwraps the lite notification and calls the appropriate method on the {@link Observer}.
判斷 lite 通知類別,通知 observer 執(zhí)行適當方法。
通過 NotificationLite 類圖可以看到有三個標識
- ON_NEXT_NULL_SENTINEL (onNext 標識)
- ON_COMPLETED_SENTINEL (onCompleted 標識)
- OnErrorSentinel (onError 標識)
與 Observer 回調(diào)一致。通過分析得知 accept() 就是通過標識來判斷,然后調(diào)用 Observer 相對應的方法。
CompositeSubscription
RxBus 這輛"蘭博基尼"與 CompositeSubscription 車間搭配更好。

構(gòu)造函數(shù):
private Set<Subscription> subscriptions;
private volatile boolean unsubscribed;
public CompositeSubscription() {
}
public CompositeSubscription(final Subscription... subscriptions) {
this.subscriptions = new HashSet<Subscription>(Arrays.asList(subscriptions));
}
內(nèi)部是初始化了一個 HashSet ,按照哈希算法來存取集合中的對象,存取速度比較快,并且沒有重復對象。
所以我們推薦在基類里實例化一個 CompositeSubscription 對象,使用 CompositeSubscription 來持有所有的 Subscriptions ,然后在 onDestroy()或者 onDestroyView()里取消所有的訂閱。
參考文章
- http://blog.csdn.net/lzyzsd/article/details/45033611
- https://mcxiaoke.gitbooks.io/rxdocs/content/Subject.html
熄火休息
能力有限,文章錯誤還望指出,有任何問題都歡迎討論 :)
轉(zhuǎn)載請注明出處。
最后送上我女神 Gakki , 開心最好 ( ′?v `? )?。
