RxJava2源碼分析(二) ---- subscribeOn

subscribeOn

    Observable.create((ObservableOnSubscribe<Integer>) e -> {
        System.out.println("observable : " + Thread.currentThread());
        e.onNext(1);
    })
            .subscribeOn(Schedulers.single())
            .subscribe(integer -> {
                System.out.println(integer);
                System.out.println("observer:  " + Thread.currentThread());
            });

Rxjava默認是在當前線程生發(fā)送事件, subscribeOn可以切換Observable發(fā)送事件所在的線程;
如果沒有使用ObserveOn指定消費事件的線程, Observer將在Observable發(fā)送事件的的線程, 消費事件;

源碼分析目的:

  1. Schduler 作用
  2. subscribeOn 做了什么

1. Schduler

Schduler不好直接用代碼解釋, 先說結論, 后面再去具體代碼分析;

  1. 切換線程, 需要提供對應的Schduler;
  2. Schduler可以通過createWorker方法, 創(chuàng)建一個Worker類的實例;
  3. Worker有一個schedule方法, 提交runnable去運行; 切換線程, 就是把各個onNext的調用方法,封裝成一個runnable 提交到指定線程去運行;
  4. 通過Worker.schedule提交runnable后, 會返回一個disposable對象, 用于取消或控制Observale的發(fā)射任務;
  5. Schduler本身是個管理類, 一般內部會創(chuàng)建具體的線程池, 同時通過統(tǒng)一的start shutdown等方法管理著線程池
  6. Schduler同時也管理著由createWorker創(chuàng)建的Worker; Worker一般都是持有Schduler中的線程池, 提交的runnable也是提交到該線程池

2. subscribeOn

2.1 Observable.subscribeOn
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

1. 參數(shù)檢測
2. 創(chuàng)建`ObservableOnSubscribe`對象, 并將當前Observable和Schduler傳入;
3. RxJavaPlugins的hook; 這個前面說過, 用于hook, 默認傳入什么 就返回什么;
2.2 ObservableSubscribeOn
  1. ObservableSubscribeOn是Observable的子類, 內部包含一個Observableschduler, 用于對原Obverable擴展, 是一個裝飾模式;
  2. 上面說了, ObservableSubscribeOn是一個裝飾模式, 繼承于HasUpstreamObservableSource, 有一個source方法去獲取被裝飾的Observable對象;
  3. 上一篇說過, Observable.create方法創(chuàng)建的Observable, 實際是一個ObservableCreater對象, 現(xiàn)在ObservableScbscribeOn中包含的Observable即ObservableCreater;
  4. Observable的subscribe方法, 實際調用的是具體子類的subscribeActual方法;
2.3 ObservableSubscribeOn.subscribeActual

直接看 ObservableSubscribeOn.subscribeActual 的代碼;

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
  1. 創(chuàng)建 SubscribeOnObserver, 并傳入原始的Observer
  2. 調用Observer的onSubscribe方法
  3. 構建SubscribeTask, 并提交給Schduler去執(zhí)行
2.4 SubscribeOnObserver

SubscribeOnObserverObservableSubscribeOn的靜態(tài)內部類, 同時也是繼承于Observer, 內部也包含一個原始的Observer, 也是一個裝飾模式;

SubscribeOnObserver對被裝飾類沒有額外增加功能, 僅僅是一個封裝, 在onNext, onError等方法中, 直接是調用的actual.onNext, actual.onError;

2.5 SubscribeTask

SubscribeTask 是一個runnable對象, 是ObservableSubscribeOn的內部類; 前面Schduler中說過, 切換線程, 就是將消息發(fā)送,包裝成一個runnable, 提交給Worker去執(zhí)行;
這個SubscribeTask將原先的發(fā)送事件代碼 封裝成的runnable, 然后送去對應的線程池執(zhí)行;

直接看run方法

    public void run() {
        source.subscribe(parent);
    }

ObservableSubscribeOn是Observable的子類, 同時是裝飾模式, 內部持有一個Observable, source是被包裝的Observable, 在此處的代碼中, source即是ObservableCreater, parent是SubscribeOnObserver, source.subscribe即和第一篇中的邏輯一樣了;

2.6 scheduler.scheduleDirect
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}

@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();

    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

    DisposeTask task = new DisposeTask(decoratedRun, w);

    w.schedule(task, delay, unit);

    return task;
}
  1. 通過createWorker創(chuàng)建相應的Worker;
  2. hook處理相應的runnable, 默認沒處理;
  3. 創(chuàng)建DisposeTask, 將需要運行的runnable對象, 封裝成disposable對象, 用于執(zhí)行取消操作;
  4. 將封裝后的runnable提交給worker去運行;

此處的scheduler由Schedulers.single()生成, 實際是一個SingleScheduler;

2.6.1 Worker.schedule()

直接看 SingleScheduler的代碼

####### Schedulers.createWorker 創(chuàng)建Worker; 獲取公共的線程池, 創(chuàng)建Worker

    public Worker createWorker() {
        return new ScheduledWorker(executor.get());
    }

####### Worker.scheduler

    public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        if (disposed) {
            return EmptyDisposable.INSTANCE;
        }

        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);
        tasks.add(sr);

        try {
            Future<?> f;
            if (delay <= 0L) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delay, unit);
            }

            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            dispose();
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }

        return sr;
    }
  1. 封裝傳入的runnable對象, 將其封裝成ScheduledRunnable對象
  2. 提交給線程池運行, ScheduledRunnable本身是一個Callable對象, 可以用于取消執(zhí)行

上述提交給線程池運行的流程, 最終封裝的運行的run方法, 其實還是最先封裝的SubscribeTask中的source.subscribe(parent);這一句代碼;
SubscribeTask本身對應的runnable被一次次傳遞封裝, 最后給線程池運行;
source.subscribe(parent);中, 上面說到是一個裝飾模式, 運行的還是Observable.subscribeActual方法, 最后的運行邏輯和上一篇相同;
最后會調到ObservableSubscribeOn.onNext方法, 內部沒做處理, 裝飾模式,調用上一級的onNext方法

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容