本篇介紹RxJava的輔助操作。例如Delay延時,Doxx系列事件鉤子,線程切換等。
delay操作符
延遲一段指定的時間再發(fā)射來自O(shè)bservable的發(fā)射物。就是推遲指定發(fā)射Observable的事件。

Observable
.fromIterable(mItems)
.delay(1, TimeUnit.SECONDS)
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
do操作符
注冊一個動作作為原始Observable生命周期事件的一種占位符。
- Observable發(fā)射的事件,我們可以在注冊subscribe中處理,但有時候Observable是提供出去的,在哪里注冊都是未知的,不可能每個subscribe的地方都寫一套。這時候do操作符就用處了,do系列的操作符相當(dāng)于事件的鉤子,在執(zhí)行時調(diào)用使用do系列的操作的回調(diào)。
doOnEach操作符
doOnEach操作符讓你可以注冊一個回調(diào),它產(chǎn)生的Observable每發(fā)射一項數(shù)據(jù)就會調(diào)用它一次。參數(shù)是Notification。

Observable.fromIterable(mItems)
.doOnEach(new Consumer<Notification<Object>>() {
@Override
public void accept(Notification<Object> notification) throws Exception {
//...
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
doOnNext操作符
doOnNext操作符類似于doOnEach(Action1) ,但是它的Action不是接受一個Notification參數(shù),而是接受發(fā)射的數(shù)據(jù)項。

Observable.fromIterable(mItems)
.doOnNext(new Consumer<Object>() {
@Override
public void accept(Object o) throws Exception {
L.d(o.toString());
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
doOnSubscribe操作符
doOnSubscribe操作符注冊一個動作,當(dāng)觀察者訂閱它生成的Observable它就會被調(diào)用。

- 注冊時就調(diào)用,例如接口請求的Observable,注冊時就彈出等待框。
Observable.fromIterable(mItems)
.doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
//準(zhǔn)備開始
showLoading();
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
doOnComplete操作符

注冊一個完成回調(diào)
Observable.fromIterable(mItems)
.doOnComplete(new Action() {
@Override
public void run() throws Exception {
//完成...
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
doOnError操作符

注冊一個出錯回調(diào)
Observable.fromIterable(mItems)
.doOnError(new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
//異常了
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
doTerminate操作符
doTerminate操作符注冊一個動作,當(dāng)它產(chǎn)生的Observable終止之前會被調(diào)用,無論是正 常還是異常終止。

Observable.fromIterable(mItems)
.doOnTerminate(new Action() {
@Override
public void run() throws Exception {
//...準(zhǔn)備結(jié)束了
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
doAfterTerminate操作符
doAfterTerminate操作符注冊一個動作,當(dāng)它產(chǎn)生的Observable終止之后會被調(diào)用,無論是正常還 是異常終止。
- 結(jié)束時調(diào)用,接口請求成功或失敗后,隱藏彈窗
Observable.fromIterable(mItems)
.doAfterTerminate(new Action() {
@Override
public void run() throws Exception {
//已經(jīng)結(jié)束
hideLoading();
}
})
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
observeOn和subscribeOn操作符


ObserveOn:指定一個觀察者在哪個調(diào)度器上觀察這個Observable??梢哉f就是任務(wù)結(jié)束時進行回調(diào)的線程,而Android一般都是主線程。
SubscribeOn:指定Observable自身在哪個調(diào)度器上執(zhí)行??梢哉f是耗時操作指定的線程,一般為IO線程或computation計算線程。
- Android的Handler的Scheduler一般都使用RxAnroid上提供的,怎么引用就不說了,看Github。例如
Observable.fromIterable(mItems)
//Observable執(zhí)行在子線程,所以在子線程遍歷
.subscribeOn(Schedulers.io())
//回調(diào)線程,在主線程
.observeOn(AndroidSchedulers.mainThread())
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();
timeout操作符
對原始Observable的一個鏡像,如果過了一個指定的時長仍沒有發(fā)射數(shù)據(jù),它會發(fā)一個錯誤 通知。

一般用來定時檢查,Observable指定時間內(nèi)時候發(fā)送過事件,超過時間發(fā)送一個onError(),異常對象為TimeoutException。
- 例如封裝WebSocket的Observable,指定時間內(nèi)沒有發(fā)出事件,發(fā)出onError()事件,再配合retry重試,嘗試重新連接WebSocket。
Observable
.create(new WebSocketOnSubscribe(url))
//如果數(shù)據(jù)源指定之間內(nèi)沒有發(fā)出消息,會發(fā)送一個超時異常,配合retry吃掉這個異常后,重試
.timeout(timeout, timeUnit)
.retry()
//將回調(diào)都放置到主線程回調(diào),外部調(diào)用方直接觀察,實現(xiàn)響應(yīng)回調(diào)方法做UI處理
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
timestamp操作符
給Observable發(fā)射的數(shù)據(jù)項附加一個時間戳

- 例如WebSocket實踐中,后端要求我們定時多少秒發(fā)送一個WebSocket心跳消息。里面需要帶一個時間戳,就可以使用timestamp操作符,包裝數(shù)據(jù)為Timed對象。(內(nèi)部其實就是map操作符,將數(shù)據(jù)包裝在Timed對象,Timed對象中有個time字段為當(dāng)前時間戳)
/**
* 發(fā)送心跳包
*/
public Observable<Boolean> sendHeartBeatMsg() {
return getRxWebSocket().heartBeat(getUrl(),
AskTeacherConstant.CONSULTING_ROOM_PING_MSG_INTERVAL_TIME,
TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
@Override
public String onGenerateHeartBeatMsg(long timestamp) {
return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
String.valueOf(timestamp / 1000)));
}
});
}
@Override
public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,HeartBeatGenerateCallback heartBeatGenerateCallback) {
return Observable
.interval(period, unit)
//timestamp操作符,給每個事件加一個時間戳
.timestamp()
.retry()
.flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
@Override
public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
long timestamp = timed.time();
String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
Logger.d(TAG, "發(fā)送心跳消息: " + heartBeatMsg);
return send(url, heartBeatMsg);
}
});
}
serialize操作符
強制一個Observable連續(xù)調(diào)用并保證行為正確。
Observable發(fā)射事件的線程是多個不同子線程(異步)進行發(fā)射,就可能造成事件混亂??赡躱nNext()、onError、onComplete順序不是正確的,使用serialize操作符能使事件按同步順序返回。

Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("wally");
emitter.onNext("wally");
emitter.onComplete();
emitter.onNext("wally");
}
})
.serialize()
.subscribe();
//結(jié)果
//wally
//wally
//wally
//onComplete