subject是個(gè)神奇的對(duì)象,他可以是一個(gè)Observable(觀察)同時(shí)也可以是一個(gè)Observer(觀察者):它作為鏈接這兩個(gè)世界的橋梁。一個(gè)Subject可以訂閱一個(gè)Observable,就像一個(gè)觀察者,并且它可以發(fā)射新的數(shù)據(jù),就像一個(gè)Observable。很明顯,作為Obserable,觀察者們或者其它Subject都可以訂閱它。
一旦Subhect訂閱了Observable,他將會(huì)觸發(fā)Observable開始發(fā)射。如果原始的Observable是“冷”的,這將會(huì)對(duì)訂閱一個(gè)“熱”的Observable變量產(chǎn)生影響。
RxJava提供了四種不同的Subject:
- PublishSubject (發(fā)布)
- BehaviorSubject (行為/反應(yīng))
- ReplaySubject (重播)
- AsyncSubject (異步)
PublishSubject
Publish是Subject的一個(gè)基礎(chǔ)子類。讓我們看看用PublishSubject實(shí)現(xiàn)傳統(tǒng)的Observable
PublishSubject<String> stringPublishSubject = PublishSubject.create();
Subscription subscriptionPrint = stringPublishSubject.subscribe(new Observer<String>() {
@Override
public void onCompleted() {
System.out.println("Observable completed");
}
@Override
public void onError(Throwable e) {
System.out.println("Oh,no!Something wrong happened!");
}
@Override
public void onNext(String message) {
System.out.println(message);
}
});
stringPublishSubject.onNext("Hello World");
在剛才的例子中,我們創(chuàng)建一個(gè)PulishSubject,用create()發(fā)射一個(gè)S聽(tīng)值,然后我們訂閱PublishSubject。此時(shí),沒(méi)有數(shù)據(jù)要發(fā)送,因此我們的觀察者只能等待,沒(méi)有阻塞線程,也沒(méi)有資源消耗。
最后一行代碼展示了手動(dòng)發(fā)射字符串“Hello World”,它觸發(fā)了觀察者的onNext()方法,讓我們?cè)诳刂婆_(tái)打印出“Hello World”信息。
讓我們看一個(gè)更復(fù)雜的例子。話說(shuō)我們有一個(gè)private聲明的Observable,外部不能訪問(wèn)。Observable在它生命周期內(nèi)發(fā)射值,我們不用關(guān)心這些值,我們只關(guān)心他們的結(jié)束。
首先,我們創(chuàng)建一個(gè)新的PublishSubject來(lái)響應(yīng)它的onNext()方法,并且外部也可以訪問(wèn)它。
final PublishSubject<Boolean> subject = PublishSubject.create();
subject.subscribe(new Observer<Boolean>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(Boolean aBoolean) {
System.out.println("Observable Completed");
}
});
然后,我們創(chuàng)建“私有”的Observable,只有subject才可以訪問(wèn)的到。
Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 5; i++) {
subscriber.onNext(i);
}
subscriber.onCompleted();
}
}).doOnCompleted(new Action0() {
@Override
public void call() {
subject.onNext(true);
}
}).subscribe();
observable.create()方阿飛包含了我們for循環(huán),發(fā)射數(shù)字。doOnCompleted()方法指定當(dāng)Observable結(jié)束時(shí)要做什么事情;在subject上發(fā)射true。最后,我們訂閱了Observable。很明顯,空的subscribe()調(diào)用僅僅是為了開啟Observable,而不用管已發(fā)出的任何值,也不用管完成事件或者錯(cuò)誤事件。為了這個(gè)例子我們需要它像這樣。
在這個(gè)例子中,我們創(chuàng)建了一個(gè)可以連接Observables并且同時(shí)可被觀測(cè)的實(shí)體。當(dāng)我們想為公共資源創(chuàng)建獨(dú)立、抽象或更易觀測(cè)的點(diǎn)時(shí),這是極其有用的。
BehaviorSubject
簡(jiǎn)單的說(shuō),BehaviorDSubject會(huì)首先向他的訂閱這發(fā)送截至訂閱前最新的一個(gè)數(shù)據(jù)對(duì)象(或初始值),然后正常發(fā)送訂閱后的數(shù)據(jù)流。
BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create(1);