RxJava2源碼淺析(一)

前言

我們經(jīng)常看RxJava的文章,很多都是API性的介紹.今天我們就用一段來(lái)理解它吧,了解它的內(nèi)幕

本文編譯需要:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

public static void debug() {
   1.     Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
   2.        @Override
   3.         public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
   4.            emitter.onNext(1);
   5.        }
   6.     });
   7.     Consumer<Integer> consumer = new Consumer<Integer>() {
   8.         @Override
   9.        public void accept(Integer integer) throws Exception {
 10.               Log.d(TAG, "accept: " + integer);
 11.          }
 12.     };
 13.    observable.subscribe(consumer);
}

這篇文章的目的就是分析這段代碼,去挖掘RxJava到底做了什么

讓我們開(kāi)始吧

我們都知道RxJava最重要的東西就是Obervable(被觀察者)和Observer(觀察者), 或者針對(duì)事件來(lái)說(shuō)就是上游和下游, 上游發(fā)送事件下游處理事件. 為了便于分析,我把代碼標(biāo)了行號(hào),便于引用.

粗略的分析上面這段代碼就是一個(gè)Observable訂閱了一個(gè)Consumer

弄清Obervable的來(lái)源

來(lái)看Observable.create() 方法的原型:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

它的參數(shù)是ObservableOnSubscribe類型,它只有一個(gè)方法待實(shí)現(xiàn):

void subscribe(ObservableEmitter<T> e) throws Exception;

為什么需要ObservableEmitter形參呢.我們不用管,因?yàn)楸籲ew進(jìn)ObservableCreate構(gòu)造函數(shù)里去了,肯定是回調(diào)作用,這在我們平時(shí)寫(xiě)代碼時(shí)見(jiàn)多了.實(shí)際上我們點(diǎn)進(jìn)ObservableCreate里看到ObservableEmitter是一個(gè)內(nèi)部類, 那問(wèn)題是RxJavaPlugins.onAssembly()又是干嘛的,

public static <T> Observable<T> onAssembly(Observable<T> source) {
        Function<Observable, Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

可以看到把onObservableAssembly賦給f, onObservableAssembly是什么 , 其實(shí)就是一個(gè)屬性而已, 含有g(shù)etter和setter.而我們也沒(méi)看到調(diào)用set函數(shù),所以該函數(shù)就直接返回了source,所以我們得到最終的Observable就是ObservableCreate對(duì)象.為什么用這個(gè)代替Observabel呢,看下類聲明

public final class ObservableCreate<T> extends Observable<T> {
...
}

原來(lái)就是Observable的子類.
到了這里,終于清楚了Observable的來(lái)龍去脈.

弄清Observer的來(lái)源

我們看到這片代碼用的是Consumer,它跟Observer有什么關(guān)系呢. 看下它的聲明

public interface Consumer<T> {
    /**
     * Consume the given value.
     * @param t the value
     * @throws Exception on error
     */
    void accept(T t) throws Exception;
}

這不就是一個(gè)普普通通的接口嘛,實(shí)現(xiàn)accept的方法就ok了,是的,consumer就是這么簡(jiǎn)單

是時(shí)候讓Observable和Consumer發(fā)生關(guān)聯(lián)了

只剩下observable.subscribe(consumer); 這句了, 顧名思義,被觀察者訂閱了消費(fèi)者(觀察者),我們跟進(jìn)去看看代碼怎么寫(xiě)的

@SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

它調(diào)用了subscribe方法,用了我們傳進(jìn)去的consumer對(duì)象即onNext,而后面三個(gè)參數(shù)是系統(tǒng)提供默認(rèn)的實(shí)現(xiàn),這也說(shuō)明,肯定還能傳進(jìn)我們自己想要的參數(shù),先不管.繼續(xù)跟:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

        subscribe(ls);

        return ls;
    }

這片代碼先是進(jìn)行了一些非空判斷,然后把我們的onNext轉(zhuǎn)換成LambdaObserver對(duì)象了,LambdaObserver是什么呢:

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
...
}

它實(shí)現(xiàn)了Observer, 到了這里,終于明白了,原來(lái)我們傳進(jìn)來(lái)的Consumer被處理成LambdaObserver對(duì)象了,繼續(xù)跟進(jìn)subscribe(ls);

public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

記住,我們手里現(xiàn)在握著的Observable對(duì)象是什么,Observer是什么, 當(dāng)然是ObservableCreate和LambdaObserver的對(duì)象.

我們又看到了RxJavaPlugins的調(diào)用,跟我們上一步講的onAssembly是類似的,直接返回了observer.往下就是一個(gè)非空的判斷了.接著是subscribeActual方法的調(diào)用.
看下改方法的聲明:

protected abstract void subscribeActual(Observer<? super T> observer);

它是Observable類的一個(gè)抽象方法,問(wèn)題是誰(shuí)來(lái)實(shí)現(xiàn)它,就是ObservableCreate了.因?yàn)槭撬麑?shí)現(xiàn)了Observable.

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

這是ObservableCreate的代碼片段.看到它的構(gòu)造函數(shù)沒(méi),參數(shù)source不就是我們開(kāi)始傳進(jìn)來(lái)的嘛.
繼續(xù)看subscribeActual方法, 第一行用我們的LambdaObserver對(duì)象為參,構(gòu)造了CreateEmitter對(duì)象,繼續(xù)跟進(jìn). 因?yàn)閛bserver被LambdaObserver實(shí)現(xiàn)了,所以這里當(dāng)然跳進(jìn)它的onSubscribe方法

public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }
@Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                onError(ex);
            }
        }
    }

這里先是用一個(gè)幫助類判斷了一下,下一步的onSubscribe是什么呢,我們看到它的構(gòu)造函數(shù),就是我們之前講的系統(tǒng)給我們提供的默認(rèn)實(shí)現(xiàn),默認(rèn)實(shí)現(xiàn)是空實(shí)現(xiàn).所以這個(gè)方法我們認(rèn)為什么都沒(méi)做.但是,如果onSubscribe不是系統(tǒng)提供的呢,那么它的執(zhí)行時(shí)間點(diǎn)是比onNext早的.
我們繼續(xù)跟進(jìn)subscribeActual方法.現(xiàn)在要處理try塊了

source.subscribe(parent);

注意這個(gè)source是ObservableOnSubscribe對(duì)象.這個(gè)對(duì)象又是啥呢?這就不是我們一開(kāi)始new出來(lái)的嘛

      ...    new ObservableOnSubscribe<Integer>() {
   2.        @Override
   3.         public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
   4.            emitter.onNext(1);
   5.        }
   6.     }

參數(shù)剛好也是ObservableEmitter, 第4行的emitter參數(shù)就是

CreateEmitter<T> parent = new CreateEmitter<T>(observer);

傳進(jìn)去的. 緊跟著emitter.onNext(1);就是parent對(duì)象的事了

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
...

還記得吧,這里的observer就是LambdaObserver對(duì)象也就是一開(kāi)始的Consumer了.
isDisposed()是什么,表示是否取消被觀察者和觀察者的關(guān)聯(lián),我們一路跟進(jìn)來(lái),沒(méi)有關(guān)于它的操作,肯定返回true
然后就是observer.onNext(t);即回調(diào)了Consumer的onNext方法.

結(jié)束

到了這里一個(gè)簡(jiǎn)單的流程終于完成了,謝謝大家!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容