Android Rxjava框架的原理和使用

原理

Rx是Reactive Extensions的縮寫的簡寫,可以使用可觀察數(shù)據(jù)流對(duì)編程接口進(jìn)行異步編程,它結(jié)合了觀察者模式,迭代器模式和函數(shù)式的精華。

Rxjava是一種異步數(shù)據(jù)處理庫,也是一種觀察者模式。最早是Netflix公司用于重構(gòu)當(dāng)前架構(gòu)時(shí)減少REST調(diào)用的次數(shù),參考了Microsoft公司的響應(yīng)式編程,把Microsoft的Rx庫遷移到Java JVM中,其中最有名的就是RxJava。

它的特點(diǎn)主要有以下:

  1. 支持Java 8 Lambda。
  2. 支持異步和同步。
  3. 單一依賴關(guān)系。
  4. 簡潔,優(yōu)雅。

RxAndroid

在開發(fā)項(xiàng)目的時(shí)候,開發(fā)者在使用Rxjava時(shí)會(huì)搭配RxAndroid,他是針對(duì)Rxjava在Android平臺(tái)使用的一個(gè)響應(yīng)式擴(kuò)展組件。使用RxAndroid的Schedulers(調(diào)度器)可以解決Android主線程問題, 多線程等問題。

觀察者模式的四大要素

  1. Observable 被觀察者
  2. Observer
  3. 觀察者 subscribe 訂閱
  4. 事件
image.png

觀察者訂閱被觀察者,一旦被觀察者發(fā)出事件,觀察者就可以接收到。

擴(kuò)展的觀察者模式

image.png

onNext()訂閱了一個(gè)事件,當(dāng)事件完成時(shí)會(huì)回調(diào)onComplete(),在完成過程中發(fā)生了異常會(huì)回調(diào)onError()。

使用

依賴

//在Project的gradle下添加maven倉庫
maven { url "https://oss.jfrog.org/libs-snapshot" }

implementation 'io.reactivex.rxjava3:rxjava:3.0.4'
implementation 'io.reactivex.rxjava3:rxandroid:3.0.0'

Hello World

//1.創(chuàng)建被觀察者
Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        });
//2.創(chuàng)建觀察者
Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                System.out.println("onSubscribe():");
            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println("onNext():" + s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete():");
            }
        };
//3.訂閱事件
observable.subscribe(observer);

注意:onError()和onComplete()只會(huì)回調(diào)一個(gè)。

操作符

Creating Observables(創(chuàng)建 Observable)

Create

//鏈?zhǔn)綄懛?        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("hello world");
                emitter.onComplete();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("onSubscribe():"+d.toString());
            }

            @Override
            public void onNext(String o) {
                System.out.println("onNext():" + o);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("onError():" + e.getMessage());
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete():");
            }
        });

Just
使用將為你創(chuàng)建一個(gè)Observable并自動(dòng)為你調(diào)用onNext( )發(fā)射數(shù)據(jù),just中傳遞的參數(shù)將直接在Observer的onNext()方法中接收到。

Observable.just("hello world").subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

From

將一個(gè)Iterable, 一個(gè)Future, 或者一個(gè)數(shù)組轉(zhuǎn)換成一個(gè)Observable,遍歷集合,發(fā)送每個(gè)item。相當(dāng)于多次回調(diào)onNext()方法,每次傳入一個(gè)item。

