RxJava2.x 從源碼分析原理

RxJava 相信各位已經(jīng)使用了很久,大部分人在剛學(xué)習(xí) RxJava 感嘆切換線程的方便,調(diào)用邏輯清晰的同時(shí),并不知道其中的原理,主要是靠記住運(yùn)行的順序。
隨著我們?cè)O(shè)計(jì)出的 RxJava流 越來越復(fù)雜,一些復(fù)雜的問題并不能靠著記住的運(yùn)行順序就能解決。
下面,就通過最常用的操作符的源碼來看看所謂的是什么運(yùn)行的。

首先我們用Single舉例,設(shè)計(jì)一個(gè)最基本的 RxJava 流,只有一個(gè) Observable(ColdObservable)Obsever

Disposable disposable = Single.just("wtf")
                        .subscribe(it -> Log.i("subscribe", it));

上游發(fā)送一個(gè)"wtf" ,下游接受時(shí)將其打印出來。上游發(fā)送端使用 Single.just 作為創(chuàng)建方法,
看一下 just() 方法里做了什么。

    public static <T> Single<T> just(final T item) {
        ObjectHelper.requireNonNull(item, "value is null");
        return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
    }
    
    public static <T> Single<T> onAssembly(@NonNull Single<T> source) {
    Function<? super Single, ? extends Single> f = onSingleAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

其中 ObjectHelper.requireNonNull 只是空檢查。
RxJavaPlugins.onAssembly 方法,這個(gè)方法其實(shí)就是通過一個(gè)全局的變量 onSingleAssembly 來對(duì)方法進(jìn)行 Hook ,這一系列xxxAssembly全局變量默認(rèn)為空,所以實(shí)際上當(dāng)我們沒有設(shè)置的時(shí)候其實(shí) just 方法是直接返回了一個(gè) 新實(shí)例化的SingleJust對(duì)象。

再看看SingleJust內(nèi)部:

public final class SingleJust<T> extends Single<T> {

    final T value;
    public SingleJust(T value) {
        this.value = value;
    }

    @Override
    protected void subscribeActual(SingleObserver<? super T> observer) {
        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);
    }

}

實(shí)例化的時(shí)候只是將值保存了下來,沒有其它操作。
下一步調(diào)用subscribe()來啟動(dòng)這個(gè)流(ColdObservable),然后看看subscribe中做了什么:

    public final void subscribe(SingleObserver<? super T> subscriber) {
        ObjectHelper.requireNonNull(subscriber, "subscriber is null");
        subscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        ObjectHelper.requireNonNull(subscriber, "subscriber returned by the RxJavaPlugins hook is null");

        try {
             //核心邏輯
            subscribeActual(subscriber);
        } catch (NullPointerException ex) {
            throw ex;
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(ex);
            throw npe;
        }
    }

同樣 RxJavaPlugins.onSubscribe 默認(rèn)沒有作用,實(shí)際的核心邏輯是調(diào)用了subscribeActual(SingleObserver)。
對(duì)于我們上面設(shè)計(jì)的流,則是調(diào)用了 SingleJust 中的 subscribeActual(SingleObserver)

回顧上面 SingleJustsubscribeActual(SingleObserver) 的實(shí)現(xiàn):

        observer.onSubscribe(Disposables.disposed());
        observer.onSuccess(value);

得到兩個(gè)信息

  • 首先調(diào)用下游觀察者 SingleObserverOnSubscribe 方法并傳遞用于取消操作的 Disposable
  • 調(diào)用OnSuccess 方法并傳遞之前保存下來的 value

Map 操作符

現(xiàn)在我們加入一個(gè)常用且重要的Map操作到流中

Disposable disposable = Single.just("wtf")
                 .map(it-> 0)
                 .subscribe(it -> Log.i("subscribe", String.of(it)));

上面這個(gè)流包括了三種典型的操作 創(chuàng)建Creation 操作符Transformation和 訂閱Subscribe。

依然先檢查map() 方法,可以看到其中實(shí)例化了一個(gè)SingleMap

    public final <R> Single<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new SingleMap<T, R>(this, mapper));
    }

再看看 SingleMap

public final class SingleMap<T, R> extends Single<R> {
    final SingleSource<? extends T> source;
    final Function<? super T, ? extends R> mapper;

    public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super R> t) {
        source.subscribe(new MapSingleObserver<T, R>(t, mapper));
    }

    static final class MapSingleObserver<T, R> implements SingleObserver<T> {

        final SingleObserver<? super R> t;
        final Function<? super T, ? extends R> mapper;

        MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
            this.t = t;
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Disposable d) {
            t.onSubscribe(d);
        }

        @Override
        public void onSuccess(T value) {
            R v;
            try {
                v = ObjectHelper.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                onError(e);
                return;
            }

            t.onSuccess(v);
        }

        @Override
        public void onError(Throwable e) {
            t.onError(e);
        }
    }
}

