前言
終究沒有經(jīng)受住RxJava的誘惑,只恨自己來的比較晚,走起~
RxJava 是什么?
一個在 Java VM 上使用可觀測的序列來組成異步的、基于事件的程序的庫~
Rx Java 有什么優(yōu)勢?##
- 邏輯簡潔
- 異步
RxJava 能做什么?##
- EventBus事件中心
- 與Retrofit結(jié)合進行網(wǎng)絡(luò)處理
- RxBinding
....
初始RxJava
我們先來看一下這坨代碼:
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}}.start();
上面就是一個圖片的遍歷、過濾、加載過程,可是有強迫癥的程序員都會很難過,因為又看到了影響心情的代碼。
用RxJava如何實現(xiàn)呢?
Observable.from(folders)
.flatMap(new Func1<File, Observable<File>>() {
@Override
public Observable<File> call(File file) {
return Observable.from(file.listFiles());
}
})
.filter(new Func1<File, Boolean>() {
@Override
public Boolean call(File file) {
return file.getName().endsWith(".png");
}
})
.map(new Func1<File, Bitmap>() {
@Override
public Bitmap call(File file) {
return getBitmapFromFile(file);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) {
imageCollectorView.addImage(bitmap);
}
});
吾皇勿怒,臣妾知道代碼并沒有減少,可是臣妾說的是邏輯簡單,鏈式結(jié)構(gòu)非常清晰不是嗎?
相信您在頓悟了5s中之后也看到了其好處~
使用介紹與原理分析
基本概念##
1、擴展的觀察者模式
RxJava 的異步實現(xiàn),是通過一種擴展的觀察者模式來實現(xiàn)的,A 對象(觀察者)對 B 對象(被觀察者)的某種變化高度敏感,需要在 B 變化的一瞬間做出反應(yīng)。
2、RxJava的四個基本概念:
- Observable (可觀察者,即被觀察者)
- Observer (觀察者)
- subscribe (訂閱)
- 事件 : onNext()、onCompleted() 和 onError()
事件
onNext(): 普通事件,每個事件執(zhí)行之后的event。
onCompleted(): 事件隊列完結(jié)。RxJava 不僅把每個事件單獨處理,還會把它們看做一個隊列。RxJava 規(guī)定,當不會再有新的onNext() 發(fā)出時,需要觸發(fā) onCompleted() 方法作為標志。
onError(): 事件隊列異常。在事件處理過程中出異常時,onError() 會被觸發(fā),同時隊列自動終止,不允許再有事件發(fā)出。
注意 :
在一個正確運行的事件序列中, onCompleted() 和 onError() 有且只有一個,并且是事件序列中的最后一個。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在隊列中調(diào)用了其中一個,就不應(yīng)該再調(diào)用另一個。

