詳解 RxJava2 的線程切換原理

轉(zhuǎn)載請標(biāo)明地址 QuincySx:[http://www.itdecent.cn/p/a9ebf730cd08 ]


讀了這篇文章你將會(huì)收獲什么

  • RxJava2 基本的運(yùn)行流程(并不會(huì)詳述)
  • RxJava2 線程切換原理
  • 為什么 subscribeOn() 只有第一次切換有效
  • RxAndroid 簡單分析

PS:建議您對(duì) RxJava 有一些了解或使用經(jīng)驗(yàn)再看此文章,推薦結(jié)合源碼品嘗
RxJava入門文章 [給 Android 開發(fā)者的 RxJava 詳解-扔物線(http://gank.io/post/560e15be2dca930e00da1083)

然后貼一下本篇文章分析的示例代碼

CompositeDisposable comDisposable = new CompositeDisposable();

protected void test() {
        Observable<String> observable = Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(@NonNull ObservableEmitter<String> emitter) throws
                            Exception {
                        emitter.onNext("hello");
                    }
                })
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String s) throws Exception {
                        return s;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread());

        observable.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                comDisposable.add(d);
            }

            @Override
            public void onNext(String s) {
                Log.i(TAG, s);
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });
}

RxJava2 基本的運(yùn)行流程

根據(jù)上述源碼分析出流程圖,這里顏色相同的代表同一對(duì)象。根據(jù)流程圖看一遍源碼基本流程就能理通

RxJava2 線程切換原理流程圖

RxJava2 線程切換原理

RxJava 切換線程怎么用我就不多說了請參考我的另一篇文章 Android:隨筆——RxJava的線程切換

一、observeOn() 的線程切換原理

根據(jù)運(yùn)行流程來看 observeOn() 執(zhí)行后是得到 ObservableObserveOn 對(duì)象,那么當(dāng) ObservableObserveOn 綁定監(jiān)聽者的時(shí)候要運(yùn)行 subscribe() 方法

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        //調(diào)用 subscribeActual()
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        ...
    }
}

接下來我們看一下 subscribeActual() 方法

protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        //scheduler 是傳進(jìn)來的線程調(diào)度對(duì)象,如 Schedulers.io() 、AndroidSchedulers.mainThread() 等,這里調(diào)用了 createWorker() 方法暫時(shí)看一下就好稍后分析 RxAndroid 會(huì)說明 
        Scheduler.Worker w = scheduler.createWorker();
        //我們看到他把 w 參數(shù)傳進(jìn)去了
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

從上述代碼我們可以看到 ObservableObserveOn 是被 ObserveOnObserver 監(jiān)聽的,所以收到通知也是由 ObserveOnObserver 作出響應(yīng),接下來我們假設(shè)當(dāng) Rxjava 發(fā)送 onNext 通知時(shí)會(huì)調(diào)用 ObserveOnObserver 的 onNext() 方法 ( PS:當(dāng)然如果是 onComplete()、onError() 等也是一樣的邏輯 ),然后我們來看一看 ObserveOnObserver 的 onNext() 方法,

@Override
public void onNext(T t) {
    if (done) {
        return;
    }
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    //切換線程
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        //直接調(diào)用了 worker 的 schedule 方法,需要注意的是這里他把自己傳了進(jìn)去
        worker.schedule(this);
    }
}

現(xiàn)在我先把把 schedule(Runnable run) 貼出來

public Disposable schedule(@NonNull Runnable run) {
    return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
  1. 我們看到這個(gè)他接收的參數(shù)是一個(gè) Runnable,這是怎么回事呢,我們看一下 ObserveOnObserver 對(duì)象,他不但實(shí)現(xiàn)了 Observer 接口并且也實(shí)現(xiàn)了 Runnable 接口
  2. 接下看,繼續(xù)調(diào)用 schedule( Runnable action, long delayTime, TimeUnit unit) 方法,但是這個(gè)方法是個(gè)抽象方法,這里我們就假設(shè)這里這個(gè) worker 是 IO 線程,所以我直接貼 IoScheduler 的代碼了
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
     if (tasks.isDisposed()) {
         // don't schedule, we are unsubscribed
         return EmptyDisposable.INSTANCE;
     }
     return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

然后再貼一下 scheduleActual 的方法

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //就是個(gè) Runnable
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        
    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        //判斷延遲時(shí)間,然后使用線程池運(yùn)行 Runnable
        if (delayTime <= 0) {
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }
    return sr;
}

這樣一來就會(huì)在相應(yīng)的線程中運(yùn)行 ObserveOnObserver 的 run 方法

public void run() {
    //這個(gè)地方具體的我還沒有搞明白,大概就是在這個(gè)方法里調(diào)用 onNext() ,然后 observeOn() 操作符之后的監(jiān)聽者的運(yùn)行線程就變了
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
        }
    }
二、subscribeOn() 的線程切換原理

PS:這個(gè)切換原理其實(shí)和 observeOn() 原理很像

跟 observeOn() 一樣,只不過這個(gè)操作的對(duì)象是 ObservableSubscribeOn, 這個(gè)對(duì)象也是同樣的代碼邏輯,運(yùn)行 subscribe() 方法,然后調(diào)用 subscribeActual() 方法,所以就直接貼 subscribeActual() 的代碼

