使用 Reactor 進(jìn)行反應(yīng)式編程

反應(yīng)式編程(Reactive Programming)這種新的編程范式越來(lái)越受到開發(fā)人員的歡迎。在 Java 社區(qū)中比較流行的是 RxJava 和 RxJava 2。本文要介紹的是另外一個(gè)新的反應(yīng)式編程庫(kù) Reactor。

反應(yīng)式編程介紹

反應(yīng)式編程來(lái)源于數(shù)據(jù)流和變化的傳播,意味著由底層的執(zhí)行模型負(fù)責(zé)通過(guò)數(shù)據(jù)流來(lái)自動(dòng)傳播變化。比如求值一個(gè)簡(jiǎn)單的表達(dá)式 c=a+b,當(dāng) a 或者 b 的值發(fā)生變化時(shí),傳統(tǒng)的編程范式需要對(duì) a+b 進(jìn)行重新計(jì)算來(lái)得到 c 的值。如果使用反應(yīng)式編程,當(dāng) a 或者 b 的值發(fā)生變化時(shí),c 的值會(huì)自動(dòng)更新。反應(yīng)式編程最早由 .NET 平臺(tái)上的 Reactive Extensions (Rx) 庫(kù)來(lái)實(shí)現(xiàn)。后來(lái)遷移到 Java 平臺(tái)之后就產(chǎn)生了著名的 RxJava 庫(kù),并產(chǎn)生了很多其他編程語(yǔ)言上的對(duì)應(yīng)實(shí)現(xiàn)。在這些實(shí)現(xiàn)的基礎(chǔ)上產(chǎn)生了后來(lái)的反應(yīng)式流(Reactive Streams)規(guī)范。該規(guī)范定義了反應(yīng)式流的相關(guān)接口,并將集成到 Java 9 中。

在傳統(tǒng)的編程范式中,我們一般通過(guò)迭代器(Iterator)模式來(lái)遍歷一個(gè)序列。這種遍歷方式是由調(diào)用者來(lái)控制節(jié)奏的,采用的是拉的方式。每次由調(diào)用者通過(guò) next()方法來(lái)獲取序列中的下一個(gè)值。使用反應(yīng)式流時(shí)采用的則是推的方式,即常見(jiàn)的發(fā)布者-訂閱者模式。當(dāng)發(fā)布者有新的數(shù)據(jù)產(chǎn)生時(shí),這些數(shù)據(jù)會(huì)被推送到訂閱者來(lái)進(jìn)行處理。在反應(yīng)式流上可以添加各種不同的操作來(lái)對(duì)數(shù)據(jù)進(jìn)行處理,形成數(shù)據(jù)處理鏈。這個(gè)以聲明式的方式添加的處理鏈只在訂閱者進(jìn)行訂閱操作時(shí)才會(huì)真正執(zhí)行。

反應(yīng)式流中第一個(gè)重要概念是負(fù)壓(backpressure)。在基本的消息推送模式中,當(dāng)消息發(fā)布者產(chǎn)生數(shù)據(jù)的速度過(guò)快時(shí),會(huì)使得消息訂閱者的處理速度無(wú)法跟上產(chǎn)生的速度,從而給訂閱者造成很大的壓力。當(dāng)壓力過(guò)大時(shí),有可能造成訂閱者本身的奔潰,所產(chǎn)生的級(jí)聯(lián)效應(yīng)甚至可能造成整個(gè)系統(tǒng)的癱瘓。負(fù)壓的作用在于提供一種從訂閱者到生產(chǎn)者的反饋渠道。訂閱者可以通過(guò) request()方法來(lái)聲明其一次所能處理的消息數(shù)量,而生產(chǎn)者就只會(huì)產(chǎn)生相應(yīng)數(shù)量的消息,直到下一次 request()方法調(diào)用。這實(shí)際上變成了推拉結(jié)合的模式。

Reactor 簡(jiǎn)介

前面提到的 RxJava 庫(kù)是 JVM 上反應(yīng)式編程的先驅(qū),也是反應(yīng)式流規(guī)范的基礎(chǔ)。RxJava 2 在 RxJava 的基礎(chǔ)上做了很多的更新。不過(guò) RxJava 庫(kù)也有其不足的地方。RxJava 產(chǎn)生于反應(yīng)式流規(guī)范之前,雖然可以和反應(yīng)式流的接口進(jìn)行轉(zhuǎn)換,但是由于底層實(shí)現(xiàn)的原因,使用起來(lái)并不是很直觀。RxJava 2 在設(shè)計(jì)和實(shí)現(xiàn)時(shí)考慮到了與規(guī)范的整合,不過(guò)為了保持與 RxJava 的兼容性,很多地方在使用時(shí)也并不直觀。Reactor 則是完全基于反應(yīng)式流規(guī)范設(shè)計(jì)和實(shí)現(xiàn)的庫(kù),沒(méi)有 RxJava 那樣的歷史包袱,在使用上更加的直觀易懂。Reactor 也是 Spring 5 中反應(yīng)式編程的基礎(chǔ)。學(xué)習(xí)和掌握 Reactor 可以更好地理解 Spring 5 中的相關(guān)概念。

在 Java 程序中使用 Reactor 庫(kù)非常的簡(jiǎn)單,只需要通過(guò) Maven 或 Gradle 來(lái)添加對(duì) io.projectreactor:reactor-core 的依賴即可,目前的版本是 3.0.5.RELEASE。