基本實現(xiàn)##
基于以上的概念, RxJava 的基本實現(xiàn)主要有三點:
- 創(chuàng)建 Observer
Observer 即觀察者,它決定事件觸發(fā)的時候?qū)⒂性鯓拥男袨椤?RxJava 中的 Observer 接口的實現(xiàn)方式:
Observer<String> observer = new Observer<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}};
除了 Observer 接口之外,RxJava 還內(nèi)置了一個實現(xiàn)了 Observer 的抽象類:Subscriber。 Subscriber 對 Observer 接口進行了一些擴展,但他們的基本使用方式是完全一樣的:
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String s) {
Log.d(tag, "Item: " + s);
}
@Override
public void onCompleted() {
Log.d(tag, "Completed!");
}
@Override
public void onError(Throwable e) {
Log.d(tag, "Error!");
}};
不僅基本使用方式一樣,實質(zhì)上,在 RxJava 的 subscribe 過程中,Observer 也總是會先被轉(zhuǎn)換成一個 Subscriber 再使用。所以如果你只想使用基本功能,選擇 Observer 和 Subscriber 是完全一樣的。它們的區(qū)別對于使用者來說主要有兩點:
onStart(): 這是 Subscriber 增加的方法。它會在 subscribe 剛開始,而事件還未發(fā)送之前被調(diào)用,可以用于做一些準備工作,例如數(shù)據(jù)的清零或重置。這是一個可選方法,默認情況下它的實現(xiàn)為空。需要注意的是,如果對準備工作的線程有要求(例如彈出一個顯示進度的對話框,這必須在主線程執(zhí)行), onStart() 就不適用了,因為它總是在 subscribe 所發(fā)生的線程被調(diào)用,而不能指定線程。要在指定的線程來做準備工作,可以使用 doOnSubscribe() 方法,具體可以在后面的文中看到。
unsubscribe(): 這是 Subscriber 所實現(xiàn)的另一個接口 Subscription 的方法,用于取消訂閱。在這個方法被調(diào)用后,Subscriber 將不再接收事件。一般在這個方法調(diào)用前,可以使用 isUnsubscribed() 先判斷一下狀態(tài)。 unsubscribe() 這個方法很重要,因為在 subscribe() 之后, Observable 會持有 Subscriber 的引用,這個引用如果不能及時被釋放,將有內(nèi)存泄露的風險。所以最好保持一個原則:要在不再使用的時候盡快在合適的地方(例如 onPause() onStop() 等方法中)調(diào)用unsubscribe() 來解除引用關(guān)系,以避免內(nèi)存泄露的發(fā)生。
- 創(chuàng)建 Observable
Observable 即被觀察者,它決定什么時候觸發(fā)事件以及觸發(fā)怎樣的事件。 RxJava 使用 create() 方法來創(chuàng)建一個 Observable ,并為它定義事件觸發(fā)規(guī)則:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
subscriber.onNext("Hello");
subscriber.onNext("Hi");
subscriber.onNext("Aloha");
subscriber.onCompleted();
}});
可以看到,這里傳入了一個 OnSubscribe 對象作為參數(shù)。OnSubscribe 會被存儲在返回的 Observable 對象中,它的作用相當于一個計劃表,當 Observable 被訂閱的時候,OnSubscribe 的 call() 方法會自動被調(diào)用,事件序列就會依照設(shè)定依次觸發(fā)(對于上面的代碼,就是觀察者Subscriber 將會被調(diào)用三次 onNext() 和一次 onCompleted())。這樣,由被觀察者調(diào)用了觀察者的回調(diào)方法,就實現(xiàn)了由被觀察者向觀察者的事件傳遞,即觀察者模式。
這個例子很簡單:事件的內(nèi)容是字符串,而不是一些復(fù)雜的對象;事件的內(nèi)容是已經(jīng)定好了的,而不像有的觀察者模式一樣是待確定的(例如網(wǎng)絡(luò)請求的結(jié)果在請求返回之前是未知的);所有事件在一瞬間被全部發(fā)送出去,而不是夾雜一些確定或不確定的時間間隔或者經(jīng)過某種觸發(fā)器來觸發(fā)的??傊?,這個例子看起來毫無實用價值。但這是為了便于說明,實質(zhì)上只要你想,各種各樣的事件發(fā)送規(guī)則你都可以自己來寫。至于具體怎么做,后面都會講到,但現(xiàn)在不行。只有把基礎(chǔ)原理先說明白了,上層的運用才能更容易說清楚。
create() 方法是 RxJava 最基本的創(chuàng)造事件序列的方法?;谶@個方法, RxJava 還提供了一些方法用來快捷創(chuàng)建事件隊列,例如:
just(T...): 將傳入的參數(shù)依次發(fā)送出來。
Observable observable = Observable.just("Hello", "Hi", "Aloha");
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
from(T[]) / from(Iterable<? extends T>) : 將傳入的數(shù)組或 Iterable 拆分成具體對象后,依次發(fā)送出來。
String[] words = {"Hello", "Hi", "Aloha"};
Observable observable = Observable.from(words);
// 將會依次調(diào)用:
// onNext("Hello");
// onNext("Hi");
// onNext("Aloha");
// onCompleted();
上面 just(T...) 的例子和 from(T[]) 的例子,都和之前的 create(OnSubscribe) 的例子是等價的。
- Subscribe (訂閱)
創(chuàng)建了 Observable 和 Observer 之后,再用 subscribe() 方法將它們聯(lián)結(jié)起來,整條鏈子就可以工作了。代碼形式很簡單:
observable.subscribe(observer);
// 或者:
observable.subscribe(subscriber);
有人可能會注意到, subscribe() 這個方法有點怪:它看起來是『observalbe 訂閱了 observer / subscriber』而不是『observer / subscriber 訂閱了 observalbe』,這看起來就像『雜志訂閱了讀者』一樣顛倒了對象關(guān)系。這讓人讀起來有點別扭,不過如果把 API 設(shè)計成 observer.subscribe(observable) / subscriber.subscribe(observable) ,雖然更加符合思維邏輯,但對流式 API 的設(shè)計就造成影響了,比較起來明顯是得不償失的。
Observable.subscribe(Subscriber) 的內(nèi)部實現(xiàn)是這樣的(僅核心代碼):
// 注意:這不是 subscribe() 的源碼,而是將源碼中與性能、兼容性、擴展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
從代碼可以看出,訂閱事件主要做如下幾件事:
- onstart 準備操作
- call 業(yè)務(wù)邏輯處理,而且業(yè)務(wù)邏輯處理并不是Observable創(chuàng)建時運行的,call()方法執(zhí)行的時候開始運行的~
- 將傳入的 Subscriber 作為 Subscription 返回。這是為了方便 unsubscribe().
整個過程中對象間的關(guān)系如下圖:

