Android--RxJava操作符

Func1 和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數(shù)的方法。 Func1 和 Action 的區(qū)別在于 Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數(shù)個數(shù)的方法。FuncX 和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。

1.flatMapIterable: (將數(shù)據(jù)轉(zhuǎn)換后再發(fā)送)

private Observable<String> flatMapIterableObserver(){
        return Observable.just(1,2,3)
                .flatMapIterable(new Func1<Integer, Iterable<? extends String>>() {
                    @Override
                    public Iterable<? extends String> call(Integer integer) {
                        ArrayList<String> strings = new ArrayList<>();
                        for (int i=0;i<3;i++){
                            strings.add("flatMapIterableObserver  "+integer);
                        }
                        return strings;
                    }
                });
    }

09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  1
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  2
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  3
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  3
09-13 18:07:27.610 29295-29295/com.example.administrator.rxjavademo E/call: flatMapIterableObserver  3

2.map:(將數(shù)據(jù)源Observable發(fā)送給每個數(shù)據(jù)進(jìn)行指定函數(shù)轉(zhuǎn)換,再將轉(zhuǎn)換后的數(shù)據(jù)發(fā)送出去)

private Observable<Integer> mapObservable(){
        return Observable.just(1,2,3)
                .map(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        return integer*10;
                    }
                });
    }

09-13 19:30:11.802 32095-32095/com.example.administrator.rxjavademo E/call: map: 10
09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 20
09-13 19:30:11.803 32095-32095/com.example.administrator.rxjavademo E/call: map: 30
  • flatMap() 和 map() 有一個相同點:它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。
  • flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中,map返回的是結(jié)果集。
  • flatMap() 的原理是這樣的:1. 使用傳入的事件對象創(chuàng)建一個 Observable 對象;2. 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件;3. 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負(fù)責(zé)將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。這三個步驟,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap() 所謂的 flat
  • map適用于一對一轉(zhuǎn)換,flatmap適用于一對多,多對多的場景

3.groupBy:(將數(shù)據(jù)源轉(zhuǎn)換為groupBy篩選后的Observable對象)

private Observable<GroupedObservable<Integer,String>> groupedByStringObservable(){
        return Observable.just(1,2,3,4,5,6,7,8,9)
                .groupBy(new Func1<Integer, Integer>() {
                    @Override
                    public Integer call(Integer integer) {
                        return integer % 2;
                    }
                }, new Func1<Integer, String>() {
                    @Override
                    public String call(Integer integer) {
                        return "groupBy: "+integer;
                    }
                });
    }

groupedByStringObservable().subscribe(new Action1<GroupedObservable<Integer, String>>() {
                    @Override
                    public void call(GroupedObservable<Integer, String> integerStringGroupedObservable) {
                        if (integerStringGroupedObservable.getKey()==0){
                            integerStringGroupedObservable.subscribe(new Action1<String>() {
                                @Override
                                public void call(String s) {
                                    Log.e("call", s);
                                }
                            });
                        }
                    }
                });

09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 2
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 4
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 6
09-13 19:17:28.812 31719-31719/com.example.administrator.rxjavademo E/call: groupBy: 8

4.cast:(將Observable發(fā)送的數(shù)據(jù)強(qiáng)轉(zhuǎn)成另外一種類型)

5.scan:(做一次計算,有條件、有篩選的輸出最終結(jié)果)

6.throttleWithTimeout:(時間限流, 低于指定時間的都將被過濾)

private Observable<Integer> createObserver(){
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                for (int i=0;i<10;i++){
                    if (!subscriber.isUnsubscribed()){
                        subscriber.onNext(i);
                    }
                    int sleep = 100;
                    if (i%3==0){
                        sleep = 300;
                    }
                    try{
                        Thread.sleep(sleep);
                    } catch (Exception e){
                        e.printStackTrace();
                    }
                }
                subscriber.onCompleted();
            }
        }).subscribeOn(Schedulers.computation());
    }

