part05_Rxjava操作符

作者:IT魔幻師
博客:www.huyingzi.top
轉(zhuǎn)載請注明出處:http://www.itdecent.cn/p/afeba5aea533


一、創(chuàng)建型操作符

主要用于創(chuàng)建被觀察者

  • just
    create的快捷創(chuàng)建操作,create操作符必須手動調(diào)用onNext才能觸發(fā)事件,just會自動觸發(fā)

    @Test
     public void testjust() {
         //just是create的快捷創(chuàng)建操作
         Observable.just("我是你爸爸","我是你爸爸2").subscribe(new Observer<String>() {
             @Override
             public void onSubscribe(Disposable d) {
             }
    
             @Override
             public void onNext(String s) {
                 //此處會依次收到j(luò)ust參數(shù)傳遞過來的值
             }
    
             @Override
             public void onError(Throwable e) {
    
             }
    
             @Override
             public void onComplete() {
    
             }
         });
     }
    
  • fromArray
    相比于just,fromArray適用于多參數(shù)的情況.

      @Test
      public void testFromArray() {
          Observable.fromArray(new String[]{"我是你爸爸",
                  "我是你爸爸2",
                  "我是你爸爸3",
                  "我是你爸爸4"}).subscribe(new Observer<String>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(String s) {
                  System.out.println("onNext  "+s);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • range
    創(chuàng)建在一定范圍內(nèi)的事件

    @Test
      public void testRange() {
          //從5開始執(zhí)行11次事件
          Observable.range(5,11).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
              @Override
              public void onError(Throwable e) {
              }
              @Override
              public void onComplete() {
              }
          });
      }
    
  • empty
    主要適用于調(diào)用后不需要返回參數(shù)只需要關(guān)心結(jié)果,如:發(fā)起網(wǎng)絡(luò)請求后在onComplete()中處理結(jié)果即可他不會回調(diào)onNext函數(shù).

    
      @Test
      public void testempty() {
          Observable.empty().subscribe(new Observer<Object>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
                  System.out.println("執(zhí)行結(jié)束");
              }
          });
      }
    
  • interval
    定時器操作符,需要依賴Android的api不能在純java環(huán)境下使用

    //每隔1單位秒的時間執(zhí)行一次
    Observable.interval(1, TimeUnit.SECONDS);
    
  • intervalRange
    定時器操作符,需要依賴Android的api不能在純java環(huán)境下使用

     //從0開始每隔1000毫秒發(fā)送50個事件  初始延時0
          Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    
  • timer
    跟interval一樣.

二、轉(zhuǎn)換操作符