public void subscribeActual(final Observer<? super T> s) {
    //創(chuàng)建與之綁定的 SubscribeOnObserver
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    //1. 創(chuàng)建 SubscribeTask 實(shí)際上就是個(gè) Runnable
    //2. 然后調(diào)用 scheduler.scheduleDirect 方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

我們看一下 scheduleDirect 的方法

public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //一個(gè) Runnable 具體作用沒分析
    DisposeTask task = new DisposeTask(decoratedRun, w);
    //這個(gè)代碼看著熟悉嗎  沒錯(cuò)上面 observeOn 提到過,知道它是運(yùn)行 Runnable 我們就直接看 Runnable 里面的 run() 了
    w.schedule(task, delay, unit);
    return task;
}

我們看一下 DisposeTask 的 run()

public void run() {
    runner = Thread.currentThread();
    try {
        decoratedRun.run();
    } finally {
        dispose();
        runner = null;
    }
}

調(diào)來調(diào)去我們又回到了 SubscribeTask 的 run()

public void run() {
    source.subscribe(parent);
}

這個(gè)地方的運(yùn)行線程已經(jīng)被切換了,他又開始往上一層層的去訂閱,所以 create(new ObservableOnSubscribe<String>(){})這個(gè)匿名實(shí)現(xiàn)接口運(yùn)行 subscribe 的線程運(yùn)行環(huán)境都被改變了,再去調(diào)用 onNext() 等方法線程環(huán)境也是被改變的

為什么 subscribeOn() 只有第一次切換有效

寫到這里我們這個(gè)問題也就能回答了
因?yàn)?RxJava 最終能影響 ObservableOnSubscribe 這個(gè)匿名實(shí)現(xiàn)接口的運(yùn)行環(huán)境的只能是最后一次運(yùn)行的 subscribeOn() ,又因?yàn)?RxJava 訂閱的時(shí)候是從下往上訂閱,所以從上往下第一個(gè) subscribeOn() 就是最后運(yùn)行的,這就造成了寫多個(gè) subscribeOn() 并沒有什么亂用的現(xiàn)象。


分析一下 RxAndroid

RxAndroid 源碼

其實(shí) RxAndroid 里面并沒有什么復(fù)雜的代碼,他其實(shí)只是提供一個(gè)能切換到 Android 主線程線程調(diào)度器。

其實(shí)它的原理和 RxJava 自帶的那些線程調(diào)度器一樣,如果你想了解 RxJava 的 IO 線程池,什么的可以自己看一看,我這里分析 RxAndroid 主要有以下幾點(diǎn)原因

  1. 弄清楚 RxAndroid 這個(gè)庫的具體作用
  2. 弄清楚他是怎么就能把線程切換到主線程(他是怎么提供的主線程環(huán)境)
  3. 弄清楚線程調(diào)度器的運(yùn)行原理
  4. 最重要的是它相對(duì)于 RxJava 自帶的那些調(diào)度器,他比較簡單容易分析

正文開始

首先我們找一下入口 AndroidSchedulers.mainThread() 這個(gè)地方應(yīng)該是就是入口了,我們看一下 AndroidSchedulers 這個(gè)類的源碼吧,總共也沒幾行

private static final class MainHolder {
    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
        new Callable<Scheduler>() {
            @Override public Scheduler call() throws Exception {
                return MainHolder.DEFAULT;
            }
        }
    );

    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

這個(gè)應(yīng)該不用我多說大家都能看明白,看到這里我們基本上明白了 RxAndroid 就是通過 Handler 來拿到主線程的

我們拿 subscribeOn() 中的一些流程來說

public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);
    return task;
}

首先我們看到調(diào)用了 createWorker() 這是個(gè)抽象方法我們找到具體實(shí)現(xiàn)類 HandlerScheduler

public Worker createWorker() {
    return new HandlerWorker(handler);
}

單純的創(chuàng)建一個(gè) Worker 并把主線程的 Handler 傳進(jìn)去,然后調(diào)用 Worker 的 schedule() 方法

public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
    /**忽略一些代碼**/
    run = RxJavaPlugins.onSchedule(run);

    ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

    Message message = Message.obtain(handler, scheduled);
    message.obj = this; // Used as token for batch disposal of this worker's runnables.

    handler.sendMessageDelayed(message, unit.toMillis(delay));

    if (disposed) {
          handler.removeCallbacks(scheduled);
          return Disposables.disposed();
    }
    return scheduled;
}

到這里看明白 RxJava 如何通過 RxAndroid 來切換到主線程運(yùn)行,其實(shí) RxAndroid 的核心就是 Handler


總結(jié)

本篇參考 RxJava 2.1.12 與 RxAndroid:2.0.2 源碼

  1. observeOn() 只是在收到 onNext() 等消息的時(shí)候改變了從下一個(gè)開始的操作符的線程運(yùn)行環(huán)境。
  2. subscribeOn() 線程切換是在 subscribe() 訂閱的時(shí)候切換,他會(huì)切換他下面訂閱的操作符的運(yùn)行環(huán)境,因?yàn)橛嗛喌倪^程是自下而上的,所以第一個(gè)出現(xiàn)的 subscribeOn() 操作符反而是最后一次運(yùn)行的。

observeOn()、subscribeOn() 沒有任何先后順序的問題。

不得不說 Handler 在安卓中的地位真的是很牛逼
見解不到的地方歡迎大家指出

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容