Rxjava2源碼淺析(一)

面試的時(shí)候被問(wèn)道各種框架的原理架構(gòu),也是很尷尬,自以為寫(xiě)的代碼不少,用過(guò)的框架也不少,深入的去研究源碼的還真是不多,也是給自己敲了一個(gè)警鐘,今天就來(lái)嘗試剖析一下Rxjava2的源碼,水平有限,就先看一下基礎(chǔ)的用法相關(guān),一些難度更高的操作符就慢慢來(lái)分析吧。
就按照平時(shí)使用的順序來(lái)分析:

一、初始化Observerble

基本使用實(shí)例:

Observable<String> observable= Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("aaa");
            }
        });

先看一下內(nèi)部的參數(shù) ObservableOnSubscribe<>() 。

public interface ObservableOnSubscribe<T> {
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

就是一個(gè)接口,這里用的就是它的一個(gè)匿名實(shí)現(xiàn)類。而接口內(nèi)部的方法中我們看到ObservableEmitter<> 是一個(gè)Rxjava2新推出的類,俗稱發(fā)射器。

public interface ObservableEmitter<T> extends Emitter<T> {

    /**
     * Sets a Disposable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param d the disposable, null is allowed
     */
    void setDisposable(Disposable d);

    /**
     * Sets a Cancellable on this emitter; any previous Disposable
     * or Cancellation will be unsubscribed/cancelled.
     * @param c the cancellable resource, null is allowed
     */
    void setCancellable(Cancellable c);

    /**
     * Returns true if the downstream disposed the sequence.
     * @return true if the downstream disposed the sequence
     */
    boolean isDisposed();

    /**
     * Ensures that calls to onNext, onError and onComplete are properly serialized.
     * @return the serialized ObservableEmitter
     */
    ObservableEmitter<T> serialize();
}

這里面幾個(gè)回調(diào)方法的作用注釋也說(shuō)的很清楚了就不多說(shuō)了。
它繼承自Emitter

public interface Emitter<T> {

    /**
     * Signal a normal value.
     * @param value the value to signal, not null
     */
    void onNext(@NonNull T value);

    /**
     * Signal a Throwable exception.
     * @param error the Throwable to signal, not null
     */
    void onError(@NonNull Throwable error);

    /**
     * Signal a completion.
     */
    void onComplete();
}

可以看到,這里面就是我們比較熟悉的next、complete、error三個(gè)回調(diào)方法了。其實(shí)這個(gè)create方法內(nèi)部的參數(shù)就是兩個(gè)接口的回調(diào),理解就行了,然后看一下create方法。

 public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

requireNonNull是很好理解的,看名字也能猜測(cè)出是測(cè)試傳進(jìn)來(lái)的ObservableOnSubscribe是否為空

  public static <T> T requireNonNull(T object, String message) {
        if (object == null) {
            throw new NullPointerException(message);
        }
        return object;
    }

而源碼也驗(yàn)證了我們的想法。關(guān)鍵是后面一句,先看一下具體的方法實(shí)現(xiàn)。

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

一句一句來(lái)分析:

public interface Function<T, R> {
    /**
     * Apply some calculation to the input value and return some other value.
     * @param t the input value
     * @return the output value
     * @throws Exception on error
     */
    @NonNull
    R apply(@NonNull T t) throws Exception;
}

這里的Function也是一個(gè)接口,作用也很明顯,將T類型的數(shù)據(jù)轉(zhuǎn)化成R類型數(shù)據(jù)。那是我們?cè)谑褂玫?/p>

        observable.map(new Function<String, Object>() {
            @Override
            public Object apply(@NonNull String s) throws Exception {
                return null;
            }
        })

類似這種類型轉(zhuǎn)換的語(yǔ)句時(shí)候才會(huì)用到,這里我們先不管它,一開(kāi)始是默認(rèn)為null的,所以這個(gè)方法最后就會(huì)return source;就是將括號(hào)中的new ObservableCreate<T>(source)原樣返回。這個(gè)ObservableCreate又是什么呢?

public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

