原文地址:README.md
前言
- 關(guān)于RxJava:詳情請(qǐng)查看另一篇文章:Android RxJava 學(xué)習(xí)筆記。
- 寫這篇文章的目的:通過閱讀RxJava官方文檔,對(duì)RxJava有更全面的認(rèn)識(shí),在翻譯的過程中,提高自己的英語水平,并為他人帶來便利。
以下內(nèi)容為譯文內(nèi)容。
RxJava:為JVM而設(shè)計(jì)的Reactive Extensions
Reactive Extensions(Rx):一個(gè)使用可觀察序列組成異步的、基于事件的程序的庫。RxJava就是Reactive Extensions在Java虛擬機(jī)上的實(shí)現(xiàn)。
RxJava繼承了觀察者模式以支持?jǐn)?shù)據(jù)、事件序列,以及添加允許你以聲明的方式組合序列的運(yùn)算符,同時(shí)抽出對(duì)類似于低級(jí)線程、同步、線程安全和并發(fā)數(shù)據(jù)結(jié)構(gòu)等問題的關(guān)注。
2.x版本(API文檔)
- 單獨(dú)依賴:Reactive-Streams
- 持續(xù)支持Java 6+ 以及 Android 2.3+
- 通過設(shè)計(jì)變更以及Reactive-Streams-Commons研究項(xiàng)目提升了性能
- Java 8 lambda友好API
- 兼容并發(fā)源(線程、池、事件循環(huán)、Actors、參與者等)
- 異步或同步執(zhí)行
- 用于參數(shù)化并發(fā)的虛擬時(shí)間和調(diào)度器
版本2.x和版本1.x將會(huì)在幾年內(nèi)并存。他們擁有不同的Group id(io.reactivex.rxjava2 對(duì)比 io.reactivex)和命名空間(io.reactivex 對(duì)比 rx)。
關(guān)于版本1.x和2.x的不同可以查看維基文章《2.0有何不同》。可以在維基主頁從整體上了解更多RxJava的內(nèi)容。
1.x版本
截至2018年3月31日,1.x版本已過期。不再對(duì)其進(jìn)行開發(fā)、支持、維護(hù)、PRs和更新。最后一個(gè)版本1.3.8的API文檔,將可持續(xù)訪問。
入門
設(shè)置依賴
首先將RxJava2添加到你的項(xiàng)目中,例如,通過Gradle方式添加compile依賴:
implementation "io.reactivex.rxjava2:rxjava:2.x.y"
(請(qǐng)將x和y換成最新的版本號(hào))
Hello World
接下來我們寫一段Hello World程序:
package rxjava.examples;
import io.reactivex.*;
public class HelloWorld {
public static void main(String[] args) {
Flowable.just("Hello world").subscribe(System.out::println);
}
}
如果你的平臺(tái)不支持Java 8 lambdas(目前為止),你需要手動(dòng)新建一個(gè)Consumer內(nèi)部類:
import io.reactivex.functions.Consumer;
Flowable.just("Hello world")
.subscribe(new Consumer<String>() {
@Override public void accept(String s) {
System.out.println(s);
}
});
基本類
RxJava 2 具有幾個(gè)基本類,你可以在其中發(fā)現(xiàn)運(yùn)算符:
-
io.reactivex.Flowable:0..N個(gè)流,支持響應(yīng)流和背壓 -
io.reactivex.Observable:0..N個(gè)流,無背壓 -
io.reactivex.Single:一條只有一個(gè)條目或者錯(cuò)誤的流 -
io.reactivex.Completable:一條沒有條目但是只有一個(gè)完成或錯(cuò)誤標(biāo)志的流 -
io.reactivex.Maybe:一條無條目或者只有一個(gè)條目或錯(cuò)誤的流
一些術(shù)語
上游,下游
RxJava中的數(shù)據(jù)流包含一個(gè)數(shù)據(jù)源,至少0個(gè)中間步驟,隨后是一個(gè)數(shù)據(jù)消費(fèi)者或者組合器步驟(該步驟負(fù)責(zé)以某種方式消費(fèi)數(shù)據(jù)流):
source.operator1().operator2().operator3().subscribe(consumer);
source.flatMap(value -> source.operator1().operator2().operator3());
在這里,如果我們想想自己處于operator2的位置,向左看,一直到source,被稱為上游。向右看,直到consumer,被稱為下游。這當(dāng)我們把每個(gè)元素分開寫時(shí)通常更容易理解:
source
.operator1()
.operator2()
.operator3()
.subscribe(consumer)
運(yùn)動(dòng)中的對(duì)象
在RxJava的文檔中,排放物(emission),發(fā)射物(emits),條目(item),信號(hào)(signal),數(shù)據(jù)(data)以及消息(message)被認(rèn)為是同意詞并且代表在整個(gè)數(shù)據(jù)流中運(yùn)動(dòng)的對(duì)象。
背壓
當(dāng)數(shù)據(jù)流運(yùn)行到異步步驟時(shí),每一步可能都以不同的速度執(zhí)行不同的操作。為了避免此類步驟(通常表現(xiàn)為由于臨時(shí)緩沖或需要跳過/刪除數(shù)據(jù)而導(dǎo)致內(nèi)存使用量增加)被吞沒,應(yīng)用了所謂的凡壓力,這是一種流控制形式,使得此類步驟能明確他們準(zhǔn)備處理的條目數(shù)。其允許當(dāng)數(shù)據(jù)流中的某個(gè)步驟無法直到上游有多少個(gè)條目會(huì)傳遞過來時(shí),限制內(nèi)存的使用。
在RxJava中,Flowable類專用于支持背壓,而Observable類專用于非背壓操作(短序列、GUI交互等)。其他類型,Single,Maybe以及Completable都不支持背壓,并且也不應(yīng)該支持背壓;暫時(shí)存儲(chǔ)一個(gè)條目的空間總會(huì)有的。
裝配時(shí)間
數(shù)據(jù)流通過應(yīng)用各種中間操作的發(fā)生而做的準(zhǔn)備被稱為裝配時(shí)間:
Flowable<Integer> flow = Flowable.range(1, 5)
.map(v -> v * v)
.filter(v -> v % 3 == 0)
;
在此時(shí),數(shù)據(jù)還未流動(dòng)并且沒有副作用發(fā)生。
訂閱時(shí)間
這是在流中的subscribe()方法被調(diào)用使得隊(duì)列內(nèi)部建立起鏈條關(guān)系時(shí)的一個(gè)短暫的狀態(tài):
flow.subscribe(System.out::println)
這是觸發(fā)訂閱副作用的時(shí)候(參見doonsubscribe)。在這種狀態(tài)下,某些源會(huì)立即阻止或開始發(fā)出條目。
運(yùn)行時(shí)間
這是當(dāng)流主動(dòng)地發(fā)出條目、錯(cuò)誤或完成信號(hào)時(shí)的狀態(tài):
Observable.create(emitter -> {
while (!emitter.isDisposed()) {
long time = System.currentTimeMillis();
emitter.onNext(time);
if (time % 2 != 0) {
emitter.onError(new IllegalStateException("Odd millisecond!"));
break;
}
}
})
.subscribe(System.out::println, Throwable::printStackTrace);
實(shí)際上,是上面給出的示例的主體執(zhí)行的時(shí)候。
簡(jiǎn)單的后臺(tái)計(jì)算
在RxJava中有一個(gè)很普遍的使用場(chǎng)景就是在后臺(tái)線程運(yùn)行一些計(jì)算,網(wǎng)絡(luò)請(qǐng)求并在UI線程顯示結(jié)果(或錯(cuò)誤):
import io.reactivex.schedulers.Schedulers;
Flowable.fromCallable(() -> {
Thread.sleep(1000); // 模仿復(fù)雜的計(jì)算
return "Done";
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.single())
.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000); // <--- 等待流結(jié)束
這種類似于建造者模式的鏈?zhǔn)椒椒L(fēng)格被稱為流式API。然而,RxJava的響應(yīng)式類型是不可改變的;每個(gè)方法調(diào)用后返回一個(gè)攜帶了被添加的行為的新Flowable對(duì)象。為了更容易理解,上方的示例代碼可以被寫為如下形式:
Flowable<String> source = Flowable.fromCallable(() -> {
Thread.sleep(1000); // 模仿復(fù)雜的計(jì)算
return "Done";
});
Flowable<String> runBackground = source.subscribeOn(Schedulers.io());
Flowable<String> showForeground = runBackground.observeOn(Schedulers.single());
showForeground.subscribe(System.out::println, Throwable::printStackTrace);
Thread.sleep(2000);
通常,你可以通過subscribeOn將計(jì)算或者阻塞IO轉(zhuǎn)移到其他線程。一旦數(shù)據(jù)就緒,你可以通過observeOn確保他們?cè)谇芭_(tái)或者GUI線程上進(jìn)行處理。
調(diào)度器
RxJava運(yùn)算符不直接與Thread或ExecutorService直接工作,而是與所謂的Scheduler一起工作,后者在統(tǒng)一的API背后抽象出并發(fā)源。RxJava 2 具有多個(gè)標(biāo)準(zhǔn)調(diào)度程序,可通過Schedulers工具類訪問它們。
-
Schedulers.computation():在后臺(tái)中的固定數(shù)量的專用線程中運(yùn)行密集的計(jì)算工作。大部分異步操作將此作為默認(rèn)的Scheduler。 -
Schedulers.io():在一組動(dòng)態(tài)變化的線程池中運(yùn)行I/O類或阻塞操作。 -
Schedulers.single():以順序和先進(jìn)先出的方式在單個(gè)線程上運(yùn)行工作。 -
Schedulers.trampoline():在一個(gè)線程中以順序和先進(jìn)先出的方式運(yùn)行工作,通常用作測(cè)試目的。
這些可用于所有JVM平臺(tái),但在某些特定的平臺(tái),比如Android,定義了典型的Scheduler:AndroidSchedulers.mainThread(),SwingScheduler.instance()或JavaFXSchedulers.gui()。
此外,可以通過Schedulers.from(Executor)方法選擇一個(gè)已有的Executor(及其子類比如ExecutorService)。舉個(gè)例子,持有一個(gè)更大但數(shù)量固定的線程池(與computation()和io()不同)時(shí)可以使用這個(gè)方法。
末尾的Thread.sleep(2000)不會(huì)引發(fā)異常。RxJava中的默認(rèn)Scheduler運(yùn)行在守護(hù)進(jìn)程中,這意味著一旦Java主線程存在,它們都會(huì)停止,后臺(tái)的計(jì)算也不會(huì)發(fā)生。在這個(gè)例子中睡眠一段時(shí)間,可以讓你有足夠時(shí)間在控制臺(tái)中看到流的輸出。
流中的并發(fā)情況
RxJava中的流本質(zhì)上是連續(xù)的,分為幾個(gè)處理階段,這些處理階段可能同時(shí)運(yùn)行:
Flowable.range(1, 10)
.observeOn(Schedulers.computation())
.map(v -> v * v)
.blockingSubscribe(System.out::println);
這個(gè)例子中,流在計(jì)算Scheduler中將1到10的數(shù)字作開方,并在“主”線程中消耗計(jì)算結(jié)果(更精確地說,blockingSubscribe的調(diào)用線程)。然而,lambda表達(dá)式v -> v * v并未與這個(gè)流并行運(yùn)行;它在同一個(gè)計(jì)算線程上一個(gè)接一個(gè)地接收1到10的值。
并行處理
并行處理數(shù)字1到10會(huì)更復(fù)雜一些:
Flowable.range(1, 10)
.flatMap(v ->
Flowable.just(v)
.subscribeOn(Schedulers.computation())
.map(w -> w * w)
)
.blockingSubscribe(System.out::println);
實(shí)際上,并行在RxJava中其實(shí)是在不同的流中運(yùn)行最后將他們的結(jié)果合并到一個(gè)單獨(dú)的流中。運(yùn)算符flatMap通過首先將1到10之間的每個(gè)數(shù)字銀蛇到它自己?jiǎn)为?dú)的Flowable中運(yùn)行,然后將計(jì)算好的平方數(shù)合并起來以實(shí)現(xiàn)這種平行。
但是,請(qǐng)注意,flatMap不保證任何順序,內(nèi)部流的最終結(jié)果可能會(huì)交替出現(xiàn)。有一些其他運(yùn)算符可以代替:
-
concatMap同時(shí)在一個(gè)內(nèi)部流中進(jìn)行映射以及運(yùn)行 -
concatMapEager“同時(shí)”運(yùn)行所有的內(nèi)部流,但是輸出流會(huì)按照這些內(nèi)部流創(chuàng)建時(shí)的順序進(jìn)行排序
或者,操作符Flowable.parallel()以及ParallelFlowable類型可以實(shí)現(xiàn)相同的并行處理模式:
Flowable.range(1, 10)
.parallel()
.runOn(Schedulers.computation())
.map(v -> v * v)
.sequential()
.blockingSubscribe(System.out::println);
子流依賴
flatMap是一個(gè)能在很多情況下使用的強(qiáng)大的操作符。比如,給定一個(gè)返回Flowable的服務(wù),我們項(xiàng)用第一個(gè)服務(wù)發(fā)出的值調(diào)用另一個(gè)服務(wù):
Flowable<Inventory> inventorySource = warehouse.getInventoryAsync();
inventorySource.flatMap(inventoryItem ->
erp.getDemandAsync(inventoryItem.getId())
.map(demand
-> System.out.println("Item " + inventoryItem.getName() + " has demand " + demand));
)
.subscribe();
持續(xù)
有時(shí)候,當(dāng)一個(gè)條目變得可用時(shí),有人希望對(duì)它執(zhí)行一些依賴性的計(jì)算。這有時(shí)被稱為持續(xù),根據(jù)應(yīng)該發(fā)生的內(nèi)容的不同以及設(shè)計(jì)的類型的不同,可能需要不同的操作符實(shí)現(xiàn)。
依賴
最典型的場(chǎng)景是給定一個(gè)值,調(diào)用另一個(gè)服務(wù),等待并繼續(xù)其結(jié)果:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
還有一個(gè)場(chǎng)景是后面的序列需要用到之前的映射中的值。這可以通過將外部的flatMap移動(dòng)到之前的flatMap中而解決,如下:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)
這樣,最初的value就可以被內(nèi)部的flatMap使用了,由lambda變量捕獲提供。
非依賴
在其他場(chǎng)景中,第一個(gè)源/數(shù)據(jù)流的結(jié)果是無用的,有人想要以另一個(gè)差不多的獨(dú)立的源繼續(xù),這種情況,flatMap同樣可用:
Observable continued = sourceObservable.flatMapSingle(ignored -> someSingleSource)
continued.map(v -> v.toString())
.subscribe(System.out::println, Throwable::printStackTrace);
但是,這個(gè)例子中的連續(xù)保持著Observable而不是看起來更合適的Single。(這是可以理解的,因?yàn)閺?code>flatMapSingle的調(diào)用者sourceObservable是一個(gè)多值的源,所以映射的可能的結(jié)果同樣也是多值的)。
通常,即使有一種有點(diǎn)兒更具表現(xiàn)力(同時(shí)也降低了開銷)方式,通過使用Completable作為中介及其操作符andThen以用其他東西繼續(xù):
sourceObservable
.ignoreElements() // returns Completable
.andThen(someSingleSource)
.map(v -> v.toString())
sourceObservable和someSingleSource之間的唯一的依賴是前者應(yīng)當(dāng)正常完成以便后者被消費(fèi)。
延遲依賴
有時(shí),在之前的序列和新序列中有一個(gè)隱藏的依賴,由于某種原因,并未通過“常規(guī)通道”而流動(dòng)。有人會(huì)想要將持續(xù)寫成如下文這樣:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.just(count.get()))
.subscribe(System.out::println);
不幸的是,因?yàn)?code>Single.just(count.get())在數(shù)據(jù)流還沒有運(yùn)行的時(shí)候,作為參數(shù)組裝時(shí)已經(jīng)運(yùn)行完成了,所以最終打印了0。我們需要某種方式以延遲這個(gè)Single源的運(yùn)算,直到主源完成運(yùn)行時(shí)為止:
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.defer(() -> Single.just(count.get())))
.subscribe(System.out::println);
或者
AtomicInteger count = new AtomicInteger();
Observable.range(1, 10)
.doOnNext(ignored -> count.incrementAndGet())
.ignoreElements()
.andThen(Single.fromCallable(() -> count.get()))
.subscribe(System.out::println);
類型變換
有時(shí),一個(gè)源或服務(wù)返回了一個(gè)與應(yīng)該使用它的流不同的類型。比如,在上方的inventory例子中,getDemandAsync應(yīng)當(dāng)返回一個(gè)Single<DemandRecord>。如果示例代碼保持不變,最終將引起一個(gè)便利錯(cuò)誤(但是,經(jīng)常會(huì)拋出一個(gè)有關(guān)缺乏過載的誤導(dǎo)性錯(cuò)誤消息)。
待更新……