RxJava2源碼學(xué)習(xí)
Rxjava最引以為傲的鏈?zhǔn)讲僮?,每個方法都是產(chǎn)生一個Obserable,這樣才能鏈?zhǔn)秸{(diào)用。每個方法產(chǎn)生的Obserable內(nèi)部都有三個東西,代理Observer,下游Observer,上游Obserable。
1.事件的創(chuàng)建:
這是一段沒有任何操作符和線程調(diào)度的代碼:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
進去看看create操作符到底干了什么,怎么創(chuàng)建的一個Obserable。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
//檢查是否為null
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
--->onAssembly,這里創(chuàng)建了一個ObservableCreate傳了進去
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
//f默認為null
if (f != null) {
return apply(f, source);
}
return source;
}
這個方法接受一個Obserable,方法里的f默認為null,需要我們預(yù)先設(shè)置,這個有關(guān)hook,一般用不到,后面所有有關(guān)hook的代碼都先忽略。所以實際上就是返回了我們剛剛傳進來的suorce。這樣create方法實際返回了ObservableCreate對象,就是我們需要的Obserable了。
--->ObservableCreate,構(gòu)造方法的代碼,可以看到繼承自O(shè)bserable
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
//構(gòu)造方法傳進來了我們在外面的回調(diào)
this.source = source;
}
}
創(chuàng)建完成了,再看訂閱:
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
//有關(guān)hook,忽略
observer = RxJavaPlugins.onSubscribe(this, observer);
//檢查observer是否為null
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
//這是實際訂閱方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
--->subscribeActual,這是個Obserable的抽象方法,需要子類具體Obserable去實現(xiàn)。我們?nèi)倓偸褂胏reate方法創(chuàng)建的ObservableCreate看看實現(xiàn)。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//創(chuàng)建一個發(fā)射器,同時也是一個開關(guān),數(shù)據(jù)將由它源源不斷的發(fā)射到下游
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//調(diào)用下游觀察者的onSubscribe將開關(guān)傳遞下去,用來控制事件發(fā)射
observer.onSubscribe(parent);
try {
//suorce就是我們的回調(diào)類,在subscribe里面我們操縱e.onNext(),e.onComplete(),e.onError()發(fā)射事件
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這里可以看到上游的Obserable已經(jīng)可以將事件發(fā)射出去了,那到底怎么傳遞到下游的,既然是通過e.onNext(),e.onComplete(),e.onError()發(fā)射出去的,看看唄
//繼承了發(fā)射器ObservableEmitter,實現(xiàn)了Disposable開關(guān),同時還發(fā)現(xiàn)它還繼承了AtomicReference<Disposable>,AtomicXXX系列的類是線程安全的原子操作類,不用加鎖,Rxjava里面的Disposable開關(guān)的控制就是通過它來保證線程安全的。
static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
//構(gòu)造方法,將下游Observer保存
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
//重點,數(shù)據(jù)傳遞
@Override
public void onNext(T t) {
//null判斷,因此Rxjava2不能發(fā)射null了。
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
//檢查控制訂閱關(guān)系的開關(guān)Dispose,不為false才發(fā)送數(shù)據(jù)
if (!isDisposed()) {
observer.onNext(t);
}
}
//異常事件
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
//同樣檢查開關(guān)
if (!isDisposed()) {
try {
observer.onError(t);
} finally {
//這是finally塊,因為產(chǎn)生onError或complete,就意味著訂閱關(guān)系已終止,必須解除
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
//同樣檢查開關(guān)
if (!isDisposed()) {
try {
//這是finally塊,因為產(chǎn)生onError或complete,就意味著訂閱關(guān)系已終止,必須解除
observer.onComplete();
} finally {
dispose();
}
}
}
@Override
public void setDisposable(Disposable d) {
DisposableHelper.set(this, d);
}
@Override
public void setCancellable(Cancellable c) {
setDisposable(new CancellableDisposable(c));
}
@Override
public ObservableEmitter<T> serialize() {
return new SerializedEmitter<T>(this);
}
//解除訂閱關(guān)系
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
//是否已解除訂閱關(guān)系
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
}
2.操作符map:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
//RxJavaPlugins.onAssembly有關(guān)hook,實際就是返回了ObservableMap對象,上面講過
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
--->ObservableMap
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
//保存上游Obserable
super(source);
//保存提供實際轉(zhuǎn)換操作的外部回調(diào)對象
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
//這里創(chuàng)建了一個Observer,用于訂閱上游,這樣數(shù)據(jù)才能鏈?zhǔn)絺鬟f,但是它只是一個中間代理,用于接受上游數(shù)據(jù),但是還需要轉(zhuǎn)換并且傳遞到下游。所以傳進去下游Observer t和回調(diào)的 function對象。
source.subscribe(new MapObserver<T, U>(t, function));
}
}
從第一個創(chuàng)建的Obserable說起,它要做的工作是調(diào)用下游的onNext等等方法傳遞事件。那么這需要一個Observer對象,但是現(xiàn)在我們做了中間操作,事件需要經(jīng)過處理,因此就需要在本節(jié)點Obserable內(nèi)部維護一個代理Observer用于訂閱上游的事件,然后完成特定的操作如map數(shù)據(jù)類型轉(zhuǎn)換。再繼續(xù)調(diào)用下游的Observer.onNext等方法,將事件傳遞下去。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
//保存下游observer
super(actual);
//保存外部轉(zhuǎn)換操作的回調(diào)對象
this.mapper = mapper;
}
@Override
public void onNext(T t) {
//當(dāng)onComplete或onError事件發(fā)生后,done為true,是開關(guān)
if (done) {
return;
}
//sourceMode默認為NONE
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//數(shù)據(jù)轉(zhuǎn)換
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//將數(shù)據(jù)傳遞給調(diào)用下游
actual.onNext(v);
}
}
在中間插了個map操作符的Rx鏈子:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return null;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull String integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Obserable--->map--->Observer
Obserable鏈子的產(chǎn)生是從上游到下游,每個方法都是產(chǎn)生一個Obserable,每個下游的Obserable在創(chuàng)建時就保存了上游的Obserable。事件訂閱動作肯定是發(fā)生在最后一個Obserable。每次Obserable的subcribe動作都是直接調(diào)用的subscribeActual方法
map Obserable訂閱observer:
ObservableMap.subcribe(observer);
再到-->
@Override
public void subscribeActual(Observer<? super U> t) {
//這個suorce哪來的,不就是ObservableMap創(chuàng)建時傳進來的上游Obserable嗎
source.subscribe(new MapObserver<T, U>(t, function));
}
訂閱到這里還不夠啊,因為數(shù)據(jù)源在最頂部d的Obserable,于是必須要創(chuàng)建中間代理Observer訂閱上游Obserable,接受上游的事件。
@Override
protected void subscribeActual(Observer<? super T> observer) {
//將下游Observer保存
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//開關(guān)也是這里傳遞下去的
observer.onSubscribe(parent);
try {
//最終訂閱到了這里,就是我們數(shù)據(jù)發(fā)射的源頭,外部回調(diào)對象
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
于是就這樣,訂閱動作從鏈子最底部傳到了最頂部的Obserable。接著畫風(fēng)一轉(zhuǎn),訂閱流程結(jié)束,開始事件發(fā)射流程:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//數(shù)據(jù)發(fā)射
e.onNext(1);
}
}).
而在發(fā)射器的onNext方法里面:
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
//開始向下游傳遞事件了,這個Observer顯然就是ObservableMap里面的代理Observer。
observer.onNext(t);
}
--->ObservableMap.onNext,事件已經(jīng)從上一個Obserable傳遞到了ObservableMap
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//做完數(shù)據(jù)類型轉(zhuǎn)換
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//繼續(xù)調(diào)用下游onNext向下游發(fā)送事件,這里顯然就是最終的Observer
actual.onNext(v);
}
總結(jié):先從下往上訂閱,再從上往下發(fā)送。