Flux 和 Mono

Flux 和 Mono 是 Reactor 中的兩個(gè)基本概念。Flux 表示的是包含 0 到 N 個(gè)元素的異步序列。在該序列中可以包含三種不同類型的消息通知:正常的包含元素的消息、序列結(jié)束的消息和序列出錯(cuò)的消息。當(dāng)消息通知產(chǎn)生時(shí),訂閱者中對(duì)應(yīng)的方法 onNext(), onComplete()和 onError()會(huì)被調(diào)用。Mono 表示的是包含 0 或者 1 個(gè)元素的異步序列。該序列中同樣可以包含與 Flux 相同的三種類型的消息通知。Flux 和 Mono 之間可以進(jìn)行轉(zhuǎn)換。對(duì)一個(gè) Flux 序列進(jìn)行計(jì)數(shù)操作,得到的結(jié)果是一個(gè) Mono對(duì)象。把兩個(gè) Mono 序列合并在一起,得到的是一個(gè) Flux 對(duì)象。

創(chuàng)建 Flux

有多種不同的方式可以創(chuàng)建 Flux 序列。

Flux 類的靜態(tài)方法

第一種方式是通過(guò) Flux 類中的靜態(tài)方法。

just():可以指定序列中包含的全部元素。創(chuàng)建出來(lái)的 Flux 序列在發(fā)布這些元素之后會(huì)自動(dòng)結(jié)束。

fromArray(),fromIterable()和 fromStream():可以從一個(gè)數(shù)組、Iterable 對(duì)象或 Stream 對(duì)象中創(chuàng)建 Flux 對(duì)象。

empty():創(chuàng)建一個(gè)不包含任何元素,只發(fā)布結(jié)束消息的序列。

error(Throwable error):創(chuàng)建一個(gè)只包含錯(cuò)誤消息的序列。

never():創(chuàng)建一個(gè)不包含任何消息通知的序列。

range(int start, int count):創(chuàng)建包含從 start 起始的 count 個(gè)數(shù)量的 Integer 對(duì)象的序列。

interval(Duration period)和 interval(Duration delay, Duration period):創(chuàng)建一個(gè)包含了從 0 開始遞增的 Long 對(duì)象的序列。其中包含的元素按照指定的間隔來(lái)發(fā)布。除了間隔時(shí)間之外,還可以指定起始元素發(fā)布之前的延遲時(shí)間。

intervalMillis(long period)和 intervalMillis(long delay, long period):與 interval()方法的作用相同,只不過(guò)該方法通過(guò)毫秒數(shù)來(lái)指定時(shí)間間隔和延遲時(shí)間。

代碼清單 1 中給出了上述這些方法的使用示例。

清單 1. 通過(guò) Flux 類的靜態(tài)方法創(chuàng)建 Flux 序列

1

2

3

4

5

6

Flux.just("Hello", "World").subscribe(System.out::println);

Flux.fromArray(new Integer[] {1, 2, 3}).subscribe(System.out::println);

Flux.empty().subscribe(System.out::println);

Flux.range(1, 10).subscribe(System.out::println);

Flux.interval(Duration.of(10, ChronoUnit.SECONDS)).subscribe(System.out::println);

Flux.intervalMillis(1000).subscribe(System.out::println);

上面的這些靜態(tài)方法適合于簡(jiǎn)單的序列生成,當(dāng)序列的生成需要復(fù)雜的邏輯時(shí),則應(yīng)該使用 generate() 或 create() 方法。

generate()方法

generate()方法通過(guò)同步和逐一的方式來(lái)產(chǎn)生 Flux 序列。序列的產(chǎn)生是通過(guò)調(diào)用所提供的 SynchronousSink 對(duì)象的 next(),complete()和 error(Throwable)方法來(lái)完成的。逐一生成的含義是在具體的生成邏輯中,next()方法只能最多被調(diào)用一次。在有些情況下,序列的生成可能是有狀態(tài)的,需要用到某些狀態(tài)對(duì)象。此時(shí)可以使用 generate()方法的另外一種形式 generate(Callable stateSupplier, BiFunction,S> generator),其中 stateSupplier 用來(lái)提供初始的狀態(tài)對(duì)象。在進(jìn)行序列生成時(shí),狀態(tài)對(duì)象會(huì)作為 generator 使用的第一個(gè)參數(shù)傳入,可以在對(duì)應(yīng)的邏輯中對(duì)該狀態(tài)對(duì)象進(jìn)行修改以供下一次生成時(shí)使用。

在代碼清單 2中,第一個(gè)序列的生成邏輯中通過(guò) next()方法產(chǎn)生一個(gè)簡(jiǎn)單的值,然后通過(guò) complete()方法來(lái)結(jié)束該序列。如果不調(diào)用 complete()方法,所產(chǎn)生的是一個(gè)無(wú)限序列。第二個(gè)序列的生成邏輯中的狀態(tài)對(duì)象是一個(gè) ArrayList 對(duì)象。實(shí)際產(chǎn)生的值是一個(gè)隨機(jī)數(shù)。產(chǎn)生的隨機(jī)數(shù)被添加到 ArrayList 中。當(dāng)產(chǎn)生了 10 個(gè)數(shù)時(shí),通過(guò) complete()方法來(lái)結(jié)束序列。

