RxJava 線程切換之subscribeOn源碼分析

首先看下我們RxJava的常規(guī)使用方法

代碼A 調(diào)用類(lèi)

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        e.onNext(XXX);
        e.onComplete();
    }
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }

    @Override
    public void onNext(String s) {
        Log.e("tag", s);
    }

    @Override
    public void onError(Throwable e) {
    }

    @Override
    public void onComplete() {
    }

});

1、Schedulers.io()為IoScheduler

下面看下subscribeOn(Schedulers.io())這個(gè)方法,把代碼貼出來(lái)

代碼B ObservableCreate類(lèi)

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

最后真正執(zhí)行的是ObservableSubscribeOn類(lèi)中的subscribeActual方法

代碼C ObservableSubscribeOn類(lèi)

@Override

public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

形參 s 為觀察者,在外層執(zhí)行Observable.subscribe(Observer)之后,觀察者的onSubscribe()方法是首先會(huì)被調(diào)用的,調(diào)用位置便在 代碼C 中的s.onSubscribe(parent)。

重點(diǎn)來(lái)看一下 代碼C 中的最后一行代碼。首先看下SubscribeTask類(lèi)。

代碼D ObservableSubscribeOn類(lèi)

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

實(shí)際上SubscribeTask就是一個(gè)Runnable類(lèi),在其run方法中,執(zhí)行了source.subscribe(parent);其中source就是我們 代碼B 中new ObservableSubscribeOn<T>(this, scheduler)傳入的this,在這里也就是ObservableCreate類(lèi),parent就是 代碼C 中s的包裝類(lèi),在這里可以看成是觀察者類(lèi)。

接著看下 代碼C 中的 scheduler.scheduleDirect(new SubscribeTask(parent))

//scheduleDirect源碼如下

代碼E Scheduler類(lèi)(IoScheduler)

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

createWorker() 方法是一個(gè)抽象方法,IoScheduler類(lèi)的具體實(shí)現(xiàn)如下,new了一個(gè) EventLoopWorker
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}

pool.get()是從線程池CachedWorkerPool中取一個(gè)線程N(yùn)ewThreadWorker

重點(diǎn)是在 代碼E中的w.schedule(task, delay, unit),即eventLoopWorker.schedule方法,一直跟下去,到最終調(diào)用處如下

代碼F NewThreadWorker類(lèi)

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {

    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}

因?yàn)槲覀円矝](méi)有設(shè)置delay時(shí)間,所以我們重點(diǎn)看下executor.submit((Callable<Object>)sr);

經(jīng)過(guò)層層傳遞,其中傳入的參數(shù)sr即為 代碼C 中的SubscribeTask。此時(shí)此刻,基本上都清晰了,在這里執(zhí)行submit放法,實(shí)際上就是執(zhí)行我們 代碼D 中的source.subscribe(parent);由于此處是在子線程中調(diào)用,所以能滿足在最外層調(diào)用
subscribeOn(Schedulers.io())之前的上游代碼都運(yùn)行在子線程中。

說(shuō)到上游,我們回到代碼A。ObservableSubscribeOn的上游便是我們通過(guò)方法Observable.create()創(chuàng)建的ObservableCreate類(lèi)。

代碼D 中的source.subscribe(parent)便相當(dāng)于直接調(diào)到了ObservableCreate類(lèi)的subscribe()方法。相同的套路,最終都會(huì)調(diào)到ObservableCreate類(lèi)的subscribeActual()方法。

代碼G ObservableCreate類(lèi)

@Override

protected void subscribeActual(Observer<? super T> observer) {

    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    observer.onSubscribe(parent);
    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

source.subscribe(parent)中的source即為 代碼A 中Observable.create(new ObservableOnSubscribe<String>())方法傳入的ObservableOnSubscribe類(lèi)。調(diào)用source.subscribe()方法,即調(diào)用 ObservableOnSubscribe 類(lèi)中的 subscribe() 方法。

ObservableOnSubscribe為一個(gè)接口,代碼如下:

public interface ObservableOnSubscribe<T> {

    /**
    * Called for each Observer that subscribes.
    * @param e the safe emitter instance, never null
    * @throws Exception on error
    */
    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;

}

這下便回到了我們熟悉的外部調(diào)用。其中形參 e 便為 代碼G 中第9行傳入的parent,即observer的包裝類(lèi)。

e.onNext(XXX)方法的分析請(qǐng)看下一篇文章RxJava線程切換之observeOn源碼分析

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容