RxJava (源碼待續(xù))學(xué)習(xí)之組合操作符

上一篇:RxJava 源碼學(xué)習(xí)之過濾操作符

今天我們繼續(xù)學(xué)習(xí)RxJava的組合操作符。

StartWith

  • 作用分析

StartWith操作符可以在發(fā)射一個Observable的數(shù)據(jù)之前先發(fā)射一個指定的數(shù)據(jù)序列。
操作符

Paste_Image.png

  • 示例代碼

可接受一個Iterable或者多個Observable作為函數(shù)的參數(shù)。
Javadoc: startWith(Iterable)
Javadoc: startWith(T) (最多接受九個參數(shù))

//測試代碼
Integer[] nums = {1,2,3,4};
Observable.from(nums)
        .startWith(9,8)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onNext(Integer item) {System.out.println("Next: " + item);            }
            @Override
            public void onError(Throwable error) {  System.err.println("Error: " + error.getMessage()); }
            @Override
            public void onCompleted() { System.out.println("Sequence complete."); }
});
//###########################################
輸出結(jié)果:
Next: 9
Next: 8
Next: 1
Next: 2
Next: 3
Next: 4
Sequence complete.

Merge

  • 作用分析

merge可以將多個Observables的輸出合并,就好像它們是一個單個的Observable一樣。merge可能會讓合并的Observables發(fā)射的數(shù)據(jù)交錯(有一個類似的操作符 Concat不會讓數(shù)據(jù)交錯,它會按順序一個接著一個發(fā)射多個Observables的發(fā)射物)。
注意:如果傳遞給merge的任何一個的Observable發(fā)射了onError通知終止了,merge操作符生成的Observable也會立即以onError通知終止。如果你想讓它繼續(xù)發(fā)射數(shù)據(jù),在最后才報告錯誤,可以使用mergeDelayError。

  • 示例代碼

Observable<Integer> odds = Observable.just(1, 3, 5).subscribeOn(someScheduler); // subscribeOn 切換線程
Observable<Integer> evens = Observable.just(2, 4, 6);
Observable.merge(odds, evens)
          .subscribe(new Subscriber<Integer>() {
        @Override
        public void onNext(Integer item) {
            System.out.println("Next: " + item);
        }

        @Override
        public void onError(Throwable error) {
            System.err.println("Error: " + error.getMessage());
        }

        @Override
        public void onCompleted() {
            System.out.println("Sequence complete.");
        }
    });
//###########################################
輸出結(jié)果:
Next: 1
Next: 3
Next: 5
Next: 2
Next: 4
Next: 6
Sequence complete.

ZIP

  • 作用分析

Zip操作符返回一個Obversable,它使用特定函數(shù)按順序結(jié)合兩個或多個Observables發(fā)射的數(shù)據(jù)項,然后它發(fā)射這個函數(shù)返回的結(jié)果。它按照嚴格的順序應(yīng)用這個函數(shù)。它只發(fā)射與發(fā)射數(shù)據(jù)項最少的那個Observable一樣多的數(shù)據(jù)。
下圖:把 shape(形狀)、size(尺寸) 和 color(顏色) 合并后,發(fā)射出來。

Paste_Image.png

  • 示例代碼

Observable.zip(
        Observable.just("a1","a2","a3","a4","a5"),
        Observable.just(1,2,3,4),
        Observable.just("b1","b2","b3","b4","b5","b6"),
        new Func3<String,Integer,String,String>(){
            @Override
            public String call(String s, Integer integer, String s2) {
                return s+"_"+integer+"_"+s2;
            }
        }).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {        System.out.println("onCompleted.");    }
    @Override
    public void onError(Throwable e) {        System.out.println("onError: " + e.getMessage());    }
    @Override
    public void onNext(String s) {        System.out.println("onNext: " + s);    }});
//############################################
輸出結(jié)果:
onNext: a1_1_b1
onNext: a2_2_b2
onNext: a3_3_b3
onNext: a4_4_b4
onCompleted.

Join

  • 作用分析

任何時候,只要在另一個Observable發(fā)射的數(shù)據(jù)定義的時間窗口內(nèi),這個Observable發(fā)射了一條數(shù)據(jù),就結(jié)合兩個Observable發(fā)射的數(shù)據(jù)。
比如: ObservableA 在 5s內(nèi)發(fā)射一條數(shù)據(jù) dataA1, ObservableB 這時剛好也在發(fā)射數(shù)據(jù)dataB1,就把ObservableA 的數(shù)據(jù)dataA1和 ObservableB的數(shù)據(jù)dataB1合并一起發(fā)射;5s還沒結(jié)束,ObservableB又發(fā)射數(shù)據(jù)dataB2,就把ObservableA 的數(shù)據(jù)dataA1和 ObservableB的數(shù)據(jù)dataB2合并一起發(fā)射。

