一、簡介
關(guān)于RxJava的介紹與使用,在網(wǎng)上已經(jīng)有很多的相關(guān)的教程,無奈自身水平有限,絕大多數(shù)講解的原理噼里啪啦一堆,說的一堆高大上的名詞把自己繞的云里霧里,領(lǐng)悟不透;剛好最近公司要求做分享,自己就做了一份關(guān)于RxJava的介紹。
這里我將用河水的上游和下游代替被觀察者和觀察者, 用通俗易懂的話把它們的關(guān)系解釋清楚, 在這里我將從事件流向這個角度來說明RxJava的基本工作原理。
二、簡單原理分析
首先,我們來看一張圖:

上圖中河流的上游產(chǎn)生了三個事件,而下游就會接收到三個事件,在這里上游按照順序發(fā)送了1,2,3三個事件,而下游在收到的事件順序也是先1,后2,再3的順序;簡單來說,這里的上游和下游就分別對應(yīng)著RxJava中的Observable和Observer,它們之間的連接就對應(yīng)著subscribe()。下面用簡單的代碼來說明這種關(guān)系。
三、簡單使用
3.1 上游Observable的創(chuàng)建
//創(chuàng)建一個上游 即Observable:
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter)
throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
//e.onError(new IOException());
}
});
在創(chuàng)建上游Observable的時候有涉及到的三個方法說明:
** 1. onNext(T t):**
Observable調(diào)用這個方法發(fā)射數(shù)據(jù),方法的參數(shù)就是Observable發(fā)射的數(shù)據(jù),這個方法可能會被調(diào)用多次,取決于你的實現(xiàn)。
2. onComplete():
正常終止,如果沒有遇到錯誤,Observable在最后一次調(diào)用onNext之后調(diào)用此方法。
3. onError(Exception ex):
當(dāng)Observable遇到錯誤或者無法返回期望的數(shù)據(jù)時會調(diào)用這個方法,這個調(diào)用會終止Observable,后續(xù)不會再調(diào)用onNext和onCompleted,onError方法的參數(shù)是拋出的異常。
注:以上三個方法都是在被ObservableEmitter這個方法所調(diào)用的,關(guān)于這個方法又是做什么的呢?
3.2 關(guān)于上游Observable中的ObservableEmitter()方法說明
Emitter是發(fā)射器的意思,那就很好猜了,這個就是用來發(fā)出事件的,它可以發(fā)出三種類型的事件,通過調(diào)用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分別發(fā)出next事件、complete事件和error事件。
然而,這并不意味著我們就可以隨意的發(fā)送亂七八糟的事件,我們需要滿足以下的規(guī)則才行:
1. 上游可以發(fā)送無限個onNext, 下游也可以接收無限個onNext;
2. 當(dāng)上游發(fā)送了一個onComplete后, 上游onComplete之后的事件將會繼續(xù)發(fā)送, 而下游收到onComplete事件之后將不再繼續(xù)接收事件;
3. 當(dāng)上游發(fā)送了一個onError后, 上游onError之后的事件將繼續(xù)發(fā)送, 而下游收到onError事件之后將不再繼續(xù)接收事件;
4. 上游可以不發(fā)送onComplete或onError;
5. 最為關(guān)鍵的是onComplete和onError必須唯一并且互斥, 即不能發(fā)多個onComplete, 也不能發(fā)多個onError, 也不能先發(fā)一個onComplete, 然后再發(fā)一個onError, 反之亦然。
說了這么多,那么我們的下游到底要如何接收處理呢,下面我們來看一下下游的創(chuàng)建。
3.3 下游Observer的創(chuàng)建
//創(chuàng)建一個下游 即Observer
Observer<Integer> observer = new Observer<Integer>() {
@Override public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override public void onNext(@NonNull Integer integer) {
Log.d(TAG, "onNext" + integer);
}
@Override public void onError(@NonNull Throwable e) {
Log.d(TAG, "onError");
}
@Override public void onComplete() {
Log.d(TAG, "onComplete");
}
};
在創(chuàng)建下游的時候我么發(fā)現(xiàn),在下游除了接收了onNext、onError、onComplete三個方法之外,還有一個onSubscribe方法,那么這個方法是做啥的?這里我簡單介紹下:
關(guān)于onSubscribe(Disposable d)方法的說明:
onSubscribe是在Observer中執(zhí)行的第一個方法,在onNext方法之前執(zhí)行,關(guān)于它傳入的參數(shù)Disposable,可以將其理解成一個上游和下游之間的機關(guān),當(dāng)我們調(diào)用其對應(yīng)的dispose()方法的時候,他就會將上游和下游之間切斷,從而導(dǎo)致下游不會接收到新的事件。
注:調(diào)用dispose()并不會導(dǎo)致上游不再繼續(xù)發(fā)送事件, 上游會繼續(xù)發(fā)送剩余的事件。
上面說了這么多,那么onNext、onError、onCompleted方法到底有何區(qū)別。
3.4 onNext、onComplete、onError三個方法的區(qū)別
我們通過以下三幅圖來了解關(guān)于這三個方法的區(qū)別。



