RxJava日常使用總結(jié)(六)輔助操作

本篇介紹RxJava的輔助操作。例如Delay延時,Doxx系列事件鉤子,線程切換等。

delay操作符

延遲一段指定的時間再發(fā)射來自O(shè)bservable的發(fā)射物。就是推遲指定發(fā)射Observable的事件。

image.png
Observable
.fromIterable(mItems)
.delay(1, TimeUnit.SECONDS)
.as(RxLifecycleUtil.bindLifecycle(this))
.subscribe();

do操作符

注冊一個動作作為原始Observable生命周期事件的一種占位符。

  • Observable發(fā)射的事件,我們可以在注冊subscribe中處理,但有時候Observable是提供出去的,在哪里注冊都是未知的,不可能每個subscribe的地方都寫一套。這時候do操作符就用處了,do系列的操作符相當(dāng)于事件的鉤子,在執(zhí)行時調(diào)用使用do系列的操作的回調(diào)。

doOnEach操作符

doOnEach操作符讓你可以注冊一個回調(diào),它產(chǎn)生的Observable每發(fā)射一項數(shù)據(jù)就會調(diào)用它一次。參數(shù)是Notification。

image.png
Observable.fromIterable(mItems)
    .doOnEach(new Consumer<Notification<Object>>() {
        @Override
        public void accept(Notification<Object> notification) throws Exception {
            //...
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

doOnNext操作符

doOnNext操作符類似于doOnEach(Action1) ,但是它的Action不是接受一個Notification參數(shù),而是接受發(fā)射的數(shù)據(jù)項。

image.png
Observable.fromIterable(mItems)
    .doOnNext(new Consumer<Object>() {
        @Override
        public void accept(Object o) throws Exception {
            L.d(o.toString());
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

doOnSubscribe操作符

doOnSubscribe操作符注冊一個動作,當(dāng)觀察者訂閱它生成的Observable它就會被調(diào)用。

image.png
  • 注冊時就調(diào)用,例如接口請求的Observable,注冊時就彈出等待框。
Observable.fromIterable(mItems)
    .doOnSubscribe(new Consumer<Disposable>() {
        @Override
        public void accept(Disposable disposable) throws Exception {
              //準(zhǔn)備開始
            showLoading();
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

doOnComplete操作符

image.png

注冊一個完成回調(diào)

Observable.fromIterable(mItems)
        .doOnComplete(new Action() {
            @Override
            public void run() throws Exception {
                //完成...
            }
        })
        .as(RxLifecycleUtil.bindLifecycle(this))
        .subscribe();

doOnError操作符

image.png

注冊一個出錯回調(diào)

Observable.fromIterable(mItems)
    .doOnError(new Consumer<Throwable>() {
        @Override
        public void accept(Throwable throwable) throws Exception {
            //異常了
        }
    })
    .as(RxLifecycleUtil.bindLifecycle(this))
    .subscribe();

doTerminate操作符

doTerminate操作符注冊一個動作,當(dāng)它產(chǎn)生的Observable終止之前會被調(diào)用,無論是正 常還是異常終止。

image.png
Observable.fromIterable(mItems)
        .doOnTerminate(new Action() {
            @Override
            public void run() throws Exception {
                //...準(zhǔn)備結(jié)束了
            }
        })
        .as(RxLifecycleUtil.bindLifecycle(this))
        .subscribe();

doAfterTerminate操作符

doAfterTerminate操作符注冊一個動作,當(dāng)它產(chǎn)生的Observable終止之后會被調(diào)用,無論是正常還 是異常終止。

  • 結(jié)束時調(diào)用,接口請求成功或失敗后,隱藏彈窗
Observable.fromIterable(mItems)
        .doAfterTerminate(new Action() {
            @Override
            public void run() throws Exception {
                   //已經(jīng)結(jié)束
                hideLoading();
            }
        })
        .as(RxLifecycleUtil.bindLifecycle(this))
        .subscribe();

observeOn和subscribeOn操作符

observeOn.png
subscribeOn.png

ObserveOn:指定一個觀察者在哪個調(diào)度器上觀察這個Observable??梢哉f就是任務(wù)結(jié)束時進行回調(diào)的線程,而Android一般都是主線程。

SubscribeOn:指定Observable自身在哪個調(diào)度器上執(zhí)行??梢哉f是耗時操作指定的線程,一般為IO線程或computation計算線程。

  • Android的Handler的Scheduler一般都使用RxAnroid上提供的,怎么引用就不說了,看Github。例如
Observable.fromIterable(mItems)
          //Observable執(zhí)行在子線程,所以在子線程遍歷
        .subscribeOn(Schedulers.io())
         //回調(diào)線程,在主線程
        .observeOn(AndroidSchedulers.mainThread())
        .as(RxLifecycleUtil.bindLifecycle(this))
        .subscribe();

timeout操作符

對原始Observable的一個鏡像,如果過了一個指定的時長仍沒有發(fā)射數(shù)據(jù),它會發(fā)一個錯誤 通知。

image.png

一般用來定時檢查,Observable指定時間內(nèi)時候發(fā)送過事件,超過時間發(fā)送一個onError(),異常對象為TimeoutException。

  • 例如封裝WebSocket的Observable,指定時間內(nèi)沒有發(fā)出事件,發(fā)出onError()事件,再配合retry重試,嘗試重新連接WebSocket。
Observable
        .create(new WebSocketOnSubscribe(url))
        //如果數(shù)據(jù)源指定之間內(nèi)沒有發(fā)出消息,會發(fā)送一個超時異常,配合retry吃掉這個異常后,重試
        .timeout(timeout, timeUnit)
        .retry()
        //將回調(diào)都放置到主線程回調(diào),外部調(diào)用方直接觀察,實現(xiàn)響應(yīng)回調(diào)方法做UI處理
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

timestamp操作符

給Observable發(fā)射的數(shù)據(jù)項附加一個時間戳

image.png
  • 例如WebSocket實踐中,后端要求我們定時多少秒發(fā)送一個WebSocket心跳消息。里面需要帶一個時間戳,就可以使用timestamp操作符,包裝數(shù)據(jù)為Timed對象。(內(nèi)部其實就是map操作符,將數(shù)據(jù)包裝在Timed對象,Timed對象中有個time字段為當(dāng)前時間戳)
/**
* 發(fā)送心跳包
*/
public Observable<Boolean> sendHeartBeatMsg() {
return getRxWebSocket().heartBeat(getUrl(),
        AskTeacherConstant.CONSULTING_ROOM_PING_MSG_INTERVAL_TIME,
        TimeUnit.SECONDS, new HeartBeatGenerateCallback() {
            @Override
            public String onGenerateHeartBeatMsg(long timestamp) {
                return GsonUtil.toJson(new HeartBeatMsgRequestModel(WssCommandTypeEnum.HEART_BEAT.getCode(),
                        String.valueOf(timestamp / 1000)));
            }
        });
}

@Override
public Observable<Boolean> heartBeat(String url, int period, TimeUnit unit,HeartBeatGenerateCallback heartBeatGenerateCallback) {
    return Observable
    .interval(period, unit)
    //timestamp操作符,給每個事件加一個時間戳
    .timestamp()
    .retry()
    .flatMap(new Function<Timed<Long>, ObservableSource<Boolean>>() {
            @Override
            public ObservableSource<Boolean> apply(Timed<Long> timed) throws Exception {
                long timestamp = timed.time();
                  String heartBeatMsg = heartBeatGenerateCallback.onGenerateHeartBeatMsg(timestamp);
                    Logger.d(TAG, "發(fā)送心跳消息: " + heartBeatMsg);
                    return send(url, heartBeatMsg);
            }
        });
}

serialize操作符

強制一個Observable連續(xù)調(diào)用并保證行為正確。

Observable發(fā)射事件的線程是多個不同子線程(異步)進行發(fā)射,就可能造成事件混亂??赡躱nNext()、onError、onComplete順序不是正確的,使用serialize操作符能使事件按同步順序返回。

image.png
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        emitter.onNext("wally");
        emitter.onNext("wally");
        emitter.onComplete();
        emitter.onNext("wally");
    }
})
.serialize()
.subscribe();

//結(jié)果
//wally
//wally
//wally
//onComplete
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容