Subject

Subject是一種橋接或代理,可在ReactiveX的某些實(shí)現(xiàn)中用作觀察者(observer)和可觀察對(duì)象(observable)。因?yàn)樗怯^察者,所以可以訂閱一個(gè)或多個(gè)Observable。因?yàn)樗强捎^察的,所以它可以通過釋放觀測(cè)到的數(shù)據(jù)(reemitting),也可以發(fā)出新的數(shù)據(jù)(emit new items)。
因?yàn)橐粋€(gè)subject訂閱了一個(gè)Observable,它將觸發(fā)Observable開始發(fā)射數(shù)據(jù)(如果Observable是“冷”的-也就是說,如果它在開始發(fā)出數(shù)據(jù)之前等待訂閱)。這可以使最終的Subject成為原始“cold”可觀察到的“hot”可觀察到的變體(subject中的一些類型可以立即發(fā)射數(shù)據(jù),無需像Observable一樣subscribe才觸發(fā))。

針對(duì)特定用例設(shè)計(jì)了四種subject,并非所有這些實(shí)現(xiàn)在所有實(shí)現(xiàn)中都可用,并且某些實(shí)現(xiàn)使用其他命名約定。

AsyncSubject

使用AsyncSubject.create()的創(chuàng)建AsyncSubject。AsyncSubject只會(huì)在onComplete方法調(diào)用才會(huì)發(fā)射數(shù)據(jù),只會(huì)發(fā)射一次最后的數(shù)據(jù)。如果源Observable不發(fā)出任何值,則AsyncSubject也將完成而不會(huì)發(fā)出任何值(可以使用observer.assertNoValues())。
它將向所有后續(xù)觀察者發(fā)出相同的最終值(比如下面的observer和observer1)。
但是,如果源Observable終止并發(fā)生錯(cuò)誤,則AsyncSubject將不會(huì)發(fā)出任何項(xiàng)目,而只會(huì)傳遞來自源Observable的錯(cuò)誤通知。

代碼示例

多次onNext,但是只可以看到最后一次的數(shù)據(jù)。多個(gè)observer都可以看到

AsyncSubject<String> subject = AsyncSubject.create();
TestObserver<String> observer = subject.test();
observer.assertEmpty();
subject.onNext("1");
observer.assertEmpty();
//AsyncSubject only emits when onComplete was called.
subject.onNext("2");
subject.onComplete();
observer.assertResult("2");

TestObserver<String> observer1 = subject.test();
observer1.assertResult("2");

沒有發(fā)射出數(shù)據(jù)

AsyncSubject<String> subject = AsyncSubject.create();
TestObserver<String> observer = subject.test();
subject.onComplete();
//observer.assertComplete();
observer.assertNoValues();

subject發(fā)生了異常,下游可以看到。

AsyncSubject<String> subject = AsyncSubject.create();
TestObserver<String> observer = subject.test();
Throwable t = new RuntimeException("發(fā)生異常啦");
subject.onError(t);
observer.assertError(t);

彈珠圖

image.png
image.png

image.png
image.png

BehaviorSubject

當(dāng)觀察者訂閱了BehaviorSubject,它首先發(fā)出源Observable最新發(fā)出的數(shù)據(jù)(如果尚未發(fā)出,則為默認(rèn)值)
然后繼續(xù)發(fā)出源Observable稍后發(fā)出的任何其他項(xiàng)目。
如果源Observable終止并發(fā)生錯(cuò)誤,那么BehaviorSubject將不會(huì)向后續(xù)觀察者發(fā)出任何項(xiàng)目,而只會(huì)傳遞源Observable的錯(cuò)誤通知。
包含default值的訂閱, 輸出內(nèi)容是default,one,two,three

代碼示例

BehaviorSubject<String> subjectHasDefault = BehaviorSubject.createDefault("default");
//default,one,two,three
subjectHasDefault.subscribe(System.out::println);
subjectHasDefault.onNext("one");
subjectHasDefault.onNext("two");
subjectHasDefault.onNext("three");