類中信息稍微復(fù)雜一些:

  1. 首先我們關(guān)注在SingleMap實(shí)例化的時(shí)候也是只做了保存數(shù)據(jù)的操作,而沒有實(shí)際邏輯:將流的上游保存為 source 將數(shù)據(jù)轉(zhuǎn)換的方法保存為 mapper
  2. 第二步我們知道下游觀察者 SingleObserver 會(huì)調(diào)用核心邏輯 subscribeActual方法來啟動(dòng)流
  3. 在這里的subscribeActual方法中可以看到幾個(gè)重要的信息
    • MapSingleObserver是一個(gè)觀察者
    • MapSingleObserver 保存了下游的觀察者 SingleObserver 以及 mapper
    • 上游 sourceMapSingleObserver 訂閱

由此可以看出在SingleMap被下游觀察者訂閱了之后,實(shí)例化了一個(gè)新的觀察者MapSingleObserver并保存下游觀察者SingleObserver的信息,再去訂閱上游SingleJust。
這種模式創(chuàng)建了一個(gè)裝飾類,用來包裝原有的類,并在保持類方法簽名完整性的前提下,提供了額外的功能的設(shè)計(jì)模式稱為裝飾者模式。

總結(jié)上面的執(zhí)行順序:

  1. Rx流的最后一步調(diào)用 subscribe啟動(dòng)流(ColdObservable)
  2. 首先執(zhí)行SingleMap中的subscribeActual方法,其中包括生成新的MapSingleObserver并訂閱 SingleJust
  3. 執(zhí)行SingleJust中的subscribeActual:調(diào)用下游MapSingleObserveronSubscribe onSuccess方法
  4. MapSingleObserver中的onSubsribe onSuccess方法也很簡單,分別調(diào)用下游 ObserveronSubsribe``onSuccess(異常時(shí) onError)方法

observeOn 操作符

Rxjava首先被大家津津樂道之處是可以方便的切換線程,避免Callback Hell,現(xiàn)在來看看線程切換操作符。
我們加入線程切換操作符 observeOn

Disposable disposable = Single.just("wtf")
                 .map(it-> 0)
                 .observeOn(Schedulers.io())
                 .subscribe(it -> Log.i("subscribe", String.of(it)));

同樣的,在 observeOn方法中實(shí)例化了一個(gè)SingleObserveOn

    public final Single<T> observeOn(final Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new SingleObserveOn<T>(this, scheduler));
    }

繼續(xù)看SingleObserveOn類中信息

public final class SingleObserveOn<T> extends Single<T> {

    final SingleSource<T> source;
    final Scheduler scheduler;

    public SingleObserveOn(SingleSource<T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        source.subscribe(new ObserveOnSingleObserver<T>(s, scheduler));
    }

    static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {
        private static final long serialVersionUID = 3528003840217436037L;

        final SingleObserver<? super T> actual;
        final Scheduler scheduler;

        T value;
        Throwable error;

        ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
            this.actual = actual;
            this.scheduler = scheduler;
        }

        @Override
        public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                actual.onSubscribe(this);
            }
        }

        @Override
        public void onSuccess(T value) {
            this.value = value;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void onError(Throwable e) {
            this.error = e;
            Disposable d = scheduler.scheduleDirect(this);
            DisposableHelper.replace(this, d);
        }

        @Override
        public void run() {
            Throwable ex = error;
            if (ex != null) {
                actual.onError(ex);
            } else {
                actual.onSuccess(value);
            }
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }
}


類似的

  • 構(gòu)造函數(shù)中保存了上游和線程切換的信息
  • subscribeActual 實(shí)例化了一個(gè)新的觀察者ObserveOnSingleObserver

不同的

  • ObserveOnSingleObserver 還繼承了AtomicReference<Disposable>、實(shí)現(xiàn)了Disposable``Runnable接口
  • onSuccess``onError中都沒有直接調(diào)用下游的onSuccess onError方法,而是調(diào)用了Disposable d = scheduler.scheduleDirect(this);來執(zhí)行run方法中的邏輯,而run方法中的邏輯則是調(diào)用下游的onSuccess onError方法

查看schedulerDirect內(nèi)部信息

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

創(chuàng)建了一個(gè)對(duì)應(yīng)線程的Worker和一個(gè)可用于取消的DisposeTask并執(zhí)行,對(duì)于IoScheduler則是創(chuàng)建了EventLoopWorker,再看看EventLoopWorker中的信息。

    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

