RxJava 觀察綁定和事件發(fā)送流程及其中的線程切換分析

本文的所有分析都是基于 RxJava2 進(jìn)行的。以下的 RxJava 指 RxJava2
閱讀本文你將會知道:

  • RxJava 的觀察綁定和事件發(fā)送過程
  • RxJava 觀察綁定和事件發(fā)送過程中的線程切換

從 RxJava1.0 到 RxJava2.0,在項(xiàng)目開發(fā)中已經(jīng)使用了很長時(shí)間這個(gè)庫了。鏈?zhǔn)秸{(diào)用,絲滑的線程切換很香,但是如果沒弄清楚其中的奧妙很容易掉進(jìn)線程調(diào)度的坑里。這篇文章我們就來對 RxJava 的訂閱過程、時(shí)間發(fā)送過程、線程調(diào)度進(jìn)行分析

訂閱和事件流

先說結(jié)論

  • 按著代碼書寫順序,事件自上向下發(fā)送
  • 訂閱從 subscribe() 開始自下向上訂閱,這也是整個(gè)事件流的起點(diǎn),當(dāng)訂閱開始整個(gè)操作才會生效執(zhí)行
  • 訂閱完成后才會發(fā)送事件

圖解

為了更便于理解訂閱的流轉(zhuǎn)方向,我將Observable調(diào)用 subscribe() 訂閱描述為了 Observer beSubscribed()

訂閱及數(shù)據(jù)發(fā)送

源碼分析

Observabe 創(chuàng)建過程

此過程對應(yīng)圖中黑色箭頭部分,以操作符中的map()操作為例:

   @CheckReturnValue
   @SchedulerSupport(SchedulerSupport.NONE)
   public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
       ObjectHelper.requireNonNull(mapper, "mapper is null");
       return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
   }

調(diào)用map操作符時(shí),RxJavaPliguns 會注冊一個(gè)新的 ObservableMap 對象,查看其它操作符會發(fā)現(xiàn)都有對應(yīng)的 Observable 對象產(chǎn)生。同時(shí),上游的 Observabe會作為 source 參數(shù)傳入賦值給這個(gè)新的 Observablesource屬性。層層向下,可以對這個(gè)新生成的 Observable又可以繼續(xù)使用操作符。

訂閱過程:

當(dāng)調(diào)用最后一個(gè) Observablesubscribe() 方法時(shí),即開始訂閱過程。此過程對應(yīng)圖中紅色箭頭部分

   @SchedulerSupport(SchedulerSupport.NONE)
   @Override
   public final void subscribe(Observer<? super T> observer) {
       ObjectHelper.requireNonNull(observer, "observer is null");
       try {
           observer = RxJavaPlugins.onSubscribe(this, observer);

           ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

           subscribeActual(observer);
       } catch (NullPointerException e) { // NOPMD
           throw e;
       } catch (Throwable e) {
           Exceptions.throwIfFatal(e);
           // can't call onError because no way to know if a Disposable has been set or not
           // can't call onSubscribe because the call might have set a Subscription already
           RxJavaPlugins.onError(e);

           NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
           npe.initCause(e);
           throw npe;
       }
   }

在調(diào)用subscribe(Observer) 時(shí)實(shí)際上會去調(diào)用各個(gè) Observable實(shí)現(xiàn)子類中的 subscribeActual() 方法:

   @Override
   public void subscribeActual(Observer<? super U> t) {
       source.subscribe(new MapObserver<T, U>(t, function));
   }

而在這個(gè)subscribeActual() 方法也很簡單,調(diào)用了 source 去訂閱一個(gè)新生成的 Observer 對象,同時(shí)這個(gè)新的MapObserver會將調(diào)用subscribe()時(shí)傳入的 observer,賦值給downstream屬性。這樣每一級訂閱都會將上級的 Observable、本級生成的 Observer、訂閱下級傳入的Observer聯(lián)系起來,直到達(dá)到 Observable 最初創(chuàng)建的地方整個(gè)訂閱過程結(jié)束。

事件發(fā)送過程:

此過程對應(yīng)圖中綠色箭頭部分Observable 事件起點(diǎn)創(chuàng)建有很多中操作符,他們都會創(chuàng)建出最初發(fā)送的事件/數(shù)據(jù),以 ObservableCreate為例:

   @Override
   protected void subscribeActual(Observer<? super T> observer) {
       CreateEmitter<T> parent = new CreateEmitter<T>(observer);
       observer.onSubscribe(parent);

       try {
           source.subscribe(parent);
       } catch (Throwable ex) {
           Exceptions.throwIfFatal(ex);
           parent.onError(ex);
       }
   }

訂閱時(shí)會調(diào)用source.subscrebe(parent),而這個(gè)source 又是從哪兒來的呢?

   public ObservableCreate(ObservableOnSubscribe<T> source) {
       this.source = source;
   }
   Observable.create(object : ObservableOnSubscribe<String> {
          override fun subscribe(emitter: ObservableEmitter<String>) {
               emitter.onNext("data")
          }

   })