Paste_Image.png
  • 示例代碼

Javadoc: Join(Observable,Func1,Func1,Func2)

  • 第二個Observable和源Observable結(jié)合。
  • Func1參數(shù):在指定的由時間窗口定義時間間隔內(nèi),源Observable發(fā)射的數(shù)據(jù)和從第二個Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
  • Func1參數(shù):在指定的由時間窗口定義時間間隔內(nèi),第二個Observable發(fā)射的數(shù)據(jù)和從源Observable發(fā)射的數(shù)據(jù)相互配合返回的Observable。
  • Func2參數(shù):定義已發(fā)射的數(shù)據(jù)如何與新發(fā)射的數(shù)據(jù)項相結(jié)合。
Observable<Integer> create1 = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        for (int i = 0; i < 6; i++) {
            subscriber.onNext(i);
            try {
                Thread.sleep(600);
            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        }
    }}).subscribeOn(Schedulers.newThread());
Observable<String> create2 = Observable.create(new Observable.OnSubscribe<String>() {
    @Override
    public void call(Subscriber<? super String> subscriber) {
        for (int i = 0; i < 4; i++) {
            subscriber.onNext("hello_"+ i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                subscriber.onError(e);
            }
        }
    }}).subscribeOn(Schedulers.newThread());
create1.join(create2,
        new Func1<Integer, Observable<Long>>() {
            @Override
            public Observable<Long> call(Integer integer) {
                return Observable.timer(1000, TimeUnit.MILLISECONDS);
            }
        },
        new Func1<String, Observable<Long>>() {
            @Override
            public Observable<Long> call(String s) {
                return Observable.timer(1000, TimeUnit.MILLISECONDS);
            }
        },
        new Func2<Integer, String, String>() {
            @Override
            public String call(Integer integer1, String s) {
                return integer1 + "-" + s;
            }
        }).subscribe(new Subscriber<String>() {
    @Override
    public void onCompleted() {        System.out.println("onCompleted.");    }
    @Override
    public void onError(Throwable e) {        System.out.println("onError: " + e.getMessage());    }
    @Override
    public void onNext(String s) {        System.out.println("onNext: " + s);    }});
//##############################################
輸出結(jié)果:
onNext: 0-hello_0
onNext: 1-hello_0
onNext: 1-hello_1
onNext: 2-hello_1
onNext: 3-hello_1
onNext: 3-hello_2
onNext: 2-hello_2
onNext: 4-hello_2
onNext: 4-hello_3
onNext: 5-hello_3

Switch

有這樣一個復(fù)雜的場景就是在一個subscribe-unsubscribe的序列里我們能夠從一個Observable自動取消訂閱來訂閱一個新的Observable。

RxJava的switch(),正如定義的,將一個發(fā)射多個Observables的Observable轉(zhuǎn)換成另一個單獨的Observable,后者發(fā)射那些Observables最近發(fā)射的數(shù)據(jù)項。

給出一個發(fā)射多個Observables序列的源Observable,switch()訂閱到源Observable然后開始發(fā)射由第一個發(fā)射的Observable發(fā)射的一樣的數(shù)據(jù)。當源Observable發(fā)射一個新的Observable時,switch()立即取消訂閱前一個發(fā)射數(shù)據(jù)的Observable(因此打斷了從它那里發(fā)射的數(shù)據(jù)流)然后訂閱一個新的Observable,并開始發(fā)射它的數(shù)據(jù)。

Paste_Image.png

結(jié)束語

ok,RxJava之組合操作符已經(jīng)學(xué)習(xí)完啦,當然這里都是分析一些常用的,想了解更多的操作符就去看RxJava官方文檔吧。

下一篇:RxJava 源碼學(xué)習(xí)之調(diào)度器Scheduler。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 創(chuàng)建unfaseCreate(create)創(chuàng)建一個Observable(被觀察者),當被觀察者(Observer...
    chuwe1閱讀 7,150評論 3 8
  • 注:只包含標準包中的操作符,用于個人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,377評論 2 8
  • 版權(quán)聲明:本文為小斑馬偉原創(chuàng)文章,轉(zhuǎn)載請注明出處! 上篇簡單的闡述了響應(yīng)式編程的基本理論。這篇主要對響應(yīng)編程進行詳...
    ZebraWei閱讀 3,270評論 0 2
  • 作者: maplejaw本篇只解析標準包中的操作符。對于擴展包,由于使用率較低,如有需求,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,219評論 8 93
  • 下方展示了幾種創(chuàng)建Observable的方法 just() ---將一個或者多個對象轉(zhuǎn)換成發(fā)射這個或這些對象的一個...
    菜鳥_一枚閱讀 319評論 0 0

友情鏈接更多精彩內(nèi)容