這一篇我們好好聊一聊協(xié)程的原理,通過上一篇的學(xué)習(xí),相信大家對于如何使用協(xié)程已經(jīng)非常熟悉了。
故事還得從上次的協(xié)程分享開始,由于大家對協(xié)程的實踐并不多,所以大家對下面的這段代碼如何執(zhí)行爭論不休:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a + b
Log.e(TAG,"result:$c")
}
復(fù)制代碼
有人說,a 和 b 會串行執(zhí)行,有人說,a 和 b 會并行執(zhí)行,那么執(zhí)行的結(jié)果到底是什么樣的?我們將在下面的文章給出。
一、結(jié)構(gòu)簡要介紹
首先,我們得明確協(xié)程中有哪些東西,如果你會使用協(xié)程,那你肯定知道協(xié)程中有 CoroutineScope、CoroutineContext 和 CoroutineDispatcher,這些都是使用過程中我們可以接觸到的 API。
我簡單的整理了協(xié)程中主要的基礎(chǔ)類:
協(xié)程的類結(jié)構(gòu)可分為三部分:CoroutineScope、CoroutineContext 和 Continuation。
1. Continuation
如果你會使用協(xié)程,那你肯定知道,協(xié)程遇到耗時 suspend 操作可以掛起,等到任務(wù)結(jié)束的時候,協(xié)程會自動切回來。
它的奧秘就是 Continuation,Continuation 可以理解程續(xù)體,你可以理解其每次在協(xié)程掛起點將剩余的代碼包括起來,等到結(jié)束以后執(zhí)行剩余的內(nèi)容。一個協(xié)程的代碼塊可能會被切割成若干個 Continuation,在每個需要掛起的地方都會分配一個 Continuation。
先拋出一些結(jié)論,協(xié)程在做耗時操作的時候,如果執(zhí)行了耗時 suspend 操作,會自動掛起,但是這個耗時操作終究是要做的,只不過切換到其他線程去做了,做完以后協(xié)程就需要切回來,但是切到哪兒呢?這便是 Continuation 需要解決的問題。
Continuation 的流程是這樣的:
<figcaption></figcaption>
無論是使用 launch 還是 async 啟動的協(xié)程,都會有一個結(jié)束的時候用來回調(diào)的 continuation。
2. CoroutineScope
關(guān)于 CoroutineScope 沒有特別多要說的,它持有了 CoroutineContext,主要對協(xié)程的生命周期進(jìn)行管理。
3. CoroutineContext
一開始看 CoroutineContext 覺得特別暈,不明白為啥要這么設(shè)計,看了 Bennyhuo 大佬的文章以后才稍微好轉(zhuǎn)。
從上面協(xié)程的類的機(jī)構(gòu)中可以看出,光看這個 CoroutineContext 這個接口(源碼內(nèi)容我們下面講),會發(fā)現(xiàn)它有點像 List 集合,而繼承自 CoroutineContext 接口的 Element 接口則定義了其中的元素。
隨后,這個 Element 接口被劃分成了兩種類,Job 和 ContinuationInterceptor:
-
Job:從字面上來講,它代表一個任務(wù),Thread也是執(zhí)行任務(wù),所以我們可以理解它定義了協(xié)程的一些東西,比如協(xié)程的狀態(tài),協(xié)程和子協(xié)程的管理方式等等。 -
ContinuationInterceptor:也從字面上來看,它是Continuation的攔截器,通過攔截Continuation,完成我們想要完成的工作,比如說線程的切換。
二、結(jié)構(gòu)源碼分析
上面我們從概念上介紹了協(xié)程的三大件,在這部分,我們從源碼分析。
1. Continuation
suspend 修飾的方法會在在編譯期間被編譯器做特殊處理,這種處理被成為CPS(續(xù)體轉(zhuǎn)換風(fēng)格) 轉(zhuǎn)化,suspend 方法會被包裹成 Continuation。
說了這么久的 Continuation,我們還沒有見過接口代碼,由于接口內(nèi)容不多,我就把所有的內(nèi)容貼出來了:
/**
* Interface representing a continuation after a suspension point that returns a value of type `T`.
*/
@SinceKotlin("1.3")
public interface Continuation<in T> {
/**
* The context of the coroutine that corresponds to this continuation.
*/
public val context: CoroutineContext
/**
* Resumes the execution of the corresponding coroutine passing a successful or failed [result] as the
* return value of the last suspension point.
*/
public fun resumeWith(result: Result<T>)
}
復(fù)制代碼
我們重點關(guān)注Continuation#resumeWith()方法,從注釋來看,通過返回 suspend 掛起點的值來恢復(fù)協(xié)程的執(zhí)行,協(xié)程可以從參數(shù) Result<T>) 獲取成功的值或者失敗的結(jié)果,如果沒有結(jié)果,那么 Result<T> 的泛型是 Unit。Resulut 這個類也特別簡單,感興趣的同學(xué)可以查看源碼。
BaseContinuationImpl 實現(xiàn)了 Continuation 接口,我們看一下 Continuation#resumeWith 方法的實現(xiàn):
internal abstract class BaseContinuationImpl(
// 完成后調(diào)用的 Continuation
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1\. 執(zhí)行 suspend 中的代碼塊
val outcome = invokeSuspend(param)
// 2\. 如果代碼掛起就提前返回
if (outcome === COROUTINE_SUSPENDED) return
// 3\. 返回結(jié)果
Result.success(outcome)
} catch (exception: Throwable) {
// 3\. 返回失敗結(jié)果
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 4\. 如果 completion 中還有子 completion,遞歸
current = completion
param = outcome
} else {
// 5\. 結(jié)果通知
completion.resumeWith(outcome)
return
}
}
}
}
}
復(fù)制代碼
主要的過程我在注釋中已經(jīng)標(biāo)注出來了,我來解釋一下 Continuation 的機(jī)制。
每個 suspend 方法生成的 BaseContinuationImpl ,其構(gòu)造方法有一個參數(shù)叫 completion ,它也是一個 Continuation,它的調(diào)用時機(jī)是在 suspen 方法執(zhí)行完畢的時候。我們后面稱
這個流程展示給我們的內(nèi)容很直觀了,簡單起見,我們直接看3、4和5這一個 launch 啟動流程就好,通常一個 launch 生成一個外層 Continuation一個相應(yīng)的結(jié)果 Continuation,我們后面稱結(jié)果 continuation 為 complete,Continuation 調(diào)用順序是:
- 調(diào)用外層
Continuation中的Continuation#resumeWith()方法。 - 該方法會去執(zhí)行
launch包裹的代碼塊,并返回一個結(jié)果。 - 將上述代碼塊執(zhí)行的結(jié)果交給
completion,由它完成協(xié)程結(jié)束的通知。
上述的過程只存在于一個 launch 并且里面沒有執(zhí)行其他耗時的掛起操作,對于這些情況,我們將會在下面的文章討論。
拋出問題一: 可以看到,在注釋2,遇到耗時的 suspend,返回的結(jié)果是一個 COROUTINE_SUSPENDED,后面會直接返回,耗時操作結(jié)束的時候,我們的 completion 怎么恢復(fù)呢?
2. CoroutineContext 和 Element
在概要分析的時候,我們說 CoroutineContext 的結(jié)構(gòu)像一個集合,是從它的接口得出結(jié)論的:
public interface CoroutineContext {
// get 方法,通過 key 獲取
public operator fun <E : Element> get(key: Key<E>): E?
// 累加操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
// 操作符 + , 實際的實現(xiàn)調(diào)用了 fold 方法
public operator fun plus(context: CoroutineContext): CoroutineContext
// 移除操作
public fun minusKey(key: Key<*>): CoroutineContext
// CoroutineContext 定義的 Key
public interface Key<E : Element>
// CoroutineContext 中元素的定義
public interface Element : CoroutineContext {
// key
public val key: Key<*>
//...
}
}
復(fù)制代碼
從中我們可以大致看出,CoroutineContext 中可以通過 Key 來獲取元素 Element,并且 Element 接口也是繼承自 CoroutineContext 接口。
除此以外,CoroutineContext 支持增加和移除操作,并且支持 + 操作符來完成增加。+ 操作符即 plus 方法是有具體實現(xiàn)的,感興趣的可以自己看一下,主要涉及到了攔截器 ContinuationInterceptor 的添加。
1.1 Job
Job 的注釋中闡述定義是這樣的:
A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.
從中我們可以得出:
- 后臺任務(wù)
- 可取消
- 生命周期在完成它的時候結(jié)束
從后臺任務(wù)的角度來看,Job 聽著有點像 Thread,和 Thread 一樣,Job 也有各種狀態(tài),文檔中對 Job 各種狀態(tài)的注釋(感覺大佬們的注釋寫的真棒~):
Job 另一個值得關(guān)注的點是對子 Job 的管理,主要的規(guī)則如下:
- 子
Job都會結(jié)束的時候,父Job才會結(jié)束 - 父
Job取消的時候,子Job也會取消
上述的一些內(nèi)容都可以從 Job 的接口文檔中得出。那么,Job哪里來的?如果你看一下CoroutineScope#launch方法,你就會得出結(jié)論,該方法的返回類型就是 Job,我們每次調(diào)用該方法,都會創(chuàng)建一個 Job。
1.2 ContinuationInterceptor
顧名思義,Continuation 攔截器,先看接口:
interface ContinuationInterceptor : CoroutineContext.Element {
// ContinuationInterceptor 在 CoroutineContext 中的 Key
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* 攔截 continuation
*/
fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
//...
}
復(fù)制代碼
這個接口可以提煉的就這兩個信息:
- 攔截器的
Key,也就是說,無論你后面一個CoroutineContext放了多少個攔截器,Key為ContinuationInterceptor的攔截器只能有一個。 - 我們都知道,
Continuation在調(diào)用其Continuation#resumeWith()方法,會執(zhí)行其suspend修飾的函數(shù)的代碼塊,如果我們提前攔截到,是不是可以做點其他事情,比如說切換線程,這也是ContinuationInterceptor的作用之一。
需要說明一下,我們通過 Dispatchers 來指定協(xié)程發(fā)生的線程,Dispatchers 實現(xiàn)了 ContinuationInterceptor接口。
3. CoroutineScope
CoroutineScope 的接口很簡單:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
復(fù)制代碼
它要求后續(xù)的實現(xiàn)都要提供 CoroutineContext,不過我們都知道,CoroutineContext 是協(xié)程中很重要的東西,既包括 Job,也包括調(diào)度器。
在上面的代碼中,我多次使用了 Android Jetpack 中的 Lifecycle 中協(xié)程的擴(kuò)展庫,好處我們獲取 CoroutineScope 更加簡單,無需在組件 onDestroy 的時候手動 cancel,并且它的源碼超級簡單,前提是你會使用 Lifecycle:
internal class LifecycleCoroutineScopeImpl(
override val lifecycle: Lifecycle,
override val coroutineContext: CoroutineContext
) : LifecycleCoroutineScope(), LifecycleEventObserver {
// ...
override fun onStateChanged(source: LifecycleOwner, event: Lifecycle.Event) {
if (lifecycle.currentState <= Lifecycle.State.DESTROYED) {
lifecycle.removeObserver(this)
coroutineContext.cancel()
}
}
}
復(fù)制代碼
并且它也支持你在指定的生命周期調(diào)用協(xié)程,大家看一下接口就明白了。
三、過程源碼分析
先上一段使用代碼:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
復(fù)制代碼
雖然代碼很簡單,但是源碼還是比較復(fù)雜的,我們分步講。
第一步 獲取 CoroutineScope
我已經(jīng)在上面說明了,我們使用的 Lifecycle 的協(xié)程拓展庫,如果我們不使用拓展庫,就得使用 MainScope,它們的 CoroutineContext 都是一樣的:
public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)
// LifecycleCoroutineScope
val Lifecycle.coroutineScope: LifecycleCoroutineScope
get() {
while (true) {
// ...
val newScope = LifecycleCoroutineScopeImpl(
this,
SupervisorJob() + Dispatchers.Main.immediate
)
// ...
return newScope
}
}
復(fù)制代碼
顯而易見,MainScope 和 LifecycleCoroutineScope 都使用了 SupervisorJob() + Dispatchers.Main, 作為它們的 CoroutineContext。
說明一下,SupervisorJob 和Dispatchers.Main 很重要,它們分別代表了CoroutineContext 之前提及的 Job 和 ContinuationInterceptor,后面用到的時候再分析。
第二步 啟動協(xié)程
直接進(jìn)入 CoroutineScope#launch() 方法:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
復(fù)制代碼
上面的方法一共有三個參數(shù),前兩個不作過多介紹,第三個參數(shù):
block: suspend CoroutineScope.() -> Unit)
復(fù)制代碼
這是一個方法,是一個 lambda 參數(shù),同時也表明了它需要被 suspend 修飾。 繼續(xù)看 launch 方法,發(fā)現(xiàn)它主要做了兩件事:
- 組合新的
CoroutineContext - 再創(chuàng)建一個
Continuation
組合新的CoroutineContext
在第一行代碼 val newContext = newCoroutineContext(context) 做了第一件事,這里的 newCoroutineContext(context) 是一個擴(kuò)展方法:
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
復(fù)制代碼
CoroutineScope 使用本身的 coroutineContext 集合,利用 + 操作符將我們在 launch 方法中提供的 coroutineContext 添加進(jìn)來。
再創(chuàng)建一個Continuation
回到上一段代碼,通常我們不會指定 start 參數(shù),所以它會使用默認(rèn)的 CoroutineStart.DEFAULT,最終 coroutine 會得到一個 StandaloneCoroutine。
StandaloneCoroutine 實現(xiàn)自 AbstractCoroutine,翻開上面的類圖,你會發(fā)現(xiàn),它實現(xiàn)了 Continuation、Job 和 CoroutineScope 等一堆接口。需要說明一下,這個 StandaloneCoroutine 其實是我們當(dāng)前 Suspend Contination 的 complete。
接著會調(diào)用
coroutine.start(start, coroutine, block)
復(fù)制代碼
這就表明協(xié)程開始啟動了。
第三步 start
進(jìn)入到 AbstractCoroutine#start 方法:
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
復(fù)制代碼
跳過層層嵌套,最后到達(dá)了:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
雖然這僅僅是一個函數(shù),但是后面主要的邏輯都揭露了:
- 創(chuàng)建一個沒有攔截過的
Continuation。 - 攔截
Continuation。 - 執(zhí)行
Continuation#resumeWith方法。
第四步 又創(chuàng)建 Continuation
我這里用了 又,因為我們在 launch 中已經(jīng)創(chuàng)建了一個 AbstractContinuaion,不過它是一個 complete,從各個函數(shù)的行參就可以看出來。
不過我們 suspend 修飾的外層 Continuation 還沒有創(chuàng)建,它來了,是 SuspendLambda,它繼承自 ContinuationImpl,如果你問我為什么源碼中沒找到具體實現(xiàn),我覺得可能跟 suspend 修飾符有關(guān),由編譯器處理,但是調(diào)用棧確實是這樣的:
看一下 SuspendLambda 類的實現(xiàn):
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction {
constructor(arity: Int) : this(arity, null)
//...
}
復(fù)制代碼
可以看到,它的構(gòu)造方法的形參就包括一個 complete。
第五步 攔截處理
回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
里面的攔截方法 Continuation#intercepted() 方法是一個擴(kuò)展方法:
@SinceKotlin("1.3")
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
復(fù)制代碼
createCoroutineUnintercepted(receiver, completion) 返回的是一個 SuspendLambda,所以它肯定是一個 ContinuationImpl,看一下它的攔截方法的實現(xiàn):
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
// ...
}
復(fù)制代碼
在 ContinuationImpl#intercepted()方法中,直接利用 context 這個數(shù)據(jù)結(jié)構(gòu)通過 context[ContinuationInterceptor] 獲取攔截器。
CoroutineDispatcher攔截實現(xiàn)
我們都知道 ContinuationInterceptor 具有攔截作用,它的直接實現(xiàn)是 CoroutineDispatcher 這個抽象類,所有其他調(diào)度器都直接或者間接繼承這個類,我們關(guān)注一下它的攔截方法:
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
//...
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 1.攔截的 Continuation 被包了一層 DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
//...
}
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// ...
override fun resumeWith(result: Result<T>) {
// ...
if (dispatcher.isDispatchNeeded(context)) {
// 2\. 后面一個參數(shù)需要提供 Runnable,父類已經(jīng)實現(xiàn)
dispatcher.dispatch(context, this)
}
//...
}
// ...
}
// SchedulerTask 是一個 Runnable
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
// ...
public final override fun run() {
// ...
try {
//...
withCoroutineContext(context, delegate.countOrElement) {
// 3\. continuation 是 DispatchedContinuation 包裹的 continuation
continuation.resume(...)
}
}
//...
}
}
復(fù)制代碼
簡單來說,就是對原有的 Continuation 的 resumeWith 操作加了一層攔截,就像這樣:
<figcaption></figcaption>
加入 CoroutineDispatcher 以后,執(zhí)行真正的 Continue#resumeWith() 之前,會執(zhí)行 CoroutineDispatcher#dispatch() 方法,所以我們現(xiàn)在關(guān)注 CoroutineDispatcher#dispatch 具體實現(xiàn)即可。
講一個CoroutineDispatcher具體實現(xiàn)
首先我們得明確這個 CoroutineDispatcher 來自哪里?它從 context 獲取,context來自哪里?
注意 SuspendLambda 和 ContinuationImpl 的構(gòu)造方法,SuspendLambda 中的參數(shù)沒有 CoroutineContext,所以只能來自 completion 中的 CoroutineContext,而completion 的 CoroutineContext 來自 launch 方法中來自 CoroutineScope,默認(rèn)是 SupervisorJob() + Dispatchers.Main,不過只有 Dispatchers.Main 繼承了 CoroutineDispatcher。
Dispatchers.Main 是一個 MainCoroutineDispatcher,Android 中對應(yīng)的 MainCoroutineDispatcher 是 HandlerContext:
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
//...
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 利用主線程的 Handler 執(zhí)行任務(wù)
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
// 利用主線程的 Handler 延遲執(zhí)行任務(wù),將完成的 continuation 放在任務(wù)中執(zhí)行
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//..
}
復(fù)制代碼
重點來了,調(diào)度任務(wù)最后竟然交給了主線程的 Handler,其實想想也對,主線程的任務(wù)最后一般都會交給主線程的 Handler。
好奇的同學(xué)可能問了,如果不是主線程呢?不是主線程就利用的線程池:
public open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
// 執(zhí)行期
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
}
復(fù)制代碼
結(jié)果可以說是很清晰了,coroutineScheduler 是一個線程池,如果像了解具體的過程,同學(xué)們可以自行查看代碼。
讀到這里,你可能有一點明白 CoroutineContext 為什么要設(shè)計成一種數(shù)據(jù)結(jié)構(gòu):
-
coroutineContext[ContinuationInterceptor]就可以直接取到當(dāng)前協(xié)程的攔截器,并且一個協(xié)程只能對應(yīng)一個調(diào)度器。 - 調(diào)度器都放在其他
coroutineContext的前面,所以在執(zhí)行協(xié)程的時候,可以做攔截處理。
同理,我們也可以使用 coroutineContext[Job] 獲取當(dāng)前協(xié)程。
第六步 resumeWith
再次回到:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
現(xiàn)在我們看 Continue#resumeCancellableWith() 方法,它是一個擴(kuò)展方法,里面的調(diào)度邏輯是:
DispatchContinuation#resumeCancellableWithCoroutineDispatcher#dispatchContinuation#resumeWith
這里的 Continuation 就是 SuspendLambda,它繼承了 BaseContinuationImpl,我們看一下它的實現(xiàn)方法:
internal abstract class BaseContinuationImpl(
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
// This implementation is final. This fact is used to unroll resumeWith recursion.
public final override fun resumeWith(result: Result<Any?>) {
// This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
var current = this
var param = result
while (true) {
// Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
// can precisely track what part of suspended callstack was already resumed
probeCoroutineResumed(current)
with(current) {
val completion = completion!! // fail fast when trying to resume continuation without completion
val outcome: Result<Any?> =
try {
// 1\. 執(zhí)行 suspend 里面的代碼塊
val outcome = invokeSuspend(param)
// 2\. 如果代碼塊里面執(zhí)行了掛起方法,會提前返回
if (outcome === COROUTINE_SUSPENDED) return
Result.success(outcome)
} catch (exception: Throwable) {
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
// 3\. 如果完成的completion也是BaseContinuationImpl,就會進(jìn)入循環(huán)
current = completion
param = outcome
} else {
// 4\. 執(zhí)行 completion resumeWith 方法
completion.resumeWith(outcome)
return
}
}
}
}
}
復(fù)制代碼
這邊被我分為2個部分:
- 執(zhí)行
suspend方法,并獲取結(jié)果 - 調(diào)用
complete(放在下一步講)
執(zhí)行suspend方法
在第一處會先執(zhí)行 suspend 修飾的方法內(nèi)容,在方法里面可能又會調(diào)度 suspend 方法,比如說我們的實例方法:
lifecycleScope.launch(Dispatchers.Main) {
val a = async { getResult(1, 2) }
val b = async { getResult(3, 5) }
val c = a.await() + b.await()
Log.e(TAG, "result:$c")
}
suspend fun getResult(a: Int, b: Int): Int {
return withContext(Dispatchers.IO) {
delay(1000)
return@withContext a + b
}
}
復(fù)制代碼
因為我們在 getResult 執(zhí)行了延時操作,所以我們 launch 方法肯定執(zhí)行了耗時掛起方法,所以 BaseContinuationImpl#invokeSuspend 方法會返回一個 COROUTINE_SUSPENDED ,結(jié)果你也看到了,該方法會提前結(jié)束。(說明一下,我沒有找到BaseContinuationImpl#invokeSuspend 方法的具體實現(xiàn),我猜可能跟編譯器有關(guān))
我猜你肯定跟我一樣好奇,遇到耗時掛起會提前返回,那么耗時掛起如何對 complete 進(jìn)行恢復(fù)的?
我們看一下 delay(1000) 這個延時操作在主線程是如何處理的:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
//...
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
//...
}
復(fù)制代碼
可以看到,將恢復(fù)任務(wù)包了一個 Runnable,交給 Handler 的 Handler#postDelayed() 方法了。
第七步 complete resumeWith
對于 complete 的處理一般會有兩種。
complete是BaseContinuationImpl
第一種情況是我們稱之為套娃,完成回調(diào)的 Continuation 它本身也有自己的完成回調(diào) Continuation,接下來循環(huán)就對了。
調(diào)用complete的resumeWith
第二種情況,就是通過 complete 去完成回調(diào),由于 complete 是 AbstractContinuation,我們看一下它的 resumeWith:
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {
// ...
public final override fun resumeWith(result: Result<T>) {
// 1\. 獲取當(dāng)前協(xié)程的技術(shù)狀態(tài)
val state = makeCompletingOnce(result.toState())
// 2\. 如果當(dāng)前還在等待完成,說明還有子協(xié)程沒有結(jié)束
if (state === COMPLETING_WAITING_CHILDREN) return
// 3\. 執(zhí)行結(jié)束恢復(fù)的方法,默認(rèn)為空
afterResume(state)
}
// 這是父類 JobSupport 中的 makeCompletingOnce 方法
// 為了方便查看,我復(fù)制過來
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
loopOnState { state ->
// tryMakeCompleting 的內(nèi)容主要根據(jù)是否有子Job做不同處理
val finalState = tryMakeCompleting(state, proposedUpdate)
when {
finalState === COMPLETING_ALREADY ->
throw IllegalStateException(
"Job $this is already complete or completing, " +
"but is being completed with $proposedUpdate", proposedUpdate.exceptionOrNull
)
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState // COMPLETING_WAITING_CHILDREN or final state
}
}
}
}
復(fù)制代碼
這段代碼的意思其實也很簡單,就是協(xié)程即將完成,得先評估一下協(xié)程的技術(shù)狀態(tài),別協(xié)程還有東西在運(yùn)行,就給結(jié)束了。對于一些有子協(xié)程的一些協(xié)程,會等待子協(xié)程結(jié)束的時候,才會結(jié)束當(dāng)前協(xié)程。
一個 launch 的過程大概就是這樣了。大致的流程圖是這樣的:
下面我們再談?wù)?async。
四、關(guān)于async
async 和 launch 的代碼相似度很高:
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
復(fù)制代碼
最終也會進(jìn)行三步走:
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
// 外面再包一層 Coroutine
createCoroutineUnintercepted(receiver, completion)
// 如果需要,做攔截處理
.intercepted()
// 調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
復(fù)制代碼
不同的是,async 返回的是一個 Deferred<T>,我們需要調(diào)用 Deferred#await() 去獲取返回結(jié)果,它的實現(xiàn)在 JobSupport:
private open class DeferredCoroutine<T>(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<T>(parentContext, active), Deferred<T>, SelectClause1<T> {
// ... awaitInternal方法來自父類 JobSupport
override suspend fun await(): T = awaitInternal() as T
// ...
// 這是 JobSupport 中的實現(xiàn)
internal suspend fun awaitInternal(): Any? {
// 循環(huán)獲取結(jié)果
while (true) { // lock-free loop on state
val state = this.state
// 1\. 如果處于完成狀態(tài)
if (state !is Incomplete) {
if (state is CompletedExceptionally) { // Slow path to recover stacktrace
recoverAndThrow(state.cause)
}
return state.unboxState()
}
// 2\. 除非需要重試,不然就 break
if (startInternal(state) >= 0) break
}
// 等待掛起的方法
return awaitSuspend() // slow-path
}
}
復(fù)制代碼
它的具體過程可以從我的注釋看出,就不一一介紹了,感興趣的同學(xué)可以查看源碼。
1. 本文一開始的討論
本文一開始的代碼是錯的,連編譯器都過不了,尷尬~
正確的代碼應(yīng)該是:
GlobalScope.launch {
val a = async {
1+2
}
val b = async {
1+3
}
val c = a.await() + bawait()
Log.e(TAG,"result:$c")
}
復(fù)制代碼
如果是正確的代碼,這里可能分兩種情況:
如果你放在UI線程,那肯定是串行的,這時候有人說,我在 a 里使用 delay(1000),在 b 里使用 delay(2000),得到 c 的時候就花了 2000 毫秒啊,這不是并行嗎?事情并不是這樣的,delay 操作使用了 Handler#postDelay 方法,一個延遲了 1000 毫秒執(zhí)行,一個延遲了 2000 毫秒執(zhí)行,但是主線程只有一個,所以只能是串行。
如果是子線程,通常都是并行的,因為我們使用了線程池啊~
總結(jié)
寫這邊源碼分析的時候,一些細(xì)節(jié)總是找不到,比如說 suspendLambda 的子類找不到,自己對 Kotlin 的學(xué)習(xí)有待深入。
所以本文有些地方還值得商榷,如果你有更好的理解,歡迎下方交流。