RxJava 2.x下篇(操作符)

前言

通過上篇的實(shí)例,相信你對(duì)RxJava的用處肯定有個(gè)基本的了解了。如果還沒看過,我強(qiáng)烈建議你先去看上篇,先對(duì)整體有個(gè)了解,有興趣之后,帶著目的去學(xué)枯燥乏味而且多的操作符。

  • RxJava操作符分類
    RxJava操作符分類.png

看到這一堆的操作符是不是感覺瞬間就不想往下學(xué)了,但是我這里只挑部分我認(rèn)為比較常用的來講下,畢竟這么多要全講的話不僅費(fèi)時(shí)費(fèi)力,而且效果也不太好,先學(xué)會(huì)常用的,其它的也是類似的,到時(shí)用的再去查詢下上手也會(huì)很快的。

  • RxJava三部曲

第一步:初始化 Observable
第二步:初始化 Observer
第三步:通過subscribe建立訂閱關(guān)系

創(chuàng)建操作符

  • 創(chuàng)建被觀察者( Observable) 對(duì)象 & 發(fā)送事件。

1、create()操作符

  • create 操作符應(yīng)該是最常見的操作符了,主要用于產(chǎn)生一個(gè) Obserable 被觀察者對(duì)象。
  • 被觀察者 Observable 也可以稱為發(fā)射器(上游事件),觀察者 Observer 也可以稱為接收器(下游事件)。
    create操作符.png
  • 頁面部分代碼和其它一些基礎(chǔ)部分就不貼了,僅貼出關(guān)鍵代碼和運(yùn)行效果日志。這里做個(gè)說明頁面其實(shí)很簡單就一個(gè)按鈕,按鈕點(diǎn)擊之后就去執(zhí)行貼出的關(guān)鍵部分代碼。
  • 按鈕點(diǎn)擊事件里面的關(guān)鍵部分代碼:
        final String TAG = "RxCreateActivity";
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.e(TAG, "Observable emit 1" + "\n");
                e.onNext(1);
                Log.e(TAG, "Observable emit 2" + "\n");
                e.onNext(2);
                Log.e(TAG, "Observable emit 3" + "\n");
                e.onNext(3);
                e.onComplete();
                Log.e(TAG, "Observable emit 4" + "\n");
                e.onNext(4);
            }
        }).subscribe(new Observer<Integer>() {
            private int i;
            private Disposable mDisposable;

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.e(TAG, "onSubscribe : " + d.isDisposed() + "\n");
                mDisposable = d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.e(TAG, "onNext : value : " + integer + "\n");
                i++;
                if (i == 2) {
                    // 在RxJava 2.x 中,新增的Disposable可以做到切斷的操作,讓Observer觀察者不再接收上游事件
                    mDisposable.dispose();
                    Log.e(TAG, "onNext : isDisposable : " + mDisposable.isDisposed() + "\n");
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.e(TAG, "onError : value : " + e.getMessage() + "\n");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete" + "\n");
            }
        });
  • 運(yùn)行效果日志:
onSubscribe : false
Observable emit 1
onNext : value : 1
Observable emit 2
onNext : value : 2
onNext : isDisposable : true
Observable emit 3
Observable emit 4
  • 需要注意的幾點(diǎn)是:
  • 并且 2.x 中有一個(gè) Disposable 概念,這個(gè)東西可以直接調(diào)用切斷接收器,可以看到,當(dāng)它的 isDisposed() 返回為 false 的時(shí)候,接收器能正常接收事件,但當(dāng)其為 true 的時(shí)候,接收器停止了接收。所以可以通過此參數(shù)動(dòng)態(tài)控制接收事件了。
  • 在發(fā)射事件中,我們?cè)诎l(fā)射了數(shù)值 3 之后,直接調(diào)用了 e.onComplete(),由于在接收器接收了數(shù)值 2 之后接收器調(diào)用dispose()方法停止接收了,所以本該在接收器中執(zhí)行的onComplete()方法就不會(huì)執(zhí)行了。
  • 調(diào)用e.onComplete()方法之后接收器就不會(huì)再接收事件了,但發(fā)送事件還是繼續(xù)的,通過日志能打印出來發(fā)射數(shù)值3和4我們就可以分析出這一點(diǎn)。如果調(diào)用e.onError(Throwable error)效果也是和onComplete()方法類似的,e.onError(Throwable error)對(duì)應(yīng)執(zhí)行的是接收器中的onError(@NonNull Throwable e)方法。
  • 另外一個(gè)值得注意的點(diǎn)是,在 RxJava 2.x 中,可以看到發(fā)射事件方法相比 1.x 多了一個(gè) throws Excetion拋出異常,意味著我們做一些特定操作再也不用 try-catch 了。