清單 2. 使用 generate()方法生成 Flux 序列

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

Flux.generate(sink -> {

sink.next("Hello");

sink.complete();

}).subscribe(System.out::println);

final Random random = new Random();

Flux.generate(ArrayList::new, (list, sink) -> {

int value = random.nextInt(100);

list.add(value);

sink.next(value);

if (list.size() == 10) {

sink.complete();

}

return list;

}).subscribe(System.out::println);

create()方法

create()方法與 generate()方法的不同之處在于所使用的是 FluxSink 對(duì)象。FluxSink 支持同步和異步的消息產(chǎn)生,并且可以在一次調(diào)用中產(chǎn)生多個(gè)元素。在代碼清單 3 中,在一次調(diào)用中就產(chǎn)生了全部的 10 個(gè)元素。

清單 3. 使用 create()方法生成 Flux 序列

1

2

3

4

5

6

Flux.create(sink -> {

for (int i = 0; i < 10; i++) {

sink.next(i);

}

sink.complete();

}).subscribe(System.out::println);

創(chuàng)建 Mono

Mono 的創(chuàng)建方式與之前介紹的 Flux 比較相似。Mono 類中也包含了一些與 Flux 類中相同的靜態(tài)方法。這些方法包括 just(),empty(),error()和 never()等。除了這些方法之外,Mono 還有一些獨(dú)有的靜態(tài)方法。

fromCallable()、fromCompletionStage()、fromFuture()、fromRunnable()和 fromSupplier():分別從 Callable、CompletionStage、CompletableFuture、Runnable 和 Supplier 中創(chuàng)建 Mono。

delay(Duration duration)和 delayMillis(long duration):創(chuàng)建一個(gè) Mono 序列,在指定的延遲時(shí)間之后,產(chǎn)生數(shù)字 0 作為唯一值。

ignoreElements(Publisher source):創(chuàng)建一個(gè) Mono 序列,忽略作為源的 Publisher 中的所有元素,只產(chǎn)生結(jié)束消息。

justOrEmpty(Optional data)和 justOrEmpty(T data):從一個(gè) Optional 對(duì)象或可能為 null 的對(duì)象中創(chuàng)建 Mono。只有 Optional 對(duì)象中包含值或?qū)ο蟛粸?null 時(shí),Mono 序列才產(chǎn)生對(duì)應(yīng)的元素。

還可以通過(guò) create()方法來(lái)使用 MonoSink 來(lái)創(chuàng)建 Mono。代碼清單 4 中給出了創(chuàng)建 Mono 序列的示例。

清單 4. 創(chuàng)建 Mono 序列

1

2

3

Mono.fromSupplier(() -> "Hello").subscribe(System.out::println);

Mono.justOrEmpty(Optional.of("Hello")).subscribe(System.out::println);

Mono.create(sink -> sink.success("Hello")).subscribe(System.out::println);

操作符

和 RxJava 一樣,Reactor 的強(qiáng)大之處在于可以在反應(yīng)式流上通過(guò)聲明式的方式添加多種不同的操作符。下面對(duì)其中重要的操作符進(jìn)行分類介紹。

buffer 和 bufferTimeout

這兩個(gè)操作符的作用是把當(dāng)前流中的元素收集到集合中,并把集合對(duì)象作為流中的新元素。在進(jìn)行收集時(shí)可以指定不同的條件:所包含的元素的最大數(shù)量或收集的時(shí)間間隔。方法 buffer()僅使用一個(gè)條件,而 bufferTimeout()可以同時(shí)指定兩個(gè)條件。指定時(shí)間間隔時(shí)可以使用 Duration 對(duì)象或毫秒數(shù),即使用 bufferMillis()或 bufferTimeoutMillis()兩個(gè)方法。

除了元素?cái)?shù)量和時(shí)間間隔之外,還可以通過(guò) bufferUntil 和 bufferWhile 操作符來(lái)進(jìn)行收集。這兩個(gè)操作符的參數(shù)是表示每個(gè)集合中的元素所要滿足的條件的 Predicate 對(duì)象。bufferUntil 會(huì)一直收集直到 Predicate 返回為 true。使得 Predicate 返回 true 的那個(gè)元素可以選擇添加到當(dāng)前集合或下一個(gè)集合中;bufferWhile 則只有當(dāng) Predicate 返回 true 時(shí)才會(huì)收集。一旦值為 false,會(huì)立即開始下一次收集。

代碼清單 5 給出了 buffer 相關(guān)操作符的使用示例。第一行語(yǔ)句輸出的是 5 個(gè)包含 20 個(gè)元素的數(shù)組;第二行語(yǔ)句輸出的是 2 個(gè)包含了 10 個(gè)元素的數(shù)組;第三行語(yǔ)句輸出的是 5 個(gè)包含 2 個(gè)元素的數(shù)組。每當(dāng)遇到一個(gè)偶數(shù)就會(huì)結(jié)束當(dāng)前的收集;第四行語(yǔ)句輸出的是 5 個(gè)包含 1 個(gè)元素的數(shù)組,數(shù)組里面包含的只有偶數(shù)。

需要注意的是,在代碼清單 5 中,首先通過(guò) toStream()方法把 Flux 序列轉(zhuǎn)換成 Java 8 中的 Stream 對(duì)象,再通過(guò) forEach()方法來(lái)進(jìn)行輸出。這是因?yàn)樾蛄械纳墒钱惒降?,而轉(zhuǎn)換成 Stream 對(duì)象可以保證主線程在序列生成完成之前不會(huì)退出,從而可以正確地輸出序列中的所有元素。

