拆輪子系列--RxJava理解(二)--subscribeOn

本系列文章如下:

上一篇文章主要介紹了RxJava中基本的調(diào)用流程以及常用的操作符--map。本文主要介紹RxJava中線程調(diào)度的核心操作符之一subscribeOn,本文源碼分析基于RxJava2。
本文的大綱如下:

  • 一個具體的例子
  • subscribeOn源碼分析
  • 總結(jié)

1 . 一個具體的例子

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onNext("2");
                e.onNext("3");
                e.onComplete();
            }
        }).map(new Function<String, Integer>() {
            @Override
            public Integer apply(String str) throws Exception {
               Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-string-" + s);
                return Integer.valueOf(s);
            }
        }).subscribeOn(Schedulers.io())
             .subscribe(new Consumer<Integer>() {
              @Override
              public void accept(Integer integer) throws Exception {
                       Log.e("TAG", "-sub-" + Thread.currentThread().getName() + "-integer-" + integer);
                    }
                });
    }

運行下上面的代碼,我們先看看結(jié)果會是怎么樣的:

E/TAG: -map-thread-RxCachedThreadScheduler-1-string-1
E/TAG: -sub-RxCachedThreadScheduler-1-integer-1
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-2
E/TAG: -sub-RxCachedThreadScheduler-1-integer-2
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-3
E/TAG: -sub-RxCachedThreadScheduler-1-integer-3

如果我們調(diào)用多次的subscribeOn()會是怎么樣子的呢?我們改下代碼看看:

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onNext("2");
                e.onNext("3");
                e.onComplete();
            }
        })
                .subscribeOn(AndroidSchedulers.mainThread())
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-string-" + s);
                        return Integer.valueOf(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        Log.e("TAG", "-sub-" + Thread.currentThread().getName() + "-integer-" + integer);
                    }
                });
    }

我們再來看看打印的信息:

E/TAG: -map-thread-main-string-1
E/TAG: -sub-main-integer-1
E/TAG: -map-thread-main-string-2
E/TAG: -sub-main-integer-2
E/TAG: -map-thread-main-string-3
E/TAG: -sub-main-integer-3

有沒有發(fā)現(xiàn),當(dāng)我們調(diào)用兩次的subscribeOn()這個操作符,只有最上面的那個起了作用,后面的subscribeOn()操作符無效。這個是為什么呢?接下來進(jìn)行源碼分析。

2. subscribeOn源碼分析

首先,我們來看看subscribeOn()這個操作符到底干了些什么事情,然后再來解釋為什么多次調(diào)用subscribeOn()這個操作符,只有第一個操作符有效。

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
   ObjectHelper.requireNonNull(scheduler, "scheduler is null");
   return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

我們首先看看onAssembly()這個里面到底干了些什么事情:

public static <T> Observable<T> onAssembly(Observable<T> source) {
        Function<Observable, Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

從源碼中我們可以看出,它并沒有干什么事情,就是返回了一個Observable的對象,接下來我們看看ObservableSubscribeOn()這部分的源碼:

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
         @Override
         public void run() {
              source.subscribe(parent);
         }
     }));
  }

在上面的代碼中,真正的執(zhí)行操作是在subscribeActual()方法中,該方法中首先執(zhí)行的操作就是通過代理的方式使用SubscribeOnObserverObserver進(jìn)行封裝,設(shè)置Disposable來將subscribe進(jìn)行線程切換到scheduler的線程中。
簡單點來說,subscribeOn這個操作符就是將上一層的ObservableSource(就是上一層的Observable)放到一個新的線程去發(fā)射元素。
ok,這里也就解釋了為什么調(diào)用多個subscribeOn()時,只有最上面的subscribeOn()起作用,那么根據(jù)上面的分析,當(dāng)我們在操作的時候加入多個map()操作符的時候,情況會是怎么樣子的?如下示例:

Observable.create(new ObservableOnSubscribe<String>() {

            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("1");
                e.onNext("2");
                e.onComplete();
            }
        })
                .map(new Function<String, Integer>() {
                    @Override
                    public Integer apply(String s) throws Exception {
                        Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-string-" + s);
                        return Integer.valueOf(s);
                    }
                })
                .subscribeOn(Schedulers.io())
                .map(new Function<Integer, Long>() {
                @Override
                public Long apply(Integer integer) throws Exception {
                        Log.e("TAG", "-map-thread-" + Thread.currentThread().getName() + "-Integer-" + integer);
                        return Long.valueOf(integer);
                    }
                })
                .subscribe(new Consumer<Long>() {
                @Override
                public void accept(Long longValue) throws Exception {
                        Log.e("TAG", "-sub-" + Thread.currentThread().getName() + "-longValue-" + longValue);
                    }
                });

結(jié)果如下:

E/TAG: -map-thread-RxCachedThreadScheduler-1-string-1
E/TAG: -map-thread-RxCachedThreadScheduler-1-Integer-1
E/TAG: -sub-RxCachedThreadScheduler-1-longValue-1
E/TAG: -map-thread-RxCachedThreadScheduler-1-string-2
E/TAG: -map-thread-RxCachedThreadScheduler-1-Integer-2
E/TAG: -sub-RxCachedThreadScheduler-1-longValue-2

從結(jié)果中我們發(fā)現(xiàn),不管執(zhí)行了多少次的map()操作,每個Observable只有第一個subscribeOn()方法起作用,而且多次的map()操作不會影響線程的變換。

3. 總結(jié)

首先,用一張圖來展示subscribeOn()這個操作符基本運行的流程:

subscribeOn基本流程.png

subscribeOn這個操作符主要的執(zhí)行邏輯現(xiàn)在總結(jié)如下:

    1. 代理Observable做的事情就是將Observable1發(fā)射到scheduler指定的線程中進(jìn)行subscribe(),并且調(diào)用下一層傳上來的observer1來接收事件;
    1. 當(dāng)我們多次調(diào)用subscribeOn()方法后,其實此時的事件并沒有發(fā)射出去,當(dāng)執(zhí)行到第一個subscribeOn()方法時,后面subscribeOn()切換線程就會被第一個subscribeOn()切換線程的方法截斷,因此,多次調(diào)用subscribeOn()對整個流程是不產(chǎn)生影響的,從而這里可以解釋前面的問題,當(dāng)使用多個subscribeOn()的時候,也只有第一個subscribeOn()起作用。

再舉個淺顯的栗子來說明下多次調(diào)用subscribeOn的情況:

new Thread(new Runnable() {
            @Override
            public void run() {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        new Thread(new Runnable() {
                            @Override
                            public void run() {
                                Log.e("TAG", "which thread?=" + Thread.currentThread().getName());
                            }
                        }, "線程3").start();
                    }
                }, "線程2").start();
            }
        }, "線程1").start();

上面代碼執(zhí)行的結(jié)果:

E/TAG: which thread?=線程3
    1. 當(dāng)我們使用subscribeOn()且使用多個map()操作符的時候,subscribeOn會將上一層Observable切換到一個指定的線程進(jìn)行事件的發(fā)射,而上一層Observable拿到的observer已經(jīng)是由下層observer一路封裝上來的,也就是該observer經(jīng)歷了所有的轉(zhuǎn)換。因此,不管執(zhí)行了多少次的map()操作,
      不管該map()操作是執(zhí)行在subscribeOn()之前還是之后,其最后的發(fā)射事件會在subscribeOn()指定的線程執(zhí)行。

RxJava中線程調(diào)度方法subscribeOn()方法這里就介紹完畢,下一篇繼續(xù)分析RxJava中另一個線程調(diào)度方法observableOn()。

如果文章中有什么疏漏或者錯誤的地方,還望各位指正,你們的監(jiān)督是我最大的動力,謝謝!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容