2、just()操作符

  • 簡單的發(fā)射器,快速的創(chuàng)建被觀察者對(duì)象,依次調(diào)用onNext() 方法直接發(fā)送傳入的事件對(duì)象。

注:最多只能發(fā)送10個(gè)參數(shù)

        // 1. 創(chuàng)建時(shí)傳入字符串型1、2、3,
        // 當(dāng)然也可以為其它類型,被觀察者發(fā)射事件對(duì)象聲明為泛型,但是一般同一個(gè)被觀察者事件類型都是一致的
        // 在創(chuàng)建后就會(huì)發(fā)送這些對(duì)象,相當(dāng)于執(zhí)行了onNext("1")、onNext("2")、onNext("3")、onComplete()
        Observable.just("1", "2", "3")
                // 2. 通過通過訂閱(subscribe)連接觀察者和被觀察者
                .subscribe(
                        // 3. 創(chuàng)建觀察者 & 定義響應(yīng)事件的行為
                        // Consumer是RxJava 2.x提供的用于實(shí)現(xiàn)觀察者的簡便式模式
                        new Consumer<String>() {
                            @Override
                            // 每次接收到Observable的onNext()方法發(fā)射的事件都會(huì)調(diào)用Consumer.accept()
                            public void accept(@NonNull String s) throws Exception {
                                Log.e(TAG, "accept : onNext : " + s + "\n");
                            }
                        });
accept : onNext : 1
accept : onNext : 2
accept : onNext : 3

3、fromArray()操作符

  • 數(shù)組數(shù)據(jù)發(fā)射器,直接發(fā)送傳入的數(shù)組元素?cái)?shù)據(jù),會(huì)將數(shù)組中的數(shù)據(jù)轉(zhuǎn)換為Observable對(duì)象
        // 1. 設(shè)置需要傳入的數(shù)組
        Integer[] items = {0, 1, 2, 3, 4};
        // 2. 創(chuàng)建被觀察者對(duì)象(Observable)時(shí)傳入數(shù)組
        // 在創(chuàng)建后就會(huì)將該數(shù)組轉(zhuǎn)換成Observable & 發(fā)送該對(duì)象中的所有數(shù)據(jù)
        Observable.fromArray(items)
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "開始采用subscribe連接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
開始采用subscribe連接
接收到了事件0
接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件4
對(duì)Complete事件作出響應(yīng)

4、fromIterable()操作符

  • 集合數(shù)據(jù)發(fā)射器,直接發(fā)送傳入的集合List里數(shù)據(jù),會(huì)將集合中的數(shù)據(jù)轉(zhuǎn)換為Observable對(duì)象,與數(shù)組類似,這里就不貼代碼了。

5、timer()操作符

  • 相當(dāng)于一個(gè)定時(shí)任務(wù),延遲指定時(shí)間后,發(fā)送1個(gè)數(shù)值0

注意:默認(rèn)在新線程,也就是子線程

