一、ReactiveX簡介
在學習RxJava前首先需要了解ReactiveX,因為RxJava是ReactiveX的一種Java的實現(xiàn)形式。
- ReactiveX的官網(wǎng)地址為:
http://reactivex.io/
ReactiveX官網(wǎng)對于自身的介紹是:
An API for asynchronous programming with observable streams
實質(zhì)上我們可以對其解讀為三部分:
| ReactiveX的解讀 |
|---|
| ①An API: 首先它是個編程接口,不同語言提供不同實現(xiàn)。例如Java中的RxJava。 |
| ②For asynchronous programming: 在異步編程設計中使用。例如開啟子線程處理耗時的網(wǎng)絡請求。 |
| ③With observable streams: 基于可觀察的事件流。例如觀察者模式中觀察者對被觀察者的監(jiān)聽。 |
而ReactiveX結(jié)合了如下三部分內(nèi)容:
- 觀察者模式,即定義對象間一種一對多的依賴關系,當一個對象改變狀態(tài)時,則所有依賴它的對象都會被改變。
- Iterator模式,即迭代流式編程模式。
- 函數(shù)式編程模式,即提供一系列函數(shù)樣式的方法供快速開發(fā)。
Reactive的模式圖如下:

二、RxJava的使用
1、RxJava的優(yōu)勢
在Android的SDK中,給開發(fā)者提供的用于異步操作的原生內(nèi)容有AsyncTask和Handler。對于簡單的異步請求來說,使用Android原生的AsyncTask和Handler即可滿足需求,但是對于復雜的業(yè)務邏輯而言,依然使用AsyncTask和Handler會導致代碼結(jié)構(gòu)混亂,代碼的可讀性非常差。
但是RxJava的異步操作是基于觀察者模式實現(xiàn)的,在越來越復雜的業(yè)務邏輯中,RxJava依舊可以保持簡潔
2、RxJava的配置
首先,在Android Studio中配置Module的build.gradle,在這里我們使用的版本是1.2版本,并且導入RxAndroid,輔助RxJava完成線程調(diào)度:
implementation "io.reactivex:rxjava:1.2.0"
implementation "io.reactivex:rxandroid:1.2.0"
然后,RxJava基于觀察者設計模式,其中的關鍵性三個步驟如下:
(1)Observable被觀察者
Observable被觀察者創(chuàng)建的代碼如下:
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Alex");
subscriber.onCompleted();
}
});
在這里,要強調(diào)的是Observable被觀察者是類類型,其中有諸多方法,我們關注其構(gòu)造函數(shù)與創(chuàng)建Observable對象的方法,查看如下圖對應的視圖結(jié)構(gòu):
查看源碼:
protected Observable(OnSubscribe<T> f) {
this.onSubscribe = f;
}
public interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {
}
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
public static <S, T> Observable<T> create(SyncOnSubscribe<S, T> syncOnSubscribe) {
return create((OnSubscribe<T>)syncOnSubscribe);
}
public static <S, T> Observable<T> create(AsyncOnSubscribe<S, T> asyncOnSubscribe) {
return create((OnSubscribe<T>)asyncOnSubscribe);
}
通過源碼分析,可知Observable提供了create()方法來獲取Observable實例對象。
此外,除了基本的創(chuàng)建的方法,Observable還提供了便捷的創(chuàng)建Observable序列的兩種方式,代碼如下:
- 第一種,會將參數(shù)逐個發(fā)送
Observable<String> observable1 = Observable.just("Alex","Payne");
- 第二種,會將數(shù)組元素逐個轉(zhuǎn)換完畢后逐個發(fā)送
String[] observableArr = {"Alex", "Payne"};
Observable<String> observable2 = Observable.from(observableArr);
其中Observable.just()方法會調(diào)用from()方法,詳情可查看源碼。
(2)Observer觀察者
Observer觀察者創(chuàng)建的代碼如下:
Observer<String> observer = new Observer<String>() {
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError,Error Info is:" + e.getMessage());
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
};
Observer是接口,其中包含的方法有onCompleted()、onNext()、onError()。查看如下圖所示Observer的視圖結(jié)構(gòu):

那么在RxJava中,Observer有其接口實現(xiàn)類對象Subscriber,它們的使用onNext、onCompleted、onError方法是一樣的,但是Subscriber對于Observer接口進行了拓展,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用,代碼如下:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onStart() {
Log.e(TAG, "onStart");
}
@Override
public void onCompleted() {
Log.e(TAG, "onCompleted");
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError,Error Info is:" + e.getMessage());
}
@Override
public void onNext(String s) {
Log.e(TAG, s);
}
};
其中,onStart()方法會在事件未發(fā)送前被調(diào)用,可以用于訂閱關系建立前的準備工作,例如將數(shù)據(jù)清空或者重置,在Subscriber中默認是空實現(xiàn),我們可以在該方法中調(diào)用自己的業(yè)務邏輯代碼。在如下的視圖結(jié)構(gòu)中我們可以看到Subscriber的拓展內(nèi)容,重點是add()、unsubscribe()方法以及名為subscription的Subscription隊列。

