RxJava 原理分析(一)訂閱關(guān)系的形成

最近復(fù)習(xí)RxJava的時(shí)候發(fā)現(xiàn)解說RxJava的原理不多,所以,來機(jī)智的騙心心來了。

依賴:

implementation "io.reactivex.rxjava2:rxjava:2.2.4"

簡(jiǎn)單使用

待會(huì)兒講解的原理呢,是由這個(gè)使用 demo 來講解。

        Observable.create(new ObservableOnSubscribe<Integer>() {  // 創(chuàng)建被觀察者
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);   // 發(fā)射器,發(fā)射一個(gè)事件
                emitter.onNext(2);
            }
        }).subscribe(new Consumer<Integer>() {  // 觀察者
            @Override
            public void accept(Integer integer) throws Exception {  // 用于處理 onNext 事件
                Log.d(TAG, "accept: "+integer);
            }
        }, new Consumer<Throwable>() {
                    @Override
                    public void accept(Throwable throwable) throws Exception {
                        Log.d(TAG, "accept: "+throwable.getMessage());
                    }
                });

原理解析

Observable.create(ObservableOnSubscribe<T>)

在上面的使用demo中,這是整個(gè)訂閱過程的開始:創(chuàng)建一個(gè)被觀察者。來,我們看一下我們的被觀察者是什么。

Observable.class

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");  // 判空
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));  // 默認(rèn)直接返回這個(gè)new出來的ObservableCreate
    }

這個(gè)ObjectHelper.requireNonNull怎么判空?。?/p>

    public static <T> T requireNonNull(T object, String message) {
        if (object == null) {
            throw new NullPointerException(message);
        }
        return object;
    }

簡(jiǎn)直不要太簡(jiǎn)單,就是判斷輸入的object是不是null,不是就返回,是就拋出異常。

那這個(gè)RxJavaPlugins又是干嘛的呢?用過的都知道,沒用過推薦看一下給初學(xué)者的RxJava2.0教程(十) 。其實(shí)我們沒有對(duì)這個(gè) RxJavaPlugins 做設(shè)置的話,就是返回我們傳進(jìn)來的這個(gè)參數(shù)。后面我們看到這個(gè)RxJavaPlugins就可以默認(rèn)為直接返回參數(shù)。

書接上文,那調(diào)用 Observable.create(ObservableOnSubscribe) ,豈不是就是把自己在外部實(shí)現(xiàn)的 ObservableOnSubscribe 匿名內(nèi)部類包裝到 ObservableCreate 中去羅。

public final class ObservableCreate<T> extends Observable<T> {  // 注意繼承關(guān)系哦
    final ObservableOnSubscribe<T> source;  // 最原始的目標(biāo):被觀察者

    public ObservableCreate(ObservableOnSubscribe<T> source) { 
        this.source = source;  // 傳進(jìn)來的是外面new的匿名內(nèi)部類ObservableOnSubscribe
    }
...
}

看到這里,emmm,nice,保存住了被觀察者,返回了ObservableCreate 對(duì)象。注意哦,這個(gè)時(shí)候后面一個(gè) . 操作的就是這個(gè) ObservableCreate對(duì)象(換言之,后面點(diǎn)的就是ObservableCreate中的方法)。

好的,下面就是 .subscribe

.subscribe

這個(gè)方法就很有意思了,實(shí)現(xiàn)了訂閱這個(gè)操作,或者說,是觸發(fā)了onSuscribe、onNext 等操作。

先看一下 subscribe 這個(gè)方法的重載:

    public final Disposable subscribe() {}   // 注意哦,這里的都是final方法
    public final Disposable subscribe(Consumer<? super T> onNext) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} 
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {}
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {}
    public final void subscribe(Observer<? super T> observer) {}

有點(diǎn)多哈,但是實(shí)現(xiàn)呢?很簡(jiǎn)單,大家都調(diào)用最后一個(gè)處理,沒有傳參的都添上默認(rèn)的 Custom 就可以了。什么叫默認(rèn)實(shí)現(xiàn),就是實(shí)現(xiàn)了接口,但是默認(rèn)不處理,比如默認(rèn)的Action,還有就是簡(jiǎn)單處理,比如onError,但是都不是很重要,我們主要關(guān)注我們自己實(shí)現(xiàn)的東西嘛。

    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
        return subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");  // 挨個(gè)判空
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);  // 用一個(gè)類來包裹

        subscribe(ls);  // 包裹類的訂閱

        return ls;
    }