除了 subscribe(Observer) 和 subscribe(Subscriber) ,subscribe() 還支持不完整定義的回調(diào),RxJava 會自動根據(jù)定義創(chuàng)建出Subscriber 。形式如下:
Action1<String> onNextAction = new Action1<String>() {
// onNext()
@Override
public void call(String s) {
Log.d(tag, s);
}
};
Action1<Throwable> onErrorAction = new Action1<Throwable>() {
// onError()
@Override
public void call(Throwable throwable) {
// Error handling
}
};
Action0 onCompletedAction = new Action0() {
// onCompleted()
@Override
public void call() {
Log.d(tag, "completed");
}
};
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 來定義 onNext()
observable.subscribe(onNextAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction 和 onErrorAction 來定義 onNext() 和 onError()
observable.subscribe(onNextAction, onErrorAction);
// 自動創(chuàng)建 Subscriber ,并使用 onNextAction、 onErrorAction 和 onCompletedAction 來定義 onNext()、 onError() 和 onCompleted()
observable.subscribe(onNextAction, onErrorAction, onCompletedAction);
簡單解釋一下這段代碼中出現(xiàn)的 Action1 和 Action0。 Action0 是 RxJava 的一個接口,它只有一個方法 call(),這個方法是無參無返回值的;由于 onCompleted() 方法也是無參無返回值的,因此 Action0 可以被當成一個包裝對象,將 onCompleted() 的內(nèi)容打包起來將自己作為一個參數(shù)傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)。這樣其實也可以看做將 onCompleted() 方法作為參數(shù)傳進了subscribe(),相當于其他某些語言中的『閉包』。 Action1 也是一個接口,它同樣只有一個方法 call(T param),這個方法也無返回值,但有一個參數(shù);與 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是單參數(shù)無返回值的,因此 Action1可以將 onNext(obj) 和 onError(error) 打包起來傳入 subscribe() 以實現(xiàn)不完整定義的回調(diào)。事實上,雖然 Action0 和 Action1在 API 中使用最廣泛,但 RxJava 是提供了多個 ActionX 形式的接口 (例如 Action2, Action3) 的,它們可以被用以包裝不同的無返回值的方法。
注:正如前面所提到的,Observer 和 Subscriber 具有相同的角色,而且 Observer 在 subscribe() 過程中最終會被轉(zhuǎn)換成 Subscriber 對象,因此,從這里開始,后面的描述我將用 Subscriber 來代替 Observer ,這樣更加嚴謹。
- 場景示例
下面舉兩個例子:
為了把原理用更清晰的方式表述出來,本文中挑選的都是功能盡可能簡單的例子,以至于有些示例代碼看起來會有『畫蛇添足』『明明不用 RxJava 可以更簡便地解決問題』的感覺。當你看到這種情況,不要覺得是因為 RxJava 太啰嗦,而是因為在過早的時候舉出真實場景的例子并不利于原理的解析,因此我刻意挑選了簡單的情景。
a. 打印字符串數(shù)組
將字符串數(shù)組 names 中的所有字符串依次打印出來:
String[] names = ...;
Observable.from(names)
.subscribe(new Action1<String>() {
@Override
public void call(String name) {
Log.d(tag, name);
}
});
b. 由 id 取得圖片并顯示
由指定的一個 drawable 文件 id drawableRes 取得圖片,并顯示在 ImageView 中,并在出現(xiàn)異常的時候打印 Toast 報錯:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
}).subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
正如上面兩個例子這樣,創(chuàng)建出 Observable 和 Subscriber ,再用 subscribe() 將它們串起來,一次 RxJava 的基本使用就完成了。非常簡單。
RxJava 的默認規(guī)則中,事件的發(fā)出和消費都是在同一個線程的。也就是說,如果只用上面的方法,實現(xiàn)出來的只是一個同步的觀察者模式。觀察者模式本身的目的就是『后臺處理,前臺回調(diào)』的異步機制,因此異步對于 RxJava 是至關(guān)重要的。而要實現(xiàn)異步,則需要用到 RxJava 的另一個概念: Scheduler 。
線程控制 —— Scheduler基礎(chǔ)##
在不指定線程的情況下, RxJava 遵循的是線程不變的原則,即:在哪個線程調(diào)用 subscribe(),就在哪個線程生產(chǎn)事件;在哪個線程生產(chǎn)事件,就在哪個線程消費事件。如果需要切換線程,就需要用到 Scheduler (調(diào)度器)。
- Scheduler 基礎(chǔ)的 API
在RxJava 中,Scheduler ——調(diào)度器,相當于線程控制器,RxJava 通過它來指定每一段代碼應(yīng)該運行在什么樣的線程。RxJava 已經(jīng)內(nèi)置了幾個 Scheduler ,它們已經(jīng)適合大多數(shù)的使用場景:
Schedulers.immediate(): 直接在當前線程運行,相當于不指定線程。這是默認的 Scheduler。
Schedulers.newThread(): 總是啟用新線程,并在新線程執(zhí)行操作。
Schedulers.io(): I/O 操作(讀寫文件、讀寫數(shù)據(jù)庫、網(wǎng)絡(luò)信息交互等)所使用的 Scheduler。行為模式和 newThread() 差不多,區(qū)別在于 io() 的內(nèi)部實現(xiàn)是是用一個無數(shù)量上限的線程池,可以重用空閑的線程,因此多數(shù)情況下 io() 比 newThread() 更有效率。不要把計算工作放在 io() 中,可以避免創(chuàng)建不必要的線程。
-
Schedulers.computation(): 計算所使用的 Scheduler。這個計算指的是 CPU 密集型計算,即不會被 I/O 等操作限制性能的操作,例如圖形的計算。這個 Scheduler 使用的固定的線程池,大小為 CPU 核數(shù)。不要把 I/O 操作放在 computation() 中,否則 I/O 操作的等待時間會浪費 CPU。
另外, Android 還有一個專用的 AndroidSchedulers.mainThread(),它指定的操作將在 Android 主線程運行。
有了這幾個 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 兩個方法來對線程進行控制了。
- subscribeOn(): 指定 subscribe() 所發(fā)生的線程,即 Observable.OnSubscribe 被激活時所處的線程?;蛘呓凶鍪录a(chǎn)生的線程。
- observeOn(): 指定 Subscriber 所運行在的線程。或者叫做事件消費的線程。
文字敘述總歸難理解,上代碼:
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Action1<Integer>() {
@Override
public void call(Integer number) {
Log.d(tag, "number:" + number);
}
});
上面這段代碼中,由于 subscribeOn(Schedulers.io()) 的指定,被創(chuàng)建的事件的內(nèi)容 1、2、3、4 將會在IO線程發(fā)出; 而由于observeOn(AndroidScheculers.mainThread()) 的指定,因此 subscriber 數(shù)字的打印將發(fā)生在主線程 。事實上,這種在subscribe() 之前寫上兩句 subscribeOn(Scheduler.io()) 和 observeOn(AndroidSchedulers.mainThread()) 的使用方式非常常見,它適用于多數(shù)的 『后臺線程取數(shù)據(jù),主線程顯示』的程序策略。
而前面提到的由圖片 id 取得圖片并顯示的例子,如果也加上這兩句:
int drawableRes = ...;
ImageView imageView = ...;
Observable.create(new OnSubscribe<Drawable>() {
@Override
public void call(Subscriber<? super Drawable> subscriber) {
Drawable drawable = getTheme().getDrawable(drawableRes));
subscriber.onNext(drawable);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io()) // 指定 subscribe() 發(fā)生在 IO 線程
.observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回調(diào)發(fā)生在主線程
.subscribe(new Observer<Drawable>() {
@Override
public void onNext(Drawable drawable) {
imageView.setImageDrawable(drawable);
}
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
Toast.makeText(activity, "Error!", Toast.LENGTH_SHORT).show();
}
});
那么,加載圖片將會發(fā)生在 IO 線程,而設(shè)置圖片則被設(shè)定在了主線程。這就意味著,即使加載圖片耗費了幾十甚至幾百毫秒的時間,也不會造成絲毫界面的卡頓。
- Scheduler 的原理基礎(chǔ)
RxJava 的 Scheduler API 很方便,也很神奇(加了一句話就把線程切換了,怎么做到的?而且 subscribe() 不是最外層直接調(diào)用的方法嗎,它竟然也能被指定線程?)。然而 Scheduler 的原理需要放在后面講,因為它的原理是以下一節(jié)《變換》的原理作為基礎(chǔ)的。
好吧這一節(jié)其實我屁也沒說,只是為了讓你安心,讓你知道我不是忘了講原理,而是把它放在了更合適的地方。
變換##
RxJava 提供了對事件序列進行變換的支持,這是它的核心功能之一,也是大多數(shù)人說『RxJava 真是太好用了』的最大原因。所謂變換,就是將事件序列中的對象或整個序列進行加工處理,轉(zhuǎn)換成不同的事件或事件序列。概念說著總是模糊難懂的,來看 API。
- API
首先看一個 map() 的例子:
Observable.just("images/logo.png") // 輸入類型 String
.map(new Func1<String, Bitmap>() {
@Override
public Bitmap call(String filePath) { // 參數(shù)類型 String
return getBitmapFromPath(filePath); // 返回類型 Bitmap
}
})
.subscribe(new Action1<Bitmap>() {
@Override
public void call(Bitmap bitmap) { // 參數(shù)類型 Bitmap
showBitmap(bitmap);
}
});
這里出現(xiàn)了一個叫做 Func1 的類。它和 Action1 非常相似,也是 RxJava 的一個接口,用于包裝含有一個參數(shù)的方法。 Func1 和Action 的區(qū)別在于, Func1 包裝的是有返回值的方法。另外,和 ActionX 一樣, FuncX 也有多個,用于不同參數(shù)個數(shù)的方法。FuncX和 ActionX 的區(qū)別在 FuncX 包裝的是有返回值的方法。
可以看到,map() 方法將參數(shù)中的 String 對象轉(zhuǎn)換成一個 Bitmap 對象后返回,而在經(jīng)過 map() 方法后,事件的參數(shù)類型也由 String轉(zhuǎn)為了 Bitmap。這種直接變換對象并返回的,是最常見的也最容易理解的變換。不過 RxJava 的變換遠不止這樣,它不僅可以針對事件對象,還可以針對整個事件隊列,這使得 RxJava 變得非常靈活。我列舉幾個常用的變換:
map(): 事件對象的直接變換,具體功能上面已經(jīng)介紹過。它是 RxJava 最常用的變換。
map() 示意圖:

flatMap(): 這是一個很有用但非常難理解的變換,因此我決定花多些篇幅來介紹它。 首先假設(shè)這么一種需求:假設(shè)有一個數(shù)據(jù)結(jié)構(gòu)『學生』,現(xiàn)在需要打印出一組學生的名字。實現(xiàn)方式很簡單:
Student[] students = ...;
Subscriber<String> subscriber = new Subscriber<String>() {
@Override
public void onNext(String name) {
Log.d(tag, name);
}
...
};
Observable.from(students)
.map(new Func1<Student, String>() {
@Override
public String call(Student student) {
return student.getName();
}
})
.subscribe(subscriber);
很簡單。那么再假設(shè):如果要打印出每個學生所需要修的所有課程的名稱呢?(需求的區(qū)別在于,每個學生只有一個名字,但卻有多個課程。)首先可以這樣實現(xiàn):
Student[] students = ...;
Subscriber<Student> subscriber = new Subscriber<Student>() {
@Override
public void onNext(Student student) {
List<Course> courses = student.getCourses();
for (int i = 0; i < courses.size(); i++) {
Course course = courses.get(i);
Log.d(tag, course.getName());
}
}
...
};
Observable.from(students)
.subscribe(subscriber);
依然很簡單。那么如果我不想在 Subscriber 中使用 for 循環(huán),而是希望 Subscriber 中直接傳入單個的 Course 對象呢(這對于代碼復(fù)用很重要)?用 map() 顯然是不行的,因為 map() 是一對一的轉(zhuǎn)化,而我現(xiàn)在的要求是一對多的轉(zhuǎn)化。那怎么才能把一個 Student 轉(zhuǎn)化成多個 Course 呢?
這個時候,就需要用 flatMap() 了:
Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
@Override
public void onNext(Course course) {
Log.d(tag, course.getName());
}
...
};
Observable.from(students)
.flatMap(new Func1<Student, Observable<Course>>() {
@Override
public Observable<Course> call(Student student) {
return Observable.from(student.getCourses());
}
})
.subscribe(subscriber);
從上面的代碼可以看出, flatMap() 和 map() 有一個相同點:它也是把傳入的參數(shù)轉(zhuǎn)化之后返回另一個對象。但需要注意,和 map()不同的是, flatMap() 中返回的是個 Observable 對象,并且這個 Observable 對象并不是被直接發(fā)送到了 Subscriber 的回調(diào)方法中。
flatMap() 的原理是這樣的:
- 使用傳入的事件對象創(chuàng)建一個 Observable 對象;
- 并不發(fā)送這個 Observable, 而是將它激活,于是它開始發(fā)送事件;
- 每一個創(chuàng)建出來的 Observable 發(fā)送的事件,都被匯入同一個 Observable ,而這個 Observable 負責將這些事件統(tǒng)一交給 Subscriber 的回調(diào)方法。這三個步驟,把事件拆成了兩級,通過一組新創(chuàng)建的 Observable 將初始的對象『鋪平』之后通過統(tǒng)一路徑分發(fā)了下去。而這個『鋪平』就是 flatMap() 所謂的 flat。
flatMap() 示意圖:

擴展:由于可以在嵌套的 Observable 中添加異步代碼, flatMap() 也常用于嵌套的異步操作,例如嵌套的網(wǎng)絡(luò)請求。示例代碼(Retrofit + RxJava):
networkClient.token() // 返回 Observable<String>,在訂閱時請求 token,并在響應(yīng)后發(fā)送 token
.flatMap(new Func1<String, Observable<Messages>>() {
@Override
public Observable<Messages> call(String token) {
// 返回 Observable<Messages>,在訂閱時請求消息列表,并在響應(yīng)后發(fā)送請求到的消息列表
return networkClient.messages();
}
})
.subscribe(new Action1<Messages>() {
@Override
public void call(Messages messages) {
// 處理顯示消息列表
showMessages(messages);
}
});
傳統(tǒng)的嵌套請求需要使用嵌套的 Callback 來實現(xiàn)。而通過 flatMap() ,可以把嵌套的請求寫在一條鏈中,從而保持程序邏輯的清晰。
throttleFirst(): 在每次事件觸發(fā)后的一定時間間隔內(nèi)丟棄新的事件。常用作去抖動過濾,例如按鈕的點擊監(jiān)聽器:
RxView.clickEvents(button) // RxBinding 代碼,后面的文章有解釋
.throttleFirst(500, TimeUnit.MILLISECONDS) // 設(shè)置防抖間隔為 500ms
.subscribe(subscriber);
媽媽再也不怕我的用戶手抖點開兩個重復(fù)的界面啦。
此外, RxJava 還提供很多便捷的方法來實現(xiàn)事件序列的變換,這里就不一一舉例了。
- 變換的原理:lift()
這些變換雖然功能各有不同,但實質(zhì)上都是針對事件序列的處理和再發(fā)送。而在 RxJava 的內(nèi)部,它們是基于同一個基礎(chǔ)的變換方法:lift(Operator)。首先看一下 lift() 的內(nèi)部實現(xiàn)(僅核心代碼):
// 注意:這不是 lift() 的源碼,而是將源碼中與性能、兼容性、擴展性有關(guān)的代碼剔除后的核心代碼。
// 如果需要看源碼,可以去 RxJava 的 GitHub 倉庫下載。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
這段代碼很有意思:它生成了一個新的 Observable 并返回,而且創(chuàng)建新 Observable 所用的參數(shù) OnSubscribe 的回調(diào)方法 call() 中的實現(xiàn)竟然看起來和前面講過的 Observable.subscribe() 一樣!然而它們并不一樣喲~不一樣的地方關(guān)鍵就在于第二行onSubscribe.call(subscriber) 中的 onSubscribe 所指代的對象不同(高能預(yù)警:接下來的幾句話可能會導致身體的嚴重不適)——
subscribe() 中這句話的 onSubscribe 指的是 Observable 中的 onSubscribe 對象,這個沒有問題,但是 lift() 之后的情況就復(fù)雜了點。
當含有 lift() 時:
1.lift() 創(chuàng)建了一個 Observable 后,加上之前的原始 Observable,已經(jīng)有兩個 Observable 了;
2.而同樣地,新 Observable 里的新 OnSubscribe 加上之前的原始 Observable 中的原始 OnSubscribe,也就有了兩個OnSubscribe;
3.當用戶調(diào)用經(jīng)過 lift() 后的 Observable 的 subscribe() 的時候,使用的是 lift() 所返回的新的 Observable ,于是它所觸發(fā)的 onSubscribe.call(subscriber),也是用的新 Observable 中的新 OnSubscribe,即在 lift() 中生成的那個 OnSubscribe;
4.而這個新 OnSubscribe 的 call() 方法中的 onSubscribe ,就是指的原始 Observable 中的原始 OnSubscribe ,在這個 call()方法里,新 OnSubscribe 利用 operator.call(subscriber) 生成了一個新的 Subscriber(Operator 就是在這里,通過自己的call() 方法將新 Subscriber 和原始 Subscriber 進行關(guān)聯(lián),并插入自己的『變換』代碼以實現(xiàn)變換),然后利用這個新Subscriber 向原始 Observable 進行訂閱。
這樣就實現(xiàn)了 lift() 過程,有點像一種代理機制,通過事件攔截和處理實現(xiàn)事件序列的變換。
精簡掉細節(jié)的話,也可以這么說:在 Observable 執(zhí)行了 lift(Operator) 方法之后,會返回一個新的 Observable,這個新的Observable 會像一個代理一樣,負責接收原始的 Observable 發(fā)出的事件,并在處理后發(fā)送給 Subscriber。
如果你更喜歡具象思維,可以看圖:
lift() 原理圖