(3)Subscribe訂閱關系
Observable與observer形成訂閱關系代碼如下:
observable.subscribe(observer);
//或者
observable.subscribe(subscriber);
那么我們以observable.subscribe(observer)為例在這里繼續(xù)查看源碼,查看subscribe()方法到底做了什么:

Observer轉(zhuǎn)換為Subscriber對象在這里得到印證。
- 在之后的內(nèi)容中統(tǒng)一以Subscriber作為訂閱觀察者對象
繼續(xù)深入,我們可以看到訂閱關系中的關鍵步驟(僅核心代碼):
subscriber.onStart();
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
在這里RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber)等價于OnSubscribe.call(subscriber),見下圖2.3.2:


- 可以看到,subscriber() 做了3件事:
- 調(diào)用 Subscriber.onStart() 。該方法用于在訂閱關系建立之前的準備。
- 調(diào)用 Observable 中的 OnSubscribe.call(Subscriber) 。OnSubscribe是Observable的內(nèi)部接口,而事件發(fā)送的邏輯在這里開始運行。從這也可以看出,在 RxJava 中當 subscribe() 方法執(zhí)行的時候訂閱關系確立,Observable 開始發(fā)送事件。
- 將傳入的 Subscriber 作為 Subscription 返回。這是為了方便后續(xù)調(diào)用unsubscribe()。
三、RxJava的不完整回調(diào)
1、不完整回調(diào)的代碼示例
Observable<String> observable = Observable.just("Alex", "Payne");
Action1<String> onNextAction = new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
Log.e(TAG, "onError,Error Info is:" + throwable.getMessage());
}
};
Action0 onCompletedAction = new Action0() {
@Override
public void call() {
Log.e(TAG, "onCompleted");
}
};
// 根據(jù)onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 根據(jù)onNextAction 來定義 onNext()、根據(jù)onErrorAction 來定義 onError()
observable.subscribe(onNextAction, onErrorAction);
// 根據(jù)onNextAction 來定義 onNext()、根據(jù)onErrorAction 來定義 onError()、onCompletedAction 來定義 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
2、不完整回調(diào)的原理分析
在這里我們可以看到:
| Action0無參數(shù)泛型無返回值類型,而Subscriber中的onCompleted()方法也沒有參數(shù)泛型 |
|---|
| Action1有1個參數(shù)泛型無返回值類型 ,onNextAction設置的參數(shù)泛型為String,而Subscriber中的onNext()方法參數(shù)泛型也是String(和本文中觀察者對象中的OnNext方法對比) |
| Action1有1個參數(shù)泛型無返回值類型,onErrorAction設置的參數(shù)泛型為Throwable,而Subscriber中的onError()方法參數(shù)泛型也是Throwable |
那么,我們來查看observable.subscribe(onNextAction)的源碼,在這里, Action1可以被當成一個包裝對象,將onNext()方法進行包裝作為不完整的回調(diào)傳入到observable.subscribe()中。

我們來看看Action1有何玄機,Action1的源碼如下圖所示:


四、RxJava的線程切換
1、Scheduler線程調(diào)度器
如果不指定線程,默認是在調(diào)用subscribe方法的線程上進行回調(diào),那么如果子線程中調(diào)用subscibe方法,而想在主線程中進行UI更新,則會拋出異常。當然了RxJava已經(jīng)幫我們考慮到了這一點,所以提供了Scheduler線程調(diào)度器幫助我們進行線程之間的切換。
實質(zhì)上,Scheduler線程調(diào)度器和RxJava的操作符有緊密關聯(lián),我將在下一篇文章中進行詳細介紹。
- RxJava內(nèi)置了如下所示幾個的線程調(diào)度器:
- Schedulers.immediate():在當前線程中執(zhí)行
- Schedulers.newThread():啟動新線程,在新線程中進行操作
- Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
- Schedulers.computation():計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
- Schedulers.trampoline():會將任務按添加順序依次放入當前線程中等待執(zhí)行。線程一次只執(zhí)行一個任務,其余任務排隊等待,一個任務都執(zhí)行完成后再開始下一個任務的執(zhí)行。
- 此外RxJava還提供了用于測試的調(diào)度器Schedulers.test() 及 可自定義Scheduler—-Schedulers.form() 。
RxAndroid并且其為我們提供了AndroidSchedulers.mainThread()進行主線程的回調(diào)
2、線程控制
調(diào)用Observable對象的subscribeOn()、observeOn()方法即可完成線程控制。
- subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程。或者叫做事件產(chǎn)生的線程。
- observeOn(): 指定 Subscriber 所運行在的線程?;蛘呓凶鍪录M的線程。
Observable.just("Alex", "Payne")
.subscribeOn(Schedulers.io())//指定 subscribe() 所發(fā)生的線程
.unsubscribeOn(Schedulers.io())//事件發(fā)送完畢后,及時取消發(fā)送
.observeOn(AndroidSchedulers.mainThread())//指定 Subscriber 所運行在的線程
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
Log.e(TAG, s);
}
});
五、總結(jié)
本文主要介紹了RxJava的由來、使用步驟、部分內(nèi)容的原理解析。在下篇文章中我會詳細介紹RxJava的操作符。希望本文對你在學習RxJava的路上有所啟發(fā)。