本文的所有分析都是基于 RxJava2 進(jìn)行的。以下的 RxJava 指 RxJava2
閱讀本文你將會知道:
- RxJava 的觀察綁定和事件發(fā)送過程
- RxJava 觀察綁定和事件發(fā)送過程中的線程切換
從 RxJava1.0 到 RxJava2.0,在項(xiàng)目開發(fā)中已經(jīng)使用了很長時(shí)間這個(gè)庫了。鏈?zhǔn)秸{(diào)用,絲滑的線程切換很香,但是如果沒弄清楚其中的奧妙很容易掉進(jìn)線程調(diào)度的坑里。這篇文章我們就來對 RxJava 的訂閱過程、時(shí)間發(fā)送過程、線程調(diào)度進(jìn)行分析
訂閱和事件流
先說結(jié)論
- 按著代碼書寫順序,事件自上向下發(fā)送
- 訂閱從
subscribe()開始自下向上訂閱,這也是整個(gè)事件流的起點(diǎn),當(dāng)訂閱開始整個(gè)操作才會生效執(zhí)行 - 訂閱完成后才會發(fā)送事件
圖解
為了更便于理解訂閱的流轉(zhuǎn)方向,我將Observable調(diào)用 subscribe() 訂閱描述為了 Observer beSubscribed()

源碼分析
Observabe 創(chuàng)建過程
此過程對應(yīng)圖中黑色箭頭部分,以操作符中的map()操作為例:
@CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
調(diào)用map操作符時(shí),RxJavaPliguns 會注冊一個(gè)新的 ObservableMap 對象,查看其它操作符會發(fā)現(xiàn)都有對應(yīng)的 Observable 對象產(chǎn)生。同時(shí),上游的 Observabe會作為 source 參數(shù)傳入賦值給這個(gè)新的 Observable 的 source屬性。層層向下,可以對這個(gè)新生成的 Observable又可以繼續(xù)使用操作符。
訂閱過程:
當(dāng)調(diào)用最后一個(gè) Observable 的 subscribe() 方法時(shí),即開始訂閱過程。此過程對應(yīng)圖中紅色箭頭部分
@SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins"); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
在調(diào)用subscribe(Observer) 時(shí)實(shí)際上會去調(diào)用各個(gè) Observable實(shí)現(xiàn)子類中的 subscribeActual() 方法:
@Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); }
而在這個(gè)subscribeActual() 方法也很簡單,調(diào)用了 source 去訂閱一個(gè)新生成的 Observer 對象,同時(shí)這個(gè)新的MapObserver會將調(diào)用subscribe()時(shí)傳入的 observer,賦值給downstream屬性。這樣每一級訂閱都會將上級的 Observable、本級生成的 Observer、訂閱下級傳入的Observer聯(lián)系起來,直到達(dá)到 Observable 最初創(chuàng)建的地方整個(gè)訂閱過程結(jié)束。
事件發(fā)送過程:
此過程對應(yīng)圖中綠色箭頭部分Observable 事件起點(diǎn)創(chuàng)建有很多中操作符,他們都會創(chuàng)建出最初發(fā)送的事件/數(shù)據(jù),以 ObservableCreate為例:
@Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } }
訂閱時(shí)會調(diào)用source.subscrebe(parent),而這個(gè)source 又是從哪兒來的呢?
public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; }
Observable.create(object : ObservableOnSubscribe<String> { override fun subscribe(emitter: ObservableEmitter<String>) { emitter.onNext("data") } })
從代碼中我們可以看出,這個(gè) source 即為我們創(chuàng)建時(shí)傳入的 ObservableOnSubscribe,因此emitter.onNext("data")即是事件發(fā)送的起點(diǎn)。我們再繼續(xù)看emitter的 onNext() 做了什么:
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
源碼中現(xiàn)實(shí)調(diào)用了observer.onNext(),而這個(gè)observer 則是前面訂閱過程中 source.subscribe(new MapObserver<T, U>(t, function)) 傳入的那個(gè) observer,從而將事件發(fā)送到了下一級,下一級的 Observer 同樣在 onNext() 將事件發(fā)送到更下一級,一直到最終我們 subscribe()時(shí)傳入的那個(gè)Observer 實(shí)例完畢。
線程調(diào)度
事件訂閱發(fā)送流程通過上面的文章基本已經(jīng)能夠摸清了,我們接下來關(guān)注另一個(gè)重點(diǎn) 線程調(diào)度問題。
調(diào)度方式
RxJava 中線程變換通過 subscribeOn()和 observeOn()兩個(gè)操作來進(jìn)行。其中 subscribeOn()改變的是訂閱線程的執(zhí)行線程,即事件發(fā)生的線程。observeOn()改變的是事件結(jié)果觀察者回調(diào)所在線程,即 onNext()方法所在的線程。

