RxJava 源碼淺析

本文只分析 RxJava 的基本原理與流程,不深入探討具體操作符的實(shí)現(xiàn)細(xì)節(jié)。

背景

為什么使用 RxJava?

解決異步回調(diào)多重嵌套

比如類(lèi)似的代碼:

new Thread(new Runnable() {
    @Override
    public void run() {
        //do something
        handler.post(new Runnable() {
            @Override
            public void run() {
                //getResult do something
            }
        });
    }
}).start();

RxJava 的實(shí)現(xiàn)

Observable.create((ObservableOnSubscribe<String>) e -> {
    //do something
})
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(s -> {
        //getResult do something
    });

鏈?zhǔn)秸{(diào)用,解決代碼多層嵌套問(wèn)題,在邏輯越發(fā)復(fù)雜時(shí)優(yōu)勢(shì)也將越發(fā)明顯。

解決回調(diào)地獄

當(dāng)有倆個(gè)及以上要求順序訪問(wèn)的網(wǎng)絡(luò)請(qǐng)求,往往會(huì)出現(xiàn)下面的代碼:

Net.mockNet(TOKEN_URL, new Net.Callback() {
    @Override
    public void onResponse(String result) {
        // getResult do something
        Net.mockNet(CART_URL, new Net.Callback() {
            @Override
            public void onResponse(String result) {
                // getResult do something
            }
        });
    }
});

這種不優(yōu)雅的實(shí)現(xiàn),可以用 RxJava 解決:

Observable<String> o1 = Observable.create((ObservableOnSubscribe<String>) e -> e.onNext(Net.mockNet(TOKEN_URL, 2000)));

o1.concatMap((Function<String, ObservableSource<?>>) s -> {
    return Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
            //getResult do something
            e.onNext(Net.mockNet(CART_URL, 2000));
        }
    });
})
        .cast(String.class)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
        //getResult do something
        });

看似并不簡(jiǎn)潔,但是如果我們把創(chuàng)建 Observable 對(duì)象的過(guò)程(o1、o2)封裝為方法(暫不考慮封裝的實(shí)現(xiàn)),代碼就會(huì)變?yōu)槿缦滦Ч?/p>

askToken().concatMap(s -> askCart((String) s)) // getResult do something
        //.concatMap(s -> askAnother((String) s)) // 你還可以接著續(xù)接
        .cast(String.class)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s ->
                // getResult do something
        );

實(shí)際上使用 Retrofit 框架后,代碼就會(huì)變得像上述一樣簡(jiǎn)潔,而且就邏輯而言也更加清晰易懂。

其它

配合其它框架實(shí)現(xiàn)更多優(yōu)勢(shì)功能。

使用

RxJava 的使用教程網(wǎng)上有很多,這里不作具體說(shuō)明。

源碼分析

基本原理分析 - 1

Rxjava 2.0 和 1.0 的核心原理區(qū)別不大,都是基于觀察者模式。

關(guān)于觀察者模式的核心原理,可以參考下面的演示代碼(初級(jí) demo,僅用作啟發(fā)):


/**
 * ?? 觀察者將實(shí)例交給被觀察者,并且當(dāng)被觀察者數(shù)據(jù)變化時(shí),被觀察者'主動(dòng)'去調(diào)用觀察者實(shí)例的方法。
 */

//被觀察者
public class MyObservable {

    private MyObserver myObserver;

    public void setObserver(MyObserver observer) {
        this.myObserver = observer;
    }

    public void deleteObserver() {
        this.myObserver = null;
    }

    protected void notifyData(Object obj) {
        if (myObserver != null) {
            myObserver.updateData(obj);
        }
    }
}


//觀察者
public interface MyObserver {

    void updateData(Object obj);
}

首先我們從 RxJava 最基礎(chǔ)的實(shí)現(xiàn)開(kāi)始,研究其原理:

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // todo something
        e.onNext("test");
    }
})
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(String value) {
                //回調(diào)
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

Observable 是被觀察者,Observer 是觀察者,根據(jù)觀察者模式原理,消息是從被觀察者發(fā)送給觀察者的,所以我們從 Observable 的代碼開(kāi)始分析,先看 create 方法:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

閱讀源碼注意不能被細(xì)節(jié)牽絆,要充分利用命名的意義。這里我們可以大致推斷出來(lái),我們得到的 Observable 就是 new ObservableCreate<T>(source),閱讀 RxJavaPlugins.onAssembly 源碼也能印證這一點(diǎn),這里就不深入討論,直接看 ObservableCreate 源碼。

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
        ...
    }
    ...

這里截取了類(lèi)的部分代碼,先分析它的關(guān)鍵方法 subscribeActual()

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActual() 最終調(diào)用了 source.subscribe(parent);,從上下文可知 source 即我們重寫(xiě)并傳入的 ObservableOnSubscribe 的參數(shù),所以 subscribeActual() 最終就是執(zhí)行我們 Observable.create 實(shí)現(xiàn)的 subscribe() 方法。

而 parent,就是在 Observable.create 中調(diào)用 emitter.onNext()ObservableEmitter 的具體實(shí)現(xiàn)。它的代碼也在上述源碼中:

@Override
public void onNext(T t) {
    if (t == null) {
        onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
        return;
    }
    if (!isDisposed()) {
        observer.onNext(t);
    }
}

也就是說(shuō) Emitter 作為發(fā)射器,最終還是將 onNext 還是交給了觀察者 observer.onNext()。

至此下游鏈路通了,代碼的執(zhí)行順序?yàn)?ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() → CreateEmitter.onNext() → Observer.onNext()。

