RxJava 知識(shí)梳理(3) - RxJava2 基礎(chǔ)知識(shí)小結(jié)

前言

首先要感謝 Season_zlc 的一系列RxJava2的教程,關(guān)于上游、下游、水缸的類比,讓我對(duì)于整個(gè)RxJava2的基本思想有了更加清晰的認(rèn)識(shí)。大家有興趣的話一定要多看看,寫的通俗易懂,傳送門:給初學(xué)者的 RxJava 2.0 教程 (一) ,本文的思想都來源于它的一系列文章。

文章比較長,為了避免耽誤大家的時(shí)間,先列出需要介紹的知識(shí)點(diǎn):


一、RxJava2 的基本模型

1.1 使用實(shí)例

在開始學(xué)習(xí)之前,我們先看一下最簡(jiǎn)單的例子:

  • 第一步:導(dǎo)入依賴包:
dependencies {
    //在build.gradle中,導(dǎo)入依賴。
    compile 'io.reactivex.rxjava2:rxjava:2.0.1'
    compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
}
  • 第二步:使用最基本的Observable + Observer的最簡(jiǎn)單示例,這里我們?cè)谏嫌伟l(fā)送了四個(gè)onNext(String s)事件之后,最后發(fā)送了一個(gè)onComplete()事件。
    public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }
  • 第三步:運(yùn)行結(jié)果,訂閱成功之后,會(huì)依次回調(diào)以下三步操作:onSubscribe;onNextonComplete。

1.2 基本元素

在上面的例子中,涉及到了以下五個(gè)類:

  • Observable:上游。
  • ObservableOnSubscribe:上游的create方法所接收的參數(shù)。
  • ObservableEmitter:上游事件的發(fā)送者。
  • Observer:下游的接收者。
  • Disposable:用于維系上游、下游之間的聯(lián)系。

對(duì)于整個(gè)模型,可以總結(jié)為以下幾點(diǎn):

  • RxJava2簡(jiǎn)單的來說,就是一個(gè)發(fā)送事件、接收事件的過程,我們可以將發(fā)送事件方類比作上游,而接收事件方類比作下游。
  • 上游每產(chǎn)生一個(gè)事件,下游就能收到事件,上游對(duì)應(yīng)Observable,而下游對(duì)應(yīng)Observer。
  • 只有當(dāng)上游和下游建立連接之后,上游才會(huì)開始發(fā)送事件,這一關(guān)系的建立是通過subscribe方法。

各關(guān)鍵元素的UML圖如下:

1.3 ObservableEmitter

用于 發(fā)出事件,它可以分別發(fā)出onNext/onComplete/onError事件:

  • 上游可以發(fā)送無限個(gè)onNext,下游也可以接收無限個(gè)onNext。
  • 當(dāng)上游發(fā)送了一個(gè)onComplete/onError后,上游onComplete/onError后的事件將會(huì)繼續(xù)發(fā)送,但是下游在收到onComplete/onError事件后不再繼續(xù)接收事件。
  • 上游可以不發(fā)送onComplete或者onError事件。
  • 調(diào)用onError或者onComplete切斷了上游和下游的聯(lián)系,在聯(lián)系切斷后上游再發(fā)送onError事件就會(huì)報(bào)錯(cuò),onCompleteonError的調(diào)用情況有以下幾種:
    (1) onComplete可以發(fā)送多次,但是只會(huì)收到一次回調(diào)。
    (2) onError只可以發(fā)送一次,發(fā)送多次會(huì)報(bào)錯(cuò)。
    (3) onComplete之后不可以發(fā)送onError,否則會(huì)報(bào)錯(cuò)。
    (4) onError之后可以發(fā)送onComplete,但是只會(huì)收到onError事件。
  • onError的參數(shù)不允許為空。

其繼承關(guān)系如下圖所示:


1.4 Disposable

理解成為 水管的機(jī)關(guān),當(dāng)調(diào)用它的dispose方法時(shí),將會(huì)將上游和下游之間的管道切斷,從而導(dǎo)致 下游接收不到事件。

  • ObserveronSubscribe回調(diào)中,會(huì)傳入一個(gè)Disposable對(duì)象,下游可以通過該對(duì)象的dispose()方法主動(dòng)切斷和上游的聯(lián)系,在這之后上游的observableEmitter.isDisposed()方法將返回true
  • 當(dāng)上游和下游的聯(lián)系切斷之后,下游收不到包括onComplete/onError在內(nèi)的任何事件,若此時(shí)上游再調(diào)用onError方法發(fā)送事件,那么將會(huì)報(bào)錯(cuò)。

