RxAndroid筆記

? RxAndroid學習筆記--2019-1-31

? 原文鏈接: http://www.itdecent.cn/p/0cd258eecf60

? https://juejin.im/entry/5993a80cf265da249150e93c

  1. 配置

    implementation 'io.reactivex.rxjava2:rxjava:2.2.6'
    implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
    
  2. 基礎(chǔ)用法

    Observable被觀察者,即發(fā)射器(上游事件)

    Observer 觀察者,即接收器(下游事件)

    Observable.create(new ObservableOnSubscribe<String>() {
                            @Override
                            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                    emitter.onNext("測試1");
                                    emitter.onNext("測試2");
                                    emitter.onNext("測試3");
                                    emitter.onNext("測試4");
                                    emitter.onComplete();
                            }
                    }).subscribe(new Observer<String>() {
                            @Override
                            public void onSubscribe(Disposable d) {

                            }

                            @Override
                            public void onNext(String s) {
                                    Log.d(TAG, s);
                            }

                            @Override
                            public void onError(Throwable e) {

                            }

                            @Override
                            public void onComplete() {
                                    Log.d(TAG, "onComplete");
                            }
                    });

運行程序

com.dxl.myapplication D/dxl: 測試1
com.dxl.myapplication D/dxl: 測試1
com.dxl.myapplication D/dxl: 測試2
com.dxl.myapplication D/dxl: 測試3
com.dxl.myapplication D/dxl: 測試4
com.dxl.myapplication D/dxl: onComplete
  1. 在發(fā)射事件過程中,我們調(diào)用了onComplete后,接收事件將停止,但是發(fā)射事件仍將繼續(xù):

例如:

public void subscribe(ObservableEmitter<String> emitter) throws Exception {
             emitter.onNext("測試1");
             Log.d(TAG, "emitter.onNext(\"測試1\")");

             emitter.onNext("測試2");
             Log.d(TAG, "emitter.onNext(\"測試2\")");

             emitter.onNext("測試3");
             Log.d(TAG, "emitter.onNext(\"測試3\")");

             emitter.onComplete();
             Log.d(TAG, "emitter.onComplete()");

             emitter.onNext("測試4");
             Log.d(TAG, "emitter.onNext(\"測試4\")");
                            }
                    })

