Android RxJava 2.x入門例子詳解(一)

前言

關(guān)于RxJava的介紹這里就不多說了,網(wǎng)上有很多相關(guān)的資料。
但有一點(diǎn)需要說明一下,很多同學(xué)可能在網(wǎng)上找到很多RxJava 1.X的教程,那么1.X和2.X有什么區(qū)別?學(xué)習(xí)2.X前需不需要先學(xué)習(xí)1.X?
其實(shí)1.X和2.X有很大的改變,如果你已學(xué)習(xí)過1.X,那么恭喜你,你只需要看看2.X有什么更新就可以了。如果你沒學(xué)習(xí)過1.X,那么也不需要擔(dān)心,你可以直接跳過1.X,來學(xué)習(xí)2.X。
所以本教程是直接使用2.X,概念性的東西這里也不多說,本教程直接使用例子一步一步帶大家入門。

先在Android Studio 項(xiàng)目添加Gradle配置:

compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
compile 'io.reactivex.rxjava2:rxjava:2.1.3'

可能你想添加最新的版本,那在哪里可以找到最新的版本呢?答案在這里:
https://github.com/ReactiveX/RxAndroid

入門例子:

//create創(chuàng)建一個(gè)上游Observable(被觀察者)
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "Observable發(fā)出:1");
                e.onNext(1);//向下游(觀察者)發(fā)射事件
                Log.d(TAG, "Observable發(fā)出:2");
                e.onNext(2);
                Log.d(TAG, "Observable發(fā)出:3");
                e.onNext(3);
                Log.d(TAG, "Observable發(fā)出:Complete");
                e.onComplete();
                Log.d(TAG, "Observable發(fā)出:4");
                e.onNext(4);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
            }

        });
        //創(chuàng)建一個(gè)下游Observer(觀察者)
        Observer<Integer> observer = new Observer<Integer>() {

            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d(TAG, "onNext收到:" + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Observer complete");
            }
        };
        //建立連接(訂閱),開始發(fā)送事件
        observable.subscribe(observer);

ObservableEmitter是發(fā)射器,就是用來發(fā)出事件的,它可以發(fā)出三種類型的事件。通過調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發(fā)出next事件、complete事件和error事件。

寫成RxJava引以為傲的鏈?zhǔn)讲僮鳎?/p>

//create創(chuàng)建一個(gè)上游Observable(被觀察者)
Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "Observable發(fā)出:1");
                e.onNext(1);//向下游(觀察者)發(fā)射內(nèi)容1
                Log.d(TAG, "Observable發(fā)出:2");
                e.onNext(2);
                Log.d(TAG, "Observable發(fā)出:3");
                e.onNext(3);
                Log.d(TAG, "Observable發(fā)出:Complete");
                e.onComplete();
                Log.d(TAG, "Observable發(fā)出:4");
                e.onNext(4);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
            }
        }).subscribe(new Observer<Integer>() {
            //創(chuàng)建一個(gè)下游Observer(觀察者)
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "subscribe");
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d(TAG, "onNext收到:" + integer);
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Observer complete");
            }
        });

1、上游可以發(fā)送無限個(gè)onNext,下游也可以接收無限個(gè)onNext。
2、 當(dāng)上游發(fā)送了一個(gè)onComplete后, 上游onComplete之后的事件將會(huì)繼續(xù)發(fā)送,而下游收到onComplete事件之后將不再繼續(xù)接收事件。
3、當(dāng)上游發(fā)送了一個(gè)onError后,上游onError之后的事件將繼續(xù)發(fā)送,而下游收到onError事件之后將不再繼續(xù)接收事件。
4、上游可以不發(fā)送onComplete或onError。
5、最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個(gè)onComplete, 也不能發(fā)多個(gè)onError,也不能先發(fā)一個(gè)onComplete, 然后再發(fā)一個(gè)onError,反之亦然。

    注:關(guān)于onComplete和onError唯一并且互斥這一點(diǎn), 是需要自行在代碼中進(jìn)行控制, 如果你的代碼邏輯中違背了這個(gè)規(guī)則, 并不一定會(huì)導(dǎo)致程序崩潰. 比如發(fā)送多個(gè)onComplete是可以正常運(yùn)行的, 依然是收到第一個(gè)onComplete就不再接收了, 但若是發(fā)送多個(gè)onError, 則收到第二個(gè)onError事件會(huì)導(dǎo)致程序會(huì)崩潰.

入門例子:

//create創(chuàng)建一個(gè)上游 Observable(被觀察者)
Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "Observable發(fā)出:1");
                e.onNext(1);//向下游(觀察者)發(fā)射內(nèi)容1
                Log.d(TAG, "Observable發(fā)出:2");
                e.onNext(2);
                Log.d(TAG, "Observable發(fā)出:3");
                e.onNext(3);
                Log.d(TAG, "Observable發(fā)出:4");
                e.onNext(4);
                Log.d(TAG, "Observable發(fā)出:Complete");
                e.onComplete();
                Log.d(TAG, "Observable發(fā)出:5");
                e.onNext(5);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
            }
        }).subscribe(new Observer<Integer>() {
            //創(chuàng)建一個(gè)下游 Observer(觀察者)

            private Disposable mDisposable;
            private int i;
            @Override
            public void onSubscribe(@NonNull Disposable d) {
                Log.d(TAG, "subscribe");
                mDisposable=d;
            }

            @Override
            public void onNext(@NonNull Integer integer) {
                Log.d(TAG, "onNext收到:" + integer);
                i++;
                if (i==2){
                    Log.d(TAG, "onNext:dispose");
                    mDisposable.dispose();//取消訂閱,不再接收事件
                    Log.d(TAG, "onNext isDisposed : " + mDisposable.isDisposed());
                }
            }

            @Override
            public void onError(@NonNull Throwable e) {
                Log.d(TAG, "error");
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "Observer complete");
            }
        });

下游調(diào)用dispose(),取消收收事件,但并不會(huì)導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游可以繼續(xù)發(fā)送剩余的事件。

subscribe()有多個(gè)重載:

public final Disposable subscribe();
public final Disposable subscribe(Consumer<? super T> onNext);
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError);
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete);

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe);
public final void subscribe(Observer<? super T> observer);

帶有一個(gè)Consumer參數(shù)的方法,表示下游只關(guān)心onNext事件,其他的事件不管。
因此,如果只需要onNext事件可以這么寫:

//create創(chuàng)建一個(gè)上游 Observable(被觀察者)
Observable.create(new ObservableOnSubscribe<Integer>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                Log.d(TAG, "Observable發(fā)出:1");
                e.onNext(1);//向下游(觀察者)發(fā)射內(nèi)容1
                Log.d(TAG, "Observable發(fā)出:2");
                e.onNext(2);
                Log.d(TAG, "Observable發(fā)出:3");
                e.onNext(3);
                Log.d(TAG, "Observable發(fā)出:4");
                e.onNext(4);
                Log.d(TAG, "Observable發(fā)出:Complete");
                e.onComplete();
                Log.d(TAG, "Observable發(fā)出:5");
                e.onNext(5);//雖然無法接收事件,但發(fā)送事件還是繼續(xù)的
            }
        }).subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer integer) throws Exception {
                Log.d(TAG, "onNext收到:" + integer);
            }
        });
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容