我們來模擬一下,在下游收到2之后,通過Disposable來切斷上游和下游之間的聯(lián)系:

    public static void classicalSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                observableEmitter.onNext("1");
                observableEmitter.onNext("2");
                observableEmitter.onNext("3");
                observableEmitter.onNext("4");
                observableEmitter.onComplete();

            }
        }).subscribe(new Observer<String>() {

            private Disposable mDisposable;

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "onSubscribe");
                mDisposable = disposable;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext=" + s);
                if ("2".equals(s)) {
                    mDisposable.dispose();
                }
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");

            }
        });
    }

最終的運(yùn)行結(jié)果為:


1.5 Subscribe 的重載方法

通過subscribe確定上游和下游的聯(lián)系有以下幾種方法:


可以看到,這里可以分為三類:

  • 不帶參數(shù)
  • Consumer<T>
  • Observer
  • Action

對(duì)于不使用Observer類作為形參的subscribe函數(shù),其實(shí)實(shí)現(xiàn)的功能和使用Observer類作為參數(shù)的方法相同,只不過它們是將Observer的四個(gè)回調(diào)分解成形參,有參數(shù)的回調(diào)用Consumer<T>代替,而沒有參數(shù)的則用Action代替。

二、線程切換

2.1 基本概念

  • 當(dāng)我們?cè)谏嫌蝿?chuàng)建一個(gè)Observable來發(fā)送事件,那么這個(gè)上游就默認(rèn)在主線程發(fā)送事件;而當(dāng)我們?cè)谙掠蝿?chuàng)建一個(gè)Observer來接收事件,那么這個(gè)下游就默認(rèn)在主線程中接收事件。
  • subscribeOn指定的是 上游發(fā)送事件 的線程,而observeOn指定的是 下游接收事件 的線程。
  • 多次調(diào)用subscribeOn只有第一次有效,而每調(diào)用一次observeOn,那么下游接收消息的線程就會(huì)切換一次。
  • CompositeDisposable可以用來容納Disposable對(duì)象,每當(dāng)我們得到一個(gè)Disposable對(duì)象時(shí),就通過add方法將它添加進(jìn)入容器,在退出的時(shí)候,調(diào)用clear方法,即可切斷所有的水管。

2.2 線程類型

  • Schedulers.io():代表IO操作,通常用于網(wǎng)絡(luò)請(qǐng)求、文件讀寫等IO密集型的操作。
  • Schedulers.computation():代表CPU密集型的操作,適用于大量計(jì)算。
  • Schedulers.newThread():創(chuàng)建新的常規(guī)線程。
  • AndroidSchedulers.mainThread():代表Android的主線程。

2.3 示例

在鏈?zhǔn)秸{(diào)用當(dāng)中,我們可以通過observeOn方法多次切換管道下游處理消息的線程,例如下面的代碼,我們對(duì)下游進(jìn)行了兩次線程的切換:

    static void mapSample() {
        Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=true");
                observableEmitter.onNext("true");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",string=false");
                observableEmitter.onNext("false");
                Log.d(TAG, "observableEmitter's thread=" + Thread.currentThread().getId() + ",onComplete");
                observableEmitter.onComplete();
            }
        //1.指定了subscribe方法執(zhí)行的線程,并進(jìn)行第一次下游線程的切換,將其切換到新的子線程。   
        }).subscribeOn(Schedulers.io()).observeOn(Schedulers.newThread()).map(new Function<String, Boolean>() {

            @Override
            public Boolean apply(String s) throws Exception {
                Log.d(TAG, "apply's thread=" + Thread.currentThread().getId() + ",s=" + s);
                return "true".equals(s);
            }
        //2.進(jìn)行第二次下游線程的切換,將其切換到主線程。    
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Boolean>() {

            @Override
            public void onSubscribe(Disposable disposable) {

            }

            @Override
            public void onNext(Boolean aBoolean) {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",boolean=" + aBoolean);
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Observer's thread=" + Thread.currentThread().getId() + ",onComplete");
            }
        });
    }

以上代碼的運(yùn)行的結(jié)果為:


三、Map 和 FlatMap 操作符

