RxJava操作符大全

作者: maplejaw
本篇只解析標準包中的操作符。對于擴展包,由于使用率較低,如有需求,請讀者自行查閱文檔。

創(chuàng)建操作

以下操作符用于創(chuàng)建Observable。

  • create: 使用OnSubscribe從頭創(chuàng)建一個Observable,這種方法比較簡單。需要注意的是,使用該方法創(chuàng)建時,建議在OnSubscribe#call方法中檢查訂閱狀態(tài),以便及時停止發(fā)射數(shù)據(jù)或者運算。

    
        Observable.create(new Observable.OnSubscribe<String>() {
    
            @Override
            public void call(Subscriber<? super String> subscriber) {
    
                subscriber.onNext("item1");
                subscriber.onNext("item2");
                subscriber.onCompleted();
            }
        });
    
  • from: 將一個Iterable, 一個Future, 或者一個數(shù)組,內(nèi)部通過代理的方式轉(zhuǎn)換成一個Observable。Future轉(zhuǎn)換為OnSubscribe是通過OnSubscribeToObservableFuture進行的,Iterable轉(zhuǎn)換通過OnSubscribeFromIterable進行。數(shù)組通過OnSubscribeFromArray轉(zhuǎn)換。

    Observable#from
    Observable#from

        //Iterable
        List<String> list=new ArrayList<>();
        ...
        Observable.from(list)
                .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                
            }
        });
        
        //Future
         Future<String> futrue= Executors.newSingleThreadExecutor().submit(new Callable<String>() {
    
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                return "maplejaw";
            }
        });
    
        Observable.from(futrue)
                  .subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                
            }
        });
    