List<String> list = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            list.add("Hello" + i);
        }

        Observable.fromArray(list).subscribe(new Observer<List<String>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<String> strings) {
                System.out.println(strings);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Defer

當(dāng)觀察者訂閱時(shí),才創(chuàng)建Observable,并且針對(duì)每個(gè)觀察者創(chuàng)建都是一個(gè)新的Observable。 以何種方式創(chuàng)建這個(gè)Observable對(duì)象,當(dāng)滿足回調(diào)條件后,就會(huì)進(jìn)行相應(yīng)的回調(diào)。

value = "2020/12/13";
    Observable<String> observable = Observable.defer(new Supplier<ObservableSource<? extends String>>() {
        @Override
        public ObservableSource<? extends String> get() throws Throwable {
            return Observable.just(value);
        }
    });
    value = "12345";
    observable.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {

        }

        @Override
        public void onNext(String s) {
            System.out.println(s);
        }

        @Override
        public void onError(Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

Empty/Never/Throw

Empty是創(chuàng)建一個(gè)不發(fā)射任何數(shù)據(jù)但是正常終止的Observable。 Never是創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)也不終止的Observable。 Throw是創(chuàng)建一個(gè)不發(fā)射數(shù)據(jù)以一個(gè)錯(cuò)誤終止的Observable。 這三個(gè)操作符生成的Observable行為非常特殊和受限。測(cè)試的時(shí)候很有用,有時(shí)候也用于結(jié)合其它的Observables,或者作為其它需要Observable的操作符的參數(shù)。

Observable.defer(new Supplier<ObservableSource<?>>() {
            @Override
            public ObservableSource<?> get() throws Throwable {
                return Observable.error(new Throwable("你寫了個(gè)bug"));
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Object o) {
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

Interval

創(chuàng)建一個(gè)按固定時(shí)間間隔發(fā)射整數(shù)序列的Observable,可用作定時(shí)器。即按照固定1秒一次調(diào)用onNext()方法。

//TrampolineScheduler不會(huì)立即執(zhí)行,當(dāng)其他排隊(duì)任務(wù)結(jié)束時(shí)才執(zhí)行,TrampolineScheduler運(yùn)行在主線程。

Observable.interval(1000, TimeUnit.MILLISECONDS, Schedulers.trampoline()).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {
            }

            @Override
            public void onNext(@NonNull Long aLong) {
                System.out.println(aLong);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Repeat

創(chuàng)建一個(gè)Observable,該Observable的事件可以重復(fù)調(diào)用。

 Observable.just(123).repeat(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Start

返回一個(gè)Observable,它發(fā)射一個(gè)類似于函數(shù)聲明的值。

Timer

創(chuàng)建一個(gè)Observable,它在一個(gè)給定的延遲后發(fā)射一個(gè)特殊的值,即表示延遲2秒后,調(diào)用onNext()方法。

 Observable.timer(2000, TimeUnit.MILLISECONDS,Schedulers.trampoline()).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Long aLong) {
                System.out.println(aLong);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Transforming Observables(轉(zhuǎn)換 Observable)

Map

Map就是把原來的Observable對(duì)象轉(zhuǎn)換成另一個(gè)Observable對(duì)象,同時(shí)將傳輸?shù)臄?shù)據(jù)進(jìn)行一些靈活的操作,方便Observer獲得想要的數(shù)據(jù)形式。

//Integer to String
Observable.just(123).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer s) throws Exception {
                return s.toString();
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

flatMap對(duì)于數(shù)據(jù)的轉(zhuǎn)換比map()更加徹底,如果發(fā)送的數(shù)據(jù)是集合,flatmap()重新生成一個(gè)Observable對(duì)象,并把數(shù)據(jù)轉(zhuǎn)換成Observer想要的數(shù)據(jù)形式。它可以返回任何它想返回的Observable對(duì)象。

 Observable.just(1, 2, 3, 4, 5).flatMap(new Function<Integer, ObservableSource<? extends String>>() {
            @Override
            public ObservableSource<? extends String> apply(Integer integer) throws Exception {
                return Observable.just(integer.toString());
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

GroupBy

根據(jù)規(guī)則對(duì)數(shù)據(jù)進(jìn)行分組。

Observable.just(1, 2, 3, 4, 5).groupBy(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer % 2==0?"偶數(shù)":"奇數(shù)";
            }
        }).subscribe(new Observer<GroupedObservable<String, Integer>>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull final GroupedObservable<String, Integer> arg0) {
                arg0.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(arg0.getKey() + "-------" + integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Buffer

定期收集Observable的數(shù)據(jù)放進(jìn)一個(gè)數(shù)據(jù)包裹,然后發(fā)射這些數(shù)據(jù)包裹,而不是一次發(fā)射一個(gè)值。

Observable.just(1, 2, 3, 4, 5,6).buffer(3).subscribe(new Observer<List<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(List<Integer> integers) {
                System.out.println(integers);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Scan

將數(shù)據(jù)進(jìn)行累加。

Observable.range(1, 5).scan(new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Window

window和buffer相似,它返回的是一個(gè)Observable對(duì)象,它根據(jù)一系列任務(wù)規(guī)則把數(shù)據(jù)聚集到一個(gè)列表。

//        window第一個(gè)參數(shù)count:每個(gè)窗口應(yīng)發(fā)射前的最大大小;第二個(gè):在啟動(dòng)新窗口之前需要跳過多少項(xiàng)
        Observable.range(1, 5).window(5, 1).subscribe(new Observer<Observable<Integer>>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(final Observable<Integer> arg0) {
                System.out.println(arg0);
                arg0.subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("---"+integer);
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Filtering Observables(過濾 Observable)

Debounce

操作間隔一定時(shí)間內(nèi)沒有做任何操作,數(shù)據(jù)才會(huì)發(fā)送到觀察者。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(2000);
                    arg0.onNext(i);
                }
                arg0.onComplete();
            }
        }).debounce(1, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer o) {
                System.out.println(o);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {
            }
        });

Distinct

去掉重復(fù)數(shù)據(jù)的操作符。

Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

ElementAt

取出指定位置的數(shù)據(jù)。

Observable.just(1, 2, 3, 2, 3).distinct().subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Filter

對(duì)數(shù)據(jù)進(jìn)行指定規(guī)則的過濾。

Observable.just(1, 2, 3, 4).filter(new Predicate<Integer>() {
            @Override
            public boolean test(Integer integer) throws Exception {

                return integer > 2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

First

取數(shù)據(jù)中的第一個(gè)數(shù)據(jù)。

//first參數(shù):defaultItem: 當(dāng)前Observable不發(fā)射任何內(nèi)容時(shí)發(fā)出的默認(rèn)項(xiàng)
        Observable.just(1, 2, 3,4,5).first(10).subscribe(new SingleObserver<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }
        });

IgnoreElements

忽略所有的數(shù)據(jù),不向觀察者發(fā)送數(shù)據(jù),直接回調(diào)onError或onComplete()。

 Observable.just(6, 9, 1, 3).distinct().ignoreElements().subscribe(new CompletableObserver() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onComplete() {

            }

            @Override
            public void onError(Throwable e) {

            }

        });

Last

列表數(shù)據(jù)最后指定的數(shù)位項(xiàng)數(shù)據(jù)。 SingleObserver只發(fā)射一條單一的數(shù)據(jù),或者一條異常通知,不能發(fā)射完成通知,其中數(shù)據(jù)與通知只能發(fā)射一個(gè)。

     Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        }).last("4").subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }
        });
 Observable.just(6, 4, 2,4).distinct().map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        }).last("4").subscribe(new SingleObserver<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(String s) {
                System.out.println(s);
            }

            @Override
            public void onError(Throwable e) {

            }
        });

