concat操作符可以接收若干個(gè)Observables,并且保證發(fā)射的數(shù)據(jù)是有序的。
官方文檔:Returns an Observable that emits the items emitted by three Observables, one after the other, without interleaving them.

onNext、onComplete的觸發(fā)順序
//關(guān)鍵代碼示例
Observable.concat(firstObservable, secondObservable)
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe(concatObserver);
以上面的代碼為例,總結(jié)一下onNext、onComplete的執(zhí)行順序。
1、concatObserver按順序接收到firstObservable的onNext傳遞的數(shù)據(jù),secondObservable的onNext傳遞的數(shù)據(jù),最后再觸發(fā)onComplete。
2、firstObservable必須要執(zhí)行emitter.onComplete后,secondObservable的emitter.onNext才能傳遞到concatObserver的onNext方法。
3、firstObservable和secondObservable必須都要調(diào)用emitter.onComplete才能執(zhí)行concatObserver的onComplete方法。
4、firstObservable、secondObservable在emitter.onComplete方法后調(diào)用的emitter.onNext并不會(huì)抵達(dá)concatObserver的onNext方法。emitter.onError方法后的emitter.onNext方法同上。但不要再emitter.onComplete后調(diào)用emitter.onError,否則出現(xiàn)io.reactivex.exceptions.UndeliverableException
onNext、onError的觸發(fā)順序
1、firstObservable執(zhí)行emitter.onError后,secondObservable的emitter.onNext不會(huì)觸發(fā),且secondObservable的subscribe都沒有觸發(fā)。
onNext、onError的坑
一般情況,eg不切換線程,secondObservable必須等firstObservable的onComplete之后才會(huì)觸發(fā)。但是在開發(fā)中遇到一個(gè)場景,firstObservable查詢db緩存正常,觸發(fā)emitter.onNext,emitter.onComplete方法。但是secondObservable因網(wǎng)絡(luò)異常立即返回了Exception觸發(fā)emitter.onError。這時(shí)concatObserver竟然沒有觸發(fā)onNext,只觸發(fā)了一次onError。
各種排查后,看到stackoverflow上一個(gè)提問,ReactiveX concat doesn't produce onNext from first observable if second fails immediately恍然大悟。
根據(jù)observerOn默認(rèn)方法的javadoc說明,onError事件可能插隊(duì)到onNext之前執(zhí)行。說明如下:
Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly asynchronous.
可以用observerOn的一個(gè)重載方法,增加一個(gè)delayError參數(shù)為true。
indicates if the onError notification may not cut ahead of onNext notification on the other side of the scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received from upstream
我理解的是firstObservable在其observeOn的線程準(zhǔn)備觸發(fā)concat的observer;secondObservable在其observeOn線程觸發(fā);二者最終都需要在concat的observeOn上運(yùn)行。在這個(gè)過程中,如果firstObservable和secondObservable還有concat的Observer都不在一個(gè)線程,就可能出現(xiàn)時(shí)序問題,導(dǎo)致onError截?cái)嗟給nNext之前。
所以另一種方案也生效:
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler).observeOn(AndroidSchedulers.mainThread()),
getContentFromNetwork.subscibeOn(networkScheduler).observeOn(AndroidSchedulers.mainThread())
)
.subscribe(subscriber);
參考文檔:
1、Rxjava2中Concat操作符onNext,OnError,OnComplte的執(zhí)行順序
2、 ReactiveX concat doesn't produce onNext from first observable if second fails immediately