具有更精確控制訂閱動態(tài)的專業(yè)觀察器。

首先我們有必要來了解一下什么是 Connectable Observable: 就是一種特殊的 Observable 對象,ConnectableObservable 在被訂閱時(shí)并不開始發(fā)射數(shù)據(jù),而是只有在調(diào)用 connect 操作符的時(shí)候才開始發(fā)射數(shù)據(jù),所以可以用來更靈活的控制數(shù)據(jù)發(fā)射的時(shí)機(jī)。
9.1 Public
Publish 操作符將普通的 Observable 轉(zhuǎn)換為可連接的(ConnectableObservable)。
注意:如果一個(gè) ConnectableObservable 已經(jīng)開始發(fā)射數(shù)據(jù),再對其進(jìn)行訂閱只能接受之后發(fā)射的數(shù)據(jù),訂閱之前已經(jīng)發(fā)射過的數(shù)據(jù)就丟失了。
示例代碼:見 9.2
9.2 Connect
指示一個(gè) ConnectableObservable 開始發(fā)射數(shù)據(jù)。
Connect 操作符就是用來觸發(fā) ConnectableObservable 發(fā)射數(shù)據(jù)的。調(diào)用 Connect 操作符后會返回一個(gè) Subscription 對象,通過這個(gè) Subscription 對象,我們可以調(diào)用其 unsubscribe 方法來終止數(shù)據(jù)的發(fā)射。另外,如果還沒有訂閱者訂閱的時(shí)候就應(yīng)用 Connect 操作符也是可以使其開始發(fā)射數(shù)據(jù)的。
示例代碼:
//使用 publish 操作符創(chuàng)建一個(gè) ConnectableObservable:
ConnectableObservable<Long> connectableObservable = Observable.interval(100,
TimeUnit.MILLISECONDS).take(6).publish();
// 創(chuàng)建兩個(gè) Consumer 對象
Consumer<Long> consumer1 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 1 -> accept:" + aLong);
}
};
Consumer<Long> consumer2 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 2 -> accept:" + aLong);
}
};
connectableObservable.subscribe(consumer1);
//延遲 300 毫秒訂閱 consumer2
connectableObservable.delay(330, TimeUnit.MILLISECONDS).subscribe(consumer2);
//如果不調(diào)用 connect 方法,connectableObservable 則不會發(fā)射數(shù)據(jù)
connectableObservable.connect();
輸出結(jié)果:
step 1 -> accept:0
step 1 -> accept:1
step 1 -> accept:2
step 1 -> accept:3
step 2 -> accept:0
step 1 -> accept:4
step 2 -> accept:1
step 1 -> accept:5
step 2 -> accept:2
step 2 -> accept:3
step 2 -> accept:4
step 2 -> accept:5
9.3 RefCount / share
讓一個(gè) ConnectableObservable 行為像普通的 Observable。

RefCount 操作符把從一個(gè) ConnectableObservable 連接和斷開的過程自動化了。調(diào)用 RefCount,返回一個(gè)普通的 Observable。當(dāng)?shù)谝粋€(gè)訂閱者訂閱這個(gè) Observable 時(shí),RefCount 連接到下層的可連接 Observable。RefCount 跟蹤有多少個(gè)觀察者訂閱它,直到最后一個(gè)觀察者完成才斷開與下層可連接 Observable 的連接。
示例代碼:
//使用 publish 操作符創(chuàng)建一個(gè) ConnectableObservable:
ConnectableObservable<Long> connectableObservable = Observable.interval(100,
TimeUnit.MILLISECONDS).take(6).publish();
// 創(chuàng)建兩個(gè) Consumer 對象
Consumer<Long> consumer1 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 1 -> accept:" + aLong);
}
};
Consumer<Long> consumer2 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "step 2 -> accept:" + aLong);
}
};
//這兩種實(shí)現(xiàn)方法結(jié)果一致
// Observable observable = Observable.interval(100, TimeUnit.MILLISECONDS).take(6).share();
Observable observable = connectableObservable.refCount();
observable.subscribe(consumer1);
//延遲 300 毫秒訂閱 consumer2
observable.delay(330, TimeUnit.MILLISECONDS).subscribe(consumer2);
輸出結(jié)果:
step 1 -> accept:0
step 1 -> accept:1
step 1 -> accept:2
step 1 -> accept:3
step 2 -> accept:0
step 1 -> accept:4
step 2 -> accept:1
step 1 -> accept:5
step 2 -> accept:2
step 2 -> accept:3
step 2 -> accept:4
step 2 -> accept:5
9.3.1 Share
Share 操作符作用與 refCount 相似。內(nèi)部實(shí)現(xiàn)為:
public final Observable<T> share() {
return publish().refCount();
}
9.4 replay
保證所有的觀察者收到相同的數(shù)據(jù)序列,即使它們在 Observable 開始發(fā)射數(shù)據(jù)之后才訂閱。

Replay 操作符返回一個(gè) ConnectableObservable 對象并且可以緩存其發(fā)射過的數(shù)據(jù),這樣即使有訂閱者在其發(fā)射數(shù)據(jù)之后進(jìn)行訂閱也能收到其之前發(fā)射過的數(shù)據(jù)。不過使用 Replay 操作符我們最好還是限定其緩存的大小,否則緩存的數(shù)據(jù)太多了可會占用很大的一塊內(nèi)存。
Replay 操作符能指定緩存的大小或者時(shí)間,這樣能避免耗費(fèi)太多內(nèi)存。
- Javadoc: replay()
- Javadoc: replay(int)
- Javadoc: replay(long,TimeUnit)
- Javadoc: replay(int,long,TimeUnit)
有一種 replay 返回一個(gè)普通的 Observable。它可以接受一個(gè)變換函數(shù)為參數(shù),這個(gè)函數(shù)接受原始 Observable 發(fā)射的數(shù)據(jù)項(xiàng)為參數(shù),返回結(jié)果 Observable 要發(fā)射的一項(xiàng)數(shù)據(jù)。因此,這個(gè)操作符其實(shí)是 replay 變換之后的數(shù)據(jù)項(xiàng)。
- Javadoc: replay(Function)
- Javadoc: replay(Function,int)
- Javadoc: replay(Function,long,TimeUnit)
- Javadoc: replay(Function,int,long,TimeUnit)
示例代碼 1:
Observable<Long> ob1 = Observable.just(1L, 12L);
//緩存兩次,三秒時(shí)間內(nèi)有效。
ConnectableObservable<Long> connectableObservable = ob1.replay(2, 3, TimeUnit.SECONDS);
//ConnectableObservable<Long> connectableObservable = ob1.publish();
Consumer<Long> consumer1 = new Consumer<Long>(){
@Override
public void accept(@NonNull Long aLong) throws Exception {
Log.e(TAG, "accept:" + aLong);
}
};
Disposable disposable = connectableObservable.subscribe(consumer1);
connectableObservable.connect();
try {
//間隔一秒取消訂閱,然后重新訂閱,有緩存數(shù)據(jù)
Thread.sleep(1000);
disposable.dispose();
disposable = connectableObservable.subscribe(consumer1);
//間隔兩秒取消訂閱后,然后重新訂閱,緩存數(shù)據(jù)失效
Thread.sleep(2000);
disposable.dispose();
disposable = connectableObservable.subscribe(consumer1);
} catch (InterruptedException e) {
e.printStackTrace();
}
輸出結(jié)果:
accept:1
accept:12
accept:1
accept:12