理解RxJava(四)Subject用法及原理分析

概述

本文將盡可能將RxJava中的Subject相關(guān)類的用法做一個(gè)闡述,并對(duì)其原理進(jìn)行簡單的解析。

說到Subject,很多人可能都不是很熟悉它,因?yàn)橄鄬?duì)于RxJava的Observable、Schedulers、Subscribes等關(guān)鍵字來講,它拋頭露面的場合似乎很少。

事實(shí)上,Subject作用是很大的,借用官方的解釋,Subject在同一時(shí)間內(nèi),既可以作為Observable,也可以作為Observer:

在RxJava2.x中,官方一共為我們提供了以下幾種Subject:

  • ReplaySubject (釋放接收到的所有數(shù)據(jù))
  • BehaviorSubject (釋放訂閱前最后一個(gè)數(shù)據(jù)和訂閱后接收到的所有數(shù)據(jù))
  • PublishSubject (釋放訂閱后接收到的數(shù)據(jù))
  • AsyncSubject (僅釋放接收到的最后一個(gè)數(shù)據(jù))
  • SerializedSubject(串行Subject)
  • UnicastSubject (僅支持訂閱一次的Subject)
  • TestSubject(已廢棄,在2.x中被TestScheduler和TestObserver替代)

下面依次對(duì)以上的Subject進(jìn)行講解,本文將重點(diǎn)講解BehaviorSubjectPublishSubject。前者是RxLifecycle中核心類所使用到,后者則是RxBus實(shí)現(xiàn)事件總線的核心類。

在開始正文之前,我們需要搞清楚一個(gè)問題:

Subject是什么?

Subject在ReactiveX是作為observer和observerable的一個(gè)bridge或者proxy。因?yàn)樗且粋€(gè)觀察者,所以它可以訂閱一個(gè)或多個(gè)可觀察對(duì)象,同時(shí)因?yàn)樗且粋€(gè)可觀測對(duì)象,所以它可以傳遞和釋放它觀測到的數(shù)據(jù)對(duì)象,并且能釋放新的對(duì)象。

上文已經(jīng)說的很清楚了,它既可以是數(shù)據(jù)源observerable,也可以是數(shù)據(jù)的訂閱者Observer。

我們點(diǎn)開源碼:

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    ...
}

可以看到,Subject實(shí)際上還是Observable,只不過它繼承了Observer接口,可以通過onNext、onComplete、onError方法發(fā)射和終止發(fā)射數(shù)據(jù)。

我們從不同種類的Subject展開來講:

一、ReplaySubject

介紹

該Subject會(huì)接收數(shù)據(jù),當(dāng)被訂閱時(shí),將所有接收到的數(shù)據(jù)全部發(fā)送給訂閱者。

這意味著,不管何時(shí)訂閱這個(gè)Subject,這個(gè)Subject會(huì)把它接收到的數(shù)據(jù)都發(fā)送出去:

  ReplaySubject<Object> subject = new ReplaySubject<>();
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");
  subject.onComplete();

  // both of the following will get the onNext/onComplete calls from above
  subject.subscribe(observer1);
  subject.subscribe(observer2);

一圖頂千言,下方的箭頭代表兩次不同時(shí)間的訂閱:

ReplaySubject

很顯然,無論是在接收到數(shù)據(jù)前還是數(shù)據(jù)后訂閱,ReplaySubject都會(huì)發(fā)射所有數(shù)據(jù)給訂閱者。

原理

事實(shí)上,每個(gè)Subject的實(shí)現(xiàn)都不可避免的涉及到線程安全的問題,而RxJava則是依靠在內(nèi)部使用原子操作類(AtomicXXX系列)保證線程安全,本文不深入討論原子操作類相關(guān)知識(shí),有興趣的朋友可以參考:
深入分析Java中的原子操作 @碼農(nóng)一枚