timer操作符.png
        // 注:timer操作符默認(rèn)運(yùn)行在一個(gè)新線程上
        // 也可自定義線程調(diào)度器(第3個(gè)參數(shù)):timer(long,TimeUnit,Scheduler)
        // 該例子 = 延遲2s后,發(fā)送一個(gè)long類型數(shù)值0
        Observable.timer(2, TimeUnit.SECONDS)
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.e(TAG, "開始采用subscribe連接");
                    }
                    @Override
                    public void onNext(Long value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "對(duì)Error事件作出響應(yīng)");
                    }
                    @Override
                    public void onComplete() {
                        Log.e(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
11-12 14:24:19.586 E: 開始采用subscribe連接
11-12 14:24:21.589 E: 接收到了事件0
11-12 14:24:21.590 E: 對(duì)Complete事件作出響應(yīng)

6、interval()操作符

  • 每隔指定時(shí)間就發(fā)送事件,其接受三個(gè)參數(shù),分別是第一次發(fā)送延遲,間隔時(shí)間,時(shí)間單位。

注意:
默認(rèn)在新線程,也就是子線程
發(fā)送的事件序列 = 從0開始、無限遞增1的的整數(shù)序列

interval操作符.png
private Disposable mDisposable;

        // 注:interval默認(rèn)在computation調(diào)度器上執(zhí)行
        // 也可自定義指定線程調(diào)度器(第3個(gè)參數(shù)):interval(long,long,TimeUnit,Scheduler)
        /*參數(shù)說明:
          參數(shù)1 = 第1次延遲時(shí)間;
          參數(shù)2 = 間隔時(shí)間數(shù)字;
          參數(shù)3 = 時(shí)間單位;*/
        Observable.interval(3, 1, TimeUnit.SECONDS)
                // 該例子發(fā)送的事件序列特點(diǎn):延遲3s后發(fā)送事件,每隔1秒產(chǎn)生1個(gè)數(shù)字(從0開始遞增1,無限個(gè))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        mDisposable = d;
                        // 默認(rèn)最先調(diào)用復(fù)寫的 onSubscribe()
                        Log.e(TAG, "開始采用subscribe連接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
        //使用簡便的Consumer觀察者,返回值Disposable用于停止任務(wù)
        mDisposable = Observable.interval(3, 1, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {

            }
        });
11-12 14:33:22.057 E: 開始采用subscribe連接
11-12 14:33:25.060 E: 接收到了事件0
11-12 14:33:26.060 E: 接收到了事件1
11-12 14:33:27.060 E: 接收到了事件2
11-12 14:33:28.060 E: 接收到了事件3
11-12 14:33:29.060 E: 接收到了事件4
...
  • 由于我們這個(gè)是間隔執(zhí)行,所以當(dāng)我們的Activity 都銷毀的時(shí)候,實(shí)際上這個(gè)操作還依然在進(jìn)行,所以,我們得花點(diǎn)小心思讓我們?cè)诓恍枰臅r(shí)候干掉它。然而,心細(xì)的小伙伴可能會(huì)發(fā)現(xiàn)我們上面代碼里面有一個(gè)mDisposable對(duì)象,其實(shí)這個(gè)對(duì)象就是我留來停止任務(wù)的。
  @Override
    protected void onDestroy() {
        super.onDestroy();
        if (mDisposable != null && !mDisposable.isDisposed()) {
            mDisposable.dispose();
        }
    }

7、defer()操作符

  • 直到有觀察者(Observer )訂閱時(shí),才動(dòng)態(tài)創(chuàng)建被觀察者對(duì)象(Observable) & 發(fā)送事件,這可以確保Observable對(duì)象里的數(shù)據(jù)是最新的。
//1. 第1次對(duì)i賦值
private Integer i = 10;

        // 2. 通過defer 定義被觀察者對(duì)象
        // 注:此時(shí)被觀察者對(duì)象還沒創(chuàng)建
        Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<? extends Integer>>() {
            @Override
            public ObservableSource<? extends Integer> call() throws Exception {
                return Observable.just(i);
            }
        });
        //2. 第2次對(duì)i賦值
        i = 15;
        //3. 觀察者開始訂閱
        // 注:此時(shí),才會(huì)調(diào)用defer()創(chuàng)建被觀察者對(duì)象(Observable)
        observable.subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到的整數(shù)是" + integer);
            }
        });
  • 因?yàn)槭窃谟嗛啎r(shí)才創(chuàng)建,所以i值會(huì)取第2次的賦值
11-12 15:06:38.161 E: 接收到的整數(shù)是15

變換操作符

  • 對(duì)事件序列中的事件 / 整個(gè)事件序列進(jìn)行加工處理(即變換),使得其轉(zhuǎn)變成不同的事件 / 整個(gè)事件序列。

1、map()操作符

  • 對(duì)被觀察者發(fā)送的每1個(gè)事件都通過指定的函數(shù)處理,從而變換成另外一種事件;即, 將被觀察者發(fā)送的事件轉(zhuǎn)換為任意的類型事件。


    map操作符.png
