最近,我抽出了幾個(gè)晚上的時(shí)間,把咖啡和啤酒變成了代碼與文字。
引子
三個(gè)月以來,我翻譯了一些關(guān)于RxJava的文章,說實(shí)話這些翻譯,真的搞得我很頭疼,那么現(xiàn)在是時(shí)候回來寫點(diǎn)什么了。
最近,我在看兩本書,《Learning Reactive Programming with Java 8》,《RxJava Essentials》,不過,沒關(guān)系,我已經(jīng)買到了電子版,我會(huì)在文章結(jié)尾附上網(wǎng)盤鏈接和密碼,但我還是希望你將文章繼續(xù)讀下去,因?yàn)槟鞘俏恼陆Y(jié)尾的事。
其實(shí)關(guān)于RxJava的文章和消息遠(yuǎn)不止我們能了解到的,但又拜英語所賜,所以它看起來又沒那么多。好在,國(guó)內(nèi)有許多優(yōu)秀的開發(fā)專家hi大頭鬼hi
,BlackSwift,程序亦非猿,Drakeet,扔物線,流火楓林等等在為之做著貢獻(xiàn),以及簡(jiǎn)直不能更優(yōu)秀的文章《給 Android 開發(fā)者的 RxJava 詳解》。
但是,現(xiàn)在,我不得不再次做啰嗦一下,RxJava究竟會(huì)改變我們什么。
響應(yīng)式編程Reactive Programming
什么是響應(yīng)式編程呢?在Java程序中:
int a = 4;
int b = 5;
int c = a + b;
System.out.println(c); // 9
a = 6;
System.out.println(c);
// 9 again, but if 'c' was tracking the changes of 'a' and 'b',
// it would've been 6 + 5 = 11
當(dāng)我們改變“a”和“b”的值時(shí),“c”并沒有改變。換句話說,“a”和“b”的改變并沒有響應(yīng)到“c”。這就是響應(yīng)式:程序以流的形式,傳遞數(shù)據(jù)的改變。
那我,我們又為什么需要響應(yīng)式呢?
以下翻譯自《Learning Reactive Programming with Java 8》
10-15年前,對(duì)于網(wǎng)站開發(fā)來說,最平常的日常工作就是進(jìn)行維護(hù)和縮短響應(yīng)時(shí)間,那么今天,一切程序都應(yīng)該保證七天二十四小時(shí)不間斷運(yùn)行,并且能夠極快的做出響應(yīng);如果你的網(wǎng)站響應(yīng)慢或者宕機(jī),那么用戶將會(huì)對(duì)你們真愛一秒變備胎,轉(zhuǎn)而選擇其他網(wǎng)站服務(wù)。當(dāng)今的慢意味著不可用甚至是有故障的。如今的互聯(lián)網(wǎng)是在和大數(shù)據(jù)打交道,所以我們需要快速的處理數(shù)據(jù)。
過去的幾年中HTTP錯(cuò)誤已經(jīng)不是什么新鮮事了,但是現(xiàn)在,我們不得不進(jìn)行容錯(cuò)機(jī)制,還要提供用戶易讀以及合理的消息更新。
在過去,我們寫簡(jiǎn)單的桌面應(yīng)用,但如今我們寫能夠做出快速響應(yīng)的Web應(yīng)用。多數(shù)情況下,這些應(yīng)用要與大量的遠(yuǎn)程服務(wù)器進(jìn)行數(shù)據(jù)傳遞。
如果我們想讓自己的軟件保持競(jìng)爭(zhēng)性,就不得不實(shí)現(xiàn)這些新需求,所以,換言之就是我們應(yīng)該這樣做:
- 模塊的/動(dòng)態(tài)的:用這種方式,我們就能夠擁有一個(gè)七天二十四小時(shí)的系統(tǒng)了,因?yàn)檫@些模塊能夠在不停止整個(gè)系統(tǒng)的情況下進(jìn)行脫機(jī)和聯(lián)機(jī)。另外,隨著系統(tǒng)的不斷龐大,還能幫助我們更好地組織應(yīng)用結(jié)構(gòu),同時(shí)還能管理底層代碼。
- 可擴(kuò)展的:用這種方式,我們就能夠處理大量的數(shù)據(jù)和用戶請(qǐng)求了。
- 容錯(cuò)性:用這種方式,能夠?yàn)橛脩籼峁┓€(wěn)定的系統(tǒng)。
- 響應(yīng)式:這不僅意味著快速,還意味著可用性強(qiáng)。
讓我們思考如何實(shí)現(xiàn)它:
- 如果我們的系統(tǒng)是事件驅(qū)動(dòng)型的,那就把它模塊化。我們可以將系統(tǒng)分成多個(gè)彼此之間通過通知進(jìn)行交互的微服務(wù)/組件/模塊。這樣,我們就能夠以通知為代表,響應(yīng)系統(tǒng)的數(shù)據(jù)流了。
- 可擴(kuò)展意味著能夠應(yīng)對(duì)日益增長(zhǎng)的數(shù)據(jù),在負(fù)載的情況下不會(huì)崩潰。
- 對(duì)故障/錯(cuò)誤做出及時(shí)的響應(yīng),能夠提高系統(tǒng)的容錯(cuò)性。
- 響應(yīng)意味著對(duì)能夠?qū)τ脩舨僮骷皶r(shí)的做出反應(yīng)。
如果應(yīng)用是事件驅(qū)動(dòng)型的,那么,它就能夠解耦成多個(gè)自包含組件。這能夠幫我們更好的實(shí)現(xiàn)擴(kuò)展性,因?yàn)槲覀兛偸强梢栽诓煌5艋蛘叽驍嘞到y(tǒng)的情況下添加新組建或者移除舊組件。如果錯(cuò)誤和故障傳遞給正確的組件,把它們當(dāng)做通知來處理并作出響應(yīng),那么應(yīng)用能變得更具有容錯(cuò)性和彈性。所以,如果把系統(tǒng)構(gòu)建成事件驅(qū)動(dòng)型的。我們可以更容易的實(shí)現(xiàn)擴(kuò)展性和容錯(cuò)性,而且一個(gè)具有擴(kuò)展性,低耦合和防錯(cuò)的應(yīng)用能夠快速的響應(yīng)用戶操作。
Reactive Manifesto文檔定義了我們剛剛提到的四點(diǎn)響應(yīng)式準(zhǔn)則。每一個(gè)響應(yīng)式系統(tǒng)都應(yīng)該是消息驅(qū)動(dòng)型(事件驅(qū)動(dòng)型)的。這樣它不僅能變得低耦合,而且擴(kuò)展性和容錯(cuò)性將更高,這就意味著它可靠和具有響應(yīng)式。
要注意的是,Reactive Manifesto只是描述了一個(gè)響應(yīng)式系統(tǒng),并不是對(duì)響應(yīng)式編程的定義。當(dāng)然,你也可以不使用任何響應(yīng)式類庫或者語言,打造一款彈性可擴(kuò)展,具有消息驅(qū)動(dòng)的響應(yīng)式應(yīng)用。
應(yīng)用程序中數(shù)據(jù)的變化,以通知的方式傳遞給正確的Handler。所以,使用響應(yīng)式構(gòu)造應(yīng)用是符遵循Manifesto最簡(jiǎn)單的方式。
回調(diào)地獄
如果你是一個(gè)能夠時(shí)刻保持頭腦清醒,邏輯清晰和思維縝密的人,是個(gè)Callback高手,善用并且能夠用好FutureTask。
那么在Android中你的代碼可能會(huì)頻繁的使用async+callbacks,或者service composition+ error handing 。
那么關(guān)于異步回調(diào)的邏輯,你會(huì)寫成這樣getData(Callback<T>)、這樣Future<T> getData(),還是這樣Future<List<T>> getData(),甚至這樣Future<List<Future<T>>> getData(),嗷!拜托,我簡(jiǎn)直不能再舉例下去了,這簡(jiǎn)直就是Callback Hell,這樣的程序或許寫起來很舒服,但是如何測(cè)試和維護(hù)呢。
如果哪天你的程序出了問題而必須馬上修復(fù),但你卻不能馬上趕來或者需要?jiǎng)e人協(xié)助(這在很多公司是很常見的),或者當(dāng)他人在review你的代碼時(shí),那么,是時(shí)候拿出這張圖了。

