RxJava操作符-concat之onNext、onComplete、onError觸發(fā)順序填坑

concat操作符可以接收若干個(gè)Observables,并且保證發(fā)射的數(shù)據(jù)是有序的。

官方文檔:Returns an Observable that emits the items emitted by three Observables, one after the other, without interleaving them.

concat流程圖

onNext、onComplete的觸發(fā)順序

//關(guān)鍵代碼示例
Observable.concat(firstObservable, secondObservable)
          .observeOn(AndroidSchedulers.mainThread())
          .subscribeOn(Schedulers.io())
          .subscribe(concatObserver);

以上面的代碼為例,總結(jié)一下onNext、onComplete的執(zhí)行順序。
1、concatObserver按順序接收到firstObservable的onNext傳遞的數(shù)據(jù),secondObservable的onNext傳遞的數(shù)據(jù),最后再觸發(fā)onComplete。
2、firstObservable必須要執(zhí)行emitter.onComplete后,secondObservable的emitter.onNext才能傳遞到concatObserver的onNext方法。
3、firstObservable和secondObservable必須都要調(diào)用emitter.onComplete才能執(zhí)行concatObserver的onComplete方法。
4、firstObservable、secondObservable在emitter.onComplete方法后調(diào)用的emitter.onNext并不會(huì)抵達(dá)concatObserver的onNext方法。emitter.onError方法后的emitter.onNext方法同上。但不要再emitter.onComplete后調(diào)用emitter.onError,否則出現(xiàn)io.reactivex.exceptions.UndeliverableException

onNext、onError的觸發(fā)順序

1、firstObservable執(zhí)行emitter.onError后,secondObservable的emitter.onNext不會(huì)觸發(fā),且secondObservable的subscribe都沒有觸發(fā)。

onNext、onError的坑

一般情況,eg不切換線程,secondObservable必須等firstObservable的onComplete之后才會(huì)觸發(fā)。但是在開發(fā)中遇到一個(gè)場景,firstObservable查詢db緩存正常,觸發(fā)emitter.onNext,emitter.onComplete方法。但是secondObservable因網(wǎng)絡(luò)異常立即返回了Exception觸發(fā)emitter.onError。這時(shí)concatObserver竟然沒有觸發(fā)onNext,只觸發(fā)了一次onError。

各種排查后,看到stackoverflow上一個(gè)提問,ReactiveX concat doesn't produce onNext from first observable if second fails immediately恍然大悟。
根據(jù)observerOn默認(rèn)方法的javadoc說明,onError事件可能插隊(duì)到onNext之前執(zhí)行。說明如下:

Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous.

可以用observerOn的一個(gè)重載方法,增加一個(gè)delayError參數(shù)為true。

indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream

我理解的是firstObservable在其observeOn的線程準(zhǔn)備觸發(fā)concat的observer;secondObservable在其observeOn線程觸發(fā);二者最終都需要在concat的observeOn上運(yùn)行。在這個(gè)過程中,如果firstObservable和secondObservable還有concat的Observer都不在一個(gè)線程,就可能出現(xiàn)時(shí)序問題,導(dǎo)致onError截?cái)嗟給nNext之前。
所以另一種方案也生效:

Observable.concat(
    getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
    getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
 .subscribe(subscriber);

參考文檔:
1、Rxjava2中Concat操作符onNext,OnError,OnComplte的執(zhí)行順序
2、 ReactiveX concat doesn't produce onNext from first observable if second fails immediately

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容