;
```

  • just: 將一個或多個對象轉(zhuǎn)換成發(fā)射這個或這些對象的一個Observable。如果是單個對象,內(nèi)部創(chuàng)建的是ScalarSynchronousObservable對象。如果是多個對象,則是調(diào)用了from方法創(chuàng)建。

  • empty: 創(chuàng)建一個什么都不做直接通知完成的Observable

  • error: 創(chuàng)建一個什么都不做直接通知錯誤的Observable

  • never: 創(chuàng)建一個什么都不做的Observable

        Observable observable1=Observable.empty();//直接調(diào)用onCompleted。
        Observable observable2=Observable.error(new RuntimeException());//直接調(diào)用onError。這里可以自定義異常
        Observable observable3=Observable.never();//啥都不做
    
  • timer: 創(chuàng)建一個在給定的延時之后發(fā)射數(shù)據(jù)項為0的Observable<Long>,內(nèi)部通過OnSubscribeTimerOnce工作

     Observable.timer(1000,TimeUnit.MILLISECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                        Log.d("JG",aLong.toString()); // 0
                    }
                });
    
  • interval: 創(chuàng)建一個按照給定的時間間隔發(fā)射從0開始的整數(shù)序列的Observable<Long>,內(nèi)部通過OnSubscribeTimerPeriodically工作。

      Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(new Action1<Long>() {
                    @Override
                    public void call(Long aLong) {
                         //每隔1秒發(fā)送數(shù)據(jù)項,從0開始計數(shù)
                         //0,1,2,3....
                    }
                });
    
  • range: 創(chuàng)建一個發(fā)射指定范圍的整數(shù)序列的Observable<Integer>

     Observable.range(2,5).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("JG",integer.toString());// 2,3,4,5,6 從2開始發(fā)射5個數(shù)據(jù)
            }
        });
    
  • defer: 只有當訂閱者訂閱才創(chuàng)建Observable,為每個訂閱創(chuàng)建一個新的Observable。內(nèi)部通過OnSubscribeDefer在訂閱時調(diào)用Func0創(chuàng)建Observable。

      Observable.defer(new Func0<Observable<String>>() {
            @Override
            public Observable<String> call() {
                return Observable.just("hello");
            }
        }).subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                Log.d("JG",s);
            }
        });
    

合并操作

以下操作符用于組合多個Observable。

注意,為了使結(jié)構(gòu)更加清晰以及縮小代碼量,之后的例子部分地方將會使用Lambda表達式書寫,如果你對Lambda表達式不太熟悉的話,可以閱讀JAVA8 Lambda表達式完全解析這篇文章。

  • concat: 按順序連接多個Observables。需要注意的是Observable.concat(a,b)等價于a.concatWith(b)。

        Observable<Integer> observable1=Observable.just(1,2,3,4);
        Observable<Integer>  observable2=Observable.just(4,5,6);
    
        Observable.concat(observable1,observable2)
                .subscribe(item->Log.d("JG",item.toString()));//1,2,3,4,4,5,6
    
  • startWith: 在數(shù)據(jù)序列的開頭增加一項數(shù)據(jù)。startWith的內(nèi)部也是調(diào)用了concat

     Observable.just(1,2,3,4,5)
                .startWith(6,7,8)
        .subscribe(item->Log.d("JG",item.toString()));//6,7,8,1,2,3,4,5
    
  • merge: 將多個Observable合并為一個。不同于concat,merge不是按照添加順序連接,而是按照時間線來連接。其中mergeDelayError將異常延遲到其它沒有錯誤的Observable發(fā)送完畢后才發(fā)射。而merge則是一遇到異常將停止發(fā)射數(shù)據(jù),發(fā)送onError通知。

    merge流程圖

  • zip: 使用一個函數(shù)組合多個Observable發(fā)射的數(shù)據(jù)集合,然后再發(fā)射這個結(jié)果。如果多個Observable發(fā)射的數(shù)據(jù)量不一樣,則以最少的Observable為標準進行壓合。內(nèi)部通過OperatorZip進行壓合。

    Observable<Integer>  observable1=Observable.just(1,2,3,4);
    Observable<Integer>  observable2=Observable.just(4,5,6);
        Observable.zip(observable1, observable2, new Func2<Integer, Integer, String>() {
            @Override
            public String call(Integer item1, Integer item2) {
                return item1+"and"+item2;
            }
        })
        .subscribe(item->Log.d("JG",item)); //1and4,2and5,3and6
    
  • combineLatest: 。當兩個Observables中的任何一個發(fā)射了一個數(shù)據(jù)時,通過一個指定的函數(shù)組合每個Observable發(fā)射的最新數(shù)據(jù)(一共兩個數(shù)據(jù)),然后發(fā)射這個函數(shù)的結(jié)果。類似于zip,但是,不同的是zip只有在每個Observable都發(fā)射了數(shù)據(jù)才工作,而combineLatest任何一個發(fā)射了數(shù)據(jù)都可以工作,每次與另一個Observable最近的數(shù)據(jù)壓合。具體請看下面流程圖。
    zip工作流程

    zip流程圖

    combineLatest工作流程


    combineLatest流程

過濾操作

  • filter: 過濾數(shù)據(jù)。內(nèi)部通過OnSubscribeFilter過濾數(shù)據(jù)。

      Observable.just(3,4,5,6)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer>4;
                    }
                })
        .subscribe(item->Log.d("JG",item.toString())); //5,6 
    
  • ofType: 過濾指定類型的數(shù)據(jù),與filter類似,

    Observable.just(1,2,"3")
                .ofType(Integer.class)
                .subscribe(item -> Log.d("JG",item.toString()));
    
  • take: 只發(fā)射開始的N項數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)。內(nèi)部通過OperatorTakeOperatorTakeTimed過濾數(shù)據(jù)。

      Observable.just(3,4,5,6)
                .take(3)//發(fā)射前三個數(shù)據(jù)項
                .take(100, TimeUnit.MILLISECONDS)//發(fā)射100ms內(nèi)的數(shù)據(jù)
    
  • takeLast: 只發(fā)射最后的N項數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)。內(nèi)部通過OperatorTakeLastOperatorTakeLastTimed過濾數(shù)據(jù)。takeLastBuffer和takeLast類似,不同點在于takeLastBuffer會收集成List后發(fā)射。

     Observable.just(3,4,5,6)
                .takeLast(3)
                .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
    
  • takeFirst:提取滿足條件的第一項。內(nèi)部實現(xiàn)源碼如下:

    public final Observable<T> takeFirst(Func1<? super T, Boolean> predicate) {
          return filter(predicate).take(1); //先過濾,后提取
    }
    
  • first/firstOrDefault:只發(fā)射第一項(或者滿足某個條件的第一項)數(shù)據(jù),可以指定默認值。

     Observable.just(3,4,5,6)
                .first()
                .subscribe(integer -> Log.d("JG",integer.toString()));//3
                
        Observable.just(3,4,5,6)
                   .first(new Func1<Integer, Boolean>() {
                       @Override
                       public Boolean call(Integer integer) {
                           return integer>3;
                       }
                   }) .subscribe(integer -> Log.d("JG",integer.toString()));//4
    
  • last/lastOrDefault:只發(fā)射最后一項(或者滿足某個條件的最后一項)數(shù)據(jù),可以指定默認值。

  • skip:跳過開始的N項數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)。內(nèi)部通過OperatorSkipOperatorSkipTimed實現(xiàn)過濾。

      Observable.just(3,4,5,6)
                   .skip(1)
                .subscribe(integer -> Log.d("JG",integer.toString()));//4,5,6
    
  • skipLast:跳過最后的N項數(shù)據(jù)或者一定時間內(nèi)的數(shù)據(jù)。內(nèi)部通過OperatorSkipLastOperatorSkipLastTimed實現(xiàn)過濾。

  • elementAt/elementAtOrDefault:發(fā)射某一項數(shù)據(jù),如果超過了范圍可以的指定默認值。內(nèi)部通過OperatorElementAt過濾。

        Observable.just(3,4,5,6)
                 .elementAt(2)
        .subscribe(item->Log.d("JG",item.toString())); //5
  • ignoreElements:丟棄所有數(shù)據(jù),只發(fā)射錯誤或正常終止的通知。內(nèi)部通過OperatorIgnoreElements實現(xiàn)。

  • distinct:過濾重復數(shù)據(jù),內(nèi)部通過OperatorDistinct實現(xiàn)。

     Observable.just(3,4,5,6,3,3,4,9)
           .distinct()
          .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,9
    
  • distinctUntilChanged:過濾掉連續(xù)重復的數(shù)據(jù)。內(nèi)部通過OperatorDistinctUntilChanged實現(xiàn)

     Observable.just(3,4,5,6,3,3,4,9)
           .distinctUntilChanged()
          .subscribe(item->Log.d("JG",item.toString())); //3,4,5,6,3,4,9
    
  • throttleFirst:定期發(fā)射Observable發(fā)射的第一項數(shù)據(jù)。內(nèi)部通過OperatorThrottleFirst實現(xiàn)。

    Observable.create(subscriber -> {
            subscriber.onNext(1);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(2);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
    
            subscriber.onNext(3);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(4);
            subscriber.onNext(5);
            subscriber.onCompleted();
    
        }).throttleFirst(999, TimeUnit.MILLISECONDS)
                .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為1,3,4
    
    
  • throttleWithTimeout/debounce:發(fā)射數(shù)據(jù)時,如果兩次數(shù)據(jù)的發(fā)射間隔小于指定時間,就會丟棄前一次的數(shù)據(jù),直到指定時間內(nèi)都沒有新數(shù)據(jù)發(fā)射時
    才進行發(fā)射

     Observable.create(subscriber -> {
             subscriber.onNext(1);
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 throw Exceptions.propagate(e);
             }
             subscriber.onNext(2);
             try {
                 Thread.sleep(500);
             } catch (InterruptedException e) {
                 throw Exceptions.propagate(e);
             }
    
             subscriber.onNext(3);
             try {
                 Thread.sleep(1000);
             } catch (InterruptedException e) {
                 throw Exceptions.propagate(e);
             }
             subscriber.onNext(4);
             subscriber.onNext(5);
             subscriber.onCompleted();
    
         }).debounce(999, TimeUnit.MILLISECONDS)//或者為throttleWithTimeout(1000, TimeUnit.MILLISECONDS)
                 .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為3,5
    
  • sample/throttleLast:定期發(fā)射Observable最近的數(shù)據(jù)。內(nèi)部通過OperatorSampleWithTime實現(xiàn)。

     Observable.create(subscriber -> {
            subscriber.onNext(1);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(2);
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
    
            subscriber.onNext(3);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(4);
            subscriber.onNext(5);
            subscriber.onCompleted();
    
        }).sample(999, TimeUnit.MILLISECONDS)//或者為throttleLast(1000, TimeUnit.MILLISECONDS)
                .subscribe(item-> Log.d("JG",item.toString())); //結(jié)果為2,3,5
    
  • timeout: 如果原始Observable過了指定的一段時長沒有發(fā)射任何數(shù)據(jù),就發(fā)射一個異?;蛘呤褂脗溆玫腛bservable。

       Observable.create(( subscriber) -> {
            subscriber.onNext(1);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                throw Exceptions.propagate(e);
            }
            subscriber.onNext(2);
           
            subscriber.onCompleted();
    
        }).timeout(999, TimeUnit.MILLISECONDS,Observable.just(99,100))//如果不指定備用Observable將會拋出異常
                .subscribe(item-> Log.d("JG",item.toString()),error->Log.d("JG","onError")); //結(jié)果為1,99,100  如果不指定備用Observable結(jié)果為1,onError
    }
    

條件/布爾操作

  • all: 判斷所有的數(shù)據(jù)項是否滿足某個條件,內(nèi)部通過OperatorAll實現(xiàn)。

      Observable.just(2,3,4,5)
                .all(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer>3;
                    }
                })
        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                Log.d("JG",aBoolean.toString()); //false
            }
        })
        ;
    
  • exists: 判斷是否存在數(shù)據(jù)項滿足某個條件。內(nèi)部通過OperatorAny實現(xiàn)。

       Observable.just(2,3,4,5)
                .exists(integer -> integer>3)
                .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
    
    
  • contains: 判斷在發(fā)射的所有數(shù)據(jù)項中是否包含指定的數(shù)據(jù),內(nèi)部調(diào)用的其實是exists

      Observable.just(2,3,4,5)
                .contains(3)
                .subscribe(aBoolean -> Log.d("JG",aBoolean.toString())); //true
    
  • sequenceEqual: 用于判斷兩個Observable發(fā)射的數(shù)據(jù)是否相同(數(shù)據(jù),發(fā)射順序,終止狀態(tài))。

     Observable.sequenceEqual(Observable.just(2,3,4,5),Observable.just(2,3,4,5))
                .subscribe(aBoolean -> Log.d("JG",aBoolean.toString()));//true
    
  • isEmpty: 用于判斷Observable發(fā)射完畢時,有沒有發(fā)射數(shù)據(jù)。有數(shù)據(jù)false,如果只收到了onComplete通知則為true。

      Observable.just(3,4,5,6)
                   .isEmpty()
                  .subscribe(item -> Log.d("JG",item.toString()));//false
    
  • amb: 給定多個Observable,只讓第一個發(fā)射數(shù)據(jù)的Observable發(fā)射全部數(shù)據(jù),其他Observable將會被忽略。

        Observable<Integer> observable1=Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    subscriber.onError(e);
                }
                subscriber.onNext(1);
                subscriber.onNext(2);
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation());
    
        Observable<Integer> observable2=Observable.create(subscriber -> {
            subscriber.onNext(3);
            subscriber.onNext(4);
            subscriber.onCompleted();
        });
    
        Observable.amb(observable1,observable2)
        .subscribe(integer -> Log.d("JG",integer.toString())); //3,4
    
  • switchIfEmpty: 如果原始Observable正常終止后仍然沒有發(fā)射任何數(shù)據(jù),就使用備用的Observable。

       Observable.empty()
                .switchIfEmpty(Observable.just(2,3,4))
        .subscribe(o -> Log.d("JG",o.toString())); //2,3,4
    
  • defaultIfEmpty: 如果原始Observable正常終止后仍然沒有發(fā)射任何數(shù)據(jù),就發(fā)射一個默認值,內(nèi)部調(diào)用的switchIfEmpty。

  • takeUntil: 當發(fā)射的數(shù)據(jù)滿足某個條件后(包含該數(shù)據(jù)),或者第二個Observable發(fā)送完畢,終止第一個Observable發(fā)送數(shù)據(jù)。

     Observable.just(2,3,4,5)
                .takeUntil(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer==4;
                    }
                }).subscribe(integer -> Log.d("JG",integer.toString())); //2,3,4
    
    
  • takeWhile: 當發(fā)射的數(shù)據(jù)滿足某個條件時(不包含該數(shù)據(jù)),Observable終止發(fā)送數(shù)據(jù)。

      Observable.just(2,3,4,5)
                .takeWhile(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer==4;
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString())); //2,3
    
  • skipUntil: 丟棄Observable發(fā)射的數(shù)據(jù),直到第二個Observable發(fā)送數(shù)據(jù)。(丟棄條件數(shù)據(jù))

  • skipWhile: 丟棄Observable發(fā)射的數(shù)據(jù),直到一個指定的條件不成立(不丟棄條件數(shù)據(jù))

聚合操作

  • reduce: 對序列使用reduce()函數(shù)并發(fā)射最終的結(jié)果,內(nèi)部使用OnSubscribeReduce實現(xiàn)。

      Observable.just(2,3,4,5)
                .reduce(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer sum, Integer item) {
                        return sum+item;
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));//14
    
  • collect: 使用collect收集數(shù)據(jù)到一個可變的數(shù)據(jù)結(jié)構(gòu)。

      Observable.just(3,4,5,6)
                   .collect(new Func0<List<Integer>>() { //創(chuàng)建數(shù)據(jù)結(jié)構(gòu)
    
                       @Override
                       public List<Integer> call() {
                           return new ArrayList<Integer>();
                       }
                   }, new Action2<List<Integer>, Integer>() { //收集器
                       @Override
                       public void call(List<Integer> integers, Integer integer) {
                           integers.add(integer);
                       }
                   })
                  .subscribe(new Action1<List<Integer>>() {
                      @Override
                      public void call(List<Integer> integers) {
                          
                      }
                  });
    
  • count/countLong: 計算發(fā)射的數(shù)量,內(nèi)部調(diào)用的是reduce.

轉(zhuǎn)換操作

  • toList: 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個列表,然后返回這個列表.
        Observable.just(2,3,4,5)
                .toList()
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        
                    }
                });
    
    
  • toSortedList: 收集原始Observable發(fā)射的所有數(shù)據(jù)到一個有序列表,然后返回這個列表。
       Observable.just(6,2,3,4,5)
                .toSortedList(new Func2<Integer, Integer, Integer>() {//自定義排序
                    @Override
                    public Integer call(Integer integer, Integer integer2) {
                        return integer-integer2; //>0 升序 ,<0 降序
                    }
                })
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        Log.d("JG",integers.toString()); // [2, 3, 4, 5, 6]
                    }
                });
    
    
  • toMap: 將序列數(shù)據(jù)轉(zhuǎn)換為一個Map。我們可以根據(jù)數(shù)據(jù)項生成key和生成value。
    
        Observable.just(6,2,3,4,5)
                .toMap(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "key:" + integer; //根據(jù)數(shù)據(jù)項生成map的key
                    }
                }, new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "value:"+integer; //根據(jù)數(shù)據(jù)項生成map的kvalue
                    }
                }).subscribe(new Action1<Map<String, String>>() {
            @Override
            public void call(Map<String, String> stringStringMap) {
                Log.d("JG",stringStringMap.toString()); // {key:6=value:6, key:5=value:5, key:4=value:4, key:2=value:2, key:3=value:3}
            }
        });
    
    
    
  • toMultiMap: 類似于toMap,不同的地方在于map的value是一個集合。

變換操作

  • map: 對Observable發(fā)射的每一項數(shù)據(jù)都應用一個函數(shù)來變換。

     Observable.just(6,2,3,4,5)
                .map(integer -> "item:"+integer)
                .subscribe(s -> Log.d("JG",s));//item:6,item:2....
    
    
  • cast: 在發(fā)射之前強制將Observable發(fā)射的所有數(shù)據(jù)轉(zhuǎn)換為指定類型

  • flatMap: 將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,然后將這些Observable發(fā)射的數(shù)據(jù)平坦化的放進一個單獨的Observable,內(nèi)部采用merge合并。

           Observable.just(2,3,5)
                .flatMap(new Func1<Integer, Observable<String>>() {
                    @Override
                    public Observable<String> call(Integer integer) {
                        return Observable.create(subscriber -> {
                            subscriber.onNext(integer*10+"");
                            subscriber.onNext(integer*100+"");
                            subscriber.onCompleted();
                        });
                    }
                })
        .subscribe(o -> Log.d("JG",o)) //20,200,30,300,50,500
    
  • flatMapIterable: 和flatMap的作用一樣,只不過生成的是Iterable而不是Observable。

            Observable.just(2,3,5)
                .flatMapIterable(new Func1<Integer, Iterable<String>>() {
                    @Override
                    public Iterable<String> call(Integer integer) {
                        return Arrays.asList(integer*10+"",integer*100+"");
                    }
                }).subscribe(new Action1<String>() {
                  @Override
                  public void call(String s) {
                
                  }
        });
    
  • concatMap: 類似于flatMap,由于內(nèi)部使用concat合并,所以是按照順序連接發(fā)射。

  • switchMap: 和flatMap很像,將Observable發(fā)射的數(shù)據(jù)變換為Observables集合,當原始Observable發(fā)射一個新的數(shù)據(jù)(Observable)時,它將取消訂閱前一個Observable。

      Observable.create(new Observable.OnSubscribe<Integer>() {
    
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for(int i=1;i<4;i++){
                    subscriber.onNext(i);
                    Utils.sleep(500,subscriber);//線程休眠500ms
                }
    
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.newThread())
          .switchMap(new Func1<Integer, Observable<Integer>>() {
                 @Override
               public Observable<Integer> call(Integer integer) {
                       //每當接收到新的數(shù)據(jù),之前的Observable將會被取消訂閱
                        return Observable.create(new Observable.OnSubscribe<Integer>() {
                            @Override
                            public void call(Subscriber<? super Integer> subscriber) {
                                subscriber.onNext(integer*10);
                                Utils.sleep(500,subscriber);
                                subscriber.onNext(integer*100);
                                subscriber.onCompleted();
                            }
                        }).subscribeOn(Schedulers.newThread());
                    }
                })
                .subscribe(s -> Log.d("JG",s.toString()));//10,20,30,300
    
    
  • scan: 與reduce很像,對Observable發(fā)射的每一項數(shù)據(jù)應用一個函數(shù),然后按順序依次發(fā)射每一個值。

      Observable.just(2,3,5)
                .scan(new Func2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer sum, Integer item) {
                        return sum+item;
                    }
                })
        .subscribe(integer -> Log.d("JG",integer.toString())) //2,5,10
    
  • groupBy: 將Observable分拆為Observable集合,將原始Observable發(fā)射的數(shù)據(jù)按Key分組,每一個Observable發(fā)射一組不同的數(shù)據(jù)。

       Observable.just(2,3,5,6)
                .groupBy(new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {//分組
                        return integer%2==0?"偶數(shù)":"奇數(shù)";
                    }
                })
        .subscribe(new Action1<GroupedObservable<String, Integer>>() {
            @Override
            public void call(GroupedObservable<String, Integer> o) {
    
                o.subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.d("JG",o.getKey()+":"+integer.toString()); //偶數(shù):2,奇數(shù):3,...
                    }
                });
            }
        })
    
  • buffer: 它定期從Observable收集數(shù)據(jù)到一個集合,然后把這些數(shù)據(jù)集合打包發(fā)射,而不是一次發(fā)射一個

    
        Observable.just(2,3,5,6)
                .buffer(3)
                .subscribe(new Action1<List<Integer>>() {
                    @Override
                    public void call(List<Integer> integers) {
                        
                    }
                })
    
  • window: 定期將來自Observable的數(shù)據(jù)分拆成一些Observable窗口,然后發(fā)射這些窗口,而不是每次發(fā)射一項。

       Observable.just(2,3,5,6)
                .window(3)
                .subscribe(new Action1<Observable<Integer>>() {
                    @Override
                    public void call(Observable<Integer> integerObservable) {
                        integerObservable.subscribe(new Action1<Integer>() {
                            @Override
                            public void call(Integer integer) {
                                
                            }
                        });
                    }
                })
    

錯誤處理/重試機制

  • onErrorResumeNext: 當原始Observable在遇到錯誤時,使用備用Observable。。

      Observable.just(1,"2",3)
        .cast(Integer.class)
        .onErrorResumeNext(Observable.just(1,2,3))
        .subscribe(integer -> Log.d("JG",integer.toString())) //1,2,3
        ;
    
    
  • onExceptionResumeNext: 當原始Observable在遇到異常時,使用備用的Observable。與onErrorResumeNext類似,區(qū)別在于onErrorResumeNext可以處理所有的錯誤,onExceptionResumeNext只能處理異常。

  • onErrorReturn: 當原始Observable在遇到錯誤時發(fā)射一個特定的數(shù)據(jù)。

     Observable.just(1,"2",3)
                .cast(Integer.class)
                .onErrorReturn(new Func1<Throwable, Integer>() {
                    @Override
                    public Integer call(Throwable throwable) {
                        return 4;
                    }
                }).subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer integer) {
                Log.d("JG",integer.toString());1,4
            }
        });
    
  • retry: 當原始Observable在遇到錯誤時進行重試。

    
        Observable.just(1,"2",3)
        .cast(Integer.class)
        .retry(3)
        .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"))
        ;//1,1,1,1,onError
    
  • retryWhen: 當原始Observable在遇到錯誤,將錯誤傳遞給另一個Observable來決定是否要重新訂閱這個Observable,內(nèi)部調(diào)用的是retry

      Observable.just(1,"2",3)
        .cast(Integer.class)
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<Long>>() {
            @Override
            public Observable<Long> call(Observable<? extends Throwable> observable) {
                return Observable.timer(1, TimeUnit.SECONDS);
            }
        })
        .subscribe(integer -> Log.d("JG",integer.toString()),throwable -> Log.d("JG","onError"));
        //1,1
    

連接操作

ConnectableObservable與普通的Observable差不多,但是可連接的Observable在被訂閱時并不開始發(fā)射數(shù)據(jù),只有在它的connect()被調(diào)用時才開始。用這種方法,你可以等所有的潛在訂閱者都訂閱了這個Observable之后才開始發(fā)射數(shù)據(jù)。
ConnectableObservable.connect()指示一個可連接的Observable開始發(fā)射數(shù)據(jù).
Observable.publish()將一個Observable轉(zhuǎn)換為一個可連接的Observable
Observable.replay()確保所有的訂閱者看到相同的數(shù)據(jù)序列的ConnectableObservable,即使它們在Observable開始發(fā)射數(shù)據(jù)之后才訂閱。
ConnectableObservable.refCount()讓一個可連接的Observable表現(xiàn)得像一個普通的Observable。


       ConnectableObservable<Integer> co= Observable.just(1,2,3)
                .publish();

        co .subscribe(integer -> Log.d("JG",integer.toString()) );
        co.connect();//此時開始發(fā)射數(shù)據(jù)

阻塞操作

BlockingObservable是一個阻塞的Observable。普通的Observable 轉(zhuǎn)換為 BlockingObservable,可以使用 Observable.toBlocking(?)方法或者BlockingObservable.from(?)方法。內(nèi)部通過CountDownLatch實現(xiàn)了阻塞操作。。

以下的操作符可以用于BlockingObservable,如果是普通的Observable,務必使用Observable.toBlocking()轉(zhuǎn)為阻塞Observable后使用,否則達不到預期的效果。

  • forEach: 對BlockingObservable發(fā)射的每一項數(shù)據(jù)調(diào)用一個方法,會阻塞直到Observable完成。
    Observable.just(2,3).observeOn(Schedulers.newThread()).toBlocking()
              .forEach(integer -> {
                  Log.d("JG",integer.toString()+" "+Thread.currentThread().getName());
                  Utils.sleep(500);
              });
    
    Log.d("JG",Thread.currentThread().getName());
        // 2 RxNewThreadScheduler-1
        // 3 RxNewThreadScheduler-1
        // main
    
  • first/firstOrDefault/last/lastOrDefault:這幾個操作符之前有介紹過。也可以用于阻塞操作。
  • single/singleOrDefault:如果Observable終止時只發(fā)射了一個值,返回那個值,否則拋出異?;蛘甙l(fā)射默認值。
  • mostRecent:返回一個總是返回Observable最近發(fā)射的數(shù)據(jù)的Iterable。
  • next: 返回一個Iterable,會阻塞直到Observable發(fā)射了第二個值,然后返回那個值。
  • latest: 返回一個iterable,會阻塞直到或者除非Observable發(fā)射了一個iterable沒有返回的值,然后返回這個值
  • toFuture: 將Observable轉(zhuǎn)換為一個Future
  • toIterable:將一個發(fā)射數(shù)據(jù)序列的Observable轉(zhuǎn)換為一個Iterable。
  • getIterator:將一個發(fā)射數(shù)據(jù)序列的Observable轉(zhuǎn)換為一個Iterator

工具集

  • materialize: 將Observable轉(zhuǎn)換成一個通知列表。

     Observable.just(1,2,3)
               .materialize()
               .subscribe(new Action1<Notification<Integer>>() {
                   @Override
                   public void call(Notification<Integer> notification) {
                       Log.d("JG",notification.getKind()+" "+notification.getValue());
                       //OnNext 1
                       //OnNext 2
                       //OnNext 3
                       //OnCompleted null
                   }
               });
    
  • dematerialize: 與上面的作用相反,將通知逆轉(zhuǎn)回一個Observable。

  • timestamp: 給Observable發(fā)射的每個數(shù)據(jù)項添加一個時間戳。

      Observable.just(1,2,3)
               .timestamp()
               .subscribe(new Action1<Timestamped<Integer>>() {
                   @Override
                   public void call(Timestamped<Integer> timestamped) {
                       Log.d("JG",timestamped.getTimestampMillis()+" "+timestamped.getValue());
                       //1472627510548 1
                       //1472627510549 2
                       //1472627510549 3
                   }
               });
    
  • timeInterval:給Observable發(fā)射的兩個數(shù)據(jù)項間添加一個時間差,實現(xiàn)在OperatorTimeInterval

    timeInterval
    timeInterval

  • serialize: 強制Observable按次序發(fā)射數(shù)據(jù)并且要求功能是完好的

  • cache: 緩存Observable發(fā)射的數(shù)據(jù)序列并發(fā)射相同的數(shù)據(jù)序列給后續(xù)的訂閱者

  • observeOn: 指定觀察者觀察Observable的調(diào)度器

  • subscribeOn: 指定Observable執(zhí)行任務的調(diào)度器

  • doOnEach: 注冊一個動作,對Observable發(fā)射的每個數(shù)據(jù)項使用

       Observable.just(2,3)
                 .doOnEach(new Action1<Notification<? super Integer>>() {
                     @Override
                     public void call(Notification<? super Integer> notification) {
                         Log.d("JG","--doOnEach--"+notification.toString());
                     }
                 })
                 .subscribe(integer -> Log.d("JG",integer.toString()));
     //結(jié)果為:            
      // --doOnEach--[rx.Notification@133c40b0 OnNext 2]
     // 2
      // --doOnEach--[rx.Notification@133c40b0 OnNext 3]
     // 3
    // --doOnEach--[rx.Notification@df4db0e OnCompleted]
    
  • doOnCompleted: 注冊一個動作,對正常完成的Observable使用

  • doOnError: 注冊一個動作,對發(fā)生錯誤的Observable使用

  • doOnTerminate:注冊一個動作,對完成的Observable使用,無論是否發(fā)生錯誤

      Observable.just(2,3)
                .doOnTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.d("JG","--doOnTerminate--");
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));
    // 2 , 3 , --doOnTerminate--
    
    
  • doOnSubscribe: 注冊一個動作,在觀察者訂閱時使用。內(nèi)部由OperatorDoOnSubscribe實現(xiàn)

    doOnSubscribe
    doOnSubscribe

  • doOnUnsubscribe: 注冊一個動作,在觀察者取消訂閱時使用。內(nèi)部由OperatorDoOnUnsubscribe實現(xiàn),在call中加入一個解綁動作。

    doOnUnsubscribe
    doOnUnsubscribe

  • finallyDo/doAfterTerminate: 注冊一個動作,在Observable完成時使用

    Observable.just(2,3)
                .doAfterTerminate(new Action0() {
                    @Override
                    public void call() {
                        Log.d("JG","--doAfterTerminate--");
                    }
                })
                .subscribe(integer -> Log.d("JG",integer.toString()));
    //2,3,  --doAfterTerminate-- 
    
  • delay: 延時發(fā)射Observable的結(jié)果。即讓原始Observable在發(fā)射每項數(shù)據(jù)之前都暫停一段指定的時間段。效果是Observable發(fā)射的數(shù)據(jù)項在時間上向前整體平移了一個增量(除了onError,它會即時通知)。

  • delaySubscription: 延時處理訂閱請求。實現(xiàn)在OnSubscribeDelaySubscription

    delaySubscription
    delaySubscription

  • using: 創(chuàng)建一個只在Observable生命周期存在的資源,當Observable終止時這個資源會被自動釋放。

      Observable.using(new Func0<File>() {//資源工廠
            @Override
            public File call() {
    
                File file = new File(getCacheDir(), "a.txt");
                if(!file.exists()){
                    try {
                        Log.d("JG","--create--");
                        file.createNewFile();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                return file;
            }
        }, new Func1<File, Observable<String>>() { //Observable
            @Override
            public Observable<String> call(File file) {
                return Observable.just(file.exists() ? "exist" : "no exist");
            }
        }, new Action1<File>() {//釋放資源動作
            @Override
            public void call(File file) {
                if(file!=null&&file.exists()){
                    Log.d("JG","--delete--");
                    file.delete();
                }
            }
        })
        .subscribe(s -> Log.d("JG",s))
        ;
     //--create--
     //exist
     //--delete--
     
    
  • single/singleOrDefault: 強制返回單個數(shù)據(jù),否則拋出異?;蚰J數(shù)據(jù)。

最后

關于RxJava標準庫的操作符已經(jīng)介紹完畢,純粹當個備忘錄。如有錯誤之處,歡迎指出。


本文部分操作符描述參考了【ReactiveX文檔中文翻譯】。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 注:只包含標準包中的操作符,用于個人學習及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,366評論 2 8
  • 注:只包含標準包中的操作符,用于個人學習及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 1,053評論 0 3
  • 創(chuàng)建unfaseCreate(create)創(chuàng)建一個Observable(被觀察者),當被觀察者(Observer...
    chuwe1閱讀 7,135評論 3 8
  • 創(chuàng)建操作 用于創(chuàng)建Observable的操作符Create通過調(diào)用觀察者的方法從頭創(chuàng)建一個ObservableEm...
    rkua閱讀 1,952評論 0 1
  • 原創(chuàng)文|蝸牛爬行者 寫在前面:我曾經(jīng)總把自己歸為理想主義者一類,以為憑著一腔憤世嫉俗的熱血和不屑世故的態(tài)度就可以成...
    蝸牛爬行者閱讀 1,188評論 2 12

友情鏈接更多精彩內(nèi)容