3.1 Map

  • Map操作符的作用是對(duì)上游發(fā)送的每一個(gè)事件應(yīng)用一個(gè)函數(shù),使得每個(gè)事件按照函數(shù)的邏輯進(jìn)行變換,通過Map就可以把上游發(fā)送的每一個(gè)事件,轉(zhuǎn)換成Object或者集合,其英文注釋為:
  • 以下面使用map的代碼為例,可以看到map接收一個(gè)Function類,它有兩個(gè)泛型變量,分別為調(diào)用map方法的Observable<T><T>泛型,和返回的Obervable<R><R>泛型。
    public static void mapVerify() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
            }
        });
        Observable<String> convertObservable = sourceObservable.map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return integer.toString();
            }
        });
        Log.d(TAG, "sourceObservable=" + sourceObservable + "\n convertObservable=" + convertObservable);
    }

Function為一個(gè)接口:


并且在map函數(shù)調(diào)用完畢之后,將返回一個(gè)新的Observable,它的類型為ObservableMap

3.2 FlatMap

  • FlatMap用于將一個(gè)發(fā)送事件的上游Observable變換成多個(gè)發(fā)送事件的Observable,然后將它們發(fā)送的事件合并,放進(jìn)一個(gè)單獨(dú)的Observable中,其注釋為:
  • 上游每發(fā)送一個(gè)事件,就會(huì)針對(duì)該事件創(chuàng)建一個(gè)單獨(dú)的水管,然后發(fā)送轉(zhuǎn)換后的新的事件,下游接收到的就是這些新的水管發(fā)送的事件。
  • FlatMap不保證不同水管之間事件的順序,如果需要保證順序,則需要使用contactMap。

3.2.1 示例

    static void flatMapSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                observableEmitter.onNext(1);
                observableEmitter.onNext(2);
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                return Observable.fromArray("a value of " + integer + ",b value of " + integer);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

map操作符類似,它也接收一個(gè)類型為Function的接口,只不過它的? extends R參數(shù)類型換成了? extends Observable<? extends R>

3.2.2 FlatMap 不保證下游接收事件的順序

前面我們說到,flatMap操作符不會(huì)保證下游接收事件的順序,下面,我們就以一個(gè)例子來說明,在flatMapapply函數(shù)中,我們將一個(gè)事件轉(zhuǎn)換成兩個(gè)Observable,并且加上了延時(shí):

    static void flatMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "flatMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "flatMapOrderSample emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "flatMapOrderSample emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.flatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "flatMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

可以看到,最終的輸出結(jié)果和flatMap收到事件的順序并不相同:


下面,還是同樣的場(chǎng)景,將flatMap換成contactMap

    static void contactMapOrderSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(2);
                Log.d(TAG, "contactMapOrderSample emit 1");
                observableEmitter.onNext(3);
            }
        });
        Observable<String> flatObservable = sourceObservable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).concatMap(new Function<Integer, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception {
                Log.d(TAG, "contactMapOrderSample apply=" + integer);
                long delay = (3 - integer) * 100;
                return Observable.fromArray("a value of " + integer, "b value of " + integer).delay(delay, TimeUnit.MILLISECONDS);
            }
        });
        flatObservable.subscribe(new Consumer<String>() {

            @Override
            public void accept(String s) throws Exception {
                Log.d(TAG, s);
            }
        });
    }

最終的運(yùn)行結(jié)果為:


四、Zip 操作符

4.1 基本概念

  • Zip通過一個(gè)函數(shù)從多個(gè)Observable每次各取出一個(gè)事件,合并成一個(gè)新的事件發(fā)送給下游。
  • 組合的順序是嚴(yán)格按照事件發(fā)送的順序來的。
  • 最終下游收到的事件數(shù)量和上游中發(fā)送事件最少的那一根水管的事件數(shù)量相同。

4.1.1 兩個(gè) Observable 運(yùn)行在同一線程當(dāng)中

    static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        });
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }

此時(shí)的運(yùn)行結(jié)果為:


4.1.2 兩個(gè) Observable 運(yùn)行在不同的線程

    static void zipSample() {
        Observable<Integer> sourceObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "sourceObservable emit 1");
                observableEmitter.onNext(1);
                Thread.sleep(1000);
                Log.d(TAG, "sourceObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "sourceObservable emit 3");
                observableEmitter.onNext(3);
                Log.d(TAG, "sourceObservable emit 4");
                observableEmitter.onNext(4);
            }
        });
        Observable<Integer> otherObservable = Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                Log.d(TAG, "otherObservable emit 1");
                observableEmitter.onNext(1);
                Log.d(TAG, "otherObservable emit 2");
                observableEmitter.onNext(2);
                Log.d(TAG, "otherObservable emit 3");
                observableEmitter.onNext(3);
            }
        }).subscribeOn(Schedulers.io());
        Observable.zip(sourceObservable, otherObservable, new BiFunction<Integer, Integer, Integer>() {

            @Override
            public Integer apply(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }

        }).subscribe(new Observer<Integer>() {

            @Override
            public void onSubscribe(Disposable disposable) {
                Log.d(TAG, "resultObservable onSubscribe");
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "resultObservable onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "resultObservable onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "resultObservable onComplete");

            }
        });
    }