Sample

對(duì)數(shù)據(jù)源進(jìn)行樣本采集,發(fā)送給觀察者。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> arg0) throws Exception {
                for (int i = 0; i < 10; i++) {
                    Thread.sleep(1000);
                    arg0.onNext(i);
                }
                arg0.onComplete();
            }
        }).sample(4, TimeUnit.SECONDS).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {

                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Skip

跳過指定列表項(xiàng)數(shù)據(jù)的指定項(xiàng)數(shù)據(jù)。

Observable.just(6, 3, 2, 1).skip(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

SkipLast

跳過列表數(shù)據(jù)的最后幾位數(shù)據(jù)。

Observable.just(1, 2, 3, 5).skipLast(2).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Take

只取列表數(shù)據(jù)的前幾項(xiàng)。

Observable.just(1, 2, 3, 4).take(2).takeLast(1).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

TakeLast

取列表數(shù)據(jù)項(xiàng)的最后幾項(xiàng)數(shù)據(jù)。 Consumer是簡易版的Observer,他有多重重載,可以自定義你需要處理的信息,他只提供一個(gè)回調(diào)接口accept,由于沒有onError和onCompete,無法再 接受到onError或者onCompete之后,實(shí)現(xiàn)函數(shù)回調(diào)。

Observable.just(1, 2, 3, 4).takeLast(2).take(1).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });
Combining Observables(組合 Observable)