已經(jīng)發(fā)射出數(shù)據(jù)后訂閱,將從最新的數(shù)據(jù)開始輸出。如下輸出結(jié)果one,two,three

BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
//one,two,three
subject.subscribe(System.out::println);
subject.onNext("two");
subject.onNext("three");

沒有發(fā)出任何數(shù)據(jù), 直接onComplete

BehaviorSubject<Object> emptySubject = BehaviorSubject.create();
emptySubject.onNext("zero");
emptySubject.onNext("one");
emptySubject.onComplete();

subject產(chǎn)生錯(cuò)誤,observer會(huì)輸出錯(cuò)誤

BehaviorSubject<Object> errorSubject = BehaviorSubject.create();
errorSubject.onNext("zero");
errorSubject.onNext("one");
errorSubject.onError(new RuntimeException("error"));
//錯(cuò)誤信息
        errorSubject.subscribe(System.out::println);

彈珠圖

image.png
image.png

image.png
image.png

PublishSubject

PublishSubject僅向觀察者發(fā)出訂閱之后源Observable發(fā)出的那些項(xiàng)。請(qǐng)注意,PublishSubject可能會(huì)在創(chuàng)建后立即開始發(fā)射項(xiàng)目(除非您已采取措施防止這種情況發(fā)生),因此存在這樣的風(fēng)險(xiǎn),即在創(chuàng)建主題和觀察者訂閱該主題之間可能會(huì)丟失一個(gè)或多個(gè)項(xiàng)目。
如果您需要保證從Observable訂閱到所有數(shù)據(jù),您將需要通過“Create”來形成“可觀察”行為,以便您手動(dòng)重新引入“冷”可觀察行為(在開始發(fā)出項(xiàng)目之前檢查所有觀察者是否已訂閱)。或改用ReplaySubject。
如果源Observable終止并發(fā)生錯(cuò)誤,則PublishSubject不會(huì)將任何項(xiàng)目發(fā)送給后續(xù)的觀察者,而只是傳遞來自源Observable的錯(cuò)誤通知。

代碼示例

如下的例子observer1只會(huì)輸出three,four,訂閱之前的數(shù)據(jù)無法訂閱到

PublishSubject<String> subject = PublishSubject.create();
subject.onNext("one");
subject.onNext("two");
subject.subscribe(System.out::println); //輸出three,four
subject.onNext("three");
subject.onNext("four");
subject.onComplete();
subject.test().assertComplete();

PublishSubject遇到了error,則訂閱者可以觀測(cè)到異常。

PublishSubject<String> errorSubject = PublishSubject.create();
errorSubject.onError(new RuntimeException("哈哈哈"));
errorSubject.subscribe(System.out::println);

彈珠圖

image.png
image.png

image.png
image.png

ReplaySubject

ReplaySubject向觀察者發(fā)出源Observable發(fā)出的所有數(shù)據(jù),無論觀察者何時(shí)訂閱。
還有一些ReplaySubject版本,一旦重播緩沖區(qū)有可能增長到超過一定大小,或者自從最初發(fā)出項(xiàng)目以來經(jīng)過了指定的時(shí)間跨度,它們就會(huì)丟棄舊數(shù)據(jù)。
如果使用ReplaySubject作為觀察者,請(qǐng)注意不要從多個(gè)線程調(diào)用其onNext方法(或其他on方法),因?yàn)檫@可能導(dǎo)致同時(shí)發(fā)生(非順序發(fā)生)的調(diào)用,這違反了Observable規(guī)約,并在生成的Subject中產(chǎn)生了關(guān)于哪個(gè)數(shù)據(jù)或通知應(yīng)首先重放的不確定性。

代碼示例

ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();

subject.subscribe(r -> System.out.println("[observer1] " + r));
System.out.println("================");
subject.subscribe(r -> System.out.println("[observer2] " + r));

輸出結(jié)果

[observer1] one
[observer1] two
[observer1] three
================
[observer2] one
[observer2] two
[observer2] three

彈珠圖

image.png
image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容