清單 5. buffer 相關(guān)操作符的使用示例

1

2

3

4

Flux.range(1, 100).buffer(20).subscribe(System.out::println);

Flux.intervalMillis(100).bufferMillis(1001).take(2).toStream().forEach(System.out::println);

Flux.range(1, 10).bufferUntil(i -> i % 2 == 0).subscribe(System.out::println);

Flux.range(1, 10).bufferWhile(i -> i % 2 == 0).subscribe(System.out::println);

filter

對(duì)流中包含的元素進(jìn)行過(guò)濾,只留下滿足 Predicate 指定條件的元素。代碼清單 6 中的語(yǔ)句輸出的是 1 到 10 中的所有偶數(shù)。

清單 6. filter 操作符使用示例

1Flux.range(1, 10).filter(i -> i % 2 == 0).subscribe(System.out::println);

window

window 操作符的作用類似于 buffer,所不同的是 window 操作符是把當(dāng)前流中的元素收集到另外的 Flux 序列中,因此返回值類型是 Flux>。在代碼清單 7 中,兩行語(yǔ)句的輸出結(jié)果分別是 5 個(gè)和 2 個(gè) UnicastProcessor 字符。這是因?yàn)?window 操作符所產(chǎn)生的流中包含的是 UnicastProcessor 類的對(duì)象,而 UnicastProcessor 類的 toString 方法輸出的就是 UnicastProcessor 字符。

清單 7. window 操作符使用示例

1

2

Flux.range(1, 100).window(20).subscribe(System.out::println);

Flux.intervalMillis(100).windowMillis(1001).take(2).toStream().forEach(System.out::println);

zipWith

zipWith 操作符把當(dāng)前流中的元素與另外一個(gè)流中的元素按照一對(duì)一的方式進(jìn)行合并。在合并時(shí)可以不做任何處理,由此得到的是一個(gè)元素類型為 Tuple2 的流;也可以通過(guò)一個(gè) BiFunction 函數(shù)對(duì)合并的元素進(jìn)行處理,所得到的流的元素類型為該函數(shù)的返回值。

在代碼清單 8 中,兩個(gè)流中包含的元素分別是 a,b 和 c,d。第一個(gè) zipWith 操作符沒(méi)有使用合并函數(shù),因此結(jié)果流中的元素類型為 Tuple2;第二個(gè) zipWith 操作通過(guò)合并函數(shù)把元素類型變?yōu)?String。

清單 8. zipWith 操作符使用示例

1

2

3

4

5

6

Flux.just("a", "b")

.zipWith(Flux.just("c", "d"))

.subscribe(System.out::println);

Flux.just("a", "b")

.zipWith(Flux.just("c", "d"), (s1, s2) -> String.format("%s-%s", s1, s2))

.subscribe(System.out::println);

take

take 系列操作符用來(lái)從當(dāng)前流中提取元素。提取的方式可以有很多種。

take(long n),take(Duration timespan)和 takeMillis(long timespan):按照指定的數(shù)量或時(shí)間間隔來(lái)提取。

takeLast(long n):提取流中的最后 N 個(gè)元素。

takeUntil(Predicate predicate):提取元素直到 Predicate 返回 true。

takeWhile(Predicate continuePredicate): 當(dāng) Predicate 返回 true 時(shí)才進(jìn)行提取。

takeUntilOther(Publisher other):提取元素直到另外一個(gè)流開始產(chǎn)生元素。

在代碼清單 9 中,第一行語(yǔ)句輸出的是數(shù)字 1 到 10;第二行語(yǔ)句輸出的是數(shù)字 991 到 1000;第三行語(yǔ)句輸出的是數(shù)字 1 到 9;第四行語(yǔ)句輸出的是數(shù)字 1 到 10,使得 Predicate 返回 true 的元素也是包含在內(nèi)的。

清單 9. take 系列操作符使用示例

1

2

3

4

Flux.range(1, 1000).take(10).subscribe(System.out::println);

Flux.range(1, 1000).takeLast(10).subscribe(System.out::println);

Flux.range(1, 1000).takeWhile(i -> i <10).subscribe(System.out::println);

Flux.range(1, 1000).takeUntil(i -> i == 10).subscribe(System.out::println);

reduce 和 reduceWith

reduce 和 reduceWith 操作符對(duì)流中包含的所有元素進(jìn)行累積操作,得到一個(gè)包含計(jì)算結(jié)果的 Mono 序列。累積操作是通過(guò)一個(gè) BiFunction 來(lái)表示的。在操作時(shí)可以指定一個(gè)初始值。如果沒(méi)有初始值,則序列的第一個(gè)元素作為初始值。

在代碼清單 10 中,第一行語(yǔ)句對(duì)流中的元素進(jìn)行相加操作,結(jié)果為 5050;第二行語(yǔ)句同樣也是進(jìn)行相加操作,不過(guò)通過(guò)一個(gè) Supplier 給出了初始值為 100,所以結(jié)果為 5150。

清單 10. reduce 和 reduceWith 操作符使用示例

1

2

Flux.range(1, 100).reduce((x, y) -> x + y).subscribe(System.out::println);