private Observable<Integer> throttleWithThimoutObserver(){
    return createObserver().throttleWithTimeout(200,TimeUnit.MILLISECONDS);
}

對小于3的倍數(shù)的數(shù)據(jù)延遲100毫秒后發(fā)送新數(shù)據(jù), 100毫秒小于過濾的時間,數(shù)據(jù)被過濾掉;
輸出日志如下:

09-14 10:06:19.673 2916-2964/com.example.administrator.rxjavademo E/call: integer: 0
09-14 10:06:20.174 2916-2964/com.example.administrator.rxjavademo E/call: integer: 3
09-14 10:06:20.674 2916-2964/com.example.administrator.rxjavademo E/call: integer: 6
09-14 10:06:21.175 2916-2964/com.example.administrator.rxjavademo E/call: integer: 9

7.distinct:(去重,不能List元素出重, 可以list對象出重)

 private Observable<Integer> distinctObserver(){
        return Observable.just(1,2,4,5,6,8,4,3,2,1).distinct();
    }

09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 1
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 2
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 4
09-14 10:33:51.124 3710-3710/com.example.administrator.rxjavademo E/call: integer: 5
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 6
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 8
09-14 10:33:51.125 3710-3710/com.example.administrator.rxjavademo E/call: integer: 3

8.elementAt:(下標(biāo)順序過濾數(shù)據(jù)源)

private Observable<Integer> elementAtObserver(){
        return Observable.just(1,3,8,10,9).elementAt(3);
    }

09-14 11:05:48.591 5006-5006/com.example.administrator.rxjavademo E/call: integer: 10

9.filter:(根據(jù)函數(shù)進(jìn)行過濾操作,返回true就往下執(zhí)行,否則過濾掉 , 和last類似)

 private Observable<Integer> filterObserver(){
        return Observable.just(0,1,2,3,4,5,6,7,8,9)
                .filter(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer<3;
                    }
                });
    }

09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 0
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 1
09-14 11:39:34.319 5856-5856/com.example.administrator.rxjavademo E/call: integer: 2

10.first:(只會返回第一條或者滿足條件的第一條數(shù)據(jù))

 Observable.just(0,1,2,3,4,5).first(new Func1<Integer, Boolean>() {
                    @Override
                    public Boolean call(Integer integer) {
                        return integer>3;
                    }
                }).subscribe(new Action1<Integer>() {
                    @Override
                    public void call(Integer integer) {
                        Log.e("call","integer: "+integer);
                    }
                });


//BlockingObservable不會做任何處理,只會阻塞;
 int result = Observable.just(0,1,2,3,4,5)
                        .toBlocking()
                        .first(new Func1<Integer, Boolean>() {
                            @Override
                            public Boolean call(Integer integer) {
                                return integer>3;
                            }
                        });

09-14 11:56:10.987 6328-6328/com.example.administrator.rxjavademo E/call: integer: 4

11.skip和take: (skip操作符過濾掉前面的數(shù)據(jù),take只取前面數(shù)據(jù) ,將后面的數(shù)據(jù)全部過濾掉)

12.sample和throttleFrist:(sample一次性發(fā)送間隔的幾個數(shù)據(jù),throttleFrist間隔時間后發(fā)送一個數(shù)據(jù))

13.join:(將兩個Observable在有效的時間內(nèi)拼接)

private Observable<String> getLeftObservable(){
        return Observable.just("a","b","c");
}

private Observable<Long> getRightObservable(){
      return Observable.just(1L,2L,3L);
}

private Observable<String> joinObserver(){
    return getLeftObservable().join(getRightObservable(), new Func1<String, Observable<Long>>() {
        @Override
        public Observable<Long> call(String s) {
            return Observable.timer(1000, TimeUnit.MILLISECONDS);
        }
    }, new Func1<Long, Observable<Long>>() {
        @Override
        public Observable<Long> call(Long aLong) {
            return Observable.timer(1000, TimeUnit.MILLISECONDS);
        }
    }, new Func2<String, Long, String>() {
        @Override
        public String call(String s, Long aLong) {
            return s+" : "+aLong;
        }
    });
}


