Kotlin Coroutine 是如何與 Spring WebFlux 整合的

一、前言

上篇文章介紹了 Kotlin Coroutine 的實(shí)現(xiàn)原理。因?yàn)槠蓿⑽唇榻B Kotlin Coroutine 具體是如何與其它異步編程技術(shù)整合的。本文將向大家介紹 Kotlin Coroutine 是如何與 Spring Reactor 整合。

雖然本文的標(biāo)題是關(guān)于 Kotlin Coroutine 與 Spring WebFlux 的,但其實(shí)講的是 Kotlin Coroutine 是如何與 Spring Reactor 整合的。因?yàn)?Spring Reactor 是 Spring WebFlux 的基礎(chǔ),所以不管起哪個(gè)標(biāo)題,內(nèi)容都是類似的。

因?yàn)?Spring WebFlux 這個(gè)名字更容易吸引人,所以本文便做了回標(biāo)題黨。

Kotlin Coroutine 與 Spring Reactor 的整合主要是通過 kotlinx-coroutines-reactivekotlinx-coroutines-reactor 實(shí)現(xiàn)的。本文提到的源碼都能從這兩個(gè)模塊中找到。

二、示例

本文將繼續(xù)使用《Kotlin Coroutine 初探》一文中的在 Spring WebFlux 中使用 Kotlin Coroutine 的示例:

@GetMapping("/coroutine/{personId}")
fun getNumberOfMessages(@PathVariable personId: String) = mono(Unconfined) { // ①
    val person = peopleRepository.findById(personId).awaitFirstOrDefault(null) // ②
            ?: throw NoSuchElementException("No person can be found by $personId")

    // ②
    val lastLoginDate = auditRepository.findByEmail(person.email).awaitSingle().eventDate

    val numberOfMessages =
            messageRepository.countByMessageDateGreaterThanAndEmail(lastLoginDate, person.email).awaitSingle() // ②

    "Hello ${person.name}, you have $numberOfMessages messages since $lastLoginDate"
}

本文將重點(diǎn)介紹上面代碼標(biāo)注的 ①、② 兩點(diǎn)。

第 ① 點(diǎn):mono(Unconfined) { ... } 的實(shí)現(xiàn)原理

第 ② 點(diǎn):awaitXXX 方法的實(shí)現(xiàn)原理

理解了上面兩點(diǎn),就能理解 Kotlin Coroutine 是如何與 Spring Reactor 整合的了。

三、Spring Reactor 相關(guān)知識(shí)

本節(jié)會(huì)介紹一下后面會(huì)涉及到的 Spring Reactor 的概念和實(shí)現(xiàn)細(xì)節(jié)方面的內(nèi)容。之所以將 Spring Reactor 的部分內(nèi)容單拎出來,是因?yàn)轭A(yù)先了解 Spring Reactor 的部分內(nèi)容對(duì)理解后面的內(nèi)容非常重要。

1. Publisher/Mono/Flux

Reactive Streams 是 Spring、Netflix 等公司提出的一個(gè)反應(yīng)式編程的一個(gè)規(guī)范。這個(gè)規(guī)范定義了必要的接口和對(duì)實(shí)現(xiàn)的要求。Publisher 是其中一個(gè)重要的接口:

public interface Publisher<T> {
    public void subscribe(Subscriber<? super T> s);
}

Publisher 顧名思義,發(fā)布者,等同于很多技術(shù)中的 Observable。subscribe 方法實(shí)際應(yīng)該被看做是被訂閱,即 aPublisher.subscribe(aSubscriber) 應(yīng)該理解為 aPublisheraSubscriber 訂閱。

但不同于之前同樣有訂閱機(jī)制的技術(shù),比方說一些消息隊(duì)列。之前的訂閱機(jī)制中,訂閱和消息的發(fā)布是兩個(gè)獨(dú)立的環(huán)節(jié)。而 Publisher.subscribe(Subscriber) 方法,如果被調(diào)用,則會(huì)使這個(gè) Publisher 開始發(fā)布消息。

