在我研究響應(yīng)式編程的過(guò)程中,我所找到的每一篇文章幾乎都以響應(yīng)式編程很難學(xué)習(xí)的理念開(kāi)頭。針對(duì)響應(yīng)式編程零基礎(chǔ)人員準(zhǔn)備的文章少之又少。本文嘗試通過(guò)在android上使用RxJava為初學(xué)者厘清響應(yīng)式編程的基本概念。
什么是響應(yīng)式編程?
響應(yīng)式編程就是編程處理異步數(shù)據(jù)流。
等等,我使用callback也很容易處理異步數(shù)據(jù)啊。所以這和響應(yīng)式編程有什么不同呢?
是的,這個(gè)概念并不新鮮。它可以通過(guò)命令式(imperatively)編程來(lái)完成,而且通常都是這么做的。
如果我們不僅僅考慮回調(diào),同時(shí)再考慮一下讓回調(diào)啟動(dòng)并運(yùn)行的支持機(jī)制。使用命令式方法來(lái)支持通常會(huì)涉及到狀態(tài)管理還需要考慮狀態(tài)改變所帶來(lái)的副作用。在軟件開(kāi)發(fā)界,這些考慮已經(jīng)成為大量錯(cuò)誤的原因。響應(yīng)式編程采用函數(shù)式方法;它處理的是流從而避免了全局狀態(tài)以及相應(yīng)的副作用。
什么是流?
萬(wàn)物皆流,無(wú)物常住 --- 赫拉克利特
流代表一個(gè)數(shù)據(jù)序列。想象一下我們的交通系統(tǒng)。某條高速路上的汽車就是一條一直流動(dòng)偶爾出現(xiàn)一個(gè)瓶頸的對(duì)象流。所謂響應(yīng)式編程,就是我們接收連續(xù)流動(dòng)的數(shù)據(jù)--數(shù)據(jù)流--提供處理數(shù)據(jù)流的方法并將該方法應(yīng)用到數(shù)據(jù)流。數(shù)據(jù)的源頭我們并不(也不應(yīng))關(guān)心。
數(shù)據(jù)流無(wú)處不在。任何物體都可以是數(shù)據(jù)流:變量,用戶輸入,屬性,緩存,數(shù)據(jù)結(jié)構(gòu)等等。
什么是聲明式編程,什么是命令式編程?
- 命令式引導(dǎo)你怎么做
- 聲明式告訴你做什么
在深入代碼之前,我們還是來(lái)看一下我們的交通系統(tǒng)網(wǎng)絡(luò)。讓我們假設(shè)市長(zhǎng)想臨時(shí)在一條指定的高速路上分間隔擺放停車標(biāo)志來(lái)中斷車流。市長(zhǎng)會(huì)說(shuō):“將高速路分成均勻的幾段,在每一段的邊界上放一個(gè)停車標(biāo)志”。承包商會(huì)說(shuō):“等等,在分段之前,我需要確定每一段的長(zhǎng)度;為了確定分段長(zhǎng)度,我需要知道高速路的總長(zhǎng),我們要放置多少個(gè)停車標(biāo)志,以及車輛的平均長(zhǎng)度”。在這個(gè)場(chǎng)景里,市長(zhǎng)擁有足夠多的職能部門(mén)(包括DOT(交通部)),她在處理事情時(shí)只需要專注于宣布她的意圖,而不用關(guān)心事情具體怎么完成的細(xì)節(jié)--這就是聲明式方法。而另一方面。承包商需要保證整個(gè)流程的每一處細(xì)節(jié)都要考慮周全并準(zhǔn)確的完成--這就是命令式方法。如果你可以像市長(zhǎng)建設(shè)她的城市一樣構(gòu)建一個(gè)軟件會(huì)是什么樣子呢?我們一起來(lái)看一個(gè)示例:
例:使用命令式方法過(guò)濾掉偶數(shù)。
Integer[] numbers = {1, 2, 3, 4, 5};
List<Integer> lists = Arrays.asList(numbers);
List<Integer> results = new ArrayList<>();
for (Integer num : numbers) {
if (num % 2 != 0)
results.add(num);
}
聲明式方法
List<Integer> results = lists.stream()
.filter(s -> s % 2 != 0)
.collect(Collectors.toList());
很酷,我喜歡聲明式方法,但是如果我們不告訴它怎么做,計(jì)算機(jī)怎么知道做什么呢?
在現(xiàn)在的世界里,任何事情最終落實(shí)到操作系統(tǒng)和硬件時(shí)都是命令式的。而響應(yīng)式編程,是函數(shù)式編程的一種抽象。就和我們所使用的高階命令式編程語(yǔ)言是底層二進(jìn)制以及匯編命令的抽象一樣(市長(zhǎng)也需要有她的DOT承包商)。
所以,我們?cè)鯓釉贘ava中使用聲明式編程風(fēng)格呢?
Java8有一個(gè)很驚艷的Stream API,但是如果你和我一樣是一個(gè)Android開(kāi)發(fā)者,你不能使用Stream API,因?yàn)閍ndroid還不支持Java8的所有特性。盡管如此,你可以使用RxJava,這是由Netflix的開(kāi)發(fā)者為Java提供的一個(gè)響應(yīng)式擴(kuò)展。
RxJava怎么工作?

