首先看下我們RxJava的常規(guī)使用方法
代碼A 調(diào)用類(lèi)
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext(XXX);
e.onComplete();
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("tag", s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
1、Schedulers.io()為IoScheduler
下面看下subscribeOn(Schedulers.io())這個(gè)方法,把代碼貼出來(lái)
代碼B ObservableCreate類(lèi)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
最后真正執(zhí)行的是ObservableSubscribeOn類(lèi)中的subscribeActual方法
代碼C ObservableSubscribeOn類(lèi)
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
形參 s 為觀察者,在外層執(zhí)行Observable.subscribe(Observer)之后,觀察者的onSubscribe()方法是首先會(huì)被調(diào)用的,調(diào)用位置便在 代碼C 中的s.onSubscribe(parent)。
重點(diǎn)來(lái)看一下 代碼C 中的最后一行代碼。首先看下SubscribeTask類(lèi)。
代碼D ObservableSubscribeOn類(lèi)
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
實(shí)際上SubscribeTask就是一個(gè)Runnable類(lèi),在其run方法中,執(zhí)行了source.subscribe(parent);其中source就是我們 代碼B 中new ObservableSubscribeOn<T>(this, scheduler)傳入的this,在這里也就是ObservableCreate類(lèi),parent就是 代碼C 中s的包裝類(lèi),在這里可以看成是觀察者類(lèi)。
接著看下 代碼C 中的 scheduler.scheduleDirect(new SubscribeTask(parent))
//scheduleDirect源碼如下
代碼E Scheduler類(lèi)(IoScheduler)
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
createWorker() 方法是一個(gè)抽象方法,IoScheduler類(lèi)的具體實(shí)現(xiàn)如下,new了一個(gè) EventLoopWorker
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
pool.get()是從線程池CachedWorkerPool中取一個(gè)線程N(yùn)ewThreadWorker
重點(diǎn)是在 代碼E中的w.schedule(task, delay, unit),即eventLoopWorker.schedule方法,一直跟下去,到最終調(diào)用處如下
代碼F NewThreadWorker類(lèi)
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
因?yàn)槲覀円矝](méi)有設(shè)置delay時(shí)間,所以我們重點(diǎn)看下executor.submit((Callable<Object>)sr);
經(jīng)過(guò)層層傳遞,其中傳入的參數(shù)sr即為 代碼C 中的SubscribeTask。此時(shí)此刻,基本上都清晰了,在這里執(zhí)行submit放法,實(shí)際上就是執(zhí)行我們 代碼D 中的source.subscribe(parent);由于此處是在子線程中調(diào)用,所以能滿足在最外層調(diào)用
subscribeOn(Schedulers.io())之前的上游代碼都運(yùn)行在子線程中。
說(shuō)到上游,我們回到代碼A。ObservableSubscribeOn的上游便是我們通過(guò)方法Observable.create()創(chuàng)建的ObservableCreate類(lèi)。
代碼D 中的source.subscribe(parent)便相當(dāng)于直接調(diào)到了ObservableCreate類(lèi)的subscribe()方法。相同的套路,最終都會(huì)調(diào)到ObservableCreate類(lèi)的subscribeActual()方法。
代碼G ObservableCreate類(lèi)
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
source.subscribe(parent)中的source即為 代碼A 中Observable.create(new ObservableOnSubscribe<String>())方法傳入的ObservableOnSubscribe類(lèi)。調(diào)用source.subscribe()方法,即調(diào)用 ObservableOnSubscribe 類(lèi)中的 subscribe() 方法。
ObservableOnSubscribe為一個(gè)接口,代碼如下:
public interface ObservableOnSubscribe<T> {
/**
* Called for each Observer that subscribes.
* @param e the safe emitter instance, never null
* @throws Exception on error
*/
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}
這下便回到了我們熟悉的外部調(diào)用。其中形參 e 便為 代碼G 中第9行傳入的parent,即observer的包裝類(lèi)。
e.onNext(XXX)方法的分析請(qǐng)看下一篇文章RxJava線程切換之observeOn源碼分析