RxJava源碼分析

RxJava大概流程:

1.Observable.create 創(chuàng)建事件源,但并不生產(chǎn)也不發(fā)射事件。

代碼創(chuàng)建被觀察者Observable

Observable<String> observable =  Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("a");
                e.onComplete();
            }
        });

源碼跟進(jìn)Observable.create

    @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    //ObservableOnSubscribe是個(gè)接口,只包含subscribe方法。
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");//判段是否為空
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

繼續(xù)跟進(jìn)RxJavaPlugins.onAssembly方法

/**
     * Calls the associated hook function.
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }

通過(guò)調(diào)試得知靜態(tài)對(duì)象onObservableAssembly默認(rèn)為null, 所以此方法直接返回傳入的參數(shù)source。

由此可以看出來(lái)整個(gè)Observable.create方法就是創(chuàng)建了個(gè)事件源new ObservableCreate()對(duì)象,將ObservableOnSubscribe作為參數(shù)傳遞給ObservableCreate的構(gòu)造函數(shù)。事件是由接口ObservableOnSubscribe的subscribe方法上產(chǎn)生的,但此時(shí)并不生產(chǎn)也不發(fā)射事件。

2.實(shí)現(xiàn)observer接口,此時(shí)沒(méi)有也無(wú)法接受到任何發(fā)射來(lái)的事件。

代碼創(chuàng)建觀察者Observer

Observer observer = new Observer<String>() {

            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(String str) {
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        };

源碼跟進(jìn)Observer

public interface Observer<T> {

    /**
     * Provides the Observer with the means of cancelling (disposing) the
     * connection (channel) with the Observable in both
     * synchronous (from within {@link #onNext(Object)}) and asynchronous manner.
     * @param d the Disposable instance whose {@link Disposable#dispose()} can
     * be called anytime to cancel the connection
     * @since 2.0
     */
    void onSubscribe(@NonNull Disposable d);

    /**
     * Provides the Observer with a new item to observe.
     * <p>
     * The {@link Observable} may call this method 0 or more times.
     * <p>
     * The {@code Observable} will not call this method again after it calls either {@link #onComplete} or
     * {@link #onError}.
     *
     * @param t
     *          the item emitted by the Observable
     */
    void onNext(@NonNull T t);

    /**
     * Notifies the Observer that the {@link Observable} has experienced an error condition.
     * <p>
     * If the {@link Observable} calls this method, it will not thereafter call {@link #onNext} or
     * {@link #onComplete}.
     *
     * @param e
     *          the exception encountered by the Observable
     */
    void onError(@NonNull Throwable e);

    /**
     * Notifies the Observer that the {@link Observable} has finished sending push-based notifications.
     * <p>
     * The {@link Observable} will not call this method if it calls {@link #onError}.
     */
    void onComplete();

}

Observer就是個(gè)接口,包含了上面四個(gè)方法,所以此時(shí)沒(méi)有也無(wú)法接受到任何發(fā)射來(lái)的事件

3.訂閱 observable.subscribe(observer), 此時(shí)會(huì)調(diào)用具體Observable的實(shí)現(xiàn)類中的subscribeActual方法,此時(shí)才會(huì)真正觸發(fā)事件源生產(chǎn)事件,事件源生產(chǎn)出來(lái)的事件通過(guò)Emitter的onNext,onError,onComplete發(fā)射給observer對(duì)應(yīng)的方法由下游observer消費(fèi)掉。從而完成整個(gè)事件流的處理。

代碼關(guān)聯(lián)觀察者和被觀察者

observable.subscribe(observer);

代碼跟進(jìn)subscribe

@SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //hook ,默認(rèn)直接返回observer
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //真正實(shí)現(xiàn)訂閱的方法。
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

跟進(jìn)subscribeActual方法

//subscribeActual 是抽象方法,所以需要到實(shí)現(xiàn)類中去看具體實(shí)現(xiàn),也就是上文中1.提到的ObservableCreate中
protected abstract void subscribeActual(Observer<? super T> observer);
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        //事件源,生產(chǎn)事件的接口
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //發(fā)射器
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //直接回調(diào)了觀察者的onSubscribe 并傳回了Disposable
        //observer中可以利用Disposable來(lái)隨時(shí)中斷事件流的發(fā)射。
        observer.onSubscribe(parent);

        try {
            // 調(diào)用了事件源subscribe方法生產(chǎn)事件,同時(shí)將發(fā)射器傳給事件源。
            // 現(xiàn)在我們明白了,數(shù)據(jù)源生產(chǎn)事件的subscribe方法只有在observable.subscribe(observer)被執(zhí)行后才執(zhí)行的。 換言之,事件流是在訂閱后才產(chǎn)生的。
            // 而observable被創(chuàng)建出來(lái)時(shí)并不生產(chǎn)事件,同時(shí)也不發(fā)射事件。
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

現(xiàn)在可以看出來(lái),只有subscribe訂閱后,數(shù)據(jù)源才會(huì)開始生產(chǎn)事件和發(fā)射事件。
接下來(lái)看看事件是如何被發(fā)射出去的,看下CreateEmitter源碼

static final class CreateEmitter<T> extends AtomicReference<Disposable> 
implements ObservableEmitter<T>, Disposable {}

CreateEmitter 實(shí)現(xiàn)了ObservableEmitter接口,同時(shí)ObservableEmitter接口又繼承了Emitter接口。
CreateEmitter 還實(shí)現(xiàn)了Disposable接口,這個(gè)disposable接口是用來(lái)判斷是否中斷事件發(fā)射的。

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

看到這就能看出來(lái),當(dāng)訂閱成功后,數(shù)據(jù)源ObservableOnSubscribe開始生產(chǎn)事件,調(diào)用Emitter的onNext,onComplete向下游發(fā)射事件,Emitter包含了observer的引用,又調(diào)用了observer onNext,onComplete,這樣下游observer就接收到了上游發(fā)射的數(shù)據(jù)。

RxJava的線程調(diào)度機(jī)制

感謝大神們的心得:
https://www.cnblogs.com/linghu-java/p/9719427.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容