RxJava常用操作

一、 拆分使用

先創(chuàng)建被觀察者和觀察者,然后建立訂閱關(guān)系,這樣在觀察者中就會(huì)接收到個(gè)生命周期的回調(diào):

    @Test
    public void test(){
        //1. 創(chuàng)建被觀察者
        Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 發(fā)送消息
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });

        //2. 創(chuàng)建觀察者
        Observer<Integer> observer = new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("建立訂閱關(guān)系");
            }

            @Override
            public void onNext(Integer integer) {
                //接受到消息
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        };

        //3. 建立訂閱關(guān)系
        observable.subscribe(observer);
    }

運(yùn)行結(jié)果:

建立訂閱關(guān)系
1
2
完成

二、 鏈?zhǔn)秸{(diào)用(一般都是這種寫(xiě)法):

    @Test
    public void test2() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("建立訂閱關(guān)系");
            }

            @Override
            public void onNext(Integer integer) {
                //接受到消息
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });
    }

三、更簡(jiǎn)單的觀察者

    @Test
    public void test3() {
        Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
    }

Consumer相對(duì)Observer簡(jiǎn)化了很多,沒(méi)有了onSubscribe() onError () onComplete (),當(dāng)然也無(wú)法對(duì)這些進(jìn)行監(jiān)聽(tīng)了。

四、創(chuàng)建操作符

上面用的creat是創(chuàng)建被觀察者的一種操作符,另外常用的還有just、justArratrange、empty,直接看運(yùn)行結(jié)果去理解就好了。
empty這里說(shuō)下,這個(gè)使用場(chǎng)景比如一個(gè)耗時(shí)操作不要任何數(shù)據(jù)反饋去更新UI,只是顯示和隱藏加載動(dòng)畫(huà)。(先不用去糾結(jié)耗時(shí)操作在哪里添加)

    @Test
    public void test4() {
        System.out.println("-----------------just");
        Observable.just(1, 2, 3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        System.out.println("-----------------fromArray");
        Observable.fromArray(new Integer[]{1,2,3}).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        System.out.println("-----------------range");
        Observable.range(0, 3).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

        System.out.println("-----------------empty");
        Observable.empty().subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("建立訂閱關(guān)系");
            }

            @Override
            public void onNext(Object object) {
                //接受到消息
                System.out.println(object);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
                System.out.println("完成");
            }
        });

運(yùn)行結(jié)果:

-----------------just
1
2
3
-----------------fromArray
1
2
3
-----------------range
0
1
2
-----------------empty
建立訂閱關(guān)系
完成

五、合并操作符

合并操作是指合并被觀察者,用同一個(gè)觀察者去接受,常用的有concatWith、startWith、concatmerge、zip,這里為了顯示出合并的區(qū)別,用了另一個(gè)創(chuàng)建創(chuàng)建操作符intervalRange,比如Observable.intervalRange(0, 10, 0, 1, TimeUnit.SECONDS),這個(gè)代表從0開(kāi)始發(fā)送10個(gè)數(shù),延遲0秒后開(kāi)始執(zhí)行,每1秒發(fā)送一次。
用這兩個(gè)被觀察者測(cè)試上面幾個(gè)合并操作符:

    //發(fā)送0-4
    Observable observable1 = Observable.intervalRange(0, 5, 0, 1, TimeUnit.SECONDS);
    //發(fā)送10-14
    Observable observable2 = Observable.intervalRange(10, 5, 0, 1, TimeUnit.SECONDS);

測(cè)試函數(shù):

    private void concatWith() {
        Log.e(TAG, "-----------------concatWith");
        observable1.concatWith(observable2).subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "concatWith: " + aLong);
                    }
                });
    }

    private void startWith() {
        Log.e(TAG, "-----------------startWith");
        observable1.startWith(observable2).subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "startWith: " + aLong);
                    }
                });
    }

    public void concat() {
        Log.e(TAG, "-----------------concat");
        Observable.concat(observable1,observable2).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "concat: " + aLong);
            }
        });
    }

    public void merge() {
        Log.e(TAG, "-----------------merge");
        Observable.merge(observable1,observable2).subscribe(new Consumer<Long>() {
            @Override
            public void accept(Long aLong) throws Exception {
                Log.e(TAG, "merge: " + aLong);
            }
        });
    }