測試3發(fā)送完成后,調(diào)用了onComplete方法后,測試4仍然會發(fā)送,但是無法接收到

  1. Disposable概念,可以切斷接收。當它的isDisposed為false時,可以繼續(xù)接收到事件。如果為true,將不再接收事件。使用方法:

    ...
    }).subscribe(new Observer<String>() {
    
                private Disposable mDisposable;
    
                @Override
                public void onSubscribe(Disposable d) {
                    mDisposable = d;
                }
    
                @Override
                public void onNext(String s) {
                    if (s.equals("測試3")) {
                        mDisposable.dispose();
                    }
                    Log.d(TAG, s);
                }
      ...
    

    當接收到測試3 后,切斷接收事件。后續(xù)測試4 將不會再接收到。

  2. Map

    它的作用是對發(fā)射時間發(fā)送的每一個事件應(yīng)用一個函數(shù),每一個事件都按照指定的函數(shù)去變化.

    舉例:發(fā)送的是1,2,3,我們對發(fā)送的數(shù)字做*10處理。

       Observable.just(1, 2, 3).map(new Function<Integer, Integer>() {
                   @Override
                   public Integer apply(Integer integer) throws Exception {
                       return integer * 10;
                   }
               }).subscribe(new Consumer<Integer>() {
                   @Override
                   public void accept(Integer integer) throws Exception {
                       Log.d(TAG, integer + "");
                   }
               });
    

    log輸出:

       01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 10
       01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 20
       01-31 09:56:29.435 20363-20363/com.dxl.myapplication D/dxl: 30
    
  3. ZIP

    專用于合并事件,該合并不是連接(連接操作符后面會說),而是兩兩配對,也就意味著,最終配對出的Observable發(fā)射事件數(shù)目只和少的那個相同。

       Observable.zip(Observable.just(1, 2, 3), Observable.just("one", "two"),
                       new BiFunction<Integer, String, String>() {
                           @Override
                           public String apply(Integer integer, String s) throws Exception {
                               return integer + s;
                           }
                       }).subscribe(new Consumer<String>() {
                   @Override
                   public void accept(String s) throws Exception {
                       Log.d(TAG, s);
                   }
               });
    

    日志輸出:

       01-31 10:49:59.577 30063-30063/com.dxl.myapplication D/dxl: 1one
       01-31 10:49:59.577 30063-30063/com.dxl.myapplication D/dxl: 2two
    
  4. Concat

    兩個發(fā)射器連接成一個發(fā)射器

    Observable.concat(Observable.just(1,2,3), Observable.just(4,5,6))
                      .subscribe(new Consumer<Integer>() {
                          @Override
                          public void accept(@NonNull Integer integer) throws Exception {
                              Log.e(TAG, "concat : "+ integer + "\n" );
                          }
                      });
    

    注意,concat必須是第一個發(fā)射器執(zhí)行完complete之后,才會去執(zhí)行第二個。如果第一個發(fā)射器沒有執(zhí)行onComplete,那么第二個將不會被執(zhí)行。

    比如:

    Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("1");
                    emitter.onNext("2");
                    emitter.onNext("3");
                    //沒有調(diào)用onComplete,observable2將不會被執(zhí)行
    //                emitter.onComplete();
                }
            });
    
            Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                    emitter.onNext("4");
                    emitter.onNext("5");
                    emitter.onNext("6");
                    emitter.onComplete();
                }
            });
    
            Observable.concat(observable1, observable2).subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.d(TAG, s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete");
                }
            });
    

    這時候輸出的結(jié)果為:

    02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 1
    02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 2
    02-01 08:26:45.516 30613-30613/com.dxl.myapplication D/dxl: 3
    

    如果我們把注釋打開:此時observable2就會被執(zhí)行了。

    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 1
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 2
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 3
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 4
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 5
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: 6
    02-01 08:27:34.313 31055-31055/com.dxl.myapplication D/dxl: onComplete
    

    如果把concat改為merge, 則observable1和observable2將都會被執(zhí)行。

    用途舉例,比如有些時候,對數(shù)據(jù)不太敏感時,我們需要先從緩存中讀取數(shù)據(jù),如果緩存中沒有數(shù)據(jù)的話,再去讀取網(wǎng)絡(luò)數(shù)據(jù)。

    這時候可以分別定義緩存的observable和在線的observable,當成功從緩存中讀取數(shù)據(jù)時,調(diào)用onNext,如果緩存獲取不到,直接調(diào)用onComplete去執(zhí)行在線獲取的observable。

  5. FlatMap

    將一個發(fā)射器Observable轉(zhuǎn)換為多個發(fā)射器Observables,然后再將多個發(fā)射器裝入一個單一的發(fā)射器Observable。

    有個需要注意的是,flatMap 并不能保證事件的順序,如果需要保證,需要用到我們下面要講的 ConcatMap

     Observable.just(1,2,3).flatMap(new Function<Integer, ObservableSource<String>>() {
                @Override
                public ObservableSource<String> apply(Integer integer) throws Exception {
                    List<String> list = new ArrayList<>();
                    for (int i = 0; i < integer; i++) {
                        list.add("integer:" + integer + "--" + i + "");
                    }
                    return Observable.fromIterable(list);
                }
            }).subscribe(new Consumer<String>() {
                @Override
                public void accept(String s) throws Exception {
                    Log.d(TAG, s);
                }
            });
    

    輸出結(jié)果:

    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:1--0
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:2--0
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:2--1
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--0
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--1
    01-31 11:36:27.957 32001-32001/com.dxl.myapplication D/dxl: integer:3--2
    
  6. concatMap

    concatMapFlatMap 的唯一區(qū)別就是 concatMap 保證了順序,其他使用是一樣的。

  7. distinct

    去重。例如Observable.just(1,1,2,3,3) 輸出結(jié)果為1,2 ,3

  8. Fliter
    過濾器。接收一個參數(shù),過濾掉不需要的結(jié)果。
    例如:

    Observable.just(1, 2, 3, 4, 5).filter(new Predicate<Integer>() {
               @Override
               public boolean test(Integer integer) throws Exception {
                     //不滿足此條件的將被過濾
                     return integer > 3;
                      }
               }).subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                       Log.d(TAG, integer + "");
                     }
              });
    

    輸出結(jié)果:

     01-31 11:52:09.085 2005-2005/com.dxl.myapplication D/dxl: 4
     01-31 11:52:09.085 2005-2005/com.dxl.myapplication D/dxl: 5
  1. timer
    相當于一個定時任務(wù),默認在新線程。
    如:

        Log.d(TAG, "定時開始");
        Observable.timer(2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
             @Override
             public void accept(Long aLong) throws Exception {
                  Log.d(TAG, "定時結(jié)束");
             }
        });
    

