Subject是一種橋接或代理,可在ReactiveX的某些實(shí)現(xiàn)中用作觀察者(observer)和可觀察對(duì)象(observable)。因?yàn)樗怯^察者,所以可以訂閱一個(gè)或多個(gè)Observable。因?yàn)樗强捎^察的,所以它可以通過釋放觀測(cè)到的數(shù)據(jù)(reemitting),也可以發(fā)出新的數(shù)據(jù)(emit new items)。
因?yàn)橐粋€(gè)subject訂閱了一個(gè)Observable,它將觸發(fā)Observable開始發(fā)射數(shù)據(jù)(如果Observable是“冷”的-也就是說,如果它在開始發(fā)出數(shù)據(jù)之前等待訂閱)。這可以使最終的Subject成為原始“cold”可觀察到的“hot”可觀察到的變體(subject中的一些類型可以立即發(fā)射數(shù)據(jù),無需像Observable一樣subscribe才觸發(fā))。
針對(duì)特定用例設(shè)計(jì)了四種subject,并非所有這些實(shí)現(xiàn)在所有實(shí)現(xiàn)中都可用,并且某些實(shí)現(xiàn)使用其他命名約定。
AsyncSubject
使用AsyncSubject.create()的創(chuàng)建AsyncSubject。AsyncSubject只會(huì)在onComplete方法調(diào)用才會(huì)發(fā)射數(shù)據(jù),只會(huì)發(fā)射一次最后的數(shù)據(jù)。如果源Observable不發(fā)出任何值,則AsyncSubject也將完成而不會(huì)發(fā)出任何值(可以使用observer.assertNoValues())。
它將向所有后續(xù)觀察者發(fā)出相同的最終值(比如下面的observer和observer1)。
但是,如果源Observable終止并發(fā)生錯(cuò)誤,則AsyncSubject將不會(huì)發(fā)出任何項(xiàng)目,而只會(huì)傳遞來自源Observable的錯(cuò)誤通知。
代碼示例
多次onNext,但是只可以看到最后一次的數(shù)據(jù)。多個(gè)observer都可以看到
AsyncSubject<String> subject = AsyncSubject.create();
TestObserver<String> observer = subject.test();
observer.assertEmpty();
subject.onNext("1");
observer.assertEmpty();
//AsyncSubject only emits when onComplete was called.
subject.onNext("2");
subject.onComplete();
observer.assertResult("2");
TestObserver<String> observer1 = subject.test();
observer1.assertResult("2");
沒有發(fā)射出數(shù)據(jù)
AsyncSubject<String> subject = AsyncSubject.create();
TestObserver<String> observer = subject.test();
subject.onComplete();
//observer.assertComplete();
observer.assertNoValues();
subject發(fā)生了異常,下游可以看到。
AsyncSubject<String> subject = AsyncSubject.create();
TestObserver<String> observer = subject.test();
Throwable t = new RuntimeException("發(fā)生異常啦");
subject.onError(t);
observer.assertError(t);
彈珠圖


BehaviorSubject
當(dāng)觀察者訂閱了BehaviorSubject,它首先發(fā)出源Observable最新發(fā)出的數(shù)據(jù)(如果尚未發(fā)出,則為默認(rèn)值)
然后繼續(xù)發(fā)出源Observable稍后發(fā)出的任何其他項(xiàng)目。
如果源Observable終止并發(fā)生錯(cuò)誤,那么BehaviorSubject將不會(huì)向后續(xù)觀察者發(fā)出任何項(xiàng)目,而只會(huì)傳遞源Observable的錯(cuò)誤通知。
包含default值的訂閱, 輸出內(nèi)容是default,one,two,three
代碼示例
BehaviorSubject<String> subjectHasDefault = BehaviorSubject.createDefault("default");
//default,one,two,three
subjectHasDefault.subscribe(System.out::println);
subjectHasDefault.onNext("one");
subjectHasDefault.onNext("two");
subjectHasDefault.onNext("three");
已經(jīng)發(fā)射出數(shù)據(jù)后訂閱,將從最新的數(shù)據(jù)開始輸出。如下輸出結(jié)果one,two,three
BehaviorSubject<Object> subject = BehaviorSubject.create();
subject.onNext("zero");
subject.onNext("one");
//one,two,three
subject.subscribe(System.out::println);
subject.onNext("two");
subject.onNext("three");
沒有發(fā)出任何數(shù)據(jù), 直接onComplete
BehaviorSubject<Object> emptySubject = BehaviorSubject.create();
emptySubject.onNext("zero");
emptySubject.onNext("one");
emptySubject.onComplete();
subject產(chǎn)生錯(cuò)誤,observer會(huì)輸出錯(cuò)誤
BehaviorSubject<Object> errorSubject = BehaviorSubject.create();
errorSubject.onNext("zero");
errorSubject.onNext("one");
errorSubject.onError(new RuntimeException("error"));
//錯(cuò)誤信息
errorSubject.subscribe(System.out::println);
彈珠圖


