概述
本文將盡可能將RxJava中的Subject相關(guān)類的用法做一個(gè)闡述,并對(duì)其原理進(jìn)行簡單的解析。
說到Subject,很多人可能都不是很熟悉它,因?yàn)橄鄬?duì)于RxJava的Observable、Schedulers、Subscribes等關(guān)鍵字來講,它拋頭露面的場合似乎很少。
事實(shí)上,Subject作用是很大的,借用官方的解釋,Subject在同一時(shí)間內(nèi),既可以作為Observable,也可以作為Observer:
在RxJava2.x中,官方一共為我們提供了以下幾種Subject:
- ReplaySubject (釋放接收到的所有數(shù)據(jù))
- BehaviorSubject (釋放訂閱前最后一個(gè)數(shù)據(jù)和訂閱后接收到的所有數(shù)據(jù))
- PublishSubject (釋放訂閱后接收到的數(shù)據(jù))
- AsyncSubject (僅釋放接收到的最后一個(gè)數(shù)據(jù))
- SerializedSubject(串行Subject)
- UnicastSubject (僅支持訂閱一次的Subject)
- TestSubject(已廢棄,在2.x中被TestScheduler和TestObserver替代)
下面依次對(duì)以上的Subject進(jìn)行講解,本文將重點(diǎn)講解BehaviorSubject和PublishSubject。前者是RxLifecycle中核心類所使用到,后者則是RxBus實(shí)現(xiàn)事件總線的核心類。
在開始正文之前,我們需要搞清楚一個(gè)問題:
Subject是什么?
Subject在ReactiveX是作為observer和observerable的一個(gè)bridge或者proxy。因?yàn)樗且粋€(gè)觀察者,所以它可以訂閱一個(gè)或多個(gè)可觀察對(duì)象,同時(shí)因?yàn)樗且粋€(gè)可觀測對(duì)象,所以它可以傳遞和釋放它觀測到的數(shù)據(jù)對(duì)象,并且能釋放新的對(duì)象。
上文已經(jīng)說的很清楚了,它既可以是數(shù)據(jù)源observerable,也可以是數(shù)據(jù)的訂閱者Observer。
我們點(diǎn)開源碼:
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
...
}
可以看到,Subject實(shí)際上還是Observable,只不過它繼承了Observer接口,可以通過onNext、onComplete、onError方法發(fā)射和終止發(fā)射數(shù)據(jù)。
我們從不同種類的Subject展開來講:
一、ReplaySubject
介紹
該Subject會(huì)接收數(shù)據(jù),當(dāng)被訂閱時(shí),將所有接收到的數(shù)據(jù)全部發(fā)送給訂閱者。
這意味著,不管何時(shí)訂閱這個(gè)Subject,這個(gè)Subject會(huì)把它接收到的數(shù)據(jù)都發(fā)送出去:
ReplaySubject<Object> subject = new ReplaySubject<>();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
// both of the following will get the onNext/onComplete calls from above
subject.subscribe(observer1);
subject.subscribe(observer2);
一圖頂千言,下方的箭頭代表兩次不同時(shí)間的訂閱:
很顯然,無論是在接收到數(shù)據(jù)前還是數(shù)據(jù)后訂閱,ReplaySubject都會(huì)發(fā)射所有數(shù)據(jù)給訂閱者。
原理
事實(shí)上,每個(gè)Subject的實(shí)現(xiàn)都不可避免的涉及到線程安全的問題,而RxJava則是依靠在內(nèi)部使用原子操作類(AtomicXXX系列)保證線程安全,本文不深入討論原子操作類相關(guān)知識(shí),有興趣的朋友可以參考:
深入分析Java中的原子操作 @碼農(nóng)一枚
ReplaySubject類時(shí)Subject相關(guān)類中代碼最多的一個(gè)類,但這并不意味著這個(gè)類是最難理解的,相反,它的原理很直白:通過一個(gè)List動(dòng)態(tài)存儲(chǔ)所有接收到的數(shù)據(jù),當(dāng)被訂閱時(shí),將所有的數(shù)據(jù)都發(fā)送給訂閱者。
是不是覺得很熟悉?是的,其根本就是一個(gè)動(dòng)態(tài)的鏈表,甚至其創(chuàng)建時(shí)的基礎(chǔ)容量也是16,并且隨著數(shù)據(jù)的不斷增加,每次遞增50%,如果想要節(jié)省開支,也可以自己定義初始容量和遞增規(guī)則。
千余行的代碼并非上述總結(jié)的那么簡單,不過作為了解和使用已經(jīng)足夠,事實(shí)上,這其中絕大部分都是算法和數(shù)據(jù)結(jié)構(gòu)的處理,筆者研究源碼時(shí),也并沒有去深入分析源碼,只是淺嘗輒止。
二、BehaviorSubject
當(dāng)Observer訂閱了一個(gè)BehaviorSubject,它一開始就會(huì)釋放Observable最近釋放的一個(gè)數(shù)據(jù)對(duì)象,當(dāng)還沒有任何數(shù)據(jù)釋放時(shí),它則是一個(gè)默認(rèn)值。接下來就會(huì)釋放Observable釋放的所有數(shù)據(jù)。如果Observable因異常終止,BehaviorSubject將不會(huì)向后續(xù)的Observer釋放數(shù)據(jù),但是會(huì)向Observer傳遞一個(gè)異常通知。
簡單來說,就是釋放訂閱前最后一個(gè)數(shù)據(jù)和訂閱后接收到的所有數(shù)據(jù):
// observer will receive all 4 events (including "default").
BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
subject.subscribe(observer);
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
// observer will receive the "one", "two" and "three" events, but not "zero"
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.subscribe(observer);
subject.onNext("two");
subject.onNext("three");
// observer will receive only onComplete
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onComplete();
subject.subscribe(observer);
// observer will receive only onError
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
subject.onError(new RuntimeException("error"));
subject.subscribe(observer);
所有case都通過上述代碼進(jìn)行了分析,我們看下圖:
原理
我們簡單看一下BehaviorSubject內(nèi)部的幾個(gè)成員:
public final class BehaviorSubject<T> extends Subject<T> {
private static final Object[] EMPTY_ARRAY = new Object[0];
//原子操作類,當(dāng)前接收到的最后一個(gè)數(shù)據(jù)
final AtomicReference<Object> value;
//原子操作類,BehaviorDisposable內(nèi)部存儲(chǔ)了所有接受到的數(shù)據(jù)
final AtomicReference<BehaviorDisposable<T>[]> subscribers;
//標(biāo)記,意味著一個(gè)空的BehaviorDisposable
static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];
//標(biāo)記,意味著已經(jīng)達(dá)到了TERMINATED,終止數(shù)據(jù)的發(fā)射
static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0];
...
}
我們?cè)賮砜匆幌翨ehaviorDisposable:
static final class BehaviorDisposable<T> implements Disposable, NonThrowingPredicate<Object> {
...
//實(shí)際上內(nèi)部有一個(gè)List,存儲(chǔ)所有接收到的數(shù)據(jù)
AppendOnlyLinkedArrayList<Object> queue;
...
}
其原理是:
1.當(dāng)BehaviorSubject.create()實(shí)例化時(shí),默認(rèn)將EMPTY賦值給subscribers,這意味著此時(shí)Subject中沒有數(shù)據(jù),:
BehaviorSubject() {
//...省略其他代碼
//這意味著Subject中沒有數(shù)據(jù)
this.subscribers = new AtomicReference<BehaviorDisposable<T>[]>(EMPTY);
//當(dāng)前沒有最終的數(shù)據(jù)
this.value = new AtomicReference<Object>();
}
2.當(dāng)通過onNext發(fā)射數(shù)據(jù)時(shí),存儲(chǔ)數(shù)據(jù):
@Override
public void onNext(T t) {
//省略判斷代碼
Object o = NotificationLite.next(t);
setCurrent(o); //內(nèi)部就是給成員value賦值,記錄最新接收到的數(shù)據(jù)
for (BehaviorDisposable<T> bs : subscribers.get()) {
bs.emitNext(o, index); //其實(shí)就是EMPTY.emitNext()
}
}
3.當(dāng)訂閱時(shí),執(zhí)行subscribeActual方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this);
observer.onSubscribe(bs);
//1.執(zhí)行add(bs)方法
if (add(bs)) {
if (bs.cancelled) {
remove(bs);//2.如果已經(jīng)取消發(fā)射,則不中斷數(shù)據(jù)的傳遞
} else {
bs.emitFirst();//3.執(zhí)行bs.emitFirst方法
}
} else {
//....
}
}
看一下add方法:
boolean add(BehaviorDisposable<T> rs) {
for (;;) {
BehaviorDisposable<T>[] a = subscribers.get();
if (a == TERMINATED) {
return false;
}
int len = a.length;
BehaviorDisposable<T>[] b = new BehaviorDisposable[len + 1];
System.arraycopy(a, 0, b, 0, len);
b[len] = rs;
//原子數(shù)據(jù)的更新操作
if (subscribers.compareAndSet(a, b)) {
return true;
}
}
}
不熟悉原子操作類的同學(xué),我簡單介紹一下上述代碼的思想:
首先獲取subscribers的值(應(yīng)該獲得一個(gè)BehaviorDisposable數(shù)組,其中有且僅有一個(gè)元素EMPTY),然后將該數(shù)組擴(kuò)容+1,將新創(chuàng)建的BehaviorDisposable對(duì)象放入數(shù)組index=1的位置上,并將該數(shù)組賦值給subscriber。
可以看到,RxJava中的源碼并沒有使用Java Util包中的Collections相關(guān)工具類進(jìn)行數(shù)據(jù)的操作,而是直接使用System.arraycopy(a, 0, b, 0, len)這個(gè)Native底層方法進(jìn)行數(shù)組的處理,減少了額外的開支。
至于BehaviorDisposable的emitFirst方法,我們不難想象,應(yīng)該是處理數(shù)據(jù)的發(fā)射相關(guān)邏輯,實(shí)際上它會(huì)發(fā)射BehaviorSubject的value,這也就是為什么當(dāng)訂閱后,observer會(huì)先接收到訂閱前的最后一個(gè)數(shù)據(jù)的原因。
這之后,每當(dāng)onNext()從上游接收到一個(gè)數(shù)據(jù),都會(huì)subscribers數(shù)組里對(duì)每一個(gè)BehaviorDisposable中的observer向下發(fā)射數(shù)據(jù)。
BehaviorSubject小結(jié)
其原理就是通過subscribers這個(gè)核心的成員,它是一個(gè)不斷變化的數(shù)組。在創(chuàng)建時(shí),其內(nèi)部只是一個(gè)EMPTY(BehaviorDisposable)對(duì)象,每次被訂閱,都會(huì)在既有的數(shù)組上新加一個(gè)BehaviorDisposable對(duì)象,這個(gè)對(duì)象中包含了一個(gè)List,存儲(chǔ)之后會(huì)收到的數(shù)據(jù)。
同時(shí),BehaviorSubject還有一個(gè)value的成員,該成員會(huì)隨著數(shù)據(jù)的不斷接收而進(jìn)行更新,它總是記錄著當(dāng)前最后一個(gè)接收到的數(shù)據(jù),當(dāng)被subscribe時(shí),會(huì)執(zhí)行emitFirst()方法,發(fā)射當(dāng)前記錄的數(shù)據(jù),也就是訂閱前接收到的最后一個(gè)數(shù)據(jù)。
三、PublishSubject
PublishSubject僅會(huì)向Observer釋放在訂閱之后Observable釋放的數(shù)據(jù)。
參考前兩個(gè)Subject的子類,這個(gè)類很好理解,先上代碼:
PublishSubject<Object> subject = PublishSubject.create();
// observer1 will receive all onNext and onComplete events
subject.subscribe(observer1);
subject.onNext("one");
subject.onNext("two");
// observer2 will only receive "three" and onComplete
subject.subscribe(observer2);
subject.onNext("three");
subject.onComplete();
用圖說話:
原理
和BehaviorSubject原理思想如出一轍, 而PublishSubject更簡單,其內(nèi)部的PublishDisposable的原子操作類連AtomicReference都不是,而是AtomicBoolean:
public final class PublishSubject<T> extends Subject<T> {
static final PublishDisposable[] TERMINATED = new PublishDisposable[0];
static final PublishDisposable[] EMPTY = new PublishDisposable[0];
final AtomicReference<PublishDisposable<T>[]> subscribers;
}
static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
}
其原理就是通過subscribers這個(gè)核心的成員,它是一個(gè)不斷變化的數(shù)組。在創(chuàng)建時(shí),其內(nèi)部只是一個(gè)EMPTY(PublishDisposable)對(duì)象,每次被訂閱,都會(huì)在既有的數(shù)組上新加一個(gè)PublishDisposable對(duì)象,這個(gè)對(duì)象中包含了一個(gè)List,存儲(chǔ)之后會(huì)收到的數(shù)據(jù)。
其實(shí)往上翻,你會(huì)發(fā)現(xiàn),這段總結(jié)是我復(fù)制BehaviorSubject小結(jié)的第一段......不是(其實(shí)就是)偷懶,而是兩者本身幾乎沒什么區(qū)別,PublishDisposable更簡單。
嘗試放棄RxBus !!!
為什么要著重說PublishSubject這個(gè)類,原因很簡單,因?yàn)樵谀壳傲餍械氖录偩€處理方式RxBus中,其原理就是使用PublishSubject進(jìn)行數(shù)據(jù)的分發(fā)。
RxBus的代碼基本如下:
public class RxBus {
private static RxBus rxBus;
private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
private RxBus() {
}
public static RxBus getInstance() {
if (rxBus == null) {
synchronized (RxBus.class) {
if (rxBus == null) {
rxBus = new RxBus();
}
}
}
return rxBus;
}
public void send(Object o) {
_bus.onNext(o);
}
public Observable<Object> toObserverable() {
return _bus;
}
}
每個(gè)人的RxBus類略有不同,可能有的RxBus類會(huì)添加類似ofType()相關(guān)的filter操作符對(duì)數(shù)據(jù)進(jìn)行篩選,但其原理基本一模一樣,都是通過PublishSubject進(jìn)行數(shù)據(jù)的接收和分發(fā),這樣的使用會(huì)有什么后果,請(qǐng)參考下文:
放棄RxBus,擁抱RxJava(一):為什么避免使用EventBus/RxBus
文中作者已經(jīng)很形象舉出了RxBus的種種缺陷,以及對(duì)于事件的傳遞應(yīng)該如何處理。
在分析完P(guān)ublishSubject的原理后,我們更應(yīng)該清楚,RxBus類似一個(gè)全局的Observable,不斷地接受新的數(shù)據(jù),以及接受訂閱,如果使用不當(dāng),會(huì)導(dǎo)致代碼的混亂,我們也應(yīng)該考慮,在接受的數(shù)據(jù)和訂閱越來越多,PublishSubject的單例對(duì)象會(huì)占用越來越多的內(nèi)存,如果沒有處理好生命周期相關(guān),則會(huì)導(dǎo)致嚴(yán)重的內(nèi)存泄漏!
因此,已經(jīng)分析完RxBus(實(shí)際上也就是PublishSubject )后,我們應(yīng)當(dāng)對(duì)其使用的方式有一個(gè)全新的認(rèn)知,使用,亦或放棄RxBus。
四、AsyncSubject
AsyncSubject僅釋放Observable釋放的最后一個(gè)數(shù)據(jù),并且僅在Observable完成之后。然而如果當(dāng)Observable因?yàn)楫惓6K止,AsyncSubject將不會(huì)釋放任何數(shù)據(jù),但是會(huì)向Observer傳遞一個(gè)異常通知。
隨著可以接收到數(shù)據(jù)的范圍越來越小,似乎也越來越好理解了,直接看圖:
原理
其原理就更簡單了,這下原子操作了連AtomicBoolean都不是了,直接就是AtomicInteger:
public final class AsyncSubject<T> extends Subject<T> {
static final AsyncDisposable[] EMPTY = new AsyncDisposable[0];
static final AsyncDisposable[] TERMINATED = new AsyncDisposable[0];
final AtomicReference<AsyncDisposable<T>[]> subscribers;
//記錄最新的數(shù)據(jù)
T value;
}
//繼承DeferredScalarDisposable
static final class AsyncDisposable<T> extends DeferredScalarDisposable<T> {}
//繼承BasicIntQueueDisposable
public class DeferredScalarDisposable<T> extends BasicIntQueueDisposable<T> {}
//繼承AtomicInteger
public abstract class BasicIntQueueDisposable<T>
extends AtomicInteger
implements QueueDisposable<T> {}
每當(dāng)執(zhí)行onComplete()的時(shí)候,都會(huì)執(zhí)行complete(Object o)方法,先發(fā)送最后一個(gè)數(shù)據(jù),然后執(zhí)行onComplete():
public final void complete(T value) {
//......
Observer<? super T> a = actual;
//先發(fā)送最后一個(gè)數(shù)據(jù)
a.onNext(value);
if (get() != DISPOSED) {
//執(zhí)行onComplete()
a.onComplete();
}
}
五、簡單介紹UnicastSubject和SerializedSubject
UnicastSubject
僅支持訂閱一次的Subject,如果多個(gè)訂閱者試圖訂閱這個(gè)Subject,若該subject未terminate,將會(huì)受到IllegalStateException ,若已經(jīng)terminate,那么只會(huì)執(zhí)行onError或者onComplete方法。
SerializedSubject
回到剛剛的RxBus類中,我們發(fā)現(xiàn)了SerializedSubject的身影,實(shí)際上,它的作用就是:
將Subject串行化的方法,所有其他的Observable和Subject方法都是線程安全的。
我們也可以通過Subject.toSerialized()方法將Subject對(duì)象串行化保證其線程安全:
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
//......
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}
}
我們直接打開這個(gè)類,會(huì)發(fā)現(xiàn),其實(shí)該類中,大部分代碼都會(huì)通過synchronized加鎖,這意味著會(huì)有額外的性能消耗。
這也更進(jìn)一步說明了,使用RxBus,會(huì)額外增加更多的支出(因?yàn)镽xBus中PublisSuject本身的單例對(duì)象就是調(diào)用了toSerialized()方法保證線程安全)。