09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 1
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 1
09-14 15:08:57.223 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 1
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 2
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 2
09-14 15:08:57.224 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 2
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: b : 3
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: a : 3
09-14 15:08:57.225 9448-9448/com.example.administrator.rxjavademo E/call: joinObserver: c : 3

14.merge:(將多個Observable發(fā)送的數(shù)據(jù)整合在一起后發(fā)送,但發(fā)送的數(shù)據(jù)可能是錯亂的,如果不想錯亂可以使用concat)

private Observable<Integer> mergeObserver(){
    return Observable.merge(Observable.just(1,2,3),Observable.just(4,5,6));
}

09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 3
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 5
09-14 15:39:05.915 10013-10013/com.example.administrator.rxjavademo E/call: mergeObserver: 6

15.mergeDelayError:(類似merge ,但是遇到異常后會繼續(xù)組合操作,等所有數(shù)據(jù)發(fā)送完成后才將這個異常拋出)

private Observable<Integer> mergeDelayErrorObserver(){
    return Observable.mergeDelayError(Observable
    .create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i=0;i<5;i++){
                if (i==3){
                    subscriber.onError(new Throwable("onError"));
                }
                subscriber.onNext(i);
            }
        }
    }),Observable.create(new Observable.OnSubscribe<Integer>() {
        @Override
        public void call(Subscriber<? super Integer> subscriber) {
            for (int i=0;i<5;i++){
                if (i==3){
                    subscriber.onNext(i+5);
                }
                subscriber.onCompleted();
            }
        }
    }));
}

09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 0
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 3
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:44:31.088 10353-10353/com.example.administrator.rxjavademo E/call: mergeObserver: 8
09-14 15:44:31.090 10353-10353/com.example.administrator.rxjavademo E/AndroidRuntime: FATAL EXCEPTION: main
    Process: com.example.administrator.rxjavademo, PID: 10353
    rx.exceptions.OnErrorNotImplementedException: onError

16.startWith:(需要發(fā)送的數(shù)據(jù)源前面插入數(shù)據(jù))

private Observable<Integer> startWithObserver(){
        return Observable.just(1,2,3).startWith(-1,4);
    }

09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: -1
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 4
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 1
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 2
09-14 15:49:55.637 10729-10729/com.example.administrator.rxjavademo E/call: mergeObserver: 3

17.onErrorReturn:(捕獲異常并返回了指定的字符串給訂閱者)

private Observable<String> createObserver(){
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            for (int i=1;i<=6;i++){
                if (i<3){
                    subscriber.onNext("onNext: "+i);
                } else {
                    subscriber.onError(new Throwable("onError"));
                }
            }
        }
    });
}
private Observable<String> onErrorReturnObserver() {
    return createObserver().onErrorReturn(new Func1<Throwable, String>() {
        @Override
        public String call(Throwable throwable) {
            return "onErrorReturn";
        }
    });
}

09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 1
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onNext: 2
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onNext: onErrorReturn
09-14 16:25:28.983 11558-11558/com.example.administrator.rxjavademo E/onErrorReturnObserver: onCompleted

18.onErrorResumeNext:(發(fā)生異常的時候, 創(chuàng)建新的Observable來繼續(xù)發(fā)送數(shù)據(jù))

19.onExceptionResumeNext:(類似onErrorResumeNext,不同之處是對異常數(shù)據(jù)做判斷 ,如果是Exception就會使用另一個Observable代替原來的繼續(xù)發(fā)數(shù)據(jù),否則將錯誤分發(fā)給Subscriber)