使用 RxJava + Retrofit 進(jìn)行網(wǎng)絡(luò)請求時(shí),用 RxJava 管理網(wǎng)絡(luò)請求過程的線程切換。
subscribeOn()指定的是網(wǎng)絡(luò)請求的線程,observeOn()指定的是網(wǎng)絡(luò)請求后事件流的執(zhí)行線程。
源碼分析
前面說過,每次操作符的使用,RxJava 都會生成一個(gè)對應(yīng)的新的 Observable對象。observeOn()與 subscribeOn()也不例外。線程調(diào)度的核心邏輯都在 ObservableSubscribeOn 與 ObservableObserveOn兩個(gè)類中
subscribeOn()過程
@CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
調(diào)用 subscribeOn() 時(shí)會產(chǎn)生一個(gè)新的ObservableSubscribeOn并把當(dāng)前這個(gè)Observable 和傳入的 Scheduler作為參數(shù)傳入。前面分析過當(dāng)最終調(diào)用 subscribe()時(shí)會引起整個(gè)觀察鏈的 Observable 自下而上調(diào)用 subscribe(),而這個(gè)subscribe()方法中實(shí)際為調(diào)用抽象類 Observable的各個(gè)實(shí)現(xiàn)子類的 subscribeActual()方法 。
@Override public void subscribeActual(final Observer<? super T> observer) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer); observer.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); }
主要看這句 scheduler.scheduleDirect(new SubscribeTask(parent));,SubscribeTask 前面內(nèi)容已經(jīng)分析過,就是調(diào)用上級 Observable 來訂閱生成的這個(gè) SubscribeOnObserver。
@NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }
scheduleDirect 方法,會使用傳入的 scheduler 在指定的線程創(chuàng)建一個(gè) Worker 對象來執(zhí)行SubscribeTask,從而達(dá)到了切換訂閱線程的目的。所以多個(gè)subscribeOn()疊加時(shí),最終線程還是會回到最后執(zhí)行的(代碼第一次出現(xiàn)的)subscribeOn() 指定的線程。
observeOn()過程
調(diào)用 observeOn(Scheduler) 方法,會調(diào)用內(nèi)部的同名方法生成一個(gè)新的 ObservableObserveOn對象,并把當(dāng)前這個(gè)Observable 和傳入的 Scheduler作為參數(shù)傳入。訂閱過程與ObservableSubscribeOn不一樣,會直接在當(dāng)前線程調(diào)用上級Observable訂閱自己,,我們主要看ObservableObserveOn的ObserveOnObserver是如何調(diào)度結(jié)果數(shù)據(jù)發(fā)送的線程的。
@Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
從源碼中可以發(fā)現(xiàn),最終會使用 worker 去向下游發(fā)送事件。這個(gè) worker就是我們observeOn() 方法中指定的線程創(chuàng)建的 worker。從而達(dá)到切換線程的目的,由于事件又是自上而下的,所以每次切換都能在下游事件中感受到線程的變化。
日志分析
把subscribeOn()和 observeOn()放一起來說不太容易說明白其中的線程變換,我先看看單獨(dú)使用其中的一個(gè)操作符的時(shí)候,導(dǎo)致的線程變化。
僅調(diào)用 subscribeOn() 調(diào)度線程
Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
.subscribeOn(Schedulers.io())
.doOnSubscribe {
Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
}
.map {
Log.d("Map 2 ", Thread.currentThread().name)
return@map it
}
.subscribeOn(Schedulers.newThread())
.doOnSubscribe {
Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
}
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
})
執(zhí)行結(jié)果:

從日志可以看出:
- 1、訂閱是自下向上的(onSubscribe -->doOnSubscribe 2 -->doOnsubscribe 1)
- 2、自下向上看,每次調(diào)用
subscribeOn訂閱線程將會發(fā)生改變,直到下次調(diào)用subscribeOn - 3、事件是自上向下傳遞的(Map 1 --> Map 2 --> Map 3 --> onNext),且所在線程為最后一次線程切換后所在的線程
RxCachedThreadScheduler-1
僅調(diào)用 subscribeOn() 調(diào)度線程
Observable.just("Data")
.map {
Log.d("Map 1", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
// }
// .subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 2 ", Thread.currentThread().name)
return@map it
}
// .doOnSubscribe {
// Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
// }
// .subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.map {
Log.d("Map 3 ", Thread.currentThread().name)
return@map it
}
.subscribe(object : Observer<String> {
override fun onComplete() {
}
override fun onSubscribe(d: Disposable) {
Log.d("onSubscribe", Thread.currentThread().name)
}
override fun onNext(t: String) {
Log.d("onNext", Thread.currentThread().name)
}
override fun onError(e: Throwable) {
e.printStackTrace()
}
})
執(zhí)行結(jié)果:

從日志可以看出:
- 1、事件發(fā)送是正常的自上向下(Map 1 --> Map 2 --> Map 3 --> onNex)
- 2、自上向下,每次調(diào)用
observeOn觀察結(jié)果回調(diào)線程都將切換一次(main -->RxNewThreadScheduler-1 -->RxNewThreadScheduler-2)
混合使用調(diào)度線程
我們把上述代碼中注釋部分都打開,得到的日志如下:

通過上面的三次日志打印我們可以看出:
訂閱鏈的日志自下而上打印完畢后,再自上而下打印觀察結(jié)果。subscribeOn 會切換線程,并不是像有的文章所說只有第一次指定線程(即自下而上的最后一次)有效。第一次有效只是我們的錯(cuò)覺,因?yàn)橛嗛喪亲韵露系模还芮懊娴木€程怎樣切換追蹤都會切換到 subscribeOn第一次指定線程(即自下而上的最后一次)。我們在回調(diào)結(jié)果中未進(jìn)行線程切換操作時(shí),只能感知到這一次線程切換 (Map1 與 doOnSubscribe 1 所在線程一致)。observeOn的每次指定線程都會讓事件流切換到對應(yīng)的線程中去。完整的事件訂閱和發(fā)送流程如下圖所示,從我們調(diào)用 subscribe()將觀察者和觀察對象關(guān)聯(lián)起來開始,subscribe() 中傳入的 Observer 的 onNext 或 onError結(jié)束,形成了一個(gè)逆時(shí)針的 n 形的鏈條。右邊部分的觀察鏈中,每次 subscribeOn 都會切換觀察線程。左邊部分的事件發(fā)送鏈,會從觀察鏈的最后一次指定的線程開始發(fā)送事件,每次調(diào)用 observeOn都會指定新的事件發(fā)送線程。
圖解
參照上面的源碼和日志分析,再結(jié)合本圖相信大家會對 RxJava 的現(xiàn)場調(diào)度有一個(gè)更立體的認(rèn)識
