RxJava常用操作符startWith、merge、concat、zip、combineLatest、retryWhen

1.startWith:給你被觀察者的數(shù)據(jù)流前再增加一點(diǎn)同類型的數(shù)據(jù)或者增加一個(gè)數(shù)據(jù)流
startWith.png
Observable.just("a", "b", "c").startWith("呵呵", "哦哦")
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("Tag", "call: " + s);
                    }
                });

or

Observable.just("a", "b", "c")
                .startWith(Observable.from(new String[]{"呵呵", "哦哦"}))
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.e("Tag", "call: " + s);
                    }
                });

輸出日志:
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: 呵呵
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: 哦哦
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: a
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: b
07-19 15:50:50.023 27691-27691/com.example.audiotalk E/Tag: call: c

2.merge:把多個(gè)被觀察者合并到一個(gè)被觀察者身上輸出,但是可能會(huì)讓合并的被觀察者發(fā)射的數(shù)據(jù)交錯(cuò)。
merge.png
Observable.merge(Observable.just("a", "b", "c"), Observable.just("d", "e", "f"),
                               Observable.just("g", "h", "i"))
                .subscribe(s -> Log.e("Tag", "merge: " + s));

輸出日志:
07-19 15:55:51.304 29657-29657/com.example.audiotalk E/Tag: merge: a
07-19 15:55:51.304 29657-29657/com.example.audiotalk E/Tag: merge: b
07-19 15:55:51.304 29657-29657/com.example.audiotalk E/Tag: merge: c
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: d
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: e
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: f
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: g
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: h
07-19 15:55:51.305 29657-29657/com.example.audiotalk E/Tag: merge: i

3.concat:同是合并數(shù)據(jù),但是嚴(yán)格按照順序發(fā)射,一個(gè)被觀察者數(shù)據(jù)發(fā)送完前不會(huì)發(fā)送后一個(gè)被觀察者的數(shù)據(jù)。所以說(shuō)concat肯定是不會(huì)發(fā)生數(shù)據(jù)交錯(cuò)發(fā)射的情況。
concat.png
Observable.concat(Observable.just("a", "b", "c"), Observable.just("d", "e", "f"),
                               Observable.just("g", "h","i"))
                  .subscribe(s -> Log.e("concat", "initTest: " + s));

輸出日志:
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: a
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: b
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: c
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: d
07-19 15:59:42.885 30429-30429/com.example.audiotalk E/concat: initTest: e
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: f
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: g
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: h
07-19 15:59:42.886 30429-30429/com.example.audiotalk E/concat: initTest: i

4.zip:合并多個(gè)被觀察者發(fā)出的數(shù)據(jù)項(xiàng),但是如果一個(gè)被觀察者的數(shù)據(jù)更多,多出來(lái)的那部分不會(huì)被發(fā)送。(zipWith和zip類似)

此操作符適用情況:比如說(shuō)我們?cè)谡?qǐng)求第三個(gè)接口的時(shí)候需要用到前兩個(gè)接口請(qǐng)求成功之后返回的數(shù)據(jù),這時(shí)候我們可能會(huì)想到使用嵌套請(qǐng)求,在第一個(gè)接口請(qǐng)求成功之后,再請(qǐng)求第二個(gè)接口,第二個(gè)接口請(qǐng)求成功之后,再請(qǐng)求第三個(gè)接口。但是這樣嵌套起來(lái)代碼可讀性非常差。使用zip操作符就清晰明朗很多。
zip

zip.png
Observable.zip(Observable.just(1), Observable.just(2), new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer+integer2;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                // TODO: 2018/7/19 0019  do next request
                
            }
        });

zipWith

Observable.just(1).zipWith( Observable.just(2), new Func2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) {
                return integer+integer2;
            }
        }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                // TODO: 2018/7/19 0020  do next request
                
            }
        });
5.combineLatest:他可以組合兩個(gè)Observable,進(jìn)行一定的操作之后,再次發(fā)射下去

它繼續(xù)發(fā)射的前提是:其中的一個(gè)Observable還有數(shù)據(jù)沒(méi)有發(fā)射,那么,他講兩個(gè)Observable目前最新發(fā)射的數(shù)據(jù)組合在一起,比如上面,第一個(gè)Observable最新的數(shù)據(jù)是8,然后第二個(gè)的依次在變,然后再把他們組合在一起。8&&10 8&&11 ...

combineLatest.png
Observable.combineLatest(Observable.range(7, 2),
                Observable.range(10, 4), new Func2<Integer, Integer, String>() {
                    @Override
                    public String call(Integer integer, Integer integer2) {
                        return integer + "&&" + integer2;
                    }
                }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.e("combineLatest", s);
            }
        });

輸出日志:
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&10
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&11
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&12
07-19 17:08:56.778 6545-6545/com.example.audiotalk E/combineLatest: 8&&13

6.retryWhen:請(qǐng)求失敗重試

下面是我自定義的一個(gè)請(qǐng)求失敗重試的類,供參考(可根據(jù)請(qǐng)求失敗返回的異常信息,自定義是否需要重試請(qǐng)求)

retryWhen.png

使用方法:

Observable.just(2)//這里換成自己的接口請(qǐng)求
                .retryWhen(new RetryWhenProcess(5, 3))//請(qǐng)求失敗之后延時(shí)5,秒重試,總共重試次數(shù)3次
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {

                    }
                }, new Action1<Throwable>() {
                    @Override
                    public void call(Throwable throwable) {

                    }
                });

封裝的請(qǐng)求失敗重試類

public static class RetryWhenProcess implements Func1<Observable<? extends Throwable>, Observable<?>> {

        private long mInterval;//請(qǐng)求失敗延時(shí)幾秒之后重試
        private int times;//重試次數(shù)
        private int count;

        public RetryWhenProcess(long interval, int times) {
            mInterval = interval;
            this.times = times;
        }

        @Override
        public Observable<?> call(Observable<? extends Throwable> observable) {
            return observable.flatMap((Func1<Throwable, Observable<?>>) throwable -> {

                if (throwable instanceof UnknownHostException/*||throwable instanceof SocketTimeoutException*/) {
                    return Observable.error(throwable);
                }
                if (this.times <= count) {
                    return Observable.error(throwable);
                }
                count++;
                return Observable.just(Observable.timer((long) Math.pow(mInterval, 1), TimeUnit.SECONDS));
            });
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容