從代碼中我們可以看出,這個(gè) source 即為我們創(chuàng)建時(shí)傳入的 ObservableOnSubscribe,因此emitter.onNext("data")即是事件發(fā)送的起點(diǎn)。我們再繼續(xù)看emitteronNext() 做了什么:

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

源碼中現(xiàn)實(shí)調(diào)用了observer.onNext(),而這個(gè)observer 則是前面訂閱過程中 source.subscribe(new MapObserver<T, U>(t, function)) 傳入的那個(gè) observer,從而將事件發(fā)送到了下一級,下一級的 Observer 同樣在 onNext() 將事件發(fā)送到更下一級,一直到最終我們 subscribe()時(shí)傳入的那個(gè)Observer 實(shí)例完畢。

線程調(diào)度

事件訂閱發(fā)送流程通過上面的文章基本已經(jīng)能夠摸清了,我們接下來關(guān)注另一個(gè)重點(diǎn) 線程調(diào)度問題。

調(diào)度方式

RxJava 中線程變換通過 subscribeOn()observeOn()兩個(gè)操作來進(jìn)行。其中 subscribeOn()改變的是訂閱線程的執(zhí)行線程,即事件發(fā)生的線程。observeOn()改變的是事件結(jié)果觀察者回調(diào)所在線程,即 onNext()方法所在的線程。

舉個(gè)栗子

使用 RxJava + Retrofit 進(jìn)行網(wǎng)絡(luò)請求時(shí),用 RxJava 管理網(wǎng)絡(luò)請求過程的線程切換。subscribeOn()指定的是網(wǎng)絡(luò)請求的線程,observeOn()指定的是網(wǎng)絡(luò)請求后事件流的執(zhí)行線程。

源碼分析

前面說過,每次操作符的使用,RxJava 都會生成一個(gè)對應(yīng)的新的 Observable對象。observeOn()subscribeOn()也不例外。線程調(diào)度的核心邏輯都在 ObservableSubscribeOnObservableObserveOn兩個(gè)類中

subscribeOn()過程

  @CheckReturnValue
  @SchedulerSupport(SchedulerSupport.CUSTOM)
  public final Observable<T> subscribeOn(Scheduler scheduler) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
       return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
   }

調(diào)用 subscribeOn() 時(shí)會產(chǎn)生一個(gè)新的ObservableSubscribeOn并把當(dāng)前這個(gè)Observable 和傳入的 Scheduler作為參數(shù)傳入。前面分析過當(dāng)最終調(diào)用 subscribe()時(shí)會引起整個(gè)觀察鏈的 Observable 自下而上調(diào)用 subscribe(),而這個(gè)subscribe()方法中實(shí)際為調(diào)用抽象類 Observable的各個(gè)實(shí)現(xiàn)子類的 subscribeActual()方法 。

   @Override
   public void subscribeActual(final Observer<? super T> observer) {
       final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

       observer.onSubscribe(parent);

       parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
   }

主要看這句 scheduler.scheduleDirect(new SubscribeTask(parent));,SubscribeTask 前面內(nèi)容已經(jīng)分析過,就是調(diào)用上級 Observable 來訂閱生成的這個(gè) SubscribeOnObserver。

   @NonNull
   public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
       final Worker w = createWorker();

       final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

       DisposeTask task = new DisposeTask(decoratedRun, w);

       w.schedule(task, delay, unit);

       return task;
   }

scheduleDirect 方法,會使用傳入的 scheduler 在指定的線程創(chuàng)建一個(gè) Worker 對象來執(zhí)行SubscribeTask,從而達(dá)到了切換訂閱線程的目的。所以多個(gè)subscribeOn()疊加時(shí),最終線程還是會回到最后執(zhí)行的(代碼第一次出現(xiàn)的)subscribeOn() 指定的線程。

observeOn()過程

調(diào)用 observeOn(Scheduler) 方法,會調(diào)用內(nèi)部的同名方法生成一個(gè)新的 ObservableObserveOn對象,并把當(dāng)前這個(gè)Observable 和傳入的 Scheduler作為參數(shù)傳入。訂閱過程與ObservableSubscribeOn不一樣,會直接在當(dāng)前線程調(diào)用上級Observable訂閱自己,,我們主要看ObservableObserveOnObserveOnObserver是如何調(diào)度結(jié)果數(shù)據(jù)發(fā)送的線程的。

       @Override
       public void onNext(T t) {
           if (done) {
               return;
           }

           if (sourceMode != QueueDisposable.ASYNC) {
               queue.offer(t);
           }
           schedule();
       }

       void schedule() {
           if (getAndIncrement() == 0) {
               worker.schedule(this);
           }
       }

從源碼中可以發(fā)現(xiàn),最終會使用 worker 去向下游發(fā)送事件。這個(gè) worker就是我們observeOn() 方法中指定的線程創(chuàng)建的 worker。從而達(dá)到切換線程的目的,由于事件又是自上而下的,所以每次切換都能在下游事件中感受到線程的變化。