Zip

通過一個(gè)函數(shù)將多個(gè)Observables的發(fā)射物結(jié)合到一起,基于這個(gè)函數(shù)的結(jié)果為每個(gè)結(jié)合體發(fā)射單個(gè)數(shù)據(jù)項(xiàng)。當(dāng)其中一個(gè)Observable發(fā)送數(shù)據(jù)結(jié)束或異常,另外一個(gè)也停止發(fā)送。

 Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Observable.zip(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Merge

合并多個(gè)Observables的發(fā)射物。

Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Observable.merge(observable, observable1).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

StartWith

在數(shù)據(jù)序列的開頭插入一條指定的項(xiàng)。

Observable<Integer> observable = Observable.just(10, 20, 30);
        Observable<Integer> observable1 = Observable.just(4, 8, 12, 16);
        Disposable subscribe = observable.startWith(observable1).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                System.out.println(integer);
            }
        });

CombineLatest

當(dāng)兩個(gè)Observables中的任何一個(gè)發(fā)射了數(shù)據(jù)時(shí),使用一個(gè)函數(shù)結(jié)合每個(gè)Observable發(fā)射的最近數(shù)據(jù)項(xiàng),并且基于這個(gè)函數(shù)的結(jié)果發(fā)射數(shù)據(jù)。

Observable<Integer> observable = Observable.just(1, 3, 5);
        Observable<Integer> observable1 = Observable.just(2, 4, 6);
        Observable.combineLatest(observable, observable1, new BiFunction<Integer, Integer, Integer>() {
            @Override
            public Integer apply(Integer integer, Integer integer2) throws Throwable {
                System.out.println("integer:" + integer + "---" + "integer2:" + integer2);
                return integer + integer2;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Join

任何時(shí)候,只要在另一個(gè)Observable發(fā)射的數(shù)據(jù)定義的時(shí)間窗口內(nèi),這個(gè)Observable發(fā)射了一條數(shù)據(jù),就結(jié)合兩個(gè)Observable發(fā)射的數(shù)據(jù)。

String[] args1 = new String[]{"張欣1", "張欣2", "張欣3", "張欣4", "張欣5"};
    String[] args2 = new String[]{"春曉1", "春曉2", "春曉3", "春曉4"};
    Observable<String> o1 = Observable.fromArray(args1);
    Observable<String> o2 = Observable.fromArray(args2);
    //相同的數(shù)組可以進(jìn)行合并
    o2.join(o1, new Function<String, Observable<Long>>() {
        @Override
        public Observable<Long> apply(String s) throws Exception {
            return Observable.timer(2, TimeUnit.SECONDS);
        }
    }, new Function<String, Observable<Long>>() {
        @Override
        public Observable<Long> apply(String s) {
            return Observable.timer(2, TimeUnit.SECONDS);
        }
    }, new BiFunction<String, String, String>() {
        @Override
        public String apply(String s, String s2) throws Exception {
            return s + "-&--" + s2;
        }
    }).subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {

        }

        @Override
        public void onNext(@NonNull String s) {
            System.out.println(s);
        }

        @Override
        public void onError(@NonNull Throwable e) {

        }

        @Override
        public void onComplete() {

        }
    });

SwitchOnNext

將一個(gè)發(fā)射多個(gè)Observables的Observable轉(zhuǎn)換成另一個(gè)單獨(dú)的Observable,后者發(fā)射那些Observables最近發(fā)射的數(shù)據(jù)項(xiàng)。

        final Observable<Long> observable1 = Observable.intervalRange(1, 5, 1, 500, TimeUnit.MILLISECONDS);
        final Observable<Long> observable2 = Observable.intervalRange(10, 5, 1, 500, TimeUnit.MILLISECONDS);
        Observable<Observable<Long>> sources = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {
            @Override
            public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                emitter.onNext(observable1);
                Thread.sleep(1000);
                // 此時(shí)發(fā)射一個(gè)新的observable2,將會(huì)取消訂閱observable1
                emitter.onNext(observable2);
                emitter.onComplete();
            }
        });

        // 創(chuàng)建發(fā)射含有Error通知的Observable序列的Observable
        Observable<Observable<Long>> sourcesError = Observable.create(new ObservableOnSubscribe<Observable<Long>>() {

            @Override
            public void subscribe(ObservableEmitter<Observable<Long>> emitter) throws Exception {
                emitter.onNext(observable1);
//                emitter.onNext(Observable.error(new Exception("Error Test1!"))); // 發(fā)射一個(gè)發(fā)射Error通知的Observable
//                emitter.onNext(Observable.error(new Exception("Error Test2!"))); // 發(fā)射一個(gè)發(fā)射Error通知的Observable
                Thread.sleep(1000);
                // 此時(shí)發(fā)射一個(gè)新的observable2,將會(huì)取消訂閱observable1
                emitter.onNext(observable2);
                emitter.onComplete();
            }
        });

        // 1. switchOnNext(ObservableSource<ObservableSource> sources, int bufferSize)
        // 可選參數(shù) bufferSize: 緩存數(shù)據(jù)項(xiàng)大小
        // 接受一個(gè)發(fā)射Observable序列的Observable類型的sources,
        // 當(dāng)sources發(fā)射一個(gè)新的Observable后,則會(huì)取消訂閱前面的舊observable,直接開始接受新Observable的數(shù)據(jù)
        Disposable subscribe = Observable.switchOnNext(sources)
                .subscribe(new Consumer<Long>() {

                    @Override
                    public void accept(Long integer) throws Exception {
                        System.out.println("--> accept(1): " + integer);
                    }
                });
        System.out.println("--------------------------------------------------------------------");
        // 2. switchOnNextDelayError(ObservableSource<ObservableSource> sources, int prefetch)
        // 可選參數(shù) prefetch: 與讀取數(shù)據(jù)項(xiàng)大小
        // 當(dāng)sources發(fā)射一個(gè)新的Observable后,則會(huì)取消訂閱前面的舊observable,直接開始接受新Observable的數(shù)據(jù),
        // 保留onError通知直到合并后的Observable所有的數(shù)據(jù)發(fā)射完成,在那時(shí)它才會(huì)把onError傳遞給觀察者
        Observable.switchOnNextDelayError(sourcesError)
                .subscribe(new Observer<Long>() {

                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("--> onSubscribe(2)");
                    }

                    @Override
                    public void onNext(Long t) {
                        System.out.println("--> onNext(2): " + t);
                    }

                    @Override
                    public void onError(Throwable e) {
                        // 判斷是否是CompositeException對(duì)象(發(fā)生多個(gè)Observable出現(xiàn)Error時(shí)會(huì)發(fā)送的對(duì)象)
                        if (e instanceof CompositeException) {
                            CompositeException compositeException = (CompositeException) e;
                            List<Throwable> exceptions = compositeException.getExceptions();
                            System.out.println("--> onError(2): " + exceptions);
                        } else {
                            System.out.println("--> onError(2): " + e);
                        }
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("--> onComplete(2)");
                    }
                });

