前言:
這是一個關于Rx的博文系列 ,也是本人第一次通過網(wǎng)絡來記錄、整理自己的學習歷程。若有紕漏之處,尤其是錯誤的理解和觀點,請各位大神拍磚。
干貨在后頭呢。
Rx——Reactive Extension,使用可觀測的序列(流)來組成異步的、基于事件的程序的庫。我們使用Observable(數(shù)據(jù)流)描述數(shù)據(jù)序列,通過Operator(操作符)查詢、改變數(shù)據(jù)流,并且支持以參數(shù)的形式利用Scheduler(調度器)控制異步數(shù)據(jù)流的并發(fā)。簡而言之:Rx = Observable + Operator + Scheduler。
若無特別說明,本系列文章中所出現(xiàn)的數(shù)據(jù)流和事件流均代表Observable。
我更愿意稱之為一種編程方式——響應式編程。實際上Rx不止一個庫,它包括了多個實現(xiàn)了Rx方式的各種編程語言的開源庫。如非特別說明,本系列文章均參考Rx的Java實現(xiàn)——RxJava2——進行闡述。
Rx擴展了觀察者模式,使得該模式從對一個數(shù)據(jù)或事件的觀察升級為對一個序列的數(shù)據(jù)或事件的觀察,更強大的是它可通過操作符將多個數(shù)據(jù)序列組合成一個序列。借助于Rx我們能夠更加專注于程序的邏輯,而不用過多地關注諸如多線程、同步、線程安全、并發(fā)、非阻塞IO這些底層的東東。
看到這里有人會質疑了:沒有Rx我照樣能得到想要的數(shù)據(jù)序列啊,談何“升級”?這里以吃飯來舉例。
場景一和場景二:
來到公司樓下的食堂——沒錯,就是那種去晚了菜都涼了的地方:
一)中午胃口不錯,于是點了個兩葷一素:Iterable<Dish> lunch = getDish(); eat(lauch);
二)晚上想減肥,只要了一個素菜:Dish dinner = getDish(); eat(dinner);
食堂的菜已經(jīng)提前準備好了,想吃多少任君挑選。點好菜就開吃不用等待,可見在食堂吃飯是一個同步的操作。
場景三:
某個下午堆代碼上癮錯過食堂飯點,只好去樓下的拉面館吃面:
三) 老板,來碗牛肉炒刀削:Future<Dish> dinner = waitDish(); eat(lauch);
給牛肉炒刀削點贊!接下來就是等面出鍋,可見吃面一個異步的操作。
場景四和場景五:
周末好不容易約了女神一起吃中餐:
為了這頓飯我餓了3天,服務員菜單遞過來我底氣十足地點了5個硬菜,女神卻說要減肥(不早說)。為了體現(xiàn)我的紳士風度,餓死也必須等女神先吃。
四) 然而女神說等菜全部上齊了再吃:Future<Iterable<Dish>> lunch = waitDishAll(); eat(lauch);
五) 如果第一道菜上桌就開始吃我就不用餓那么久了:Iterable<Future<Dish>> lunch = waitDishAny(); eat(lauch);
可見吃大餐是一個由多個異步過程組合的操作,組合的方式由具體業(yè)務決定。
毫無疑問,我們能從容面對場景一和場景二;對于場景三,不同語言提供了類似功能的工具,比如Java提供了Future(配合Callback)來應對這類單一異步操作。然而在場景四和場景五中,存在多個、甚至可能嵌套的異步操作,考慮到異步操作在時間上地不確定性,盡管Future仍然可以作為一種解決方案,但隨之而來的復雜性也不容忽視,程序員需要提供一些額外的邏輯來判斷異步操作何時結束以及后續(xù)的操作(對應本場景中的吃)何時開始。
吃瓜群眾開始起哄:Rx行那Rx上啊。
Observable<Dish> lauch = getDish(); // 得到數(shù)據(jù)流
5道菜上齊再吃: lauch.buffer(5).subscribe(eat); // 轉換數(shù)據(jù)流 、訂閱
第一道菜上桌就吃: lauch.firstElement().subscribe(eat); // 過濾數(shù)據(jù)流 、訂閱
如何獲取和響應包含多個數(shù)據(jù)的異步數(shù)據(jù)流,Java的Future+Callback方案顯得有點笨拙,程序員不得不分散精力去處理核心業(yè)務之外的邏輯,編寫出的代碼也容易產(chǎn)生臭名昭著的“回調地獄”。相比之下,Rx表現(xiàn)得游刃有余,它提供了豐富的操作符(此處暫且按下不表),原始數(shù)據(jù)流(上游數(shù)據(jù)流)經(jīng)若干次操作符處理后變成目標數(shù)據(jù)流(下游數(shù)據(jù)流)。觀察者訂閱了目標數(shù)據(jù)流后,剩下的工作就是集中精力處理后續(xù)業(yè)務。
Rx不但可以方便地處理單個數(shù)據(jù)——更重要的是——它在需要處理多個、甚至無限多個數(shù)據(jù)的場景中表現(xiàn)優(yōu)異。因此,Rx的準則是:(幾乎)一切都可以成為一個數(shù)據(jù)流(哪怕只有單個數(shù)據(jù)、甚至沒有任何數(shù)據(jù))。
Rx相比經(jīng)典的觀察者模式增加了兩個能力:
1) 被觀察者向觀察者發(fā)送“沒有更多數(shù)據(jù)”通知的能力(調用觀察者的onComplete方法),類比于Iterable因 !hasNext()正常結束遍歷。
2) 被觀察者向觀察者發(fā)送“異常”通知的能力 (調用觀察者的onError方法),類比于Iterable遍歷過程中拋出異常 提前結束遍歷。
觀察者只能收到以上兩個通知其中之一。一旦收到任何一個通知,之后觀察者將不會收到任何數(shù)據(jù)或通知。
獲得以上兩個能力的加持后,Rx的Observable與Iterable看起來就像是孿生兄弟,我們可以像使用Iterable一樣使用Observable。
通過Iterable處理數(shù)據(jù)序列,用的是pull(拉?。┑姆绞?,處理過程發(fā)生在當前線程:
getDataFromLocalMemory()
? ? .skip(10)
? ? .take(5)
? ? ? .map({ s -> s + " transformed" })
? ? .forEach({ println "next -> " + it })
通過Observable處理數(shù)據(jù)序列,用的是push(推送)的方式,可以靈活地選擇同步或異步地發(fā)送處理結果:
getDataFromNetwork()
? ? .skip(10)
? ? .take(5)
? ? .map({ s ->s + " transformed" })
? ? .subscribe({ println "onNext -> " + it })
Observable比它的孿生兄弟Iterable多了一對隱形的翅膀——在處理異步的數(shù)據(jù)流時,我們便可以打開這對翅膀。
Rx使用非常靈活,盡管我們通常用它來處理異步數(shù)據(jù)流,事實上,Rx完全不關心產(chǎn)生數(shù)據(jù)的方式,無論是通過線程池、event loops、non-blocking I/O、 actors。即便你已經(jīng)通過線程池來產(chǎn)生數(shù)據(jù),仍然可以改造成其他方式,而完全不用改變現(xiàn)有觀察者的工作方式。
數(shù)據(jù)是同步or異步獲取的?
數(shù)據(jù)序列是否需要在多個不同線程中計算并依次返回給調用者?
是否通過異步的網(wǎng)絡請求來獲取數(shù)據(jù)?
是否通過callback線程獲取數(shù)據(jù)?
Rx把一切與Observable的交互視為異步的。