Observable.just(1, 2, 3).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "accept : " + s + "\n");
            }
        });
  • map 基本作用就是將一個(gè) Observable 通過某種函數(shù)關(guān)系,轉(zhuǎn)換為另一種 Observable,上面例子中就是把我們的 Integer 數(shù)據(jù)變成了 String 類型。從下面Log日志顯而易見。
E: accept : This is result 1
E: accept : This is result 2
E: accept : This is result 3

2、flatMap()操作符

  • 將被觀察者發(fā)送的事件序列中的每個(gè)事件進(jìn)行拆分,拆分成多個(gè)事件序列;之后對(duì)拆分出來的事件進(jìn)行單獨(dú)轉(zhuǎn)換,轉(zhuǎn)換完成后再將事件無序的合并成一個(gè)新的事件序列,最后再進(jìn)行發(fā)送。
        // 采用RxJava基于事件流的鏈?zhǔn)讲僮?,也就是:?duì)象.XXX()方法
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            }
        })
                // 采用flatMap()變換操作符
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        final List<String> list = new ArrayList<>();
                        for (int i = 0; i < 3; i++) {
                            list.add("我是事件 " + integer + "拆分后的子事件" + i);
                            // 通過flatMap中將被觀察者生產(chǎn)的事件序列先進(jìn)行拆分,再將每個(gè)事件轉(zhuǎn)換為一個(gè)新的發(fā)送三個(gè)String事件的事件序列
                            // 最終合并,再發(fā)送給觀察者
                        }
                        //合并之后發(fā)射事件加個(gè)延時(shí),不然執(zhí)行速度太快可能很難出現(xiàn)無序的現(xiàn)象
                        int delayTime = (int) (1 + Math.random() * 10);
                        return Observable.fromIterable(list).delay(delayTime, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, s);
                    }
                });
  • 新合并生成的事件序列順序是無序的,即與舊序列發(fā)送事件的順序無關(guān)。很可能你的得到的運(yùn)行得到的結(jié)果事件還是3個(gè)為一組的出現(xiàn)(也就是下面第一行日志和第四行替換的結(jié)果),但是事件不按組出現(xiàn)情況也是存在的,就如下面的日志就是我執(zhí)行出來不按組出現(xiàn)的運(yùn)行結(jié)果,只是相對(duì)來講會(huì)難復(fù)現(xiàn)一點(diǎn)。所以你只要記住新合并生成的事件序列是無序就行了。
我是事件 1拆分后的子事件0
我是事件 3拆分后的子事件0
我是事件 3拆分后的子事件1
我是事件 3拆分后的子事件2
我是事件 1拆分后的子事件1
我是事件 1拆分后的子事件2
我是事件 2拆分后的子事件0
我是事件 2拆分后的子事件1
我是事件 2拆分后的子事件2

3、concatMap()操作符

  • concatMap 與 FlatMap 的唯一區(qū)別就是 concatMap 保證了順序,所以,我們就直接把 flatMap 替換為
    concatMap 驗(yàn)證吧。代碼就不貼了,直接將上面 flatMap 的關(guān)鍵代碼中的flatMap替換為concatMap就好了。下面我們貼下運(yùn)行結(jié)果:
我是事件 1拆分后的子事件0
我是事件 1拆分后的子事件1
我是事件 1拆分后的子事件2
我是事件 2拆分后的子事件0
我是事件 2拆分后的子事件1
我是事件 2拆分后的子事件2
我是事件 3拆分后的子事件0
我是事件 3拆分后的子事件1
我是事件 3拆分后的子事件2

新合并生成的事件序列順序是有序的,即嚴(yán)格按照舊序列發(fā)送事件的順序,可能有時(shí)候這樣打印出來日志結(jié)果還是無序的,這個(gè)原因是因?yàn)槲覀兗恿搜訒r(shí)導(dǎo)致日志打印順序變了導(dǎo)致的,忽略就好了。這里就是記住是有序的就好了。