PublishSubject
PublishSubject僅向觀察者發(fā)出訂閱之后源Observable發(fā)出的那些項(xiàng)。請(qǐng)注意,PublishSubject可能會(huì)在創(chuàng)建后立即開始發(fā)射項(xiàng)目(除非您已采取措施防止這種情況發(fā)生),因此存在這樣的風(fēng)險(xiǎn),即在創(chuàng)建主題和觀察者訂閱該主題之間可能會(huì)丟失一個(gè)或多個(gè)項(xiàng)目。
如果您需要保證從Observable訂閱到所有數(shù)據(jù),您將需要通過“Create”來形成“可觀察”行為,以便您手動(dòng)重新引入“冷”可觀察行為(在開始發(fā)出項(xiàng)目之前檢查所有觀察者是否已訂閱)。或改用ReplaySubject。
如果源Observable終止并發(fā)生錯(cuò)誤,則PublishSubject不會(huì)將任何項(xiàng)目發(fā)送給后續(xù)的觀察者,而只是傳遞來自源Observable的錯(cuò)誤通知。
代碼示例
如下的例子observer1只會(huì)輸出three,four,訂閱之前的數(shù)據(jù)無法訂閱到
PublishSubject<String> subject = PublishSubject.create();
subject.onNext("one");
subject.onNext("two");
subject.subscribe(System.out::println); //輸出three,four
subject.onNext("three");
subject.onNext("four");
subject.onComplete();
subject.test().assertComplete();
PublishSubject遇到了error,則訂閱者可以觀測(cè)到異常。
PublishSubject<String> errorSubject = PublishSubject.create();
errorSubject.onError(new RuntimeException("哈哈哈"));
errorSubject.subscribe(System.out::println);
彈珠圖


ReplaySubject
ReplaySubject向觀察者發(fā)出源Observable發(fā)出的所有數(shù)據(jù),無論觀察者何時(shí)訂閱。
還有一些ReplaySubject版本,一旦重播緩沖區(qū)有可能增長到超過一定大小,或者自從最初發(fā)出項(xiàng)目以來經(jīng)過了指定的時(shí)間跨度,它們就會(huì)丟棄舊數(shù)據(jù)。
如果使用ReplaySubject作為觀察者,請(qǐng)注意不要從多個(gè)線程調(diào)用其onNext方法(或其他on方法),因?yàn)檫@可能導(dǎo)致同時(shí)發(fā)生(非順序發(fā)生)的調(diào)用,這違反了Observable規(guī)約,并在生成的Subject中產(chǎn)生了關(guān)于哪個(gè)數(shù)據(jù)或通知應(yīng)首先重放的不確定性。
代碼示例
ReplaySubject<Object> subject = ReplaySubject.create();
subject.onNext("one");
subject.onNext("two");
subject.onNext("three");
subject.onComplete();
subject.subscribe(r -> System.out.println("[observer1] " + r));
System.out.println("================");
subject.subscribe(r -> System.out.println("[observer2] " + r));
輸出結(jié)果
[observer1] one
[observer1] two
[observer1] three
================
[observer2] one
[observer2] two
[observer2] three
彈珠圖