MonoFlux 是在實(shí)現(xiàn)了 Reactive Streams 規(guī)范的 Spring Reactor 中的兩個(gè)類。這兩個(gè)類實(shí)現(xiàn)了 Publisher 接口。不同于類似的 RxJava 2,其只有一個(gè) Publisher 的實(shí)現(xiàn)類。Spring Reactor 中兩個(gè) Publisher 實(shí)現(xiàn)類,分別代表單個(gè)元素和多個(gè)元素兩種場景。

在使用了 Spring WebFlux(基于 Spring Reactor 的新一代 Web 框架)的項(xiàng)目中,一個(gè)被 @RequestMapping 標(biāo)注的方法需要返回 MonoFlux。然后,當(dāng)一個(gè)請(qǐng)求根據(jù)映射配置被轉(zhuǎn)發(fā)到這個(gè)方法上時(shí),一個(gè) MonoFlux 對(duì)象會(huì)根據(jù)這個(gè)方法的定義創(chuàng)建出一個(gè) Mono。但這時(shí)真正的請(qǐng)求處理并未開始,方法返回的只是一個(gè)處理的步驟定義。

當(dāng) Spring WebFlux 框架得到這個(gè)方法返回的 MonoFlux 之后,會(huì)調(diào)用它們的 subscribe(Subcriber) 方法。此時(shí),真正的請(qǐng)求處理便開始了。

后面的內(nèi)容為了簡便,會(huì)省略 Flux,僅會(huì)提到 Mono,但兩者的原理基本類似。

2. Mono.create(Consumer<MonoSink<T>>) 方法

Spring WebFlux 和傳統(tǒng) Spring MVC 最大的不同就是要求方法返回 MonoFlux。當(dāng) Spring WebFlux 與 Kotlin Coroutine 整合后,我們需要將 Coroutine 轉(zhuǎn)換成一個(gè) Mono(或者 Flux,后面將省略 Flux)。

如何做呢?Kotlin Coroutine 使用的是 Mono.create(Consumer<MonoSink<T>>) 方法。

從方法簽名看,Mono.create 方法涉及到最主要的接口是 MonoSink(此處不解釋 Consumer 接口)。

MonoSink 是什么呢?其 API 文檔是這么解釋的:

Wrapper API around an actual downstream Subscriber for emitting nothing, a single value or an error (mutually exclusive).

簡單理解就是對(duì)后續(xù) Subscriber 的封裝。

可能有些同學(xué)對(duì) Sink 這個(gè)詞有些陌生,我起初也是這種感覺。但是對(duì)于一些做過流處理相關(guān)開發(fā)的同學(xué),這個(gè)詞應(yīng)該不陌生。原因是 Sink 這個(gè)詞經(jīng)常出現(xiàn)在流處理相關(guān)技術(shù)中(比如 Flink、Flume)。在 Spring Cloud Stream 中,也能看到這個(gè)詞。同樣,Spring Reactor 是對(duì) Reactive Streams 規(guī)范的實(shí)現(xiàn),也可以看做是另一種形式的流技術(shù),所以,出現(xiàn) Sink 這個(gè)詞也不足為奇了。

因?yàn)?MonoSink 是對(duì)后續(xù) Subscriber 的封裝,所以可以利用 MonoSink 向后續(xù)的 Subscriber 輸出一些東西的。在 Kotlin Coroutine 與 Spring Reactor 整合的過程中,Kotlin Coroutine 將開啟一個(gè) Coroutine,并將執(zhí)行結(jié)果通過 MonoSink 輸出給 Subscriber。

在 Spring WebFlux 應(yīng)用中,Subscriber 會(huì)將 Mono(或 Flux)以 HTTP 數(shù)據(jù)的形式輸出。

這樣就完成了 Kotlin Coroutine 向 Mono 轉(zhuǎn)換的主要工作。更多細(xì)節(jié)將在下面的內(nèi)容介紹。

四、整合的兩個(gè)關(guān)鍵點(diǎn)

接下來將向大家介紹 Kotlin Coroutine 與 Spring Reactor整合的兩個(gè)關(guān)鍵點(diǎn):mono 方法和 await 系列方法。

1. mono 方法

mono 方法連接了 Spring Reactor 環(huán)境與 Kotlin Coroutine 環(huán)境,可以看做是一個(gè)將 Kotlin Coroutine 裝換為 Spring Reactor Mono 的工廠方法。我們先來看 mono 方法的源碼:

