【Android】RxJava2可以這么學(xué)!

一、簡介

關(guān)于RxJava的介紹與使用,在網(wǎng)上已經(jīng)有很多的相關(guān)的教程,無奈自身水平有限,絕大多數(shù)講解的原理噼里啪啦一堆,說的一堆高大上的名詞把自己繞的云里霧里,領(lǐng)悟不透;剛好最近公司要求做分享,自己就做了一份關(guān)于RxJava的介紹。
這里我將用河水的上游和下游代替被觀察者和觀察者, 用通俗易懂的話把它們的關(guān)系解釋清楚, 在這里我將從事件流向這個角度來說明RxJava的基本工作原理。


二、簡單原理分析

首先,我們來看一張圖:

簡單原理圖

上圖中河流的上游產(chǎn)生了三個事件,而下游就會接收到三個事件,在這里上游按照順序發(fā)送了1,2,3三個事件,而下游在收到的事件順序也是先1,后2,再3的順序;簡單來說,這里的上游下游就分別對應(yīng)著RxJava中的ObservableObserver,它們之間的連接就對應(yīng)著subscribe()。下面用簡單的代碼來說明這種關(guān)系。


三、簡單使用

3.1 上游Observable的創(chuàng)建
//創(chuàng)建一個上游 即Observable:
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter)
          throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
        //e.onError(new IOException());
      }
    });

在創(chuàng)建上游Observable的時候有涉及到的三個方法說明:
** 1. onNext(T t):**
Observable調(diào)用這個方法發(fā)射數(shù)據(jù),方法的參數(shù)就是Observable發(fā)射的數(shù)據(jù),這個方法可能會被調(diào)用多次,取決于你的實現(xiàn)。
2. onComplete():
正常終止,如果沒有遇到錯誤,Observable在最后一次調(diào)用onNext之后調(diào)用此方法。
3. onError(Exception ex):
當(dāng)Observable遇到錯誤或者無法返回期望的數(shù)據(jù)時會調(diào)用這個方法,這個調(diào)用會終止Observable,后續(xù)不會再調(diào)用onNext和onCompleted,onError方法的參數(shù)是拋出的異常。
注:以上三個方法都是在被ObservableEmitter這個方法所調(diào)用的,關(guān)于這個方法又是做什么的呢?


3.2 關(guān)于上游Observable中的ObservableEmitter()方法說明

Emitter是發(fā)射器的意思,那就很好猜了,這個就是用來發(fā)出事件的,它可以發(fā)出三種類型的事件,通過調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發(fā)出next事件、complete事件和error事件。
然而,這并不意味著我們就可以隨意的發(fā)送亂七八糟的事件,我們需要滿足以下的規(guī)則才行:
1. 上游可以發(fā)送無限個onNext, 下游也可以接收無限個onNext;
2. 當(dāng)上游發(fā)送了一個onComplete后, 上游onComplete之后的事件將會繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件;
3. 當(dāng)上游發(fā)送了一個onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件;
4. 上游可以不發(fā)送onComplete或onError;
5. 最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然。
說了這么多,那么我們的下游到底要如何接收處理呢,下面我們來看一下下游的創(chuàng)建。


3.3 下游Observer的創(chuàng)建
//創(chuàng)建一個下游 即Observer
    Observer<Integer> observer = new Observer<Integer>() {
      @Override public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
      }
      @Override public void onNext(@NonNull Integer integer) {
        Log.d(TAG, "onNext" + integer);
      }
      @Override public void onError(@NonNull Throwable e) {
        Log.d(TAG, "onError");
      }
      @Override public void onComplete() {
        Log.d(TAG, "onComplete");
      }
    };

在創(chuàng)建下游的時候我么發(fā)現(xiàn),在下游除了接收了onNext、onError、onComplete三個方法之外,還有一個onSubscribe方法,那么這個方法是做啥的?這里我簡單介紹下:

關(guān)于onSubscribe(Disposable d)方法的說明:

onSubscribe是在Observer中執(zhí)行的第一個方法,在onNext方法之前執(zhí)行,關(guān)于它傳入的參數(shù)Disposable,可以將其理解成一個上游和下游之間的機關(guān),當(dāng)我們調(diào)用其對應(yīng)的dispose()方法的時候,他就會將上游和下游之間切斷,從而導(dǎo)致下游不會接收到新的事件。

注:調(diào)用dispose()并不會導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游會繼續(xù)發(fā)送剩余的事件。
上面說了這么多,那么onNext、onError、onCompleted方法到底有何區(qū)別。


3.4 onNext、onComplete、onError三個方法的區(qū)別

我們通過以下三幅圖來了解關(guān)于這三個方法的區(qū)別。

只發(fā)送onNext()事件
發(fā)送onNext和onComplete事件
發(fā)送onNext和onError事件

3.6 上游Observable與下游Observer的關(guān)聯(lián)

我們現(xiàn)在已經(jīng)創(chuàng)建好了上游和下游,該如何關(guān)聯(lián)起來呢,很簡單,我們可以用subscribe()方法進行關(guān)聯(lián):

//建立連接
observable.subscribe(observer);

關(guān)于observable()方法,它其實有很多的重載方法,具體如下:

observable重載的方法.png

下面我簡單的對說明一下這些重載方法:

  1. 不帶任何參數(shù)的 subscribe() 表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)去吧, 老子可不管你發(fā)什么.
  1. 帶有一個Consumer參數(shù)的方法表示下游只關(guān)心onNext事件, 其他的事件我假裝沒看見, 因此我們?nèi)绻恍枰猳nNext事件可以這么寫:
Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        emitter.onNext(1);
        emitter.onNext(2);
        emitter.onNext(3);
        emitter.onComplete();
        emitter.onNext(4);
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });

打印結(jié)果如下:


打印結(jié)果

3.7 關(guān)于Observable的其他創(chuàng)建方法

除了我們使用Observable.create()方法創(chuàng)建Observable之外還有:
Observable.just():
注:just()方法里存放的其實就是onNext()發(fā)射的內(nèi)容,其存放的個數(shù)最多為10個,超過10個則會報錯,這是因為just()方法重載了10次。

Observable.fromArray():
注:fromArray()方法里存放的同樣也是onNext()發(fā)射的內(nèi)容,與just()方法區(qū)別的是fromArray()里面的參數(shù)是可變數(shù)組,存放的個數(shù)沒有限制。

Observable.fromIterable():
注: fromIterable()方法里傳入的參數(shù)為一個集合,集合中每個條目對應(yīng)的就是一個onNext()發(fā)射的內(nèi)容。

下面一一舉例說明:

//Observable.just()創(chuàng)建方法
    Observable.just(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });
  //Observable.fromArray()創(chuàng)建方法
    Observable.fromArray(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });
  //Observable.fromIterable()創(chuàng)建方法
    List<Integer> list = new ArrayList<>();
    list.add(1);
    list.add(2);
    list.add(3);
    list.add(4);
    Observable.fromIterable(list).subscribe(new Consumer<Integer>() {
      @Override public void accept(Integer integer) throws Exception {
        Log.d(TAG, "onNext: " + integer);
      }
    });

3.8 RxJava的線程控制

正常情況下, 上游和下游是工作在同一個線程中的, 也就是說上游在哪個線程發(fā)事件, 下游就在哪個線程接收事件.,例如我們在Activity的按鈕點擊事件中做如下操作:

Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        Log.d(TAG, "Observable所在的線程:" + Thread.currentThread().getName());
        e.onNext(1);
      }
    }).subscribe(new Consumer<Integer>() {
      @Override public void accept(@NonNull Integer integer) throws Exception {
        Log.d(TAG, "Observer所在的線程:" + Thread.currentThread().getName());
        Log.d(TAG, "onNext: " + integer);
      }
    });
打印結(jié)果