aLong暫時沒有意義。都是0

       01-31 13:10:47.518 8435-8435/com.dxl.myapplication D/dxl: 定時開始
       01-31 13:10:49.560 8435-8505/com.dxl.myapplication D/dxl: 定時結(jié)束
  1. interval

    interval 操作符用于間隔時間執(zhí)行某個操作,其接受三個參數(shù),分別是第一次發(fā)送延遲,間隔時間,時間單位。

    返回值是Disposable,可以利用用于取消事件。

      Log.d(TAG, "定時開始");
              mDisposable = Observable.interval(1, 2, TimeUnit.SECONDS).subscribe(new Consumer<Long>() {
                  @Override
                  public void accept(Long aLong) throws Exception {
                      Log.d(TAG, "aLong = " + aLong);
                      if (aLong >= 5) {
                          mDisposable.dispose();
                      }
                  }
              });
      
      @Override
          protected void onDestroy() {
              super.onDestroy();
              if (mDisposable != null && !mDisposable.isDisposed()) {
                  mDisposable.dispose();
              }
          }
    

輸出:

      01-31 13:17:01.063 9118-9118/com.dxl.myapplication D/dxl: 定時開始
      01-31 13:17:02.104 9118-9143/com.dxl.myapplication D/dxl: aLong = 0
      01-31 13:17:04.096 9118-9143/com.dxl.myapplication D/dxl: aLong = 1
      01-31 13:17:06.098 9118-9143/com.dxl.myapplication D/dxl: aLong = 2
      01-31 13:17:08.100 9118-9143/com.dxl.myapplication D/dxl: aLong = 3
      01-31 13:17:10.102 9118-9143/com.dxl.myapplication D/dxl: aLong = 4
      01-31 13:17:12.104 9118-9143/com.dxl.myapplication D/dxl: aLong = 5

倒計時:

/**
     * 倒計時方法
     * @param time
     * @return
     */
    private Flowable<Long> countDown(final int time) {
        return Flowable.interval(1, TimeUnit.SECONDS)
                .map(new Function<Long, Long>() {
                    @Override
                    public Long apply(Long aLong) throws Exception {
                        return time - aLong;
                    }
                }).take(time + 1);
    }
    
    mDisposable = countDown(5).observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Consumer<Long>() {
                        @Override
                        public void accept(Long aLong) throws Exception {
                            textview.setText(aLong + "");
                        }
                    });

