從源碼分析RxJava訂閱過程

wdroid.jpg

都知道觀察模式吧?

在開始之前讓我們簡單了解一下觀察模式,就是某對象A的變化引起其他多個(gè)對象B變化,但是前提是你需要去訂閱我,打個(gè)比方:就是我的狀態(tài)發(fā)生了改變,那我怎么通知你呢?所以我需要知道的如何去通知其他對象說我這里已經(jīng)改變了,你看看那需不需要做出改變。就比如微信的訂閱號,如果你不訂閱,那該訂閱號在發(fā)布內(nèi)容也不會(huì)通知,這里的訂閱號就是被觀察者,而用戶就是觀察者。那怎么說讓這兩者關(guān)聯(lián)來呢?前面說的訂閱號是要提供一個(gè)接口,允許用戶去訂閱的,所以最后就是被觀察者和觀察者兩個(gè)都得提供接口,訂閱號提供的接口讓用戶去訂閱類比微信號,當(dāng)訂閱號發(fā)布內(nèi)容,就通過這個(gè)微信號通知觀察者,所以訂閱就是這兩者的關(guān)聯(lián)點(diǎn)。

開始之前的兩個(gè)重要的類或接口:ObservableObserver

  • Observable 它實(shí)現(xiàn)ObservableSource接口,通俗來講Observable就是一個(gè)被觀察者也有人叫可觀察的資源,這里就叫被觀察者;
  • Observer 觀察者;
    涉及的類:


    RxJava2.png

訂閱流程分析

