RxJava2.0 操作符(9)—— Connectable Observable 連接操作符

具有更精確控制訂閱動態(tài)的專業(yè)觀察器。

Connect
Connect

首先我們有必要來了解一下什么是 Connectable Observable: 就是一種特殊的 Observable 對象,ConnectableObservable 在被訂閱時(shí)并不開始發(fā)射數(shù)據(jù),而是只有在調(diào)用 connect 操作符的時(shí)候才開始發(fā)射數(shù)據(jù),所以可以用來更靈活的控制數(shù)據(jù)發(fā)射的時(shí)機(jī)。

9.1 Public

Publish 操作符將普通的 Observable 轉(zhuǎn)換為可連接的(ConnectableObservable)。

注意:如果一個(gè) ConnectableObservable 已經(jīng)開始發(fā)射數(shù)據(jù),再對其進(jìn)行訂閱只能接受之后發(fā)射的數(shù)據(jù),訂閱之前已經(jīng)發(fā)射過的數(shù)據(jù)就丟失了。

示例代碼:見 9.2

9.2 Connect

指示一個(gè) ConnectableObservable 開始發(fā)射數(shù)據(jù)。

Connect 操作符就是用來觸發(fā) ConnectableObservable 發(fā)射數(shù)據(jù)的。調(diào)用 Connect 操作符后會返回一個(gè) Subscription 對象,通過這個(gè) Subscription 對象,我們可以調(diào)用其 unsubscribe 方法來終止數(shù)據(jù)的發(fā)射。另外,如果還沒有訂閱者訂閱的時(shí)候就應(yīng)用 Connect 操作符也是可以使其開始發(fā)射數(shù)據(jù)的。

示例代碼:

//使用 publish 操作符創(chuàng)建一個(gè) ConnectableObservable:
ConnectableObservable<Long> connectableObservable = Observable.interval(100,
    TimeUnit.MILLISECONDS).take(6).publish();

// 創(chuàng)建兩個(gè) Consumer 對象
Consumer<Long> consumer1 = new Consumer<Long>(){
    @Override
    public void accept(@NonNull Long aLong) throws Exception {
        Log.e(TAG, "step 1 -> accept:" + aLong);
    }
};

Consumer<Long> consumer2 = new Consumer<Long>(){
    @Override
    public void accept(@NonNull Long aLong) throws Exception {
        Log.e(TAG, "step 2 -> accept:" + aLong);
    }
};

connectableObservable.subscribe(consumer1);
//延遲 300 毫秒訂閱 consumer2
connectableObservable.delay(330, TimeUnit.MILLISECONDS).subscribe(consumer2);
//如果不調(diào)用 connect 方法,connectableObservable 則不會發(fā)射數(shù)據(jù)
connectableObservable.connect();

輸出結(jié)果:

step 1 -> accept:0
step 1 -> accept:1
step 1 -> accept:2
step 1 -> accept:3
step 2 -> accept:0
step 1 -> accept:4
step 2 -> accept:1
step 1 -> accept:5
step 2 -> accept:2
step 2 -> accept:3
step 2 -> accept:4
step 2 -> accept:5

9.3 RefCount / share

讓一個(gè) ConnectableObservable 行為像普通的 Observable。

RefCount
RefCount

RefCount 操作符把從一個(gè) ConnectableObservable 連接和斷開的過程自動化了。調(diào)用 RefCount,返回一個(gè)普通的 Observable。當(dāng)?shù)谝粋€(gè)訂閱者訂閱這個(gè) Observable 時(shí),RefCount 連接到下層的可連接 Observable。RefCount 跟蹤有多少個(gè)觀察者訂閱它,直到最后一個(gè)觀察者完成才斷開與下層可連接 Observable 的連接。

示例代碼:

//使用 publish 操作符創(chuàng)建一個(gè) ConnectableObservable:
ConnectableObservable<Long> connectableObservable = Observable.interval(100, 
    TimeUnit.MILLISECONDS).take(6).publish();

// 創(chuàng)建兩個(gè) Consumer 對象
Consumer<Long> consumer1 = new Consumer<Long>(){
    @Override
    public void accept(@NonNull Long aLong) throws Exception {
        Log.e(TAG, "step 1 -> accept:" + aLong);
    }
};