4、buffer()操作符

  • 定期從被觀察者(Obervable)需要發(fā)送的事件中獲取一定數(shù)量的事件 & 放到緩存區(qū)中,最終發(fā)送。
        Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                /*參數(shù)解釋:
                  count:緩存區(qū)大小 = 每次從被觀察者中獲取的事件數(shù)量
                  skip:步長 = 每次獲取新事件時(shí)跳過事件的數(shù)量*/
                .buffer(3, 4) //在事件足夠的時(shí)候每次取count個(gè)值,每次跳過skip個(gè)事件
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(@NonNull List<Integer> integers) throws Exception {
                        Log.e(TAG, "buffer size : " + integers.size() + "\n");
                        Log.e(TAG, "buffer value : ");
                        for (Integer i : integers) {
                            Log.e(TAG, i + "");
                        }
                        Log.e(TAG, "\n");
                    }
                });
  • 看了下面日志結(jié)果,相信聰明的你肯定也知道buffer操作符的用法了。count參數(shù)還是比較好理解的,skip可能有些人會(huì)一時(shí)反應(yīng)不過來,這里簡單說下,skip其實(shí)就是每組之間發(fā)射的間隔,這里再結(jié)合上面實(shí)例說明下就很清晰了。就拿發(fā)射第二次事件的第一個(gè)value值怎么得來來說好了,這個(gè)值就是第一次事件的第一個(gè)value1+步長4得來的,1+4=5就是這么簡單。
buffer size : 3
buffer value : 
1
2
3
buffer size : 3
buffer value : 
5
6
7
buffer size : 2
buffer value : 
9
10

組合/合并操作符

  • 組合多個(gè)被觀察者(Observable) & 合并需要發(fā)送的事件

組合被觀察者數(shù)量≤4個(gè),如果要組合大于4個(gè)的觀察者用concatArray()操作符,用法是一樣的這里對(duì)concatArray操作符就略過了

1、concat()操作符

  • 把兩個(gè)發(fā)射器連接成一個(gè)發(fā)射器,發(fā)射器合并后按發(fā)送順序串行執(zhí)行。


    concat操作符.png
Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Log.e(TAG, "concat : "+ integer + "\n" );
                    }
                });
concat : 1
concat : 2
concat : 3
concat : 4
concat : 5
concat : 6
  • 從日志可以看出,發(fā)射器 B 把自己的三個(gè)孩子送給了發(fā)射器 A,讓他們組合成了一個(gè)新的發(fā)射器。

2、merge()操作符

  • 組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),合并后按時(shí)間線并行執(zhí)行。
  • 區(qū)別上述concat()操作符:同樣是組合多個(gè)被觀察者一起發(fā)送數(shù)據(jù),但concat()操作符合并后是按發(fā)送順序串行執(zhí)行,而merge操作符是按時(shí)間線并行執(zhí)行的。


    merge.png
        // merge():組合多個(gè)被觀察者(<4個(gè))一起發(fā)送數(shù)據(jù)
        // 注:合并后按照時(shí)間線并行執(zhí)行
        Observable.merge(
                /*intervalRange操作符參數(shù):
                 * long start:發(fā)射事件起始值
                 * long count:發(fā)射事件數(shù)量
                 * long initialDelay:初始發(fā)射延時(shí)
                 * long period:反射周期
                 * */
                // 從0開始發(fā)送、共發(fā)送3個(gè)數(shù)據(jù)、第1次事件延遲發(fā)送時(shí)間 = 1s、間隔時(shí)間 = 1s
                Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
                // 從2開始發(fā)送、共發(fā)送3個(gè)數(shù)據(jù)、第1次事件延遲發(fā)送時(shí)間 = 1s、間隔時(shí)間 = 1s
                Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });

日志輸出結(jié)果:兩個(gè)被觀察者發(fā)送事件并行執(zhí)行,輸出結(jié)果 = 0,2 -> 1,3 -> 2,4


merge操作符.gif