然而使用RxJava的操作符,我們可以避免這些煩人甚至糟糕的回調(diào),讓結(jié)構(gòu)和思路看起來更清晰,通過組合API,只需要約定最終的結(jié)果Observable<T>就行了。
并且scheduler的出現(xiàn),不僅解放了線程的切換,讓UI線程與工作線程間的跳轉(zhuǎn)變得簡(jiǎn)單,而且,它的API很豐,也提供了很多使用場(chǎng)景的建議,比如,適用計(jì)算任務(wù)的Schedulers.computation(?);處理密集IO任務(wù)的Schedulers.io(?);以及Schedulers.trampoline(?)能夠有效避免StackOverflowError,所以非常適合函數(shù)的遞歸調(diào)用。好了,我不再舉例了,因?yàn)?a target="_blank" rel="nofollow">官方文檔已經(jīng)給出了很詳細(xì)的解釋了,但是值得一提的是,如果使用Schedulers的工廠方法創(chuàng)建的Worker,一旦任務(wù)執(zhí)行完畢,都應(yīng)該調(diào)用worker.unsubscribe( )方法,然后轉(zhuǎn)向之前定義的Scheduler實(shí)例上來。
當(dāng)然RxJava的出現(xiàn)并不僅僅是為了解決回調(diào)地獄的。
這是我通過學(xué)習(xí)和不斷地練習(xí),一路走來很辛苦,總結(jié)的一些經(jīng)驗(yàn),分享給大家:
1 . error handling
3 . caching (roation)
@Override public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setRetainInstance(true);
/*.cache()操作符:
當(dāng)?shù)谝粋€(gè)subscribe訂閱的時(shí)候,才會(huì)連接原始Observable,緩存事件,重發(fā)給后續(xù)訂閱的subscribe
值得注意的事,它和使用了.replay()操作符的ConnectableObservable的不同。
另外,為了避免內(nèi)存開銷,不建議緩存大量事件*/
cacheObservable = weatherManager.getWeather().cache();
}
@Override public void onViewCreated(View view, Bundle savedInstanceState) {
super.onViewCreated(view, savedInstanceState);
cacheObservable.subscribe(/*your subscribe*/);
}
4 . composing multiple calls
5 . more robust interface than asyncTask
6 . easy to do complex threading
7 . functional nature is more expressive
/*一個(gè)數(shù)組,每個(gè)元素乘以2,然后篩選小于10的元素,放入集合中*/
Integer[] integers = { 0, 1, 2, 3, 4, 5 };
/*一般寫法,看上去并不是那么的“函數(shù)”*/
Integer[] doubles = new Integer[integers.length];
for (int i = 0; i < integers.length; i++) {
doubles[i] = integers[i] * 2;
}
List<Integer> integerList = new ArrayList<>(doubles.length);
for (Integer integer : doubles) {
if (integer < 10) integerList.add(integer);
}
/*Observable寫法,一切都好多了*/
List<Integer> funactionalList = Observable.from(integers).map(new Func1<Integer, Integer>() {
@Override public Integer call(Integer integer) {
return integer * 2;
}
}).filter(new Func1<Integer, Boolean>() {
@Override public Boolean call(Integer integer) {
return integer < 10;
}
}).toList().toBlocking().first();
9 . fluent API
10 . easy debugging
//值得一提的是,關(guān)于@RxLogSubscriber要放在繼承自Subscriber的類上
@RxLogSubscriber class MySubscriber extends Subscriber<Void> {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Void aVoid) {
}
}
//而不是實(shí)現(xiàn)Observer接口的類上
@RxLogSubscriber class MySubscriber implements Observer<Void> {
@Override public void onCompleted() {
}
@Override public void onError(Throwable e) {
}
@Override public void onNext(Void aVoid) {
}
}
當(dāng)然,隨著學(xué)習(xí)的深入,你會(huì)發(fā)現(xiàn),收益不止如此。
在響應(yīng)式編程中,應(yīng)該牢記以下兩點(diǎn):
-
everything is a stream(一切皆流)
-
don't break the chain(不要打斷鏈?zhǔn)浇Y(jié)構(gòu))
談?wù)凚ackpressure
Android這種嵌入式系統(tǒng),尤其是生產(chǎn)者-消費(fèi)者(producer-consumer)模式中,一定要小心Backpressure(背壓,反壓)的出現(xiàn)。一個(gè)寬泛的解釋就是:事件產(chǎn)生的速度比消費(fèi)快。一旦發(fā)生overproducing,當(dāng)你的鏈?zhǔn)浇Y(jié)構(gòu)不能承受數(shù)據(jù)壓力的時(shí)候,就會(huì)拋出MissingBackpressureException異常。
在Android中最容易出現(xiàn)的Backpressure就是連續(xù)快速點(diǎn)擊跳轉(zhuǎn)界面、數(shù)據(jù)庫查詢、文件掃面、鍵盤輸入,甚至聯(lián)網(wǎng)等操作都有可能造成Backpressure,可能有些情況并不會(huì)導(dǎo)致程序崩潰,但是會(huì)造成一些我們不想見到的小麻煩。那么一起來看看如何用RxJava解決Backpressure,OK,讓我們的程序變得健壯起來吧。
groupBy操作符
在寫這篇文章的時(shí)候,剛好看到一段代碼,看來有必要說一說這個(gè)操作符了。