那么這個(gè)用來包裹的 LambdaObserver 又是什么鬼呢?

public final class LambdaObserver<T> extends AtomicReference<Disposable>
        implements Observer<T>, Disposable, LambdaConsumerIntrospection {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }
...
}

就是將四個(gè)類型(onNext,onError,onComplete,onSubscribe)封裝成一個(gè)一個(gè)對(duì)象,方便對(duì)整個(gè)流程的調(diào)用。

將四個(gè)對(duì)象封裝在一起了過后,就是應(yīng)該是訂閱了把?


    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);  // 檢查需要執(zhí)行預(yù)操作不,默認(rèn)返回 observer
            // 判空
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
            subscribeActual(observer);  // 真正處理的函數(shù)
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);  // 錯(cuò)誤的話,捕獲,并交給RxJavaPlugins#onError來處理,沒有交給我們自定義的onError,莫看錯(cuò)啦

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

真正有用的就是 subscribeActual(observer);這一句,吐血哦,還不訂閱,我都快等不及了。好嘛,大神就是大神,一看,自己沒實(shí)現(xiàn)。后面一想,就是牛逼。

    /**
     * Operator implementations (both source and intermediate) should implement this method that
     * performs the necessary business logic and handles the incoming {@link Observer}s.
     * <p>There is no need to call any of the plugin hooks on the current {@code Observable} instance or
     * the {@code Observer}; all hooks and basic safeguards have been
     * applied by {@link #subscribe(Observer)} before this method gets called.
     * @param observer the incoming Observer, never null
     */
    protected abstract void subscribeActual(Observer<? super T> observer);

所有的操作符都會(huì)重寫這個(gè)方法,我是不是暴露了什么,emm,所有運(yùn)算符返回都是 Observable 的子類,就比如說 create 返回的就是 Observable 的子類 ObservableCreate,后面就是在這基礎(chǔ)上調(diào)用了。這里我們可以看到,其實(shí)所有的 subscribe 我們都重寫不了,唯一能重寫且必須重寫的就是這個(gè) subscribeActual。這就意味著,這個(gè)方法是我子類實(shí)現(xiàn)真正訂閱的入口。

前面講解我們知道,現(xiàn)在在demo中調(diào)用的是 ObservableCreate 的 subscribe,那么事情就變得很簡(jiǎn)單,我們直接找到 ObservableCreate#subscribeActual(observer) 進(jìn)行分析就好。在分析之前,先用圖總結(jié)下前面的東西:


好的嘞,那我們接下來就看一下核心的 subscribeActual 方法如何實(shí)現(xiàn)的?

    @Override
    protected void subscribeActual(Observer<? super T> observer) {  //傳進(jìn)來的就是前面的 LambdaObserver
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);  // 發(fā)射器,每一個(gè)操作符對(duì)應(yīng)的類內(nèi)部都會(huì)自己實(shí)現(xiàn),因?yàn)樘幚磉壿嫴灰粯影?        observer.onSubscribe(parent);  // 回調(diào)了自己實(shí)現(xiàn)的 onSubscribe 對(duì)用的Consumer

        try {
            source.subscribe(parent);   // 這個(gè)source就是初始化類的時(shí)候傳進(jìn)來的被觀察者,這里將發(fā)送器給了它
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);  // 這里捕獲的異常才交給自己寫的Consumer處理
        }
    }

這里代碼不多,干的事還不少,首先是new出我們的發(fā)射器,然后觸發(fā)被觀察者的訂閱回調(diào),然后再,執(zhí)行被觀察者的subscribe方法,如果捕獲到異常就交給自定義的 onError 處理。

有一個(gè)概念可以先了解,無論以后把這個(gè)對(duì)象怎么包裹,傳遞,只有這里才調(diào)用了被觀察者 Observable 的 subscribe 方法。

接下來我們來詳解一下,這個(gè)回調(diào)流程。
observer.onSubscribe(parent); 這里的 observer 是什么?傳進(jìn)來的 包裝有自定義的觀察者的 LambdaObserver ,他是 Observable 的子類。接下來看一下他的調(diào)用:

    @Override
    public void onSubscribe(Disposable d) {
        if (DisposableHelper.setOnce(this, d)) {  // 設(shè)置并判斷是否是第一次
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                d.dispose();   // 出現(xiàn)異常,解除訂閱
                onError(ex);   // 并向下傳遞
            }
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {   // 判斷是否斷開連接
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);   //交給自定義的消費(fèi)者
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        } else {
            RxJavaPlugins.onError(t);
        }
    }