3、zip()操作符

  • 合并多個(gè)被觀察者(Observable)發(fā)送的事件,生成一個(gè)新的事件序列(即組合過后的事件序列),并最終發(fā)送。
  • 被觀察者之間兩兩配對(duì),也就意味著,最終配對(duì)出的 Observable 發(fā)射事件數(shù)目只和少的那個(gè)相同。
        //需要被合并的第一個(gè)被觀察者:發(fā)射字符串類型事件
        Observable<String> stringObservable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext("A");
                    Log.e(TAG, "String emit : A \n");
                    e.onNext("B");
                    Log.e(TAG, "String emit : B \n");
                    e.onNext("C");
                    Log.e(TAG, "String emit : C \n");
                }
            }
        });

        //需要被合并的第二個(gè)被觀察者:發(fā)射整型事件
        Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                if (!e.isDisposed()) {
                    e.onNext(1);
                    Log.e(TAG, "Integer emit : 1 \n");
                    e.onNext(2);
                    Log.e(TAG, "Integer emit : 2 \n");
                    e.onNext(3);
                    Log.e(TAG, "Integer emit : 3 \n");
                    e.onNext(4);
                    Log.e(TAG, "Integer emit : 4 \n");
                    e.onNext(5);
                    Log.e(TAG, "Integer emit : 5 \n");
                }
            }
        });
        
        Observable.zip(stringObservable, integerObservable, new BiFunction<String, Integer, String>() {
            @Override
            public String apply(@NonNull String s, @NonNull Integer integer) throws Exception {
                return s + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                Log.e(TAG, "zip : accept : " + s + "\n");
            }
        });
String emit : A 
String emit : B 
String emit : C 
zip : accept : A1
Integer emit : 1 
zip : accept : B2
Integer emit : 2 
zip : accept : C3
Integer emit : 3 
Integer emit : 4 
Integer emit : 5 
  • zip 組合事件的過程就是分別從發(fā)射器 A 和發(fā)射器 B 各取出一個(gè)事件來組合,并且一個(gè)事件只能被使用一次,組合的順序是嚴(yán)格按照事件發(fā)送的順序來進(jìn)行的,所以上面截圖中,可以看到,1 永遠(yuǎn)是和 A 結(jié)合的,2 永遠(yuǎn)是和 B 結(jié)合的。
  • 最終接收器收到的事件數(shù)量是和發(fā)送器發(fā)送事件最少的那個(gè)發(fā)送器的發(fā)送事件數(shù)目相同,上面就是stringObservable發(fā)射器。

4、reduce()操作符

  • 把被觀察者需要發(fā)送的事件聚合成1個(gè)事件 & 發(fā)送
  • 聚合的邏輯根據(jù)需求撰寫,但本質(zhì)都是前2個(gè)數(shù)據(jù)聚合,然后與后1個(gè)數(shù)據(jù)繼續(xù)進(jìn)行聚合,依次類推


    reduce.png
Observable.just(1, 2, 3, 4)
                .reduce(new BiFunction<Integer, Integer, Integer>() {
                    // 在該復(fù)寫方法中復(fù)寫聚合的邏輯
                    @Override
                    public Integer apply(@NonNull Integer s1, @NonNull Integer s2) throws Exception {
                        Log.e(TAG, "本次計(jì)算的數(shù)據(jù)是:" + s1 + " 乘 " + s2);
                        return s1 * s2;
                        // 本次聚合的邏輯是:全部數(shù)據(jù)相乘起來
                        // 原理:第1次取前2個(gè)數(shù)據(jù)相乘,之后每次獲取到的數(shù)據(jù) = 返回的數(shù)據(jù) * 原始下1個(gè)數(shù)據(jù)每
                    }
                }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer s) throws Exception {
                Log.e(TAG, "最終計(jì)算的結(jié)果是: " + s);
            }
        });
本次計(jì)算的數(shù)據(jù)是:1 乘 2
本次計(jì)算的數(shù)據(jù)是:2 乘 3
本次計(jì)算的數(shù)據(jù)是:6 乘 4
最終計(jì)算的結(jié)果是: 24

功能操作符

  • 輔助被觀察者(Observable)在發(fā)送事件時(shí)實(shí)現(xiàn)一些功能性需求如錯(cuò)誤處理、線程調(diào)度等等。

1、subscribe()操作符

  • 訂閱,即連接觀察者 & 被觀察者。Observable只是生產(chǎn)事件,真正的發(fā)送事件是在它被訂閱的時(shí)候,即當(dāng) subscribe() 方法執(zhí)行時(shí)。subscribe操作符上面已經(jīng)用過很多次了,相信大家都已經(jīng)知道它的用法和作用了,這里就不再啰嗦了。

