Kotlin Coroutines(協(xié)程) 完全解析(二),深入理解協(xié)程的掛起、恢復(fù)與調(diào)度

Kotlin Coroutines(協(xié)程) 完全解析系列:

Kotlin Coroutines(協(xié)程) 完全解析(一),協(xié)程簡(jiǎn)介

Kotlin Coroutines(協(xié)程) 完全解析(二),深入理解協(xié)程的掛起、恢復(fù)與調(diào)度

Kotlin Coroutines(協(xié)程) 完全解析(三),封裝異步回調(diào)、協(xié)程間關(guān)系及協(xié)程的取消

Kotlin Coroutines(協(xié)程) 完全解析(四),協(xié)程的異常處理

Kotlin Coroutines(協(xié)程) 完全解析(五),協(xié)程的并發(fā)

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

前面一篇文章協(xié)程簡(jiǎn)介,簡(jiǎn)單介紹了協(xié)程的一些基本概念以及其簡(jiǎn)化異步編程的優(yōu)勢(shì),但是協(xié)程與線(xiàn)程有什么區(qū)別,協(xié)程的掛起與恢復(fù)是如何實(shí)現(xiàn)的,還有協(xié)程運(yùn)行在哪個(gè)線(xiàn)程上,依然不是很清楚。這篇文章將分析協(xié)程的實(shí)現(xiàn)原理,一步步揭開(kāi)協(xié)程的面紗。先來(lái)看看協(xié)程中最關(guān)鍵的掛起函數(shù)的實(shí)現(xiàn)原理:

1. 掛起函數(shù)的工作原理

協(xié)程的內(nèi)部實(shí)現(xiàn)使用了 Kotlin 編譯器的一些編譯技術(shù),當(dāng)掛起函數(shù)調(diào)用時(shí),背后大致細(xì)節(jié)如下:

掛起函數(shù)或掛起 lambda 表達(dá)式調(diào)用時(shí),都有一個(gè)隱式的參數(shù)額外傳入,這個(gè)參數(shù)是Continuation類(lèi)型,封裝了協(xié)程恢復(fù)后的執(zhí)行的代碼邏輯。

用前文中的一個(gè)掛起函數(shù)為例:

suspend fun requestToken(): Token { ... }

實(shí)際上在 JVM 中更像下面這樣:

Object requestToken(Continuation<Token> cont) { ... }

Continuation的定義如下,類(lèi)似于一個(gè)通用的回調(diào)接口:

/**
 * Interface representing a continuation after a suspension point that returns value of type `T`.
 */
public interface Continuation<in T> {
    /**
     * Context of the coroutine that corresponds to this continuation.
     */
    public val context: CoroutineContext

    /**
     * Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
     * return value of the last suspension point.
     */
    public fun resumeWith(result: Result<T>)
}

現(xiàn)在再看之前postItem函數(shù):

suspend fun requestToken(): Token { ... }   // 掛起函數(shù)
suspend fun createPost(token: Token, item: Item): Post { ... }  // 掛起函數(shù)
fun processPost(post: Post) { ... }

fun postItem(item: Item) {
    GlobalScope.launch {
        val token = requestToken()
        val post = createPost(token, item)
        processPost(post)
    }
}

然而,協(xié)程內(nèi)部實(shí)現(xiàn)不是使用普通回調(diào)的形式,而是使用狀態(tài)機(jī)來(lái)處理不同的掛起點(diǎn),大致的 CPS(Continuation Passing Style) 代碼為:

// 編譯后生成的內(nèi)部類(lèi)大致如下
final class postItem$1 extends SuspendLambda ... {
    public final Object invokeSuspend(Object result) {
        ...
        switch (this.label) {
            case 0:
                this.label = 1;
                token = requestToken(this)
                break;
            case 1:
                this.label = 2;
                Token token = result;
                post = createPost(token, this.item, this)
                break;
            case 2:
                Post post = result;
                processPost(post)
                break;
        }
    }
}