20.retry:(發(fā)生錯誤的時候會重新訂閱,而且可以重復(fù)多次,但是這樣也就有可能造成死循環(huán),建議指定最大重復(fù)次數(shù))

private Observable<Integer> createObserver(){
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                Log.e("createObserver","subscriber");
                for (int i=0;i<3;i++){
                    if (i==2){
                        subscriber.onError(new Exception("Exception"));
                    } else {
                        subscriber.onNext(i);
                    }
                }
            }
        });
    }

createObserver().retry(2).subscribe(new Subscriber<Integer>() {
                    @Override
                    public void onCompleted() {
                        Log.e("retry","onCompleted");
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.e("retry","onError: "+e.getMessage());
                    }

                    @Override
                    public void onNext(Integer integer) {
                        Log.e("retry","call: "+integer);
                    }
                });
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.069 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.070 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/createObserver: subscriber
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 0
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: call: 1
09-17 17:34:02.071 18668-18668/com.example.administrator.rxjavademo E/retry: onError: Exception

21.delay:(延遲發(fā)送)

22.do:(給Observable的各個階段加上監(jiān)聽,執(zhí)行到的時候就觸發(fā))

  • doOnEach() :Observable每次發(fā)送一個數(shù)據(jù)的時候就會觸發(fā)這個回調(diào),無論Observable調(diào)用的是onNext,onError還是onCompleted.
  • doOnNext: 只有Observable調(diào)用onNext 發(fā)送數(shù)據(jù)的時候才會調(diào)用;
  • doOnError: 只有Observable通過onError 分發(fā)錯誤的時候才會觸發(fā)回調(diào),并且調(diào)用Throwble對象作為參數(shù)傳遞到回調(diào)函數(shù)去;
  • doOnComplete:只有Observable調(diào)用doOnComplete 發(fā)送結(jié)束事件的時候才會觸發(fā)回調(diào);
  • doOnSubscribe和doOnUnsubscribe: 會在Subscrible進(jìn)行訂閱和反訂閱的時候才會觸發(fā)回調(diào);
  • doOnTerminate:會在Observable結(jié)束前觸發(fā)回調(diào),無論是正常結(jié)束還是異常結(jié)束;
  • finallyDo: 會在Observable結(jié)束后觸發(fā)回調(diào),無論是正常結(jié)束還是異常結(jié)束;

23.subscribeOn和observeOn:(subscribeOn是在哪個線程上訂閱,也就是用subscribeOn指定要工作的線程;observeOn是在哪個線程上觀察,也就是結(jié)果被使用的線程)

 private Observable<Integer> observableOnserver(){
        return createObserver()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeOn(Schedulers.newThread());
    }

    private Observable<Integer> createObserver(){
        return Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                Log.e("createObserver","call: "+Thread.currentThread().getName());
                subscriber.onNext(1);
                subscriber.onCompleted();
            }
        });
    }

    private Observable<Integer> subscribeOnObserver(){
        return createObserver()
                .subscribeOn(Schedulers.computation())
                .observeOn(Schedulers.immediate());
    }

09-18 10:34:52.832 3048-3092/com.example.administrator.rxjavademo E/createObserver: call: RxComputationThreadPool-1
09-18 10:34:52.833 3048-3092/com.example.administrator.rxjavademo E/computation: call: 1   RxComputationThreadPool-1
09-18 10:34:52.843 3048-3091/com.example.administrator.rxjavademo E/createObserver: call: RxNewThreadScheduler-1
09-18 10:34:52.844 3048-3048/com.example.administrator.rxjavademo E/mainThread: call: 1   main

24.using:(創(chuàng)建一個在Observable生命周期內(nèi)存活的資源,但是Observable終止的時候,資源也會被銷毀)

25.all:(對Observable發(fā)送的所有數(shù)據(jù)進(jìn)行判斷,如果全部滿足就返回true,否則返回false)

26.amb:(最多將9個Observable結(jié)合起來,看哪個先發(fā)送(包括onError和onComplete),后發(fā)送的將被丟棄)

