Rxjava2入門教程五:Flowable背壓支持——對(duì)Flowable最全面而詳細(xì)的講解

如需下載源碼,請(qǐng)?jiān)L問(wèn)
https://github.com/fengchuanfang/Rxjava2Tutorial
文章原創(chuàng),轉(zhuǎn)載請(qǐng)注明出處:
Rxjava2入門教程五:Flowable背壓支持——對(duì)Flowable最全面而詳細(xì)的講解


背壓(backpressure)

當(dāng)上下游在不同的線程中,通過(guò)Observable發(fā)射,處理,響應(yīng)數(shù)據(jù)流時(shí),如果上游發(fā)射數(shù)據(jù)的速度快于下游接收處理數(shù)據(jù)的速度,這樣對(duì)于那些沒(méi)來(lái)得及處理的數(shù)據(jù)就會(huì)造成積壓,這些數(shù)據(jù)既不會(huì)丟失,也不會(huì)被垃圾回收機(jī)制回收,而是存放在一個(gè)異步緩存池中,如果緩存池中的數(shù)據(jù)一直得不到處理,越積越多,最后就會(huì)造成內(nèi)存溢出,這便是響應(yīng)式編程中的背壓(backpressure)問(wèn)題。
例如,運(yùn)行以下代碼:

   public void demo1() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        int i = 0;
                        while (true) {
                            i++;
                            e.onNext(i);
                        }
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Thread.sleep(5000);
                        System.out.println(integer);
                    }
                });
    }

創(chuàng)建一個(gè)可觀察對(duì)象Observable在Schedulers.newThread()的線程中不斷發(fā)送數(shù)據(jù),而觀察者Observer在Schedulers.newThread()的另一個(gè)線程中每隔5秒接收打印一條數(shù)據(jù)。
運(yùn)行后,查看內(nèi)存使用如下:

backpressure.gif

由于上游通過(guò)Observable發(fā)射數(shù)據(jù)的速度大于下游通過(guò)Consumer接收處理數(shù)據(jù)的速度,而且上下游分別運(yùn)行在不同的線程中,下游對(duì)數(shù)據(jù)的接收處理不會(huì)堵塞上游對(duì)數(shù)據(jù)的發(fā)射,造成上游數(shù)據(jù)積壓,內(nèi)存不斷增加,最后便會(huì)導(dǎo)致內(nèi)存溢出。

Flowable

既然在函數(shù)響應(yīng)式編程中會(huì)產(chǎn)生背壓(backpressure)問(wèn)題,那么在函數(shù)響應(yīng)式編程中就應(yīng)該有解決方案。
Rxjava2相對(duì)于Rxjava1最大的更新就是把對(duì)背壓?jiǎn)栴}的處理邏輯從Observable中抽取出來(lái)產(chǎn)生了新的可觀察對(duì)象Flowable。

在Rxjava2中,F(xiàn)lowable可以看做是為了解決背壓?jiǎn)栴},在Observable的基礎(chǔ)上優(yōu)化后的產(chǎn)物,與Observable不處在同一組觀察者模式下,Observable是ObservableSource/Observer這一組觀察者模式中ObservableSource的典型實(shí)現(xiàn),而Flowable是Publisher與Subscriber這一組觀察者模式中Publisher的典型實(shí)現(xiàn)。

所以在使用Flowable的時(shí)候,可觀察對(duì)象不再是Observable,而是Flowable;觀察者不再是Observer,而是Subscriber。Flowable與Subscriber之間依然通過(guò)subscribe()進(jìn)行關(guān)聯(lián)。

雖然在Rxjava2中,F(xiàn)lowable是在Observable的基礎(chǔ)上優(yōu)化后的產(chǎn)物,Observable能解決的問(wèn)題Flowable也都能解決,但是并不代表Flowable可以完全取代Observable,在使用的過(guò)程中,并不能拋棄Observable而只用Flowable。

由于基于Flowable發(fā)射的數(shù)據(jù)流,以及對(duì)數(shù)據(jù)加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運(yùn)行效率要比Observable慢得多。

只有在需要處理背壓?jiǎn)栴}時(shí),才需要使用Flowable。

由于只有在上下游運(yùn)行在不同的線程中,且上游發(fā)射數(shù)據(jù)的速度大于下游接收處理數(shù)據(jù)的速度時(shí),才會(huì)產(chǎn)生背壓?jiǎn)栴};
所以,如果能夠確定:
1、上下游運(yùn)行在同一個(gè)線程中,
2、上下游工作在不同的線程中,但是下游處理數(shù)據(jù)的速度不慢于上游發(fā)射數(shù)據(jù)的速度,
3、上下游工作在不同的線程中,但是數(shù)據(jù)流中只有一條數(shù)據(jù)
則不會(huì)產(chǎn)生背壓?jiǎn)栴},就沒(méi)有必要使用Flowable,以免影響性能。

類似于Observable,在使用Flowable時(shí),也可以通過(guò)create操作符創(chuàng)建發(fā)射數(shù)據(jù)流,代碼如下:

public void demo2() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        System.out.println("發(fā)射----> 1");
                        e.onNext(1);
                        System.out.println("發(fā)射----> 2");
                        e.onNext(2);
                        System.out.println("發(fā)射----> 3");
                        e.onNext(3);
                        System.out.println("發(fā)射----> 完成");
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER) //create方法中多了一個(gè)BackpressureStrategy類型的參數(shù)
                .subscribeOn(Schedulers.newThread())//為上下游分別指定各自的線程
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {   //onSubscribe回調(diào)的參數(shù)不是Disposable而是Subscription
                        s.request(Long.MAX_VALUE);            //注意此處,暫時(shí)先這么設(shè)置
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("接收----> " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("接收----> 完成");
                    }
                });
    }

運(yùn)行結(jié)果如下:

System.out: 發(fā)射----> 1
System.out: 發(fā)射----> 2
System.out: 發(fā)射----> 3
System.out: 發(fā)射----> 完成
System.out: 接收----> 1
System.out: 接收----> 2
System.out: 接收----> 3
System.out: 接收----> 完成

發(fā)射與處理數(shù)據(jù)流在形式上與Observable大同小異,發(fā)射器中均有onNext,onError,onComplete方法,訂閱器中也均有onSubscribe,onNext,onError,onComplete方法。

但是在細(xì)節(jié)方面還是有三點(diǎn)不同
一、create方法中多了一個(gè)BackpressureStrategy類型的參數(shù)。
二、訂閱器Subscriber中,方法onSubscribe回調(diào)的參數(shù)不是Disposable而是Subscription,多了行代碼:

s.request(Long.MAX_VALUE);

三、Flowable發(fā)射數(shù)據(jù)時(shí),使用特有的發(fā)射器FlowableEmitter,不同于Observable的ObservableEmitter

正是這三點(diǎn)不同賦予了Flowable不同于Observable的特性。

BackpressureStrategy背壓策略

在通過(guò)create操作符創(chuàng)建Flowable時(shí),多了一個(gè)BackpressureStrategy類型的參數(shù),BackpressureStrategy是個(gè)枚舉,源碼如下:

package io.reactivex;

/**
 * Represents the options for applying backpressure to a source sequence.
 */
public enum BackpressureStrategy {
    /**
     * OnNext events are written without any buffering or dropping.
     * Downstream has to deal with any overflow.
     * <p>Useful when one applies one of the custom-parameter onBackpressureXXX operators.
     */
    MISSING,
    /**
     * Signals a MissingBackpressureException in case the downstream can't keep up.
     */
    ERROR,
    /**
     * Buffers <em>all</em> onNext values until the downstream consumes it.
     */
    BUFFER,
    /**
     * Drops the most recent onNext value if the downstream can't keep up.
     */
    DROP,
    /**
     * Keeps only the latest onNext value, overwriting any previous value if the
     * downstream can't keep up.
     */
    LATEST
}

當(dāng)上游發(fā)送數(shù)據(jù)的速度快于下游接收數(shù)據(jù)的速度,且運(yùn)行在不同的線程中時(shí),F(xiàn)lowable通過(guò)自身特有的異步緩存池,來(lái)緩存沒(méi)來(lái)得及處理的數(shù)據(jù),緩存池的容量上限為128,在Flowable源碼的開(kāi)頭即可看到

  /** The default buffer size. */
    static final int BUFFER_SIZE;
    static {
        BUFFER_SIZE = Math.max(1, Integer.getInteger("rx2.buffer-size", 128));
    }

不同于Observable,其異步緩存沒(méi)有容量限制,對(duì)于沒(méi)來(lái)得及處理的數(shù)據(jù)可以一直向里面添加,數(shù)據(jù)過(guò)多就會(huì)產(chǎn)生內(nèi)存溢出(OOM)。

BackpressureStrategy的作用便是用來(lái)設(shè)置Flowable通過(guò)異步緩存池緩存數(shù)據(jù)的策略。在源碼FlowableCreate類中,可以看到五個(gè)泛型分別對(duì)應(yīng)五個(gè)java類

MISSING   ----> MissingEmitter
ERROR     ----> ErrorAsyncEmitter
DROP      ----> DropAsyncEmitter
LATEST    ----> LatestAsyncEmitter
BUFFER    ----> BufferAsyncEmitter

通過(guò)代理模式對(duì)原始的發(fā)射器進(jìn)行了包裝。

@Override
    public void subscribeActual(Subscriber<? super T> t) {
        BaseEmitter<T> emitter;

        switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
        }

        t.onSubscribe(emitter);
        try {
            source.subscribe(emitter);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            emitter.onError(ex);
        }
    }

ERROR

對(duì)應(yīng)于ErrorAsyncEmitter<T>類,在其源碼

static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        private static final long serialVersionUID = 338953216916120960L;

        ErrorAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
        }

    }

onOverflow方法中可以看到
在此策略下,如果放入Flowable的異步緩存池中的數(shù)據(jù)超限了,則會(huì)拋出MissingBackpressureException異常。

運(yùn)行如下代碼:

public void demo3() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 1; i <= 129; i++) {
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE);            //注意此處,暫時(shí)先這么設(shè)置
                    }

                    @Override
                    public void onNext(Integer integer) {
                        try {
                            Thread.sleep(10000);
                        } catch (InterruptedException ignore) {
                        }
                        System.out.println(integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("接收----> 完成");
                    }
                });
    }

創(chuàng)建并通過(guò)Flowable發(fā)射129條數(shù)據(jù),Subscriber的onNext方法睡10秒之后再開(kāi)始接收,運(yùn)行后會(huì)發(fā)現(xiàn)控制臺(tái)打印如下異常:

W/System.err: io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate$ErrorAsyncEmitter.onOverflow(FlowableCreate.java:411)
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate$NoOverflowBaseAsyncEmitter.onNext(FlowableCreate.java:377)
W/System.err:     at net.fbi.rxjava2.RxJava2Demo$6.subscribe(RxJava2Demo.java:103)
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableCreate.subscribeActual(FlowableCreate.java:72)
W/System.err:     at io.reactivex.Flowable.subscribe(Flowable.java:12218)
W/System.err:     at io.reactivex.internal.operators.flowable.FlowableSubscribeOn$SubscribeOnSubscriber.run(FlowableSubscribeOn.java:82)
W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:59)
W/System.err:     at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:51)
W/System.err:     at java.util.concurrent.FutureTask.run(FutureTask.java:237)
W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:154)
W/System.err:     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:269)
W/System.err:     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1113)
W/System.err:     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:588)
W/System.err:     at java.lang.Thread.run(Thread.java:818)

如果將Flowable發(fā)射數(shù)據(jù)的條數(shù)改為128,則不會(huì)出現(xiàn)此異常。

DROP

對(duì)應(yīng)于DropAsyncEmitter<T>類,通過(guò)DropAsyncEmitter類和它父類NoOverflowBaseAsyncEmitter的源碼

    static final class DropAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
        private static final long serialVersionUID = 8360058422307496563L;

        DropAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        void onOverflow() {
            // nothing to do
        }
    }
abstract static class NoOverflowBaseAsyncEmitter<T> extends BaseEmitter<T> {

        private static final long serialVersionUID = 4127754106204442833L;

        NoOverflowBaseAsyncEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public final void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            if (get() != 0) {
                actual.onNext(t);
                BackpressureHelper.produced(this, 1);
            } else {
                onOverflow();
            }
        }

        abstract void onOverflow();
    }

可以看到,DropAsyncEmitter的onOverflow是個(gè)空方法,沒(méi)有執(zhí)行任何操作,父類的onNext中,在判斷get() != 0,即緩存池未滿的情況下,才會(huì)讓被代理類調(diào)用onNext方法。
所以在此策略下,如果Flowable的異步緩存池滿了,會(huì)丟掉上游發(fā)送的數(shù)據(jù)。
運(yùn)行如下代碼:

public void demo4() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        String threadName = Thread.currentThread().getName();
                        System.out.println(threadName + "開(kāi)始發(fā)射數(shù)據(jù)" + System.currentTimeMillis());
                        for (int i = 1; i <= 500; i++) {
                            System.out.println(threadName + "發(fā)射---->" + i);
                            e.onNext(i);
                            try {
                                Thread.sleep(100);//每隔100毫秒發(fā)射一次數(shù)據(jù)
                            } catch (Exception ex) {
                                e.onError(ex);
                            }
                        }
                        System.out.println(threadName + "發(fā)射數(shù)據(jù)結(jié)束" + System.currentTimeMillis());
                        e.onComplete();
                    }
                }, BackpressureStrategy.DROP)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE);            //注意此處,暫時(shí)先這么設(shè)置
                    }

                    @Override
                    public void onNext(Integer integer) {
                        try {
                            Thread.sleep(300);//每隔300毫秒接收一次數(shù)據(jù)
                        } catch (InterruptedException ignore) {
                        }
                        System.out.println(Thread.currentThread().getName() + "接收---------->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(Thread.currentThread().getName() + "接收----> 完成");
                    }
                });
    }

通過(guò)創(chuàng)建Flowable發(fā)射500條數(shù)據(jù),每隔100毫秒發(fā)射一次,并記錄開(kāi)始發(fā)射和結(jié)束發(fā)射的時(shí)間,下游每隔300毫秒接收一次數(shù)據(jù),運(yùn)行后,控制臺(tái)打印日志如下:

GIF111.gif

通過(guò)日志

1.jpg

我們可以發(fā)現(xiàn)Subscriber在接收完第128條數(shù)據(jù)后,再次接收的時(shí)候已經(jīng)到了288,而這之間的160條數(shù)據(jù)正是因?yàn)榫彺娉貪M了而被丟棄掉了。
那么問(wèn)題來(lái)了,當(dāng)Flowable在發(fā)射第129條數(shù)據(jù)的時(shí)候,Subscriber已經(jīng)接收了42條數(shù)據(jù)了,第129條數(shù)據(jù)為什么沒(méi)有放入緩存池中呢?日志如下:


2.jpg