上面代碼中每一個(gè)掛起點(diǎn)和初始掛起點(diǎn)對(duì)應(yīng)的 Continuation 都會(huì)轉(zhuǎn)化為一種狀態(tài),協(xié)程恢復(fù)只是跳轉(zhuǎn)到下一種狀態(tài)中。掛起函數(shù)將執(zhí)行過(guò)程分為多個(gè) Continuation 片段,并且利用狀態(tài)機(jī)的方式保證各個(gè)片段是順序執(zhí)行的。

coroutine-continuation.png

1.1 掛起函數(shù)可能會(huì)掛起協(xié)程

掛起函數(shù)使用 CPS style 的代碼來(lái)掛起協(xié)程,保證掛起點(diǎn)后面的代碼只能在掛起函數(shù)執(zhí)行完后才能執(zhí)行,所以?huà)炱鸷瘮?shù)保證了協(xié)程內(nèi)的順序執(zhí)行順序。

在多個(gè)協(xié)程的情況下,掛起函數(shù)的作用更加明顯:

fun postItem(item: Item) {
    GlobalScope.launch {
        // async { requestToken() } 新建一個(gè)協(xié)程,可能在另一個(gè)線(xiàn)程運(yùn)行
        // 但是 await() 是掛起函數(shù),當(dāng)前協(xié)程執(zhí)行邏輯卡在第一個(gè)分支,第一種狀態(tài),當(dāng) async 的協(xié)程執(zhí)行完后恢復(fù)當(dāng)前協(xié)程,才會(huì)切換到下一個(gè)分支
        val token = async { requestToken() }.await()
        // 在第二個(gè)分支狀態(tài)中,又新建一個(gè)協(xié)程,使用 await 掛起函數(shù)將之后代碼作為 Continuation 放倒下一個(gè)分支狀態(tài),直到 async 協(xié)程執(zhí)行完
        val post = aync { createPost(token, item) }.await()
        // 最后一個(gè)分支狀態(tài),直接在當(dāng)前協(xié)程處理
        processPost(post)
    }
}

上面的例子中,await()掛起函數(shù)掛起當(dāng)前協(xié)程,直到異步協(xié)程完成執(zhí)行,但是這里并沒(méi)有阻塞線(xiàn)程,是使用狀態(tài)機(jī)的控制邏輯來(lái)實(shí)現(xiàn)。而且掛起函數(shù)可以保證掛起點(diǎn)之后的代碼一定在掛起點(diǎn)前代碼執(zhí)行完成后才會(huì)執(zhí)行,掛起函數(shù)保證順序執(zhí)行,所以異步邏輯也可以用順序的代碼順序來(lái)編寫(xiě)。

注意掛起函數(shù)不一定會(huì)掛起協(xié)程,如果相關(guān)調(diào)用的結(jié)果已經(jīng)可用,庫(kù)可以決定繼續(xù)進(jìn)行而不掛起,例如async { requestToken() }的返回值Deferred的結(jié)果已經(jīng)可用時(shí),await()掛起函數(shù)可以直接返回結(jié)果,不用再掛起協(xié)程。

1.2 掛起函數(shù)不會(huì)阻塞線(xiàn)程

掛起函數(shù)掛起協(xié)程,并不會(huì)阻塞協(xié)程所在的線(xiàn)程,例如協(xié)程的delay()掛起函數(shù)會(huì)暫停協(xié)程一定時(shí)間,并不會(huì)阻塞協(xié)程所在線(xiàn)程,但是Thread.sleep()函數(shù)會(huì)阻塞線(xiàn)程。

看下面一個(gè)例子,兩個(gè)協(xié)程運(yùn)行在同一線(xiàn)程上:

fun main(args: Array<String>) {
    // 創(chuàng)建一個(gè)單線(xiàn)程的協(xié)程調(diào)度器,下面兩個(gè)協(xié)程都運(yùn)行在這同一線(xiàn)程上
    val coroutineDispatcher = newSingleThreadContext("ctx")
    // 啟動(dòng)協(xié)程 1
    GlobalScope.launch(coroutineDispatcher) {
        println("the first coroutine")
        delay(200)
        println("the first coroutine")
    }
    // 啟動(dòng)協(xié)程 2
    GlobalScope.launch(coroutineDispatcher) {
        println("the second coroutine")
        delay(100)
        println("the second coroutine")
    }
    // 保證 main 線(xiàn)程存活,確保上面兩個(gè)協(xié)程運(yùn)行完成
    Thread.sleep(500)
}