Flux.range(1, 100).reduceWith(() -> 100, (x, y) -> x + y).subscribe(System.out::println);

merge 和 mergeSequential

merge 和 mergeSequential 操作符用來(lái)把多個(gè)流合并成一個(gè) Flux 序列。不同之處在于 merge 按照所有流中元素的實(shí)際產(chǎn)生順序來(lái)合并,而 mergeSequential 則按照所有流被訂閱的順序,以流為單位進(jìn)行合并。

代碼清單 11 中分別使用了 merge 和 mergeSequential 操作符。進(jìn)行合并的流都是每隔 100 毫秒產(chǎn)生一個(gè)元素,不過(guò)第二個(gè)流中的每個(gè)元素的產(chǎn)生都比第一個(gè)流要延遲 50 毫秒。在使用 merge 的結(jié)果流中,來(lái)自兩個(gè)流的元素是按照時(shí)間順序交織在一起;而使用 mergeSequential 的結(jié)果流則是首先產(chǎn)生第一個(gè)流中的全部元素,再產(chǎn)生第二個(gè)流中的全部元素。

清單 11. merge 和 mergeSequential 操作符使用示例

1

2

3

4

5

6

Flux.merge(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

.toStream()

.forEach(System.out::println);

Flux.mergeSequential(Flux.intervalMillis(0, 100).take(5), Flux.intervalMillis(50, 100).take(5))

.toStream()

.forEach(System.out::println);

flatMap 和 flatMapSequential

flatMap 和 flatMapSequential 操作符把流中的每個(gè)元素轉(zhuǎn)換成一個(gè)流,再把所有流中的元素進(jìn)行合并。flatMapSequential 和 flatMap 之間的區(qū)別與 mergeSequential 和 merge 之間的區(qū)別是一樣的。

在代碼清單 12 中,流中的元素被轉(zhuǎn)換成每隔 100 毫秒產(chǎn)生的數(shù)量不同的流,再進(jìn)行合并。由于第一個(gè)流中包含的元素?cái)?shù)量較少,所以在結(jié)果流中一開始是兩個(gè)流的元素交織在一起,然后就只有第二個(gè)流中的元素。

清單 12. flatMap 操作符使用示例

1

2

3

4

Flux.just(5, 10)

.flatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

.toStream()

.forEach(System.out::println);

concatMap

concatMap 操作符的作用也是把流中的每個(gè)元素轉(zhuǎn)換成一個(gè)流,再把所有流進(jìn)行合并。與 flatMap 不同的是,concatMap 會(huì)根據(jù)原始流中的元素順序依次把轉(zhuǎn)換之后的流進(jìn)行合并;與 flatMapSequential 不同的是,concatMap 對(duì)轉(zhuǎn)換之后的流的訂閱是動(dòng)態(tài)進(jìn)行的,而 flatMapSequential 在合并之前就已經(jīng)訂閱了所有的流。

代碼清單 13 與代碼清單 12 類似,只不過(guò)把 flatMap 換成了 concatMap,結(jié)果流中依次包含了第一個(gè)流和第二個(gè)流中的全部元素。

清單 13. concatMap 操作符使用示例

1

2

3

4

Flux.just(5, 10)

.concatMap(x -> Flux.intervalMillis(x * 10, 100).take(x))

.toStream()

.forEach(System.out::println);

combineLatest

combineLatest 操作符把所有流中的最新產(chǎn)生的元素合并成一個(gè)新的元素,作為返回結(jié)果流中的元素。只要其中任何一個(gè)流中產(chǎn)生了新的元素,合并操作就會(huì)被執(zhí)行一次,結(jié)果流中就會(huì)產(chǎn)生新的元素。在 代碼清單 14 中,流中最新產(chǎn)生的元素會(huì)被收集到一個(gè)數(shù)組中,通過(guò) Arrays.toString 方法來(lái)把數(shù)組轉(zhuǎn)換成 String。

清單 14. combineLatest 操作符使用示例

1

2

3

4

5

Flux.combineLatest(

Arrays::toString,

Flux.intervalMillis(100).take(5),

Flux.intervalMillis(50, 100).take(5)

).toStream().forEach(System.out::println);

消息處理

當(dāng)需要處理 Flux 或 Mono 中的消息時(shí),如之前的代碼清單所示,可以通過(guò) subscribe 方法來(lái)添加相應(yīng)的訂閱邏輯。在調(diào)用 subscribe 方法時(shí)可以指定需要處理的消息類型。可以只處理其中包含的正常消息,也可以同時(shí)處理錯(cuò)誤消息和完成消息。代碼清單 15 中通過(guò) subscribe()方法同時(shí)處理了正常消息和錯(cuò)誤消息。

清單 15. 通過(guò) subscribe()方法處理正常和錯(cuò)誤消息

1

2

3

Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.subscribe(System.out::println, System.err::println);

正常的消息處理相對(duì)簡(jiǎn)單。當(dāng)出現(xiàn)錯(cuò)誤時(shí),有多種不同的處理策略。第一種策略是通過(guò) onErrorReturn()方法返回一個(gè)默認(rèn)值。在代碼清單 16 中,當(dāng)出現(xiàn)錯(cuò)誤時(shí),流會(huì)產(chǎn)生默認(rèn)值 0.

清單 16. 出現(xiàn)錯(cuò)誤時(shí)返回默認(rèn)值

1

2

3