fun <T> mono(
    context: CoroutineContext = DefaultDispatcher,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> T?
): Mono<T> = Mono.create { sink ->
    // 創(chuàng)建一個(gè)新的 Coroutine Context
    val newContext = newCoroutineContext(context, parent)
    // 創(chuàng)建一個(gè)新的 MonoCoroutine,MonoCoroutine 會(huì)實(shí)現(xiàn) Disposable 接口
    val coroutine = MonoCoroutine(newContext, sink)
    sink.onDispose(coroutine)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}

mono 方法最主要的部分都集中在對(duì) Mono.create 方法的調(diào)用。這也是為什么在前面的部分著重介紹 Mono.create 方法和 MonoSink 接口的原因。

Mono.create 會(huì)創(chuàng)建出一個(gè) Mono 對(duì)象,當(dāng)這個(gè) Mono 對(duì)象的 subscribe 方法被執(zhí)行的時(shí)候。傳入 Mono.createConsumer 就會(huì)被調(diào)用。此時(shí)下面的代碼就會(huì)被執(zhí)行:

{ sink ->
    // 創(chuàng)建一個(gè)新的 Coroutine Context
    val newContext = newCoroutineContext(context, parent)
    // 創(chuàng)建一個(gè)新的 MonoCoroutine,MonoCoroutine 會(huì)實(shí)現(xiàn) Disposable 接口,用于關(guān)閉和異常處理。
    val coroutine = MonoCoroutine(newContext, sink)
    sink.onDispose(coroutine)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
}

上面這段代碼就是 Consumer 的 Lambda 形式。

最主要的部分是下面兩行代碼:

val coroutine = MonoCoroutine(newContext, sink)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)

首先,一個(gè) Coroutine —— MonoCoroutine 被創(chuàng)建。同時(shí),sink 對(duì)象被傳入到這個(gè) MonoCoroutine 中。

接下來,這個(gè) Coroutine 被執(zhí)行,接下來的重點(diǎn)變成了 MonoCoroutine

private class MonoCoroutine<in T>(
    parentContext: CoroutineContext,
    private val sink: MonoSink<T>
) : AbstractCoroutine<T>(parentContext, true), Disposable {
    var disposed = false

    override fun onCompleted(value: T) {
        if (!disposed) {
            if (value == null) sink.success() else sink.success(value)
        }
    }

    override fun onCompletedExceptionally(exception: Throwable) {
        if (!disposed) sink.error(exception)
    }
    
    override fun dispose() {
        disposed = true
        cancel(cause = null)
    }

    override fun isDisposed(): Boolean = disposed
}

從上面的代碼可以看到,MonoCoroutine 繼承了 AbstractCoroutine,同時(shí)構(gòu)造函數(shù)入?yún)魅肓?MonoSink

MonoCoroutine 實(shí)現(xiàn)了兩個(gè)在 AbstractCoroutine 聲明的重要方法:

  • onCompleted
  • onCompletedExceptionally

這兩個(gè)方法都是回調(diào)方法。從上面的代碼可以明顯看出。當(dāng) MonoCoroutine 在執(zhí)行完畢之后,即這兩個(gè)回調(diào)方法被調(diào)用時(shí),會(huì)通過調(diào)用 MonoSink 將結(jié)果輸出給 Subscriber。從而完成 Kotlin Coroutine 向 Mono 的轉(zhuǎn)換工作。

2. await 系列方法

接下來要介紹的是一系列以 await 開頭的方法,比如示例中的 awaitFirstOrDefault、awaitSingle,等等。這些方法定義在 kotlinx-coroutines-reactive 模塊中的 Await.kt 文件。

這些方法都是 suspending 方法,能夠用命令式的代碼風(fēng)格獲取 PublisherMonoFlux)中的結(jié)果。

因?yàn)樯弦黄恼隆禟otlin Coroutine 原理解析》已經(jīng)介紹了 suspending 方法的工作原理,所以這里就不重復(fù)了。本文只介紹 Kotlin Coroutine 是如何與 Reactive Streams 中的 Publisher 接口整合的。

