RxJava系列之debounce,switchMap操作符及項(xiàng)目實(shí)際使用案例

1. debounce操作符

debounce:“抖動(dòng)”,該操作符對(duì)Observable每產(chǎn)生一個(gè)結(jié)果后,如果在規(guī)定的間隔時(shí)間內(nèi)沒有別的結(jié)果產(chǎn)生,則把這個(gè)結(jié)果提交給訂閱者處理,否則忽略該結(jié)果。

debounce.png
示例原理用法
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            }
        })
              // 設(shè)置時(shí)間為0.5秒
            .debounce(500, TimeUnit.MILLISECONDS)
             // Run on a background thread
             .subscribeOn(Schedulers.io())
             // Be notified on the main thread
             .observeOn(AndroidSchedulers.mainThread())
              .subscribe(getObserver());

運(yùn)行結(jié)果
“2”,“4”,“5”

分析
  • 事件1發(fā)射后過了400毫秒后發(fā)射事件2,此時(shí)事件1不滿足時(shí)間的條件被遺棄,然后重新計(jì)時(shí);
  • 事件2發(fā)出后休眠了505毫秒,超過了500毫秒,所以事件2被發(fā)射出來(lái);
  • 事件3發(fā)出來(lái)后又過了100毫秒事件4發(fā)出來(lái),所以事件3被遺棄;
  • 事件4重新計(jì)時(shí),后又過了605毫秒下一個(gè)事件才發(fā)出,所以4被發(fā)射了出來(lái);
  • 同理,5之后的0.5秒內(nèi)也沒有再發(fā)出別的事件,所以最終5也被發(fā)射了出來(lái)。

2. switchMap操作符

當(dāng)源Observable發(fā)射一個(gè)新的數(shù)據(jù)項(xiàng)時(shí),如果舊數(shù)據(jù)項(xiàng)訂閱還未完成,就取消舊訂閱數(shù)據(jù)和停止監(jiān)視那個(gè)數(shù)據(jù)項(xiàng)產(chǎn)生的Observable,開始監(jiān)視新的數(shù)據(jù)項(xiàng)。如果都是在同一個(gè)線程里跑的話,那么該操作符與ContactMap無(wú)異;只有在不同的線程里跑的時(shí)候,即線程方案為newThread的時(shí)候,才會(huì)出現(xiàn)這種情況。

switchmap.png
同一線程
Observable.just("A", "B", "C", "D", "E").switchMap(new Func1<String, Observable<String>>() {
          @Override
          public Observable<String> call(String s) {
              Observable<String> ob = Observable.just(s);
              return ob;
          }
      }).subscribeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
          @Override
          public void onCompleted() {
              Log.d("------>onCompleted()");
          }

          @Override
          public void onError(Throwable e) {
              Log.d("------>onError()" + e);
          }

          @Override
          public void onNext(String s) {
              Log.d("------>onNext:" + s);
          }
      });

輸出結(jié)果:

------>onNext:A
------>onNext:B
------>onNext:C
------>onNext:D
------>onNext:E
------>onCompleted()
并發(fā)
Observable.just("A", "B", "C", "D", "E").switchMap(new Func1<String, Observable<String>>() {
            @Override
            public Observable<String> call(String s) {
                return Observable.just(s).subscribeOn(Schedulers.newThread());
            }
        }).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
            @Override
            public void onCompleted() {
                Log.d("------>onCompleted()");
            }
 
            @Override
            public void onError(Throwable e) {
                Log.d("------>onError()" + e);
            }
 
            @Override
            public void onNext(String s) {
                Log.d("------>onNext:" + s);
            }
        });

輸出結(jié)果:

------>onNext:E
------>onCompleted()

3. 搜索功能

使用RxJava2提供的三個(gè)操作符進(jìn)行了優(yōu)化:

  • 使用debounce操作符,當(dāng)輸入框發(fā)生變化時(shí),不會(huì)立刻將事件發(fā)送給下游,而是等待200ms,只有在這段事件內(nèi),輸入框沒有發(fā)生變化,那么才發(fā)送該事件;
  • 使用switchMap操作符,當(dāng)發(fā)起了123的請(qǐng)求之后,即使12的結(jié)果返回了,也不會(huì)發(fā)送給下游。
public class SearchActivity extends AppCompatActivity {

    private EditText mEtSearch;
    private TextView mTvSearch;
    private PublishSubject<String> mPublishSubject;
    private DisposableObserver<String> mDisposableObserver;
    private CompositeDisposable mCompositeDisposable;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_search);
        mEtSearch = (EditText) findViewById(R.id.et_search);
        mTvSearch = (TextView) findViewById(R.id.tv_search_result);
        mEtSearch.addTextChangedListener(new TextWatcher() {

            @Override
            public void beforeTextChanged(CharSequence s, int start, int count, int after) {
            }

            @Override
            public void onTextChanged(CharSequence s, int start, int before, int count) {
            }

            @Override
            public void afterTextChanged(Editable s) {
                startSearch(s.toString());
            }
        });
        mPublishSubject = PublishSubject.create();
        mDisposableObserver = new DisposableObserver<String>() {

            @Override
            public void onNext(String s) {
                   mPublishSubject.onNext(s);
            }

            @Override
            public void onError(Throwable throwable) {
            }

            @Override
            public void onComplete() {
            }
        };
        mPublishSubject.debounce(200, TimeUnit.MILLISECONDS).filter(new Predicate<String>() {

            @Override
            public boolean test(String s) throws Exception {
                return s.length() > 0;
            }

        }).switchMap(new Function<String, ObservableSource<String>>() {

            @Override
            public ObservableSource<String> apply(String query) throws Exception {
                return getSearchObservable(query);
            }

        }).observeOn(AndroidSchedulers.mainThread()).subscribe(mDisposableObserver);
        mCompositeDisposable = new CompositeDisposable();
        mCompositeDisposable.add(mDisposableObserver);
    }

    private Observable<String> getSearchObservable(final String query) {
        return Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
                try {
                    Thread.sleep(100 + (long) (Math.random() * 500));
                } catch (InterruptedException e) {
                    if (!observableEmitter.isDisposed()) {
                        observableEmitter.onError(e);
                    }
                }
                observableEmitter.onComplete();
            }
        }).subscribeOn(Schedulers.io());
    }

    @Override
    protected void onDestroy() {
        super.onDestroy();
        mCompositeDisposable.clear();
    }
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 前言 按照官方的分類,操作符大致分為以下幾種: Creating Observables(Observable的創(chuàng)...
    小玉1991閱讀 1,111評(píng)論 0 1
  • 注:只包含標(biāo)準(zhǔn)包中的操作符,用于個(gè)人學(xué)習(xí)及備忘參考博客:http://blog.csdn.net/maplejaw...
    小白要超神閱讀 2,366評(píng)論 2 8
  • 一、RxJava操作符概述 RxJava中的操作符就是為了提供函數(shù)式的特性,函數(shù)式最大的好處就是使得數(shù)據(jù)處理簡(jiǎn)潔易...
    無(wú)求_95dd閱讀 3,487評(píng)論 0 21
  • 如果把整個(gè)事件看作是工廠的流水線,Observable就是原料。Observer就是我們的產(chǎn)品經(jīng)理,這個(gè)產(chǎn)品是怎么...
    吻中求勝閱讀 702評(píng)論 0 6
  • 記錄RxJava操作符,方便查詢(2.2.2版本) 英文文檔地址:http://reactivex.io/docu...
    凌云飛魚閱讀 1,048評(píng)論 0 0

友情鏈接更多精彩內(nèi)容