3.線程調(diào)度
- subcribeOn():
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
//創(chuàng)建一個代理Observer,同時又是一個Disposed開關(guān)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//開關(guān)傳遞到下游
s.onSubscribe(parent);
//將線程調(diào)度返回一個Disposed開關(guān),方便對線程進行控制管理
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
//這里好像并沒有suorce.subcribe(s);
}
}
--->SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
//訂閱在這里,注意,是將subscrible操作放到了線程當(dāng)中哦,想想后面哪些操作在這個線程中。
source.subscribe(parent);
}
}
}
--->scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
再到
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//這里由選擇的線程決定,會獲得不同的Worker,subscribeOn(Schedulers.io())
final Worker w = createWorker();
//先不管
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//將線程加入Disposed控制,以便在取消訂閱時,能及時關(guān)閉線程
DisposeTask task = new DisposeTask(decoratedRun, w);
//任務(wù)開始執(zhí)行
w.schedule(task, delay, unit);
return task;
}
--->DisposeTask
//就是個包裝過后的任務(wù),實現(xiàn)了Disposed接口,實現(xiàn)了開關(guān)管理的控制
static final class DisposeTask implements Runnable, Disposable {
//要實施的任務(wù)
final Runnable decoratedRun;
//線程工作
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
//運行完了必須切斷
dispose();
runner = null;
}
}
//關(guān)閉線程
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
選中Worker,Ctrl+H查看他的繼承樹。隨便找個具體看看。

