Rxjava操作符講解(4)Combining 結(jié)合操作

之前沒有了解過Rxjava的童鞋,建議先閱讀《Rxjava 從入門到開發(fā)》這篇文章,對入門比較有幫助。

StartWith操作符

作用:在數(shù)據(jù)序列的開頭增加一項(xiàng)數(shù)據(jù)

Observable.just(1,2).startWith(Observable.just(-1)).observeOn(Schedulers.io()).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer aLong) {
            Logger.i(String.valueOf(aLong));
        }
    });

輸出結(jié)果:

例子說明:在Startwith的作用下-1輸出在結(jié)果的最前面。

CombineLatest操作符

作用:當(dāng)兩個Observables中的任何一個發(fā)射了數(shù)據(jù)時,使用一個函數(shù)結(jié)合每個Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個函數(shù)的結(jié)果發(fā)射數(shù)據(jù)。

     //產(chǎn)生0,5,10數(shù)列
    Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong * 5;
                }
            }).take(3);

    //產(chǎn)生0,10,20數(shù)列
    Observable<Long> observable2 = Observable.interval(500, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong * 10;
                }
            }).take(3);


    subscription=Observable.combineLatest(observable1, observable2, new Func2<Long, Long, Long>() {
        @Override
        public Long call(Long aLong, Long aLong2) {
            System.out.println("aLong: " + aLong+"  aLong2: "+aLong2);

            return aLong+aLong2;
        }
    }).subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Error: " + e.getMessage());
        }

        @Override
        public void onNext(Long aLong) {
            System.out.println("Next: " + aLong);
        }
    });

輸出結(jié)果:

例子說明:observable1輸出的是0,5,10;observable2輸出的是0,10,20,輸出結(jié)果的關(guān)系如下圖:


數(shù)字的間隔就是時間差。

Join操作符

作用:join操作符把類似于combineLatest操作符,也是兩個Observable產(chǎn)生的結(jié)果進(jìn)行合并,合并的結(jié)果組成一個新的Observable,但是join操作符可以控制每個Observable產(chǎn)生結(jié)果的生命周期,在每個結(jié)果的生命周期內(nèi),可以與另一個Observable產(chǎn)生的結(jié)果按照一定的規(guī)則進(jìn)行合并。

join方法有四個參數(shù),具體定義如下:
observableA.join(observableB, observableA產(chǎn)生結(jié)果生命周期控制函數(shù), observableB產(chǎn)生結(jié)果生命周期控制函數(shù), observableA產(chǎn)生的結(jié)果與observableB產(chǎn)生的結(jié)果的合并規(guī)則)

    //產(chǎn)生0,5數(shù)列
    Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong * 5;
                }
            }).take(3);

    //產(chǎn)生0,10,20數(shù)列
    Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong * 10;
                }
            }).take(3);

    subscription=observable1.join(observable2, new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long aLong) {
            return Observable.just(aLong).delay(500, TimeUnit.MILLISECONDS);
        }
    }, new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long aLong) {
            return Observable.just(aLong).delay(1000, TimeUnit.MILLISECONDS);
        }
    }, new Func2<Long, Long, Long>() {
        @Override
        public Long call(Long aLong, Long aLong2) {
            System.out.println("aLong: " + aLong+"  aLong2: "+aLong2);
            return aLong + aLong2;
        }
    }).subscribe(new Subscriber<Long>() {
        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }

        @Override
        public void onError(Throwable e) {
            System.err.println("Error: " + e.getMessage());
        }

        @Override
        public void onNext(Long aLong) {
            System.out.println("Next: " + aLong);
        }
    });

輸出結(jié)果:

例子說明:其實(shí)輸出排列方式和combineLatest差不多,主要差別在通過join可以進(jìn)一步控制observable結(jié)果的輸出時間周期,另外順便一提的是如果observable1輸出的數(shù)據(jù)只有兩個,那么即使observable2輸出3個數(shù)據(jù),但最終輸出的也僅有兩個。

Merge操作符

作用:將多個Observable合并為一個。

    //產(chǎn)生0,5數(shù)列
    Observable<Long> observable1 = Observable.interval(1000, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong * 5;
                }
            }).take(2);

    //產(chǎn)生0,10,20數(shù)列
    Observable<Long> observable2 = Observable.interval(1000, TimeUnit.MILLISECONDS)
            .map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong * 10;
                }
            }).take(3);

    subscription=Observable.merge(observable1,observable2).subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("out: " + aLong);
        }
    });

輸出結(jié)果:

例子說明:把observable1,observable2的數(shù)據(jù)合并輸出。

switchOnNext操作符

作用:把一組Observable轉(zhuǎn)換成一個Observable,轉(zhuǎn)換規(guī)則為:對于這組Observable中的每一個Observable所產(chǎn)生的結(jié)果,如果在同一個時間內(nèi)存在兩個或多個Observable提交的結(jié)果,只取最后一個Observable提交的結(jié)果給訂閱者。

Observable<Observable<Long>> observable = Observable.interval(500, TimeUnit.MILLISECONDS).map(new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long aLong) {
            //每隔200毫秒產(chǎn)生一組數(shù)據(jù)(0,10,20,30,40)
            return Observable.interval(200, TimeUnit.MILLISECONDS).map(new Func1<Long, Long>() {
                @Override
                public Long call(Long aLong) {
                    return aLong * 10;
                }
            }).take(3);
        }
    }).take(2);

    subscription=Observable.switchOnNext(observable).subscribe(new Action1<Long>() {
        @Override
        public void call(Long aLong) {
            System.out.println("Next:" + aLong);
        }
    });

輸出結(jié)果:

Paste_Image.png

例子說明:沒人Switch的作用下,輸出結(jié)果為“0,10,20,0,10,20”,但是在Switch的作用下,結(jié)果為“0,10,0,10,20”,少了“20”,這就是switch的作用。

zip操作符

作用:把兩個observable提交的結(jié)果,嚴(yán)格按照順序進(jìn)行合并。

   Observable<Long> observable1=  Observable.interval(1, TimeUnit.SECONDS).take(3);
    Observable<Long> observable2=  Observable.interval(1, TimeUnit.SECONDS).take(4);
    subscription=Observable.zip(observable1, observable2, new Func2<Long, Long, Integer>() {

        @Override
        public Integer call(Long aLong, Long aLong2) {
            System.out.println("aLong:" + aLong+"   aLong2:  "+aLong2);

            return (int) (aLong+aLong);
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer integer) {
            System.out.println("result:" + integer);
        }
    });

輸出結(jié)果:

例子說明:最終輸出為“0,2,4”,兩個Observable輸出的數(shù)據(jù)一一按順序?qū)?yīng),無法對應(yīng)的數(shù)據(jù)就拋棄,例子中的observable2本應(yīng)該輸出4個數(shù)據(jù)take(4)但是由于操作符zip的作用下,最終只輸出3個數(shù)據(jù)。

以上便是結(jié)合操作符的主要內(nèi)容了。rxjava的主要操作符講了差不多,有啥問題歡迎大家留言交流下??,歡迎關(guān)注。

附錄:
文章demo

參考:
ReactiveX/RxJava文檔中文版
Android RxJava使用介紹(四) RxJava的操作符

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容