運(yùn)行結(jié)果為:


五、背壓

“背壓”其實(shí)就是一種用于解決問題的工具,那么我們的問題又是什么呢?

  • 問題:當(dāng)上游發(fā)送事件的速度很快,下游消費(fèi)事件的速度又很慢,而系統(tǒng)又必須緩存這些上游發(fā)送的消息以便下游處理,那么就會(huì)導(dǎo)致系統(tǒng)中堆積了很多的資源。
  • 工具:下游告知上游目前自己的處理能力,上游根據(jù)下游的處理能力,進(jìn)行適當(dāng)?shù)恼{(diào)整。

想必大家在很多文章中都聽過這個(gè)一句話:在RxJava2中,Observable不支持“背壓”,而Flowable支持背壓。

5.1 不支持背壓的 Observable

關(guān)于Observable不支持背壓,我們應(yīng)當(dāng)從兩種情況去考慮,即上游、下游是否位于相同的線程。

5.1.1 Observable 之上游、下游位于相同線程

首先,我們不調(diào)用observeOnsubscribeOn方法來改變上游、下游的工作線程,這樣,上游和下游就位于同一線程,同時(shí),我們?cè)谙掠蔚奶幚砗瘮?shù)中,每收到一個(gè)消息就休眠2000ms,以模擬上游處理速度大于下游的場(chǎng)景。

    static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }

從下面的打印結(jié)果可以看到,當(dāng)“使用 Observable,并且上游、下游位于相同線程”時(shí),并不會(huì)出現(xiàn)消息堆積的情況,因?yàn)樯嫌伟l(fā)射完一條消息后,必須要等到下游處理完該消息,才會(huì)發(fā)射一條新的消息。

5.1.2 Observable 之上游、下游位于不同線程

接著,我們采用subscribeOnobserveOn來使得上游和下游位于不同的工作線程,其它均和2.2中相同。

    static void oomSample() {
        Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
                for (int i = 0; i < 1000; i++) {
                    Log.d(TAG, "observableEmitter=" + i);
                    observableEmitter.onNext(i);
                }
            }
        }).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Thread.sleep(2000);
                Log.d(TAG, "accept=" + integer);
            }

        });
    }

2.2中不同,當(dāng)上游和下游位于不同的工作線程,那么上游發(fā)送消息時(shí),不會(huì)考慮下游是否已經(jīng)處理了之前的消息,它會(huì)直接發(fā)送,而這些發(fā)送的消息被存放在水缸當(dāng)中,下游每處理完一條消息,就去水缸中取下一條數(shù)據(jù),那么隨著水缸中數(shù)據(jù)越來越多,那么系統(tǒng)中的無用資源就會(huì)急劇增加。

5.1.3 關(guān)于 Observable 不支持背壓的小結(jié)

我們之所以說Observable不支持“背壓”,就是在2.1介紹的整個(gè)族譜中,沒有一個(gè)類,一種方法能讓下游通知上游說:不要再發(fā)消息到水缸里了,我已經(jīng)處理不過來了!

那是不是說Flowable支持“背壓”,而Observable不支持,那么Observable就要被取代了呢,其實(shí)不然,Flowable對(duì)于“背壓”的支持是以性能為代價(jià)的,我們應(yīng)當(dāng)只在有可能出現(xiàn)2.3中上游下游速率不匹配的問題時(shí),才去使用Flowable,否則就應(yīng)當(dāng)使用Observable,也就是滿足兩點(diǎn)條件:

  • 上游和下游位于不同的工作線程
  • 上游發(fā)送消息的速度,要遠(yuǎn)遠(yuǎn)大于下游處理消息的速度,有可能造成消息的堆積。

5.2 支持背壓的 Flowable

5.2.1 基本概念

  • FlowableSubscriber分別對(duì)應(yīng)于之前討論的ObservableObserver,它們直接的連接仍然是通過subscribe方法。
  • Flowable在設(shè)計(jì)的時(shí)候采用了 響應(yīng)式拉取 的思想,當(dāng)下游調(diào)用了Subscriptionrequest方法時(shí),就表明了下游處理事件的能力,這樣上游就可以根據(jù)這個(gè)值來控制事件發(fā)送的頻率,避免出現(xiàn)前面談到的上游發(fā)送太快,而下游處理太慢從而導(dǎo)致OOM的發(fā)生。
  • 只有上游根據(jù)下游的處理能力來發(fā)送事件,才能達(dá)到理想的效果。