看看NewThreadWorker關(guān)鍵源碼:
//線程調(diào)度的最終方法
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//包裝任務(wù)
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
//一般為null,忽略
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//加入線程池
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
//記得之前在創(chuàng)建worker時就將它加入了Disposed管理。這里控制了線程調(diào)度
@Override
public void dispose() {
if (!disposed) {
disposed = true;
executor.shutdownNow();
}
}
再來看看:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
@Override
public void onNext(T t) {
//代碼執(zhí)行到這里,一定是在某個調(diào)度的線程當(dāng)中,但是不一定就是在這次調(diào)度的線程,因為它不一定就是離頂部最近的subscribeOn,因為線程任務(wù)都會加入Disposed管理,因此這里不需要判斷了
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
前面知道了整個事件鏈子是先從下往上訂閱,再從上往下發(fā)射。source.subscribe(parent);這句代碼發(fā)生在調(diào)度線程中。因此在后面的所有操作都是發(fā)生在了這個調(diào)度線程當(dāng)總中,這也就解釋了為什么多個subscribeOn()只有第一個有效,因為subscribeOn()是在訂閱的鏈子上,所有的發(fā)射操作都是在訂閱的后面,自然發(fā)射操作也就只受離頂部最近的subscribeOn的影響了。而下面的subscribeOn只是影響了一些訂閱操作而已,但是這我們察覺不出來,并不關(guān)心。
- ObserveOn():
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
//這里好像沒有RxJavaPlugins.onAssembly()那家伙了。。。bufferSize()是一個常量,緩沖池的大小
return observeOn(scheduler, false, bufferSize());
}
再看
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
//又來了。。。
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
//保存上游Obserable
super(source);
//保存Scheduler
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
//特殊調(diào)度器,暫不做考慮
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//直接創(chuàng)建Worker
Scheduler.Worker w = scheduler.createWorker();
//訂閱
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
}
--->ObserveOnObserver
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Scheduler.Worker worker;
final boolean delayError;
final int bufferSize;
//緩沖隊列,上游發(fā)射過來的事件都會先存到這里,然后在這里取事件發(fā)射給下游
SimpleQueue<T> queue;
Disposable s;
//存儲異常
Throwable error;
//是否已完成
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
//判斷是否終止
if (done) {
return;
}
//true
if (sourceMode != QueueDisposable.ASYNC) {
//將事件加入隊列
queue.offer(t);
}
//調(diào)度線程,注意啊,onNext方法是在發(fā)射鏈子上的,因此可以想到,ObserveOn()影響發(fā)射過程,且只影響后面的發(fā)射操作
schedule();
}
void schedule() {
//原子操作,自增然后返回原值
if (getAndIncrement() == 0) {
//把自己當(dāng)做任務(wù),傳了進去
worker.schedule(this);
}
}
}
--->run
@Override
public void run() {
//默認false
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
--->drainNormal
void drainNormal() {
int missed = 1;
//事件緩沖隊列
final SimpleQueue<T> q = queue;
//下游
final Observer<? super T> a = actual;
for (;;) {
//檢查是否已終結(jié)
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
//事件出列
v = q.poll();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
//再次檢查是否終結(jié)
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
//將事件發(fā)射給下游
a.onNext(v);
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
總結(jié):subscribeOn在訂閱鏈子上執(zhí)行,observeOn在發(fā)射鏈子上執(zhí)行,影響的操作

但是有些操作或方法比較特別點:doOnSubscribe()和onSubscribe()
doOnSubscribe是指在onSubscribe()發(fā)生之前調(diào)用。
看看ObserableCreate:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//在這里將Dispose開關(guān)傳遞下去,
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
一般的操作符也是規(guī)規(guī)矩矩的將這個Dispose開關(guān)一樣的傳下去,例如ObserableMap里面的代理Observer:
@SuppressWarnings("unchecked")
@Override
public final void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
this.qs = (QueueDisposable<T>)s;
}
if (beforeDownstream()) {
//直接傳遞
actual.onSubscribe(this);
afterDownstream();
}
}
}
但是,看看subscribeOn里面的代理Observer的代碼:
@Override
public void onSubscribe(Disposable s) {
//沒有直接傳遞dispose開關(guān),只是對上游的開關(guān)做了設(shè)置
DisposableHelper.setOnce(this.s, s);
}
那么我們的下游需要的開關(guān)在哪里呢?
ObservableSubscribeOn
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//自己創(chuàng)建了一個dispose傳給了下游,并且在下面的線程調(diào)度之前執(zhí)行
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
可以看到,在訂閱階段,并且在線程調(diào)度之前執(zhí)行。
因此我們可以得出結(jié)論:
如果有subscribeOn在doOnSubscribe()的下面,那么doOnSubscribe()和onSubscribe()都執(zhí)行在下面最近的subscribeOn指定的線程里,否則執(zhí)行在默認線程里面。ObserveOn不對doOnSubscribe()和onSubscribe()造成任何影響,因為前面說過,ObserveOn只對訂閱之后的發(fā)射階段可能造成影響。