EventLoopWorker中則是維護(hù)了一套包含相應(yīng)的線程池、可取消的CompositeDisposable、以及用于運(yùn)行RunableThreadWorker??偟膩碚f就是一套可以在相應(yīng)線程運(yùn)行且可取消的類和邏輯。

  • 上面則解釋了為什么observeOn可以切換下游的線程(onSuccess onError)
  • 同樣解釋了為什么不會(huì)改變onSubsribe的調(diào)用線程,因?yàn)榭梢钥吹?code>onSubscribe方法中直接調(diào)用了下游的onSucsribe,并沒有受到線程切換的影響。

SubscribeOn

實(shí)際上,subsribeOn 是 RxJava2.x 中比較復(fù)雜也是相較于 RxJava1.x 改動(dòng)比較大的一個(gè)操作符,它甚至?xí)绊懥鞯膱?zhí)行順序。(可以參見唐雪茂寫的 Rxjava流的設(shè)計(jì) 中的1 2兩個(gè)流)

我們現(xiàn)在設(shè)計(jì)兩個(gè)Rx流

Disposable disposable = Single.just("wtf")
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
                 .subscribe(it -> Log.i("subscribe", 4);
Disposable disposable2 = Single.just("wtf")
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 0)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 1)
                 .subscribeOn(Schedulers.io())
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 2)
                 .doOnSubsribe(it-> Log.i("doOnSubsribe", 3)
                 .subscribe(it -> Log.i("subscribe", 4);

你可能已經(jīng)知道并記住了兩個(gè)流的打印的順序分別是 01234 23014,但是為什么doOnSubsribe方法和RxJava1中調(diào)用順序完全不一樣,為什么通過subscribeOn切換線程會(huì)影響執(zhí)行順序?

先找到 SingleSubscribeOn

public final class SingleSubscribeOn<T> extends Single<T> {
    final SingleSource<? extends T> source;
    final Scheduler scheduler;
    
    public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
        this.source = source;
        this.scheduler = scheduler;
    }

    @Override
    protected void subscribeActual(final SingleObserver<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s, source);
        //直接調(diào)用下游 onSubscribe
        s.onSubscribe(parent);
        //再執(zhí)行訂閱上游的方法
        Disposable f = scheduler.scheduleDirect(parent);
        parent.task.replace(f);
    }

    static final class SubscribeOnObserver<T>
    extends AtomicReference<Disposable>
    implements SingleObserver<T>, Disposable, Runnable {

        private static final long serialVersionUID = 7000911171163930287L;
        final SingleObserver<? super T> actual;
        final SequentialDisposable task;
        final SingleSource<? extends T> source;
        
        SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
            this.actual = actual;
            this.source = source;
            this.task = new SequentialDisposable();
        }

        @Override
        public void onSubscribe(Disposable d) {
              //沒有繼續(xù)調(diào)用下游的 onSubscribe 方法
            DisposableHelper.setOnce(this, d);
        }

        @Override
        public void onSuccess(T value) {
            actual.onSuccess(value);
        }

        @Override
        public void onError(Throwable e) {
            actual.onError(e);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
            task.dispose();
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }

        @Override
        public void run() {
            source.subscribe(this);
        }
    }

}

同樣的直接看subscribeActual方法及onSubscribe方法,發(fā)現(xiàn)事情并沒有那么簡單,和之前的操作符的邏輯區(qū)別很大:

  • SubscribeOnObserver同樣還繼承了AtomicReference<Disposable>,實(shí)現(xiàn)了Disposable``Runnable接口
  • 并沒有直接調(diào)用subscribe訂閱上游,而是執(zhí)行了其它操作符在 onSubscribe中訂閱下游的操作
  • 然后再結(jié)合Disposable f = scheduler.scheduleDirect(parent);run方法可以知道在新的線程中執(zhí)行了訂閱上游的操作 source.subscribe(this);
  • onSubsribe中并沒有再繼續(xù)調(diào)用下游的 onSubsribe

綜合起來可以知道,本來應(yīng)該在整個(gè)流從下至上訂閱完成后按照從上至下的順序執(zhí)行 onSubscribe的流,在使用subsribeOn操作符的后,在訂閱的時(shí)(執(zhí)行subscribeActual),就開始執(zhí)行下游的onSubscribe且在當(dāng)前線程!然后才在指定的io線程執(zhí)行之下而上的操作,這也是為什么subsribeOn影響的是上游的線程。

小結(jié):

我認(rèn)為實(shí)際上 Rx 使用了很多優(yōu)秀的設(shè)計(jì)將我們各種常用的操作進(jìn)行了封裝,讓我們自由組合使用,其本身并沒有用什么黑科技。例如切換線程本質(zhì)上則是幫我們啟用了一個(gè)新的線程并把接下來的代碼放進(jìn)去執(zhí)行。
當(dāng)然,其中還有很多更深入的內(nèi)容需要我們繼續(xù)發(fā)現(xiàn)和學(xué)習(xí)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容