27.contains:(判斷發(fā)送的所有數(shù)據(jù)有沒有包含某個數(shù)據(jù),如果包含就返回true,Observable沒發(fā)送完所有數(shù)據(jù)前不會返回數(shù)據(jù))

28.isEmpty:(判斷Observable是否發(fā)送過數(shù)據(jù),如果發(fā)送過了就返回false;如果Observavble已經(jīng)結(jié)束了都還沒發(fā)送這個數(shù)據(jù),則返回true)

29.concat:(將發(fā)送的數(shù)據(jù)組合起來,類似startWith和merge)

30.from:(接收一個對象作為參數(shù)來創(chuàng)建Observable,參數(shù)對象可以是Iterable,Callable,Future和數(shù)組)

31.just:(接收對象作為輸入,然后創(chuàng)建一個發(fā)送該對象的Observable,對象可以是數(shù)字,字符串,數(shù)組,Iterate對象等)

  • from()創(chuàng)建方式和just()操作符類似,但是just操作符創(chuàng)建的Observable會將整個參數(shù)對象作為數(shù)據(jù)一下子發(fā)送出去,例如參數(shù)是個含有10個數(shù)字的數(shù)組,使用from創(chuàng)建Observable就會發(fā)送10次,而just創(chuàng)建的Observable會一次將整個數(shù)組發(fā)送出去;
  • 一般如果用from轉(zhuǎn)換多個數(shù)據(jù),比如 ArrayList等包含多個數(shù)據(jù)的數(shù)組或者列表, just用于處理單個的數(shù)據(jù)。
  • from 遇到只包含一個數(shù)據(jù)的時候,會使用just創(chuàng)建Observable; just 遇到多于一個的情況,會使用from 創(chuàng)建 Observable

32.自定義操作符:
A. 可以多次調(diào)用Subscriber的onNext方法,但是同個數(shù)據(jù)只能調(diào)用一次;
B. 可以調(diào)用Subscriber的onComplete或者onError方法,但是這兩個方法是互斥的,調(diào)用了其中一個就不能調(diào)用另一個,并且一旦調(diào)用了兩者中的任何一個方法就不能調(diào)用onNext方法;
C. 如果無法保證無法保證上面兩條原則,可以對自定義操作符加上serialize操作符,這個操作符會強(qiáng)制性發(fā)送正確的數(shù)據(jù);
D. 自定義操作內(nèi)部不能阻塞;
E.如果有異常的時候,不能繼續(xù)發(fā)送正常的數(shù)據(jù),要立刻調(diào)用Subscriber的onError() 來將異常拋出;
F.null也屬于一種數(shù)據(jù), 可以正常發(fā)送,和完全不發(fā)送是兩回事;
G.如果通過組合多個Rxjava原生操作符就能達(dá)到目的, 就不要使用自定義操作符實現(xiàn);例如:

  • first(操作符是通過take(1).single()來實現(xiàn)的;
  • ignoreElements()是通過filter(alwaysFalse())來實現(xiàn)的;
  • reduce(a)是通過scan(a).last()來實現(xiàn)的;

參考:

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    BrotherChen閱讀 1,779評論 0 10
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    無求_95dd閱讀 3,487評論 0 21
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡潔易...
    測天測地測空氣閱讀 683評論 0 1
  • 本篇文章介主要紹RxJava中操作符是以函數(shù)作為基本單位,與響應(yīng)式編程作為結(jié)合使用的,對什么是操作、操作符都有哪些...
    嘎啦果安卓獸閱讀 2,975評論 0 10
  • 作者: maplejaw本篇只解析標(biāo)準(zhǔn)包中的操作符。對于擴(kuò)展包,由于使用率較低,如有需求,請讀者自行查閱文檔。 創(chuàng)...
    maplejaw_閱讀 46,188評論 8 93

友情鏈接更多精彩內(nèi)容