過(guò)濾操作符
根據(jù)應(yīng)用場(chǎng)景對(duì)過(guò)濾操作符進(jìn)行分類,可分成以下4個(gè)類別:
1. 根據(jù)"指定條件"過(guò)濾事件
| 操作符 | 作用 |
|---|---|
| filter | 過(guò)濾 特定條件的事件 |
| ofType | 過(guò)濾 特定數(shù)據(jù)類型的數(shù)據(jù) |
| skip/skipLast | 跳過(guò)某個(gè)事件 |
| distinct/distinctUntilChanged | 過(guò)濾事件序列中重復(fù)的事件 / 連續(xù)重復(fù)的事件 |
-
filter()
Observable.just("hello", 1, "haha", 2).filter(new Predicate<Serializable>() { @Override public boolean test(Serializable serializable) throws Throwable { return serializable instanceof Integer; } }).subscribe(observer);結(jié)果:
2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 14:59:17.674 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1 2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2 2022-05-05 14:59:17.675 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete -
ofType()
Observable.just(1,2,"hello").ofType(String.class).subscribe(observer);結(jié)果:
2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:hello 2022-05-05 14:58:09.137 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete -
skip()
Observable.just(1,2,3,4,5).skip(2).subscribe(observer);結(jié)果:
2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:3 2022-05-05 15:00:10.151 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4 2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5 2022-05-05 15:00:10.152 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete -
skipLast()
Observable.just(1,2,3,4,5).skipLast(3).subscribe(observer);結(jié)果:
2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1 2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2 2022-05-05 15:01:03.328 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete -
distinct()
//過(guò)濾事件序列中重復(fù)的事件 Observable.just(1,2,2,4,4,5,6, 1).distinct().subscribe(observer);結(jié)果:
2022-05-05 15:01:42.336 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1 2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2 2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4 2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5 2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6 2022-05-05 15:01:42.337 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete -
distinctUntilChanged()
//過(guò)濾事件序列中連續(xù)重復(fù)的事件 Observable.just(1,2,2,4,4,5,6, 1).distinctUntilChanged().subscribe(observer);結(jié)果:
2022-05-05 15:02:07.275 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1 2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2 2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:4 2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5 2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:6 2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1 2022-05-05 15:02:07.276 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
2. 根據(jù)"指定事件數(shù)量"過(guò)濾事件
| 操作符 | 作用 |
|---|---|
| take | 指定觀察者最多能接收到的事件數(shù)量 |
| takeLast | 指定觀察者只能接收到被觀察者發(fā)送的最后幾個(gè)事件 |
-
take
Observable.just(1,2,3,4,5).take(2).subscribe(observer);結(jié)果:
2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:1 2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:2 2022-05-05 15:06:45.219 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete -
takeLast
Observable.just(1,2,3,4,5).takeLast(1).subscribe(observer);結(jié)果:
2022-05-05 15:06:58.362 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe: 2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onNext:5 2022-05-05 15:06:58.363 1522-1522/com.alsan.rxjavademo D/RxFilterSymbolActivity: onComplete
3. 根據(jù)"指定時(shí)間"過(guò)濾事件
| 操作符 | 作用 |
|---|---|
| throttleFirst | 在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)第1次事件 |
| throttleLast/sample | 在某段時(shí)間內(nèi),只發(fā)送該段時(shí)間內(nèi)最后1次事件 |
| throttleWithTimeout / debounce | 發(fā)送數(shù)據(jù)事件時(shí),若2次發(fā)送事件的間隔<指定時(shí)間,就會(huì)丟棄前一次的數(shù)據(jù),直到指定時(shí)間內(nèi)都沒(méi)有新數(shù)據(jù)發(fā)射時(shí)才會(huì)發(fā)送后一次的數(shù)據(jù) |
-
throttleFirst
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { // 隔段事件發(fā)送時(shí)間 e.onNext(1); Thread.sleep(500); e.onNext(2); Thread.sleep(400); e.onNext(3); Thread.sleep(300); e.onNext(4); Thread.sleep(300); e.onNext(5); Thread.sleep(300); e.onNext(6); Thread.sleep(400); e.onNext(7); Thread.sleep(300); e.onNext(8); Thread.sleep(300); e.onNext(9); Thread.sleep(300); e.onComplete(); } }).throttleFirst(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "開(kāi)始采用subscribe連接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對(duì)Error事件作出響應(yīng)"); } @Override public void onComplete() { Log.d(TAG, "對(duì)Complete事件作出響應(yīng)"); } });結(jié)果:
2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 開(kāi)始采用subscribe連接 2022-05-05 15:27:20.009 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件1 2022-05-05 15:27:21.214 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件4 2022-05-05 15:27:22.218 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件7 2022-05-05 15:27:23.121 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 對(duì)Complete事件作出響應(yīng) -
throttleLast/sample
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { // 隔段事件發(fā)送時(shí)間 e.onNext(1); Thread.sleep(500); e.onNext(2); Thread.sleep(400); e.onNext(3); Thread.sleep(300); e.onNext(4); Thread.sleep(300); e.onNext(5); Thread.sleep(300); e.onNext(6); Thread.sleep(400); e.onNext(7); Thread.sleep(300); e.onNext(8); Thread.sleep(300); e.onNext(9); Thread.sleep(300); e.onComplete(); } }).throttleLast(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { Log.d(TAG, "開(kāi)始采用subscribe連接"); } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對(duì)Error事件作出響應(yīng)"); } @Override public void onComplete() { Log.d(TAG, "對(duì)Complete事件作出響應(yīng)"); } });結(jié)果:
2022-05-05 15:27:23.124 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 開(kāi)始采用subscribe連接 2022-05-05 15:27:24.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件3 2022-05-05 15:27:25.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件6 2022-05-05 15:27:26.129 3744-3924/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件9 2022-05-05 15:27:26.234 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 對(duì)Complete事件作出響應(yīng) -
throttleWithTimeout/debounce
Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> e) throws Exception { // 隔段事件發(fā)送時(shí)間 e.onNext(1); Thread.sleep(500); e.onNext(2); // 1和2之間的間隔小于指定時(shí)間1s,所以前1次數(shù)據(jù)(1)會(huì)被拋棄,2會(huì)被保留 Thread.sleep(1500); // 因?yàn)?和3之間的間隔大于指定時(shí)間1s,所以之前被保留的2事件將發(fā)出 e.onNext(3); Thread.sleep(1500); // 因?yàn)?和4之間的間隔大于指定時(shí)間1s,所以3事件將發(fā)出 e.onNext(4); Thread.sleep(500); // 因?yàn)?和5之間的間隔小于指定時(shí)間1s,所以前1次數(shù)據(jù)(4)會(huì)被拋棄,5會(huì)被保留 e.onNext(5); Thread.sleep(500); // 因?yàn)?和6之間的間隔小于指定時(shí)間1s,所以前1次數(shù)據(jù)(5)會(huì)被拋棄,6會(huì)被保留 e.onNext(6); Thread.sleep(1500); // 因?yàn)?和Complete實(shí)踐之間的間隔大于指定時(shí)間1s,所以之前被保留的6事件將發(fā)出 e.onComplete(); } }).throttleWithTimeout(1, TimeUnit.SECONDS)//每1秒中采用數(shù)據(jù) .subscribe(new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Integer value) { Log.d(TAG, "接收到了事件"+ value ); } @Override public void onError(Throwable e) { Log.d(TAG, "對(duì)Error事件作出響應(yīng)"); } @Override public void onComplete() { Log.d(TAG, "對(duì)Complete事件作出響應(yīng)"); } });結(jié)果:
2022-05-05 15:21:14.416 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件2 2022-05-05 15:21:15.916 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件3 2022-05-05 15:21:18.420 3428-3490/com.alsan.rxjavademo D/RxFilterSymbolActivity: 接收到了事件6 2022-05-05 15:21:18.920 3428-3428/com.alsan.rxjavademo D/RxFilterSymbolActivity: 對(duì)Complete事件作出響應(yīng)
4. 根據(jù)"指定事件位置"過(guò)濾事件
| 操作符 | 作用 |
|---|---|
| firstElement | 僅選取第1個(gè)元素 |
| lastElement | 僅選取最后一個(gè)元素 |
| elementAt | 指定接收某個(gè)元素(通過(guò) 索引值 確定) |
| elementAtOrError | 在elementAt()的基礎(chǔ)上,當(dāng)出現(xiàn)越界情況(即獲取的位置索引 > 發(fā)送事件序列長(zhǎng)度)時(shí),即拋出異常 |
-
firstElement
Observable.just("hello", 1, "haha", 2).firstElement().subscribe((Consumer<Serializable>) serializable -> { Log.d(TAG, serializable.toString()); });結(jié)果:
2022-05-05 15:38:09.536 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: hello -
lastElement
Observable.just("hello", 1, "haha", 2).lastElement().subscribe((Consumer<Serializable>) serializable -> { Log.d(TAG, serializable.toString()); });結(jié)果:
2022-05-05 15:38:22.242 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: 2 -
elementAt
Observable.just(1,2,2,4,4,5,6, 1).elementAt(1).subscribe(new Consumer<Integer>() { @Override public void accept(Integer integer) throws Throwable { Log.d(TAG, "accept:" + integer); } });結(jié)果:
2022-05-05 15:39:14.759 3744-3744/com.alsan.rxjavademo D/RxFilterSymbolActivity: accept:2 -
elementAtOrError
Observable.just(1,2,3,4,5).elementAtOrError(9).subscribe(new SingleObserver<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { Log.d(TAG, "onSubscribe..."); } @Override public void onSuccess(@NonNull Integer integer) { Log.d(TAG, "onSuccess..." + integer); } @Override public void onError(@NonNull Throwable e) { Log.d(TAG, "onError:" + Log.getStackTraceString(e)); } });結(jié)果:
022-05-05 15:41:16.713 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onSubscribe... 2022-05-05 15:41:16.715 4285-4285/com.alsan.rxjavademo D/RxFilterSymbolActivity: onError:java.util.NoSuchElementException at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle$ElementAtObserver.onComplete(ObservableElementAtSingle.java:115) at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray$FromArrayDisposable.run(ObservableFromArray.java:112) at io.reactivex.rxjava3.internal.operators.observable.ObservableFromArray.subscribeActual(ObservableFromArray.java:38) at io.reactivex.rxjava3.core.Observable.subscribe(Observable.java:13095) at io.reactivex.rxjava3.internal.operators.observable.ObservableElementAtSingle.subscribeActual(ObservableElementAtSingle.java:37) at io.reactivex.rxjava3.core.Single.subscribe(Single.java:4813) at com.alsan.rxjavademo.RxFilterSymbolActivity.lambda$onCreate$17$RxFilterSymbolActivity(RxFilterSymbolActivity.java:311) at com.alsan.rxjavademo.-$$Lambda$RxFilterSymbolActivity$bbHK8cGk2DGgxkmEzj5endgLd8A.onClick(Unknown Source:2) at android.view.View.performClick(View.java:6291) at com.google.android.material.button.MaterialButton.performClick(MaterialButton.java:967) at android.view.View$PerformClick.run(View.java:24931) at android.os.Handler.handleCallback(Handler.java:808) at android.os.Handler.dispatchMessage(Handler.java:101) at android.os.Looper.loop(Looper.java:166) at android.app.ActivityThread.main(ActivityThread.java:7529) at java.lang.reflect.Method.invoke(Native Method) at com.android.internal.os.Zygote$MethodAndArgsCaller.run(Zygote.java:245) at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:921)