本文目標(biāo)
- 理解響應(yīng)式編程
前言
之前的《聊聊 IO 多路復(fù)用》中,我們理解了非阻塞 IO 的意義。但是 Spring MVC 并不能完美的應(yīng)用非阻塞編程,于是 Spring 團(tuán)隊(duì)開發(fā)了 WebFlux,而 WebFlux 的基礎(chǔ)正是本文要講到的 Project Reactor(下文簡(jiǎn)稱為 Reactor)
本文以 Reactor 為例帶大家入門響應(yīng)式編程
版本
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>3.4.6</version>
</dependency>
什么是 Reactor
Reactor 是 JVM 的非阻塞響應(yīng)式編程基礎(chǔ),支持背壓。 它直接與 Java 8 函數(shù)式 API 集成,特別是 CompletableFuture、Stream 和 Duration。 它提供了可組合的異步序列 API — Flux(用于 [N] 個(gè)元素)和 Mono(用于 [0|1] 個(gè)元素),并實(shí)現(xiàn)了 Reactive Streams 規(guī)范。
在 Reactor 的基礎(chǔ)上還演化出了適合微服務(wù)架構(gòu)的 Reactor Netty 。為 HTTP(包括 Websockets)、TCP 和 UDP 提供支持背壓和響應(yīng)式的網(wǎng)絡(luò)引擎。
上面是對(duì)于官方文檔的翻譯。下面來說說我自己對(duì) Reactor 和響應(yīng)式編程的理解。
回想一下之前的非阻塞 IO 編程,例如我們現(xiàn)在要用非阻塞的方式調(diào)用一個(gè)遠(yuǎn)程服務(wù),當(dāng)遠(yuǎn)程接口數(shù)據(jù)可用時(shí)去做一些業(yè)務(wù)處理。這時(shí)候代碼怎么寫呢?我們需要提供一個(gè)回調(diào)函數(shù),然后在響應(yīng)就緒的時(shí)候,去調(diào)用我們的回調(diào)函數(shù)。
從邏輯上來看,這完全沒有問題。但是如果我們的回調(diào)很復(fù)雜,代碼看起來會(huì)是什么樣呢?
// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId, new Callback<List<String>>() {
public void onSuccess(List<String> list) {
if (list.isEmpty()) {
suggestionService.getSuggestions(new Callback<List<Favorite>>() {
public void onSuccess(List<Favorite> list) {
UiUtils.submitOnUiThread(() -> {
list.stream()
.limit(5)
.forEach(uiList::show);
});
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
} else {
list.stream()
.limit(5)
.forEach(favId -> favoriteService.getDetails(favId,
new Callback<Favorite>() {
public void onSuccess(Favorite details) {
UiUtils.submitOnUiThread(() -> uiList.show(details));
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
}
));
}
}
public void onError(Throwable error) {
UiUtils.errorPopup(error);
}
});
這個(gè)代碼說實(shí)話已經(jīng)有點(diǎn)回調(diào)地獄那味兒了,讓一段不是很復(fù)雜的邏輯變得很難讀了。但是如果用 Reactor 寫呢?
// 以下案例來自 Reactor 官網(wǎng)
userService.getFavorites(userId)
.flatMap(favoriteService::getDetails)
.switchIfEmpty(suggestionService.getSuggestions())
.take(5)
.publishOn(UiUtils.uiThreadScheduler())
.subscribe(uiList::show, UiUtils::errorPopup);
可以看到,代碼變得非常的簡(jiǎn)潔。唯一帶來的困擾就是,我們不知道這些函數(shù)到底是啥意思 ??
響應(yīng)式編程雖然有非常多的特性,但是它并不是什么神奇的技術(shù),它也是建立在傳統(tǒng)命令式編程的基礎(chǔ)上。只不過它所提供的 API 以及規(guī)范更適合在非阻塞 IO 中使用。雖然在非阻塞 IO 框架中幾乎只使用響應(yīng)式編程(Vertx,WebFlux),只是因?yàn)檫@樣做更合適,并不是說沒了響應(yīng)式編程,就玩不了非阻塞 IO 了。
響應(yīng)式編程內(nèi)幕
Reactor 實(shí)現(xiàn)了 org.reactivestreams 提供的 Java 響應(yīng)式編程規(guī)范,我們只要了解 reactivestreams 中代碼是如何運(yùn)轉(zhuǎn)的,再看 Reactor 相關(guān)的代碼就容易多了。
下圖展示了 reactivestreams 中的核心接口

Publisher:發(fā)布者
Subscriber:訂閱者
Subscription:這個(gè)單詞中文翻譯為名詞的訂閱,在代碼中它是發(fā)布者和訂閱者之間的媒介
Processor:該接口繼承了發(fā)布者和訂閱者,可以理解為發(fā)布者和訂閱者的中間操作(但是 Reactor 的中間操作并沒有實(shí)現(xiàn) Processor,在最新版本的 Reactor 中,Processor 的相關(guān)實(shí)現(xiàn)接口已經(jīng)被棄用)
在了解了響應(yīng)式編程的核心接口之后,我們來看下響應(yīng)式編程是如何運(yùn)作的

在 Reactor 中大部分實(shí)現(xiàn)都是按照上圖的邏輯來執(zhí)行的
- 首先是Subscriber(訂閱者)主動(dòng)訂閱 Publisher(發(fā)布者),通過調(diào)用 Publisher 的 subscribe 方法
- Publisher 在向下游發(fā)送數(shù)據(jù)之前,會(huì)先調(diào)用 Subscriber 的 onSubscribe 方法,傳遞的參數(shù)為 Subscription(訂閱媒介)
- Subscriber 通過 Subscription#request 來請(qǐng)求數(shù)據(jù),或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布(這就是響應(yīng)式編程中的背壓,訂閱者可以控制數(shù)據(jù)發(fā)布)
- Subscription 在接收到訂閱者的調(diào)用后,通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)。
- 在數(shù)據(jù)發(fā)布完成后,調(diào)用 Subscriber#onComplete 結(jié)束本次流,如果數(shù)據(jù)發(fā)布或者處理遇到錯(cuò)誤會(huì)調(diào)用 Subscriber#onError
調(diào)用 Subscriber#onNext,onComplete,onError 這三個(gè)方法,可能是在 Publisher 中做的,也可能是在 Subscription 中做的,根據(jù)不同的場(chǎng)景有不同的實(shí)現(xiàn)方式,并沒有什么嚴(yán)格的要求??梢哉J(rèn)為 Publisher 和 Subscription 共同配合完成了數(shù)據(jù)發(fā)布
其實(shí) Reactor 中 API 實(shí)現(xiàn)原理也都是這個(gè)套路,我這邊也自己寫了個(gè)例子便于讓讀者加深對(duì)響應(yīng)式編程的理解
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
/**
* @author tianwen.yin
*/
public class SimpleReactiveStream {
/**
* 實(shí)現(xiàn)一個(gè)簡(jiǎn)單的響應(yīng)式編程發(fā)布者
* 邏輯:當(dāng)訂閱者發(fā)起訂閱時(shí),像下游發(fā)送一個(gè) HelloWorld,發(fā)布邏輯由 SimpleSubscription 完成
*/
static class SimplePublisher implements Publisher {
@Override
public void subscribe(Subscriber s) {
// 2. Publisher 發(fā)布數(shù)據(jù)之前,調(diào)用 Subscriber 的 onSubscribe
s.onSubscribe(new SimpleSubscription(data(), s));
}
private String data() {
return "Hello World";
}
}
static class SimpleSubscriber implements Subscriber {
@Override
public void onSubscribe(Subscription s) {
// 3. Subscriber 通過 Subscription#request 來請(qǐng)求數(shù)據(jù)
// 或者 Subscription#cancel 來取消數(shù)據(jù)發(fā)布
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Object o) {
System.out.println(o);
}
@Override
public void onError(Throwable t) {
System.out.println("error");
}
@Override
public void onComplete() {
System.out.println("complete");
}
}
static class SimpleSubscription implements Subscription {
String data;
Subscriber actual;
boolean isCanceled;
public SimpleSubscription(String data, Subscriber actual) {
this.data = data;
this.actual = actual;
}
@Override
public void request(long n) {
if (!isCanceled) {
try {
// 4. Subscription 在接收到訂閱者的調(diào)用后
// 通過 Subscriber#onNext 向下游訂閱者傳遞數(shù)據(jù)
actual.onNext(data);
// 5. 在數(shù)據(jù)發(fā)布完成后,調(diào)用 Subscriber#onComplete 結(jié)束本次流
actual.onComplete();
} catch (Exception e) {
// 如果數(shù)據(jù)發(fā)布或者處理遇到錯(cuò)誤會(huì)調(diào)用 Subscriber#onError
actual.onError(e);
}
}
}
@Override
public void cancel() {
isCanceled = true;
}
}
public static void main(String[] args) {
// 1. Subscriber ”訂閱“ Publisher
new SimplePublisher().subscribe(new SimpleSubscriber());
}
}
響應(yīng)式編程思想
響應(yīng)式編程,就像裝配一條流水線。Publisher 規(guī)定了數(shù)據(jù)如何生產(chǎn),中間會(huì)有 Operators(操作符)對(duì)流水線的數(shù)據(jù)進(jìn)行解析,校驗(yàn),轉(zhuǎn)換等等操作,最終處理好的數(shù)據(jù)流轉(zhuǎn)到 Subscriber。

