RxJava源碼分析之線程調(diào)度(一)

RxJava強(qiáng)大的地方之一是他的鏈?zhǔn)秸{(diào)用,輕松地在線程之間進(jìn)行切換。這幾天也大概分析了一下RxJava的線程切換的主流程于是打算寫一篇文章及記錄一下。

我們使用RxJava進(jìn)行線程切換的場景很多時(shí)候都是在進(jìn)行網(wǎng)絡(luò)請求的時(shí)候,在IO線程進(jìn)行網(wǎng)絡(luò)數(shù)據(jù)的請求處理,最后在Android的主線程進(jìn)行請求數(shù)據(jù)的結(jié)果處理。

.subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())

當(dāng)然因?yàn)檫@段代碼的使用場景太多我們還可以利用ObservableTransformer操作符對其進(jìn)行簡化

   public static <T>ObservableTransformer<T,T> io_main()
    {
        return new ObservableTransformer<T, T>() {
            @Override
            public ObservableSource<T> apply(@NonNull Observable<T> upstream) {
                return upstream.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
            }
        };
    }

這樣我們在使用的時(shí)候就是這樣的:

.compose(RxTransformUtil.<Object>io_main())

是不是感覺方便了一丟丟

好了扯遠(yuǎn)了,現(xiàn)在來分析一下RxJava是如何做到線程的輕松調(diào)度的。
首先有幾個(gè)概念是非常重要的:
Scheduler官方的解釋是這樣的

A Scheduler is an object that specifies an API for scheduling units of work with or without delays or periodically. 

初步看來Scheduler就是一個(gè)任務(wù)調(diào)度器相當(dāng)于就是一個(gè)調(diào)度中心的指揮者。當(dāng)然它是一個(gè)抽象類就肯定了Scheduler有很多具體的實(shí)現(xiàn)類,例如IO線程的具體調(diào)度器就是IoScheduler。就像調(diào)度中心指揮者有客運(yùn)中心的指揮者,有機(jī)場中心的指揮者一樣分別有不同的實(shí)現(xiàn)類。
當(dāng)然現(xiàn)在只有指揮者是肯定不行的,光頭司令怎么得行?這個(gè)時(shí)候關(guān)鍵的Worker類出現(xiàn)了,Worker官方的解釋是這樣的

Sequential Scheduler for executing actions on a single thread or event loop.
Disposing the Scheduler.Worker cancels all outstanding work and allows resource cleanup.

可以看到Worker就是線程任務(wù)的具體執(zhí)行者了。和Scheduler一樣Worker同樣也是抽象類,在不同的Scheduler具體實(shí)現(xiàn)類里面Worker也有自己的具體實(shí)現(xiàn)類,例如在IoScheduler類里面,Worker的具體實(shí)現(xiàn)類就是EventLoopWorker,它負(fù)責(zé)管理IO線程的具體操作,接下來我們就找到切入點(diǎn)看一看RxJava源碼里面都做了什么。

這里我們就以最典型的IO線程和主線程之間的切換為例來分析,線程切換的代碼就是上面的代碼。
Scheduler是以工廠方法對外提供它具體的實(shí)現(xiàn)類的。Schedulers.io()可以提供一個(gè)IoScheduler的對象。你可以往里面看最后源碼是如何進(jìn)行IoScheduler的創(chuàng)建的

//創(chuàng)建IoScheduler
static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
//接著就行了IoScheduler的一系列初始化,CachedWorkerPool地初始化 ,并由RxThreadFactory進(jìn)行線程地創(chuàng)建,線程優(yōu)先級別設(shè)置,是否是守護(hù)進(jìn)程等等

現(xiàn)在IoScheduler有了,我們就看subscribe里面到底做了什么

public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

Hook我們不用管,可以看到是把當(dāng)前ObservableCreater對象和IoScheduler一起傳進(jìn)了ObservableSubscribeoOn的構(gòu)造函數(shù)里面。進(jìn)入到ObservableSubscribeOn里面看看。

//AbstractObservableWithUpstream只是用來保存上游的源事件流的,就是保存剛剛傳入進(jìn)來的ObservableCreater
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
//裝飾模式 把下游的Observer裝飾成SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);     //執(zhí)行下游Observer的onSubscribe(Disposable disposabel)方法,當(dāng)前線程是上游的執(zhí)行線程
        s.onSubscribe(parent);
//開啟的子線程最終是以帶Disposable的返回值返回的
//在這里是將子線程加入管理,因?yàn)檫@里是并發(fā)操作所以使用了AtomicReference<Object>的院子操作類,是一種效率高于synchronized的樂觀鎖,感興趣的可以自行上網(wǎng)搜索
//我們只用知道這里加入管理了以后方便在以后我們切斷上下游的時(shí)候可以將我們的子線程一同dispose().
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
     
    //這中間的代碼和最基本的鏈?zhǔn)秸{(diào)用關(guān)系是一樣的,只不過在onNext、onError、onComplete中實(shí)際上還是調(diào)用的下游真正的onNext、onError、onComplete

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }
        @Override
        public void onComplete() {
            actual.onComplete();
        }
              void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
        }
    }