Error Handling Operators(處理錯(cuò)誤)

Catch

Catch操作符攔截原始Observable的onError通知,將它替換為其它的數(shù)據(jù)項(xiàng)或數(shù)據(jù)序列,讓產(chǎn)生的Observable能夠正常終止或者根本不終止。 還有一個(gè)叫onErrorResumeNext的操作符,它的行為與Catch相似。 RxJava將Catch實(shí)現(xiàn)為三個(gè)不同的操作符:

  • onErrorReturn 讓Observable遇到錯(cuò)誤時(shí)發(fā)射一個(gè)特殊的項(xiàng)并且正常終止。
  • onErrorResumeNext 讓Observable在遇到錯(cuò)誤時(shí)開始發(fā)射第二個(gè)Observable的數(shù)據(jù)序。
  • onExceptionResumeNext 讓Observable在遇到錯(cuò)誤時(shí)繼續(xù)發(fā)射后面的數(shù)據(jù)項(xiàng)。
Observable.just(1,2,3).onErrorReturn(new Function<Throwable, Integer>() {
            @Override
            public Integer apply(Throwable throwable) throws Throwable {
                return  null;
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull Integer integer) {
                System.out.println(integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

Retry

如果原始Observable遇到錯(cuò)誤,重新訂閱它期望它能正常終止。 retryWhen和retry類似,區(qū)別是,retryWhen將onError中的Throwable傳遞給一個(gè)函數(shù),這個(gè)函數(shù)產(chǎn)生另一個(gè)Observable,retryWhen觀察它的結(jié)果再?zèng)Q定是不是要重新訂閱原始的Observable。如果這個(gè)Observable發(fā)射了一項(xiàng)數(shù)據(jù),它就重新訂閱,如果這個(gè)Observable發(fā)射的是onError通知,它就將這個(gè)通知傳遞給觀察者然后終止。 retryWhen默認(rèn)在trampoline調(diào)度器上執(zhí)行,你可以通過參數(shù)指定其它的調(diào)度器。

場景:網(wǎng)絡(luò)請(qǐng)求失敗重試操作。

final AtomicInteger atomicInteger = new AtomicInteger(3);
        Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
                emitter.onNext(String.valueOf(System.currentTimeMillis()));
                emitter.onError(new Error(String.valueOf(atomicInteger.decrementAndGet())));
            }
        }).retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
            @Override
            public ObservableSource<?> apply(Observable<Throwable> throwableObservable) throws Throwable {
                return throwableObservable;
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(@NonNull Disposable d) {

            }

            @Override
            public void onNext(@NonNull String s) {
                System.out.println(s);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                System.out.println(e.getMessage());
            }

            @Override
            public void onComplete() {

            }
        });