2、subscribeOn()操作符

  • 用于指定被觀察者的工作線程類型,若Observable.subscribeOn()多次指定被觀察者生產(chǎn)事件的線程,則只有第一次指定有效,其余的指定線程無效。
  • RxJava中,內(nèi)置的線程類型:
類型 含義 應(yīng)用場景
Schedulers.immediate() 當(dāng)前線程 = 不指定線程 默認(rèn)
AndroidSchedulers.mainThread() Android主線程 操作UI
Schedulers.newThread() 常規(guī)新線程 耗時(shí)等操作
Schedulers.io() io操作線程 網(wǎng)絡(luò)請(qǐng)求、讀寫文件等io密集型操作
Schedulers.computation() CPU計(jì)算操作線程 大量計(jì)算操作

3、observeOn()操作符

  • 用于指定觀察者的工作線程類型,若Observable.observeOn()多次指定觀察者接收 & 響應(yīng)事件的線程,則每次指定均有效,即每指定一次,就會(huì)進(jìn)行一次線程的切換,下游的線程就會(huì)切換到指定線程。
Observable.just(1, 2, 3).subscribeOn(Schedulers.newThread()) //指定被觀察者工作線程為子線程
                .doOnNext(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception { //這里沒指定線程會(huì)延續(xù)被觀察者的子線程

                    }
                })
                .observeOn(Schedulers.io()) //指定觀察者工作線程為io線程
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception { //io線程
                        return null;
                    }
                })
                .observeOn(AndroidSchedulers.mainThread()) //切換到UI線程
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception { //UI線程

                    }
                });

4、delay()操作符

  • 使得被觀察者延遲一段時(shí)間再發(fā)送事件
  • delay()具備多個(gè)重載方法,具體如下:
// 1. 指定延遲時(shí)間
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位
delay(long delay,TimeUnit unit)

// 2. 指定延遲時(shí)間 & 調(diào)度器
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 線程調(diào)度器
delay(long delay,TimeUnit unit,mScheduler scheduler)

// 3. 指定延遲時(shí)間  & 錯(cuò)誤延遲
// 錯(cuò)誤延遲,即:若存在Error事件,則如常執(zhí)行,執(zhí)行后再拋出錯(cuò)誤異常
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 錯(cuò)誤延遲參數(shù)
delay(long delay,TimeUnit unit,boolean delayError)

// 4. 指定延遲時(shí)間 & 調(diào)度器 & 錯(cuò)誤延遲
// 參數(shù)1 = 時(shí)間;參數(shù)2 = 時(shí)間單位;參數(shù)3 = 線程調(diào)度器;參數(shù)4 = 錯(cuò)誤延遲參數(shù)
delay(long delay,TimeUnit unit,mScheduler scheduler,boolean delayError): 指定延遲多長時(shí)間并添加調(diào)度器,錯(cuò)誤通知可以設(shè)置是否延遲
  • 具體使用
Observable.just(1, 2, 3)
                .delay(3, TimeUnit.SECONDS) // 延遲3s再發(fā)送(onNext(1)、onNext(2)、onNext(3)),其它重載方法由于使用類似,所以此處不作全部展示
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {

                    }
                });

5、retry()操作符

  • 重試,即當(dāng)出現(xiàn)錯(cuò)誤時(shí),讓被觀察者(Observable)重新發(fā)射數(shù)據(jù)
  1. 接收到 onError()時(shí),重新訂閱 & 發(fā)送事件
  2. Throwable 和 Exception都可攔截
  • 共有5種重載方法
<-- 1. retry() -->
// 作用:出現(xiàn)錯(cuò)誤時(shí),讓被觀察者重新發(fā)送數(shù)據(jù)
// 注:若一直錯(cuò)誤,則一直重新發(fā)送

<-- 2. retry(long time) -->
// 作用:出現(xiàn)錯(cuò)誤時(shí),讓被觀察者重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 重試次數(shù)
 
<-- 3. retry(Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送& 持續(xù)遇到錯(cuò)誤,則持續(xù)重試)
// 參數(shù) = 判斷邏輯