很簡(jiǎn)單,對(duì)不對(duì),就是直接調(diào)用了 onNext 消費(fèi)者的 accept 方法(自定義或者默認(rèn))。出現(xiàn)異常,如果沒有斷開連接,且是第一次接收到,就交給 onError (自定義或者默認(rèn))處理。

那么 source.subscribe(parent);不會(huì)也這么簡(jiǎn)單把?
source 就是傳進(jìn)來的 ObservableOnSubscribe,自定義的被觀察者
parent 就是發(fā)射器。這一調(diào)用,我們自己寫的被觀察者的邏輯就巴拉巴拉的執(zhí)行了

這里就是直接就回調(diào)了。嚇人。。

parent.onError(ex); 怎么實(shí)現(xiàn)的?這就要真正說道說道這個(gè)發(fā)射器了。

發(fā)送器是什么狗東西呢?它是ObservableCreate的靜態(tài)內(nèi)部類:

    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

        final Observer<? super T> observer;  // 觀察者,就是傳進(jìn)來的 LambdaObserver

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;  
        }

        @Override
        public void onNext(T t) {  
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {   // 是否已經(jīng)斷開連接
                observer.onNext(t);  // 調(diào)用自定義的Custom回調(diào)處理
            }
        }

        @Override
        public void onError(Throwable t) { 
            if (!tryOnError(t)) {  // 交給觀察者
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {  //沒斷開,不為null
                try {
                    observer.onError(t);
                } finally {
                    dispose();  //失敗直接斷開連接
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();  // 交給觀察者
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {   DisposableHelper.set(this, d);   }

        @Override
        public void dispose() {    DisposableHelper.dispose(this);    }

        @Override
        public boolean isDisposed() {    return DisposableHelper.isDisposed(get());   }

    }

咦,這結(jié)構(gòu)簡(jiǎn)直不要太簡(jiǎn)單,就相當(dāng)于一次中轉(zhuǎn),只不過是加入了一些容錯(cuò)機(jī)制。到這里整個(gè)流程要用到的類就結(jié)束了,當(dāng)我們?cè)跁鴮懕挥^察者的時(shí)候,使用這個(gè)發(fā)射器提交東西,就是直接調(diào)用這個(gè)東西來調(diào)用消費(fèi)者的對(duì)應(yīng)回調(diào)。

千言不如一圖


不對(duì),放錯(cuò)了


  1. 創(chuàng)建真正的被觀察者包裹對(duì)象(繼承于 Observable),并將自己寫的真正被觀察者包裹起來
  2. 調(diào)用被觀察者的subscribe方法,將自己創(chuàng)建的觀察者包裹起來
  3. 作為參數(shù),傳入ObservableCreate 對(duì)方實(shí)現(xiàn)的 subscribeActual(Observer) 中。
  4. 在 subscribeActual 方法中生成 emitter,并且回調(diào)被觀察者的onSubscribe,確定連接關(guān)系。
  5. 自己寫的被觀察者的 subscribe 調(diào)用,我們可以使用 emitter 提交東西,一提交就將提交的東西交給 被觀察者對(duì)應(yīng)的方法執(zhí)行
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 0.版權(quán)聲明 本文由玉剛說寫作平臺(tái)提供寫作贊助,版權(quán)歸玉剛說微信公眾號(hào)所有原作者:四月葡萄版權(quán)聲明:未經(jīng)玉剛說許可...
    四月葡萄閱讀 819評(píng)論 0 3
  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    Jason_andy閱讀 5,759評(píng)論 7 62
  • ??樓主最近在找實(shí)習(xí)工作,由于簡(jiǎn)歷上說了解RxJava,所以在面試的時(shí)候應(yīng)該會(huì)問到RxJava的知識(shí),于是樓主結(jié)合...
    瓊珶和予閱讀 3,220評(píng)論 3 5
  • 對(duì)于Rxjava想必大家都很熟悉了,這里不再贅述什么是Rxjava。 今天的主題是:從源碼角度(2.0)分析,Rx...
    sjandroid閱讀 1,129評(píng)論 0 2
  • 我從去年開始使用 RxJava ,到現(xiàn)在一年多了。今年加入了 Flipboard 后,看到 Flipboard 的...
    huqj閱讀 1,983評(píng)論 0 21

友情鏈接更多精彩內(nèi)容