源碼比較長(zhǎng)我們就之看一下它的構(gòu)造函數(shù)就可以了,目前只需要知道這是一個(gè)Observerble的子類就可以了,至于Observerble這個(gè)類,等到大概摸清楚了事件流程再回頭來(lái)分析。所以到現(xiàn)在我們的第一步初始化就算是分析完了流程。

二、初始化一個(gè)Observer

用法示例:

Observer<String> observer=new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                subscription=d;
            }

            @Override
            public void onNext(String value) {
                LogUtil.log(TAG,"  "+value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
                LogUtil.log(TAG,"complete");
            }
        };

這個(gè)分析就要簡(jiǎn)單很多了,Observer只是一個(gè)簡(jiǎn)單的接口,這里也只是具體實(shí)現(xiàn)了一下接口回調(diào)。

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

不過(guò)這里和Rxjava1也是有些區(qū)別的,多了一個(gè)onSubscribe 注釋也說(shuō)的很清楚,用于隨時(shí)取消訂閱。
第二步很輕松,下面看一下第三步

三、建立訂閱關(guān)系

用法示例:

observable.subscribe(observer);

這里我們就只分析最簡(jiǎn)單的一種,看一下源碼:

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

第一句還是一樣,判斷是否為空,平時(shí)自己寫(xiě)代碼也要像這樣注意代碼的健壯性。
重點(diǎn)就是這三句了。

 observer = RxJavaPlugins.onSubscribe(this, observer);

 ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

 subscribeActual(observer);

一句一句來(lái)看:

 public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }

在這個(gè)onsubscribe中是不是覺(jué)得有些眼熟?就跟剛剛的onAssenmbly幾乎一樣,由于我們沒(méi)有其它的功能,所以這里onObservableSubscribe也是null,也是返回原值,下面的requireNonNull我們也見(jiàn)過(guò)了,又驗(yàn)證一遍是否為空,因?yàn)槿绻覀兗尤肓薋unction函數(shù),上面就不會(huì)返回原來(lái)的observer了,所以還要再驗(yàn)證一遍。
于是就到了最后一句

protected abstract void subscribeActual(Observer<? super T> observer);

???
怎么是個(gè)abstract方法?那么它是在哪實(shí)現(xiàn)的呢?
回想看我們的observable初始化過(guò)程。哪里出的問(wèn)題呢?就是我們一開(kāi)始沒(méi)有分析的ObservableCreate,我們?cè)诔跏蓟臅r(shí)候就將一個(gè)ObservableCreate類向上轉(zhuǎn)型賦值給了Observerble,所以方法的具體實(shí)現(xiàn)也就在ObservableCreate里了。
繼續(xù)跟進(jìn)。果不其然:

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

還是一句一句來(lái)看

CreateEmitter<T> parent = new CreateEmitter<T>(observer);

又是一個(gè)新的類

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
            } else {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

        @Override
        public void setDisposable(Disposable d) {
            DisposableHelper.set(this, d);
        }

        @Override
        public void setCancellable(Cancellable c) {
            setDisposable(new CancellableDisposable(c));
        }

        @Override
        public ObservableEmitter<T> serialize() {
            return new SerializedEmitter<T>(this);
        }

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
    }

所以我們的前兩句

CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);

所以我們的前兩局就是回調(diào)了onSubscribe接口,從而將這個(gè)CreateEmitter類型轉(zhuǎn)型成Disposable輸出了。而CreateEmitter的初始化參數(shù)又是observer本身,所以大體上可以看成回調(diào)了另一個(gè)格式的自己。。。然后一般可用于自殺(取消訂閱)。。。
然后就來(lái)到的最后一句

source.subscribe(parent);

這里的source就是

new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("test");
            }
        }

剛剛我們初始化observable傳入的。這個(gè)parent->這里的參數(shù)e。于是就這樣完成了Observerble和Observer的綁定,也就能實(shí)現(xiàn)接口回調(diào)了。

沒(méi)有任何其它功能,只是走了一邊最基本流程的Rxjava源碼,后面還會(huì)繼續(xù)更新的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容