await 系列方法可以看作是將 MonoFlux 轉(zhuǎn)換為 Coroutine 的方法。這些方法真正的實(shí)現(xiàn)集中在了 awaitOne 方法中。接下來我們看看 awaitOne 方法的源代碼:

private suspend fun <T> Publisher<T>.awaitOne(
    mode: Mode,
    default: T? = null
): T = suspendCancellableCoroutine { cont ->
    subscribe(object : Subscriber<T> {
        private lateinit var subscription: Subscription
        private var value: T? = null
        private var seenValue = false

        override fun onSubscribe(sub: Subscription) {
            subscription = sub
            cont.invokeOnCompletion { sub.cancel() }
            sub.request(if (mode == Mode.FIRST) 1 else Long.MAX_VALUE)
        }

        override fun onNext(t: T) {
            when (mode) {
                Mode.FIRST, Mode.FIRST_OR_DEFAULT -> {
                    if (!seenValue) {
                        seenValue = true
                        cont.resume(t)
                        subscription.cancel()
                    }
                }
                Mode.LAST, Mode.SINGLE -> {
                    if (mode == Mode.SINGLE && seenValue) {
                        if (cont.isActive)
                            cont.resumeWithException(IllegalArgumentException("More that one onNext value for $mode"))
                        subscription.cancel()
                    } else {
                        value = t
                        seenValue = true
                    }
                }
            }
        }

        @Suppress("UNCHECKED_CAST")
        override fun onComplete() {
            if (seenValue) {
                if (cont.isActive) cont.resume(value as T)
                return
            }
            when {
                mode == Mode.FIRST_OR_DEFAULT -> {
                    cont.resume(default as T)
                }
                cont.isActive -> {
                    cont.resumeWithException(NoSuchElementException("No value received via onNext for $mode"))
                }
            }
        }

        override fun onError(e: Throwable) {
            cont.resumeWithException(e)
        }
    })
}

逐行解釋一下關(guān)鍵代碼:

第4行:調(diào)用 suspendCancellableCoroutine 方法

這個(gè)方法在上一篇文章中解釋過了。它是一個(gè)特殊的 suspending 方法。不同于普通的 suspending 方法,通過這個(gè)方法,開發(fā)人員可以獲得 Continuation 引用,用來與第三方技術(shù)進(jìn)行集成。

第5行:調(diào)用 Publishersubscribe 方法

因?yàn)?awaitOne 方法的定義是使用 Kotlin 擴(kuò)展方法語法添加進(jìn)了 Publisher 及其所有的子類,所以,可以在 awaitOne 方法中調(diào)用 Publisher 中的方法。

同時(shí),調(diào)用 subscribe 方法的時(shí)候傳入一個(gè) Subscriber 接口的匿名內(nèi)部類。后面大部分代碼都是關(guān)于如何實(shí)現(xiàn)這個(gè)匿名內(nèi)部類的。

在介紹這個(gè) Subscriber 匿名內(nèi)部類之前,需要先說明,對(duì) subscribe 方法的調(diào)用會(huì)出發(fā) Publisher 的執(zhí)行。這也就是說當(dāng)你調(diào)用一個(gè)如 aMono.awaitSingle() 方法的時(shí)候,就會(huì)使 aMono 開始執(zhí)行。

但是,結(jié)合上面一節(jié)的內(nèi)容,這一切的發(fā)生,需要當(dāng) Spring WebFlux 去調(diào)用如本例中 getNumberOfMessages 所返回的 Monosubscribe 方法才會(huì)開始。所以,整個(gè)執(zhí)行過程和一個(gè)普通的 Spring WebFlux 方法并沒有大的區(qū)別。

第21行:調(diào)用 cont.resume(t)

這行調(diào)用發(fā)生在 SubscriberonNext 方法中。在反應(yīng)式編程體系中,Publisher 的每個(gè)結(jié)果都會(huì)通過回調(diào) onNext 方法通知給 Subscriber。

onNext 方法里面,當(dāng)獲取到 Publisher 的結(jié)果之后,需要將結(jié)果傳遞給 Continuation。方法就是通過 Continuationresume 方法。通過這種方法,Publisher 的結(jié)果便傳遞給了 Kotlin Coroutine。

