1. debounce操作符
debounce:“抖動(dòng)”,該操作符對(duì)Observable每產(chǎn)生一個(gè)結(jié)果后,如果在規(guī)定的間隔時(shí)間內(nèi)沒有別的結(jié)果產(chǎn)生,則把這個(gè)結(jié)果提交給訂閱者處理,否則忽略該結(jié)果。

debounce.png
示例原理用法
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
// send events with simulated time wait
emitter.onNext(1); // skip
Thread.sleep(400);
emitter.onNext(2); // deliver
Thread.sleep(505);
emitter.onNext(3); // skip
Thread.sleep(100);
emitter.onNext(4); // deliver
Thread.sleep(605);
emitter.onNext(5); // deliver
Thread.sleep(510);
emitter.onComplete();
}
})
// 設(shè)置時(shí)間為0.5秒
.debounce(500, TimeUnit.MILLISECONDS)
// Run on a background thread
.subscribeOn(Schedulers.io())
// Be notified on the main thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(getObserver());
運(yùn)行結(jié)果
“2”,“4”,“5”
分析
- 事件1發(fā)射后過了400毫秒后發(fā)射事件2,此時(shí)事件1不滿足時(shí)間的條件被遺棄,然后重新計(jì)時(shí);
- 事件2發(fā)出后休眠了505毫秒,超過了500毫秒,所以事件2被發(fā)射出來(lái);
- 事件3發(fā)出來(lái)后又過了100毫秒事件4發(fā)出來(lái),所以事件3被遺棄;
- 事件4重新計(jì)時(shí),后又過了605毫秒下一個(gè)事件才發(fā)出,所以4被發(fā)射了出來(lái);
- 同理,5之后的0.5秒內(nèi)也沒有再發(fā)出別的事件,所以最終5也被發(fā)射了出來(lái)。
2. switchMap操作符
當(dāng)源Observable發(fā)射一個(gè)新的數(shù)據(jù)項(xiàng)時(shí),如果舊數(shù)據(jù)項(xiàng)訂閱還未完成,就取消舊訂閱數(shù)據(jù)和停止監(jiān)視那個(gè)數(shù)據(jù)項(xiàng)產(chǎn)生的Observable,開始監(jiān)視新的數(shù)據(jù)項(xiàng)。如果都是在同一個(gè)線程里跑的話,那么該操作符與ContactMap無(wú)異;只有在不同的線程里跑的時(shí)候,即線程方案為newThread的時(shí)候,才會(huì)出現(xiàn)這種情況。

switchmap.png
同一線程
Observable.just("A", "B", "C", "D", "E").switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
Observable<String> ob = Observable.just(s);
return ob;
}
}).subscribeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d("------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.d("------>onError()" + e);
}
@Override
public void onNext(String s) {
Log.d("------>onNext:" + s);
}
});
輸出結(jié)果:
------>onNext:A
------>onNext:B
------>onNext:C
------>onNext:D
------>onNext:E
------>onCompleted()
并發(fā)
Observable.just("A", "B", "C", "D", "E").switchMap(new Func1<String, Observable<String>>() {
@Override
public Observable<String> call(String s) {
return Observable.just(s).subscribeOn(Schedulers.newThread());
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
Log.d("------>onCompleted()");
}
@Override
public void onError(Throwable e) {
Log.d("------>onError()" + e);
}
@Override
public void onNext(String s) {
Log.d("------>onNext:" + s);
}
});
輸出結(jié)果:
------>onNext:E
------>onCompleted()
3. 搜索功能
使用RxJava2提供的三個(gè)操作符進(jìn)行了優(yōu)化:
- 使用debounce操作符,當(dāng)輸入框發(fā)生變化時(shí),不會(huì)立刻將事件發(fā)送給下游,而是等待200ms,只有在這段事件內(nèi),輸入框沒有發(fā)生變化,那么才發(fā)送該事件;
- 使用switchMap操作符,當(dāng)發(fā)起了123的請(qǐng)求之后,即使12的結(jié)果返回了,也不會(huì)發(fā)送給下游。
public class SearchActivity extends AppCompatActivity {
private EditText mEtSearch;
private TextView mTvSearch;
private PublishSubject<String> mPublishSubject;
private DisposableObserver<String> mDisposableObserver;
private CompositeDisposable mCompositeDisposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_search);
mEtSearch = (EditText) findViewById(R.id.et_search);
mTvSearch = (TextView) findViewById(R.id.tv_search_result);
mEtSearch.addTextChangedListener(new TextWatcher() {
@Override
public void beforeTextChanged(CharSequence s, int start, int count, int after) {
}
@Override
public void onTextChanged(CharSequence s, int start, int before, int count) {
}
@Override
public void afterTextChanged(Editable s) {
startSearch(s.toString());
}
});
mPublishSubject = PublishSubject.create();
mDisposableObserver = new DisposableObserver<String>() {
@Override
public void onNext(String s) {
mPublishSubject.onNext(s);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
};
mPublishSubject.debounce(200, TimeUnit.MILLISECONDS).filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.length() > 0;
}
}).switchMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(String query) throws Exception {
return getSearchObservable(query);
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(mDisposableObserver);
mCompositeDisposable = new CompositeDisposable();
mCompositeDisposable.add(mDisposableObserver);
}
private Observable<String> getSearchObservable(final String query) {
return Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
try {
Thread.sleep(100 + (long) (Math.random() * 500));
} catch (InterruptedException e) {
if (!observableEmitter.isDisposed()) {
observableEmitter.onError(e);
}
}
observableEmitter.onComplete();
}
}).subscribeOn(Schedulers.io());
}
@Override
protected void onDestroy() {
super.onDestroy();
mCompositeDisposable.clear();
}
}