Rxjava3使用教程:操作符-過(guò)濾

過(guò)濾操作符

根據(jù)應(yīng)用場(chǎng)景對(duì)過(guò)濾操作符進(jìn)行分類,可分成以下4個(gè)類別:

1. 根據(jù)"指定條件"過(guò)濾事件
操作符 作用
filter 過(guò)濾 特定條件的事件
ofType 過(guò)濾 特定數(shù)據(jù)類型的數(shù)據(jù)
skip/skipLast 跳過(guò)某個(gè)事件
distinct/distinctUntilChanged 過(guò)濾事件序列中重復(fù)的事件 / 連續(xù)重復(fù)的事件
  • filter()

    Observable.just("hello", 1, "haha", 2).filter(new Predicate<Serializable>() {
        @Override
        public boolean test(Serializable serializable) throws Throwable {
            return serializable instanceof Integer;
        }
    }).subscribe(observer);
    

    結(jié)果:

    2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
    2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
    2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
  • ofType()

    Observable.just(1,2,"hello").ofType(String.class).subscribe(observer);
    

    結(jié)果:

    2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:hello
    2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
  • skip()

    Observable.just(1,2,3,4,5).skip(2).subscribe(observer);
    

    結(jié)果:

    2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:3
    2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
    2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
    2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
  • skipLast()

    Observable.just(1,2,3,4,5).skipLast(3).subscribe(observer);
    

    結(jié)果:

    2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
    2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
    2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
  • distinct()

    //過(guò)濾事件序列中重復(fù)的事件
    Observable.just(1,2,2,4,4,5,6, 1).distinct().subscribe(observer);
    

    結(jié)果:

    2022-05-05 15:01:42.336 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
    2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
    2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
    2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
    2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6
    2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
  • distinctUntilChanged()

    //過(guò)濾事件序列中連續(xù)重復(fù)的事件
    Observable.just(1,2,2,4,4,5,6, 1).distinctUntilChanged().subscribe(observer);
    

    結(jié)果:

    2022-05-05 15:02:07.275 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
    2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
    2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4
    2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
    2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6
    2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
    2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
2. 根據(jù)"指定事件數(shù)量"過(guò)濾事件
操作符 作用
take 指定觀察者最多能接收到的事件數(shù)量
takeLast 指定觀察者只能接收到被觀察者發(fā)送的最后幾個(gè)事件
  • take

    Observable.just(1,2,3,4,5).take(2).subscribe(observer);
    

    結(jié)果:

    2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1
    2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2
    2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
  • takeLast

    Observable.just(1,2,3,4,5).takeLast(1).subscribe(observer);
    

    結(jié)果:

    2022-05-05 15:06:58.362 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe:
    2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5
    2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
    
3. 根據(jù)"指定時(shí)間"過(guò)濾事件
操作符 作用
throttleFirst 在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)第1次事件
throttleLast/sample 在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)最后1次事件
throttleWithTimeout / debounce 發(fā)送數(shù)據(jù)事件時(shí),若2次發(fā)送事件的間隔<指定時(shí)間,就會(huì)丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒(méi)有新數(shù)據(jù)發(fā)射時(shí)才會(huì)發(fā)送后一次的數(shù)據(jù)
  • throttleFirst

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            // 隔段事件發(fā)送時(shí)間
            e.onNext(1);
            Thread.sleep(500);
    
            e.onNext(2);
            Thread.sleep(400);
    
            e.onNext(3);
            Thread.sleep(300);
    
            e.onNext(4);
            Thread.sleep(300);
    
            e.onNext(5);
            Thread.sleep(300);
    
            e.onNext(6);
            Thread.sleep(400);
    
            e.onNext(7);
            Thread.sleep(300);
            e.onNext(8);
    
            Thread.sleep(300);
            e.onNext(9);
    
            Thread.sleep(300);
            e.onComplete();
        }
    }).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "開(kāi)始采用subscribe連接");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到了事件"+ value  );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                }
            });
    

    結(jié)果:

    2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 開(kāi)始采用subscribe連接
    2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件1
    2022-05-05 15:27:21.214 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件4
    2022-05-05 15:27:22.218 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件7
    2022-05-05 15:27:23.121 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 對(duì)Complete事件作出響應(yīng)
    
  • throttleLast/sample

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
            // 隔段事件發(fā)送時(shí)間
            e.onNext(1);
            Thread.sleep(500);
    
            e.onNext(2);
            Thread.sleep(400);
    
            e.onNext(3);
            Thread.sleep(300);
    
            e.onNext(4);
            Thread.sleep(300);
    
            e.onNext(5);
            Thread.sleep(300);
    
            e.onNext(6);
            Thread.sleep(400);
    
            e.onNext(7);
            Thread.sleep(300);
            e.onNext(8);
    
            Thread.sleep(300);
            e.onNext(9);
    
            Thread.sleep(300);
            e.onComplete();
        }
    }).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù)
            .subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "開(kāi)始采用subscribe連接");
                }
    
                @Override
                public void onNext(Integer value) {
                    Log.d(TAG, "接收到了事件"+ value  );
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                }
            });
    

    結(jié)果:

    2022-05-05 15:27:23.124 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 開(kāi)始采用subscribe連接
    2022-05-05 15:27:24.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件3
    2022-05-05 15:27:25.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件6
    2022-05-05 15:27:26.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件9
    2022-05-05 15:27:26.234 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 對(duì)Complete事件作出響應(yīng)
    
  • throttleWithTimeout/debounce

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                    // 隔段事件發(fā)送時(shí)間
                    e.onNext(1);
                    Thread.sleep(500);
                    e.onNext(2); // 1和2之間的間隔小于指定時(shí)間1s,所以前1次數(shù)據(jù)(1)會(huì)被拋棄,2會(huì)被保留
                    Thread.sleep(1500);  // 因?yàn)?和3之間的間隔大于指定時(shí)間1s,所以之前被保留的2事件將發(fā)出
                    e.onNext(3);
                    Thread.sleep(1500);  // 因?yàn)?和4之間的間隔大于指定時(shí)間1s,所以3事件將發(fā)出
                    e.onNext(4);
                    Thread.sleep(500); // 因?yàn)?和5之間的間隔小于指定時(shí)間1s,所以前1次數(shù)據(jù)(4)會(huì)被拋棄,5會(huì)被保留
                    e.onNext(5);
                    Thread.sleep(500); // 因?yàn)?和6之間的間隔小于指定時(shí)間1s,所以前1次數(shù)據(jù)(5)會(huì)被拋棄,6會(huì)被保留
                    e.onNext(6);
                    Thread.sleep(1500); // 因?yàn)?和Complete實(shí)踐之間的間隔大于指定時(shí)間1s,所以之前被保留的6事件將發(fā)出
    
                    e.onComplete();
                }
            }).throttleWithTimeout(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù)
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer value) {
                            Log.d(TAG, "接收到了事件"+ value  );
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            Log.d(TAG, "對(duì)Error事件作出響應(yīng)");
                        }
    
                        @Override
                        public void onComplete() {
                            Log.d(TAG, "對(duì)Complete事件作出響應(yīng)");
                        }
                    });
    

    結(jié)果:

    2022-05-05 15:21:14.416 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件2
    2022-05-05 15:21:15.916 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件3
    2022-05-05 15:21:18.420 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件6
    2022-05-05 15:21:18.920 3428-3428/com.alsan.rxjavademo D/RxFilterSymbolActivity: 對(duì)Complete事件作出響應(yīng)
    