4

Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.onErrorReturn(0)

.subscribe(System.out::println);

第二種策略是通過(guò) switchOnError()方法來(lái)使用另外的流來(lái)產(chǎn)生元素。在代碼清單 17 中,當(dāng)出現(xiàn)錯(cuò)誤時(shí),將產(chǎn)生 Mono.just(0)對(duì)應(yīng)的流,也就是數(shù)字 0。

清單 17. 出現(xiàn)錯(cuò)誤時(shí)使用另外的流

1

2

3

4

Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.switchOnError(Mono.just(0))

.subscribe(System.out::println);

第三種策略是通過(guò) onErrorResumeWith()方法來(lái)根據(jù)不同的異常類型來(lái)選擇要使用的產(chǎn)生元素的流。在代碼清單 18 中,根據(jù)異常類型來(lái)返回不同的流作為出現(xiàn)錯(cuò)誤時(shí)的數(shù)據(jù)來(lái)源。因?yàn)楫惓5念愋蜑?IllegalArgumentException,所產(chǎn)生的元素為-1。

清單 18. 出現(xiàn)錯(cuò)誤時(shí)根據(jù)異常類型來(lái)選擇流

1

2

3

4

5

6

7

8

9

10

11

Flux.just(1, 2)

.concatWith(Mono.error(new IllegalArgumentException()))

.onErrorResumeWith(e -> {

if (e instanceof IllegalStateException) {

return Mono.just(0);

} else if (e instanceof IllegalArgumentException) {

return Mono.just(-1);

}

return Mono.empty();

})

.subscribe(System.out::println);

當(dāng)出現(xiàn)錯(cuò)誤時(shí),還可以通過(guò) retry 操作符來(lái)進(jìn)行重試。重試的動(dòng)作是通過(guò)重新訂閱序列來(lái)實(shí)現(xiàn)的。在使用 retry 操作符時(shí)可以指定重試的次數(shù)。代碼清單 19 中指定了重試次數(shù)為 1,所輸出的結(jié)果是 1,2,1,2 和錯(cuò)誤信息。

清單 19. 使用 retry 操作符進(jìn)行重試

1

2

3

4

Flux.just(1, 2)

.concatWith(Mono.error(new IllegalStateException()))

.retry(1)

.subscribe(System.out::println);

調(diào)度器

前面介紹了反應(yīng)式流和在其上可以進(jìn)行的各種操作,通過(guò)調(diào)度器(Scheduler)可以指定這些操作執(zhí)行的方式和所在的線程。有下面幾種不同的調(diào)度器實(shí)現(xiàn)。

當(dāng)前線程,通過(guò) Schedulers.immediate()方法來(lái)創(chuàng)建。

單一的可復(fù)用的線程,通過(guò) Schedulers.single()方法來(lái)創(chuàng)建。

使用彈性的線程池,通過(guò) Schedulers.elastic()方法來(lái)創(chuàng)建。線程池中的線程是可以復(fù)用的。當(dāng)所需要時(shí),新的線程會(huì)被創(chuàng)建。如果一個(gè)線程閑置太長(zhǎng)時(shí)間,則會(huì)被銷毀。該調(diào)度器適用于 I/O 操作相關(guān)的流的處理。

使用對(duì)并行操作優(yōu)化的線程池,通過(guò) Schedulers.parallel()方法來(lái)創(chuàng)建。其中的線程數(shù)量取決于 CPU 的核的數(shù)量。該調(diào)度器適用于計(jì)算密集型的流的處理。

使用支持任務(wù)調(diào)度的調(diào)度器,通過(guò) Schedulers.timer()方法來(lái)創(chuàng)建。

從已有的 ExecutorService 對(duì)象中創(chuàng)建調(diào)度器,通過(guò) Schedulers.fromExecutorService()方法來(lái)創(chuàng)建。

某些操作符默認(rèn)就已經(jīng)使用了特定類型的調(diào)度器。比如 intervalMillis()方法創(chuàng)建的流就使用了由 Schedulers.timer()創(chuàng)建的調(diào)度器。通過(guò) publishOn()和 subscribeOn()方法可以切換執(zhí)行操作的調(diào)度器。其中 publishOn()方法切換的是操作符的執(zhí)行方式,而 subscribeOn()方法切換的是產(chǎn)生流中元素時(shí)的執(zhí)行方式。

在代碼清單 20 中,使用 create()方法創(chuàng)建一個(gè)新的 Flux 對(duì)象,其中包含唯一的元素是當(dāng)前線程的名稱。接著是兩對(duì) publishOn()和 map()方法,其作用是先切換執(zhí)行時(shí)的調(diào)度器,再把當(dāng)前的線程名稱作為前綴添加。最后通過(guò) subscribeOn()方法來(lái)改變流產(chǎn)生時(shí)的執(zhí)行方式。運(yùn)行之后的結(jié)果是[elastic-2] [single-1] parallel-1。最內(nèi)層的線程名字 parallel-1 來(lái)自產(chǎn)生流中元素時(shí)使用的 Schedulers.parallel()調(diào)度器,中間的線程名稱 single-1 來(lái)自第一個(gè) map 操作之前的 Schedulers.single()調(diào)度器,最外層的線程名字 elastic-2 來(lái)自第二個(gè) map 操作之前的 Schedulers.elastic()調(diào)度器。

清單 20. 使用調(diào)度器切換操作符執(zhí)行方式