<--  4. retry(new BiPredicate<Integer, Throwable>) -->
// 作用:出現(xiàn)錯(cuò)誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(若需要重新發(fā)送 & 持續(xù)遇到錯(cuò)誤,則持續(xù)重試
// 參數(shù) =  判斷邏輯(傳入當(dāng)前重試次數(shù) & 異常錯(cuò)誤信息)

<-- 5. retry(long time,Predicate predicate) -->
// 作用:出現(xiàn)錯(cuò)誤后,判斷是否需要重新發(fā)送數(shù)據(jù)(具備重試次數(shù)限制
// 參數(shù) = 設(shè)置重試次數(shù) & 判斷邏輯
  • 具體使用,重載方法較多,這里就舉個(gè)最簡單的例子,其它的就自行擴(kuò)展了
//retry()
        // 作用:出現(xiàn)錯(cuò)誤時(shí),讓被觀察者重新發(fā)送數(shù)據(jù)
        // 注:若一直錯(cuò)誤,則一直重新發(fā)送
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onError(new Exception("發(fā)生錯(cuò)誤了"));
                e.onNext(3);
            }
        })
                .retry() // 遇到錯(cuò)誤時(shí),讓被觀察者重新發(fā)射數(shù)據(jù)(若一直錯(cuò)誤,則一直重新發(fā)送
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.e(TAG, "接收到了事件" + value);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "對(duì)Error事件作出響應(yīng)");
                    }

                    @Override
                    public void onComplete() {
                        Log.e(TAG, "對(duì)Complete事件作出響應(yīng)");
                    }
                });
  • 運(yùn)行日志輸出如下,會(huì)一直不停的循環(huán)打印
接收到了事件1
接收到了事件2
接收到了事件1
接收到了事件2
...

過濾操作符

1、take()操作符

  • 指定觀察者最多能接收到的事件數(shù)量
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // 1. 發(fā)送5個(gè)事件
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onNext(4);
                emitter.onNext(5);
            }

            // 采用take()變換操作符
            // 指定了觀察者只能接收2個(gè)事件
        }).take(2)
        .subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "開始采用subscribe連接");
            }

            @Override
            public void onNext(Integer value) {
                Log.e(TAG, "過濾后得到的事件是:"+ value  );
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "對(duì)Error事件作出響應(yīng)");
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "對(duì)Complete事件作出響應(yīng)");
            }
        });

// 實(shí)際上,可理解為:被觀察者還是發(fā)送了5個(gè)事件,只是因?yàn)椴僮鞣拇嬖跀r截了3個(gè)事件,最終觀察者接收到的是2個(gè)事件
開始采用subscribe連接
過濾后得到的事件是:1
過濾后得到的事件是:2
對(duì)Complete事件作出響應(yīng)

條件/布爾操作符

背壓

  • 背壓是指在異步場景中,被觀察者發(fā)送事件速度遠(yuǎn)快于觀察者的處理速度的情況下,一種告訴上游的被觀察者降低發(fā)送速度的策略。
  • Observeable是不支持背壓的的被觀察者,F(xiàn)lowable是支持背壓的被觀察者,還有相應(yīng)與之對(duì)應(yīng)的觀察者如下圖:


    image.png

結(jié)語

最近的博客可能都會(huì)有些顯得匆忙,里面有些部分甚至都沒有寫完(之后有空會(huì)來完善)。這當(dāng)中主要原因是自己為了達(dá)成今年共50篇博客的目標(biāo),加之有些博客的知識(shí)點(diǎn)確實(shí)太多了,要寫全的話時(shí)間就來不及了,所以就先大體寫下,缺的那點(diǎn)其實(shí)是無傷大雅的,你要認(rèn)真看下來收獲還是挺大的。

感謝

Carson_Ho的一系列關(guān)于RxJava的文章
這可能是最好的 RxJava 2.x 入門教程(完結(jié)版)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對(duì)什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,976評(píng)論 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    BrotherChen閱讀 1,779評(píng)論 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    無求_95dd閱讀 3,487評(píng)論 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    測(cè)天測(cè)地測(cè)空氣閱讀 683評(píng)論 0 1
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,366評(píng)論 2 8

友情鏈接更多精彩內(nèi)容