Consumer<Long> consumer2 = new Consumer<Long>(){
    @Override
    public void accept(@NonNull Long aLong) throws Exception {
        Log.e(TAG, "step 2 -> accept:" + aLong);
    }
};

//這兩種實(shí)現(xiàn)方法結(jié)果一致
// Observable observable =  Observable.interval(100, TimeUnit.MILLISECONDS).take(6).share();
Observable observable = connectableObservable.refCount();

observable.subscribe(consumer1);
//延遲 300 毫秒訂閱 consumer2
observable.delay(330, TimeUnit.MILLISECONDS).subscribe(consumer2);

輸出結(jié)果:

step 1 -> accept:0
step 1 -> accept:1
step 1 -> accept:2
step 1 -> accept:3
step 2 -> accept:0
step 1 -> accept:4
step 2 -> accept:1
step 1 -> accept:5
step 2 -> accept:2
step 2 -> accept:3
step 2 -> accept:4
step 2 -> accept:5

9.3.1 Share

Share 操作符作用與 refCount 相似。內(nèi)部實(shí)現(xiàn)為:

public final Observable<T> share() {
    return publish().refCount();
}

9.4 replay

保證所有的觀察者收到相同的數(shù)據(jù)序列,即使它們在 Observable 開始發(fā)射數(shù)據(jù)之后才訂閱。

replay
replay

Replay 操作符返回一個(gè) ConnectableObservable 對象并且可以緩存其發(fā)射過的數(shù)據(jù),這樣即使有訂閱者在其發(fā)射數(shù)據(jù)之后進(jìn)行訂閱也能收到其之前發(fā)射過的數(shù)據(jù)。不過使用 Replay 操作符我們最好還是限定其緩存的大小,否則緩存的數(shù)據(jù)太多了可會占用很大的一塊內(nèi)存。
Replay 操作符能指定緩存的大小或者時(shí)間,這樣能避免耗費(fèi)太多內(nèi)存。

有一種 replay 返回一個(gè)普通的 Observable。它可以接受一個(gè)變換函數(shù)為參數(shù),這個(gè)函數(shù)接受原始 Observable 發(fā)射的數(shù)據(jù)項(xiàng)為參數(shù),返回結(jié)果 Observable 要發(fā)射的一項(xiàng)數(shù)據(jù)。因此,這個(gè)操作符其實(shí)是 replay 變換之后的數(shù)據(jù)項(xiàng)。

示例代碼 1:

Observable<Long> ob1 = Observable.just(1L, 12L);
//緩存兩次,三秒時(shí)間內(nèi)有效。
ConnectableObservable<Long> connectableObservable = ob1.replay(2, 3, TimeUnit.SECONDS);
//ConnectableObservable<Long> connectableObservable = ob1.publish();

Consumer<Long> consumer1 = new Consumer<Long>(){
    @Override
    public void accept(@NonNull Long aLong) throws Exception {
        Log.e(TAG, "accept:" + aLong);
    }
};
Disposable disposable = connectableObservable.subscribe(consumer1);
connectableObservable.connect();

try {
    //間隔一秒取消訂閱,然后重新訂閱,有緩存數(shù)據(jù)
    Thread.sleep(1000);
    disposable.dispose();
    disposable = connectableObservable.subscribe(consumer1);

    //間隔兩秒取消訂閱后,然后重新訂閱,緩存數(shù)據(jù)失效
    Thread.sleep(2000);
    disposable.dispose();
    disposable = connectableObservable.subscribe(consumer1);
} catch (InterruptedException e) {
    e.printStackTrace();
}

輸出結(jié)果:

accept:1
accept:12
accept:1
accept:12
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對于擴(kuò)展包,由于使用率較低,如有需求,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,201評論 8 93
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,978評論 0 10
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 1,053評論 0 3
  • RxJava正在Android開發(fā)者中變的越來越流行。唯一的問題就是上手不容易,尤其是大部分人之前都是使用命令式編...
    劉啟敏閱讀 2,013評論 1 7
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,366評論 2 8

友情鏈接更多精彩內(nèi)容