.groupBy( ),分組操作符,雖然目前這個(gè)項(xiàng)目中沒有用到,但是我還是蠻喜歡它的,而且我看到很多人在使用,將原始Observable根據(jù)不同的key分組成多個(gè)GroupedObservable,由原始Observable發(fā)射(原始Observable的泛型將變成這樣Observable<GroupedObservable<K, T>>),每一個(gè)GroupedObservable既是事件本身也是一個(gè)獨(dú)立的Observable,每一個(gè)GroupedObservable發(fā)射一組原始Observable的事件子集。
引用自:GroupBy中文翻譯
注意:groupBy將原始Observable分解為一個(gè)發(fā)射多個(gè)GroupedObservable的Observable,一旦有訂閱,每個(gè)GroupedObservable就開始緩存數(shù)據(jù)。因此,如果你忽略這些GroupedObservable中的任何一個(gè),這個(gè)緩存可能形成一個(gè)潛在的內(nèi)存泄露。因此,如果你不想觀察,也不要忽略GroupedObservable。你應(yīng)該使用像take(0)這樣會(huì)丟棄自己的緩存的操作符。
如果你取消訂閱一個(gè)GroupedObservable,那個(gè)Observable將會(huì)終止。如果之后原始的Observable又發(fā)射了一個(gè)與這個(gè)Observable的Key匹配的數(shù)據(jù),groupBy將會(huì)為這個(gè)Key創(chuàng)建一個(gè)新的GroupedObservable。
那么問題恰恰出在.take(n)操作符上。