5.2.2 基本使用

    static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR);

        sourceFlow.subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                subscription.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

其類結(jié)構(gòu)圖和Observable幾乎完全一致:

5.3 Flowable 支持背壓的策略

從上面的類圖可以看出,FlowableObservable最大的不同,就是在create方法中,需要傳入額外的參數(shù),它表示的是“背壓”的策略,這里可選的值包括:

  • ERROR
  • BUFFER
  • DROP
  • LATEST

5.3.1 使用 ERROR 的策略

  • 當(dāng)上游和下游位于同一個(gè)線程時(shí),如果上游發(fā)送的事件超過了下游聲明的request(n)的值,那么會(huì)拋出MissingBackpressureException異常。
  • 當(dāng)上游和下游位于不同線程時(shí),如果上游發(fā)送的事件超過了下游的聲明,事件會(huì)被放在水缸當(dāng)中,這個(gè)水缸默認(rèn)的大小是128,只有當(dāng)下游調(diào)用request時(shí),才從水缸中取出事件發(fā)送給下游,如果水缸中事件的個(gè)數(shù)超過了128,那么也會(huì)拋出MissingBackpressureException異常。

下面這段代碼,我們先將三個(gè)事件放入到水缸當(dāng)中,之后每次調(diào)用request方法就會(huì)從水缸當(dāng)中取出一個(gè)事件發(fā)送給下游。

   static void flowSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();
            }

        }, BackpressureStrategy.ERROR).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

    static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(1);
        }
    }

當(dāng)上游和下游位于不同的線程,每次通過Subscription調(diào)用request就會(huì)從水缸中取出一個(gè)事件,發(fā)送給下游:

5.3.2 BUFFER 策略

  • 使用BUFFER策略時(shí),相當(dāng)于在上游放置了一個(gè)容量無限大的水缸,所有下游暫時(shí)無法處理的消息都放在水缸當(dāng)中,這里不再像ERROR策略一樣,區(qū)分上游和下游是否位于同一線程。
  • 因此,如果下游一直沒有處理消息,那么將會(huì)導(dǎo)致內(nèi)存一直增長,從而引起OOM
    static void clickSubscription() {
        if (sSubscription != null) {
            sSubscription.request(10);
        }
    }

    static void flowBufferSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 10000;i ++) {
                    emitter.onNext(i);
                }
                emitter.onComplete();
            }

        }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.newThread()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

在上面的例子中,我們先把10000條消息放入到水缸當(dāng)中,之后通過Subscription每次從水缸中取出10條消息發(fā)送給下游,演示結(jié)果為:

5.3.3 DROP 策略

  • 使用DROP策略時(shí),會(huì)把水缸無法存放的事件丟棄掉,這里同樣不會(huì)受到下游和下游是否處于同一個(gè)線程的限制。
    static void flowDropSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.DROP).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

我們先往水缸中放入130條消息,之后每次通過Subscription取出60條消息發(fā)送給下游,可以看到,最后最多只取到了第128條消息,第129/130條消息被丟棄了。

5.3.4 LATEST 策略

  • DROP類似,當(dāng)水缸無法容納下消息時(shí),會(huì)將它丟棄,但是除此之外,上游還會(huì)緩存最新的一條消息,實(shí)例如下:
    static void flowLatestSample() {
        Flowable<Integer> sourceFlow = Flowable.create(new FlowableOnSubscribe<Integer>() {

            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 130; i++) {
                    emitter.onNext(i);
                }
            }

        }, BackpressureStrategy.LATEST).subscribeOn(Schedulers.io());

        sourceFlow.observeOn(Schedulers.io()).subscribe(new Subscriber<Integer>() {

            @Override
            public void onSubscribe(Subscription subscription) {
                Log.d(TAG, "onSubscribe");
                sSubscription = subscription;
            }

            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext=" + integer);
            }

            @Override
            public void onError(Throwable throwable) {
                Log.d(TAG, "onError");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        });
    }

從下面的運(yùn)行結(jié)果可以看出,當(dāng)取出最后一批數(shù)據(jù)的時(shí)候,上游除了收到存儲(chǔ)在水缸當(dāng)中的數(shù)據(jù),還額外收到了最后一條消息,也就是第130條數(shù)據(jù),這就是DROP策略和LATEST策略的區(qū)別:


更多文章,歡迎訪問我的 Android 知識(shí)梳理系列:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容