運(yùn)行結(jié)果:

E/MainActivity: -----------------concatWith
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------startWith
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: -----------------concat
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 2
E/MainActivity: accept: 3
E/MainActivity: accept: 4
E/MainActivity: accept: 10
E/MainActivity: accept: 11
E/MainActivity: accept: 12
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: -----------------merge
E/MainActivity: -----------------merge
E/MainActivity: accept: 10
E/MainActivity: accept: 0
E/MainActivity: accept: 1
E/MainActivity: accept: 11
E/MainActivity: accept: 2
E/MainActivity: accept: 12
E/MainActivity: accept: 3
E/MainActivity: accept: 13
E/MainActivity: accept: 14
E/MainActivity: accept: 4

根據(jù)上面結(jié)果總結(jié):

  1. concatWithstartWith是執(zhí)行的先后順序不一樣,是同步執(zhí)行的
  2. concatWithconcat都是順序執(zhí)行,只是寫(xiě)法不一樣
  3. concatmerge寫(xiě)法一樣,但是merge是異步的,兩個(gè)被觀察者沒(méi)有先后順序,各自執(zhí)行。

還有一種zip操作符,把被觀察者合并時(shí)一一對(duì)應(yīng),直接看使用方式:

    private void zip() {
        Observable observable1 = Observable.just("語(yǔ)文", "數(shù)學(xué)", "英語(yǔ)");
        Observable observable2 = Observable.just("100", "80", "60");
        Observable.zip(observable1, observable2, new BiFunction() {
            @Override
            public Object apply(Object o, Object o2) throws Exception {
                return o.toString() + ":" + o2.toString();
            }
        })
                .subscribe(new Consumer() {
                    @Override
                    public void accept(Object o) throws Exception {
                        Log.e(TAG, "accept: " + o);
                    }
                });
    }

運(yùn)行結(jié)果:

E/MainActivity: accept: 語(yǔ)文:100
E/MainActivity: accept: 數(shù)學(xué):80
E/MainActivity: accept: 英語(yǔ):60

六、變換操作符

常見(jiàn)的有map、concatMapflatMap、groupBy、buffer
先通過(guò)最簡(jiǎn)單的map來(lái)看看變換操作符是干什么的

    private void map() {
        Log.e(TAG, "-----------------map");
        Observable.just(1)
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return "轉(zhuǎn)化為String" + integer;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String string) throws Exception {
                        Log.e(TAG, "merge: " + string);
                    }
                });
    }

運(yùn)行結(jié)果:

E/MainActivity: -----------------map
E/MainActivity: accept: 轉(zhuǎn)化為String1

也就是說(shuō)map里可以把被觀察者傳遞過(guò)來(lái)的數(shù)據(jù)轉(zhuǎn)換成另一種數(shù)據(jù)格式傳遞給觀察者,這里是Integer轉(zhuǎn)String,比如你也可以被觀察者傳遞過(guò)來(lái)一個(gè)URL,在Function直接網(wǎng)絡(luò)請(qǐng)求,轉(zhuǎn)化成請(qǐng)求結(jié)果給觀察者。
扯多了,繼續(xù)看上面的操作符flatMap

    private void flatMap() {
        Observable.just(1)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(final Integer integer) throws Exception {
                        return Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> e) throws Exception {
                                e.onNext("轉(zhuǎn)化為String" + integer);
                                e.onNext("我還可以再發(fā)送" + integer);
                                e.onNext("我還可以隨便發(fā)送" + integer);
                            }
                        });
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String string) throws Exception {
                        Log.e(TAG,  "accept: " + string);
                    }
                });
    }

運(yùn)行結(jié)果:

E/MainActivity: accept: 轉(zhuǎn)化為String1
E/MainActivity: accept: 我還可以再發(fā)送1
E/MainActivity: accept: 我還可以隨便發(fā)送1