ReplaySubject類時(shí)Subject相關(guān)類中代碼最多的一個(gè)類,但這并不意味著這個(gè)類是最難理解的,相反,它的原理很直白:通過一個(gè)List動(dòng)態(tài)存儲(chǔ)所有接收到的數(shù)據(jù),當(dāng)被訂閱時(shí),將所有的數(shù)據(jù)都發(fā)送給訂閱者。

是不是覺得很熟悉?是的,其根本就是一個(gè)動(dòng)態(tài)的鏈表,甚至其創(chuàng)建時(shí)的基礎(chǔ)容量也是16,并且隨著數(shù)據(jù)的不斷增加,每次遞增50%,如果想要節(jié)省開支,也可以自己定義初始容量和遞增規(guī)則。

千余行的代碼并非上述總結(jié)的那么簡單,不過作為了解和使用已經(jīng)足夠,事實(shí)上,這其中絕大部分都是算法和數(shù)據(jù)結(jié)構(gòu)的處理,筆者研究源碼時(shí),也并沒有去深入分析源碼,只是淺嘗輒止。

二、BehaviorSubject

當(dāng)Observer訂閱了一個(gè)BehaviorSubject,它一開始就會(huì)釋放Observable最近釋放的一個(gè)數(shù)據(jù)對(duì)象,當(dāng)還沒有任何數(shù)據(jù)釋放時(shí),它則是一個(gè)默認(rèn)值。接下來就會(huì)釋放Observable釋放的所有數(shù)據(jù)。如果Observable因異常終止,BehaviorSubject將不會(huì)向后續(xù)的Observer釋放數(shù)據(jù),但是會(huì)向Observer傳遞一個(gè)異常通知。

簡單來說,就是釋放訂閱前最后一個(gè)數(shù)據(jù)訂閱后接收到的所有數(shù)據(jù):

  // observer will receive all 4 events (including "default").
  BehaviorSubject<Object> subject = BehaviorSubject.createDefault("default");
  subject.subscribe(observer);
  subject.onNext("one");
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

  // observer will receive only onComplete
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.onComplete();
  subject.subscribe(observer);

  // observer will receive only onError
  BehaviorSubject<Object> subject = BehaviorSubject.create();
  subject.onNext("zero");
  subject.onNext("one");
  subject.onError(new RuntimeException("error"));
  subject.subscribe(observer);

所有case都通過上述代碼進(jìn)行了分析,我們看下圖:

BehaviorSubject

原理

我們簡單看一下BehaviorSubject內(nèi)部的幾個(gè)成員:

public final class BehaviorSubject<T> extends Subject<T> {

    private static final Object[] EMPTY_ARRAY = new Object[0];
    //原子操作類,當(dāng)前接收到的最后一個(gè)數(shù)據(jù)
    final AtomicReference<Object> value;
  
    //原子操作類,BehaviorDisposable內(nèi)部存儲(chǔ)了所有接受到的數(shù)據(jù)
    final AtomicReference<BehaviorDisposable<T>[]> subscribers;
    
    //標(biāo)記,意味著一個(gè)空的BehaviorDisposable
    static final BehaviorDisposable[] EMPTY = new BehaviorDisposable[0];
    
    //標(biāo)記,意味著已經(jīng)達(dá)到了TERMINATED,終止數(shù)據(jù)的發(fā)射
    static final BehaviorDisposable[] TERMINATED = new BehaviorDisposable[0];
...
}

我們?cè)賮砜匆幌翨ehaviorDisposable:

    static final class BehaviorDisposable<T> implements Disposable, NonThrowingPredicate<Object> {
      ...
      //實(shí)際上內(nèi)部有一個(gè)List,存儲(chǔ)所有接收到的數(shù)據(jù)
      AppendOnlyLinkedArrayList<Object> queue;
      ...
}

其原理是:

1.當(dāng)BehaviorSubject.create()實(shí)例化時(shí),默認(rèn)將EMPTY賦值給subscribers,這意味著此時(shí)Subject中沒有數(shù)據(jù),:

    BehaviorSubject() {
        //...省略其他代碼
        //這意味著Subject中沒有數(shù)據(jù)
        this.subscribers = new AtomicReference<BehaviorDisposable<T>[]>(EMPTY);  
        //當(dāng)前沒有最終的數(shù)據(jù)
        this.value = new AtomicReference<Object>();
    }

2.當(dāng)通過onNext發(fā)射數(shù)據(jù)時(shí),存儲(chǔ)數(shù)據(jù):

    @Override
    public void onNext(T t) {
        //省略判斷代碼
        Object o = NotificationLite.next(t);
        setCurrent(o);    //內(nèi)部就是給成員value賦值,記錄最新接收到的數(shù)據(jù)
        for (BehaviorDisposable<T> bs : subscribers.get()) {
            bs.emitNext(o, index);  //其實(shí)就是EMPTY.emitNext()
        }
    }

3.當(dāng)訂閱時(shí),執(zhí)行subscribeActual方法:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        BehaviorDisposable<T> bs = new BehaviorDisposable<T>(observer, this);
        observer.onSubscribe(bs);
        //1.執(zhí)行add(bs)方法
        if (add(bs)) {
            if (bs.cancelled) {
                remove(bs);//2.如果已經(jīng)取消發(fā)射,則不中斷數(shù)據(jù)的傳遞
            } else {
                bs.emitFirst();//3.執(zhí)行bs.emitFirst方法
            }
        } else {
            //....
        }
    }

看一下add方法:

    boolean add(BehaviorDisposable<T> rs) {
        for (;;) {
            BehaviorDisposable<T>[] a = subscribers.get();
            if (a == TERMINATED) {
                return false;
            }
            int len = a.length;
            BehaviorDisposable<T>[] b = new BehaviorDisposable[len + 1];
            System.arraycopy(a, 0, b, 0, len);
            b[len] = rs;
            //原子數(shù)據(jù)的更新操作
            if (subscribers.compareAndSet(a, b)) {
                return true;
            }
        }
    }

不熟悉原子操作類的同學(xué),我簡單介紹一下上述代碼的思想:
首先獲取subscribers的值(應(yīng)該獲得一個(gè)BehaviorDisposable數(shù)組,其中有且僅有一個(gè)元素EMPTY),然后將該數(shù)組擴(kuò)容+1,將新創(chuàng)建的BehaviorDisposable對(duì)象放入數(shù)組index=1的位置上,并將該數(shù)組賦值給subscriber。

可以看到,RxJava中的源碼并沒有使用Java Util包中的Collections相關(guān)工具類進(jìn)行數(shù)據(jù)的操作,而是直接使用System.arraycopy(a, 0, b, 0, len)這個(gè)Native底層方法進(jìn)行數(shù)組的處理,減少了額外的開支。

至于BehaviorDisposable的emitFirst方法,我們不難想象,應(yīng)該是處理數(shù)據(jù)的發(fā)射相關(guān)邏輯,實(shí)際上它會(huì)發(fā)射BehaviorSubject的value,這也就是為什么當(dāng)訂閱后,observer會(huì)先接收到訂閱前的最后一個(gè)數(shù)據(jù)的原因。

這之后,每當(dāng)onNext()從上游接收到一個(gè)數(shù)據(jù),都會(huì)subscribers數(shù)組里對(duì)每一個(gè)BehaviorDisposable中的observer向下發(fā)射數(shù)據(jù)。

BehaviorSubject小結(jié)

其原理就是通過subscribers這個(gè)核心的成員,它是一個(gè)不斷變化的數(shù)組。在創(chuàng)建時(shí),其內(nèi)部只是一個(gè)EMPTY(BehaviorDisposable)對(duì)象,每次被訂閱,都會(huì)在既有的數(shù)組上新加一個(gè)BehaviorDisposable對(duì)象,這個(gè)對(duì)象中包含了一個(gè)List,存儲(chǔ)之后會(huì)收到的數(shù)據(jù)。