1

2

3

4

5

6

7

8

9

10

11

Flux.create(sink -> {

sink.next(Thread.currentThread().getName());

sink.complete();

})

.publishOn(Schedulers.single())

.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

.publishOn(Schedulers.elastic())

.map(x -> String.format("[%s] %s", Thread.currentThread().getName(), x))

.subscribeOn(Schedulers.parallel())

.toStream()

.forEach(System.out::println);

測(cè)試

在對(duì)使用 Reactor 的代碼進(jìn)行測(cè)試時(shí),需要用到 io.projectreactor.addons:reactor-test 庫(kù)。

使用 StepVerifier

進(jìn)行測(cè)試時(shí)的一個(gè)典型的場(chǎng)景是對(duì)于一個(gè)序列,驗(yàn)證其中所包含的元素是否符合預(yù)期。StepVerifier 的作用是可以對(duì)序列中包含的元素進(jìn)行逐一驗(yàn)證。在代碼清單 21 中,需要驗(yàn)證的流中包含 a 和 b 兩個(gè)元素。通過(guò) StepVerifier.create()方法對(duì)一個(gè)流進(jìn)行包裝之后再進(jìn)行驗(yàn)證。expectNext()方法用來(lái)聲明測(cè)試時(shí)所期待的流中的下一個(gè)元素的值,而 verifyComplete()方法則驗(yàn)證流是否正常結(jié)束。類似的方法還有 verifyError()來(lái)驗(yàn)證流由于錯(cuò)誤而終止。

清單 21. 使用 StepVerifier 驗(yàn)證流中的元素

1

2

3

4

StepVerifier.create(Flux.just("a", "b"))

.expectNext("a")

.expectNext("b")

.verifyComplete();

操作測(cè)試時(shí)間

有些序列的生成是有時(shí)間要求的,比如每隔 1 分鐘才產(chǎn)生一個(gè)新的元素。在進(jìn)行測(cè)試中,不可能花費(fèi)實(shí)際的時(shí)間來(lái)等待每個(gè)元素的生成。此時(shí)需要用到 StepVerifier 提供的虛擬時(shí)間功能。通過(guò) StepVerifier.withVirtualTime()方法可以創(chuàng)建出使用虛擬時(shí)鐘的 StepVerifier。通過(guò) thenAwait(Duration)方法可以讓虛擬時(shí)鐘前進(jìn)。

在代碼清單 22 中,需要驗(yàn)證的流中包含兩個(gè)產(chǎn)生間隔為一天的元素,并且第一個(gè)元素的產(chǎn)生延遲是 4 個(gè)小時(shí)。在通過(guò) StepVerifier.withVirtualTime()方法包裝流之后,expectNoEvent()方法用來(lái)驗(yàn)證在 4 個(gè)小時(shí)之內(nèi)沒(méi)有任何消息產(chǎn)生,然后驗(yàn)證第一個(gè)元素 0 產(chǎn)生;接著 thenAwait()方法來(lái)讓虛擬時(shí)鐘前進(jìn)一天,然后驗(yàn)證第二個(gè)元素 1 產(chǎn)生;最后驗(yàn)證流正常結(jié)束。

清單 22. 操作測(cè)試時(shí)間

1

2

3

4

5

6

7

StepVerifier.withVirtualTime(() -> Flux.interval(Duration.ofHours(4), Duration.ofDays(1)).take(2))

.expectSubscription()

.expectNoEvent(Duration.ofHours(4))

.expectNext(0L)

.thenAwait(Duration.ofDays(1))

.expectNext(1L)

.verifyComplete();

使用 TestPublisher

TestPublisher 的作用在于可以控制流中元素的產(chǎn)生,甚至是違反反應(yīng)流規(guī)范的情況。在代碼清單 23 中,通過(guò) create()方法創(chuàng)建一個(gè)新的 TestPublisher 對(duì)象,然后使用 next()方法來(lái)產(chǎn)生元素,使用 complete()方法來(lái)結(jié)束流。TestPublisher 主要用來(lái)測(cè)試開發(fā)人員自己創(chuàng)建的操作符。

清單 23. 使用 TestPublisher 創(chuàng)建測(cè)試所用的流

1

2

3

4

5

6

7

8

9

final TestPublisher testPublisher = TestPublisher.create();

testPublisher.next("a");

testPublisher.next("b");

testPublisher.complete();

StepVerifier.create(testPublisher)

.expectNext("a")

.expectNext("b")

.expectComplete();

調(diào)試

由于反應(yīng)式編程范式與傳統(tǒng)編程范式的差異性,使用 Reactor 編寫的代碼在出現(xiàn)問(wèn)題時(shí)比較難進(jìn)行調(diào)試。為了更好的幫助開發(fā)人員進(jìn)行調(diào)試,Reactor 提供了相應(yīng)的輔助功能。

啟用調(diào)試模式

當(dāng)需要獲取更多與流相關(guān)的執(zhí)行信息時(shí),可以在程序開始的地方添加代碼清單 24 中的代碼來(lái)啟用調(diào)試模式。在調(diào)試模式啟用之后,所有的操作符在執(zhí)行時(shí)都會(huì)保存額外的與執(zhí)行鏈相關(guān)的信息。當(dāng)出現(xiàn)錯(cuò)誤時(shí),這些信息會(huì)被作為異常堆棧信息的一部分輸出。通過(guò)這些信息可以分析出具體是在哪個(gè)操作符的執(zhí)行中出現(xiàn)了問(wèn)題。

