RxJava2.0 - 文章七

前言

上一節(jié)我們學習了使用Observable解決上、下游發(fā)射事件速度不平衡的問題,之所以學習 Observable,是因為Observable有很多的使用場景,而這一節(jié)我們要學習一個新的操作符 —— Flowable操作符,這個操作符可以解決絕大部分的問題,但是Observable、Flowable各有應用場景,也各有優(yōu)勢和缺點。

1. Flowable


Flowable如下圖所示:


Flowable.png
  • Observable的上、下游分別是Observable、Observer,建立連接是subscribe();
  • Flowable的上、下游分別是Flowable、Subscriber,建立連接是subscribe();
Flowable最基本用法如下:
/**
     * Flowable最基本用法
     */
    public static void demo1(){
        // 創(chuàng)建一個上游:Flowable
        Flowable<Integer> upStream = Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
                Log.e("TAG" , "emit 2") ;
                emitter.onNext(2);
                Log.e("TAG" , "emit 3") ;
                emitter.onNext(3);

                Log.e("TAG" , "emit complete") ;
                emitter.onComplete();

            }   // 參數(shù)BackpressureStrategy.ERROR作用:
                // 用來選擇背壓,用于解決上下游發(fā)射數(shù)據(jù)速度不平衡問題,如果速度不一致,
                // 直接拋異常MissingBackpressureException
        } , BackpressureStrategy.ERROR) ;


        // 創(chuàng)建一個下游:Subscriber
        Subscriber<Integer> downStream = new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.e("TAG" , "subscribe") ;
                s.request(Long.MAX_VALUE);  
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("TAG" , "next -> " + integer) ;
            }

            @Override
            public void onError(Throwable t) {
                Log.e("TAG" , "error -> " + t) ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        } ;

        // 建立連接
        upStream.subscribe(downStream) ;
    }

運行結果如下:

TAG: subscribe
TAG: emit 1
TAG: next -> 1
TAG: emit 2
TAG: next -> 2
TAG: emit 3
TAG: next -> 3
TAG: emit complete
TAG: complete
Flowable寫法與Observable稍有區(qū)別:
  • 第一個區(qū)別是:Flowable上游中新增一個參數(shù),BackpressureStrategy.ERROR,用來選擇背壓,用于是解決上下游發(fā)射數(shù)據(jù)速度不平衡問題,如果速度不一致,直接拋出異常MissingBackpressureException;
  • 第二個區(qū)別是:在下游的onSubscribe()方法中,接收的是Subscription ,而不是Observable中的Disposable,這兩個作用都是用于切斷水管,調用Subscription.cancel()、調用Disposable.dispose(),不同的是在 Subscription中需要調用 request(),即就是代碼中的
s.request(Long.MAX_VALUE); 
  • 如果不添加 s.request(Long.MAX_VALUE); ,直接報錯MissingBackpressureException,示例代碼如下:
    /**
     * Flowable用法:在onSubscribe()中不加 s.request(Long.MAX_VALUE);
     */
    public static void demo2(){
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
                Log.e("TAG" , "emit 2") ;
                emitter.onNext(2);
                Log.e("TAG" , "emit 3") ;
                emitter.onNext(3);

                Log.e("TAG" , "emit complete") ;
                emitter.onComplete();
            }
        } , BackpressureStrategy.ERROR).subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                Log.e("TAG" , "subscribe") ;
            }

            @Override
            public void onNext(Integer integer) {
                Log.e("TAG" , "next -> " + integer) ;
            }

            @Override
            public void onError(Throwable t) {
                Log.e("TAG" , "error -> " + t) ;
            }

            @Override
            public void onComplete() {
                Log.e("TAG" , "complete") ;
            }
        });

    }
圖片.png

可以看到,在上游發(fā)送第一個事件后,下游直接拋出著名異常MissingBackpressureException異常,且下游并沒有收到任何其余的事件,上、下游在同一個線程,是同步關系,按道理來講,上游每次發(fā)送事件都應該等下游處理完事件后,才會繼續(xù)發(fā)送事件,下邊我們看異步線程的情況。

2. Flowable讓上下游在異步線程中執(zhí)行

代碼如下:

/**
     * Flowable:
     *          讓上、下游處于異步線程中,也就是說讓上游在子線程中執(zhí)行,下游在主線程中執(zhí)行
     */
    public static void demo3(){
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                Log.e("TAG" , "emit 1") ;
                emitter.onNext(1);
                Log.e("TAG" , "emit 2") ;
                emitter.onNext(2);
                Log.e("TAG" , "emit 3") ;
                emitter.onNext(3);

                Log.e("TAG" , "emit complete") ;
                emitter.onComplete();
            }
        } , BackpressureStrategy.ERROR).subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        Log.e("TAG" , "subscribe") ;
                        mSubscription = s ;
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("TAG" , "next -> " + integer) ;
                    }

                    @Override
                    public void onError(Throwable t) {
                        Log.e("TAG" , "error -> " + t) ;
                    }

                    @Override
                    public void onComplete() {
                        Log.e("TAG" , "complete") ;
                    }
                }) ;
    }

運行結果如下:

TAG: subscribe
TAG: emit 1
TAG: emit 2
TAG: emit 3
TAG: emit complete

可以看到,上游發(fā)送所有事件,但是下游一個都沒有收到

  • 在onSubscribe()方法中調用的s.request(Long.MAX_VALUE); 可以把request()方法當做一種能力,當成下游處理事件的能力,下游能處理幾個,就告訴上游我要幾個,可以解決上游一次性發(fā)射全部數(shù)據(jù)到下游,導致OOM,可以解決上一節(jié)學習的兩種處理方式的缺陷:
    1>:取樣事件會導致事件丟失;
    2>:減速導致性能丟失;
    而調用s.request(Long.MAX_VALUE)方法 這種 方式既解決了事件丟失又解決了性能丟失;

3. 認識 request()方法


1>:對于上、下游二者在同一個線程中,上游發(fā)射第一個事件,下游直接拋出異常MissingBackpressureException異常,是因為下游沒有調用request()方法,表示上游認為下游沒有處理事件的能力,上游發(fā)送事件,下游必須在onSubscribe()方法中調用request()方法,表示下游有處理事件的能力,比如 request(3)或者request(Long.MAX_VALUE)都是可以的;

2>:對于上、下游二者沒有在同一個線程中,即異步操作,上游可以正確發(fā)送所有事件,因為在 Flowable中默認有一個 128的水缸,當上下游在不同的線程中工作時,上游會先把事件發(fā)送到這個水缸中,所以說,即使下游沒有調用 request()方法,但是水缸中保存著上游發(fā)射的事件,只有當下游調用request()方法時,才從水缸中取出事件發(fā)送給 下游;

注意:

1>:Flowable中的 水缸大小只有 128,如果是129,直接拋MissingBackpressureException異常;
2>:我們這里把上游發(fā)射的事件全部存儲到 水缸中,下游一個都沒有消費,只要下游快速消費一個,就不會OOM,如果下游10秒之后再消費也會OOM;

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容