運(yùn)行結(jié)果為:

the first coroutine
the second coroutine
the second coroutine
the first coroutine

從上面結(jié)果可以看出,當(dāng)協(xié)程 1 暫停 200 ms 時(shí),線(xiàn)程并沒(méi)有阻塞,而是執(zhí)行協(xié)程 2 的代碼,然后在 200 ms 時(shí)間到后,繼續(xù)執(zhí)行協(xié)程 1 的邏輯。所以?huà)炱鸷瘮?shù)并不會(huì)阻塞線(xiàn)程,這樣可以節(jié)省線(xiàn)程資源,協(xié)程掛起時(shí),線(xiàn)程可以繼續(xù)執(zhí)行其他邏輯。

1.3 掛起函數(shù)恢復(fù)協(xié)程后運(yùn)行在哪個(gè)線(xiàn)程

協(xié)程的所屬的線(xiàn)程調(diào)度在前一篇文章《協(xié)程簡(jiǎn)介》中有提到過(guò),主要是由協(xié)程的CoroutineDispatcher控制,CoroutineDispatcher可以指定協(xié)程運(yùn)行在某一特定線(xiàn)程上、運(yùn)作在線(xiàn)程池中或者不指定所運(yùn)行的線(xiàn)程。所以協(xié)程調(diào)度器可以分為Confined dispatcherUnconfined dispatcher,Dispatchers.Default、Dispatchers.IODispatchers.Main屬于Confined dispatcher,都指定了協(xié)程所運(yùn)行的線(xiàn)程或線(xiàn)程池,掛起函數(shù)恢復(fù)后協(xié)程也是運(yùn)行在指定的線(xiàn)程或線(xiàn)程池上的,而Dispatchers.Unconfined屬于Unconfined dispatcher,協(xié)程啟動(dòng)并運(yùn)行在 Caller Thread 上,但是只是在第一個(gè)掛起點(diǎn)之前是這樣的,掛起恢復(fù)后運(yùn)行在哪個(gè)線(xiàn)程完全由所調(diào)用的掛起函數(shù)決定。

fun main(args: Array<String>) = runBlocking<Unit> {
    launch { // 默認(rèn)繼承 parent coroutine 的 CoroutineDispatcher,指定運(yùn)行在 main 線(xiàn)程
        println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
        delay(100)
        println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) {
        println("Unconfined      : I'm working in thread ${Thread.currentThread().name}")
        delay(100)
        println("Unconfined      : After delay in thread ${Thread.currentThread().name}")
    }
}

輸出如下:

Unconfined      : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined      : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main

上面第三行輸出,經(jīng)過(guò)delay掛起函數(shù)后,使用Dispatchers.Unconfined的協(xié)程掛起恢復(fù)后依然在delay函數(shù)使用的DefaultExecutor上。

2. 協(xié)程深入解析

上面更多地是通過(guò) demo 的方式說(shuō)明掛起函數(shù)函數(shù)的一些特性,但是協(xié)程的創(chuàng)建、啟動(dòng)、恢復(fù)、線(xiàn)程調(diào)度、協(xié)程切換是如何實(shí)現(xiàn)的呢,還是不清楚,下面結(jié)合源碼詳細(xì)地解析協(xié)程。

2.1 協(xié)程的創(chuàng)建與啟動(dòng)

先從新建一個(gè)協(xié)程開(kāi)始分析協(xié)程的創(chuàng)建,最常見(jiàn)的協(xié)程創(chuàng)建方式為CoroutineScope.launch {},關(guān)鍵源碼如下:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ...
    coroutine.start(start, coroutine, block)
    return coroutine
}

coroutine.start(start, coroutine, block)默認(rèn)情況下會(huì)走到startCoroutineCancellable,最終會(huì)調(diào)用到createCoroutineUnintercepted

