前言
本系列文章適用于已經(jīng)了解 RxJava 的讀者,深入貫徹其原理,加深對其的認識。如果從未了解過 RxJava 的讀者們,建議先熟悉它。
RxJava 0.x
RxJava 最早是 Netflix 參照微軟的 Rx.Net,在 Java 上實現(xiàn)一套類似的庫,0.x 其實就是社區(qū)內(nèi)部迭代開發(fā)的時代。
在 0.x 的迭代過程中,API 還不穩(wěn)定,在長期的變更中,逐步完善了 Observable,Publisher,Subscriber,Scheduler 等接口以及大量的操作符。
Reactive Streams
在開發(fā) RxJava 早期版本的過程中,開發(fā)組也參與了制定 Reactive Streams 規(guī)范。
但是 RxJava1 并沒有遵循這個規(guī)范,因為考慮到下面幾個原因:
- Subscriber.onComplete 和 當時的 Observer.onCompleted 相差一個字母,導致 API 不能兼容。
- RxJava 中的 Subscription 和 RS 中的有些細微的區(qū)別,它不能在沒有較大變動的情況下實現(xiàn) request(n) 方法,因此 RxJava 引入了 Producer 這個接口,并混在了 Subscriber 中。
- RS 彼時不是穩(wěn)定版本,也在持續(xù)修訂中。
因此 RxJava 開發(fā)組決定在 2.0 版本中正式支持 RS 規(guī)范,在 1.x 版本中實現(xiàn)類似的機制,而不像在 2.0 中直接使用 RS 的接口。
Backpressure
由于 RS 的影響,在 0.20.0-RC1 中 RxJava 第一次引入 Backpressure 的概念,從此 RxJava 變成了一個讓人愛恨交織的庫。事實上 RxJava 開發(fā)組也曾表示,在 RxJava 早期版本中,在 Observable 混入 Backpressure 是一個重大的失誤。
- 由于 Backpressure 是在中間版本引入的,因此部分操作符支持,部分操作符不支持,導致對使用者有些混亂和不友好。
- 在熱數(shù)據(jù)源中(如點擊事件),是無法被正確地 backpressured ,從而導致經(jīng)常出現(xiàn)意外的
MissingBackpressureException。
事實上正如現(xiàn)在 2.x 中做的那樣,正確的做法是應該把模塊分為 支持 Backpressure 和不支持的兩類。在 io.reactivex.Observable 中徹底移除了 Backpressure,而 io.reactivex.Flowable 則遵循 RS 規(guī)范支持 Backpressure。
RxJava 1.x
經(jīng)過兩年多的迭代,RxJava 在 14 年 9 月發(fā)布了 1.0.0 正式版。
上面有提到,其實 1.x 是一個類似 RS 的版本,但是不依賴 RS 的接口。同時對比 0.x,做了如下的更改。
依賴方式
| groupId | artifactId | |
|---|---|---|
| 0.x | com.netflix.rxjava | rxjava-core |
| 1.x | io.reactivex | rxjava |
拆分項目
在 RxJava 1.0.0 發(fā)布之際,把 JVM 上其他語言的實現(xiàn)和子工程都獨立出去了,而在 RxJava 庫中只保留了 Java 版的實現(xiàn)。
如:
其他
新增和廢棄了部分操作符,修復了大量的 BUG。
RxJava 2.x
RxJava 2.0.0 正式版發(fā)布于 2016 年 9 月底。
筆者也曾寫過一篇 《淺談RxJava與2.0的新特性》,不過寫那篇文章時還在版本還在 2.0.0-RC1,以現(xiàn)在的角度看起來不免顯得不夠全面,因此最好的理解方式還是看官方的 Wiki。
類型
| 1.x | 2.x | |
|---|---|---|
| Single/Completable | Maybe | 0 或 1 數(shù)據(jù)源 |
| Observable | Flowable | 多數(shù)據(jù)源 |
| Subject | Processor | 熱數(shù)據(jù)源 |
正如上面所說的,RxJava2 遵循了 RS 規(guī)范,其冷數(shù)據(jù)源真正實現(xiàn)的類型就是 Flowable,熱數(shù)據(jù)源的實現(xiàn)則在io.reactivex.processors包中。
同時也把 Observable 中舊的 Backpressure 徹底移除,因此在 RxJava2 中使 用 Observable 再也不會拋出MissingBackpressureException。
Observable 與 Flowable
在 RxJava2 中, Flowable 和 Observable 雖然實現(xiàn)的代碼復用了一部分,但是機制卻大相徑庭。這里要涉及到數(shù)據(jù)源的三種模型:
數(shù)據(jù)源
- 響應式推:如鼠標點擊事件。此時生產(chǎn)者無條件產(chǎn)生數(shù)據(jù),消費者負責配合生產(chǎn)者。
- 同步拉:如
Iterable的迭代。此時消費者提出要求,生產(chǎn)者配合消費者下發(fā),數(shù)據(jù)源是確定的。 - 異步拉:如
Future,Processor。消費者提出要求,生產(chǎn)者據(jù)此下發(fā)數(shù)據(jù),但是數(shù)據(jù)到來不確定。
Observable
在 Observable 中,消費者是無權(quán)提出要求的,即數(shù)據(jù)都是生產(chǎn)者提供的,消費者只能被動接受。雖然數(shù)據(jù)源不確定,但是對消費者是透明的,只能被動等待數(shù)據(jù)。由于 Observable 已經(jīng)徹底移除了 Backpressure,因此對于消費速度和生產(chǎn)速度不協(xié)調(diào)時,中間操作符可能會創(chuàng)建 Buffer(如 observeOn)來緩存數(shù)據(jù)。因此數(shù)據(jù)在無限的積累中可能會導致 OOM, 但不再會拋出MissingBackpressureException。
Flowable
在 Flowable 中,消費者通過Subscription主動的向生產(chǎn)者提出自己需求的數(shù)量,上游據(jù)此發(fā)射數(shù)據(jù)。從而就有生產(chǎn)和消費的矛盾。如果是數(shù)據(jù)源是響應式推或者異步拉時,可能會導致MissingBackpressureException。
舉例
這么說有點抽象,我們舉個例子。Subject / Processor 就是對應異步拉的數(shù)據(jù)源,也是熱數(shù)據(jù)源。消費者在訂閱他們后,無論是否可以 request,數(shù)據(jù)的是否產(chǎn)生以及產(chǎn)生速度也是未知的。
在接受到onNext時:
- Subject 直接轉(zhuǎn)發(fā)給下游
- Prosessor 檢查下游的 request 數(shù)目,如果少于已經(jīng)發(fā)送的數(shù)目,則拋出
MissingBackpressureException
因此使用Prosessor稍有不慎就會出錯哦。當然在實際使用中Flowable.subscribe()時,內(nèi)置的 Subscriber 通常都會在 onSubscribe 時直接向生產(chǎn)者request(Long.MAX_VALUE),在 RxJava2 Long.MAX_VALUE 是一個特殊值,意味著無限流。大家可參見 subscribers
選擇
對于 Observable 與 Flowable 的選擇官方也有提示:
-
選擇 Observable:
- 處理不超過 1000 個數(shù)據(jù)時,因為數(shù)據(jù)很少,一般不會觸發(fā) OOM
- 處理 GUI 或者點擊事件時,因為這些事件是異步推的,很難被 backpressured 也一般不這樣做
- 數(shù)據(jù)源本質(zhì)上是同步的,但是平臺不支持 Java Stream API 或者你想用 RxJava 豐富的操作符,因為 Observable 比 Flowable 的性能更好
-
選擇 Flowable:
- 處理 10k+ 數(shù)據(jù)且數(shù)據(jù)源是可控的
- 基于拉的且阻塞的數(shù)據(jù)源
拆分之爭
事實上 RxJava 自 16年 開始社區(qū)一度有討論是否要把 RxJava2 拆分成多個庫,因為:
- 區(qū)分是否支持 Backpressure
- 將通用的代碼獨立出去,如
Scheduler、SimpleQueue - 減少包體積的大小
但是最終經(jīng)過討論后還是放棄了:
- 使用 Proguard 來壓縮 RxJava2 的效果非常好,基本上是用多少保留多少代碼
- 如果拆分開,強行逼迫使用者需要了解 Backpressure 的概念,且增加了使用方的麻煩,因為他們需要區(qū)分自己到底需要哪些庫
- 多個不同版本子庫的組合可能會導致兼容問題
- 拆分后,Observable 和 Flowable 互不依賴,互轉(zhuǎn)需要使用靜態(tài)方法,打斷了鏈式操作
- ...
結(jié)語
如果您作為一個 Android 開發(fā)者,正在糾結(jié)于 RxJava 帶來的好處和他的龐大的體積,那么您可以打消這個顧慮了,只要正確地配置了 Proguard,RxJava 對您包體積大小的影響微乎其微。反過來說,如果沒有配置 Proguard,那么是否引入 RxJava 確實是值得思考的一件事。
事實上,做一個 Android 開發(fā)者,筆者認為 RxJava 簡直是為 Android 而生的,天生響應式的事件與 RxJava 的結(jié)合能大幅提供您的工作效率。前提是您有一些函數(shù)式編程的思維,能把流程拆解成一個個操作符。