3.6 上游Observable與下游Observer的關(guān)聯(lián)
我們現(xiàn)在已經(jīng)創(chuàng)建好了上游和下游,該如何關(guān)聯(lián)起來呢,很簡單,我們可以用subscribe()方法進行關(guān)聯(lián):
//建立連接
observable.subscribe(observer);
關(guān)于observable()方法,它其實有很多的重載方法,具體如下:

下面我簡單的對說明一下這些重載方法:
- 不帶任何參數(shù)的 subscribe() 表示下游不關(guān)心任何事件,你上游盡管發(fā)你的數(shù)據(jù)去吧, 老子可不管你發(fā)什么.
- 帶有一個Consumer參數(shù)的方法表示下游只關(guān)心onNext事件, 其他的事件我假裝沒看見, 因此我們?nèi)绻恍枰猳nNext事件可以這么寫:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
emitter.onNext(4);
}
}).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
打印結(jié)果如下:

3.7 關(guān)于Observable的其他創(chuàng)建方法
除了我們使用Observable.create()方法創(chuàng)建Observable之外還有:
Observable.just():
注:just()方法里存放的其實就是onNext()發(fā)射的內(nèi)容,其存放的個數(shù)最多為10個,超過10個則會報錯,這是因為just()方法重載了10次。
Observable.fromArray():
注:fromArray()方法里存放的同樣也是onNext()發(fā)射的內(nèi)容,與just()方法區(qū)別的是fromArray()里面的參數(shù)是可變數(shù)組,存放的個數(shù)沒有限制。
Observable.fromIterable():
注: fromIterable()方法里傳入的參數(shù)為一個集合,集合中每個條目對應(yīng)的就是一個onNext()發(fā)射的內(nèi)容。
下面一一舉例說明:
//Observable.just()創(chuàng)建方法
Observable.just(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
//Observable.fromArray()創(chuàng)建方法
Observable.fromArray(1, 2, 3, 4).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
//Observable.fromIterable()創(chuàng)建方法
List<Integer> list = new ArrayList<>();
list.add(1);
list.add(2);
list.add(3);
list.add(4);
Observable.fromIterable(list).subscribe(new Consumer<Integer>() {
@Override public void accept(Integer integer) throws Exception {
Log.d(TAG, "onNext: " + integer);
}
});
3.8 RxJava的線程控制
正常情況下, 上游和下游是工作在同一個線程中的, 也就是說上游在哪個線程發(fā)事件, 下游就在哪個線程接收事件.,例如我們在Activity的按鈕點擊事件中做如下操作:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
Log.d(TAG, "Observable所在的線程:" + Thread.currentThread().getName());
e.onNext(1);
}
}).subscribe(new Consumer<Integer>() {
@Override public void accept(@NonNull Integer integer) throws Exception {
Log.d(TAG, "Observer所在的線程:" + Thread.currentThread().getName());
Log.d(TAG, "onNext: " + integer);
}
});