@Override
 protected void onDestroy() {
     super.onDestroy();
     if (mDisposable != null && !mDisposable.isDisposed()) {
         mDisposable.dispose();
     }
 }
  1. delay

    延時發(fā)送數(shù)據(jù)。

    mDisposable = Observable.just(1).delay(2, TimeUnit.SECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                             Log.d(TAG, System.currentTimeMillis() + " " + " integer = " + integer);
                        }
                    });
    
  1. 背壓BackPressure

    背壓產(chǎn)生的原因: 被觀察者發(fā)送消息太快以至于它的操作符或者訂閱者不能及時處理相關(guān)的消息

    為了解決這個問題,在RxJava2里,引入了Flowable這個類:Observable不包含 backpressure 處理,而 Flowable 包含。

    下面我們來模擬一個觸發(fā)背壓的實例 , 發(fā)射器每1毫秒發(fā)射一個數(shù)據(jù),接收器每一秒處理一個數(shù)據(jù)。數(shù)據(jù)產(chǎn)生是數(shù)據(jù)處理的1000 倍。

    首先用 RxJava 2.x 版本的 Observable 來實現(xiàn)。

    Observable.interval(1, TimeUnit.MILLISECONDS)
              .subscribeOn(Schedulers.io())
              .observeOn(Schedulers.newThread())
              .subscribe(new Consumer<Long>() {
                    @Override
                    public void accept(Long aLong) throws Exception {
                        Thread.sleep(1000);
                        Log.e("zhao", "onNext: " + aLong);
                    }
           });
    

    經(jīng)過測試,app 很健壯,沒有發(fā)生崩潰,日志每1秒打印一次。在上面我們說到 2.x 版本中 Observable 不再支持背壓,發(fā)神器生成的數(shù)據(jù)全部緩存在內(nèi)存中。

    Observable :

    • 不支持 backpressure 處理,不會發(fā)生 MissingBackpressureException 異常。
    • 所有沒有處理的數(shù)據(jù)都緩存在內(nèi)存中,等待被訂閱者處理。
    • 壞處是:當產(chǎn)生的數(shù)據(jù)過快,內(nèi)存中緩存的數(shù)據(jù)越來越多,占用大量內(nèi)存。

    然后用 RxJava 2.x 版本的 Flowable 來實現(xiàn)。

    Flowable.interval(1, TimeUnit.MILLISECONDS)
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<Long>() {
                  @Override
                   public void accept(Long aLong) throws Exception {
                        Thread.sleep(1000);
                        Log.e("zhao", "onNext: " + aLong);
                    }
             });
    

    運行起來發(fā)生崩潰,崩潰日志如下:

    io.reactivex.exceptions.OnErrorNotImplementedException: Can't deliver value 128 due to lack of requests
    ...
    ...
      Caused by: io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
    

    很明顯發(fā)生了 MissingBackpressureException 異常 , 128 代表是 Flowable 最多緩存 128 個數(shù)據(jù),緩存次超過 128 個數(shù)據(jù),就會報錯。可喜的是,Rxjava 已經(jīng)給我們提供了解決背壓的策略。

    onBackpressureDrop

    onBackpressureDrop() :當緩沖區(qū)數(shù)據(jù)滿 128 個時候,再新來的數(shù)據(jù)就會被丟棄,如果此時有數(shù)據(jù)被消費了,那么就會把當前最新產(chǎn)生的數(shù)據(jù),放到緩沖區(qū)。簡單來說 Drop 就是直接把存不下的事件丟棄。

    onBackpressureDrop 測試

    Flowable.interval( 1 , TimeUnit.MILLISECONDS)
            .onBackpressureDrop() //onBackpressureDrop 一定要放在 interval 后面否則不會生效
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<Long>() {
                  @Override
                   public void accept(Long aLong) throws Exception {
                       Thread.sleep(1000);
                       Log.e("zhao", "onNext: " + aLong);
                   }
           });
    

    效果如下:

    E/zhao: onNext: 0
    E/zhao: onNext: 1
    ...
    E/zhao: onNext: 126
    E/zhao: onNext: 127
    E/zhao: onNext: 96129
    E/zhao: onNext: 96130
    E/zhao: onNext: 96131
    

    從日志上分析來看,發(fā)射器發(fā)射的 0 ~ 127 總共 128 個數(shù)據(jù)是連續(xù)的,下一個數(shù)據(jù)就是 96129 , 128 ~ 96128 的數(shù)據(jù)被丟棄了。

    注意事項

    1、onBackpressureDrop 一定要放在 interval 后面否則不會生效

    onBackpressureLatest

    onBackpressureLatest 就是只保留最新的事件。

    onBackpressureBuffer

    • onBackpressureBuffer:默認情況下緩存所有的數(shù)據(jù),不會丟棄數(shù)據(jù),這個方法可以解決背壓問題,但是它有像 Observable 一樣的缺點,緩存數(shù)據(jù)太多,占用太多內(nèi)存。
    • onBackpressureBuffer(int capacity) :設(shè)置緩存隊列大小,但是如果緩沖數(shù)據(jù)超過了設(shè)置的值,就會報錯,發(fā)生崩潰。

    onBackpressureBuffer(int capacity) 測試

    Flowable.interval( 1 , TimeUnit.MILLISECONDS)
            .onBackpressureBuffer( 1000 ) //設(shè)置緩沖隊列大小為 1000
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.newThread())
            .subscribe(new Consumer<Long>() {
                  @Override
                   public void accept(Long aLong) throws Exception {
                      Thread.sleep(1000);
                      Log.e("zhao", "onNext: " + aLong);
                   }
              });
    

    運行起來后,過了幾秒鐘,發(fā)生崩潰,日志如下:

    io.reactivex.exceptions.OnErrorNotImplementedException: Buffer is full
    ···
    Caused by: io.reactivex.exceptions.MissingBackpressureException: Buffer is full
    

    通過日志可以看出,緩沖區(qū)已經(jīng)滿了。

  1. doOnNext

    可以讓訂閱者在接收到數(shù)據(jù)之前做一些操作,比如把數(shù)據(jù)進行保存。

    Observable.just(1,2,3).doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "doOnNext-" + integer);
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    輸出結(jié)果:

    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-1
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 1
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-2
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 2
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: doOnNext-3
    01-31 14:34:59.107 21047-21047/com.dxl.myapplication D/dxl: 3
    
  2. skip

    跳過count個數(shù)目開始接收。

    Observable.just(1,2,3,4,5)
                    .skip(2)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.d(TAG, "skip : "+integer + "\n");
                        }
                    });
    
  3. take

    接受一個 long 型參數(shù) count ,代表至多接收 count 個數(shù)據(jù)。

    Observable.just(1,2,3,4,5)
                          .take(2)
                          .subscribe(new Consumer<Integer>() {
                              @Override
                              public void accept(@NonNull Integer integer) throws Exception {
                                  Log.d(TAG, "skip : "+integer + "\n");
                              }
                          });
    
  4. just

    簡單的一個發(fā)射器,依次調(diào)用next方法

  1. Single
    只會接收一個參數(shù),而 SingleObserver 只會調(diào)用 onError() 或者 onSuccess()

    Single.just(new Random().nextInt(10))
            .subscribe(new SingleObserver<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "dispose");
                }
    
                @Override
                public void onSuccess(Integer integer) {
                    Log.d(TAG, integer + "");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, e.getMessage());
                }
            });
    

    輸出結(jié)果:

    01-31 14:47:59.949 22093-22093/com.dxl.myapplication D/dxl: dispose
    01-31 14:47:59.949 22093-22093/com.dxl.myapplication D/dxl: 5
    
  1. debounce
    去除發(fā)送頻率過快的項

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    // send events with simulated time wait
                    emitter.onNext(1); // skip
                    Thread.sleep(400);
                    emitter.onNext(2); // deliver
                    Thread.sleep(505);
                    emitter.onNext(3); // skip
                    Thread.sleep(100);
                    emitter.onNext(4); // deliver
                    Thread.sleep(605);
                    emitter.onNext(5); // deliver
                    Thread.sleep(510);
                    emitter.onComplete();
                }
            }).debounce(500, TimeUnit.MILLISECONDS)
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, integer + "");
                        }
                    });
    

    設(shè)置的時間間隔是500ms, 發(fā)送1之后,400ms后發(fā)送2,所以1被舍棄。依次類推。

    最后輸出結(jié)果:

    01-31 14:53:50.290 22568-22601/com.dxl.myapplication D/dxl: 2
    01-31 14:53:50.901 22568-22601/com.dxl.myapplication D/dxl: 4
    01-31 14:53:51.502 22568-22601/com.dxl.myapplication D/dxl: 5
    
  2. defer
    簡單地時候就是每次訂閱都會創(chuàng)建一個新的 Observable,并且如果沒有被訂閱,就不會產(chǎn)生新的 Observable。

    Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() {
                @Override
                public ObservableSource<Integer> call() throws Exception {
                    return Observable.create(new ObservableOnSubscribe<Integer>() {
                        @Override
                        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                            emitter.onNext(1);
                            Log.d(TAG, "1");
                            emitter.onNext(2);
                            Log.d(TAG, "2");
                            emitter.onNext(3);
                            Log.d(TAG, "3");
                            emitter.onNext(4);
                            Log.d(TAG, "4");
                            emitter.onComplete();
                        }
                    });
                }
            });
    
     observable.subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, "accept : " + integer);
                }
            });
    

    輸出結(jié)果:

    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 1
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 1
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 2
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 2
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 3
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 3
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: accept : 4
    01-31 15:03:28.124 23895-23895/com.dxl.myapplication D/dxl: 4
    
  3. last
    僅取出可觀察到的最后一個值,或者是滿足某些條件的最后一項。(參數(shù)表示默認值,如果沒有發(fā)送的數(shù)據(jù),取默認值)

    Observable.just(1,2,3,4).last(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    輸出結(jié)果為4.

    如果改為:

    Observable.just(1,2,3,4).skip(5).last(2).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    這時候全部跳過,沒有要發(fā)送的數(shù)據(jù),返回默認值2

  4. merge
    作用是把多個 Observable 結(jié)合起來,接受可變參數(shù),也支持迭代器集合。注意它和 concat 的區(qū)別在于,不用等到 發(fā)射器 A 發(fā)送完所有的事件再進行發(fā)射器 B 的發(fā)送。
    操作符每次用一個方法處理一個值
    例如

    Observable.just(1, 2, 3, 4, 5).reduce(new BiFunction<Integer, Integer, Integer>() {
                @Override
                public Integer apply(Integer integer, Integer integer2) throws Exception {
                    Log.d(TAG, "integer : " + integer + ", integer2 : " + integer2);
                    return integer + integer2;
    
                }
            }).subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    Log.d(TAG, integer + "");
                }
            });
    

    第一次先取1,2,進行求和得到3,第二次利用求和得到的3與下一個3進行運算,得到6,依次類推。

    最后輸出結(jié)果:

    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 1, integer2 : 2
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 3, integer2 : 3
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 6, integer2 : 4
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: integer : 10, integer2 : 5
    01-31 15:36:24.452 27942-27942/com.dxl.myapplication D/dxl: 15
    
  5. scan
    和reduce相似,但是reduce只輸出最后的結(jié)果,scan會輸出過程。

    例如上面的代碼reduce改為scan,輸出結(jié)果如下:

    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 1
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 1, integer2 : 2
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 3
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 3, integer2 : 3
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 6
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 6, integer2 : 4
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 10
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: integer : 10, integer2 : 5
    01-31 15:50:17.795 28628-28628/com.dxl.myapplication D/dxl: 15
    
  6. 實例

    url: http://gank.io/api/xiandu/categories

