RxJava 的消息訂閱

1 簡單使用步驟

1、創(chuàng)建被觀察者(Observable),定義要發(fā)送的事件。
2、創(chuàng)建觀察者(Observer),接受事件并做出響應操作。
3、觀察者通過訂閱(subscribe)被觀察者把它們連接到一起。

2 RxJava的消息訂閱例子

  //步驟1. 創(chuàng)建被觀察者(Observable),定義要發(fā)送的事件。
        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

        //步驟2. 創(chuàng)建觀察者(Observer),接受事件并做出響應操作。
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        //步驟3. 觀察者通過訂閱(subscribe)被觀察者把它們連接到一起。
        observable.subscribe(observer);

其輸出結果為:

onSubscribe
onNext : 文章1
onNext : 文章2
onNext : 文章3
onComplete

3 源碼分析

3.1 創(chuàng)建被觀察者過程

首先來看下創(chuàng)建被觀察者(Observable)的過程,上面的例子中我們是直接使用Observable.create()來創(chuàng)建Observable

3.1.1 Observable類的create()

創(chuàng)建一個ObservableCreate對象出來,然后把我們自定義的ObservableOnSubscribe作為參數(shù)傳到ObservableCreate中去,最后就是調用 RxJavaPlugins.onAssembly()方法。

3.1.2 ObservableCreate類

public final class ObservableCreate<T> extends Observable<T> {//繼承自Observable
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//把我們創(chuàng)建的ObservableOnSubscribe對象賦值給source。
    }
}

可以看到,ObservableCreate是繼承自Observable的,并且會把ObservableOnSubscribe對象給存起來

3.1.3 RxJavaPlugins類的onAssembly()

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        //省略無關代碼
        return source;
    }

很簡單,就是把上面創(chuàng)建的ObservableCreate給返回。

3.1.4 簡單總結

Observable.create()中就是把我們自定義的ObservableOnSubscribe對象重新包裝成一個ObservableCreate對象,然后返回這個ObservableCreate對象。

3.1.5

Observable.create()的時序圖如下所示:


3.2 訂閱過程

3.2.1 Observable類的subscribe()

    public final void subscribe(Observer<? super T> observer) {
            //省略無關代碼

            observer = RxJavaPlugins.onSubscribe(this, observer);

            subscribeActual(observer);

            //省略無關代碼
    }

可以看到,實際上其核心的代碼也就兩句,我們分開來看下:

3.2.2 RxJavaPlugins類的onSubscribe()

    public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        //省略無關代碼

        return observer;
    }

跟之前代碼一樣,這里同樣也是把原來的observer返回而已。
再來看下subscribeActual()方法。

3.2.3 Observable類的subscribeActual()

 protected abstract void subscribeActual(Observer<? super T> observer);

Observable類的subscribeActual()中的方法是一個抽象方法,那么其具體實現(xiàn)在哪呢?還記得我們前面創(chuàng)建被觀察者的過程嗎,最終會返回一個ObservableCreate對象,這個ObservableCreate就是Observable的子類,我們點進去看下:

3.2.4 ObservableCreate類的subscribeActual()

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //觸發(fā)我們自定義的Observer的onSubscribe(Disposable)方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

可以看到,subscribeActual()方法中首先會創(chuàng)建一個CreateEmitter對象,然后把我們自定義的觀察者observer作為參數(shù)給傳進去。這里同樣也是包裝起來

這個CreateEmitter實現(xiàn)了ObservableEmitter接口和Disposable接口,如下:

這個CreateEmitter實現(xiàn)了ObservableEmitter接口和Disposable接口,如下:

然后就是調用了observer.onSubscribe(parent),實際上就是調用觀察者的onSubscribe()方法,即告訴觀察者已經成功訂閱到了被觀察者。

繼續(xù)往下看,subscribeActual()方法中會繼續(xù)調用source.subscribe(parent),這里的source就是ObservableOnSubscribe對象,即這里會調用ObservableOnSubscribe的subscribe()方法。

我們具體定義的subscribe()方法如下:

        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

ObservableEmitter,顧名思義,就是被觀察者發(fā)射器。
所以,subscribe()里面的三個onNext()方法和一個onComplete()會逐一被調用。