更多相關(guān)文章

Android如何進(jìn)階:http://docs.qq.com/doc/DWHFqVHBMVEJPWUx
Android面試題匯總:http://docs.qq.com/doc/DWGZIRFh5VEtYWE1D
Android音視頻需要學(xué)習(xí)哪些:http://docs.qq.com/doc/DWFFWZHNPTHZVdHFX
Android常有的開源框架有哪些框:docs.qq.com/doc/DWHlGYUdseVhsSUda
Android車載應(yīng)需要學(xué)習(xí)哪些:docs.qq.com/doc/DWEl0blBabXVvU2Nw Android
Framework怎么學(xué):docs.qq.com/doc/DWFdlc2JocEtNbEJ1

Schedulers(調(diào)度器)

它是RxJava以一種及其簡單的方式解決多線程問題的機(jī)制。

種類

io() 用于I/O操作。 computation() 計(jì)算,計(jì)算工作默認(rèn)的調(diào)度器,與I/O操作無關(guān)。 immediate() 立即執(zhí)行,允許立即在當(dāng)前線程執(zhí)行你指定的工作。 newThread() 新線程,為指定任務(wù)創(chuàng)建新線程。 trampoline() 順序處理,按需處理隊(duì)列,并運(yùn)行隊(duì)列的每一個(gè)任務(wù)。

AndroidSchedulers

RxAndroid提供在Android平臺(tái)的調(diào)度器(指定觀察者在主線程)。

  • SubscribeOn 方法用于每個(gè)Observable對(duì)象
  • ObserveOn 方法用于每個(gè)Subscriber(Observer)對(duì)象
 observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .subscribe(observer);

使用場景

與Retrofit結(jié)合使用

retrofitBuilder = new Retrofit.Builder();
        retrofitBuilder.client(okHttpClient)
                .addConverterFactory(ScalarsConverterFactory.create())
                .addConverterFactory(GsonConverterFactory.create())
                .addCallAdapterFactory(RxJava2CallAdapterFactory.create());
                