那么是什么時(shí)候調(diào)用的 subscribeActual() 的呢?

我們知道被觀察者通知觀察者需要持有觀察者的實(shí)例,所以在這個(gè)問(wèn)題之前,我們先分析觀察者 observer 的實(shí)例是多會(huì)交給被觀察者 Observable 的。

在最初的示例代碼中,Observable 在 create 之后,緊接著調(diào)用 subscribe(new Observer()) 將觀察者傳給被觀察者:

obervable.subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String value) {
                        //回調(diào)
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

閱讀 Observable.subscribe 源碼:

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer); //??
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

就是說(shuō) Observable.subscribe() 直接調(diào)用了 Observable.subscribeActual()

所以對(duì)于這個(gè)簡(jiǎn)單模型,代碼執(zhí)行的完整順序?yàn)椋?/p>

基礎(chǔ)模型代碼調(diào)用順序

基本原理分析 - 2

上面理清了 RxJava 最基礎(chǔ)代碼的實(shí)現(xiàn)原理 -- 觀察者模式。下面稍加擴(kuò)展,通過(guò)分析 subscribeOnconcatMap 等操作符的基本實(shí)現(xiàn),研究整個(gè) Rx 鏈路的串行原理。這里不會(huì)分析 subscribeOn 如何跨線程、concatMap 如何分發(fā),有興趣可以自行研究。

subscribeOn 是指定被觀察者所在線程,看下源碼:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

這里,最終返回了 new ObservableSubscribeOn<T>(this, scheduler);

先來(lái)看下 ObservableSubscribeOn 以及它的參數(shù)究竟是誰(shuí)。

1.類(lèi) ObservableSubscribeOn

【ObservableSubscribeOn】 extends 【AbstractObservableWithUpstream】 extends 【Observable】 implements 【ObservableSource】

2.參數(shù) this

ObservableSource

3.參數(shù) scheduler

指定線程類(lèi)型,不具體分析。

subscribeOn() 的返回值 ObservableSubscribeOn 對(duì)象也是 Observable 類(lèi)型,且參數(shù)是 ObservableSource 類(lèi)型,而 Observable 實(shí)現(xiàn)了 ObservableSource 接口。

這種實(shí)現(xiàn)是不是很熟悉,類(lèi)似于代理模式,通過(guò)嵌套同一接口,實(shí)現(xiàn)功能的鏈?zhǔn)蒋B加。也就是說(shuō) subscribeOn() 方法返回了上層 Observable 的代理對(duì)象,我們看下 ObservableSubscribeOn 的源碼:

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //2

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent); //1.
            }
        }));
    }

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> actual;

        final AtomicReference<Disposable> s;

        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }

        ...
    }
}

注釋 1 處,在線程調(diào)度之后,ObservableSubscribeOn 最終調(diào)用了上層的 ObservableSource(即原 Observable)。

同樣的,注釋 2 處,Observer 也使用了類(lèi)似的嵌套。

所以對(duì)于基礎(chǔ)模型上加 subscribeOn(),代碼執(zhí)行順序?yàn)椋?/p>

ObservableSubscribeOn.subscribe() → ObservableSubscribeOn.subscribeActual() → ObservableCreate.subscribe() → ObservableCreate.subscribeActual() → ObservableOnSubscribe.subscribe() 你的被觀察者實(shí)現(xiàn) → CreateEmitter.onNext() → SubscribeOnObserver.onNext() → Observer.onNext() 你的觀察者實(shí)現(xiàn)。

從上面分析可以推測(cè),正是這種層層代理實(shí)現(xiàn)了 Rxjava 的鏈?zhǔn)秸{(diào)用,實(shí)際代碼調(diào)用順序可以對(duì)比參考如下模型,在把握宏觀流程的情況下,有助于閱讀代碼細(xì)節(jié),如具體操作符的實(shí)現(xiàn)等。

Rx 鏈?zhǔn)秸{(diào)用模型

簡(jiǎn)單看下 concatMap 的源碼:

public final class ObservableConcatMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends ObservableSource<? extends U>> mapper;
    final int bufferSize;

    final ErrorMode delayErrors;

    public ObservableConcatMap(ObservableSource<T> source, Function<? super T, ? extends ObservableSource<? extends U>> mapper,
            int bufferSize, ErrorMode delayErrors) {
        super(source);
        this.mapper = mapper;
        this.delayErrors = delayErrors;
        this.bufferSize = Math.max(8, bufferSize);
    }
    @Override
    public void subscribeActual(Observer<? super U> s) {

        if (ObservableScalarXMap.tryScalarXMapSubscribe(source, s, mapper)) {
            return;
        }

        if (delayErrors == ErrorMode.IMMEDIATE) {
            SerializedObserver<U> serial = new SerializedObserver<U>(s);
            source.subscribe(new SourceObserver<T, U>(serial, mapper, bufferSize));
        } else {
            source.subscribe(new ConcatMapDelayErrorObserver<T, U>(s, mapper, bufferSize, delayErrors == ErrorMode.END));
        }
    }
    ...

可以看出大同小異遵循上述模式。其余細(xì)節(jié)有興趣可以自行研究。

總結(jié)

閱讀源碼旨在學(xué)習(xí)框架結(jié)構(gòu),從中獲得啟發(fā),細(xì)節(jié)實(shí)現(xiàn)并非研究重點(diǎn)。

源碼閱讀遵循從宏觀到微觀、從簡(jiǎn)單到復(fù)雜逐步拆解分析的原則,千萬(wàn)不要被細(xì)節(jié)所牽絆。

后面接著分析 Retrofit。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容