Kotlin Coroutines(協(xié)程) 完全解析系列:
Kotlin Coroutines(協(xié)程) 完全解析(一),協(xié)程簡(jiǎn)介
Kotlin Coroutines(協(xié)程) 完全解析(二),深入理解協(xié)程的掛起、恢復(fù)與調(diào)度
Kotlin Coroutines(協(xié)程) 完全解析(三),封裝異步回調(diào)、協(xié)程間關(guān)系及協(xié)程的取消
Kotlin Coroutines(協(xié)程) 完全解析(四),協(xié)程的異常處理
Kotlin Coroutines(協(xié)程) 完全解析(五),協(xié)程的并發(fā)
本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1
前面一篇文章協(xié)程簡(jiǎn)介,簡(jiǎn)單介紹了協(xié)程的一些基本概念以及其簡(jiǎn)化異步編程的優(yōu)勢(shì),但是協(xié)程與線(xiàn)程有什么區(qū)別,協(xié)程的掛起與恢復(fù)是如何實(shí)現(xiàn)的,還有協(xié)程運(yùn)行在哪個(gè)線(xiàn)程上,依然不是很清楚。這篇文章將分析協(xié)程的實(shí)現(xiàn)原理,一步步揭開(kāi)協(xié)程的面紗。先來(lái)看看協(xié)程中最關(guān)鍵的掛起函數(shù)的實(shí)現(xiàn)原理:
1. 掛起函數(shù)的工作原理
協(xié)程的內(nèi)部實(shí)現(xiàn)使用了 Kotlin 編譯器的一些編譯技術(shù),當(dāng)掛起函數(shù)調(diào)用時(shí),背后大致細(xì)節(jié)如下:
掛起函數(shù)或掛起 lambda 表達(dá)式調(diào)用時(shí),都有一個(gè)隱式的參數(shù)額外傳入,這個(gè)參數(shù)是Continuation類(lèi)型,封裝了協(xié)程恢復(fù)后的執(zhí)行的代碼邏輯。
用前文中的一個(gè)掛起函數(shù)為例:
suspend fun requestToken(): Token { ... }
實(shí)際上在 JVM 中更像下面這樣:
Object requestToken(Continuation<Token> cont) { ... }
Continuation的定義如下,類(lèi)似于一個(gè)通用的回調(diào)接口:
/**
* Interface representing a continuation after a suspension point that returns value of type `T`.
*/
public interface Continuation<in T> {
/**
* Context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
現(xiàn)在再看之前postItem函數(shù):
suspend fun requestToken(): Token { ... } // 掛起函數(shù)
suspend fun createPost(token: Token, item: Item): Post { ... } // 掛起函數(shù)
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
GlobalScope.launch {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
}
然而,協(xié)程內(nèi)部實(shí)現(xiàn)不是使用普通回調(diào)的形式,而是使用狀態(tài)機(jī)來(lái)處理不同的掛起點(diǎn),大致的 CPS(Continuation Passing Style) 代碼為:
// 編譯后生成的內(nèi)部類(lèi)大致如下
final class postItem$1 extends SuspendLambda ... {
public final Object invokeSuspend(Object result) {
...
switch (this.label) {
case 0:
this.label = 1;
token = requestToken(this)
break;
case 1:
this.label = 2;
Token token = result;
post = createPost(token, this.item, this)
break;
case 2:
Post post = result;
processPost(post)
break;
}
}
}
上面代碼中每一個(gè)掛起點(diǎn)和初始掛起點(diǎn)對(duì)應(yīng)的 Continuation 都會(huì)轉(zhuǎn)化為一種狀態(tài),協(xié)程恢復(fù)只是跳轉(zhuǎn)到下一種狀態(tài)中。掛起函數(shù)將執(zhí)行過(guò)程分為多個(gè) Continuation 片段,并且利用狀態(tài)機(jī)的方式保證各個(gè)片段是順序執(zhí)行的。

1.1 掛起函數(shù)可能會(huì)掛起協(xié)程
掛起函數(shù)使用 CPS style 的代碼來(lái)掛起協(xié)程,保證掛起點(diǎn)后面的代碼只能在掛起函數(shù)執(zhí)行完后才能執(zhí)行,所以?huà)炱鸷瘮?shù)保證了協(xié)程內(nèi)的順序執(zhí)行順序。
在多個(gè)協(xié)程的情況下,掛起函數(shù)的作用更加明顯:
fun postItem(item: Item) {
GlobalScope.launch {
// async { requestToken() } 新建一個(gè)協(xié)程,可能在另一個(gè)線(xiàn)程運(yùn)行
// 但是 await() 是掛起函數(shù),當(dāng)前協(xié)程執(zhí)行邏輯卡在第一個(gè)分支,第一種狀態(tài),當(dāng) async 的協(xié)程執(zhí)行完后恢復(fù)當(dāng)前協(xié)程,才會(huì)切換到下一個(gè)分支
val token = async { requestToken() }.await()
// 在第二個(gè)分支狀態(tài)中,又新建一個(gè)協(xié)程,使用 await 掛起函數(shù)將之后代碼作為 Continuation 放倒下一個(gè)分支狀態(tài),直到 async 協(xié)程執(zhí)行完
val post = aync { createPost(token, item) }.await()
// 最后一個(gè)分支狀態(tài),直接在當(dāng)前協(xié)程處理
processPost(post)
}
}
上面的例子中,await()掛起函數(shù)掛起當(dāng)前協(xié)程,直到異步協(xié)程完成執(zhí)行,但是這里并沒(méi)有阻塞線(xiàn)程,是使用狀態(tài)機(jī)的控制邏輯來(lái)實(shí)現(xiàn)。而且掛起函數(shù)可以保證掛起點(diǎn)之后的代碼一定在掛起點(diǎn)前代碼執(zhí)行完成后才會(huì)執(zhí)行,掛起函數(shù)保證順序執(zhí)行,所以異步邏輯也可以用順序的代碼順序來(lái)編寫(xiě)。
注意掛起函數(shù)不一定會(huì)掛起協(xié)程,如果相關(guān)調(diào)用的結(jié)果已經(jīng)可用,庫(kù)可以決定繼續(xù)進(jìn)行而不掛起,例如async { requestToken() }的返回值Deferred的結(jié)果已經(jīng)可用時(shí),await()掛起函數(shù)可以直接返回結(jié)果,不用再掛起協(xié)程。
1.2 掛起函數(shù)不會(huì)阻塞線(xiàn)程
掛起函數(shù)掛起協(xié)程,并不會(huì)阻塞協(xié)程所在的線(xiàn)程,例如協(xié)程的delay()掛起函數(shù)會(huì)暫停協(xié)程一定時(shí)間,并不會(huì)阻塞協(xié)程所在線(xiàn)程,但是Thread.sleep()函數(shù)會(huì)阻塞線(xiàn)程。
看下面一個(gè)例子,兩個(gè)協(xié)程運(yùn)行在同一線(xiàn)程上:
fun main(args: Array<String>) {
// 創(chuàng)建一個(gè)單線(xiàn)程的協(xié)程調(diào)度器,下面兩個(gè)協(xié)程都運(yùn)行在這同一線(xiàn)程上
val coroutineDispatcher = newSingleThreadContext("ctx")
// 啟動(dòng)協(xié)程 1
GlobalScope.launch(coroutineDispatcher) {
println("the first coroutine")
delay(200)
println("the first coroutine")
}
// 啟動(dòng)協(xié)程 2
GlobalScope.launch(coroutineDispatcher) {
println("the second coroutine")
delay(100)
println("the second coroutine")
}
// 保證 main 線(xiàn)程存活,確保上面兩個(gè)協(xié)程運(yùn)行完成
Thread.sleep(500)
}
運(yùn)行結(jié)果為:
the first coroutine
the second coroutine
the second coroutine
the first coroutine
從上面結(jié)果可以看出,當(dāng)協(xié)程 1 暫停 200 ms 時(shí),線(xiàn)程并沒(méi)有阻塞,而是執(zhí)行協(xié)程 2 的代碼,然后在 200 ms 時(shí)間到后,繼續(xù)執(zhí)行協(xié)程 1 的邏輯。所以?huà)炱鸷瘮?shù)并不會(huì)阻塞線(xiàn)程,這樣可以節(jié)省線(xiàn)程資源,協(xié)程掛起時(shí),線(xiàn)程可以繼續(xù)執(zhí)行其他邏輯。
1.3 掛起函數(shù)恢復(fù)協(xié)程后運(yùn)行在哪個(gè)線(xiàn)程
協(xié)程的所屬的線(xiàn)程調(diào)度在前一篇文章《協(xié)程簡(jiǎn)介》中有提到過(guò),主要是由協(xié)程的CoroutineDispatcher控制,CoroutineDispatcher可以指定協(xié)程運(yùn)行在某一特定線(xiàn)程上、運(yùn)作在線(xiàn)程池中或者不指定所運(yùn)行的線(xiàn)程。所以協(xié)程調(diào)度器可以分為Confined dispatcher和Unconfined dispatcher,Dispatchers.Default、Dispatchers.IO和Dispatchers.Main屬于Confined dispatcher,都指定了協(xié)程所運(yùn)行的線(xiàn)程或線(xiàn)程池,掛起函數(shù)恢復(fù)后協(xié)程也是運(yùn)行在指定的線(xiàn)程或線(xiàn)程池上的,而Dispatchers.Unconfined屬于Unconfined dispatcher,協(xié)程啟動(dòng)并運(yùn)行在 Caller Thread 上,但是只是在第一個(gè)掛起點(diǎn)之前是這樣的,掛起恢復(fù)后運(yùn)行在哪個(gè)線(xiàn)程完全由所調(diào)用的掛起函數(shù)決定。
fun main(args: Array<String>) = runBlocking<Unit> {
launch { // 默認(rèn)繼承 parent coroutine 的 CoroutineDispatcher,指定運(yùn)行在 main 線(xiàn)程
println("main runBlocking: I'm working in thread ${Thread.currentThread().name}")
delay(100)
println("main runBlocking: After delay in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) {
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
delay(100)
println("Unconfined : After delay in thread ${Thread.currentThread().name}")
}
}
輸出如下:
Unconfined : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor
main runBlocking: After delay in thread main
上面第三行輸出,經(jīng)過(guò)delay掛起函數(shù)后,使用Dispatchers.Unconfined的協(xié)程掛起恢復(fù)后依然在delay函數(shù)使用的DefaultExecutor上。
2. 協(xié)程深入解析
上面更多地是通過(guò) demo 的方式說(shuō)明掛起函數(shù)函數(shù)的一些特性,但是協(xié)程的創(chuàng)建、啟動(dòng)、恢復(fù)、線(xiàn)程調(diào)度、協(xié)程切換是如何實(shí)現(xiàn)的呢,還是不清楚,下面結(jié)合源碼詳細(xì)地解析協(xié)程。
2.1 協(xié)程的創(chuàng)建與啟動(dòng)
先從新建一個(gè)協(xié)程開(kāi)始分析協(xié)程的創(chuàng)建,最常見(jiàn)的協(xié)程創(chuàng)建方式為CoroutineScope.launch {},關(guān)鍵源碼如下:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
coroutine.start(start, coroutine, block)
return coroutine
}
coroutine.start(start, coroutine, block)默認(rèn)情況下會(huì)走到startCoroutineCancellable,最終會(huì)調(diào)用到createCoroutineUnintercepted。
/**
* Creates unintercepted coroutine without receiver and with result type [T].
* This function creates a new, fresh instance of suspendable computation every time it is invoked.
*
* To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance.
* The [completion] continuation is invoked when coroutine completes with result or exception.
...
*/
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> { ... }
重點(diǎn)注意該方法的注釋?zhuān)瑒?chuàng)建一個(gè)協(xié)程,創(chuàng)建了一個(gè)新的可掛起計(jì)算,通過(guò)調(diào)用resume(Unit)啟動(dòng)該協(xié)程。而且返回值為Continuation,Continuation提供了resumeWith恢復(fù)協(xié)程的接口,用以實(shí)現(xiàn)協(xié)程恢復(fù),Continuation封裝了協(xié)程的代碼運(yùn)行邏輯和恢復(fù)接口。
再看之前協(xié)程代碼編譯生成的內(nèi)部類(lèi)final class postItem$1 extends SuspendLambda ...,協(xié)程的計(jì)算邏輯封裝在invokeSuspend方法中,而SuspendLambda的繼承關(guān)系為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation,其中BaseContinuationImpl 部分關(guān)鍵源碼如下:
internal abstract class BaseContinuationImpl(...) {
// 實(shí)現(xiàn) Continuation 的 resumeWith,并且是 final 的,不可被重寫(xiě)
public final override fun resumeWith(result: Result<Any?>) {
...
val outcome = invokeSuspend(param)
...
}
// 由編譯生成的協(xié)程相關(guān)類(lèi)來(lái)實(shí)現(xiàn),例如 postItem$1
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
而這部分與之前的分析也是吻合的,啟動(dòng)協(xié)程流程是resume(Unit)->resumeWith()->invokeSuspend(),協(xié)程的掛起通過(guò)suspend掛起函數(shù)實(shí)現(xiàn),協(xié)程的恢復(fù)通過(guò)Continuation.resumeWith實(shí)現(xiàn)。
2.2 協(xié)程的線(xiàn)程調(diào)度
協(xié)程的線(xiàn)程調(diào)度是通過(guò)攔截器實(shí)現(xiàn)的,前面提到了協(xié)程啟動(dòng)調(diào)用到了startCoroutineCancellable,該方法實(shí)現(xiàn)為:
internal fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) =
createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit)
// createCoroutineUnintercepted(completion) 會(huì)創(chuàng)建一個(gè)新的協(xié)程,返回值類(lèi)型為 Continuation
// intercepted() 是給 Continuation 加上 ContinuationInterceptor 攔截器,也是線(xiàn)程調(diào)度的關(guān)鍵
// resumeCancellable(Unit) 最終將調(diào)用 resume(Unit) 啟動(dòng)協(xié)程
再看intercepted()的具體實(shí)現(xiàn):
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
// ContinuationImpl 是 SuspendLambda 的父類(lèi)
internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) {
@Transient
private var intercepted: Continuation<Any?>? = null
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// intercepted() 方法關(guān)鍵是 context[ContinuationInterceptor]?.interceptContinuation(this)
// context[ContinuationInterceptor] 就是協(xié)程的 CoroutineDispatcher
}
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
*/
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
所以intercepted()最終會(huì)使用協(xié)程的CoroutineDispatcher的interceptContinuation方法包裝原來(lái)的 Continuation,攔截所有的協(xié)程運(yùn)行操作。
DispatchedContinuation攔截了協(xié)程的啟動(dòng)和恢復(fù),分別是resumeCancellable(Unit)和重寫(xiě)的resumeWith(Result):
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : Continuation<T> by continuation, DispatchedTask<T> {
inline fun resumeCancellable(value: T) {
// 判斷是否需要線(xiàn)程調(diào)度
if (dispatcher.isDispatchNeeded(context)) {
...
// 將協(xié)程的運(yùn)算分發(fā)到另一個(gè)線(xiàn)程
dispatcher.dispatch(context, this)
} else {
...
// 如果不需要調(diào)度,直接在當(dāng)前線(xiàn)程執(zhí)行協(xié)程運(yùn)算
resumeUndispatched(value)
}
}
override fun resumeWith(result: Result<T>) {
// 判斷是否需要線(xiàn)程調(diào)度
if (dispatcher.isDispatchNeeded(context)) {
...
// 將協(xié)程的運(yùn)算分發(fā)到另一個(gè)線(xiàn)程
dispatcher.dispatch(context, this)
} else {
...
// 如果不需要調(diào)度,直接在當(dāng)前線(xiàn)程執(zhí)行協(xié)程運(yùn)算
continuation.resumeWith(result)
}
}
}
internal interface DispatchedTask<in T> : Runnable {
public override fun run() {
...
// 封裝了 continuation.resume 邏輯
}
}
繼續(xù)跟蹤newSingleThreadContext()、Dispatchers.IO等dispatch方法的實(shí)現(xiàn),發(fā)現(xiàn)其實(shí)都調(diào)用了Executor.execute(Runnable)方法,而Dispatchers.Unconfined的實(shí)現(xiàn)更簡(jiǎn)單,關(guān)鍵在于isDispatchNeeded()返回為false。
2.3 協(xié)程的掛起和恢復(fù)
Kotlin 編譯器會(huì)生成繼承自SuspendLambda的子類(lèi),協(xié)程的真正運(yùn)算邏輯都在invokeSuspend中。但是協(xié)程掛起的具體實(shí)現(xiàn)是如何呢?先看下面示例代碼:
fun main(args: Array<String>) = runBlocking<Unit> { // 新建并啟動(dòng) blocking 協(xié)程,運(yùn)行在 main 線(xiàn)程上,等待所有子協(xié)程運(yùn)行完成后才會(huì)結(jié)束
launch(Dispatchers.Unconfined) { // 新建并啟動(dòng) launch 協(xié)程,沒(méi)有指定所運(yùn)行線(xiàn)程,一開(kāi)始運(yùn)行在調(diào)用者所在的 main 線(xiàn)程上
println("${Thread.currentThread().name} : launch start")
async(Dispatchers.Default) { // 新建并啟動(dòng) async 協(xié)程,運(yùn)行在 Dispatchers.Default 的線(xiàn)程池中
println("${Thread.currentThread().name} : async start")
delay(100) // 掛起 async 協(xié)程 100 ms
println("${Thread.currentThread().name} : async end")
}.await() // 掛起 launch 協(xié)程,直到 async 協(xié)程結(jié)束
println("${Thread.currentThread().name} : launch end")
}
}
其中 launch 協(xié)程編譯生成的 SuspendLambda 子類(lèi)的invokeSuspend方法如下:
public final Object invokeSuspend(@NotNull Object result) {
Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch (this.label) {
case 0:
...
System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString());
// 新建并啟動(dòng) async 協(xié)程
Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null);
this.label = 1;
// 調(diào)用 await() 掛起函數(shù)
if (async$default.await(this) == coroutine_suspended) {
return coroutine_suspended;
}
break;
case 1:
if (result instanceof Failure) {
throw ((Failure) result).exception;
}
// 恢復(fù)協(xié)程后再執(zhí)行一次 resumeWith(),然后無(wú)異常的話(huà)執(zhí)行最后的 println()
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
...
System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString());
return Unit.INSTANCE;
}
上面代碼中 launch 協(xié)程掛起的關(guān)鍵在于async$default.await(this) == coroutine_suspended,如果此時(shí) async 線(xiàn)程未執(zhí)行完成,await()返回為IntrinsicsKt.getCOROUTINE_SUSPENDED(),就會(huì) return,launch 協(xié)程的invokeSuspend方法執(zhí)行完成,協(xié)程所在線(xiàn)程繼續(xù)往下運(yùn)行,此時(shí) launch 線(xiàn)程處于掛起狀態(tài)。所以協(xié)程掛起就是協(xié)程掛起點(diǎn)之前邏輯執(zhí)行完成,協(xié)程的運(yùn)算關(guān)鍵方法resumeWith()執(zhí)行完成,線(xiàn)程繼續(xù)執(zhí)行往下執(zhí)行其他邏輯。
協(xié)程掛起有三點(diǎn)需要注意的:
啟動(dòng)其他協(xié)程并不會(huì)掛起當(dāng)前協(xié)程,所以
launch和async啟動(dòng)線(xiàn)程時(shí),除非新協(xié)程運(yùn)行在當(dāng)前線(xiàn)程,則當(dāng)前協(xié)程只能在新協(xié)程運(yùn)行完成后繼續(xù)執(zhí)行,否則當(dāng)前協(xié)程都會(huì)馬上繼續(xù)運(yùn)行。協(xié)程掛起并不會(huì)阻塞線(xiàn)程,因?yàn)閰f(xié)程掛起時(shí)相當(dāng)于執(zhí)行完協(xié)程的方法,線(xiàn)程繼續(xù)執(zhí)行其他之后的邏輯。
掛起函數(shù)并一定都會(huì)掛起協(xié)程,例如
await()掛起函數(shù)如果返回值不等于IntrinsicsKt.getCOROUTINE_SUSPENDED(),則協(xié)程繼續(xù)執(zhí)行掛起點(diǎn)之后邏輯。
下面繼續(xù)分析await()的實(shí)現(xiàn)原理,它的實(shí)現(xiàn)中關(guān)鍵是調(diào)用了JobSupport.awaitSuspend()方法:
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont ->
/*
* Custom code here, so that parent coroutine that is using await
* on its child deferred (async) coroutine would throw the exception that this child had
* thrown and not a JobCancellationException.
*/
val cont = AwaitContinuation(uCont.intercepted(), this)
cont.initCancellability()
invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler)
cont.getResult()
}
private class ResumeAwaitOnCompletion<T>(
job: JobSupport,
private val continuation: AbstractContinuation<T>
) : JobNode<JobSupport>(job) {
override fun invoke(cause: Throwable?) {
val state = job.state
check(state !is Incomplete)
if (state is CompletedExceptionally) {
// Resume with exception in atomic way to preserve exception
continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT)
} else {
// Resuming with value in a cancellable way (AwaitContinuation is configured for this mode).
@Suppress("UNCHECKED_CAST")
continuation.resume(state as T)
}
}
override fun toString() = "ResumeAwaitOnCompletion[$continuation]"
}
上面源碼中ResumeAwaitOnCompletion的invoke方法的邏輯就是調(diào)用continuation.resume(state as T)恢復(fù)協(xié)程。invokeOnCompletion函數(shù)里面是如何實(shí)現(xiàn) async 協(xié)程完成后自動(dòng)恢復(fù)之前協(xié)程的呢,源碼實(shí)現(xiàn)有些復(fù)雜,因?yàn)楹芏噙吔缜闆r處理就不全部展開(kāi),其中最關(guān)鍵的邏輯如下:
// handler 就是 ResumeAwaitOnCompletion 的實(shí)例,將 handler 作為節(jié)點(diǎn)
val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it }
// 將 node 節(jié)點(diǎn)添加到 state.list 中
if (!addLastAtomic(state, list, node)) return@loopOnState // retry
接下來(lái)我斷點(diǎn)調(diào)試 launch 協(xié)程恢復(fù)的過(guò)程,從 async 協(xié)程的SuspendLambda的子類(lèi)的completion.resumeWith(outcome) -> AbstractCoroutine.resumeWith(result) ..-> JobSupport.tryFinalizeSimpleState() -> JobSupport.completeStateFinalization() -> state.list?.notifyCompletion(cause) -> node.invoke,最后 handler 節(jié)點(diǎn)里面通過(guò)調(diào)用resume(result)恢復(fù)協(xié)程。
所以await()掛起函數(shù)恢復(fù)協(xié)程的原理是,將 launch 協(xié)程封裝為 ResumeAwaitOnCompletion 作為 handler 節(jié)點(diǎn)添加到 aynsc 協(xié)程的 state.list,然后在 async 協(xié)程完成時(shí)會(huì)通知 handler 節(jié)點(diǎn)調(diào)用 launch 協(xié)程的 resume(result) 方法將結(jié)果傳給 launch 協(xié)程,并恢復(fù) launch 協(xié)程繼續(xù)執(zhí)行 await 掛起點(diǎn)之后的邏輯。
而這過(guò)程中有兩個(gè)final的resumeWith 方法,一個(gè)是SuspendLambda的父類(lèi)BaseContinuationImpl的,我們?cè)賮?lái)詳細(xì)分析一篇:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
...
var param = result
while (true) {
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 調(diào)用 invokeSuspend 方法執(zhí)行,執(zhí)行協(xié)程的真正運(yùn)算邏輯
val outcome = invokeSuspend(param)
// 協(xié)程掛起時(shí) invokeSuspend 才會(huì)返回 COROUTINE_SUSPENDED,所以協(xié)程掛起時(shí),其實(shí)只是協(xié)程的 resumeWith 運(yùn)行邏輯執(zhí)行完成,再次調(diào)用 resumeWith 時(shí),協(xié)程掛起點(diǎn)之后的邏輯才能繼續(xù)執(zhí)行
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
// 這里可以看出 Continuation 其實(shí)分為兩類(lèi),一種是 BaseContinuationImpl,封裝了協(xié)程的真正運(yùn)算邏輯
if (completion is BaseContinuationImpl) {
// unrolling recursion via loop
current = completion
param = outcome
} else {
// 斷點(diǎn)時(shí)發(fā)現(xiàn) completion 是 DeferredCoroutine 實(shí)例,這里實(shí)際調(diào)用的是其父類(lèi) AbstractCoroutine 的 resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}
接下來(lái)再來(lái)看另外一類(lèi) Continuation,AbstractCoroutine 的resumeWith實(shí)現(xiàn):
public abstract class AbstractCoroutine<in T>(
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
/**
* Completes execution of this with coroutine with the specified result.
*/
public final override fun resumeWith(result: Result<T>) {
// makeCompletingOnce 大致實(shí)現(xiàn)是修改協(xié)程狀態(tài),如果需要的話(huà)還會(huì)將結(jié)果返回給調(diào)用者協(xié)程,并恢復(fù)調(diào)用者協(xié)程
makeCompletingOnce(result.toState(), defaultResumeMode)
}
}
所以其中一類(lèi) Continuation BaseContinuationImpl的resumeWith封裝了協(xié)程的運(yùn)算邏輯,用以協(xié)程的啟動(dòng)和恢復(fù);而另一類(lèi) Continuation AbstractCoroutine,主要是負(fù)責(zé)維護(hù)協(xié)程的狀態(tài)和管理,它的resumeWith則是完成協(xié)程,恢復(fù)調(diào)用者協(xié)程。
2.4 協(xié)程的三層包裝
通過(guò)一步步的分析,慢慢發(fā)現(xiàn)協(xié)程其實(shí)有三層包裝。常用的launch和async返回的Job、Deferred,里面封裝了協(xié)程狀態(tài),提供了取消協(xié)程接口,而它們的實(shí)例都是繼承自AbstractCoroutine,它是協(xié)程的第一層包裝。第二層包裝是編譯器生成的SuspendLambda的子類(lèi),封裝了協(xié)程的真正運(yùn)算邏輯,繼承自BaseContinuationImpl,其中completion屬性就是協(xié)程的第一層包裝。第三層包裝是前面分析協(xié)程的線(xiàn)程調(diào)度時(shí)提到的DispatchedContinuation,封裝了線(xiàn)程調(diào)度邏輯,包含了協(xié)程的第二層包裝。三層包裝都實(shí)現(xiàn)了Continuation接口,通過(guò)代理模式將協(xié)程的各層包裝組合在一起,每層負(fù)責(zé)不同的功能。
下面是協(xié)程運(yùn)行的流程圖:

3. 小結(jié)
經(jīng)過(guò)以上解析之后,再來(lái)看協(xié)程就是一段可以?huà)炱鸷突謴?fù)執(zhí)行的運(yùn)算邏輯,而協(xié)程的掛起是通過(guò)掛起函數(shù)實(shí)現(xiàn)的,掛起函數(shù)用狀態(tài)機(jī)的方式用掛起點(diǎn)將協(xié)程的運(yùn)算邏輯拆分為不同的片段,每次運(yùn)行協(xié)程執(zhí)行的不同的邏輯片段。所以協(xié)程有兩個(gè)很大的好處:一是簡(jiǎn)化異步編程,支持異步返回;而是掛起不阻塞線(xiàn)程,提供線(xiàn)程利用率。