清單 24. 啟用調(diào)試模式

1Hooks.onOperator(providedHook -> providedHook.operatorStacktrace());

不過(guò)當(dāng)調(diào)試模式啟用之后,記錄這些額外的信息是有代價(jià)的。一般只有在出現(xiàn)了錯(cuò)誤之后,再考慮啟用調(diào)試模式。但是當(dāng)為了找到問(wèn)題而啟用了調(diào)試模式之后,之前的錯(cuò)誤不一定能很容易重現(xiàn)出來(lái)。為了減少可能的開銷,可以限制只對(duì)特定類型的操作符啟用調(diào)試模式。

使用檢查點(diǎn)

另外一種做法是通過(guò) checkpoint 操作符來(lái)對(duì)特定的流處理鏈來(lái)啟用調(diào)試模式。代碼清單 25 中,在 map 操作符之后添加了一個(gè)名為 test 的檢查點(diǎn)。當(dāng)出現(xiàn)錯(cuò)誤時(shí),檢查點(diǎn)名稱會(huì)出現(xiàn)在異常堆棧信息中。對(duì)于程序中重要或者復(fù)雜的流處理鏈,可以在關(guān)鍵的位置上啟用檢查點(diǎn)來(lái)幫助定位可能存在的問(wèn)題。

清單 25. 使用 checkpoint 操作符

1Flux.just(1, 0).map(x -> 1 / x).checkpoint("test").subscribe(System.out::println);

日志記錄

在開發(fā)和調(diào)試中的另外一項(xiàng)實(shí)用功能是把流相關(guān)的事件記錄在日志中。這可以通過(guò)添加 log 操作符來(lái)實(shí)現(xiàn)。在代碼清單 26 中,添加了 log 操作符并指定了日志分類的名稱。

清單 26. 使用 log 操作符記錄事件

1Flux.range(1, 2).log("Range").subscribe(System.out::println);

在實(shí)際的運(yùn)行時(shí),所產(chǎn)生的輸出如代碼清單 27 所示。

清單 27. log 操作符所產(chǎn)生的日志

1

2

3

4

5

6

7

8

13:07:56.735 [main] DEBUG reactor.util.Loggers$LoggerFactory - Using Slf4j logging framework

13:07:56.751 [main] INFO Range - | onSubscribe([Synchronous Fuseable] FluxRange.RangeSubscription)

13:07:56.753 [main] INFO Range - | request(unbounded)

13:07:56.754 [main] INFO Range - | onNext(1)

1

13:07:56.754 [main] INFO Range - | onNext(2)

2

13:07:56.754 [main] INFO Range - | onComplete()

“冷”與“熱”序列

之前的代碼清單中所創(chuàng)建的都是冷序列。冷序列的含義是不論訂閱者在何時(shí)訂閱該序列,總是能收到序列中產(chǎn)生的全部消息。而與之對(duì)應(yīng)的熱序列,則是在持續(xù)不斷地產(chǎn)生消息,訂閱者只能獲取到在其訂閱之后產(chǎn)生的消息。

在代碼清單 28 中,原始的序列中包含 10 個(gè)間隔為 1 秒的元素。通過(guò) publish()方法把一個(gè) Flux 對(duì)象轉(zhuǎn)換成 ConnectableFlux 對(duì)象。方法 autoConnect()的作用是當(dāng) ConnectableFlux 對(duì)象有一個(gè)訂閱者時(shí)就開始產(chǎn)生消息。代碼 source.subscribe()的作用是訂閱該 ConnectableFlux 對(duì)象,讓其開始產(chǎn)生數(shù)據(jù)。接著當(dāng)前線程睡眠 5 秒鐘,第二個(gè)訂閱者此時(shí)只能獲得到該序列中的后 5 個(gè)元素,因此所輸出的是數(shù)字 5 到 9。

清單 28. 熱序列

1

2

3

4

5

6

7

8

9

final Flux source = Flux.intervalMillis(1000)

.take(10)

.publish()

.autoConnect();

source.subscribe();

Thread.sleep(5000);

source

.toStream()

.forEach(System.out::println);

小結(jié)

反應(yīng)式編程范式對(duì)于習(xí)慣了傳統(tǒng)編程范式的開發(fā)人員來(lái)說(shuō),既是一個(gè)需要進(jìn)行思維方式轉(zhuǎn)變的挑戰(zhàn),也是一個(gè)充滿了更多可能的機(jī)會(huì)。Reactor 作為一個(gè)基于反應(yīng)式流規(guī)范的新的 Java 庫(kù),可以作為反應(yīng)式應(yīng)用的基礎(chǔ)。本文對(duì) Reactor 庫(kù)做了詳細(xì)的介紹,包括 Flux 和 Mono 序列的創(chuàng)建、常用操作符的使用、調(diào)度器、錯(cuò)誤處理以及測(cè)試和調(diào)試技巧等。

參考資源 (resources)

參考 Reactor 的官方網(wǎng)站,了解 Reactor 的更多內(nèi)容。

查看 Reactor 的用戶指南。

查看 InfoQ 上的Reactor by Example。

查看反應(yīng)式流規(guī)范。

下載資源

評(píng)論

有新評(píng)論時(shí)提醒我

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容