只返回前面指定的n項(xiàng)數(shù)據(jù),然后發(fā)送完成通知,忽略后面的事件。
那么看一下這個(gè)例子:
Observable.just(0, 1, 2, 3, 4, 5).groupBy(new Func1<Integer, Boolean>() {
@Override public Boolean call(Integer integer) {
return integer % 2 == 0;
}
}).flatMap(new Func1<GroupedObservable<Boolean, Integer>, Observable<Integer>>() {
@Override
public Observable<Integer> call(GroupedObservable<Boolean, Integer> groupedObservable) {
return groupedObservable.getKey() ? groupedObservable.take(1) : groupedObservable;
}
}).subscribe(new Action1<Integer>() {
@Override public void call(Integer i) {
System.out.println(i);
}
});
輸出結(jié)果:
0
1
2
3
4
5
然而在1.0.0-RC5之前的版本中,在GroupedObservable上使用.take(n)操作符將會(huì)在發(fā)送完n個(gè)事件后,對(duì)GroupedObservable進(jìn)行unsubscribe。并且GroupedObservable內(nèi)部將會(huì)記錄這個(gè)unsubscribed狀態(tài),然后忽略后面的事件。所以輸出結(jié)果將是這樣的:
0
1
3
5
而在這之后的版本,使用.take(n)操作符,雖然也會(huì)發(fā)生unsubscribe,但是當(dāng)原始Observable再次發(fā)送一個(gè)滿足key的事件后,將會(huì)重新創(chuàng)建一個(gè)GroupedObservable,然后發(fā)送這個(gè)GroupedObservable,不會(huì)發(fā)生之前那樣的,忽略后續(xù)事件的現(xiàn)象。
當(dāng)然,不要忘記,對(duì)不感興趣的GroupedObservable使用.take(0),來避免泄露。
所以,我的建議是,在使用RxJava之前看看官方文檔或者change log。
關(guān)于RxWeather
我盡量減少對(duì)這個(gè)工程的文字描述。因?yàn)榇a才是最好的老師。
通過對(duì)Android技術(shù)棧,1#架構(gòu)(譯文)和Android架構(gòu)演化之路(譯文)的解讀和學(xué)習(xí),按照架構(gòu)和思路進(jìn)行了實(shí)現(xiàn),并且加入了RxBus。
關(guān)于REST API,我選擇了和風(fēng)天氣,而放棄了Openweathermap的理由如下:
Openweathermap免費(fèi)用戶所在的服務(wù)器不穩(wěn)定。
付費(fèi)方面,和風(fēng)天氣更經(jīng)濟(jì)實(shí)惠。
但是和風(fēng)天氣目前并不支持同時(shí)查詢多個(gè)地區(qū)的天氣預(yù)報(bào),也不支持根據(jù)經(jīng)緯度查詢天氣預(yù)報(bào)。但是以后的事情誰又能說的準(zhǔn)呢?
由于應(yīng)用并不支持動(dòng)態(tài)的上拉加載。所以,所有的列表展示結(jié)果,取決于city.txt文件。
我從Openweathermap給出的資源(下載city.list.json)中,整理需要的城市Json字符串,整合了經(jīng)緯度,以備不時(shí)之需。
找到了一個(gè)通過Location查詢所在地的API。
就這樣基本實(shí)現(xiàn)了列表展示頁ListActivity的功能:
根據(jù)Loaction查詢所在地城市名稱,然后查詢當(dāng)?shù)靥鞖狻?/p>
讀取
domain->assets->city.txt,然后依次查詢每個(gè)城市的天氣,所以,這個(gè)文件不建議放入太多json。整合1和2的并發(fā)請(qǐng)求結(jié)果,顯示界面。
詳情頁DetailActivity通過RxBus發(fā)送黏性事件接收列表頁傳遞過來的數(shù)據(jù),然后進(jìn)行展示。這里會(huì)有七天內(nèi)的天氣以及穿衣建議。由于我么并沒有找到一個(gè)正確的算法,所以當(dāng)進(jìn)入詳情頁后,旋轉(zhuǎn)屏幕之后的退出動(dòng)畫會(huì)有所不同。這個(gè)類涉及的代碼大部分都是動(dòng)畫(注意Hardware Layer的使用)以及對(duì)屏幕旋轉(zhuǎn)的處理,所以代碼看起有點(diǎn)多。ForkView使用了一個(gè)簡(jiǎn)單的自定義Behavior。
搜索界面SearchActivity,輸入的關(guān)鍵字請(qǐng)不要以市、區(qū)結(jié)尾,例如,北京而不是北京市,因?yàn)锳PI不支持,我也沒辦法 :( 。
啟動(dòng)頁
我認(rèn)為,出彩的引導(dǎo)頁是對(duì)細(xì)節(jié)的重視,但是我實(shí)在不能忍受,在啟動(dòng)頁等太久。注意:不要混淆這兩種場(chǎng)景。
所以,我在看了正確使用啟動(dòng)頁之后,決定采取這種方式實(shí)現(xiàn)SplashActivity。而且不建議使用大圖,一個(gè)icon足以。
Code
所有代碼都可以從Github上獲得。
片尾Tips:
文章開頭提到的資料,需要pdf或者kindle版本的請(qǐng)自行選擇下載,不得用于商業(yè)用途。
Learning Reactive Programming with Java 8 - pdf版,提取密碼:2d88。
Learning Reactive Programming with Java 8 - kindle版,提取密碼:5nec。
RxJava Essentials - pdf版,提取密碼:z3r8。
RxJava Essentials - kindle版,提取密碼:l67e。