上面這樣肯定是滿足不了我們的需求的, 我們更多想要的是這么一種情況, 在子線程中做耗時的操作, 然后回到主線程中來操作UI,該如何做?
案例分析說明RxJava中的線程切換:
需求:從網(wǎng)上獲取云南電信所有頻道的信息,將頻道中的頻道名顯示到UI界面上。
Observable.create(new ObservableOnSubscribe<Channel>() {
@Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
//網(wǎng)絡(luò)請求頻道的數(shù)據(jù)
Channel channel = testApi.getChannel().execute().body();
e.onNext(channel);
}
}).subscribe(new Consumer<Channel>() {
@Override public void accept(@NonNull Channel channel) throws Exception {
List<Channel.DataBean> data = channel.getData();
for (Channel.DataBean dataBean : data) {
tv.append(dataBean.getChannelName() + "\n");
}
}
});
上面的方法在按鈕點擊事件里面執(zhí)行的,即在主線程執(zhí)行的;然而我們的Observable方法執(zhí)行的是網(wǎng)絡(luò)請求,一個耗時操作,應(yīng)該在子線程執(zhí)行才對,當(dāng)我們執(zhí)行上面代碼會出現(xiàn)以下錯誤:

該如何將Observable切換到子線程呢?
RxJava中有一個subscribeOn()方法,可以指定Observable的線程,因此,我們將之前的代碼修改如下:
Observable.create(new ObservableOnSubscribe<Channel>() {
@Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
//網(wǎng)絡(luò)獲取頻道數(shù)據(jù)
Channel channel = testApi.getChannel().execute().body();
e.onNext(channel);
}
})
.subscribeOn(Schedulers.io()) //切換上游線程操作
.subscribe(new Consumer<Channel>() {
@Override public void accept(@NonNull Channel channel) throws Exception {
List<Channel.DataBean> data = channel.getData();
for (Channel.DataBean dataBean : data) {
tv.append(dataBean.getChannelName() + "\n");
}
}
});
繼續(xù)運行,發(fā)現(xiàn)還是報錯了,什么原因?那是因為我們開始的時候?qū)bservable切換到了子線程,根據(jù)之前說的,默認Observer里面執(zhí)行的線程跟Observable線程是一致的,而這里Observer涉及到更新UI操作,應(yīng)該在主線程執(zhí)行才對。

該如何將Observer切換到子線程呢?
RxJava中有一個subscribeOn()方法,可以指定Observer的線程,因此,我們將之前的代碼修改如下:
Observable.create(new ObservableOnSubscribe<Channel>() {
@Override public void subscribe(@NonNull ObservableEmitter<Channel> e) throws Exception {
//網(wǎng)絡(luò)獲取頻道數(shù)據(jù)
Channel channel = testApi.getChannel().execute().body();
e.onNext(channel);
}
})
.subscribeOn(Schedulers.io()) //切換上游線程操作到io線程
.observeOn(AndroidSchedulers.mainThread()) //切換下游線程到主線程
.subscribe(new Consumer<Channel>() {
@Override public void accept(@NonNull Channel channel) throws Exception {
List<Channel.DataBean> data = channel.getData();
for (Channel.DataBean dataBean : data) {
tv.append(dataBean.getChannelName() + "\n");
}
}
});
再來開一下運行結(jié)果:

以上表明我們的線程切換達到了我們需要的目的。
在RxJava中已經(jīng)內(nèi)置了很多線程選項供我們選擇, 例如有:
Schedulers.io() : 代表io操作的線程, 通常用于網(wǎng)絡(luò),讀寫文件等io密集型的操作
Schedulers.computation(): 代表CPU計算密集型的操作, 例如需要大量計算的操作
Schedulers.newThread(): 代表一個常規(guī)的新線程
AndroidSchedulers.mainThread(): 代表Android的主線程
這些內(nèi)置的Scheduler已經(jīng)足夠滿足我們開發(fā)的需求, 因此我們應(yīng)該使用內(nèi)置的這些選項, 在RxJava內(nèi)部使用的是線程池來維護這些線程, 所有效率也比較高.
四、總結(jié)
關(guān)于RxJava的簡單使用就介紹到這里,后期有空我會向大家介紹一下RxJava2的其他實用的方法。