那是因?yàn)榫彺娉刂袛?shù)據(jù)的清理,并不是Subscriber接收一條,便清理一條,而是存在一個(gè)延遲,等累積一段時(shí)間后統(tǒng)一清理一次。也就是Subscriber接收到第96條數(shù)據(jù)時(shí),緩存池才開(kāi)始清理數(shù)據(jù),之后Flowable發(fā)射的數(shù)據(jù)才得以放入。

3.jpg

查看日志可以發(fā)現(xiàn),Subscriber接收到第96條數(shù)據(jù)后,F(xiàn)lowable發(fā)射第288條數(shù)據(jù)。而第128到288之間的數(shù)據(jù),正好處于緩存池存滿的狀態(tài),而被丟棄,所以Subscriber在接收完第128條數(shù)據(jù)之后,接收到的是第288條數(shù)據(jù),而不是第129條。

LATEST

對(duì)應(yīng)于LatestAsyncEmitter<T>類
與Drop策略一樣,如果緩存池滿了,會(huì)丟掉將要放入緩存池中的數(shù)據(jù),不同的是,不管緩存池的狀態(tài)如何,LATEST都會(huì)將最后一條數(shù)據(jù)強(qiáng)行放入緩存池中,來(lái)保證觀察者在接收到完成通知之前,能夠接收到Flowable最新發(fā)射的一條數(shù)據(jù)。
將上述代碼中的DROP策略改為L(zhǎng)ATEST:

public void demo5() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        String threadName = Thread.currentThread().getName();
                        System.out.println(threadName + "開(kāi)始發(fā)射數(shù)據(jù)" + System.currentTimeMillis());
                        for (int i = 1; i <= 500; i++) {
                            System.out.println(threadName + "發(fā)射---->" + i);
                            e.onNext(i);
                            try {
                                Thread.sleep(100);
                            } catch (Exception ignore) {
                            }
                        }
                        System.out.println(threadName + "發(fā)射數(shù)據(jù)結(jié)束" + System.currentTimeMillis());
                        e.onComplete();

                    }
                }, BackpressureStrategy.LATEST)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(Long.MAX_VALUE);            //注意此處,暫時(shí)先這么設(shè)置
                    }

                    @Override
                    public void onNext(Integer integer) {
                        try {
                            Thread.sleep(300);
                        } catch (InterruptedException ignore) {
                        }
                        System.out.println(Thread.currentThread().getName() + "接收---------->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(Thread.currentThread().getName() + "接收----> 完成");
                    }
                });
    }

運(yùn)行后日志對(duì)比如下:
DROP:


DROP.jpg

LATEST:

LATEST.jpg

latest策略下Subscriber在接收完成之前,接收的數(shù)據(jù)是Flowable發(fā)射的最后一條數(shù)據(jù),而Drop策略下不是。

BUFFER

Flowable處理背壓的默認(rèn)策略,對(duì)應(yīng)于BufferAsyncEmitter<T>類
其部分源碼為:

static final class BufferAsyncEmitter<T> extends BaseEmitter<T> {
        private static final long serialVersionUID = 2427151001689639875L;
        final SpscLinkedArrayQueue<T> queue;
        . . . . . .
        final AtomicInteger wip;
        BufferAsyncEmitter(Subscriber<? super T> actual, int capacityHint) {
            super(actual);
            this.queue = new SpscLinkedArrayQueue<T>(capacityHint);
            this.wip = new AtomicInteger();
        }
        . . . . . .
}

在其構(gòu)造方法中可以發(fā)現(xiàn),其內(nèi)部維護(hù)了一個(gè)緩存池SpscLinkedArrayQueue,其大小不限,此策略下,如果Flowable默認(rèn)的異步緩存池滿了,會(huì)通過(guò)此緩存池暫存數(shù)據(jù),它與Observable的異步緩存池一樣,可以無(wú)限制向里添加數(shù)據(jù),不會(huì)拋出MissingBackpressureException異常,但會(huì)導(dǎo)致OOM。
運(yùn)行如下代碼:

public void demo6() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        int i = 0;
                        while (true) {
                            i++;
                            e.onNext(i);
                        }
                    }
                }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        Thread.sleep(5000);
                        System.out.println(integer);
                    }
                });
    }

查看內(nèi)存使用:

GIF222.gif

會(huì)發(fā)現(xiàn)和使用Observable時(shí)一樣,都會(huì)導(dǎo)致內(nèi)存劇增,最后導(dǎo)致OOM,不同的是使用Flowable內(nèi)存增長(zhǎng)的速度要慢得多,那是因?yàn)榛贔lowable發(fā)射的數(shù)據(jù)流,以及對(duì)數(shù)據(jù)加工處理的各操作符都添加了背壓支持,附加了額外的邏輯,其運(yùn)行效率要比Observable低得多。

MISSING

對(duì)應(yīng)于MissingEmitter<T>類,
通過(guò)其源碼:

static final class MissingEmitter<T> extends BaseEmitter<T> {


        private static final long serialVersionUID = 3776720187248809713L;

        MissingEmitter(Subscriber<? super T> actual) {
            super(actual);
        }