4. 根據(jù)"指定事件位置"過(guò)濾事件
操作符 作用
firstElement 僅選取第1個(gè)元素
lastElement 僅選取最后一個(gè)元素
elementAt 指定接收某個(gè)元素(通過(guò) 索引值 確定)
elementAtOrError 在elementAt()的基礎(chǔ)上,當(dāng)出現(xiàn)越界情況(即獲取的位置索引 > 發(fā)送事件序列長(zhǎng)度)時(shí),即拋出異常
  • firstElement

    Observable.just("hello", 1, "haha", 2).firstElement().subscribe((Consumer<Serializable>) serializable -> {
        Log.d(TAG, serializable.toString());
    });
    

    結(jié)果:

    2022-05-05 15:38:09.536 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: hello
    
  • lastElement

    Observable.just("hello", 1, "haha", 2).lastElement().subscribe((Consumer<Serializable>) serializable -> {
      Log.d(TAG, serializable.toString());
    });
    

    結(jié)果:

    2022-05-05 15:38:22.242 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 2
    
  • elementAt

    Observable.just(1,2,2,4,4,5,6, 1).elementAt(1).subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Throwable {
            Log.d(TAG, "accept:" + integer);
        }
    });
    

    結(jié)果:

    2022-05-05 15:39:14.759 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: accept:2
    
  • elementAtOrError

    Observable.just(1,2,3,4,5).elementAtOrError(9).subscribe(new SingleObserver<Integer>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            Log.d(TAG, "onSubscribe...");
        }
    
        @Override
        public void onSuccess(@NonNull Integer integer) {
            Log.d(TAG, "onSuccess..." + integer);
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            Log.d(TAG, "onError:" + Log.getStackTraceString(e));
        }
    });
    

    結(jié)果:

    022-05-05 15:41:16.713 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe...
    2022-05-05 15:41:16.715 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onError:java.util.NoSuchElementException
            at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:112)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:38)
            at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13095)
            at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37)
            at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813)
            at com.alsan.rxjavademo.RxFilterSymbolActivity.lambda$onCreate$17$RxFilterSymbolActivity(RxFilterSymbolActivity.java:311)
            at com.alsan.rxjavademo.-$$Lambda$RxFilterSymbolActivity$bbHK8cGk2DGgxkmEzj5endgLd8A.onClick(Unknown Source:2)
            at android.view.View.performClick(View.java:6291)
            at com.google.android.material.button.MaterialButton.performClick(MaterialButton.java:967)
            at android.view.View$PerformClick.run(View.java:24931)
            at android.os.Handler.handleCallback(Handler.java:808)
            at android.os.Handler.dispatchMessage(Handler.java:101)
            at android.os.Looper.loop(Looper.java:166)
            at android.app.ActivityThread.main(ActivityThread.java:7529)
            at java.lang.reflect.Method.invoke(Native Method)
            at com.android.internal.os.Zygote$MethodAndArgsCaller.run(Zygote.java:245)
            at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:921)
    
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容