這個(gè)相對(duì)map更靈活,map是的Function里直接返回的是轉(zhuǎn)換之后的數(shù)據(jù),一對(duì)一的,而flatMap的Function返回的是另一個(gè)被觀察者,所以這個(gè)可以在里面隨意發(fā)送給觀察者。
在用concatMap之前先看flatMap的另一種操作:

    private void flatMap2() {
        Observable.just(1, 2, 3)
                .flatMap(new Function<Integer, ObservableSource<String>>() {
                    @Override
                    public ObservableSource<String> apply(Integer integer) throws Exception {
                        List<String> list = new ArrayList<>();
                        for (int i = 0; i < 3; i++) {
                            list.add(integer + "." + (1 + i));
                        }
                        return Observable.fromIterable(list).delay(1, TimeUnit.SECONDS);
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String string) throws Exception {
                        Log.e(TAG, "accept: " + string);
                    }
                });
    }

在收到被觀察者發(fā)來(lái)的數(shù)據(jù)后,生產(chǎn)一個(gè)List再延遲1s發(fā)送給觀察者,看下運(yùn)行結(jié)果:

E/MainActivity: accept: 2.1
E/MainActivity: accept: 2.2
E/MainActivity: accept: 2.3
E/MainActivity: accept: 1.1
E/MainActivity: accept: 3.1
E/MainActivity: accept: 3.2
E/MainActivity: accept: 3.3
E/MainActivity: accept: 1.2
E/MainActivity: accept: 1.3

每個(gè)都是先.1再.2再.3沒(méi)錯(cuò),但是整體并沒(méi)有按照1、2、3順序執(zhí)行,說(shuō)明他們是異步執(zhí)行的,類似合并操作符中的merge(其實(shí)內(nèi)部調(diào)用的就是merge)??赐赀@個(gè)問(wèn)題,就可以猜到concatMap的作用了,就不貼了,是完全按順序同步輸出的。
然后來(lái)groupBy操作符:

    private void group() {
        Observable.just(20, 40, 60, 80, 100)
                .groupBy(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        return integer >= 60 ? "及格" : "不及格";
                    }
                })
                .subscribe(new Consumer<GroupedObservable<String, Integer>>() {
                    @Override
                    public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable) throws Exception {
                        stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                            @Override
                            public void accept(Integer integer) throws Exception {
                                Log.e(TAG, "accept: " + integer + ":" + stringIntegerGroupedObservable.getKey());
                            }
                        });
                    }
                });
    }

輸出結(jié)果:

E/MainActivity: accept: 20:不及格
E/MainActivity: accept: 40:不及格
E/MainActivity: accept: 60:及格
E/MainActivity: accept: 80:及格
E/MainActivity: accept: 100:及格

buffer操作符:

    private void buffer() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 100; i++) {
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })
                .buffer(20)
                .subscribe(new Consumer<List<Integer>>() {
                    @Override
                    public void accept(List<Integer> integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
    }

運(yùn)行結(jié)果:

E/MainActivity: accept: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
E/MainActivity: accept: [20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39]
E/MainActivity: accept: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59]
E/MainActivity: accept: [60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79]
E/MainActivity: accept: [80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]

這兩種沒(méi)什么特殊的,groupBy是按條件分組, buffer是分批發(fā)送。

七、過(guò)濾操作符

filter、take、distinct、elementAl

    //條件篩選,輸出B、C
    public void filter() {
        Observable.just("A", "B", "C")
                .filter(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        if ("A".equals(s)) {
                            return false;
                        }

                        return true;
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "accept: " + s);
                    }
                });
    }



    //用于停止定時(shí)器,輸出0、1、2、3、4
    public void take() {
        Observable.interval(1, TimeUnit.SECONDS)
                .take(5)// 5次之后停下
                .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Log.e(TAG, "accept: " + aLong);
                    }
                });

    }

    //過(guò)濾重復(fù),輸出1、2、3
    public void distinct() {
        Observable.just(1,1,2,3,3)
                .distinct()
                .subscribe(new Consumer<Integer>() { // 下游 觀察者
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.d(TAG, "accept: " + integer);
                    }
                });
    }

    //制定發(fā)送角標(biāo),輸出B
    public void elementAt() {
        Observable.just("A", "B", "C")
                .elementAt(1, "X")
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });


    }

