一、前言
上篇文章介紹了 Kotlin Coroutine 的實(shí)現(xiàn)原理。因?yàn)槠蓿⑽唇榻B Kotlin Coroutine 具體是如何與其它異步編程技術(shù)整合的。本文將向大家介紹 Kotlin Coroutine 是如何與 Spring Reactor 整合。
雖然本文的標(biāo)題是關(guān)于 Kotlin Coroutine 與 Spring WebFlux 的,但其實(shí)講的是 Kotlin Coroutine 是如何與 Spring Reactor 整合的。因?yàn)?Spring Reactor 是 Spring WebFlux 的基礎(chǔ),所以不管起哪個(gè)標(biāo)題,內(nèi)容都是類似的。
因?yàn)?Spring WebFlux 這個(gè)名字更容易吸引人,所以本文便做了回標(biāo)題黨。
Kotlin Coroutine 與 Spring Reactor 的整合主要是通過 kotlinx-coroutines-reactive 和 kotlinx-coroutines-reactor 實(shí)現(xiàn)的。本文提到的源碼都能從這兩個(gè)模塊中找到。
二、示例
本文將繼續(xù)使用《Kotlin Coroutine 初探》一文中的在 Spring WebFlux 中使用 Kotlin Coroutine 的示例:
@GetMapping("/coroutine/{personId}")
fun getNumberOfMessages(@PathVariable personId: String) = mono(Unconfined) { // ①
val person = peopleRepository.findById(personId).awaitFirstOrDefault(null) // ②
?: throw NoSuchElementException("No person can be found by $personId")
// ②
val lastLoginDate = auditRepository.findByEmail(person.email).awaitSingle().eventDate
val numberOfMessages =
messageRepository.countByMessageDateGreaterThanAndEmail(lastLoginDate, person.email).awaitSingle() // ②
"Hello ${person.name}, you have $numberOfMessages messages since $lastLoginDate"
}
本文將重點(diǎn)介紹上面代碼標(biāo)注的 ①、② 兩點(diǎn)。
第 ① 點(diǎn):mono(Unconfined) { ... } 的實(shí)現(xiàn)原理
第 ② 點(diǎn):awaitXXX 方法的實(shí)現(xiàn)原理
理解了上面兩點(diǎn),就能理解 Kotlin Coroutine 是如何與 Spring Reactor 整合的了。
三、Spring Reactor 相關(guān)知識(shí)
本節(jié)會(huì)介紹一下后面會(huì)涉及到的 Spring Reactor 的概念和實(shí)現(xiàn)細(xì)節(jié)方面的內(nèi)容。之所以將 Spring Reactor 的部分內(nèi)容單拎出來,是因?yàn)轭A(yù)先了解 Spring Reactor 的部分內(nèi)容對(duì)理解后面的內(nèi)容非常重要。
1. Publisher/Mono/Flux
Reactive Streams 是 Spring、Netflix 等公司提出的一個(gè)反應(yīng)式編程的一個(gè)規(guī)范。這個(gè)規(guī)范定義了必要的接口和對(duì)實(shí)現(xiàn)的要求。Publisher 是其中一個(gè)重要的接口:
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
Publisher 顧名思義,發(fā)布者,等同于很多技術(shù)中的 Observable。subscribe 方法實(shí)際應(yīng)該被看做是被訂閱,即 aPublisher.subscribe(aSubscriber) 應(yīng)該理解為 aPublisher 被 aSubscriber 訂閱。
但不同于之前同樣有訂閱機(jī)制的技術(shù),比方說一些消息隊(duì)列。之前的訂閱機(jī)制中,訂閱和消息的發(fā)布是兩個(gè)獨(dú)立的環(huán)節(jié)。而 Publisher.subscribe(Subscriber) 方法,如果被調(diào)用,則會(huì)使這個(gè) Publisher 開始發(fā)布消息。
Mono 和 Flux 是在實(shí)現(xiàn)了 Reactive Streams 規(guī)范的 Spring Reactor 中的兩個(gè)類。這兩個(gè)類實(shí)現(xiàn)了 Publisher 接口。不同于類似的 RxJava 2,其只有一個(gè) Publisher 的實(shí)現(xiàn)類。Spring Reactor 中兩個(gè) Publisher 實(shí)現(xiàn)類,分別代表單個(gè)元素和多個(gè)元素兩種場景。
在使用了 Spring WebFlux(基于 Spring Reactor 的新一代 Web 框架)的項(xiàng)目中,一個(gè)被 @RequestMapping 標(biāo)注的方法需要返回 Mono 或 Flux。然后,當(dāng)一個(gè)請(qǐng)求根據(jù)映射配置被轉(zhuǎn)發(fā)到這個(gè)方法上時(shí),一個(gè) Mono 或 Flux 對(duì)象會(huì)根據(jù)這個(gè)方法的定義創(chuàng)建出一個(gè) Mono。但這時(shí)真正的請(qǐng)求處理并未開始,方法返回的只是一個(gè)處理的步驟定義。
當(dāng) Spring WebFlux 框架得到這個(gè)方法返回的 Mono 或 Flux 之后,會(huì)調(diào)用它們的 subscribe(Subcriber) 方法。此時(shí),真正的請(qǐng)求處理便開始了。
后面的內(nèi)容為了簡便,會(huì)省略 Flux,僅會(huì)提到 Mono,但兩者的原理基本類似。
2. Mono.create(Consumer<MonoSink<T>>) 方法
Spring WebFlux 和傳統(tǒng) Spring MVC 最大的不同就是要求方法返回 Mono 或 Flux。當(dāng) Spring WebFlux 與 Kotlin Coroutine 整合后,我們需要將 Coroutine 轉(zhuǎn)換成一個(gè) Mono(或者 Flux,后面將省略 Flux)。
如何做呢?Kotlin Coroutine 使用的是 Mono.create(Consumer<MonoSink<T>>) 方法。
從方法簽名看,Mono.create 方法涉及到最主要的接口是 MonoSink(此處不解釋 Consumer 接口)。
MonoSink 是什么呢?其 API 文檔是這么解釋的:
Wrapper API around an actual downstream Subscriber for emitting nothing, a single value or an error (mutually exclusive).
簡單理解就是對(duì)后續(xù) Subscriber 的封裝。
可能有些同學(xué)對(duì) Sink 這個(gè)詞有些陌生,我起初也是這種感覺。但是對(duì)于一些做過流處理相關(guān)開發(fā)的同學(xué),這個(gè)詞應(yīng)該不陌生。原因是 Sink 這個(gè)詞經(jīng)常出現(xiàn)在流處理相關(guān)技術(shù)中(比如 Flink、Flume)。在 Spring Cloud Stream 中,也能看到這個(gè)詞。同樣,Spring Reactor 是對(duì) Reactive Streams 規(guī)范的實(shí)現(xiàn),也可以看做是另一種形式的流技術(shù),所以,出現(xiàn) Sink 這個(gè)詞也不足為奇了。
因?yàn)?MonoSink 是對(duì)后續(xù) Subscriber 的封裝,所以可以利用 MonoSink 向后續(xù)的 Subscriber 輸出一些東西的。在 Kotlin Coroutine 與 Spring Reactor 整合的過程中,Kotlin Coroutine 將開啟一個(gè) Coroutine,并將執(zhí)行結(jié)果通過 MonoSink 輸出給 Subscriber。
在 Spring WebFlux 應(yīng)用中,Subscriber 會(huì)將 Mono(或 Flux)以 HTTP 數(shù)據(jù)的形式輸出。
這樣就完成了 Kotlin Coroutine 向 Mono 轉(zhuǎn)換的主要工作。更多細(xì)節(jié)將在下面的內(nèi)容介紹。
四、整合的兩個(gè)關(guān)鍵點(diǎn)
接下來將向大家介紹 Kotlin Coroutine 與 Spring Reactor整合的兩個(gè)關(guān)鍵點(diǎn):mono 方法和 await 系列方法。
1. mono 方法
mono 方法連接了 Spring Reactor 環(huán)境與 Kotlin Coroutine 環(huán)境,可以看做是一個(gè)將 Kotlin Coroutine 裝換為 Spring Reactor Mono 的工廠方法。我們先來看 mono 方法的源碼:
fun <T> mono(
context: CoroutineContext = DefaultDispatcher,
parent: Job? = null,
block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
// 創(chuàng)建一個(gè)新的 Coroutine Context
val newContext = newCoroutineContext(context, parent)
// 創(chuàng)建一個(gè)新的 MonoCoroutine,MonoCoroutine 會(huì)實(shí)現(xiàn) Disposable 接口
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
mono 方法最主要的部分都集中在對(duì) Mono.create 方法的調(diào)用。這也是為什么在前面的部分著重介紹 Mono.create 方法和 MonoSink 接口的原因。
Mono.create 會(huì)創(chuàng)建出一個(gè) Mono 對(duì)象,當(dāng)這個(gè) Mono 對(duì)象的 subscribe 方法被執(zhí)行的時(shí)候。傳入 Mono.create 的 Consumer 就會(huì)被調(diào)用。此時(shí)下面的代碼就會(huì)被執(zhí)行:
{ sink ->
// 創(chuàng)建一個(gè)新的 Coroutine Context
val newContext = newCoroutineContext(context, parent)
// 創(chuàng)建一個(gè)新的 MonoCoroutine,MonoCoroutine 會(huì)實(shí)現(xiàn) Disposable 接口,用于關(guān)閉和異常處理。
val coroutine = MonoCoroutine(newContext, sink)
sink.onDispose(coroutine)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}
上面這段代碼就是 Consumer 的 Lambda 形式。
最主要的部分是下面兩行代碼:
val coroutine = MonoCoroutine(newContext, sink)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
首先,一個(gè) Coroutine —— MonoCoroutine 被創(chuàng)建。同時(shí),sink 對(duì)象被傳入到這個(gè) MonoCoroutine 中。
接下來,這個(gè) Coroutine 被執(zhí)行,接下來的重點(diǎn)變成了 MonoCoroutine:
private class MonoCoroutine<in T>(
parentContext: CoroutineContext,
private val sink: MonoSink<T>
) : AbstractCoroutine<T>(parentContext, true), Disposable {
var disposed = false
override fun onCompleted(value: T) {
if (!disposed) {
if (value == null) sink.success() else sink.success(value)
}
}
override fun onCompletedExceptionally(exception: Throwable) {
if (!disposed) sink.error(exception)
}
override fun dispose() {
disposed = true
cancel(cause = null)
}
override fun isDisposed(): Boolean = disposed
}
從上面的代碼可以看到,MonoCoroutine 繼承了 AbstractCoroutine,同時(shí)構(gòu)造函數(shù)入?yún)魅肓?MonoSink。
MonoCoroutine 實(shí)現(xiàn)了兩個(gè)在 AbstractCoroutine 聲明的重要方法:
onCompletedonCompletedExceptionally
這兩個(gè)方法都是回調(diào)方法。從上面的代碼可以明顯看出。當(dāng) MonoCoroutine 在執(zhí)行完畢之后,即這兩個(gè)回調(diào)方法被調(diào)用時(shí),會(huì)通過調(diào)用 MonoSink 將結(jié)果輸出給 Subscriber。從而完成 Kotlin Coroutine 向 Mono 的轉(zhuǎn)換工作。
2. await 系列方法
接下來要介紹的是一系列以 await 開頭的方法,比如示例中的 awaitFirstOrDefault、awaitSingle,等等。這些方法定義在 kotlinx-coroutines-reactive 模塊中的 Await.kt 文件。
這些方法都是 suspending 方法,能夠用命令式的代碼風(fēng)格獲取 Publisher (Mono 或 Flux)中的結(jié)果。
因?yàn)樯弦黄恼隆禟otlin Coroutine 原理解析》已經(jīng)介紹了 suspending 方法的工作原理,所以這里就不重復(fù)了。本文只介紹 Kotlin Coroutine 是如何與 Reactive Streams 中的 Publisher 接口整合的。
await 系列方法可以看作是將 Mono 或 Flux 轉(zhuǎn)換為 Coroutine 的方法。這些方法真正的實(shí)現(xiàn)集中在了 awaitOne 方法中。接下來我們看看 awaitOne 方法的源代碼:
private suspend fun <T> Publisher<T>.awaitOne(
mode: Mode,
default: T? = null
): T = suspendCancellableCoroutine { cont ->
subscribe(object : Subscriber<T> {
private lateinit var subscription: Subscription
private var value: T? = null
private var seenValue = false
override fun onSubscribe(sub: Subscription) {
subscription = sub
cont.invokeOnCompletion { sub.cancel() }
sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
}
override fun onNext(t: T) {
when (mode) {
Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
if (!seenValue) {
seenValue = true
cont.resume(t)
subscription.cancel()
}
}
Mode.LAST, Mode.SINGLE -> {
if (mode == Mode.SINGLE && seenValue) {
if (cont.isActive)
cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
subscription.cancel()
} else {
value = t
seenValue = true
}
}
}
}
@Suppress("UNCHECKED_CAST")
override fun onComplete() {
if (seenValue) {
if (cont.isActive) cont.resume(value as T)
return
}
when {
mode == Mode.FIRST_OR_DEFAULT -> {
cont.resume(default as T)
}
cont.isActive -> {
cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
}
}
}
override fun onError(e: Throwable) {
cont.resumeWithException(e)
}
})
}
逐行解釋一下關(guān)鍵代碼:
第4行:調(diào)用 suspendCancellableCoroutine 方法
這個(gè)方法在上一篇文章中解釋過了。它是一個(gè)特殊的 suspending 方法。不同于普通的 suspending 方法,通過這個(gè)方法,開發(fā)人員可以獲得 Continuation 引用,用來與第三方技術(shù)進(jìn)行集成。
第5行:調(diào)用 Publisher 的 subscribe 方法
因?yàn)?awaitOne 方法的定義是使用 Kotlin 擴(kuò)展方法語法添加進(jìn)了 Publisher 及其所有的子類,所以,可以在 awaitOne 方法中調(diào)用 Publisher 中的方法。
同時(shí),調(diào)用 subscribe 方法的時(shí)候傳入一個(gè) Subscriber 接口的匿名內(nèi)部類。后面大部分代碼都是關(guān)于如何實(shí)現(xiàn)這個(gè)匿名內(nèi)部類的。
在介紹這個(gè) Subscriber 匿名內(nèi)部類之前,需要先說明,對(duì) subscribe 方法的調(diào)用會(huì)出發(fā) Publisher 的執(zhí)行。這也就是說當(dāng)你調(diào)用一個(gè)如 aMono.awaitSingle() 方法的時(shí)候,就會(huì)使 aMono 開始執(zhí)行。
但是,結(jié)合上面一節(jié)的內(nèi)容,這一切的發(fā)生,需要當(dāng) Spring WebFlux 去調(diào)用如本例中 getNumberOfMessages 所返回的 Mono 的 subscribe 方法才會(huì)開始。所以,整個(gè)執(zhí)行過程和一個(gè)普通的 Spring WebFlux 方法并沒有大的區(qū)別。
第21行:調(diào)用 cont.resume(t)
這行調(diào)用發(fā)生在 Subscriber 的 onNext 方法中。在反應(yīng)式編程體系中,Publisher 的每個(gè)結(jié)果都會(huì)通過回調(diào) onNext 方法通知給 Subscriber。
在 onNext 方法里面,當(dāng)獲取到 Publisher 的結(jié)果之后,需要將結(jié)果傳遞給 Continuation。方法就是通過 Continuation 的 resume 方法。通過這種方法,Publisher 的結(jié)果便傳遞給了 Kotlin Coroutine。
五、總結(jié)
從上面的內(nèi)容看,Kotlin Coroutine 與 Spring Reactor 的整合的原理并不復(fù)雜。主要是實(shí)現(xiàn)兩個(gè)方向的轉(zhuǎn)換:Kotlin Coroutine 向 Mono 的轉(zhuǎn)換和 Mono 向 Kotlin Coroutine 的轉(zhuǎn)換。
Kotlin Coroutine 向 Mono 的轉(zhuǎn)換是通過 Mono.create 方法以及 MonoSink 接口實(shí)現(xiàn)的。Kotlin Coroutine 通過 MonoSink 接口,將執(zhí)行結(jié)果輸出給 Subscriber。
Mono 向 Kotlin Coroutine 的轉(zhuǎn)換是通過使用 suspendCancellableCoroutine 方法獲取到 Continuation 引用。再通過調(diào)用 Publisher.subscribe 方法,傳入一個(gè)自定義的 Subscriber。通過 Subscriber.onNext 方法獲取到 Publisher 的執(zhí)行結(jié)果,并將這個(gè)執(zhí)行結(jié)果傳遞給 Continuation。從而是 Kotlin Coroutine 獲得了 Mono 的執(zhí)行結(jié)果,完成了轉(zhuǎn)換過程。
接下來,Kotlin Coroutine 系列的文章將向大家介紹 JVM 領(lǐng)域其它的協(xié)程技術(shù)(Quasar Fiber、AliJDK 協(xié)程等)與 Kotlin Coroutine 技術(shù)的對(duì)比。
附:名詞解釋
為方便大家理解,先向大家介紹本文將會(huì)涉及的名詞及其含義:
- Reactive Streams:一個(gè)由多個(gè)技術(shù)社區(qū)共同提出的反應(yīng)式編程方面的規(guī)范
- Spring Reactor:Spring 社區(qū)提出的反應(yīng)式編程解決方案,實(shí)現(xiàn) Reactive Streams 規(guī)范。
- Spring WebFlux:Spring 5 提供了反應(yīng)式的 Web 開發(fā)解決方案,以 Spring Reactor 為基礎(chǔ)。用法同 Spring MVC 類似。
-
Publisher:在本文中指 Reactive Streams 中的一個(gè)重要接口。在其它技術(shù)中,也被稱為 Observable。Publisher 中有一個(gè)重要方法
subscribe。Subscriber 可以通過這個(gè)方法訂閱一個(gè) Publisher,并使 Publisher 開始執(zhí)行。 - Mono/Flux:Spring Reactor 中對(duì) Publisher 接口的實(shí)現(xiàn),分別代表一個(gè)元素和多個(gè)元素兩種場景。
- Continuation:異步編程中的一個(gè)概念,可以簡單理解為 Callback。在 Kotlin Coroutine 中,Continuation 也表示一個(gè)具體的回調(diào)接口。
-
Coroutine:協(xié)程。在 Kotlin 中,有很多以 Coroutine 命名的類,比如
CoroutineImpl、AbstractCoroutine??梢院唵卫斫鉃?Continuation 是一個(gè)概念、規(guī)范,而 Coroutine 是一種實(shí)現(xiàn)機(jī)制。