RxJava中實(shí)現(xiàn)觀察者模式的原理

前言:

    本文所有代碼基于RxJava 2.2.3和RxAndroid 2.1.0,并使用kotlin來介紹RxJava中常用的操作符的用法及效果。

第一步:創(chuàng)建被觀察者Observable。
1、 create其實(shí)就是Observable里的一個(gè)靜態(tài)方法用于創(chuàng)建一個(gè)Observable實(shí)例。在下面這段代碼中我們主動(dòng)創(chuàng)建了兩個(gè)對象實(shí)例,一個(gè)是由Observable.create方法返回的Observable實(shí)例,一個(gè)是傳入create方法的參數(shù)ObservableOnSubscribe匿名對象并覆寫了定義事件發(fā)送順序的方法 ObservableOnSubscribe.subscribe。

         //第一步
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter emitter) throws Exception {
                Log.e("Create","ObservableOnSubscribe對象的subscribe方法被調(diào)用");
                emitter.onNext("數(shù)據(jù)1");
                emitter.onNext("數(shù)據(jù)2");
                emitter.onNext("數(shù)據(jù)3");
                emitter.onNext("數(shù)據(jù)4");
                emitter.onError(new Exception());
                emitter.onComplete();
            }
        });

2、Observable的create方法,該方法在這種情況下實(shí)際上返回的就是新new的 ObservableCreate對象,并且會將ObservableOnSubscribe對象賦值給其內(nèi)部的成員變量source

    //Observable的create方法
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }
    //ObservableCreate類的成員變量及其構(gòu)造函數(shù) create傳入的參數(shù)ObservableOnSubscribe對象最終賦值給了source
   final ObservableOnSubscribe<T> source;
   public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
    }
    
   //RxJavaPlugins.onAssembly方法
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {//此時(shí) f == null 所以會返回source即傳入的參數(shù)不會有其他操作
            return apply(f, source);
        }
        return source;
    }

第一步結(jié)論:創(chuàng)建了一個(gè)ObservableCreate對象,并將決定訂閱時(shí)事件調(diào)動(dòng)順序的ObservableOnSubscribe對象賦值給其成員變量source。

第二步:創(chuàng)建觀察者Observer

        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e("Create","Observer的onSubscribe方法");
            }

            @Override
            public void onNext(String s) {
                Log.e("Create","Observer的onNext方法 收到的參數(shù)為:"+s);
            }

            @Override
            public void onError(Throwable e) {
                Log.e("Create","Observer的onError方法");
            }

            @Override
            public void onComplete() {
                Log.e("Create","Observer的onComplete方法");
            }
        };
第二步結(jié)論:第二步就是單純的創(chuàng)建了一個(gè)觀察者,并覆寫如上所示的四個(gè)方法。

第三步:讓被觀察者(Observable)和觀察者(Observer)之間產(chǎn)生聯(lián)系

//觀察者訂閱被觀察者 兩者關(guān)系發(fā)生
 observable.subscribe(observer);

1、訂閱之后立即調(diào)用被觀察者(observable)的subscribe方法。該方法中調(diào)用了抽象方法subscribeActual(observer);并將觀察者(observer)作為參數(shù)傳入傳入。

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned 
            a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe 
            for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //核心代碼是這句這個(gè)方法在Observable類中是個(gè)抽象方法第一步創(chuàng)建的ObservableCreate對象,覆寫了該方法,該方法的參數(shù)為
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

2、當(dāng)這個(gè)創(chuàng)建的Observable對象調(diào)用subscribe方法時(shí),subscribe方法會調(diào)用ObservableCreate類中覆寫的subscrubeActual方法,subscrubeActual的代碼如下:

   
    protected void subscribeActual(Observer<? super T> observer) {
        //將訂閱的Observer對象進(jìn)行封裝到CreateEmitter中,作為source調(diào)用subscribe方法時(shí)的參數(shù)
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            //source即為調(diào)用create操作符時(shí)傳入的用于定義調(diào)用順序的ObservableOnSubscribe對象,在此處調(diào)用了其subscribe方法
           //傳入的參數(shù)是上面封裝了的CreateEmitter對象,所以我們在外面定義的調(diào)用順序就是調(diào)用的CreateEmitter的方法如onNext,
          //onError,onComplete等方法
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

3、再看看CreateEmitter中對onNext,onError,onComplete的封裝。三個(gè)方法都調(diào)用了觀察者(observer)的相應(yīng)方法,但是在調(diào)用前都對其進(jìn)行了處理,判斷其是否被取消。若調(diào)用了onError或者onComplete中的任意一個(gè)方法時(shí),就會被取消,即用dispose()方法,這時(shí) !isDisposed())的值就為false了。所以后續(xù)的觀察者的其他方法都不可調(diào)用了。

        //observer對象即為我們在第二步創(chuàng)建的觀察者(observer)
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally
                                                   not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not 
                                                                allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

第三步總結(jié):被觀察者(observable)調(diào)用subscribe方法,此方法最終將觀察者(observer)對象封裝成Emitter并作為參數(shù),調(diào)用ObservableCreate中source即第一步創(chuàng)建時(shí)傳入的參數(shù)的subscribe方法。即實(shí)現(xiàn)了調(diào)用第一步時(shí)定義的調(diào)用順序。而Emitter的封裝使onError及onComplete的調(diào)用做了限制,保證了onError和onComplete方法只能調(diào)用其中一個(gè)。

最后總結(jié):

上述一系列操作,可歸結(jié)為以下偽代碼:

     //規(guī)定調(diào)用順序
       ObservableOnSubscribe<String> source = new ObservableOnSubscribe<String>(){
           @Override
           public void subscribe(ObservableEmitter emitter) throws Exception {
               Log.e("Create","ObservableOnSubscribe對象的subscribe方法被調(diào)用");
               emitter.onNext("數(shù)據(jù)1");
               emitter.onNext("數(shù)據(jù)2");
               emitter.onNext("數(shù)據(jù)3");
               emitter.onNext("數(shù)據(jù)4");
               emitter.onError(new Exception());
               emitter.onComplete();
           }
       };
   //創(chuàng)建被觀察者
       Observable<String> observable = new ObservableCreate<>(source);
       observable.source = source;
     //以上為第一步
//-------------------------------------------------------------------------
       Observer<String> observer = new Observer<String>();
//創(chuàng)建觀察者這是第二步
//---------------------------------------------------------------------------
//調(diào)用observable.subscribe(observer)
       Emitter<String> emitter = new EmitterCreate(observer);
       observable.source.subscribe(emitter);
//第三步訂閱
//-----------------------------------------------------------------------------

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容