RxJava源碼分析--S型調(diào)用整體流程

簡介

這篇RxJava源碼分析非常精簡,僅通過最簡單的調(diào)用方式對RxJava的整個調(diào)用流程做深入剖析,未貼大量源碼,需要結合源碼,對比流程分析,跟上思路。

先看下簡單使用

Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(ObservableEmitter<Object> emitter) throws Exception {
                emitter.onNext("發(fā)射一條消息");
            }
        }).subscribe(new Observer<Object>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Object value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        });

調(diào)用Observable的create時,返回一個ObservableCreate并傳入ObservableOnSubscribe(即source)



@SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

鏈式調(diào)用Observable的訂閱方法subscribe,傳入觀察者Observer


@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

再調(diào)用ObservableCreate的subscribeActual方法,可以看到創(chuàng)建了一個發(fā)射器其入?yún)⑹怯^察者,然后立馬調(diào)用了觀察者的onSubscribe方法,最終再調(diào)用我們最初傳入的source的subscribe方法。


整體流程詳盡分析

Observable
                .create(new ObservableOnSubscribe<Integer>() {
                    @Override
                    public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                        emitter.onNext(99);
                    }
                })
                .map(new Function<Integer, String>() {
                    @Override
                    public String apply(Integer integer) throws Exception {
                        if (integer == 99){
                            return "一切正常";
                        }
                        return "";
                    }
                })
                .subscribe(new Observer<String>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(String value) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

通過這段調(diào)用代碼,對源碼調(diào)用流程進行深入分析,精心繪制流程圖如下:


image.png

整體流程概述,大概可分為三個階段:

  • 創(chuàng)建階段(從上向下創(chuàng)建):從Observable.create開始,創(chuàng)建一個ObservableCreate(傳入一個發(fā)射消息源source即為ObservableOnSubscribe)返回-->然后再通過返回的ObservableCreate的.map創(chuàng)建一個ObserverableMap(并將自己傳入作為ObserverableMap的source),返回一個ObserverableMap。
  • 訂閱階段(從下向上訂閱):這時候來到最后的.subscribe(傳入終點觀察者Observer)(即ObserverableMap的subscribe())時,會進而調(diào)用它的subscribeActual(),其內(nèi)部則又會創(chuàng)建一個MapObserver(并傳入終點觀察者Observer),調(diào)用創(chuàng)建階段傳入的source的.subscribe(傳入MapObserver),而這個source即為ObservableCreate --> 接著ObservableCreate也一樣調(diào)用它的subscribeActual(),內(nèi)部創(chuàng)建一個發(fā)射器CreateEmitter(并傳入MapObserver),再調(diào)用之前傳入的source的.subscribe(傳入發(fā)射器),這個source其實就是最初通過Observable.create傳入ObservableOnSubscribe
  • 發(fā)射階段(從上向下分發(fā)):又回到了開始調(diào)用ObservableOnSubscribe.subscribe(),內(nèi)部調(diào)用了emitter.onNext();這個CreateEmitter就是剛剛我們訂閱時在ObservableCreate中創(chuàng)建的,它的onNext中又會調(diào)用我們創(chuàng)建發(fā)射器時傳入的observer(即為MapObserver)的onNext(),而MapObserver.onNext()內(nèi)部會通過創(chuàng)建ObserverableMap時傳入的function進行轉(zhuǎn)換,最后再調(diào)用真正的observer.onNext(),即最終的Observer。

如果上面明白了,加入線程切換其實就非常簡單了,根據(jù)整體調(diào)用流向加入中間層。

.subscribeOn(Schedulers.io())
//Schedulers.io()返回一個線程池
.observeOn(AndroidSchedulers.mainThread())
//AndroidSchedulers.mainThread()返回一個持有主線程Handler

如上.subscribeOn()也只是包裹了一層Observerable而已,當向上訂閱調(diào)用上層的source.subscribe()時,會把這句放入子線程執(zhí)行,所以從此后向上訂閱以及再向下分發(fā)的部分都是在子線程;
而.observeOn同樣也只是包裝了一層Observerable,當向下發(fā)射調(diào)用到observer.onNext時會通過Handler切換到主線程,后續(xù)的分發(fā)就都切換到主線程了。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容