將事件類型轉(zhuǎn)換成我們想要的結(jié)果

  • map
     @Test
      public void testMap() {
          //場景:根據(jù)圖片地址最終轉(zhuǎn)換成bitmap
          Observable.just("icon01.png","icon02.png").map(new Function<String, Bitmap>() {
              @Override
              public Bitmap apply(String url) throws Exception {
                  //在此次模擬執(zhí)行網(wǎng)絡(luò)請求等操作
                  // ...  此處省略
                  Bitmap mBitmap = Bitmap.createBitmap(200,200, Bitmap.Config.ARGB_8888);
                  return mBitmap;
                  
              }
          }).subscribe(new Observer<Bitmap>() {
              @Override
              public void onSubscribe(Disposable d) {
                  
              }
    
              @Override
              public void onNext(Bitmap bitmap) {
                  //在此次就可以 以此得到請求到的圖片 
                  System.out.println("得到結(jié)果:"+bitmap);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
    
  • flatMap
    在上一個事件完成后才能開始下一個事件的情況
    @Test
      public void testFlatMap() {
          //比如:token過期了 必須先請求一個token 再進(jìn)行登錄請求
          Observable.just("getToken","login").flatMap(new Function<String, ObservableSource<?>>() {
              @Override
              public ObservableSource<?> apply(String s) throws Exception {
                  System.out.println("執(zhí)行事件:"+s);
                  return createRespone(s);
              }
          }).subscribe(new Observer<Object>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Object o) {
                  //依次回調(diào)處理結(jié)果
                  System.out.println(o);
    
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
    
      }
    
      private ObservableSource<?> createRespone(final String s) {
          //根據(jù)請求再創(chuàng)建一個被觀察者,觀察上一個請求是否成功了
          return Observable.create(new ObservableOnSubscribe<String>() {
              @Override
              public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                  System.out.println("上一個事件已經(jīng)執(zhí)行完成開始執(zhí)行此事件:"+s);
                  //此處是基于getToken完成之后才會執(zhí)行
                  emitter.onNext(s);
              }
          });
      }
    
  • groupBy
    對傳入的事件進(jìn)行分組,分組的條件可以自己指定
    @Test
      public void testGroupBy() {
          Observable.just(1,2,3,4).groupBy(new Function<Integer, String>() {
              @Override
              public String apply(Integer integer) throws Exception {
                  return integer>2?"A組":"B組";
              }
          }).subscribe(new Consumer<GroupedObservable<String, Integer>>() {
              @Override
              public void accept(final GroupedObservable<String, Integer> stringIntegerGroupedObservable)
                      throws Exception {
                  //stringIntegerGroupedObservable 是一個分組后的被觀察者
                  stringIntegerGroupedObservable.subscribe(new Consumer<Integer>() {
                      @Override
                      public void accept(Integer integer) throws Exception {
                          String key = stringIntegerGroupedObservable.getKey();
                          System.out.println("key="+key+" "+integer);
                      }
                  });
              }
          });
      }
    
  • buffer
    大批量數(shù)據(jù)需要處理的時候,對其進(jìn)行分批次處理
     @Test
      public void testBuffer() {
          //將6條數(shù)據(jù)每2條分一個組執(zhí)行
          Observable.just(1,2,3,4,5,6).buffer(2).subscribe(new Observer<List<Integer>>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(List<Integer> integers) {
                  //以此回調(diào)每個組的數(shù)據(jù)
                  System.out.println(integers);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • range
    上一個結(jié)果作為下一個參數(shù),所有的結(jié)果累加得到最終結(jié)果,文件合并或者字符串拼接等場景.
      @Test
      public void testScan() {
          Observable.range(1,5).scan(new BiFunction<Integer, Integer, Integer>() {
              @Override
              public Integer apply(Integer integer, Integer integer2) throws Exception {
                  //第一個參數(shù)integer為之前所有結(jié)果的和  就是累加的形式
                  //相當(dāng)于 第一個文件跟第二個文件合并,合并后的結(jié)果跟第三個文件合并...最終合并成一個大文件
                  return integer+integer2;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    

三、過濾操作符

  • filter
    對事件進(jìn)行過濾或者不過濾的處理
    @Test
      public void testFilter() {
          Observable.just(1,2,3,4,5,6).filter(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  //此次決定是否過濾
                  //true 不過濾
                  //false 過濾掉不計(jì)入結(jié)果中
                  return integer>2;
              }
          }).subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
    
              }
    
              @Override
              public void onNext(Integer integer) {
                  //接受過濾后的結(jié)果 
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
    
              }
    
              @Override
              public void onComplete() {
    
              }
          });
      }
    
  • take
    限制產(chǎn)生事件的數(shù)量
        @Test
      public void testTake() {
          //每隔1單位秒的時間執(zhí)行一次 take限制只執(zhí)行5次
          Observable.interval(1, TimeUnit.SECONDS).take(5).subscribe(new Observer<Long>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Long aLong) {
                  System.out.println(aLong+"");
                  
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • distinct
    過濾重復(fù)事件
        @Test
      public void testDistinct() {
          Observable.just(1,2,2,2,3,3,6,6,7).distinct().subscribe(new Observer<Integer>() {
              @Override
              public void onSubscribe(Disposable d) {
              }
    
              @Override
              public void onNext(Integer integer) {
                  System.out.println(integer);
              }
    
              @Override
              public void onError(Throwable e) {
              }
    
              @Override
              public void onComplete() {
              }
          });
      }
    
  • elementAt
    過濾指定的事件
          //指定過慮出第5個事件
          Observable.just(1,2,2,2,3,3,6,6,7).elementAt(5).subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    

四、條件操作符

  • all
    判斷所有事件是否滿足一個條件,如果全部滿足則為true
     Observable.just(1,2,3,4,5,6).all(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  //所有的事件都大于2嗎
                  return integer>2;
              }
          }).subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  //此次返回時間結(jié)果
                  System.out.println(aBoolean);
              }
          });
    
  • contains
    判斷所有事件中是否包含某項(xiàng)事件
     Observable.just(1,2,3,4,5).contains(3).subscribe(new Consumer<Boolean>() {
             @Override
             public void accept(Boolean aBoolean) throws Exception {
                 //此處返回是否包含3的結(jié)果
                 System.out.println(aBoolean);
             }
         });
    
  • any
    所有事件中只要有有一個符合條件即為true
    Observable.just(1,2,3,4,5).any(new Predicate<Integer>() {
              @Override
              public boolean test(Integer integer) throws Exception {
                  return integer==3;
              }
          }).subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  System.out.println(aBoolean);
              }
          });
    
  • isEmpty
    判斷一個觀察者是否有事件
     Observable.just(1).isEmpty().subscribe(new Consumer<Boolean>() {
              @Override
              public void accept(Boolean aBoolean) throws Exception {
                  //有事件返回true  空事件返回false
                  System.out.println(aBoolean);
              }
          });
    
  • defaultIfEmpty
    如果被觀察者不發(fā)送任何事件,則會發(fā)送默認(rèn)事件
    .defaultIfEmpty(0)
    
  • skipWhile
    跳過滿足條件的事件
             //從0開始每隔1000毫秒發(fā)送50個事件  初始延時0  
            Observable.intervalRange(0,50,0,1000, TimeUnit.MILLISECONDS).skipWhile(new Predicate<Long>() {
              @Override
              public boolean test(Long aLong) throws Exception {
                  //跳過<10的事件
                  return aLong<10;
              }
          }).subscribe(new Consumer<Long>() {
              @Override
              public void accept(Long aLong) throws Exception {
                  System.out.println(aLong);
              }
          });
    