3.2.5 CreateEmitter類的onNext()和onComplete()等

        //省略其他代碼

        @Override
        public void onNext(T t) {
            //省略無關代碼
            if (!isDisposed()) {
                //調用觀察者的onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //調用觀察者的onComplete()
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }

可以看到,最終就是會調用到觀察者的onNext()和onComplete()方法。

可以看到,上面有個isDisposed()方法能控制消息的走向,即能夠切斷消息的傳遞,這個后面再來說。

3.2.6 簡單總結

Observable(被觀察者)和Observer(觀察者)建立連接(訂閱)之后,會創(chuàng)建出一個發(fā)射器CreateEmitter,發(fā)射器會把被觀察者中產生的事件發(fā)送到觀察者中去,觀察者對發(fā)射器中發(fā)出的事件做出響應處理。

可以看到,是訂閱之后,Observable(被觀察者)才會開始發(fā)送事件。

3.2.7 時序流程圖

再來看下訂閱過程的時序流程圖:


4 切斷消息

4.1 切斷消息

        Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                emitter.onNext("文章1");
                emitter.onNext("文章2");
                emitter.onNext("文章3");
                emitter.onComplete();
            }
        });

        Observer<String> observer = new Observer<String>() {
            private Disposable mDisposable;
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe : " + d);
                mDisposable=d;
            }

            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext : " + s);
                mDisposable.dispose();
                Log.d(TAG, "切斷觀察者與被觀察者的連接");
            }

            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError : " + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

        observable.subscribe(observer);

輸出結果為

onSubscribe : null
onNext : 文章1
切斷觀察者與被觀察者的連接

可以看到,要切斷消息的傳遞很簡單,調用下Disposable的dispose()方法即可。調用dispose()之后,被觀察者雖然能繼續(xù)發(fā)送消息,但是觀察者卻收不到消息了
另外有一點需要注意,上面onSubscribe輸出的Disposable值是"null",并不是空引用null。

4.2 切斷消息源碼分析

Disposable是一個接口,可以理解Disposable為一個連接器,調用dispose()后,這個連接器將會中斷。

其具體實現(xiàn)在CreateEmitter類.

4.2.1 CreateEmitter的dispose()

        @Override
        public void dispose() {
            DisposableHelper.dispose(this);
        }

就是調用DisposableHelper.dispose(this)而已。

4.2.2 DisposableHelper類

public enum DisposableHelper implements Disposable {

    DISPOSED
    ;

    //其他代碼省略

    public static boolean isDisposed(Disposable d) {
        //判斷Disposable類型的變量的引用是否等于DISPOSED
        //即判斷該連接器是否被中斷
        return d == DISPOSED;
    }

    public static boolean dispose(AtomicReference<Disposable> field) {
        Disposable current = field.get();
        Disposable d = DISPOSED;
        if (current != d) {
            //這里會把field給設為DISPOSED
            current = field.getAndSet(d);
            if (current != d) {
                if (current != null) {
                    current.dispose();
                }
                return true;
            }
        }
        return false;
    }
}

可以看到DisposableHelper是一個枚舉類,并且只有一個值:DISPOSED。dispose()方法中會把一個原子引用field設為DISPOSED,即標記為中斷狀態(tài)。

因此后面通過isDisposed()方法即可以判斷連接器是否被中斷。

4.2.3 CreateEmitter類中的方法

        @Override
        public void onNext(T t) {
            //省略無關代碼

            if (!isDisposed()) {
                //如果沒有dispose(),才會調用onNext()
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            if (!tryOnError(t)) {
                //如果dispose()了,會調用到這里,即最終會崩潰
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            //省略無關代碼
            if (!isDisposed()) {
                try {
                    //如果沒有dispose(),才會調用onError()
                    observer.onError(t);
                } finally {
                    //onError()之后會dispose()
                    dispose();
                }
                //如果沒有dispose(),返回true
                return true;
            }
            //如果dispose()了,返回false
            return false;
        }

        @Override
        public void onComplete() {
            if (!isDisposed()) {
                try {
                    //如果沒有dispose(),才會調用onComplete()
                    observer.onComplete();
                } finally {
                    //onComplete()之后會dispose()
                    dispose();
                }
            }
        }
  1. 如果沒有dispose,observer.onNext()才會被調用到。

  2. onError()和onComplete()互斥,只能其中一個被調用到,因為調用了他們的任意一個之后都會調用dispose()。

  3. 先onError()后onComplete(),onComplete()不會被調用到。反過來,則會崩潰,因為onError()中拋出了異常:RxJavaPlugins.onError(t)。實際上是dispose后繼續(xù)調用onError()都會炸。

參考

詳解 RxJava 的消息訂閱和線程切換原理

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容