Observable.create(new ObservableOnSubscribe<Response>() {
            @Override
            public void subscribe(ObservableEmitter<Response> emitter) throws Exception {
                Log.d(TAG, "create : " + Thread.currentThread().getName());
                Request request = new Request.Builder().url("http://gank.io/api/xiandu/categories").build();
                Response response = new OkHttpClient().newCall(request).execute();
                if (response.isSuccessful()) {
                    emitter.onNext(response);
                } else {
                    emitter.onError(new Exception(response.message()));
                }
                emitter.onComplete();
            }
        })
                //指定map的操作線程
                .observeOn(Schedulers.computation())
                .map(new Function<Response, Category>() {
                    @Override
                    public Category apply(Response response) throws Exception {
                        Log.d(TAG, "map : " + Thread.currentThread().getName());
                        ResponseBody responseBody = response.body();
                        Category category = new Gson().fromJson(responseBody.string(), Category.class);
                        return category;
                    }
                })
                //指定doOnNext的線程
                .observeOn(AndroidSchedulers.mainThread())
                .doOnNext(new Consumer<Category>() {
                    @Override
                    public void accept(Category category) throws Exception {
                        Log.d(TAG, "doOnNext1 : " + Thread.currentThread().getName());
                    }
                })
                //指定第二次doOnNext的線程
                .observeOn(Schedulers.io())
                .doOnNext(new Consumer<Category>() {
                    @Override
                    public void accept(Category category) throws Exception {
                        Log.d(TAG, "doOnNext2 : " + Thread.currentThread().getName());
                    }
                })
                //指定事件產(chǎn)生的線程
                .subscribeOn(Schedulers.io())
                //指定事件消費線程
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Category>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Category category) {
                        Log.d(TAG, "subscribe : " + Thread.currentThread().getName());
                        Log.d(TAG, category.toString());
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, e.toString());
                    }

                    @Override
                    public void onComplete() {

                    }
                });

    /**
     * 實體類
     * @author dxl
     * @date 2019/1/31 15:57
     */
    public class Category {

        private boolean error;
        public List<Results> results;

        public class Results {

            public String _id;
            public String en_name;
            public String name;
            public int rank;

            @Override
            public String toString() {
                return "Results{" +
                        "_id='" + _id + '\'' +
                        ", en_name='" + en_name + '\'' +
                        ", name='" + name + '\'' +
                        ", rank=" + rank +
                        '}';
            }
        }

        @Override
        public String toString() {
            return "Category{" +
                    "error=" + error +
                    ", results=" + results.toString() +
                    '}';
        }
    }