同時(shí),BehaviorSubject還有一個(gè)value的成員,該成員會(huì)隨著數(shù)據(jù)的不斷接收而進(jìn)行更新,它總是記錄著當(dāng)前最后一個(gè)接收到的數(shù)據(jù),當(dāng)被subscribe時(shí),會(huì)執(zhí)行emitFirst()方法,發(fā)射當(dāng)前記錄的數(shù)據(jù),也就是訂閱前接收到的最后一個(gè)數(shù)據(jù)。

三、PublishSubject

PublishSubject僅會(huì)向Observer釋放在訂閱之后Observable釋放的數(shù)據(jù)。

參考前兩個(gè)Subject的子類,這個(gè)類很好理解,先上代碼:

  PublishSubject<Object> subject = PublishSubject.create();
  // observer1 will receive all onNext and onComplete events
  subject.subscribe(observer1);
  subject.onNext("one");
  subject.onNext("two");

  // observer2 will only receive "three" and onComplete
  subject.subscribe(observer2);
  subject.onNext("three");
  subject.onComplete();

用圖說話:

PublishSubject

原理

和BehaviorSubject原理思想如出一轍, 而PublishSubject更簡單,其內(nèi)部的PublishDisposable的原子操作類連AtomicReference都不是,而是AtomicBoolean:

public final class PublishSubject<T> extends Subject<T> {
    static final PublishDisposable[] TERMINATED = new PublishDisposable[0];
    static final PublishDisposable[] EMPTY = new PublishDisposable[0];

    final AtomicReference<PublishDisposable<T>[]> subscribers;
}
static final class PublishDisposable<T> extends AtomicBoolean implements Disposable {
}

其原理就是通過subscribers這個(gè)核心的成員,它是一個(gè)不斷變化的數(shù)組。在創(chuàng)建時(shí),其內(nèi)部只是一個(gè)EMPTY(PublishDisposable)對(duì)象,每次被訂閱,都會(huì)在既有的數(shù)組上新加一個(gè)PublishDisposable對(duì)象,這個(gè)對(duì)象中包含了一個(gè)List,存儲(chǔ)之后會(huì)收到的數(shù)據(jù)。

其實(shí)往上翻,你會(huì)發(fā)現(xiàn),這段總結(jié)是我復(fù)制BehaviorSubject小結(jié)的第一段......不是(其實(shí)就是)偷懶,而是兩者本身幾乎沒什么區(qū)別,PublishDisposable更簡單。

嘗試放棄RxBus !!!

為什么要著重說PublishSubject這個(gè)類,原因很簡單,因?yàn)樵谀壳傲餍械氖录偩€處理方式RxBus中,其原理就是使用PublishSubject進(jìn)行數(shù)據(jù)的分發(fā)。

RxBus的代碼基本如下:

public class RxBus {

    private static RxBus rxBus;
    private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());

    private RxBus() {
    }

    public static RxBus getInstance() {
        if (rxBus == null) {
            synchronized (RxBus.class) {
                if (rxBus == null) {
                    rxBus = new RxBus();
                }
            }
        }
        return rxBus;
    }

    public void send(Object o) {
        _bus.onNext(o);
    }

    public Observable<Object> toObserverable() {
        return _bus;
    }
}

每個(gè)人的RxBus類略有不同,可能有的RxBus類會(huì)添加類似ofType()相關(guān)的filter操作符對(duì)數(shù)據(jù)進(jìn)行篩選,但其原理基本一模一樣,都是通過PublishSubject進(jìn)行數(shù)據(jù)的接收和分發(fā),這樣的使用會(huì)有什么后果,請(qǐng)參考下文:

放棄RxBus,擁抱RxJava(一):為什么避免使用EventBus/RxBus

文中作者已經(jīng)很形象舉出了RxBus的種種缺陷,以及對(duì)于事件的傳遞應(yīng)該如何處理。