上面這樣肯定是滿足不了我們的需求的, 我們更多想要的是這么一種情況, 在子線程中做耗時的操作, 然后回到主線程中來操作UI,該如何做?
案例分析說明RxJava中的線程切換:
需求:從網(wǎng)上獲取云南電信所有頻道的信息,將頻道中的頻道名顯示到UI界面上。

 Observable.create(new ObservableOnSubscribe<Channel>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
        //網(wǎng)絡(luò)請求頻道的數(shù)據(jù)
        Channel channel = testApi.getChannel().execute().body();
        e.onNext(channel);
      }
    }).subscribe(new Consumer<Channel>() {
          @Override public void accept(@NonNull Channel channel) throws Exception {
            List<Channel.DataBean> data = channel.getData();
            for (Channel.DataBean dataBean : data) {
              tv.append(dataBean.getChannelName() + "\n");
            }
          }
        });

上面的方法在按鈕點擊事件里面執(zhí)行的,即在主線程執(zhí)行的;然而我們的Observable方法執(zhí)行的是網(wǎng)絡(luò)請求,一個耗時操作,應(yīng)該在子線程執(zhí)行才對,當(dāng)我們執(zhí)行上面代碼會出現(xiàn)以下錯誤:

異常錯誤(一)

該如何將Observable切換到子線程呢?
RxJava中有一個subscribeOn()方法,可以指定Observable的線程,因此,我們將之前的代碼修改如下:

Observable.create(new ObservableOnSubscribe<Channel>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
        //網(wǎng)絡(luò)獲取頻道數(shù)據(jù)
        Channel channel = testApi.getChannel().execute().body();
        e.onNext(channel);
      }
    })
        .subscribeOn(Schedulers.io())    //切換上游線程操作
        .subscribe(new Consumer<Channel>() {
          @Override public void accept(@NonNull Channel channel) throws Exception {
            List<Channel.DataBean> data = channel.getData();
            for (Channel.DataBean dataBean : data) {
              tv.append(dataBean.getChannelName() + "\n");
            }
          }
        });

繼續(xù)運行,發(fā)現(xiàn)還是報錯了,什么原因?那是因為我們開始的時候?qū)bservable切換到了子線程,根據(jù)之前說的,默認Observer里面執(zhí)行的線程跟Observable線程是一致的,而這里Observer涉及到更新UI操作,應(yīng)該在主線程執(zhí)行才對。

錯誤異常(二)

該如何將Observer切換到子線程呢?
RxJava中有一個subscribeOn()方法,可以指定Observer的線程,因此,我們將之前的代碼修改如下:

Observable.create(new ObservableOnSubscribe<Channel>() {
      @Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
        //網(wǎng)絡(luò)獲取頻道數(shù)據(jù)
        Channel channel = testApi.getChannel().execute().body();
        e.onNext(channel);
      }
    })
        .subscribeOn(Schedulers.io())     //切換上游線程操作到io線程
        .observeOn(AndroidSchedulers.mainThread())  //切換下游線程到主線程
        .subscribe(new Consumer<Channel>() {
          @Override public void accept(@NonNull Channel channel) throws Exception {
            List<Channel.DataBean> data = channel.getData();
            for (Channel.DataBean dataBean : data) {
              tv.append(dataBean.getChannelName() + "\n");
            }
          }
        });

再來開一下運行結(jié)果:

正確的運行結(jié)果

以上表明我們的線程切換達到了我們需要的目的。

在RxJava中已經(jīng)內(nèi)置了很多線程選項供我們選擇, 例如有:

Schedulers.io() : 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作

Schedulers.computation(): 代表CPU計算密集型的操作, 例如需要大量計算的操作

Schedulers.newThread(): 代表一個常規(guī)的新線程

AndroidSchedulers.mainThread(): 代表Android的主線程

這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項, 在RxJava內(nèi)部使用的是線程池來維護這些線程, 所有效率也比較高.


四、總結(jié)

關(guān)于RxJava的簡單使用就介紹到這里,后期有空我會向大家介紹一下RxJava2的其他實用的方法。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容