八、條件操作符

any、all、contains,這些就是改變Java中if的書(shū)寫(xiě)方式,與、或、包含
all:全部為true,才是true,只要有一個(gè)為false,就是false
any:全部為 false,才是false, 只要有一個(gè)為true,就是true
contains :是否包含

    //等于Java中if的連續(xù)判斷,有一個(gè)等于C就返回false,輸出false
    public void all() {
        Observable.just("A", "B", "C", "D")
                .all(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !s.equals("C");
                    }
                })
                .subscribe(new Consumer<Boolean>() { // 下游 觀察者
                    @Override
                    public void accept(Boolean s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

    //判斷包含
    public void contains() {
        Observable.just("A", "B", "C", "D")
                .contains("C")
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept(Boolean s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

    //和上面的All相反,有一個(gè)等于C就返回true,輸出true
    public void any() {
        Observable.just("A", "B", "C", "D")
                .any(new Predicate<String>() {
                    @Override
                    public boolean test(String s) throws Exception {
                        return !s.equals("C");
                    }
                })
                .subscribe(new Consumer<Boolean>() { // 下游 觀察者
                    @Override
                    public void accept(Boolean s) throws Exception {
                        Log.d(TAG, "accept: " + s);
                    }
                });
    }

九、異常處理操作符

onErrorReturn、onErrorResumeNext、onExceptionResumeNext、retry
先模擬個(gè)錯(cuò)誤:

    public void onError() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                               e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

運(yùn)行結(jié)果:

D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模擬一個(gè)錯(cuò)誤

上面代碼會(huì)在觀察者的onError中收到回調(diào),然后來(lái)看一下異常操作符能干什么,先看onErrorReturnonErrorResumeNext,區(qū)別就是onErrorReturn發(fā)送一次,onErrorResumeNext可以任意發(fā),跟上面很多其他的操作符一樣:

    private void onErrorReturn() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })
                .onErrorReturn(new Function<Throwable, Integer>() {
                    @Override
                    public Integer apply(Throwable throwable) throws Exception {
                        return 400;
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

運(yùn)行結(jié)果:

D/MainActivity: onNext: 400
D/MainActivity: onComplete: 

onErrorResumeNext:

    public void onErrorResumeNext() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })

                .onErrorResumeNext(new Function<Throwable, ObservableSource<? extends Integer>>() {
                    @Override
                    public ObservableSource<? extends Integer> apply(Throwable throwable) throws Exception {

                        return Observable.create(new ObservableOnSubscribe<Integer>() {
                            @Override
                            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                                e.onNext(400);
                                e.onNext(4000);
                                e.onNext(40000);
                                e.onComplete();
                            }
                        });
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });

    }

運(yùn)行結(jié)果:

D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onNext: 400
D/MainActivity: onNext: 4000
D/MainActivity: onNext: 40000
D/MainActivity: onComplete: 

這里兩個(gè)注意點(diǎn):

  • onErrorReturn發(fā)生error后會(huì)自動(dòng)調(diào)用onComplete(),而onErrorResumeNext需要根據(jù)需要手動(dòng)調(diào)用
  • 都不會(huì)再觸發(fā)觀察者的onError()回調(diào),除非onErrorResumeNext中再手動(dòng)調(diào)用e.onError()

然后看下onExceptionResumeNext代碼:

    public void onExceptionResumeNext() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new Throwable("模擬一個(gè)錯(cuò)誤"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })

                .onExceptionResumeNext(new ObservableSource<Integer>() {
                    @Override
                    public void subscribe(Observer<? super Integer> observer) {
                        observer.onNext(400);
                        observer.onNext(4000);
                        observer.onNext(40000);
                    }
                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

運(yùn)行結(jié)果:

D/MainActivity: onNext: 0
D/MainActivity: onNext: 1
D/MainActivity: onNext: 2
D/MainActivity: onNext: 3
D/MainActivity: onNext: 4
D/MainActivity: onError: 模擬一個(gè)錯(cuò)誤

onErrorResumeNext的運(yùn)行結(jié)果對(duì)比,很明顯沒(méi)有400、4000、40000,說(shuō)明新的Observer并不會(huì)起作用,這里用的是Throwable,如果是用Exception,同樣也會(huì)有400、4000、40000,所以:onErrorResumeNextonExceptionResumeNext對(duì)Exception的處理是一樣的流程,區(qū)別在于對(duì)Error處理的時(shí)候,是否會(huì)使用新的Observer發(fā)送消息,也就是onExceptionResumeNext不處理Error,直接回調(diào)觀察者的onError (),onErrorResumeNext都處理,不會(huì)再調(diào)用觀察者的onError ()。
然后是retry這個(gè)操作符,這個(gè)很簡(jiǎn)單,貼出三種常用的:

    public void retry() {

        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 10; i++) {
                            if (i == 5) {
                                e.onError(new IllegalAccessError("模擬錯(cuò)誤"));
                            }
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                })
                
                //不設(shè)置重試次數(shù)
                .retry( new Predicate<Throwable>() {
                    @Override
                    public boolean test(Throwable throwable) throws Exception {
                        //true表示不停地重試 ,  false表示不重試
                        return true;
                    }
                })

//                //設(shè)置重試次數(shù)
//                .retry(3, new Predicate<Throwable>() {
//                    @Override
//                    public boolean test(Throwable throwable) throws Exception {
//                        //true表示按設(shè)置的次數(shù)重試 ,  false表示不重試
//                        return true;
//                    }
//                })
//
//                //可獲取重試次數(shù)
//                .retry(new BiPredicate<Integer, Throwable>() {
//                    @Override
//                    public boolean test(Integer integer, Throwable throwable) throws Exception {
//                        //相對(duì)上面兩種,這個(gè)integer表示重試次數(shù), 返回值跟上面一樣
//                        return true;
//                    }
//                })

                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.d(TAG, "onNext: " + integer);
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "onError: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "onComplete: ");
                    }
                });
    }