這條流水線還有一個(gè)特點(diǎn)。大部分情況下當(dāng) Publisher 的 subscribe 方法被調(diào)用之前,什么都不會(huì)發(fā)生。在被訂閱之前我們只是在定義流水線該如何工作,直到真正有人需要的時(shí)候,流水線才會(huì)啟動(dòng)。
Reactor 中的 Operator
Operators 怎么理解呢?對(duì)于上游來說,Operators 像一個(gè)訂閱者,而對(duì)于它的下游來說,它像一個(gè)發(fā)布者(我們上文說過了 Reactor 中的中間操作并沒有實(shí)現(xiàn) Processor 接口)
Mono.just("hello")
.map(a -> a + "world")
.subscribe(System.out::println);
舉個(gè)簡(jiǎn)單的例子,在上面的代碼中,map 就是一個(gè) Operator,它的實(shí)現(xiàn)思路是什么?來看下面的代碼
// 注意,這是我基于 Reactor API 實(shí)現(xiàn)的偽代碼!
public static class MonoMap implements Publisher {
// 我們自定義的轉(zhuǎn)換邏輯
private Function mapper;
// source 代表當(dāng)前操作符的上游發(fā)布者
private Publisher source;
public MonoMap(Publisher source, Function mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
public void subscribe(Subscriber actual) {
source.subscribe(new MonoMapSubscriber(mapper, actual));
}
}
public static class MonoMapSubscriber implements Subscriber {
// 我們自定義的轉(zhuǎn)換邏輯
private Function mapper;
// 真正的下游
private Subscriber actual;
public MonoMapSubscriber(Function mapper, Subscriber actual) {
this.mapper = mapper;
this.actual = actual;
}
@Override
public void onSubscribe(Subscription s) {
actual.onSubscribe(s);
}
@Override
public void onNext(Object o) {
// 當(dāng)上游數(shù)據(jù)發(fā)送過來時(shí),先進(jìn)行轉(zhuǎn)換再發(fā)送給下游
Object result = mapper.apply(o);
actual.onNext(result);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
}
上述代碼是我自己實(shí)現(xiàn)的一個(gè)偽代碼,用于讓大家理解操作符的實(shí)現(xiàn)思路,實(shí)際 Reactor 代碼也是這個(gè)思路,只不過實(shí)現(xiàn)的更加巧妙和嚴(yán)謹(jǐn)
我們首先來分析一下 Mono.just("hello").map(a -> a + "world") 這句話
當(dāng)執(zhí)行到 Mono.just 時(shí),會(huì)新建一個(gè) MonoJust 對(duì)象作為當(dāng)前的 Publisher。該發(fā)布者的邏輯是,當(dāng)訂閱時(shí),向下游發(fā)送數(shù)據(jù) "hello"
-
當(dāng)執(zhí)行到 map 方法時(shí),會(huì)新建一個(gè) MonoMap 對(duì)象替作為當(dāng)前的 Publisher,MonoJust 成為了 MonoMap 中的一個(gè)屬性 source(實(shí)際的上游)
當(dāng) MonoMap 被訂閱時(shí),會(huì)先將它的下游 actual 做一層包裝,也就是我們上面的 MonoMapSubscriber。然后去調(diào)用 source 的 subscribe 方法。上游發(fā)布數(shù)據(jù)時(shí),MonoMapSubscriber 先對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換(我們上面的拼接字符串操作),然后再發(fā)送給 actual(它的下游)
當(dāng) MonoMap 被再次轉(zhuǎn)換時(shí),MonoMap 就變成了下游操作符的 source...
最后通過一張圖來總結(jié)一下

Reactor 該如何學(xué)習(xí)
本文并沒有介紹太多 Reactor 的細(xì)節(jié),因?yàn)檫@些東西實(shí)在是太多了。我想聊聊我自己是如何學(xué)習(xí) Reactor 的
如果你已經(jīng)通過本文理解了響應(yīng)式編程的核心接口是如何工作的了,那恭喜你已經(jīng)邁向了成功的第一步了。接下來就是閱讀官方文檔,不斷的練習(xí)和閱讀 Reactor 的源碼。源碼追蹤的方向已經(jīng)很明確了,當(dāng)我們想了解一個(gè)發(fā)布者的實(shí)現(xiàn)原理是什么,我就要去關(guān)注這個(gè)發(fā)布者的 subscribe 方法和 Subscription 都做了什么。想了解消費(fèi)者的邏輯,就看它的 onNext,onComplete,onError。
最后
如果覺得我的文章對(duì)你有幫助,動(dòng)動(dòng)小手點(diǎn)下關(guān)注,你的支持是對(duì)我最大的幫助