理解RxJava(三)線程調(diào)度原理分析

概述

在我的上一篇文章 《理解RxJava(二)操作符流程原理分析》 中,分析了依靠多個操作符鏈?zhǔn)秸{(diào)用的原理。

簡單總結(jié)如下:

1.創(chuàng)建:訂閱前,每一步都生成對應(yīng)的Observable對象,中間的每一步都將上游的Observable存儲;
2.訂閱: 每一步都會生成對應(yīng)的Observer對上一步生成并存儲的Observable進(jìn)行訂閱。訂閱的執(zhí)行順序是由下到上的。
3.執(zhí)行:先執(zhí)行每一步傳入的函數(shù)操作,然后將操作后的數(shù)據(jù)交給下游的Observer繼續(xù)處理。 數(shù)據(jù)的傳遞和處理順序是由上到下的。

本文我將嘗試對RxJava最核心的 線程調(diào)度 的原理進(jìn)行分析。

基本代碼

來看一下基本代碼:

    @Test
    public void test() throws Exception {
        Observable.create((ObservableOnSubscribe<Integer>) e -> {
            e.onNext(1);
            e.onNext(2);
            e.onComplete();
        }).subscribeOn(Schedulers.io())
          .observeOn(AndroidSchedulers.mainThread())
          .subscribe(i -> System.out.println("onNext : i= " + i));
    }

很簡單,即訂閱時將task交給子線程去做,而數(shù)據(jù)的回調(diào)則在Android主線程中執(zhí)行。

一、subscribeOn()

點(diǎn)擊查看源碼:

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        //非空判斷和hook
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

有了前兩篇文章的基礎(chǔ),我們很清楚,排除掉非空判斷和hook相關(guān)邏輯,實(shí)際上這個方法返回了一個ObservableSubscribeOn對象。

我們有理由猜測這個ObservableSubscribeOn應(yīng)該和上文的ObservableMap及ObservableDoOnEach相似,都是Observable的一個包裝類(裝飾器):

//1.和上文基本一樣,ObservableSubscribeOn也是Observable的一個裝飾器
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
       //2.存儲上游的ObservableSource和調(diào)度器
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //3.new 一個SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        //4.回調(diào)方法,這說明下游的onSubscribe回調(diào)方法所在線程和線程調(diào)度無關(guān)
        //  是訂閱時所在的線程
        s.onSubscribe(parent);

        //5.立即執(zhí)行線程調(diào)度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
}

前兩步我們不需要 再多解釋,直接看第三點(diǎn),我們看看SubscribeOnObserver這個類:

SubscribeOnObserver

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        //下游的Observer
        final Observer<? super T> actual;
        //保存上游的Disposable,自身dispose時,連同上游一起dispose
        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        @Override
        public void onError(Throwable t) {
            actual.onError(t);
        }

        @Override
        public void onComplete() {
            actual.onComplete();
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }

類似Observable和ObservableMap,SubscribeOnObserver同樣是DisposableObserver的一個裝飾器,提供了對下游數(shù)據(jù)的傳遞,以及將task dispose的接口。

第4步我們之前就講過了,直接看第5步:

         //5.立即執(zhí)行線程調(diào)度
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

我們看看SubscribeTask這個類:

SubscribeTask

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }

難以置信的簡單,SubscribeTask 僅僅是一個Runnable 接口的實(shí)現(xiàn)類而已,通過將SubscribeOnObserver作為參數(shù)存起來,在run()方法中添加了上游Observable的被訂閱事件,就沒有了別的操作,

接下來我們看一下scheduler.scheduleDirect(SubscribeTask)中的代碼:

public abstract class Scheduler {
    //...
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // Worker 本身就是Disposable 的實(shí)現(xiàn)類
        // 請注意, createWorker()所創(chuàng)建的worker,
        // 實(shí)際就是Schdulers.io()所提供的IoScheduler所創(chuàng)建的worker
        final Worker w = createWorker();
        
        //hook相關(guān)
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);
        
        //即 worker.schedule(task, 0, TimeUnit.NANOSECONDS): 立即執(zhí)行task
        w.schedule(task, delay, unit);

        return task;
    }
    //...
}

我們不要追究過深,我們看一下這個createWorker方法的注釋說明:

    /**
     * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
     * 檢索或創(chuàng)建一個新的{@link Scheduler.Worker}表示一系列的action
     * 
     * When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.
     * 當(dāng)work完成后,應(yīng)使用{@link Scheduler.Worker#dispose()}取消訂閱。
     * 
     * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
     *  {@link Scheduler.Worker} 上面的work保證是順序執(zhí)行的
     */

現(xiàn)在我們知道了:

我們通過調(diào)用subscribeOn()傳入Scheduler,當(dāng)下游ObservableSource被訂閱時(請注意,訂閱順序是由下到上的),距離最近的線程調(diào)度subscribeOn()方法中,保存的Scheduler會創(chuàng)建一個worker(對應(yīng)相應(yīng)的線程,本文中為IoScheduler),在其對應(yīng)的線程中,立即執(zhí)行task

關(guān)于對應(yīng)的Worker相關(guān)和IoScheduler相關(guān),篇幅所限,本文不做細(xì)講,有興趣的同學(xué)可以自行研究。

多次subscribeOn()

現(xiàn)在考慮一個問題,假如在我們的代碼中,多次使用了subscribeOn()代碼,到線程會怎么處理呢?

