
Subject 是一種特殊的存在
在前面一篇文章Cold Observable 和 Hot Observable中,曾經(jīng)介紹過 Subject 既是 Observable 又是 Observer(Subscriber)。官網(wǎng)稱 Subject 可以看成是一個(gè)橋梁或者代理。
Subject的分類
Subject包含四種類型分別是AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。
1. AsyncSubject
Observer會(huì)接收AsyncSubject的onComplete()之前的最后一個(gè)數(shù)據(jù)。
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("asyncSubject1");
subject.onNext("asyncSubject2");
subject.onComplete();
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("asyncSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("asyncSubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("asyncSubject:complete"); //輸出 asyncSubject onComplete
}
});
subject.onNext("asyncSubject3");
subject.onNext("asyncSubject4");
執(zhí)行結(jié)果:
asyncSubject:asyncSubject2
asyncSubject:complete
改一下代碼,將subject.onComplete()放在最后。
AsyncSubject<String> subject = AsyncSubject.create();
subject.onNext("asyncSubject1");
subject.onNext("asyncSubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("asyncSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("asyncSubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("asyncSubject:complete"); //輸出 asyncSubject onComplete
}
});
subject.onNext("asyncSubject3");
subject.onNext("asyncSubject4");
subject.onComplete();
執(zhí)行結(jié)果:
asyncSubject:asyncSubject4
asyncSubject:complete
注意,subject.onComplete()必須要調(diào)用才會(huì)開始發(fā)送數(shù)據(jù),否則Subscriber將不接收任何數(shù)據(jù)。
2. BehaviorSubject
Observer會(huì)接收到BehaviorSubject被訂閱之前的最后一個(gè)數(shù)據(jù),再接收訂閱之后發(fā)射過來的數(shù)據(jù)。如果BehaviorSubject被訂閱之前沒有發(fā)送任何數(shù)據(jù),則會(huì)發(fā)送一個(gè)默認(rèn)數(shù)據(jù)。
BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("behaviorSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("behaviorSubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("behaviorSubject:complete"); //輸出 behaviorSubject onComplete
}
});
subject.onNext("behaviorSubject2");
subject.onNext("behaviorSubject3");
執(zhí)行結(jié)果:
behaviorSubject:behaviorSubject1
behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3
在這里,behaviorSubject1是默認(rèn)值。因?yàn)閳?zhí)行了
BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
稍微改一下代碼,在subscribe()之前,再發(fā)射一個(gè)事件。
BehaviorSubject<String> subject = BehaviorSubject.createDefault("behaviorSubject1");
subject.onNext("behaviorSubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("behaviorSubject:"+s); //輸出asyncSubject:asyncSubject3
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("behaviorSubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("behaviorSubject:complete"); //輸出 behaviorSubject onComplete
}
});
subject.onNext("behaviorSubject3");
subject.onNext("behaviorSubject4");
執(zhí)行結(jié)果:
behaviorSubject:behaviorSubject2
behaviorSubject:behaviorSubject3
behaviorSubject:behaviorSubject4
這次丟棄了默認(rèn)值,而發(fā)射behaviorSubject2。
因?yàn)锽ehaviorSubject 每次只會(huì)發(fā)射調(diào)用subscribe()方法之前的最后一個(gè)事件和調(diào)用subscribe()方法之后的事件。
BehaviorSubject還可以緩存最近一次發(fā)出信息的數(shù)據(jù)。
3. ReplaySubject
ReplaySubject會(huì)發(fā)射所有來自原始Observable的數(shù)據(jù)給觀察者,無論它們是何時(shí)訂閱的。
ReplaySubject<String> subject = ReplaySubject.create();
subject.onNext("replaySubject1");
subject.onNext("replaySubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("replaySubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("replaySubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("replaySubject:complete"); //輸出 replaySubject onComplete
}
});
subject.onNext("replaySubject3");
subject.onNext("replaySubject4");
執(zhí)行結(jié)果:
replaySubject:replaySubject1
replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4
稍微改一下代碼,將create()改成createWithSize(1)只緩存訂閱前最后發(fā)送的1條數(shù)據(jù)
ReplaySubject<String> subject = ReplaySubject.createWithSize(1);
subject.onNext("replaySubject1");
subject.onNext("replaySubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("replaySubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("replaySubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("replaySubject:complete"); //輸出 replaySubject onComplete
}
});
subject.onNext("replaySubject3");
subject.onNext("replaySubject4");
執(zhí)行結(jié)果:
replaySubject:replaySubject2
replaySubject:replaySubject3
replaySubject:replaySubject4
這個(gè)執(zhí)行結(jié)果跟BehaviorSubject是一樣的。但是從并發(fā)的角度來看,ReplaySubject 在處理并發(fā) subscribe() 和 onNext() 時(shí)會(huì)更加復(fù)雜。
ReplaySubject除了可以限制緩存數(shù)據(jù)的數(shù)量和還能限制緩存的時(shí)間。使用createWithTime()即可。
4. PublishSubject
Observer只接收PublishSubject被訂閱之后發(fā)送的數(shù)據(jù)。
PublishSubject<String> subject = PublishSubject.create();
subject.onNext("publicSubject1");
subject.onNext("publicSubject2");
subject.onComplete();
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("publicSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("publicSubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("publicSubject:complete"); //輸出 publicSubject onComplete
}
});
subject.onNext("publicSubject3");
subject.onNext("publicSubject4");
執(zhí)行結(jié)果:
publicSubject:complete
因?yàn)閟ubject在訂閱之前,已經(jīng)執(zhí)行了onComplete()方法,所以無法發(fā)射數(shù)據(jù)。稍微改一下代碼,將onComplete()方法放在最后。
PublishSubject<String> subject = PublishSubject.create();
subject.onNext("publicSubject1");
subject.onNext("publicSubject2");
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println("publicSubject:"+s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
System.out.println("publicSubject onError"); //不輸出(異常才會(huì)輸出)
}
}, new Action() {
@Override
public void run() throws Exception {
System.out.println("publicSubject:complete"); //輸出 publicSubject onComplete
}
});
subject.onNext("publicSubject3");
subject.onNext("publicSubject4");
subject.onComplete();
執(zhí)行結(jié)果:
publicSubject:publicSubject3
publicSubject:publicSubject4
publicSubject:complete
最后,一句話總結(jié)一下四個(gè)Subject的特性。
| Subject | 發(fā)射行為 |
|---|---|
| AsyncSubject | 不論訂閱發(fā)生在什么時(shí)候,只會(huì)發(fā)射最后一個(gè)數(shù)據(jù) |
| BehaviorSubject | 發(fā)送訂閱之前一個(gè)數(shù)據(jù)和訂閱之后的全部數(shù)據(jù) |
| ReplaySubject | 不論訂閱發(fā)生在什么時(shí)候,都發(fā)射全部數(shù)據(jù) |
| PublishSubject | 發(fā)送訂閱之后全部數(shù)據(jù) |
可能錯(cuò)過的事件
Subject 作為一個(gè)Observable時(shí),可以不停地調(diào)用onNext()來發(fā)送事件,直到遇到onComplete()才會(huì)結(jié)束。
PublishSubject<String> subject = PublishSubject.create();
subject.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
},new Action() {
@Override
public void run() throws Exception {
System.out.println("completed");
}
});
subject.onNext("Foo");
subject.onNext("Bar");
subject.onComplete();
執(zhí)行的結(jié)果:
Foo
Bar
completed
如果,使用 subsribeOn 操作符將 subject 切換到IO線程,再使用 Thread.sleep(2000) 讓主線程休眠2秒。
PublishSubject<String> subject = PublishSubject.create();
subject.subscribeOn(Schedulers.io())
.subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
System.out.println(s);
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
}
},new Action() {
@Override
public void run() throws Exception {
System.out.println("completed");
}
});
subject.onNext("Foo");
subject.onNext("Bar");
subject.onComplete();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
這時(shí),其執(zhí)行的結(jié)果變?yōu)椋?/p>
completed
為何會(huì)缺少打印Foo和Bar?
因?yàn)?,subject 發(fā)射元素的線程被指派到了 IO 線程,此時(shí) IO 線程正在初始化還沒起來,subject 發(fā)射前這兩個(gè)元素Foo、Bar還在主線程中,主線程的這兩個(gè)元素往 IO 線程轉(zhuǎn)發(fā)的過程中由于 IO 線程還沒有起來,所以就被丟棄了。此時(shí),無論Thread睡了多少秒,F(xiàn)oo、Bar都不會(huì)被打印出來。
其實(shí),解決辦法也很簡單,將subject改成使用Observable.create()來替代,它允許為每個(gè)訂閱者精確控制事件的發(fā)送,這樣就不會(huì)缺少打印Foo和Bar。
使用PublishSubject來實(shí)現(xiàn)簡化的RxBus
下面的代碼是一個(gè)簡化版本的Event Bus,在這里使用了PublishSubject。因?yàn)槭录偩€是基于發(fā)布/訂閱模式實(shí)現(xiàn)的,如果某一事件在多個(gè)Activity/Fragment中被訂閱的話,在App的任意地方一旦發(fā)布該事件,則多個(gè)訂閱的地方都能夠同時(shí)收到這一事件(在這里,訂閱事件的Activity/Fragment不能被destory,一旦被destory就不能收到事件),這很符合Hot Observable的特性。所以,我們使用PublishSubject,考慮到多線程的情況,還需要使用 Subject 的 toSerialized() 方法。
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;
public class RxBus {
private final Subject<Object> mBus;
private RxBus() {
mBus = PublishSubject.create().toSerialized();
}
public static RxBus get() {
return Holder.BUS;
}
public void post(Object obj) {
mBus.onNext(obj);
}
public <T> Observable<T> toObservable(Class<T> tClass) {
return mBus.ofType(tClass);
}
public Observable<Object> toObservable() {
return mBus;
}
public boolean hasObservers() {
return mBus.hasObservers();
}
private static class Holder {
private static final RxBus BUS = new RxBus();
}
}
在這里Subject的toSerialized(),使用SerializedSubject包裝了原先的Subject。
/**
* Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
* onComplete methods, making them thread-safe.
* <p>The method is thread-safe.
* @return the wrapped and serialized subject
*/
@NonNull
public final Subject<T> toSerialized() {
if (this instanceof SerializedSubject) {
return this;
}
return new SerializedSubject<T>(this);
}
這個(gè)版本的Event Bus比較簡單,并沒有考慮到背壓的情況,因?yàn)樵?RxJava2.x 中 Subject 已經(jīng)不再支持背壓了。如果要增加背壓的處理,可以使用Processor,我們需要將 PublishSubject 改成 PublishProcessor,對(duì)應(yīng)的 Observable 也需要改成 Flowable。
使用BehaviorSubject來實(shí)現(xiàn)預(yù)加載
預(yù)加載可以很好的提高程序的用戶體驗(yàn)。
每當(dāng)用戶處于弱網(wǎng)絡(luò)時(shí),打開一個(gè)App可能出現(xiàn)一片空白或者一直在loading,那用戶一定會(huì)很煩躁。此時(shí),如果能夠預(yù)先加載一些數(shù)據(jù),例如上一次打開App時(shí)保存的數(shù)據(jù),這樣不至于會(huì)損傷App的用戶體驗(yàn)。
下面是借助 BehaviorSubject 的特性來實(shí)現(xiàn)一個(gè)簡單的預(yù)加載類RxPreLoader。
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.subjects.BehaviorSubject;
/**
* Created by Tony Shen on 2017/6/2.
*/
public class RxPreLoader<T> {
//能夠緩存訂閱之前的最新數(shù)據(jù)
private BehaviorSubject<T> mData;
private Disposable disposable;
public RxPreLoader(T defaultValue) {
mData = BehaviorSubject.createDefault(defaultValue);
}
/**
* 發(fā)送事件
* @param object
*/
public void publish(T object) {
mData.onNext(object);
}
/**
* 訂閱事件
* @param onNext
* @return
*/
public Disposable subscribe(Consumer onNext) {
disposable = mData.subscribe(onNext);
return disposable;
}
/**
* 反訂閱
*
*/
public void dispose() {
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
disposable = null;
}
}
/**
* 獲取緩存數(shù)據(jù)的Subject
*
* @return
*/
public BehaviorSubject<T> getCacheDataSubject() {
return mData;
}
/**
* 直接獲取最近的一個(gè)數(shù)據(jù)
*
* @return
*/
public T getLastCacheData() {
return mData.getValue();
}
}
可以考慮在基類的Activity/Fragment中也實(shí)現(xiàn)一個(gè)類似的RxPreLoader。
總結(jié)
RxJava 的 Subject 是一種特殊的存在,它的靈活性在使用時(shí)也會(huì)伴隨著風(fēng)險(xiǎn),沒有用好它的話會(huì)錯(cuò)過事件,并且使用時(shí)還要小心 Subject 不是線程安全的。當(dāng)然很多開源框架都在使用Subject,例如大名鼎鼎的RxLifecycle使用了BehaviorSubject。