        @Override
        public void onNext(T t) {
            if (isCancelled()) {
                return;
            }

            if (t != null) {
                actual.onNext(t);
            } else {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }

            for (;;) {
                long r = get();
                if (r == 0L || compareAndSet(r, r - 1)) {
                    return;
                }
            }
        }

    }

可以發(fā)現(xiàn),在傳遞數(shù)據(jù)時(shí)

actual.onNext(t);

并沒(méi)有對(duì)緩存池的狀態(tài)進(jìn)行判斷,所以在此策略下,通過(guò)Create方法創(chuàng)建的Flowable相當(dāng)于沒(méi)有指定背壓策略,不會(huì)對(duì)通過(guò)onNext發(fā)射的數(shù)據(jù)做緩存或丟棄處理,需要下游通過(guò)背壓操作符(onBackpressureBuffer()/onBackpressureDrop()/onBackpressureLatest())指定背壓策略。

onBackpressureXXX背壓操作符

Flowable除了通過(guò)create創(chuàng)建的時(shí)候指定背壓策略,也可以在通過(guò)其它創(chuàng)建操作符just,fromArray等創(chuàng)建后通過(guò)背壓操作符指定背壓策略。
onBackpressureBuffer()對(duì)應(yīng)BackpressureStrategy.BUFFER
onBackpressureDrop()對(duì)應(yīng)BackpressureStrategy.DROP
onBackpressureLatest()對(duì)應(yīng)BackpressureStrategy.LATEST
例如代碼

    public void demo7() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 0; i < 500; i++) {
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.DROP)
                .observeOn(Schedulers.newThread())
                .subscribeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
    }

等同于,代碼:

    public void demo8() {
        Flowable.range(0, 500)
                .onBackpressureDrop()
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(@NonNull Integer integer) throws Exception {
                        System.out.println(integer);
                    }
                });
    }

Subscription

Subscription與Disposable均是觀察者與可觀察對(duì)象建立訂閱狀態(tài)后回調(diào)回來(lái)的參數(shù),如同通過(guò)Disposable的dispose()方法可以取消Observer與Oberverable的訂閱關(guān)系一樣,通過(guò)Subscription的cancel()方法也可以取消Subscriber與Flowable的訂閱關(guān)系。
不同的是接口Subscription中多了一個(gè)方法request(long n),如上面代碼中的:

 s.request(Long.MAX_VALUE);   

此方法的作用是什么呢,去掉這個(gè)方法會(huì)有什么影響呢?
運(yùn)行如下代碼:

public void demo9() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        System.out.println("發(fā)射----> 1");
                        e.onNext(1);
                        System.out.println("發(fā)射----> 2");
                        e.onNext(2);
                        System.out.println("發(fā)射----> 3");
                        e.onNext(3);
                        System.out.println("發(fā)射----> 完成");
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER) //create方法中多了一個(gè)BackpressureStrategy類型的參數(shù)
                .subscribeOn(Schedulers.newThread())//為上下游分別指定各自的線程
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        //去掉代碼s.request(Long.MAX_VALUE);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("接收----> " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("接收----> 完成");
                    }
                });
    }

運(yùn)行結(jié)果如下:

System.out: 發(fā)射----> 1
System.out: 發(fā)射----> 2
System.out: 發(fā)射----> 3
System.out: 發(fā)射----> 完成

我們發(fā)現(xiàn)Flowable照常發(fā)送數(shù)據(jù),而Subsriber不再接收數(shù)據(jù)。
這是因?yàn)镕lowable在設(shè)計(jì)的時(shí)候,采用了一種新的思路——響應(yīng)式拉取方式,來(lái)設(shè)置下游對(duì)數(shù)據(jù)的請(qǐng)求數(shù)量,上游可以根據(jù)下游的需求量,按需發(fā)送數(shù)據(jù)。
如果不顯示調(diào)用request則默認(rèn)下游的需求量為零,所以運(yùn)行上面的代碼后,上游Flowable發(fā)射的數(shù)據(jù)不會(huì)交給下游Subscriber處理。
運(yùn)行如下代碼:

public void demo10() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        System.out.println("發(fā)射----> 1");
                        e.onNext(1);
                        System.out.println("發(fā)射----> 2");
                        e.onNext(2);
                        System.out.println("發(fā)射----> 3");
                        e.onNext(3);
                        System.out.println("發(fā)射----> 完成");
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER) //create方法中多了一個(gè)BackpressureStrategy類型的參數(shù)
                .subscribeOn(Schedulers.newThread())//為上下游分別指定各自的線程
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(2);//設(shè)置Subscriber的消費(fèi)能力為2
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("接收----> " + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("接收----> 完成");
                    }
                });
    }

運(yùn)行結(jié)果如下:

System.out: 發(fā)射----> 1
System.out: 發(fā)射----> 2
System.out: 發(fā)射----> 3
System.out: 發(fā)射----> 完成
System.out: 接收----> 1
System.out: 接收----> 2