public <T> void doCall(LifecycleOwner owner, Observable<T> observable, final HttpCallBack<T> httpCallBack) {

        if (observable == null || httpCallBack == null) {
            throw new IllegalArgumentException("observable或HttpCallBack為空");
        }

        //觀察者_(dá)網(wǎng)絡(luò)請(qǐng)求狀態(tài)
        BaseObserver<T> observer = new BaseObserver<T>() {
            @Override
            public void onNext(T t) {
                try {
                    if (t != null) {
                        httpCallBack.onSuccess(t);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    httpCallBack.onFailure(e);
                }
            }

            @Override
            public void onError(Throwable e) {
                httpCallBack.onFailure(e);
            }

        };

        if (owner == null) {
            //被觀察者訂閱觀察者,根據(jù)生命周期取消訂閱,子線程訂閱主線程觀察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .subscribe(observer);
        } else {
            //被觀察者訂閱觀察者,根據(jù)生命周期取消訂閱,子線程訂閱主線程觀察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                    .subscribe(observer);
        }
    }

與RxPermission結(jié)合使用

RxPermission是基于RxJava的Android動(dòng)態(tài)權(quán)限申請(qǐng)框架。

使用(簡單封裝)

  public void initPermissions(String[] permissions, PermissionResult permissionResult) {
        if (rxPermissions == null) {
            rxPermissions = new RxPermissions(this);
        }
        rxPermissions.requestEachCombined(permissions)
                .subscribe(permission -> {
                    if (permission.granted) {
                        permissionResult.onSuccess();
                    } else if (permission.shouldShowRequestPermissionRationale) {
                        permissionResult.onFailure();
                    } else {
                        permissionResult.onFailureWithNeverAsk();
                    }
                });
    }

代替EventBus

EventBus是一個(gè)Android端優(yōu)化的publish/subscribe消息總線,簡化了應(yīng)用程序內(nèi)各組件間、組件與后臺(tái)線程間的通信。更多相關(guān)請(qǐng)參考Android事件總線之EventBus。 RxJava也可以實(shí)現(xiàn)事件總線,因?yàn)樗鼈兌家罁?jù)于觀察者模式。我們使用RxJava替換EventBus,可以減少App的體積。

使用

private static volatile RxBus instance;
    private PublishSubject<Object> mRxtBus;
    
    public static RxBus getDefault() {
        if (instance == null) {
            synchronized (RxBus.class) {
                instance = new RxBus();
            }
        }
        return instance;
    }

    private RxBus() {
        mRxtBus = PublishSubject.create();
    }

    public void post(String tag, Object event) {
        Message msg = new Message(tag, event);
        mRxtBus.onNext(msg);
    }

    public <T> Observable<T> toEvent(Class<T> eventType) {
        return mRxtBus.ofType(eventType);
    }
 }

發(fā)送

RxBus.getDefault().post("payValue",code);

接收

subscribe = RxBus.getDefault().toEvent(RxBus.Message.class).subscribe(new Consumer<RxBus.Message>() {
            @Override
            public void accept(RxBus.Message message) throws Throwable {
                if ("payValue".equals(message.getTag())) {
                    Log.e("yhj", "accept: " + message.getEvent().toString());
                }
            }
        });

解綁

if (subscribe != null && !subscribe.isDisposed()) {
            subscribe.dispose();
        }

Rxjava內(nèi)存泄漏的處理

Rxjava的使用不當(dāng)會(huì)導(dǎo)致內(nèi)存泄漏,使用AutoDispose可以解決這個(gè)問題,它是一個(gè)隨Android生命周期事件自動(dòng)解綁Rxjava訂閱的方便工具。

使用

結(jié)合JetPack的LifeCycle(生命周期感知型組件),根據(jù)生命周期取消訂閱。

//被觀察者訂閱觀察者,根據(jù)生命周期取消訂閱,子線程訂閱主線程觀察
            observable.subscribeOn(Schedulers.newThread())
                    .unsubscribeOn(Schedulers.newThread())
                    .observeOn(AndroidSchedulers.mainThread())
                    .map(getAppErrorHandler())
                    .onErrorResumeNext(new HttpErrorHandler<T>())
                    .as(AutoDispose.autoDisposable(AndroidLifecycleScopeProvider.from(owner, Lifecycle.Event.ON_DESTROY)))
                    .subscribe(observer);

總結(jié)

本文主要是對(duì)RxJava使用及Android常見使用場景進(jìn)行總結(jié),掌握這些還遠(yuǎn)遠(yuǎn)不夠,RxJava還有許多強(qiáng)大的功能,諸如從磁盤/內(nèi)存中獲取緩存數(shù)據(jù),背壓策略,聯(lián)想搜索優(yōu)化等等。后面在項(xiàng)目開發(fā)中遇到相關(guān)場景再進(jìn)行總結(jié),更新。本文若有不當(dāng)之處,請(qǐng)批評(píng)指正。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容