日志分析

subscribeOn()observeOn()放一起來說不太容易說明白其中的線程變換,我先看看單獨(dú)使用其中的一個(gè)操作符的時(shí)候,導(dǎo)致的線程變化。

僅調(diào)用 subscribeOn() 調(diào)度線程

Observable.just("Data")
                .map {
                    Log.d("Map 1", Thread.currentThread().name)
                    return@map it
                }
                .subscribeOn(Schedulers.io()) 
                .doOnSubscribe {
                    Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
                }
                .map {
                    Log.d("Map 2 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribeOn(Schedulers.newThread())
                .doOnSubscribe {
                    Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
                }
                .map {
                    Log.d("Map 3 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribe(object : Observer<String> {
                    override fun onComplete() {

                    }

                    override fun onSubscribe(d: Disposable) {
                        Log.d("onSubscribe", Thread.currentThread().name)
                    }

                    override fun onNext(t: String) {
                        Log.d("onNext", Thread.currentThread().name)
                    }

                    override fun onError(e: Throwable) {
                        e.printStackTrace()
                    }

                })

執(zhí)行結(jié)果:

image.png

從日志可以看出:

  • 1、訂閱是自下向上的(onSubscribe -->doOnSubscribe 2 -->doOnsubscribe 1)
  • 2、自下向上看,每次調(diào)用 subscribeOn 訂閱線程將會發(fā)生改變,直到下次調(diào)用 subscribeOn
  • 3、事件是自上向下傳遞的(Map 1 --> Map 2 --> Map 3 --> onNext),且所在線程為最后一次線程切換后所在的線程 RxCachedThreadScheduler-1

僅調(diào)用 subscribeOn() 調(diào)度線程

        Observable.just("Data")
                .map {
                    Log.d("Map 1", Thread.currentThread().name)
                    return@map it
                }
//                .doOnSubscribe {
//                    Log.d("doOnSubscribe 1 ", Thread.currentThread().name)
//                }
//                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .map {
                    Log.d("Map 2 ", Thread.currentThread().name)
                    return@map it
                }
//                .doOnSubscribe {
//                    Log.d("doOnSubscribe 2 ", Thread.currentThread().name)
//                }
//                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.newThread())
                .map {
                    Log.d("Map 3 ", Thread.currentThread().name)
                    return@map it
                }
                .subscribe(object : Observer<String> {
                    override fun onComplete() {

                    }

                    override fun onSubscribe(d: Disposable) {
                        Log.d("onSubscribe", Thread.currentThread().name)
                    }

                    override fun onNext(t: String) {
                        Log.d("onNext", Thread.currentThread().name)
                    }

                    override fun onError(e: Throwable) {
                        e.printStackTrace()
                    }

                })

執(zhí)行結(jié)果:

日志打印

從日志可以看出:

  • 1、事件發(fā)送是正常的自上向下(Map 1 --> Map 2 --> Map 3 --> onNex)
  • 2、自上向下,每次調(diào)用 observeOn 觀察結(jié)果回調(diào)線程都將切換一次(main -->RxNewThreadScheduler-1 -->RxNewThreadScheduler-2)

混合使用調(diào)度線程

我們把上述代碼中注釋部分都打開,得到的日志如下:

日志打印

通過上面的三次日志打印我們可以看出:

訂閱鏈的日志自下而上打印完畢后,再自上而下打印觀察結(jié)果。subscribeOn 會切換線程,并不是像有的文章所說只有第一次指定線程(即自下而上的最后一次)有效。第一次有效只是我們的錯(cuò)覺,因?yàn)橛嗛喪亲韵露系模还芮懊娴木€程怎樣切換追蹤都會切換到 subscribeOn第一次指定線程(即自下而上的最后一次)。我們在回調(diào)結(jié)果中未進(jìn)行線程切換操作時(shí),只能感知到這一次線程切換 (Map1 與 doOnSubscribe 1 所在線程一致)。observeOn的每次指定線程都會讓事件流切換到對應(yīng)的線程中去。完整的事件訂閱和發(fā)送流程如下圖所示,從我們調(diào)用 subscribe()將觀察者和觀察對象關(guān)聯(lián)起來開始,subscribe() 中傳入的 Observer 的 onNextonError結(jié)束,形成了一個(gè)逆時(shí)針的 n 形的鏈條。右邊部分的觀察鏈中,每次 subscribeOn 都會切換觀察線程。左邊部分的事件發(fā)送鏈,會從觀察鏈的最后一次指定的線程開始發(fā)送事件,每次調(diào)用 observeOn都會指定新的事件發(fā)送線程。

圖解

參照上面的源碼和日志分析,再結(jié)合本圖相信大家會對 RxJava 的現(xiàn)場調(diào)度有一個(gè)更立體的認(rèn)識

RxJava2 線程切換流程
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容