十、線程切換

默認(rèn)發(fā)送和接收都是在主線程:

    private void schedulers() {
        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.e(TAG, "發(fā)送: " + Thread.currentThread().getName());
                        e.onNext("123");
                    }
                })
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "接收: " + Thread.currentThread().getName());
                    }
                });
    }

輸出:

E/MainActivity: 發(fā)送: main
E/MainActivity: 接收: main

可以通過(guò)subscribeOn()會(huì)同時(shí)修改觀察者和被觀察者的線程,通過(guò)observeOn()只設(shè)置觀察者線程,通過(guò)AndroidSchedulers.mainThread()得到主線程,通過(guò)Schedulers.io()得到子線程:

    private void schedulers() {
        Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(ObservableEmitter<String> e) throws Exception {
                        Log.e(TAG, "發(fā)送: " + Thread.currentThread().getName());
                        e.onNext("123");
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String s) throws Exception {
                        Log.e(TAG, "接收: " + Thread.currentThread().getName());
                    }
                });
    }

這一段,先通過(guò)subscribeOn(Schedulers.io())把觀察者和被觀察者都設(shè)置到子線程,如果不寫(xiě)下面這句observeOn(AndroidSchedulers.mainThread()),會(huì)輸出:

E/MainActivity: 發(fā)送: RxCachedThreadScheduler-1
E/MainActivity: 接收:  RxCachedThreadScheduler-1

但是下面又用observeOn(AndroidSchedulers.mainThread())把觀察者改回子線程,所以輸出:

E/MainActivity: 發(fā)送: RxCachedThreadScheduler-1
E/MainActivity: 接收: main

十一、背壓模式