我們發(fā)現(xiàn)通過(guò)s.request(2);設(shè)置Subscriber的數(shù)據(jù)請(qǐng)求量為2條,超出其請(qǐng)求范圍之外的數(shù)據(jù)則沒(méi)有接收。
多次調(diào)用request會(huì)產(chǎn)生怎樣的結(jié)果呢?
運(yùn)行如下代碼:

public void demo11() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        String threadName = Thread.currentThread().getName();
                        for (int i = 1; i <= 10; i++) {
                            System.out.println(threadName + "發(fā)射---->" + i);
                            e.onNext(i);
                        }
                        System.out.println(threadName + "發(fā)射數(shù)據(jù)結(jié)束" + System.currentTimeMillis());
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);//調(diào)用兩次request
                        s.request(4);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(Thread.currentThread().getName() + "接收---------->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(Thread.currentThread().getName() + "接收----> 完成");
                    }
                });
    }

通過(guò)Flowable發(fā)射10條數(shù)據(jù),在onSubscribe(Subscription s) 方法中調(diào)用兩次request,運(yùn)行結(jié)果如下:

AB417C9CAC5A4BD98375240B5A5C1D6A.jpg

我們發(fā)現(xiàn)Subscriber總共接收了7條數(shù)據(jù),是兩次需求累加后的數(shù)量。

通過(guò)日志我們發(fā)現(xiàn),上游并沒(méi)有根據(jù)下游的實(shí)際需求,發(fā)送數(shù)據(jù),而是能發(fā)送多少,就發(fā)送多少,不管下游是否需要。
而且超出下游需求之外的數(shù)據(jù),仍然放到了異步緩存池中。這點(diǎn)我們可以通過(guò)以下代碼來(lái)驗(yàn)證:

public void demo12() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        for (int i = 1; i < 130; i++) {
                            System.out.println("發(fā)射---->" + i);
                            e.onNext(i);
                        }
                        e.onComplete();
                    }
                }, BackpressureStrategy.ERROR)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(1);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println("接收------>" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("接收------>完成");
                    }
                });
    }

通過(guò)Flowable發(fā)射130條數(shù)據(jù),通過(guò)s.request(1)設(shè)置下游的數(shù)據(jù)請(qǐng)求量為1條,設(shè)置緩存策略為BackpressureStrategy.ERROR,如果異步緩存池超限,會(huì)導(dǎo)致MissingBackpressureException異常。
運(yùn)行之后,日志如下:

MissingBackpressureException.jpg

久違的異常出現(xiàn)了,所以超出下游需求之外的數(shù)據(jù),仍然放到了異步緩存池中,并導(dǎo)致緩存池溢出。

那么上游如何才能按照下游的請(qǐng)求數(shù)量發(fā)送數(shù)據(jù)呢,
雖然通過(guò)request可以設(shè)置下游的請(qǐng)求數(shù)量,但是上游并沒(méi)有獲取到這個(gè)數(shù)量,如何獲取呢?
這便需要用到Flowable與Observable的第三點(diǎn)區(qū)別,F(xiàn)lowable特有的發(fā)射器FlowableEmitter

FlowableEmitter

flowable的發(fā)射器FlowableEmitter與observable的發(fā)射器ObservableEmitter均繼承自Emitter(Emitter在教程二中已經(jīng)說(shuō)過(guò)了)
比較兩者源碼可以發(fā)現(xiàn);

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(Disposable d);

    void setCancellable(Cancellable c);

    boolean isDisposed();
  
    ObservableEmitter<T> serialize();
}

public interface FlowableEmitter<T> extends Emitter<T> {

    void setDisposable(Disposable s);

    void setCancellable(Cancellable c);

    long requested();

    boolean isCancelled();

    FlowableEmitter<T> serialize();
}

接口FlowableEmitter中多了一個(gè)方法

long requested();

我們可以通過(guò)這個(gè)方法來(lái)獲取當(dāng)前未完成的請(qǐng)求數(shù)量,
運(yùn)行下面的代碼,這次我們要先喪失一下原則,雖然我們之前說(shuō)過(guò)同步狀態(tài)下不使用Flowable,但是這次我們需要先看一下同步狀態(tài)下情況。

public void demo13() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        String threadName = Thread.currentThread().getName();
                        for (int i = 1; i <= 5; i++) {
                            System.out.println("當(dāng)前未完成的請(qǐng)求數(shù)量-->" + e.requested());
                            System.out.println(threadName + "發(fā)射---->" + i);
                            e.onNext(i);
                        }
                        System.out.println(threadName + "發(fā)射數(shù)據(jù)結(jié)束" + System.currentTimeMillis());
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER)//上下游運(yùn)行在同一線程中
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(Thread.currentThread().getName() + "接收---------->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(Thread.currentThread().getName() + "接收----> 完成");
                    }
                });
    }

打印日志如下:

4.jpg