兩次和多次的 lift() 同理,如下圖:

舉一個具體的 Operator 的實現(xiàn)。下面這是一個將事件中的 Integer 對象轉(zhuǎn)換成 String 的例子,僅供參考:
observable.lift(new Observable.Operator<String, Integer>() {
@Override
public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
// 將事件序列中的 Integer 對象轉(zhuǎn)換為 String 對象
return new Subscriber<Integer>() {
@Override
public void onNext(Integer integer) {
subscriber.onNext("" + integer);
}
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(Throwable e) {
subscriber.onError(e);
}
};
}
});
講述 lift() 的原理只是為了讓你更好地了解 RxJava ,從而可以更好地使用它。然而不管你是否理解了 lift() 的原理,RxJava 都不建議開發(fā)者自定義 Operator 來直接使用 lift(),而是建議盡量使用已有的 lift() 包裝方法(如map() flatMap() 等)進行組合來實現(xiàn)需求,因為直接使用 lift() 非常容易發(fā)生一些難以發(fā)現(xiàn)的錯誤。
- compose: 對 Observable 整體的變換
除了 lift() 之外, Observable 還有一個變換方法叫做 compose(Transformer)。它和 lift() 的區(qū)別在于, lift() 是針對事件項和事件序列的,而 compose() 是針對 Observable 自身進行變換。舉個例子,假設(shè)在程序中有多個 Observable ,并且他們都需要應(yīng)用一組相同的 lift() 變換。你可以這么寫:
observable1
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
observable2
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber2);
observable3
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber3);
observable4
.lift1()
.lift2()
.lift3()
.lift4()
.subscribe(subscriber1);
你覺得這樣太不軟件工程了,于是你改成了這樣:
private Observable liftAll(Observable observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
...
liftAll(observable1).subscribe(subscriber1);
liftAll(observable2).subscribe(subscriber2);
liftAll(observable3).subscribe(subscriber3);
liftAll(observable4).subscribe(subscriber4);
可讀性、可維護性都提高了??墒?Observable 被一個方法包起來,這種方式對于 Observale 的靈活性似乎還是增添了那么點限制。怎么辦?這個時候,就應(yīng)該用 compose() 來解決了:
public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
@Override
public Observable<String> call(Observable<Integer> observable) {
return observable
.lift1()
.lift2()
.lift3()
.lift4();
}
}
...
Transformer liftAll = new LiftAllTransformer();
observable1.compose(liftAll).subscribe(subscriber1);
observable2.compose(liftAll).subscribe(subscriber2);
observable3.compose(liftAll).subscribe(subscriber3);
observable4.compose(liftAll).subscribe(subscriber4);
像上面這樣,使用 compose() 方法,Observable 可以利用傳入的 Transformer 對象的 call 方法直接對自身進行處理,也就不必被包在方法的里面了。
compose() 的原理比較簡單,不附圖嘍。
線程控制:Scheduler高級##
除了靈活的變換,RxJava 另一個牛逼的地方,就是線程的自由控制。
- Scheduler 高級的 API
前面講到了,可以利用 subscribeOn() 結(jié)合 observeOn() 來實現(xiàn)線程控制,讓事件的產(chǎn)生和消費發(fā)生在不同的線程。可是在了解了map() flatMap() 等變換方法后,有些好事的(其實就是當初剛接觸 RxJava 時的我)就問了:能不能多切換幾次線程?
答案是:能。因為 observeOn() 指定的是 Subscriber 的線程,而這個 Subscriber 并不是(嚴格說應(yīng)該為『不一定是』,但這里不妨理解為『不是』)subscribe() 參數(shù)中的 Subscriber ,而是 observeOn() 執(zhí)行時的當前 Observable 所對應(yīng)的 Subscriber ,即它的直接下級 Subscriber 。換句話說,observeOn() 指定的是它之后的操作所在的線程。因此如果有多次切換線程的需求,只要在每個想要切換線程的位置調(diào)用一次 observeOn() 即可。上代碼:
Observable.just(1, 2, 3, 4) // IO 線程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.map(mapOperator) // 新線程,由 observeOn() 指定
.observeOn(Schedulers.io())
.map(mapOperator2) // IO 線程,由 observeOn() 指定
.observeOn(AndroidSchedulers.mainThread)
.subscribe(subscriber); // Android 主線程,由 observeOn() 指定
如上,通過 observeOn() 的多次調(diào)用,程序?qū)崿F(xiàn)了線程的多次切換。
不過,不同于 observeOn() , subscribeOn() 的位置放在哪里都可以,但它是只能調(diào)用一次的。
又有好事的(其實還是當初的我)問了:如果我非要調(diào)用多次 subscribeOn() 呢?會有什么效果?
這個問題先放著,我們還是從 RxJava 線程控制的原理說起吧。
- Scheduler 的原理(二)
其實, subscribeOn() 和 observeOn() 的內(nèi)部實現(xiàn),也是用的 lift()。具體看圖(不同顏色的箭頭表示不同的線程):
subscribeOn() 原理圖:

observeOn() 原理圖:

從圖中可以看出,subscribeOn() 和 observeOn() 都做了線程切換的工作(圖中的 "schedule..." 部位)。不同的是, subscribeOn()的線程切換發(fā)生在 OnSubscribe 中,即在它通知上一級 OnSubscribe 時,這時事件還沒有開始發(fā)送,因此 subscribeOn() 的線程控制可以從事件發(fā)出的開端就造成影響;而 observeOn() 的線程切換則發(fā)生在它內(nèi)建的 Subscriber 中,即發(fā)生在它即將給下一級Subscriber 發(fā)送事件時,因此 observeOn() 控制的是它后面的線程。
最后,我用一張圖來解釋當多個 subscribeOn() 和 observeOn() 混合使用時,線程調(diào)度是怎么發(fā)生的(由于圖中對象較多,相對于上面的圖對結(jié)構(gòu)做了一些簡化調(diào)整):
線程控制綜合調(diào)用

圖中共有 5 處含有對事件的操作。由圖中可以看出,①和②兩處受第一個 subscribeOn() 影響,運行在紅色線程;③和④處受第一個observeOn() 的影響,運行在綠色線程;⑤處受第二個 onserveOn() 影響,運行在紫色線程;而第二個 subscribeOn() ,由于在通知過程中線程就被第一個 subscribeOn() 截斷,因此對整個流程并沒有任何影響。這里也就回答了前面的問題:當使用了多個subscribeOn() 的時候,只有第一個 subscribeOn() 起作用。
- 延伸:doOnSubscribe()
然而,雖然超過一個的 subscribeOn() 對事件處理的流程沒有影響,但在流程之前卻是可以利用的。
在前面講 Subscriber 的時候,提到過 Subscriber 的 onStart() 可以用作流程開始前的初始化。然而 onStart() 由于在subscribe() 發(fā)生時就被調(diào)用了,因此不能指定線程,而是只能執(zhí)行在 subscribe() 被調(diào)用時的線程。這就導致如果 onStart() 中含有對線程有要求的代碼(例如在界面上顯示一個 ProgressBar,這必須在主線程執(zhí)行),將會有線程非法的風險,因為有時你無法預(yù)測subscribe() 將會在什么線程執(zhí)行。
而與 Subscriber.onStart() 相對應(yīng)的,有一個方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同樣是在subscribe() 調(diào)用后而且在事件發(fā)送前執(zhí)行,但區(qū)別在于它可以指定線程。默認情況下, doOnSubscribe() 執(zhí)行在 subscribe() 發(fā)生的線程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的話,它將執(zhí)行在離它最近的 subscribeOn() 所指定的線程。
示例代碼:
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io())
.doOnSubscribe(new Action0() {
@Override
public void call() {
progressBar.setVisibility(View.VISIBLE); // 需要在主線程執(zhí)行
}
})
.subscribeOn(AndroidSchedulers.mainThread()) // 指定主線程
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
如上,在 doOnSubscribe()的后面跟一個 subscribeOn() ,就能指定準備工作的線程了。
結(jié)語
此篇博文是RxJava最基本的介紹了,相信大家對RxJava有了初步的認識,接下來會對RXJava的應(yīng)用場景進行分析~