響應(yīng)式代碼的基礎(chǔ)是被觀察者(Observable)和觀察者(Observer)。
- 被觀察者(Observable)可以被監(jiān)聽(tīng)(和觀察者模式中的Subject相似)
- 觀察者(Observer)則監(jiān)聽(tīng)被觀察者
被觀察者是一個(gè)發(fā)送數(shù)據(jù)流或者事件流的類,觀察者則對(duì)被觀察者發(fā)送出的數(shù)據(jù)/事件做出反應(yīng)。一個(gè)被觀察者可以有多個(gè)觀察者,對(duì)于被觀察者發(fā)送出的每一個(gè)事件/項(xiàng)目都會(huì)被Observer.onNext()方法接收并處理。一旦被觀察者發(fā)送完了所有的數(shù)據(jù)它會(huì)調(diào)用Observer.onComplete()。如果發(fā)生錯(cuò)誤,被觀察者會(huì)調(diào)用Observer.onError()方法。
注意: 有的被觀察者永遠(yuǎn)都不會(huì)終止(比如溫度傳感器的輸出)。
觀察者和被觀察者之間通過(guò)Subscription連接,觀察者在后面也可以通過(guò)Subscription取消訂閱被觀察者。
聽(tīng)起來(lái)和觀察者模式很相似,那么觀察者模式和RxJava框架之間有什么區(qū)別呢?
RxJava的被觀察者為觀察者模式添加了兩個(gè)功能。
- 當(dāng)不再產(chǎn)生數(shù)據(jù)時(shí),生產(chǎn)者會(huì)通知消費(fèi)者。(onComplete())
- 當(dāng)發(fā)生錯(cuò)誤時(shí),生產(chǎn)者會(huì)通知消費(fèi)者。(onError())
除此之外,RxJava的威力在于僅僅只需要幾行代碼就可以變換,聚合,過(guò)濾被觀察者發(fā)送的數(shù)據(jù)流,這樣可以極大的減少需要維護(hù)的狀態(tài)變量。
給我看代碼
創(chuàng)建一個(gè)被觀察者(Observable):
Integer[] numbers = {1, 2, 3, 4, 5, 6, 7};
List<Integer> lists = Arrays.asList(numbers);
Observable<Integer> integerObservable = Observable.from(lists);
integerObservable將發(fā)射數(shù)字1、2、3、4、5、6、7然后結(jié)束。
注意: 創(chuàng)建被觀察者的方式有很多很多。更多信息可以參考官方文檔。
Subscriber
Subscriber是一種特殊類型的觀察者,它可以取消訂閱被觀察者。
Observable.
Subscriber<Integer> mySubscriber = new Subscriber<Integer>() {
@Override
public void onNext(Integer data) {
Log.d("Rx", "onNext:"+data);
}
@Override
public void onCompleted() {
Log.d("Rx","Complete!");
}
@Override
public void onError(Throwable e) {
// handle your error here
}
};
將Subscriber連接到被觀察者:
被觀察者是惰性的,在沒(méi)有訂閱者監(jiān)聽(tīng)之前它不會(huì)做任何事情。
myObservable.subscribe(mySubscriber);
// Outputs:
// onNext: 1
// onNext: 2
// onNext: 3
// onNext: 4
// onNext: 5
// onNext: 6
// onNext: 7
// Complete!
改變流:
RxJava提供了許多改變流的運(yùn)算符。下面幾個(gè)操作方法是最常用的。
-
Filter:Filter運(yùn)算符會(huì)過(guò)濾被觀察者,被觀察者發(fā)射的數(shù)據(jù)中只有通過(guò)你在謂詞函數(shù)中指定的測(cè)試后才能繼續(xù)往下流動(dòng)。
integerObservable.filter(new Func1<Integer, Boolean>() { @Override public Boolean call(Integer o) { return o % 2 == 0; } }).subscribe(mySubscriber); // Outputs : // onNext: 2 // onNext: 4 // onNext: 6 // Complete!
這里我過(guò)濾掉了所有的奇數(shù)項(xiàng)。
---1---2---3---4----5----6----7---|-->
filter(x % 2 == 0)
-------2-------4---------6--------|-->
注意: Func<T, R>表示一個(gè)單參數(shù)的函數(shù),T是第一個(gè)參數(shù)的類型,R是返回結(jié)果的類型。
-
Map: Map運(yùn)算符將會(huì)將你指定的函數(shù)應(yīng)用到被觀察者發(fā)射的每一項(xiàng),并返回一個(gè)被觀察者,這個(gè)被觀察者發(fā)射的數(shù)據(jù)就是你指定函數(shù)的返回結(jié)果。
integerObservable.map(new Func1<Integer, Integer>() { @Override public Integer call(Integer value) { return value * value; } }).subscribe(mySubscriber); // onNext:1 // onNext:4 // onNext:9 // onNext:16 // onNext:25 // onNext:36 // onNext:49 // Complete!
這里我使用map運(yùn)算符將發(fā)射出的數(shù)據(jù)改變成另外一個(gè)數(shù)。我改變了integerObservable所發(fā)射出的每一項(xiàng),所以最后每一個(gè)數(shù)據(jù)都變成了該數(shù)據(jù)的平方。
---1---2---3---4----5----6----7---|-->
map(x -> x * x)
---1---4---9---16---25---36---49---|-->
RxJava中有大量的操作符用于處理流變換。
好吧,你不是說(shuō)響應(yīng)式編程是異步的嗎?
如果你不告訴它需要使用異步的方式,RxJava默認(rèn)是同步的。
但是同步是響應(yīng)式系統(tǒng)必須的行為嗎?
確定是使用異步還是同步的被觀察者需要根據(jù)具體的問(wèn)題分析。例如:從內(nèi)存緩存中獲取數(shù)據(jù)并立即返回也許使用同步會(huì)更合適。另一方面,如果被觀察者會(huì)產(chǎn)生網(wǎng)絡(luò)調(diào)用或者一些耗時(shí)的數(shù)據(jù)處理則應(yīng)該使用異步的方式??偟脑瓌t就是:如果是在開(kāi)發(fā)一個(gè)圖形系統(tǒng),當(dāng)一項(xiàng)任務(wù)起源于UI線程并且需要阻塞(或者大量的計(jì)算操作)時(shí)應(yīng)該采用異步的方式。
在異步從何而來(lái)這個(gè)問(wèn)題上,RxJava持不可知論者的態(tài)度。
了解,現(xiàn)在告訴我怎么創(chuàng)建一個(gè)異步的observable?
首先,我們來(lái)看一下使用RxJava之前,將一個(gè)密集的長(zhǎng)時(shí)間的I/O操作轉(zhuǎn)移的其他線程(非ui線程)時(shí)的處理方式。
以前的方式
private class FetchUsersTask
extends AsyncTask<String, Void, User> {
protected User doInBackground(String... someData) {
String userId=params[0];
User user = UsersService.getUser(userId);
return user;
}
protected void onPostExecute(User user) {
//handle the result and update the view
}
}
FetchUsersTask調(diào)用usersService.getUsers()并返回一個(gè)字符串列表,然后傳遞給onPostExecute()方法??雌饋?lái)非常簡(jiǎn)單,但是這段代碼中存在一些問(wèn)題
- 錯(cuò)誤處理: doInBackground()也許會(huì)發(fā)生錯(cuò)誤或者異常,為了能夠從異常中恢復(fù)我們需要添加try-catch。通常情況下,當(dāng)我們捕獲到異常時(shí),我們會(huì)log輸出并且通知用戶,這又需要回到UI線程。使用AsyncTask我們也可以將Object作為doInBackground()方法的返回類型,然后在onPostExecute()中使用instanceof來(lái)檢查類型--這需要更多的代碼。
- 內(nèi)存泄漏: 即使啟動(dòng)異步任務(wù)的Activity/Fragment 被銷毀,異步任務(wù)仍持續(xù)運(yùn)行直到doInBackground()方法執(zhí)行完成,因?yàn)樵诤笈_(tái)任務(wù)完成后asyncTask需要通知view,所以運(yùn)行時(shí)必須持有Activity/Fragment的引用。如果Activity在后臺(tái)任務(wù)完成之前就被銷毀,而開(kāi)發(fā)者又沒(méi)有采取合適的技術(shù)比如弱引用,那么就會(huì)導(dǎo)致內(nèi)存泄漏甚至是應(yīng)用程序崩潰。也許可以采用另一種方法,就是使用cancel(boolean)來(lái)取消正在執(zhí)行的任務(wù),那么最終會(huì)調(diào)用onCancelled()方法而不是onPostExecute()。
- 連續(xù)多次網(wǎng)絡(luò)調(diào)用: 編排多個(gè)AsyncTask的唯一辦法就是使用嵌套,這將使得代碼非常復(fù)雜。
RxJava方式:
現(xiàn)在我們來(lái)看一下怎么使用RxJava來(lái)異步加載數(shù)據(jù)。
Observable.fromCallable(new Callable<User>() {
@Override public User call() throws Exception {
return UsersService.getUser(userId);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<User>() {
@Override public void onCompleted() {
Log.d("Rx", "Completed");
}
@Override public void onError(Throwable e) {
Log.d("Rx", e.getMessage());
}
@Override public void onNext(User user) {
Log.d("Rx", user.getName());
}
});
這里subscribeOn(Schedulers.io())將使observable工作在新的線程,而observeOn(AndroidSchedulers.mainThread()))將使訂閱者在主UI線程上去處理observable發(fā)送出來(lái)的結(jié)果。
這和AsyncTask很相似但是更簡(jiǎn)單更簡(jiǎn)潔。RxJava解決了我前面提到的所有問(wèn)題。
錯(cuò)誤處理:使用RxJava方式以后錯(cuò)誤處理變得非常簡(jiǎn)單,因?yàn)樗锌赡艿腻e(cuò)誤和異常都會(huì)拋給onError()方法。由于我們是在主線程監(jiān)聽(tīng)(AndroidSchedulers.mainThread()),所以我們可以非常便捷的和UI交互從而告知用戶相關(guān)錯(cuò)誤。
-
內(nèi)存泄漏:RxJava不會(huì)魔力般的減輕內(nèi)存泄漏的問(wèn)題,但是要阻止內(nèi)存泄漏則非常簡(jiǎn)單。RxJava提供了非常簡(jiǎn)潔的方式來(lái)取消訂閱正在執(zhí)行的異步調(diào)用。調(diào)用 subscriber 或者 subscription 的 unsubscribe()方法可以讓Activity或者Fragment從通知列表中注銷掉。如果你有多個(gè)subscription,可以使用CompositeSubscription來(lái)持有所有的Subscriptions,然后在onDestroy()或者onDestroyView()中一次取消所有的訂閱。
private CompositeSubscription allSubscriptions = new CompositeSubscription(); //add all the subscription to allSubscriptions allSubscriptions.add(subscription1); allSubscriptions.add(subscription2); allSubscriptions.add(subscription3); //clear all subscription on onDestroy @Override public void onDestroy() { super.onDestroy(); allSubscriptions.clear(); } -
連續(xù)多次網(wǎng)絡(luò)調(diào)用:有許多運(yùn)算符可以幫助我們串聯(lián)并修改observable。一旦理解,連續(xù)多次進(jìn)行網(wǎng)絡(luò)調(diào)用將變得非常簡(jiǎn)單。我們一起來(lái)看一下這樣的場(chǎng)景,從第一次網(wǎng)絡(luò)調(diào)用中獲取到了一個(gè)用戶ID列表,然后需要對(duì)每一個(gè)用戶ID調(diào)用getUser()來(lái)獲取用戶的信息。
Observable.fromCallable(new Callable<List<String>>() { @Override public List<String> call() throws Exception { return UserService.getUserIds(); } }).flatMap(new Func1<List<String>, Observable<String>>() { @Override public Observable<String> call(List<String> userIds) { return Observable.from(userIds); } }).flatMap(new Func1<String, Observable<User>>() { @Override public Observable<User> call(String userId) { return Observable.just(UserSerive.getUser(userId)); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<User>() { @Override public void onCompleted() { Log.d("Rx", "emit","Completed!"); } @Override public void onError(Throwable e) { Log.d("Rx", "emit", e.getMessage()); } @Override public void onNext(User user) { Log.d("Rx", user.getName()); } });
使用Lambda后的代碼:
Observable.fromCallable(() -> UsersService.getUserIds())
.flatMap(userIds -> Observable.from(userIds))
.flatMap(userId -> Observable.just(UserService.getUser(userId))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()))
.subscribe(new Subscriber<User>() {
@Override
public void onCompleted() {
Log.d("Rx", "emit","Completed!");
}
@Override
public void onError(Throwable e) {
Log.d("Rx", "emit", e.getMessage());
}
@Override
public void onNext(User user) {
Log.d("Rx", "emit", user.getName());
}
});
下面的圖標(biāo)描述了將一個(gè)含有單個(gè)字符串列表流變化成含有多個(gè)用戶信息流的過(guò)程。
-------{~~~~~~~~~~~~list of user ids [1,2,3,4,5]~~~~~~~~~}---|-->
flatMap(userIds -> Observable.from(userIds))
-------1------------2----------3------------4------------5---|--->
flatMap (userId -> UserService.getUser(userId))
----user1--------user2------user3--------user4-------user5---|--->
參考文獻(xiàn)
- Staltz, André. The Introduction to Reactive Programming You’ve Been Missing
- Christensen, Ben, and Tomasz Nurkiewicz. Reactive Programming with RxJava.
本文譯自Howdy RxJava