通過(guò)日志我們發(fā)現(xiàn), 通過(guò)e.requested()獲取到的是一個(gè)動(dòng)態(tài)的值,會(huì)隨著下游已經(jīng)接收的數(shù)據(jù)的數(shù)量而遞減。
在上面的代碼中,我們沒(méi)有指定上下游的線程,上下游運(yùn)行在同一線程中。
這與我們之前提到的,同步狀態(tài)下不使用Flowable相違背。那是因?yàn)楫惒角闆r下e.requested()的值太復(fù)雜,必須通過(guò)同步情況過(guò)渡一下才能說(shuō)得明白。
我們?cè)谏厦娲a的基礎(chǔ)上,給上下游指定獨(dú)立的線程,代碼如下

public void demo14() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        String threadName = Thread.currentThread().getName();
                        for (int i = 1; i <= 5; i++) {
                            System.out.println("當(dāng)前未完成的請(qǐng)求數(shù)量-->" + e.requested());
                            System.out.println(threadName + "發(fā)射---->" + i);
                            e.onNext(i);
                        }
                        System.out.println(threadName + "發(fā)射數(shù)據(jù)結(jié)束" + System.currentTimeMillis());
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())//添加兩行代碼,為上下游分配獨(dú)立的線程
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(3);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(Thread.currentThread().getName() + "接收---------->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(Thread.currentThread().getName() + "接收----> 完成");
                    }
                });
    }

運(yùn)行后日志如下:

log5.jpg

雖然我們指定了下游的數(shù)據(jù)請(qǐng)求量為3,但是我們?cè)谏嫌潍@取未完成請(qǐng)求數(shù)量的時(shí)候,并不是3,而是128。難道上游有個(gè)最小未完成請(qǐng)求數(shù)量?只要下游設(shè)置的數(shù)據(jù)請(qǐng)求量小于128,上游獲取到的都是128?
帶著這個(gè)疑問(wèn),我們?cè)囈幌庐?dāng)下游的數(shù)據(jù)請(qǐng)求量為500,大于128時(shí)的情況。

public void demo15() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        String threadName = Thread.currentThread().getName();
                        for (int i = 1; i <= 5; i++) {
                            System.out.println("當(dāng)前未完成的請(qǐng)求數(shù)量-->" + e.requested());
                            System.out.println(threadName + "發(fā)射---->" + i);
                            e.onNext(i);
                        }
                        System.out.println(threadName + "發(fā)射數(shù)據(jù)結(jié)束" + System.currentTimeMillis());
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())//添加兩行代碼,為上下游分配獨(dú)立的線程
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(500);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(Thread.currentThread().getName() + "接收---------->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(Thread.currentThread().getName() + "接收----> 完成");
                    }
                });
    }

運(yùn)行日志如下;

log6.jpg

結(jié)果還是128.
其實(shí)不論下游通過(guò)s.request();設(shè)置多少請(qǐng)求量,我們?cè)谏嫌潍@取到的初始未完成請(qǐng)求數(shù)量都是128。
這是為啥呢?
還記得之前我們說(shuō)過(guò),F(xiàn)lowable有一個(gè)異步緩存池,上游發(fā)射的數(shù)據(jù),先放到異步緩存池中,再由異步緩存池交給下游。所以上游在發(fā)射數(shù)據(jù)時(shí),首先需要考慮的不是下游的數(shù)據(jù)請(qǐng)求量,而是緩存池中能不能放得下,否則在緩存池滿的情況下依然會(huì)導(dǎo)致數(shù)據(jù)遺失或者背壓異常。如果緩存池可以放得下,那就發(fā)送,至于是否超出了下游的數(shù)據(jù)需求量,可以在緩存池向下游傳遞數(shù)據(jù)時(shí),再作判斷,如果未超出,則將緩存池中的數(shù)據(jù)傳遞給下游,如果超出了,則不傳遞。
如果下游對(duì)數(shù)據(jù)的需求量超過(guò)緩存池的大小,而上游能獲取到的最大需求量是128,上游對(duì)超出128的需求量是怎么獲取到的呢?
帶著這個(gè)疑問(wèn),我們運(yùn)行一下,下面的代碼,上游發(fā)送150個(gè)數(shù)據(jù),下游也需要150個(gè)數(shù)據(jù)。

public void demo16() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        String threadName = Thread.currentThread().getName();
                        for (int i = 1; i <= 150; i++) {
                            System.out.println("當(dāng)前未完成的請(qǐng)求數(shù)量-->" + e.requested());
                            System.out.println(threadName + "發(fā)射---->" + i);
                            e.onNext(i);
                        }
                        System.out.println(threadName + "發(fā)射數(shù)據(jù)結(jié)束" + System.currentTimeMillis());
                        e.onComplete();
                    }
                }, BackpressureStrategy.BUFFER)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(150);
                    }

                    @Override
                    public void onNext(Integer integer) {
                        System.out.println(Thread.currentThread().getName() + "接收---------->" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        t.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println(Thread.currentThread().getName() + "接收----> 完成");
                    }
                });
    }

截取部分日志如下:

log7.jpg

