RxJava部分操作符解析

今天我們來看部分RxJava相關(guān)的知識,主要是上一篇RxJava內(nèi)存泄漏的一種解決方案提到的開源框架RxLifecycle里面會涉及到的知識點,有下面幾個:

1 Subject
2 takeUntil
3 filter
4 compose

1.Subject

從代碼可以看出來Subject既可以當觀察者也可以當被觀察者。

public abstract class Subject<T> extends Observable<T> implements Observer<T>

所以可以在生命周期中通過Subject發(fā)送事件然后又自己接收,從而根據(jù)事件類型做相應(yīng)的操作。

Subject總共有四種類型

1 AsyncSubject
2 BehaviorSubject
3 PublishSubject
4 ReplaySubject

今天我們就說下第二種類型BehaviorSubject,它可以給訂閱者發(fā)送訂閱前最近的事件和訂閱后發(fā)送的事件:

BehaviorSubject Rx.PNG

圖中橙色的就是訂閱前最近發(fā)送的事件,在訂閱后也可以收到。文字解釋始終太蒼白,我們來看下代碼:

BehaviorSubject<Integer> behaviorSubject = BehaviorSubject.create();
        behaviorSubject.onNext(1);
        behaviorSubject.onNext(2);
        behaviorSubject.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Timber.tag(TAG).d("running num : " + integer);
            }
        });
        behaviorSubject.onNext(3);
        behaviorSubject.onNext(4);

上面代碼運行結(jié)果就是收到2, 3,4


behaviorSubject.PNG

2.takeUntil

這是一個操作符,可以這樣用

AObservable.takeUntil(BObservable)

可以AObservable監(jiān)聽另外一個BObservable,如果BObservable開始發(fā)送數(shù)據(jù),AObservable就不再發(fā)送數(shù)據(jù)。
看一下官方的圖片解釋,B發(fā)送0數(shù)據(jù)后,A就停止發(fā)送數(shù)據(jù)了。

takeUntil.PNG

talk is cheap, show me the code:

Observable.interval(1, TimeUnit.SECONDS).
           subscribeOn(Schedulers.io()).
           takeUntil(Observable.timer(5, TimeUnit.SECONDS)).
           subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long num) throws Exception {
                    Timber.tag(TAG).d("running num : " + num);
                }
            });

上面代碼的意思就是從0開發(fā)每隔1秒發(fā)送一個數(shù)據(jù),5s時停止發(fā)送,看下運行結(jié)果,和我們的預(yù)期完美一致:


takeUntil Result.PNG

3.filter

filter操作符就是過濾的意思,只有事件滿足過濾條件時被觀察者才會發(fā)送給觀察者??聪鹿俜降慕忉寛D,很清晰明了我就不做解釋了哈。


filter.PNG

看一下怎么用,這個代碼的意思還是每個1s發(fā)送數(shù)據(jù),但是會進行過濾只發(fā)送偶數(shù),也是5秒后停止發(fā)送:

Observable.interval(1, TimeUnit.SECONDS).
                subscribeOn(Schedulers.io()).
                filter(new Predicate<Long>() {
                    @Override
                    public boolean test(Long aLong) throws Exception {
                        return aLong % 2 == 0;
                    }
                }).
                takeUntil(Observable.timer(5, TimeUnit.SECONDS)).
                subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long num) throws Exception {
                        Timber.tag(TAG).e("running num : " + num);
                    }
                });

上面代碼的運行效果,確實是只收到了偶數(shù)。


filter result.PNG

4.compose

compose操作符是用來對Observable進行轉(zhuǎn)換操作的,并且可以保證調(diào)用鏈不被破壞。
比如我們經(jīng)常這樣用:

Observable.interval(1,TimeUnit.SECONDS)
                .subscribeOn(Schedulers.io()).
                observeOn(AndroidSchedulers.mainThread());

這部分代碼經(jīng)常寫,怎么進行封裝呢?可能有的小伙伴立馬就想到下面的方法:

private Observable composeObservable(Observable observable){
        return observable.subscribeOn(Schedulers.io()).
                        observeOn(AndroidSchedulers.mainThread());
}

但是上面這樣用就破壞了調(diào)用鏈了,因為你肯定得這樣調(diào)用,這樣就會變得怪怪的,不是Observable開頭了,變成函數(shù)開頭。

composeObservable(Observable.interval(1,TimeUnit.SECONDS)).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
            }
});

這個問題用compose就可以完美解決:

Observable.interval(1, TimeUnit.SECONDS).
                compose(bindUntil(5)).
                subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long num) throws Exception {
                        Timber.tag(TAG).d("running num : " + num);
                    }
 });

private ObservableTransformer<Long, Long> bindUntil(final long deleyTime) {
        return new ObservableTransformer<Long, Long>() {
            @Override
            public ObservableSource<Long> apply(Observable<Long> upstream) {
                return upstream.subscribeOn(Schedulers.io()).takeUntil(Observable.timer(deleyTime, TimeUnit.SECONDS));
            }
        };
}

5.總結(jié)

上面的內(nèi)容是假定大家有一點點RxJava的知識的,沒有涉及到基本的使用。本次分享可能看起來毫無章法哈,其實還是有針對目的的,就是前面提到的開源框架RxLifecycle,這次分享就是針對里面用到的RxJava的一些知識點進行解析。RxJava的操作符挺多的,也不太可能也沒必要一個個進行分析,用到的時候進行查找官方文檔就可以了。

下面會用前面提到的這些知識點來自己實現(xiàn)一個類似于RxLifecycle的小Demo,歡迎大家關(guān)注和點贊哈。

最后感謝@右傾傾的理解和支持哈。

以上!

歡迎關(guān)注公眾號:JueCode

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容