五、總結(jié)

從上面的內(nèi)容看,Kotlin Coroutine 與 Spring Reactor 的整合的原理并不復(fù)雜。主要是實(shí)現(xiàn)兩個(gè)方向的轉(zhuǎn)換:Kotlin Coroutine 向 Mono 的轉(zhuǎn)換和 Mono 向 Kotlin Coroutine 的轉(zhuǎn)換。

Kotlin Coroutine 向 Mono 的轉(zhuǎn)換是通過 Mono.create 方法以及 MonoSink 接口實(shí)現(xiàn)的。Kotlin Coroutine 通過 MonoSink 接口,將執(zhí)行結(jié)果輸出給 Subscriber。

Mono 向 Kotlin Coroutine 的轉(zhuǎn)換是通過使用 suspendCancellableCoroutine 方法獲取到 Continuation 引用。再通過調(diào)用 Publisher.subscribe 方法,傳入一個(gè)自定義的 Subscriber。通過 Subscriber.onNext 方法獲取到 Publisher 的執(zhí)行結(jié)果,并將這個(gè)執(zhí)行結(jié)果傳遞給 Continuation。從而是 Kotlin Coroutine 獲得了 Mono 的執(zhí)行結(jié)果,完成了轉(zhuǎn)換過程。

接下來,Kotlin Coroutine 系列的文章將向大家介紹 JVM 領(lǐng)域其它的協(xié)程技術(shù)(Quasar Fiber、AliJDK 協(xié)程等)與 Kotlin Coroutine 技術(shù)的對(duì)比。

附:名詞解釋

為方便大家理解,先向大家介紹本文將會(huì)涉及的名詞及其含義:

  1. Reactive Streams:一個(gè)由多個(gè)技術(shù)社區(qū)共同提出的反應(yīng)式編程方面的規(guī)范
  2. Spring Reactor:Spring 社區(qū)提出的反應(yīng)式編程解決方案,實(shí)現(xiàn) Reactive Streams 規(guī)范。
  3. Spring WebFlux:Spring 5 提供了反應(yīng)式的 Web 開發(fā)解決方案,以 Spring Reactor 為基礎(chǔ)。用法同 Spring MVC 類似。
  4. Publisher:在本文中指 Reactive Streams 中的一個(gè)重要接口。在其它技術(shù)中,也被稱為 Observable。Publisher 中有一個(gè)重要方法 subscribe。Subscriber 可以通過這個(gè)方法訂閱一個(gè) Publisher,并使 Publisher 開始執(zhí)行。
  5. Mono/Flux:Spring Reactor 中對(duì) Publisher 接口的實(shí)現(xiàn),分別代表一個(gè)元素和多個(gè)元素兩種場景。
  6. Continuation:異步編程中的一個(gè)概念,可以簡單理解為 Callback。在 Kotlin Coroutine 中,Continuation 也表示一個(gè)具體的回調(diào)接口。
  7. Coroutine:協(xié)程。在 Kotlin 中,有很多以 Coroutine 命名的類,比如 CoroutineImpl、AbstractCoroutine??梢院唵卫斫鉃?Continuation 是一個(gè)概念、規(guī)范,而 Coroutine 是一種實(shí)現(xiàn)機(jī)制。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 適合閱讀的人群:本文適合對(duì) Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream ...
    編走編想閱讀 51,668評(píng)論 39 116
  • 一、前言 Kotlin 是近兩年興起的一門編程語言,最近一年的發(fā)展速度很快。在2017年,Google 宣布 Ko...
    編走編想閱讀 4,181評(píng)論 5 17
  • 上一篇文章《Kotlin Coroutine 初探》向大家介紹了 Kotlin Coroutine 的由來、重要概...
    編走編想閱讀 17,849評(píng)論 25 50
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,628評(píng)論 19 139
  • Spring5現(xiàn)處在第四個(gè)預(yù)發(fā)布版,正式版將要發(fā)布了,它帶來的一大特性就是響應(yīng)式框架Spring WebFlux。...
    MrTT閱讀 26,984評(píng)論 0 21

友情鏈接更多精彩內(nèi)容