在分析完P(guān)ublishSubject的原理后,我們更應(yīng)該清楚,RxBus類似一個(gè)全局的Observable,不斷地接受新的數(shù)據(jù),以及接受訂閱,如果使用不當(dāng),會(huì)導(dǎo)致代碼的混亂,我們也應(yīng)該考慮,在接受的數(shù)據(jù)和訂閱越來越多,PublishSubject的單例對(duì)象會(huì)占用越來越多的內(nèi)存,如果沒有處理好生命周期相關(guān),則會(huì)導(dǎo)致嚴(yán)重的內(nèi)存泄漏!

因此,已經(jīng)分析完RxBus(實(shí)際上也就是PublishSubject )后,我們應(yīng)當(dāng)對(duì)其使用的方式有一個(gè)全新的認(rèn)知,使用,亦或放棄RxBus。

四、AsyncSubject

AsyncSubject僅釋放Observable釋放的最后一個(gè)數(shù)據(jù),并且僅在Observable完成之后。然而如果當(dāng)Observable因?yàn)楫惓6K止,AsyncSubject將不會(huì)釋放任何數(shù)據(jù),但是會(huì)向Observer傳遞一個(gè)異常通知。

隨著可以接收到數(shù)據(jù)的范圍越來越小,似乎也越來越好理解了,直接看圖:

AsyncSubject

原理

其原理就更簡單了,這下原子操作了連AtomicBoolean都不是了,直接就是AtomicInteger:

public final class AsyncSubject<T> extends Subject<T> {

    static final AsyncDisposable[] EMPTY = new AsyncDisposable[0];

    static final AsyncDisposable[] TERMINATED = new AsyncDisposable[0];

    final AtomicReference<AsyncDisposable<T>[]> subscribers;
    //記錄最新的數(shù)據(jù)
    T value;
}
//繼承DeferredScalarDisposable
static final class AsyncDisposable<T> extends DeferredScalarDisposable<T> {}

//繼承BasicIntQueueDisposable
public class DeferredScalarDisposable<T> extends BasicIntQueueDisposable<T> {}

//繼承AtomicInteger
public abstract class BasicIntQueueDisposable<T>
extends AtomicInteger
implements QueueDisposable<T> {}

每當(dāng)執(zhí)行onComplete()的時(shí)候,都會(huì)執(zhí)行complete(Object o)方法,先發(fā)送最后一個(gè)數(shù)據(jù),然后執(zhí)行onComplete():

    public final void complete(T value) {
        //......
        Observer<? super T> a = actual;
        //先發(fā)送最后一個(gè)數(shù)據(jù)
        a.onNext(value);
        if (get() != DISPOSED) {
            //執(zhí)行onComplete()
            a.onComplete();
        }
    }

五、簡單介紹UnicastSubject和SerializedSubject

UnicastSubject

僅支持訂閱一次的Subject,如果多個(gè)訂閱者試圖訂閱這個(gè)Subject,若該subject未terminate,將會(huì)受到IllegalStateException ,若已經(jīng)terminate,那么只會(huì)執(zhí)行onError或者onComplete方法。

SerializedSubject

回到剛剛的RxBus類中,我們發(fā)現(xiàn)了SerializedSubject的身影,實(shí)際上,它的作用就是:

將Subject串行化的方法,所有其他的Observable和Subject方法都是線程安全的。

我們也可以通過Subject.toSerialized()方法將Subject對(duì)象串行化保證其線程安全:

public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    //......
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}

我們直接打開這個(gè)類,會(huì)發(fā)現(xiàn),其實(shí)該類中,大部分代碼都會(huì)通過synchronized加鎖,這意味著會(huì)有額外的性能消耗。

這也更進(jìn)一步說明了,使用RxBus,會(huì)額外增加更多的支出(因?yàn)镽xBus中PublisSuject本身的單例對(duì)象就是調(diào)用了toSerialized()方法保證線程安全)。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容