/**
 * Creates unintercepted coroutine without receiver and with result type [T].
 * This function creates a new, fresh instance of suspendable computation every time it is invoked.
 *
 * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
 * The [completion] continuation is invoked when coroutine completes with result or exception.
 ...
 */
 public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> { ... }

重點(diǎn)注意該方法的注釋?zhuān)瑒?chuàng)建一個(gè)協(xié)程,創(chuàng)建了一個(gè)新的可掛起計(jì)算,通過(guò)調(diào)用resume(Unit)啟動(dòng)該協(xié)程。而且返回值為ContinuationContinuation提供了resumeWith恢復(fù)協(xié)程的接口,用以實(shí)現(xiàn)協(xié)程恢復(fù),Continuation封裝了協(xié)程的代碼運(yùn)行邏輯和恢復(fù)接口。

再看之前協(xié)程代碼編譯生成的內(nèi)部類(lèi)final class postItem$1 extends SuspendLambda ...,協(xié)程的計(jì)算邏輯封裝在invokeSuspend方法中,而SuspendLambda的繼承關(guān)系為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation,其中BaseContinuationImpl 部分關(guān)鍵源碼如下:

internal abstract class BaseContinuationImpl(...) {
    // 實(shí)現(xiàn) Continuation 的 resumeWith,并且是 final 的,不可被重寫(xiě)
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        val outcome = invokeSuspend(param)
        ...
    }

    // 由編譯生成的協(xié)程相關(guān)類(lèi)來(lái)實(shí)現(xiàn),例如 postItem$1
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}

而這部分與之前的分析也是吻合的,啟動(dòng)協(xié)程流程是resume(Unit)->resumeWith()->invokeSuspend(),協(xié)程的掛起通過(guò)suspend掛起函數(shù)實(shí)現(xiàn),協(xié)程的恢復(fù)通過(guò)Continuation.resumeWith實(shí)現(xiàn)。

2.2 協(xié)程的線(xiàn)程調(diào)度

協(xié)程的線(xiàn)程調(diào)度是通過(guò)攔截器實(shí)現(xiàn)的,前面提到了協(xié)程啟動(dòng)調(diào)用到了startCoroutineCancellable,該方法實(shí)現(xiàn)為:

internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
    createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
// createCoroutineUnintercepted(completion) 會(huì)創(chuàng)建一個(gè)新的協(xié)程,返回值類(lèi)型為 Continuation
// intercepted() 是給 Continuation 加上 ContinuationInterceptor 攔截器,也是線(xiàn)程調(diào)度的關(guān)鍵
// resumeCancellable(Unit) 最終將調(diào)用 resume(Unit) 啟動(dòng)協(xié)程

再看intercepted()的具體實(shí)現(xiàn):

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父類(lèi)

internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
    // intercepted() 方法關(guān)鍵是 context[ContinuationInterceptor]?.interceptContinuation(this)
    // context[ContinuationInterceptor] 就是協(xié)程的 CoroutineDispatcher
}

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    /**
     * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

所以intercepted()最終會(huì)使用協(xié)程的CoroutineDispatcherinterceptContinuation方法包裝原來(lái)的 Continuation,攔截所有的協(xié)程運(yùn)行操作。

DispatchedContinuation攔截了協(xié)程的啟動(dòng)和恢復(fù),分別是resumeCancellable(Unit)和重寫(xiě)的resumeWith(Result)

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {
    inline fun resumeCancellable(value: T) {
        // 判斷是否需要線(xiàn)程調(diào)度
        if (dispatcher.isDispatchNeeded(context)) {
            ...
            // 將協(xié)程的運(yùn)算分發(fā)到另一個(gè)線(xiàn)程
            dispatcher.dispatch(context, this)
        } else {
            ...
            // 如果不需要調(diào)度,直接在當(dāng)前線(xiàn)程執(zhí)行協(xié)程運(yùn)算
            resumeUndispatched(value)
        }
    }

    override fun resumeWith(result: Result<T>) {
        // 判斷是否需要線(xiàn)程調(diào)度
        if (dispatcher.isDispatchNeeded(context)) {
            ...
            // 將協(xié)程的運(yùn)算分發(fā)到另一個(gè)線(xiàn)程
            dispatcher.dispatch(context, this)
        } else {
            ...
            // 如果不需要調(diào)度,直接在當(dāng)前線(xiàn)程執(zhí)行協(xié)程運(yùn)算
            continuation.resumeWith(result)
        }
    }
}

internal interface DispatchedTask<in T> : Runnable {
    public override fun run() {
        ...
        // 封裝了 continuation.resume 邏輯
    }
}

繼續(xù)跟蹤newSingleThreadContext()、Dispatchers.IOdispatch方法的實(shí)現(xiàn),發(fā)現(xiàn)其實(shí)都調(diào)用了Executor.execute(Runnable)方法,而Dispatchers.Unconfined的實(shí)現(xiàn)更簡(jiǎn)單,關(guān)鍵在于isDispatchNeeded()返回為false。

2.3 協(xié)程的掛起和恢復(fù)

Kotlin 編譯器會(huì)生成繼承自SuspendLambda的子類(lèi),協(xié)程的真正運(yùn)算邏輯都在invokeSuspend中。但是協(xié)程掛起的具體實(shí)現(xiàn)是如何呢?先看下面示例代碼:

fun main(args: Array<String>) = runBlocking<Unit> { // 新建并啟動(dòng) blocking 協(xié)程,運(yùn)行在 main 線(xiàn)程上,等待所有子協(xié)程運(yùn)行完成后才會(huì)結(jié)束
    launch(Dispatchers.Unconfined) { // 新建并啟動(dòng) launch 協(xié)程,沒(méi)有指定所運(yùn)行線(xiàn)程,一開(kāi)始運(yùn)行在調(diào)用者所在的 main 線(xiàn)程上
        println("${Thread.currentThread().name} : launch start")
        async(Dispatchers.Default) { // 新建并啟動(dòng) async 協(xié)程,運(yùn)行在 Dispatchers.Default 的線(xiàn)程池中
            println("${Thread.currentThread().name} : async start")
            delay(100)  // 掛起 async 協(xié)程 100 ms
            println("${Thread.currentThread().name} : async end")
        }.await() // 掛起 launch 協(xié)程,直到 async 協(xié)程結(jié)束
        println("${Thread.currentThread().name} : launch end")
    }
}

其中 launch 協(xié)程編譯生成的 SuspendLambda 子類(lèi)的invokeSuspend方法如下:

public final Object invokeSuspend(@NotNull Object result) {
    Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
    switch (this.label) {
        case 0:
            ...
            System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());
            // 新建并啟動(dòng) async 協(xié)程 
            Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
            this.label = 1;
            // 調(diào)用 await() 掛起函數(shù)
            if (async$default.await(this) == coroutine_suspended) {
                return coroutine_suspended;
            }
            break;
        case 1:
            if (result instanceof Failure) {
                throw ((Failure) result).exception;
            }
            // 恢復(fù)協(xié)程后再執(zhí)行一次 resumeWith(),然后無(wú)異常的話(huà)執(zhí)行最后的 println()
            break;
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
    }
    ...
    System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());
    return Unit.INSTANCE;
}

上面代碼中 launch 協(xié)程掛起的關(guān)鍵在于async$default.await(this) == coroutine_suspended,如果此時(shí) async 線(xiàn)程未執(zhí)行完成,await()返回為IntrinsicsKt.getCOROUTINE_SUSPENDED(),就會(huì) return,launch 協(xié)程的invokeSuspend方法執(zhí)行完成,協(xié)程所在線(xiàn)程繼續(xù)往下運(yùn)行,此時(shí) launch 線(xiàn)程處于掛起狀態(tài)。所以協(xié)程掛起就是協(xié)程掛起點(diǎn)之前邏輯執(zhí)行完成,協(xié)程的運(yùn)算關(guān)鍵方法resumeWith()執(zhí)行完成,線(xiàn)程繼續(xù)執(zhí)行往下執(zhí)行其他邏輯。

協(xié)程掛起有三點(diǎn)需要注意的:

  • 啟動(dòng)其他協(xié)程并不會(huì)掛起當(dāng)前協(xié)程,所以launchasync啟動(dòng)線(xiàn)程時(shí),除非新協(xié)程運(yùn)行在當(dāng)前線(xiàn)程,則當(dāng)前協(xié)程只能在新協(xié)程運(yùn)行完成后繼續(xù)執(zhí)行,否則當(dāng)前協(xié)程都會(huì)馬上繼續(xù)運(yùn)行。

  • 協(xié)程掛起并不會(huì)阻塞線(xiàn)程,因?yàn)閰f(xié)程掛起時(shí)相當(dāng)于執(zhí)行完協(xié)程的方法,線(xiàn)程繼續(xù)執(zhí)行其他之后的邏輯。

  • 掛起函數(shù)并一定都會(huì)掛起協(xié)程,例如await()掛起函數(shù)如果返回值不等于IntrinsicsKt.getCOROUTINE_SUSPENDED(),則協(xié)程繼續(xù)執(zhí)行掛起點(diǎn)之后邏輯。

下面繼續(xù)分析await()的實(shí)現(xiàn)原理,它的實(shí)現(xiàn)中關(guān)鍵是調(diào)用了JobSupport.awaitSuspend()方法:

private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
    /*
        * Custom code here, so that parent coroutine that is using await
        * on its child deferred (async) coroutine would throw the exception that this child had
        * thrown and not a JobCancellationException.
        */
    val cont = AwaitContinuation(uCont.intercepted(), this)
    cont.initCancellability()
    invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)
    cont.getResult()
}

