5章 RxJava背壓策略

本篇文章已授權(quán)微信公眾號(hào) YYGeeker 獨(dú)家發(fā)布轉(zhuǎn)載請(qǐng)標(biāo)明出處

CSDN學(xué)院課程地址

5. RxJava背壓策略(BackpressureStrategy)

5.1 背壓是什么

背壓的概念是在平時(shí)業(yè)務(wù)開發(fā)時(shí)較為常見,大多數(shù)是針對(duì)高并發(fā)的業(yè)務(wù),背壓是必須考慮的因素之一。在異步場(chǎng)景中,由于數(shù)據(jù)流的發(fā)射速度高于數(shù)據(jù)流的接收速度,就會(huì)導(dǎo)致數(shù)據(jù)不能及時(shí)處理,從而導(dǎo)致數(shù)據(jù)流的阻塞。背壓所要做的事情就是主動(dòng)控制數(shù)據(jù)流發(fā)射的速度

在RxJava2.0中,推出了Flowable用來支持背壓,去除了Observable對(duì)背壓的支持,下面在背壓策略的講解中,我們都使用Flowable作為我們的響應(yīng)類型。在使用背壓時(shí),只需要在create()方法中第二個(gè)參數(shù)添加背壓策略即可

  1. 在訂閱的時(shí)候如果使用FlowableSubscriber,那么需要通過s.request(Long.MAX_VALUE)去主動(dòng)請(qǐng)求上游的數(shù)據(jù)項(xiàng)。如果遇到背壓報(bào)錯(cuò)的時(shí)候,FlowableSubscriber默認(rèn)已經(jīng)將錯(cuò)誤try-catch,并通過onError()進(jìn)行回調(diào),程序并不會(huì)崩潰
  2. 在訂閱的時(shí)候如果使用Consumer,那么不需要主動(dòng)去請(qǐng)求上游數(shù)據(jù),默認(rèn)已經(jīng)調(diào)用了s.request(Long.MAX_VALUE)。如果遇到背壓報(bào)錯(cuò)、且對(duì)Throwable的Consumer沒有new出來,則程序直接崩潰
  3. 背壓策略的上游的默認(rèn)緩存池是128
public abstract class Flowable<T> implements Publisher<T> {
    /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }
}

5.2 MISSING

MISSING表示OnNext事件沒有任何緩存和丟棄,下游要處理任何溢出,可以理解為相當(dāng)于沒有指定背壓策略。Flowable相當(dāng)于沒有指定背壓策略可以將下游要處理任何溢出理解為,上游發(fā)射的數(shù)據(jù)未得到處理,就會(huì)緩存起來,當(dāng)緩存容量達(dá)到128時(shí),再增加一個(gè)未處理的數(shù)據(jù)項(xiàng),就會(huì)拋出MissingBackpressureException,且?guī)в嘘?duì)列已經(jīng)滿了的友好提示。這里就好比一個(gè)大水缸,當(dāng)水注滿的時(shí)候,它就會(huì)把蓋子蓋上,不讓你再繼續(xù)注水了

這里我們模擬上游發(fā)送速度高于下游數(shù)據(jù)流的處理速度,在數(shù)據(jù)處理的時(shí)候加上Thread.sleep(1000)

public void missing() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 129; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.MISSING)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}

輸出

io.reactivex.exceptions.MissingBackpressureException: Queue is full?!

5.3 ERROR

ERROR表示在下游無法跟上時(shí),會(huì)拋出MissingBackpressureException??梢詫⑾掠螣o法跟上理解為,上游發(fā)射的數(shù)據(jù)未得到處理,就會(huì)緩存起來,當(dāng)緩存容量達(dá)到128時(shí),再增加一個(gè)未處理的數(shù)據(jù)項(xiàng),就會(huì)拋出MissingBackpressureException。這里好比一個(gè)大水缸,當(dāng)水注滿的時(shí)候,它會(huì)把水缸撐破了,直接破裂

public void error() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 129; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.ERROR)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}

輸出

io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

5.4 BUFFER

上游不斷的發(fā)出onNext請(qǐng)求,直到下游處理完,上游發(fā)射的數(shù)據(jù)項(xiàng)的緩存池是無限大的,程序也不會(huì)拋出錯(cuò)誤,但是要注意程序OOM的現(xiàn)象,因?yàn)榫彺嬖酱?,占用的?nèi)存就越多。例子中發(fā)射129個(gè)數(shù)據(jù)項(xiàng),然而程序并沒有崩潰,只會(huì)一直讀取緩存池的數(shù)據(jù)項(xiàng),直到數(shù)據(jù)項(xiàng)被處理完。這里就是一個(gè)無限大的水缸

背壓策略除了BUFFER策略的緩存池是無限大之外,其他默認(rèn)的緩存池都是128

public void buffer() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.BUFFER)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}

輸出

onNext=0
onNext=1
onNext=2
......
onNext=998
onNext=999

5.5 DROP

會(huì)在下游跟不上速度時(shí),把onNext的值丟棄,簡(jiǎn)單的說就是,超過緩存區(qū)大?。?28)的數(shù)據(jù)項(xiàng)都會(huì)被丟棄。例子中通過發(fā)射800個(gè)數(shù)據(jù)項(xiàng),那么我們只會(huì)收到0-127的數(shù)據(jù)項(xiàng)。如果我們?cè)俅握{(diào)用request(),這時(shí)候取到的數(shù)據(jù)就是上一次request()后的128個(gè)數(shù)據(jù)。這里好比一個(gè)大水缸,當(dāng)水注滿的時(shí)候,水還是在繼續(xù)的流,一旦有request調(diào)用的時(shí)候,它就會(huì)去取出水缸里的所有水,這時(shí)候水缸就是空的,但水一直在流,所以水缸馬上又會(huì)被注滿,這個(gè)時(shí)候就要等request再次取出水缸里的水

public void drop() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.DROP)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}

輸出

onNext=0
onNext=1
onNext=2
......
onNext=127

5.6 LATEST

LATEST與Drop策略一樣,如果超過緩存池容量大小的數(shù)據(jù)項(xiàng)都會(huì)被丟棄。不同的是,不管緩存池的狀態(tài)如何,LATEST都會(huì)將最后一條數(shù)據(jù)強(qiáng)行放入緩存池中。這里的水缸容納下了最后一滴水

public void latest() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            for (int i = 0; i < 1000; i++) {
                emitter.onNext(i);
            }
        }
    }, BackpressureStrategy.LATEST)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }

                @Override
                public void onNext(Integer integer) {
                    try {
                        Thread.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    Log.e("TAG", "onNext=" + integer);
                }

                @Override
                public void onError(Throwable t) {
                    t.printStackTrace();
                }

                @Override
                public void onComplete() {

                }
            });
}

輸出

onNext=0
onNext=1
......
onNext=126
onNext=127
onNext=999

5.7 小結(jié)

  1. MISSING:沒有任何緩存和丟棄,下游要處理任何溢出
  2. ERROR:下游的處理速度無法跟上上游的發(fā)射速度時(shí)報(bào)錯(cuò)
  3. BUFFER:數(shù)據(jù)項(xiàng)的緩存池?zé)o限大
  4. DROP:下游的處理速度無法跟上上游的發(fā)射速度時(shí)丟棄
  5. LATEST:最后一條數(shù)據(jù)項(xiàng)被強(qiáng)行放入緩存池
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容