//這就是實(shí)際執(zhí)行的Runnable 會把其傳入IoScheduler中供Worker使用。
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
/*看到了吧,SubscribeOnObserver 作用其實(shí)就是將源事件流發(fā)生的地點(diǎn)和下游的事件流處理的地點(diǎn)訂閱在了子線程中進(jìn)行處理。
這樣上游發(fā)送事件流的地方就被切換到了子線程中。*/
            source.subscribe(parent);
        }
    }
}

接下來我們仔細(xì)看一下上面代碼的這一段:

 @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
//這里scheduler.schedlerDirect非常的重要,可以看到RxJava把剛剛包裝好的Runnable對象傳入了方法里
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

我們跟進(jìn)去看一下里面的具體實(shí)現(xiàn)

 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
//實(shí)際上是調(diào)用的下面3個(gè)參數(shù)的方法,延遲時(shí)間為0
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
 @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
//創(chuàng)建具體的Worker類
        final Worker w = createWorker();
//hook函數(shù)我們不用管,只要沒有設(shè)置依舊返回的是傳入的Runnable
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//將runnable和worker封裝到DisposeTask中
        DisposeTask task = new DisposeTask(decoratedRun, w);
//執(zhí)行Worker的schedule方法具體的就是EventLoopWorker里面的schedule方法
        w.schedule(task, delay, unit);

        return task;
    }

接下來我們來看一下EventLoopWorker里面的schedule方法是怎么實(shí)現(xiàn)的

 @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            //判斷是否解除訂閱
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }

可以看到這里如果沒有被解除訂閱的話又會執(zhí)行到NewThreadWorker的scheduleActual方法里面。

@NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //hook函數(shù)我們這里不用管decoratedRun依然是傳進(jìn)來的Runnable對象run
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //ScheduledRunnable是一個(gè)即實(shí)現(xiàn)了Runnable接口又實(shí)現(xiàn)了Callable接口的對象,為了后面能成功加入到線程池當(dāng)中    
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        //將sr加入到CompositeDisposable中,方便管理
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
     
        Future<?> f;
        try {
            if (delayTime <= 0) {
              //將sr加入到線程池當(dāng)中 并將線程的執(zhí)行結(jié)果返回給 Future<?> f
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);//對運(yùn)行結(jié)果進(jìn)行處理
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                //在CompositeDisposable中一處剛剛加入的sr
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
  
        return sr;
    }

接下來看一下ScheduledRunnable是如何對返回的結(jié)果進(jìn)行處理的

  public void setFuture(Future<?> f) {
//一個(gè)死循環(huán)會一直判斷返回回來的結(jié)果 因?yàn)槠鋵?shí)原子操作類,樂觀鎖的機(jī)制決定了如果不是想要的結(jié)果的話會重新執(zhí)行一次
        for (;;) {
            Object o = get(FUTURE_INDEX);
            if (o == DONE) {
                //完成直接return
                return;
            }
              //如果取消訂閱了則直接取消線程任務(wù)
            if (o == DISPOSED) {
                f.cancel(get(THREAD_INDEX) != Thread.currentThread());
                return;
            }
            //前兩者都不滿足的話 就將future的值存下來
            if (compareAndSet(FUTURE_INDEX, o, f)) {
                return;
            }
        }
    }

到現(xiàn)在為止上游的線程切換大體的流程就分析的差不多了,我們從源碼中也可以分析出很多網(wǎng)上經(jīng)常說的一些結(jié)論,最經(jīng)典的一條就是上游切換線程只有第一次生效,后面的線程切換都不起作用了,其實(shí)分析這點(diǎn)最重要的就是理解 ObservableSubscribeOn類里面下面的這段代碼了

final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

再結(jié)合RxJava的鏈?zhǔn)讲僮鳎幚頂?shù)據(jù)的時(shí)候是自下而上,而發(fā)射數(shù)據(jù)的時(shí)候是自上而下(這句話網(wǎng)上說的太多了,我最開始也是不理解,只有自己真正看過源碼分析了,自己Debug一邊才能真正地理解)。
好了先寫到這里了,剩下的內(nèi)容我會放到另外一篇博客里面,感覺文章太長不利于閱讀。

這篇文章也是我第一次試著去分析源碼最后寫出的,很多都是我自己的理解,所以肯定有不妥當(dāng)或者錯(cuò)誤的地方希望大家看到了以后能給我指出來,我一定改正!

最后

沒有最后了 大家再見~~~

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容