開始RxJava的訂閱流程分析之前,來個(gè)簡單的栗子,代碼如下:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("發(fā)射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io());//1

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

日志結(jié)果:

onSubscribe ,Thread: main
onNext ,Thread: RxNewThreadScheduler-3
onComplete ,Thread: RxNewThreadScheduler-3

如上代碼,之所以分開來寫是為了更清晰的去理解每一步RxJava生成的相關(guān)類。

如果你認(rèn)真看前面的內(nèi)容,你一下就明白Observable.subscribe()方法也就是訂閱的意思,是 ObservableObserver 的關(guān)聯(lián)點(diǎn),也就是被觀察者和觀察者的關(guān)聯(lián)點(diǎn),所以我們的分析就從Observable.subscribe(Observer observer)方法開始代碼如下:

 public final void subscribe(Observer<? super T> observer) {
    try {

        // .....此處省略幾億代碼....

        //此方法在Observable類是中是抽象的,注定是子類實(shí)現(xiàn)
        subscribeActual(observer);

        // .....此處省略幾億代碼....
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
       // .....此處省略幾億代碼....
    }
}
  • 上面代碼不難理解在subscribe方法中直接就調(diào)用了subscribeActual(observer)方法,我可以翻譯為 實(shí)際訂閱;
  • subscribe方法是Observable類的方法,他是抽象類,傳入了一個(gè) Observer 對象,開始的時(shí)候栗子我們可以知道Observable是通過我們調(diào)用Observable.create(ObservableOnSubscribe) 所創(chuàng)建出來的;
  • 那subscribeActual在Observable中是抽象方法,肯定是子類去實(shí)現(xiàn)了該方法,從第二點(diǎn)知道子類肯定是在Observable.create(ObservableOnSubscribe)中給new出來的,那么接下我們看看Observable.create(ObservableOnSubscribe)方法的實(shí)現(xiàn);

// Observable.create(ObservableOnSubscribe)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // .....此處省略幾億代碼....

    // 直接就創(chuàng)建了ObservableCreate,并把source作為參數(shù)傳進(jìn)去
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

// onAssembly

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我們從上面代碼我們知道在Observable.create(ObservableOnSubscribe)中直接就創(chuàng)建了ObservableCreate,而ObservableCreate是Observable的子類,并把source作為參數(shù)傳進(jìn)去,最后調(diào)用RxJavaPlugins.onAssembly方法,我們默認(rèn)返回ObservableCreate實(shí)例,所以O(shè)bservable.create方法最后返回的是ObservableCreate實(shí)例,所以就驗(yàn)證了上面的第三點(diǎn)實(shí)際調(diào)用的是ObservableCreate.subscribeActual(observer)方法,這是在不考慮其他變換和線程切換的情況,那我們就來看看ObservableCreate.subscribeActual(observer)方法的實(shí)現(xiàn),代碼如下:

 @Override
protected void subscribeActual(Observer<? super T> observer) {
    //事件發(fā)射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //直接回調(diào)Observer的onSubscribe方法,這個(gè)方法是和線程切換無關(guān),只在當(dāng)前的線程中執(zhí)行
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

代碼不多,也很好理解:

  • 首先調(diào)用observer.onSubscribe(parent)方法通知Observer已經(jīng)訂閱成功了。
  • 最后調(diào)用source.subscribe(parent)方法完成訂閱,source又是什么呢?我們知道在ObservableCreate是在Observable.create方法時(shí)創(chuàng)建的,并把ObservableOnSubscribe傳進(jìn)來,所以source就是ObservableOnSubscribe,直接回調(diào)ObservableOnSubscribe.subscribe方法并把CreateEmitter作為參數(shù)傳遞進(jìn)去,之后再我們是栗子中通過這個(gè)對象調(diào)研onNext方法或者onComplete方法發(fā)射事件;

看一下CreateEmitter的實(shí)現(xiàn),代碼如下:

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

    //通過構(gòu)造方法注入 觀察者實(shí)例
    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

   // .....此處省略幾億代碼....

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

   // .....此處省略幾億代碼....
}

為了簡介清晰我刪掉很多無關(guān)代碼,只保留onNext等這些相關(guān)的方法。

  • 其實(shí)CreateEmitter是Observable的靜態(tài)內(nèi)部類,
  • 在上面我們知道Observable.subscribeActual方法中創(chuàng)建了CreateEmitter實(shí)例并將Observer作為參數(shù)通過構(gòu)造方法注入Observer實(shí)例,作為CreateEmitter的成員變量;
  • 之后在subscribeActual方法中調(diào)用ObservableOnSubscribe.subscribe的方法并把CreateEmitter實(shí)例作為方法參數(shù)傳遞進(jìn)去;
  • 簡單來說CreateEmitter的作用就是發(fā)射事件,里面分裝了Observer實(shí)例,發(fā)射事件就回調(diào)到Obsever中的方法,如onNext等方法;

有沒有發(fā)現(xiàn)從一開始我們就僅僅講了從Obsevable的創(chuàng)建到訂閱,這是比較漢理解的,如果我增加一個(gè)map或線程切換呢?這里暫時(shí)不展開講線程切換。

重新把栗子的代碼在貼一遍:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("發(fā)射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.map(new Function<String, Object>() {
        @Override
        public Object apply(String s) throws Exception {
            Log.e("tag", "map");
            return "aa";
        }
    })

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

如上代碼,訂閱流程會(huì)和之前的有什么不一樣呢?那么我們看個(gè)究竟,就從 Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io())開始,代碼如下:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //這里把上游this傳進(jìn)去也就是source,以便調(diào)用上游的subscribe方法
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

從上面代碼看,我們知道在map方法中創(chuàng)建了ObservableMap并把上游的Observable參進(jìn)去了,而我們知道從Observable.subscribe方法開始訂閱就會(huì)調(diào)用 subscribeActual(observer)方法,所以在Observable.subscribe之后就會(huì)調(diào)用ObservableMap的subscribbeActual方法,代碼如下:

 public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

在ObservableMap的subscribbeActual方法中,直接調(diào)用傳進(jìn)來的Observable的subscribe方法又間接調(diào)用subscribbeActual方法沒所以,訂閱的過程實(shí)際上是一樣的。

總結(jié)

  • Observable是由上游往下游傳遞的,并且每個(gè)操作符都會(huì)創(chuàng)建新的Observable對象包裹上游的實(shí)例;
  • Observer是由下游往上游傳遞的,也就是從Observable.subscribe方法開始。

流程圖:


流程圖.png

時(shí)序圖:
rxjava.png

上圖包括訂閱流程、線程切換 以及 事件發(fā)布流傳的過程,非常詳細(xì)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容