輸出結(jié)果:

01-31 16:51:19.458 6151-6235/com.dxl.myapplication D/dxl: create : RxCachedThreadScheduler-1
01-31 16:51:19.738 6151-6151/com.dxl.myapplication D/dxl: map : main
01-31 16:51:19.778 6151-6151/com.dxl.myapplication D/dxl: doOnNext1 : main
01-31 16:51:19.788 6151-6282/com.dxl.myapplication D/dxl: doOnNext2 : RxCachedThreadScheduler-2
01-31 16:51:19.788 6151-6151/com.dxl.myapplication D/dxl: subscribe : main
01-31 16:51:19.788 6151-6151/com.dxl.myapplication D/dxl: Category{error=false, results=[Results{_id='57c83777421aa97cbd81c74d', en_name='wow', name='科技資訊', rank=1}, Results{_id='57c83577421aa97cb162d8b1', en_name='apps', name='趣味軟件/游戲', rank=5}, Results{_id='57c83627421aa97cbd81c74b', en_name='imrich', name='裝備黨', rank=50}, Results{_id='57c836b4421aa97cbd81c74c', en_name='funny', name='草根新聞', rank=100}, Results{_id='5827dc81421aa911e32d87cc', en_name='android', name='Android', rank=300}, Results{_id='582c5346421aa95002741a8e', en_name='diediedie', name='創(chuàng)業(yè)新聞', rank=340}, Results{_id='5829c2bc421aa911e32d87e7', en_name='thinking', name='獨立思想', rank=400}, Results{_id='5827dd7b421aa911d3bb7eca', en_name='iOS', name='iOS', rank=500}, Results{_id='5829b881421aa911dbc9156b', en_name='teamblog', name='團隊博客', rank=600}]}

  • subscribeOn事件產(chǎn)生的線程只能指定一次, observeOn可以指定多次。
  1. 線程調(diào)度

    subScribeOn

    subscribeOn 用于指定 subscribe() 時所發(fā)生的線程

    observeOn

    observeOn 方法用于指定下游 Observer 回調(diào)發(fā)生的線程。

    線程切換需要注意的

    RxJava 內(nèi)置的線程調(diào)度器的確可以讓我們的線程切換得心應(yīng)手,但其中也有些需要注意的地方。

    • 簡單地說,subscribeOn() 指定的就是發(fā)射事件的線程,observerOn 指定的就是訂閱者接收事件的線程。
    • 多次指定發(fā)射事件的線程只有第一次指定的有效,也就是說多次調(diào)用 subscribeOn() 只有第一次的有效,其余的會被忽略。
    • 但多次指定訂閱者接收線程是可以的,也就是說每調(diào)用一次 observerOn(),下游的線程就會切換一次。
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                    Log.e(TAG, "Observable thread is : " + Thread.currentThread().getName());
                    e.onNext(1);
                    e.onComplete();
                }
            }).subscribeOn(Schedulers.newThread())
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.e(TAG, "After observeOn(mainThread),Current thread is " + Thread.currentThread().getName());
                        }
                    })
                    .observeOn(Schedulers.io())
                    .subscribe(new Consumer<Integer>() {
                        @Override
                        public void accept(@NonNull Integer integer) throws Exception {
                            Log.e(TAG, "After observeOn(io),Current thread is " + Thread.currentThread().getName());
                        }
                    });
    
    07-03 14:54:01.177 15121-15438/com.nanchen.rxjava2examples E/RxThreadActivity: Observable thread is : RxNewThreadScheduler-1
    07-03 14:54:01.178 15121-15121/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(mainThread),Current thread is main
    07-03 14:54:01.179 15121-15439/com.nanchen.rxjava2examples E/RxThreadActivity: After observeOn(io),Current thread is RxCachedThreadScheduler-2
    

    實例代碼中,分別用 Schedulers.newThread()Schedulers.io() 對發(fā)射線程進行切換,并采用 observeOn(AndroidSchedulers.mainThread()Schedulers.io() 進行了接收線程的切換??梢钥吹捷敵鲋邪l(fā)射線程僅僅響應(yīng)了第一個 newThread,但每調(diào)用一次 observeOn() ,線程便會切換一次,因此如果我們有類似的需求時,便知道如何處理了。

    RxJava 中,已經(jīng)內(nèi)置了很多線程選項供我們選擇,例如有:

    • Schedulers.io() 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作;
    • Schedulers.computation() 代表CPU計算密集型的操作, 例如需要大量計算的操作;
    • Schedulers.newThread() 代表一個常規(guī)的新線程;
    • AndroidSchedulers.mainThread() 代表Android的主線程

    這些內(nèi)置的 Scheduler 已經(jīng)足夠滿足我們開發(fā)的需求,因此我們應(yīng)該使用內(nèi)置的這些選項,而 RxJava 內(nèi)部使用的是線程池來維護這些線程,所以效率也比較高。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容