我們發(fā)現(xiàn)通過(guò)e.requested()獲取到的上游當(dāng)前未完成請(qǐng)求數(shù)量并不是一直遞減的,在遞減到33時(shí),又回升到了128.而回升的時(shí)機(jī)正好是在下游接收了96條數(shù)據(jù)之后。我們之前說(shuō)過(guò),異步緩存池中的數(shù)據(jù)并不是向下游發(fā)射一條便清理一條,而是每等累積到95條時(shí),清理一次。通過(guò)e.requested()獲取到的值,正是在異步緩存池清理數(shù)據(jù)時(shí),回升的。也就是,異步緩存池每次清理后,有剩余的空間時(shí),都會(huì)導(dǎo)致上游未完成請(qǐng)求數(shù)量的回升,這樣既不會(huì)引發(fā)背壓異常,也不會(huì)導(dǎo)致數(shù)據(jù)遺失。
上游在發(fā)送數(shù)據(jù)的時(shí)候并不需要考慮下游需不需要,而只需要考慮異步緩存池中是否放得下,放得下便發(fā),放不下便暫停。所以,通過(guò)e.requested()獲取到的值,并不是下游真正的數(shù)據(jù)請(qǐng)求數(shù)量,而是異步緩存池中可放入數(shù)據(jù)的數(shù)量。數(shù)據(jù)放入緩存池中后,再由緩存池按照下游的數(shù)據(jù)請(qǐng)求量向下傳遞,待到傳遞完的數(shù)據(jù)累積到95條之后,將其清除,騰出空間存放新的數(shù)據(jù)。如果下游處理數(shù)據(jù)緩慢,則緩存池向下游傳遞數(shù)據(jù)的速度也相應(yīng)變慢,進(jìn)而沒(méi)有傳遞完的數(shù)據(jù)可清除,也就沒(méi)有足夠的空間存放新的數(shù)據(jù),上游通過(guò)e.requested()獲取的值也就變成了0,如果此時(shí),再發(fā)送數(shù)據(jù)的話,則會(huì)根據(jù)BackpressureStrategy背壓策略的不同,拋出MissingBackpressureException異常,或者丟掉這條數(shù)據(jù)。
所以上游只需要在e.requested()等于0時(shí),暫停發(fā)射數(shù)據(jù),便可解決背壓?jiǎn)栴}。

最終方案

下面我們回到最初的問(wèn)題
運(yùn)行下面代碼:

public void demo17() {
        Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                        int i = 0;
                        while (true) {
                            i++;
                            System.out.println("發(fā)射---->" + i);
                            e.onNext(i);
                        }
                    }
                })
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                    }

                    @Override
                    public void onNext(Integer integer) {
                        try {
                            Thread.sleep(50);
                            System.out.println("接收------>" + integer);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    @Override
                    public void onError(Throwable e) {
                        e.printStackTrace();
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("接收------>完成");
                    }
                });
    }

由于下游處理數(shù)據(jù)的速度(Thread.sleep(50))趕不上上游發(fā)射數(shù)據(jù)的速度,則會(huì)導(dǎo)致背壓?jiǎn)栴}。
運(yùn)行后查看內(nèi)存使用如下:

GIF333.gif

內(nèi)存暴增,很快就會(huì)OOM
下面,對(duì)其通過(guò)Flowable做些改進(jìn),讓其既不會(huì)產(chǎn)生背壓?jiǎn)栴},也不會(huì)引起異?;蛘邤?shù)據(jù)丟失。
代碼如下:

public void demo18() {
        Flowable
                .create(new FlowableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                        int i = 0;
                        while (true) {
                            if (e.requested() == 0) continue;//此處添加代碼,讓flowable按需發(fā)送數(shù)據(jù)
                            System.out.println("發(fā)射---->" + i);
                            i++;
                            e.onNext(i);
                        }
                    }
                }, BackpressureStrategy.MISSING)
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .subscribe(new Subscriber<Integer>() {
                    private Subscription mSubscription;

                    @Override
                    public void onSubscribe(Subscription s) {
                        s.request(1);            //設(shè)置初始請(qǐng)求數(shù)據(jù)量為1
                        mSubscription = s;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        try {
                            Thread.sleep(50);
                            System.out.println("接收------>" + integer);
                            mSubscription.request(1);//每接收到一條數(shù)據(jù)增加一條請(qǐng)求量
                        } catch (InterruptedException ignore) {
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }

下游處理數(shù)據(jù)的速度Thread.sleep(50)趕不上上游發(fā)射數(shù)據(jù)的速度,
不同的是,我們?cè)谙掠?em>onNext(Integer integer) 方法中,每接收一條數(shù)據(jù)增加一條請(qǐng)求量,

mSubscription.request(1)

在上游添加代碼

if(e.requested()==0)continue;

讓上游按需發(fā)送數(shù)據(jù)。
運(yùn)行后查看內(nèi)存:

GIF999.gif

內(nèi)存一直相當(dāng)?shù)钠届o,而且上游嚴(yán)格按照下游的需求量發(fā)送數(shù)據(jù),不會(huì)產(chǎn)生MissingBackpressureException異常,或者丟失數(shù)據(jù)。
上一篇:Rxjava2入門教程四:Scheduler線程調(diào)度器
下一篇:Rxjava2入門教程六:Single、Completable、Maybe——簡(jiǎn)化版的Observable

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容