當(dāng)上下游運(yùn)行在不同的線程中,且上游發(fā)射數(shù)據(jù)的速度大于下游接收處理數(shù)據(jù)的速度時(shí),就會(huì)產(chǎn)生背壓?jiǎn)栴},內(nèi)存使用越來(lái)越多,這時(shí)候就需要用Flowable去處理。Flowable會(huì)對(duì)上游發(fā)送的時(shí)間進(jìn)行緩存,緩存池也滿了(超出128)的時(shí)候會(huì)有4種不通的處理方式:

  • BackpressureStrategy.ERROR:就會(huì)拋出異常
  • BackpressureStrategy.DROP:把后面發(fā)射的事件丟棄
  • BackpressureStrategy.LATEST:把前面發(fā)射的事件丟棄
  • BackpressureStrategy.BUFFER:這種不會(huì)有上限,但是如果上游發(fā)送太多,也會(huì)造成內(nèi)存使用越來(lái)越大

Flowable的使用跟Observable很類似,簡(jiǎn)單使用:

    private void backpressure() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        // 改成129就會(huì)崩潰
                        for (int i = 0; i < 128; i++) {
                            e.onNext(i); // todo 1
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e(TAG, "接收: " + integer);
                    }
                });
    }

然后看一種完整模式的觀察者:

    private void backpressure() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        // 改成129就會(huì)崩潰
                        for (int i = 0; i < 128; i++) {
                            e.onNext(i); // todo 1
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e(TAG, "接收: " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
    }

這一會(huì)發(fā)現(xiàn)觀察者收不到任何消息,這里跟Observable有個(gè)區(qū)別,就是訂閱的方法subscribe()的參數(shù),Observable訂閱對(duì)應(yīng)的是Observer,而Flowable對(duì)應(yīng)的是Subscriber,Observer和Subscriber對(duì)應(yīng)的回調(diào)onSubscribe(..)參數(shù)不同,Subscriber的onSubscribe(..)參數(shù)拿到的是一個(gè)Subscription,這個(gè)需要主動(dòng)去取數(shù)據(jù),比如:

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(10);
                    }

這樣就會(huì)onNext()中就會(huì)收到前10個(gè)。那這個(gè)使用就很靈活了,根據(jù)代碼需要,可以在需要的地方主動(dòng)調(diào)用s.request(..),讓觀察者接收到數(shù)據(jù)。

十二、一個(gè)展示網(wǎng)絡(luò)圖片的例子

    private void getImage(final String path) {
        Observable.just(path)
                // 通過(guò)map變換操作符把String轉(zhuǎn)換成Bitmap
                .map(new Function<String, Bitmap>() {
                    @Override
                    public Bitmap apply(String s) throws Exception {
                        URL url = new URL(path);
                        HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
                        httpURLConnection.setConnectTimeout(5000);
                        int responseCode = httpURLConnection.getResponseCode();
                        if (HttpURLConnection.HTTP_OK == responseCode) {
                            Bitmap bitmap = BitmapFactory.decodeStream(httpURLConnection.getInputStream());
                            return bitmap;
                        }

                        return null;
                    }
                })
                // 下載圖片在子線程中
                .subscribeOn(Schedulers.io())
                // 設(shè)置圖片在主線程中
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Bitmap>() {

                    @Override
                    // 開(kāi)始操作前
                    public void onSubscribe(Disposable d) {
                        progressDialog = new ProgressDialog(MainActivity.this);
                        progressDialog.setMessage("正在下載中...");
                        progressDialog.show();
                    }

                    @Override
                    // 收到Bitmap
                    public void onNext(Bitmap bitmap) {
                        if (imageView != null) {
                            imageView.setImageBitmap(bitmap);
                        }
                    }

                    @Override
                    // 下載錯(cuò)誤
                    public void onError(Throwable e) {
                        if (progressDialog != null) {
                            progressDialog.dismiss();
                        }
                        if (imageView != null) {
                            imageView.setImageResource(R.mipmap.ic_launcher);
                        }
                        Log.e(TAG, "onError: " + e.toString());
                    }

                    @Override
                    // 下載完成
                    public void onComplete() {
                        if (progressDialog != null) {
                            progressDialog.dismiss();
                        }
                    }
                });
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容