上文已經(jīng)講到了,不管我們怎么通過subscribeOn()方法切換線程,由于訂閱執(zhí)行順序是由下到上,因此當(dāng)最上游的ObservableSource被訂閱時,所在線程當(dāng)然是距離上游最近的subscribeOn()所提供的線程,即最終Observable總是在第一個subscribeOn()所在的線程中執(zhí)行。

二、observeOn()

先看observeOn()內(nèi)部,果然是hook+Observable的包裝類:

    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");

        //實(shí)例化ObservableObserveOn對象并返回
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

再看ObservableObserveOn:

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        //1.相關(guān)依賴注入
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            //2.創(chuàng)建主線程的worker
            Scheduler.Worker w = scheduler.createWorker();
            //3.上游數(shù)據(jù)源被訂閱
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
}

和subscribeOn()不同的是,我們并不是立即在對應(yīng)的線程執(zhí)行task,而是將對應(yīng)的線程(實(shí)際上是worker)作為參數(shù),實(shí)例化ObserveOnObserver并存儲起來。

當(dāng)上游的數(shù)據(jù)傳遞過來時,ObserveOnObserver執(zhí)行對應(yīng)的方法,比如onNext(T),再切換到對應(yīng)線程中,并交由下游的Observer去接收:

ObserveOnObserver

ObserveOnObserver中代碼極多,我們簡單了解原理后,以onNext(T)為例:

static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
    implements Observer<T>, Runnable {

        //...省略其他代碼
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        //隊(duì)列
        SimpleQueue<T> queue;

       @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            //將數(shù)據(jù)存入隊(duì)列
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            } 
            //對應(yīng)線程取出數(shù)據(jù)并交由下游的Observer
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
         //...省略其他代碼
}

多次observerOn()

由上文得知,與subscribeOn()相反,observerOn()操作會將切換到對應(yīng)的線程,然后交由下游的Observer處理,因此observerOn()僅對下游的Observer生效,并且,如果多次調(diào)用,observerOn()的線程調(diào)度會持續(xù)到下一個observerOn()操作之前。

我在《RxJava(11-線程調(diào)度Scheduler)》 @open-Xu文章中看到了這張圖片,完美詮釋了observerOn()的原理:

observerOn()的原理

注意:

接下來的內(nèi)容都是@open-Xu在他的文章《RxJava(11-線程調(diào)度Scheduler)》中總結(jié)的筆記,為了方便以后翻閱,特此轉(zhuǎn)載過來,下文的轉(zhuǎn)載請@原作者open-Xu并注明原文地址,謝謝!

調(diào)度器的種類

調(diào)度器類型 效果
Schedulers.computation(?) 用于計(jì)算任務(wù),如事件循環(huán)或和回調(diào)處理,不要用于IO操作(IO操作請使用Schedulers.io());默認(rèn)線程數(shù)等于處理器的數(shù)量
Schedulers.from(executor) 使用指定的Executor作為調(diào)度器
Schedulers.immediate(?) 在當(dāng)前線程立即開始執(zhí)行任務(wù)
Schedulers.newThread(?) 為每個任務(wù)創(chuàng)建一個新線程
Schedulers.io(?) 用于IO密集型任務(wù),如異步阻塞IO操作,這個調(diào)度器的線程池會根據(jù)需要增長;對于普通的計(jì)算任務(wù),請使用Schedulers.computation();Schedulers.io(?)默認(rèn)是一個CachedThreadScheduler,很像一個有線程緩存的新線程調(diào)度器
Schedulers.trampoline(?) 當(dāng)其它排隊(duì)的任務(wù)完成后,在當(dāng)前線程排隊(duì)開始執(zhí)行
AndroidSchedulers.mainThread(?) 主線程,UI線程,可以用于更新界面

總結(jié)

subscribeOn()

  • 訂閱順序當(dāng)從下到上,上游的ObservableSource被訂閱時,先切換線程,然后立即執(zhí)行task;
  • 當(dāng)存在多個subscribeOn()方法時,僅第一個subscribeOn()有效。

observerOn()

  • 訂閱順序當(dāng)從下到上,上游的ObservableSource被訂閱時,會將對應(yīng)的worker創(chuàng)建并作為構(gòu)造參數(shù)存儲在Observer的裝飾器中,并不會立即切換線程;
  • 當(dāng)數(shù)據(jù)由上游發(fā)送過來時,先將數(shù)據(jù)存儲到隊(duì)列中,然后切換線程,然后在新的線程中將數(shù)據(jù)發(fā)送給下游的Observer;
  • 當(dāng)存在多個observerOn()方法時,僅對距下游下一個observerOn()之前的observer有效

參考文章

1.《RxJava(11-線程調(diào)度Scheduler)》 @open-Xu

2.《RxJava2 源碼解析(二)》@張旭童

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,777評論 7 62
  • 最近項(xiàng)目里面有用到Rxjava框架,感覺很強(qiáng)大的巨作,所以在網(wǎng)上搜了很多相關(guān)文章,發(fā)現(xiàn)一片文章很不錯,今天把這篇文...
    Scus閱讀 6,994評論 2 50
  • 原文地址:http://gank.io/post/560e15be2dca930e00da1083 前言 我從去年...
    AFinalStone閱讀 2,329評論 5 23
  • 前言 終究沒有經(jīng)受住RxJava的誘惑,只恨自己來的比較晚,走起~ RxJava 是什么? 一個在 Java VM...
    王永迪閱讀 4,396評論 3 37
  • 作者寄語 很久之前就想寫一個專題,專寫Android開發(fā)框架,專題的名字叫 XXX 從入門到放棄 ,沉淀了這么久,...
    戴定康閱讀 7,742評論 13 85

友情鏈接更多精彩內(nèi)容