五、合并操作符

將被觀察者進(jìn)行合并

  • startWith
    把需要的事件合并成一個事件進(jìn)行處理,會先處理startWith添加的事件
            //把需要的事件合并成一個事件進(jìn)行處理,會先處理2,4,6,8的事件
          Observable.just(1,3,5,7).startWith(Observable.just(2,4,6,8))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • concat
    合并最多4個事件 以先來后到的順序進(jìn)行處理,跟startWith相反。
            //合并兩個事件 123 會優(yōu)先處理
          Observable.concat(
                  Observable.just(1,2,3),
                  Observable.just(4,5,6))
          .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                  System.out.println(integer);
              }
          });
    
  • merge
    merge合并多個被觀察者,合并之后按照時間順序并行執(zhí)行
            Flowable observable1 = Flowable.intervalRange(0,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
          Flowable observable3 = Flowable.intervalRange(20,4,1,500,TimeUnit.MILLISECONDS);
    
          Flowable.merge(observable2,observable3,observable1).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • mergeDelayError
    延遲拋出異常事件,當(dāng)合并的其它事件都執(zhí)行完成之后再拋出異常
    //延遲拋出異常事件
          Flowable observable1 = Flowable.create(new FlowableOnSubscribe<Publisher<?>>() {
              @Override
              public void subscribe(FlowableEmitter<Publisher<?>> emitter) throws Exception {
                  //假設(shè)此處發(fā)生了異常
                  emitter.onError(new NullPointerException());
              }
          }, BackpressureStrategy.BUFFER);
          Flowable observable2 = Flowable.intervalRange(10,4,1,500,TimeUnit.MILLISECONDS);
    
    
          Flowable.mergeDelayError(observable1,observable2).subscribe(new Consumer() {
              @Override
              public void accept(Object o) throws Exception {
                  System.out.println(o);
              }
          });
    
  • zip
    將多個被觀察者壓縮成單個,輸出事件最少的被觀察者結(jié)果
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容