private class ResumeAwaitOnCompletion<T>(
    job: JobSupport,
    private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
    override fun invoke(cause: Throwable?) {
        val state = job.state
        check(state !is Incomplete)
        if (state is CompletedExceptionally) {
            // Resume with exception in atomic way to preserve exception
            continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
        } else {
            // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
            @Suppress("UNCHECKED_CAST")
            continuation.resume(state as T)
        }
    }
    override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}

上面源碼中ResumeAwaitOnCompletioninvoke方法的邏輯就是調(diào)用continuation.resume(state as T)恢復(fù)協(xié)程。invokeOnCompletion函數(shù)里面是如何實(shí)現(xiàn) async 協(xié)程完成后自動(dòng)恢復(fù)之前協(xié)程的呢,源碼實(shí)現(xiàn)有些復(fù)雜,因?yàn)楹芏噙吔缜闆r處理就不全部展開(kāi),其中最關(guān)鍵的邏輯如下:

// handler 就是 ResumeAwaitOnCompletion 的實(shí)例,將 handler 作為節(jié)點(diǎn)
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 將 node 節(jié)點(diǎn)添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry

接下來(lái)我斷點(diǎn)調(diào)試 launch 協(xié)程恢復(fù)的過(guò)程,從 async 協(xié)程的SuspendLambda的子類(lèi)的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 節(jié)點(diǎn)里面通過(guò)調(diào)用resume(result)恢復(fù)協(xié)程。

所以await()掛起函數(shù)恢復(fù)協(xié)程的原理是,將 launch 協(xié)程封裝為 ResumeAwaitOnCompletion 作為 handler 節(jié)點(diǎn)添加到 aynsc 協(xié)程的 state.list,然后在 async 協(xié)程完成時(shí)會(huì)通知 handler 節(jié)點(diǎn)調(diào)用 launch 協(xié)程的 resume(result) 方法將結(jié)果傳給 launch 協(xié)程,并恢復(fù) launch 協(xié)程繼續(xù)執(zhí)行 await 掛起點(diǎn)之后的邏輯。

而這過(guò)程中有兩個(gè)finalresumeWith 方法,一個(gè)是SuspendLambda的父類(lèi)BaseContinuationImpl的,我們?cè)賮?lái)詳細(xì)分析一篇:

internal abstract class BaseContinuationImpl(
    public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
    public final override fun resumeWith(result: Result<Any?>) {
        ...
        var param = result
        while (true) {
            with(current) {
                val completion = completion!!
                val outcome: Result<Any?> =
                    try {
                        // 調(diào)用 invokeSuspend 方法執(zhí)行,執(zhí)行協(xié)程的真正運(yùn)算邏輯
                        val outcome = invokeSuspend(param)
                        // 協(xié)程掛起時(shí) invokeSuspend 才會(huì)返回 COROUTINE_SUSPENDED,所以協(xié)程掛起時(shí),其實(shí)只是協(xié)程的 resumeWith 運(yùn)行邏輯執(zhí)行完成,再次調(diào)用 resumeWith 時(shí),協(xié)程掛起點(diǎn)之后的邏輯才能繼續(xù)執(zhí)行
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                // 這里可以看出 Continuation 其實(shí)分為兩類(lèi),一種是 BaseContinuationImpl,封裝了協(xié)程的真正運(yùn)算邏輯
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // 斷點(diǎn)時(shí)發(fā)現(xiàn) completion 是 DeferredCoroutine 實(shí)例,這里實(shí)際調(diào)用的是其父類(lèi) AbstractCoroutine 的 resumeWith 方法
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }
}

接下來(lái)再來(lái)看另外一類(lèi) Continuation,AbstractCoroutine 的resumeWith實(shí)現(xiàn):

public abstract class AbstractCoroutine<in T>(
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
    /**
     * Completes execution of this with coroutine with the specified result.
     */
    public final override fun resumeWith(result: Result<T>) {
        // makeCompletingOnce 大致實(shí)現(xiàn)是修改協(xié)程狀態(tài),如果需要的話(huà)還會(huì)將結(jié)果返回給調(diào)用者協(xié)程,并恢復(fù)調(diào)用者協(xié)程
        makeCompletingOnce(result.toState(), defaultResumeMode)
    }
}

所以其中一類(lèi) Continuation BaseContinuationImplresumeWith封裝了協(xié)程的運(yùn)算邏輯,用以協(xié)程的啟動(dòng)和恢復(fù);而另一類(lèi) Continuation AbstractCoroutine,主要是負(fù)責(zé)維護(hù)協(xié)程的狀態(tài)和管理,它的resumeWith則是完成協(xié)程,恢復(fù)調(diào)用者協(xié)程。

2.4 協(xié)程的三層包裝

通過(guò)一步步的分析,慢慢發(fā)現(xiàn)協(xié)程其實(shí)有三層包裝。常用的launchasync返回的Job、Deferred,里面封裝了協(xié)程狀態(tài),提供了取消協(xié)程接口,而它們的實(shí)例都是繼承自AbstractCoroutine,它是協(xié)程的第一層包裝。第二層包裝是編譯器生成的SuspendLambda的子類(lèi),封裝了協(xié)程的真正運(yùn)算邏輯,繼承自BaseContinuationImpl,其中completion屬性就是協(xié)程的第一層包裝。第三層包裝是前面分析協(xié)程的線(xiàn)程調(diào)度時(shí)提到的DispatchedContinuation,封裝了線(xiàn)程調(diào)度邏輯,包含了協(xié)程的第二層包裝。三層包裝都實(shí)現(xiàn)了Continuation接口,通過(guò)代理模式將協(xié)程的各層包裝組合在一起,每層負(fù)責(zé)不同的功能。

下面是協(xié)程運(yùn)行的流程圖:

coroutine_flow.jpg

3. 小結(jié)

經(jīng)過(guò)以上解析之后,再來(lái)看協(xié)程就是一段可以?huà)炱鸷突謴?fù)執(zhí)行的運(yùn)算邏輯,而協(xié)程的掛起是通過(guò)掛起函數(shù)實(shí)現(xiàn)的,掛起函數(shù)用狀態(tài)機(jī)的方式用掛起點(diǎn)將協(xié)程的運(yùn)算邏輯拆分為不同的片段,每次運(yùn)行協(xié)程執(zhí)行的不同的邏輯片段。所以協(xié)程有兩個(gè)很大的好處:一是簡(jiǎn)化異步編程,支持異步返回;而是掛起不阻塞線(xiàn)程,提供線(xiàn)程利用率。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容