RxJava:基本訂閱流程

一個(gè)簡單的實(shí)例:

    //傳參數(shù)url,獲取gif的基礎(chǔ)信息,并傳遞給下游做處理
    private Observable<Bundle> showGifDialog(String url) {
        return Observable.create(new ObservableOnSubscribe<Bundle>() {
            @Override
            public void subscribe(ObservableEmitter<Bundle> e) throws Exception {
                e.onNext(BitmapUtils.getGifBitmapInfo(url));
            }
        }).subscribeOn(Schedulers.io());
    }
    //拿到gif信息后,展示一個(gè)自定義播放gif的Dialog
    Disposable disposable = showGifDialog(picUrl)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Consumer<Bundle>() {
                @Override
                public void accept(Bundle bundle) throws Exception {
                    //
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    //
                }
            });

我們從Observable的創(chuàng)建開始,分析源碼:

    public static <T> Observable<T> create(ObservableCreate<T>(source)<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));//創(chuàng)建+封裝操作
    }

? Observable.create()方法傳入一個(gè)ObservableOnSubscribe對象作為參數(shù),返回一個(gè)Observable對象,其內(nèi)部核心執(zhí)行的是ObservableCreate<T>(source)。源碼如下:

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

ObservableCreate是Observable的實(shí)現(xiàn)類,ObservableCreate<T>(source)將傳入的參數(shù)-----ObservableCreate對象賦值給ObservableCreate.source。后續(xù)的一操作是以該source為基礎(chǔ)執(zhí)行的。

? 除去線程切換的部分,Observable創(chuàng)建完,我們需要調(diào)用Observable.subscribe()方法將觀察者訂閱給Observable。

在subscribe()方法內(nèi)部完成真正訂閱之前,Consume將被new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);轉(zhuǎn)換成一個(gè)Observer對象。

? Observable.subscribe()方法如下,它傳入?yún)?shù)的是我們定義的下游觀察者,真正完成訂閱的是subscribeActual(observer)方法,將上游的被觀察者和觀察者結(jié)合起來,這是訂閱的核心步驟。

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

? 在Observable中,subscribeActual(observer)是一個(gè)抽象方法。我們之前提到,Observable.create()方法內(nèi)部實(shí)際創(chuàng)建的是一個(gè)ObservableCreate對象,所以,在ObservableCreate中找到真正執(zhí)行的subscribeActual方法,如下:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);//發(fā)射器對象的實(shí)現(xiàn)類
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

? 在subscribeActual()方法中,主要執(zhí)行兩個(gè)操作:

  1. observer.onSubscribe(parent); // 觸發(fā) Observer#onSubscribe(Disposable)

  2. source.subscribe(parent);

    傳入事件發(fā)射器執(zhí)行發(fā)射事件,發(fā)射傳給下游觀察者的數(shù)據(jù),即我們例子中的:

    public void subscribe(ObservableEmitter<Bundle> e) throws Exception {
                    e.onNext(BitmapUtils.getGifBitmapInfo(url));
                }
    

    我們重點(diǎn)關(guān)注的是實(shí)際進(jìn)行發(fā)射事件的CreateEmitter.onNext()方法,它的實(shí)現(xiàn)如下:

        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

? 很簡單,它調(diào)用observer.onNext()方法將我們傳入的Value,傳遞給下游觀察者。而這個(gè)observer,正是Observeable.subscribe(Observer<? super T> observer)方法調(diào)用subscribeActual(observer);時(shí)傳入的。

下一篇:RxJava:線程切換

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容