1. Kotlin協(xié)程作用
Kotlin協(xié)程是一套基于Java Thread的線程框架,最大的特點(diǎn)就是可以1,用同步的方式寫(xiě)出異步代碼,并且2,不阻塞當(dāng)前線程。
2. cps轉(zhuǎn)換
2.1 cps轉(zhuǎn)換示例
//編譯前
private suspend fun testCPS(): String {
withContext(Dispatchers.IO) {
return "testCPS"
}
}
//編譯后
private final Object testCPS(Continuation $completion) {
return "testCPS";
}
Kotlin 的編譯器檢測(cè)到 suspend 關(guān)鍵字修飾的函數(shù)后,會(huì)進(jìn)行cps轉(zhuǎn)換,轉(zhuǎn)換點(diǎn):
1,函數(shù)中增加了一個(gè)Continuation類(lèi)型的參數(shù);
2,函數(shù)返回值變?yōu)镺bject(本例String->Object)
注意:這里suspend函數(shù)即使沒(méi)有使用withContext開(kāi)啟一個(gè)協(xié)程,也會(huì)進(jìn)行cps轉(zhuǎn)換。說(shuō)明只要suspend函數(shù),不管開(kāi)不開(kāi)啟協(xié)程,編譯器都會(huì)對(duì)其進(jìn)行cps轉(zhuǎn)換。
2.2 參數(shù)Continuation
public interface Continuation<in T> {
public val context: CoroutineContext
// 相當(dāng)于 onSuccess 結(jié)果
// ↓ ↓
public fun resumeWith(result: Result<T>)
}
interface CallBack {
void onSuccess(String response);
}
1,Continuation :續(xù)體,可以理解為剩余要執(zhí)行的代碼。協(xié)程體中的異步操作被狀態(tài)機(jī)分割成不同的片段,分片段執(zhí)行,執(zhí)行完一部分,剩下的部分叫做續(xù)體。
2,Continuation 是一個(gè)接口,和一般回調(diào)接口定義類(lèi)似,可以判斷,協(xié)程的思想其實(shí)就是回調(diào)。
Continuation 定義看一個(gè)協(xié)程上下文屬性context,一個(gè)方法聲明resumeWith(),用于協(xié)程1,啟動(dòng)(DispatchedContinuation),2,掛起時(shí)恢復(fù)(BaseContinuationImpl),或者3,協(xié)程運(yùn)行完成時(shí)的回調(diào)(AbstractCoroutine);
2.2 返回值 Object
為什么返回值從String->Object?
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
...
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
}
fun getResult(): Any? {
if (trySuspend()) return COROUTINE_SUSPENDED
// otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
}
可以看出withContext的返回值有兩種:
1,如果當(dāng)前withContext中的異步操作沒(méi)有完成,返回COROUTINE_SUSPENDED,協(xié)程框架根據(jù)這個(gè)字段掛起協(xié)程(其實(shí)就是直接return);
2,如果當(dāng)前withContext中的異步操作已經(jīng)完成,返回對(duì)應(yīng)操作執(zhí)行后的返回值(對(duì)應(yīng)String)
3. 協(xié)程狀態(tài)機(jī)
3.1 協(xié)程狀態(tài)機(jī)
//編譯前
class TestCoroutine {
private fun startCoroutine() {
// funTest協(xié)程體
val funTest: suspend CoroutineScope.() -> Unit = {
println("funTest")
suspendFun1()
suspendFun2()
}
GlobalScope.launch(Dispatchers.Default, block = funTest)
}
// 掛起函數(shù)
suspend fun suspendFun1() {
println("suspendFun1")
}
// 掛起函數(shù)
suspend fun suspendFun2() {
println("suspendFun2")
}
}
//編譯后
public final class TestCoroutine {
private final void startCoroutine() {
Function2 funTest = (Function2)(new Function2((Continuation)null) {
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result) {
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
TestCoroutine var10000;
switch(this.label) {
case 0:
ResultKt.throwOnFailure($result);
String var2 = "funTest";
boolean var3 = false;
System.out.println(var2);
var10000 = TestCoroutine.this;
this.label = 1;
if (var10000.suspendFun1(this) == var4) {
return var4;
}
break;
case 1:
ResultKt.throwOnFailure($result);
break;
case 2:
ResultKt.throwOnFailure($result);
return Unit.INSTANCE;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
}
var10000 = TestCoroutine.this;
this.label = 2;
if (var10000.suspendFun2(this) == var4) {
return var4;
} else {
return Unit.INSTANCE;
}
}
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
Intrinsics.checkNotNullParameter(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
return var3;
}
public final Object invoke(Object var1, Object var2) {
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
}
});
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getDefault(), (CoroutineStart)null, funTest, 2, (Object)null);
}
@Nullable
public final Object suspendFun1(@NotNull Continuation $completion) {
System.out.println(""suspendFun1"");
return Unit.INSTANCE;
}
@Nullable
public final Object suspendFun2(@NotNull Continuation $completion) {
System.out.println("suspendFun2");
return Unit.INSTANCE;
}
}
(其他文章描述)在反編譯的代碼中,協(xié)程體funTest被編譯成一個(gè)繼承SuspendLambda的類(lèi),在類(lèi)中實(shí)現(xiàn)create(),invokeSuspend()兩個(gè)方法:
create()創(chuàng)建了一個(gè)協(xié)程體funTest類(lèi)的實(shí)例;
invokeSuspend()方法執(zhí)行具體的協(xié)程操作。
(我反編譯的代碼)沒(méi)搞懂的地方:
1,只創(chuàng)建了一個(gè)Function2的對(duì)象;
2,create方法創(chuàng)建的是匿名對(duì)象(anonymous constructor);
3,invoke方法是什么作用
基本原理:
協(xié)程體會(huì)被編譯成一個(gè)SuspendLambda的子類(lèi),在這個(gè)類(lèi)的invokeSuspend方法中,協(xié)程體中的suspend方法會(huì)被分割到switch不同的分支中,每個(gè)suspend方法會(huì)被cps機(jī)制轉(zhuǎn)換成帶有一個(gè)Continuation參數(shù)的方法。
通過(guò)一個(gè)label標(biāo)簽控制分支代碼執(zhí)行,label為0,首先會(huì)進(jìn)入第一個(gè)分支,首先將label設(shè)置為下一個(gè)分支的數(shù)值,然后執(zhí)行第一個(gè)suspend方法并傳遞當(dāng)前Continuation,得到返回值,如果是COROUTINE_SUSPENDED,協(xié)程框架就直接return,協(xié)程掛起,當(dāng)?shù)谝粋€(gè)suspend方法執(zhí)行完成,會(huì)回調(diào)Continuation的invokeSuspend方法,進(jìn)入第二個(gè)分支執(zhí)行,以此類(lèi)推執(zhí)行完所有suspend方法。
如果suspend方法直接返回執(zhí)行結(jié)果,那invokeSuspend后面的代碼怎么執(zhí)行?
3.2 SuspendLambda類(lèi)圖
4 協(xié)程相關(guān)概念
4.1 CoroutineContext
launch函數(shù)是CoroutineScope的一個(gè)擴(kuò)展函數(shù),CoroutineScope只是一個(gè)接口,但是可以通過(guò)CoroutineScope的擴(kuò)展方法進(jìn)行協(xié)程的創(chuàng)建,除了launch函數(shù)還有async函數(shù)。
CoroutineScope除了通過(guò)擴(kuò)展函數(shù)創(chuàng)建協(xié)程還有其它兩個(gè)作用,launch函數(shù)返回一個(gè)Job對(duì)象,可以通過(guò)這個(gè)Job管理協(xié)程,另外CoroutineScope為協(xié)程提供一個(gè)上下文CoroutineContext。
CoroutineContext協(xié)程的上下文,這是一個(gè)數(shù)據(jù)集合接口聲明,協(xié)程中Job、Dispatcher調(diào)度器都可以是它的元素,CoroutineContext有一個(gè)非常好的作用就是我們可以通過(guò)它拿到Job、Dispatcher調(diào)度器等數(shù)據(jù)。
CombinedContext是CoroutineContext接口的具體實(shí)現(xiàn)類(lèi),存在兩個(gè)屬性,其中element是一個(gè)Element,代表集合的元素,left是一個(gè)CoroutineContext,代表鏈表的下一個(gè)節(jié)點(diǎn)。
通過(guò)CoroutineContext#plus可以看出,CoroutineContext的數(shù)據(jù)存儲(chǔ)方式是一個(gè)左向鏈表,鏈表的每一個(gè)節(jié)點(diǎn)是CombinedContext,并且存在攔截器的情況下,攔截器永遠(yuǎn)是鏈表尾部的元素,這樣設(shè)計(jì)目的是因?yàn)閿r截器的使用頻率很高,為了更快的讀取攔截器;
沒(méi)看懂這個(gè)左向鏈表實(shí)現(xiàn),現(xiàn)在只要知道這是個(gè)集合,類(lèi)似list,但是它有一個(gè)left元素始終在表尾,存儲(chǔ)攔截器,
4.2 CoroutineStart 啟動(dòng)模式
CoroutineStart 是協(xié)程的啟動(dòng)模式,存在以下4種模式:
DEFAULT 立即調(diào)度,可以在執(zhí)行前被取消
LAZY 需要時(shí)才啟動(dòng),需要start、join等函數(shù)觸發(fā)才可進(jìn)行調(diào)度
ATOMIC 立即調(diào)度,協(xié)程肯定會(huì)執(zhí)行,執(zhí)行前不可以被取消
UNDISPATCHED 立即在當(dāng)前線程執(zhí)行,直到遇到第一個(gè)掛起點(diǎn)(可能切線程)
5. 協(xié)程啟動(dòng)
// launch是CoroutineScope的一個(gè)擴(kuò)展函數(shù)
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.()
): Job {
// CoroutineContext創(chuàng)建一個(gè)新的Context
val newContext = newCoroutineContext(context)
// 啟動(dòng)模式的判斷
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
launch函數(shù)存在3個(gè)參數(shù):
CoroutineContext 協(xié)程的上下文
CoroutineStart 協(xié)程的啟動(dòng)模式
suspend CoroutineScope.() -> Unit 協(xié)程體
newCoroutineContext()是CoroutineScope的一個(gè)擴(kuò)展方法,它的作用就是將傳參context與CoroutineScope中的CoroutineContext集合合并,并返回一個(gè)新的CoroutineContext,如果傳入Dispatchers.Default,就是將Dispatchers.Default與CoroutineScope中的CoroutineContext合并。
根據(jù)啟動(dòng)模式,構(gòu)建一個(gè)AbstractCoroutine的子類(lèi)(協(xié)程對(duì)象都繼承AbstractCoroutine),如果是默認(rèn)模式,則創(chuàng)建StandaloneCoroutine,并調(diào)用它的start方法。并將StandaloneCoroutine又作為job返回。
5.1 AbstractCoroutine
AbstractCoroutine繼承或者實(shí)現(xiàn)了JobSupport、Job、Continuation、CoroutineScope。
JobSupport是Job的具體實(shí)現(xiàn),AbstractCoroutine可以作為一個(gè)Job控制協(xié)程的生命周期,同時(shí)實(shí)現(xiàn)Continuation接口,也可以作為一個(gè)Continuation,重寫(xiě)的resmueWith()方法的一個(gè)重要作用是恢復(fù)協(xié)程
AbstractCoroutine#resmueWith
public final override fun resumeWith(result: Result<T>) {
val state = makeCompletingOnce(result.toState())
// 子協(xié)程未完成,父協(xié)程需要等待子協(xié)程完成之后才可以完成
if (state === COMPLETING_WAITING_CHILDREN) return
// 子協(xié)程全部執(zhí)行完成或者沒(méi)有子協(xié)程的情況下不需要等待
afterResume(state)
}
protected open fun afterResume(state: Any?): Unit = afterCompletion(state)
// JobSupport#afterCompletion
protected open fun afterCompletion(state: Any?) {}
在AbstractCoroutine#resmueWith中首先根據(jù)JobSupport#makeCompletingOnce返回狀態(tài)判斷,協(xié)程是否處于等待子協(xié)程完成的狀態(tài):
state == COMPLETING_WAITING_CHILDREN 等待子協(xié)程完成,自身才可完成。子協(xié)程完成后,觸發(fā)afterCompletion()
state != COMPLETING_WAITING_CHILDREN 沒(méi)有子協(xié)程或者所有子協(xié)程已經(jīng)完成,自身可以完成,直接觸發(fā)afterCompletion()
協(xié)程對(duì)象可以通過(guò)重寫(xiě)afterCompletion()處理協(xié)程完成之后的操作,下文中的協(xié)程恢復(fù)章節(jié)中,withContext()中DispatchedCoroutine協(xié)程對(duì)象,通過(guò)afterCompletion()恢復(fù)了外層的協(xié)程的運(yùn)行。
AbstractCoroutine#start()
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
...
//block :協(xié)程體 //receiver:協(xié)程對(duì)象 //this:AbstractCoroutine(也是協(xié)程對(duì)象)
start(block, receiver, this)
}
CoroutineStart#invoke
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
when (this) {
//completion:start傳過(guò)來(lái)的AbstractCoroutine
DEFAULT -> block.startCoroutineCancellable(receiver, completion)
ATOMIC -> block.startCoroutine(receiver, completion)
UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
LAZY -> Unit // will start lazily
}
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>,
onCancellation: ((cause: Throwable) -> Unit)? = null
) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
}
AbstractCoroutine#start()調(diào)用了CoroutineStart的invoke()方法,然后根據(jù)啟動(dòng)模式調(diào)用block對(duì)應(yīng)的協(xié)程啟動(dòng)方法,block.startCoroutineCancellable(receiver, completion) 中是一個(gè)鏈?zhǔn)秸{(diào)用流程。
createCoroutineUnintercepted()
public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
completion: Continuation<T>
): Continuation<Unit> {
// probeCompletion :AbstractCoroutine
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion) {
(this as Function1<Continuation<T>, Any?>).invoke(it)
}
}
// continuation:AbstractCoroutine
public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {
...
MainActivity$startCoroutine$funTest$1 mainActivity$startCoroutine$funTest$1 = new MainActivity$startCoroutine$funTest$1(this.this$0, continuation);
return mainActivity$startCoroutine$funTest$1;
}
createCoroutineUnintercepted中通過(guò)調(diào)用create(probeCompletion)創(chuàng)建了一個(gè)協(xié)程體類(lèi)的對(duì)象。
createCoroutineUnintercepted()是一個(gè)擴(kuò)展函數(shù),通過(guò)協(xié)程體block調(diào)用,所以源碼中this is BaseContinuationImpl的判斷中this指協(xié)程體類(lèi),編譯章節(jié)中協(xié)程體被編譯成SuspendLambda的子類(lèi);這里的create函數(shù)就是SuspendLambda的子類(lèi)中的create函數(shù)。
這里的block對(duì)象和協(xié)程體類(lèi)對(duì)象是什么關(guān)系?有什么區(qū)別?
注意看下構(gòu)造函數(shù)的參數(shù)continuation,這里continuation就是AbstractCoroutine,在協(xié)程體類(lèi)的繼承鏈中,這個(gè)continuation一直傳遞到了BaseContinuationImpl父類(lèi)中,用于后續(xù)協(xié)程的恢復(fù)。
注意:這里將AbstractCoroutine對(duì)象傳遞給了協(xié)程類(lèi)對(duì)象,進(jìn)行了第一層代理。
繼續(xù)分析intercepted()
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
ContinuationImpl#intercepted
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it }
CoroutineDispatcher#interceptContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
//this是Dispatcher,continuation是協(xié)程體類(lèi)對(duì)象
DispatchedContinuation(this, continuation)
intercepted()協(xié)程體類(lèi)對(duì)象轉(zhuǎn)換成ContinuationImpl,然后調(diào)用了ContinuationImpl的intercepted方法,intercepted方法中調(diào)用context[ContinuationInterceptor] 從協(xié)程類(lèi)對(duì)象的CoroutineContext集合中取到調(diào)度器CoroutineDispatcher(這個(gè)CoroutineContext是launch是構(gòu)建的,并傳遞到StandaloneCoroutine對(duì)象中),并調(diào)用調(diào)度器CoroutineDispatcher的interceptContinuation(),interceptContinuation()的作用是將協(xié)程體Continuation對(duì)象包裝成一個(gè)DispatchedContinuation。
注意:這里將協(xié)程類(lèi)對(duì)象傳遞給了DispatchedContinuationd對(duì)象,進(jìn)行了第二層代理
5.2 CoroutineDispatcher
CoroutineDispatcher
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
...
// 是否需要線程調(diào)度
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
// 線程調(diào)度,讓一個(gè)runable對(duì)象在指定線程運(yùn)行
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
// 將協(xié)程體對(duì)象continuation封裝為一個(gè)DispatchedContinuation對(duì)象
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
...
}
CoroutineDispatcher的作用是進(jìn)行任務(wù)的線程切換。
CoroutineDispatcher實(shí)現(xiàn)了ContinuationInterceptor,代表是一個(gè)攔截器;
實(shí)現(xiàn)CoroutineContext接口,存儲(chǔ)在CoroutineContext的left節(jié)點(diǎn)。
CoroutineContext[ContinuationInterceptor]就可以在CoroutineContext集合中獲取到攔截器。
為啥要通過(guò)攔截器去代理Continuation,直接使用DispatchedContinuation包裝不就行了嗎??
5.3 DispatchedContinuation
接著分析resumeCancellableWith(Result.success(Unit), onCancellation)
DispatchedContinuation
internal class DispatchedContinuation<in T>(
// 調(diào)度器
@JvmField val dispatcher: CoroutineDispatcher,
// 協(xié)程體Continuation對(duì)象
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {
// 使用delegate存儲(chǔ)當(dāng)前對(duì)象
override val delegate: Continuation<T>
get() = this
// ATOMIC啟動(dòng)模式
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
// 是否需要線程調(diào)度
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
// dispatch 調(diào)度線程,第二個(gè)參數(shù)是一個(gè)Runnable類(lèi)型,這里傳參this也就是DispatchedContinuation自身
// DispatchedContinuation實(shí)際上也是一個(gè)Runnable對(duì)象,調(diào)用調(diào)度器的dispatch方法之后就可以使這個(gè)runnable在指定的線程運(yùn)行了
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
// 不需要調(diào)度,執(zhí)行協(xié)程體的resumeWith
continuation.resumeWith(result)
}
}
}
}
// 默認(rèn)啟動(dòng)模式
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
}
DispatchedContinuation 代理協(xié)程體類(lèi)對(duì)象(SuspendLambda)并持有線程調(diào)度器(CoroutineDispatcher),它的作用就是使用線程調(diào)度器將協(xié)程體調(diào)度到指定的線程執(zhí)行。
DispatchedContinuation也實(shí)現(xiàn)了Continuation接口,并重寫(xiě)resumeWith(),首先
根據(jù)dispatcher.isDispatchNeeded(context)判斷需要線程切換:
1.如果需要線程調(diào)度,則調(diào)用dispatcher#dispatch進(jìn)行調(diào)度,而dispatch()的第二個(gè)參數(shù)是一個(gè)runnable對(duì)象(這里傳參為this,即DispatchedContinuation對(duì)象本身,DispatchedContinuation同時(shí)還實(shí)現(xiàn)了Runnable接口),這個(gè)runnable就會(huì)運(yùn)行在調(diào)度的線程上;
2.不需要調(diào)度則直接調(diào)用協(xié)程體類(lèi)continuation對(duì)象的resumeWith(),前面的章節(jié)中提到,協(xié)程體的運(yùn)行就是協(xié)程體類(lèi)Continuation對(duì)象的resumeWith()被觸發(fā),所以這里就會(huì)讓協(xié)程體在當(dāng)前線程運(yùn)行;
另外還有一個(gè)方法resumeCancellableWith(),它和resumeWith()的實(shí)現(xiàn)很類(lèi)似,在不同的啟動(dòng)模式下調(diào)度線程的方法調(diào)用不同。比如默認(rèn)的啟動(dòng)模式調(diào)用resumeCancellableWith(),ATOMIC啟動(dòng)模式則調(diào)用resumeWith()。
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
可以看到Default和IO 底層還是使用了線程池進(jìn)行調(diào)度;Main使用了handler進(jìn)行調(diào)度。
5.4 DispatchedTask
internal abstract class DispatchedTask<in T>(
@JvmField public var resumeMode: Int
) : SchedulerTask() {
’
// 在DispatchedContinuation中重寫(xiě)了該屬性,delegate實(shí)際是指DispatchedContinuation對(duì)象
internal abstract val delegate: Continuation<T>
public final override fun run() {
...
val delegate = delegate as DispatchedContinuation<T>
// 通過(guò)delegate拿到原始協(xié)程體Continuation對(duì)象
val continuation = delegate.continuation
...
// 調(diào)用協(xié)程體類(lèi)對(duì)象的resume
continuation.resume(getSuccessfulResult(state))
...
}
}
// Continuation的擴(kuò)展方法,觸發(fā)Continuation內(nèi)的方法resumeWith
public inline fun <T> Continuation<T>.resume(value: T): Unit =
resumeWith(Result.success(value))
DispatchedTask的run方法中調(diào)用了協(xié)程體類(lèi)對(duì)象的resume方法,間接調(diào)用了BaseContinuationImpl的resumeWith
注意:這里是協(xié)程體類(lèi)的resumeWith被執(zhí)行了,不是DispatchedContinuation
5.5 BaseContinuationImpl
internal abstract class BaseContinuationImpl(
// completion:實(shí)參是一個(gè)AbstractCoroutine
public val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable {
public final override fun resumeWith(result: Result<Any?>) {
var current = this
var param = result
while (true) {
probeCoroutineResumed(current)
with(current) {
val completion = completion!!
val outcome: Result<Any?> =
try {
// 調(diào)用invokeSuspend方法,協(xié)程體真正開(kāi)始執(zhí)行
val outcome = invokeSuspend(param)
// invokeSuspend方法返回值為COROUTINE_SUSPENDED,resumeWith方法被return,結(jié)束執(zhí)行,說(shuō)明執(zhí)行了掛起操作
if (outcome === COROUTINE_SUSPENDED) return
// 協(xié)程體執(zhí)行成功的結(jié)果
Result.success(outcome)
} catch (exception: Throwable) {
// 協(xié)程體出現(xiàn)異常的結(jié)果
Result.failure(exception)
}
releaseIntercepted() // this state machine instance is terminating
if (completion is BaseContinuationImpl) {
current = completion
param = outcome
} else {
// 在示例代碼中,completion是一個(gè)AbstractCoroutine,是指launch函數(shù)創(chuàng)建的StandaloneCoroutine
completion.resumeWith(outcome)
return
}
}
}
}
protected abstract fun invokeSuspend(result: Result<Any?>): Any?
}
BaseContinuationImpl定義了一個(gè)抽象方法invokeSuspend(),并重寫(xiě)了Continuation的resumeWith(),并在其中調(diào)用invokeSuspend(),具體實(shí)現(xiàn)就是SuspendLambda的invokeSuspend(),invokeSuspend()方法中便是具體協(xié)程任務(wù)。
invokeSuspend()和invoke啥關(guān)系??
5.6 協(xié)程啟動(dòng)流程總結(jié):
以調(diào)度器為Dispatchers.Default,啟動(dòng)模式為CoroutineStart.DEFAULT為例:
CoroutineScope#launch()創(chuàng)建一個(gè)協(xié)程,在其內(nèi)部實(shí)現(xiàn)中根據(jù)啟動(dòng)模式為CoroutineStart.DEFAULT,創(chuàng)建一個(gè)StandaloneCoroutine協(xié)程對(duì)象,并觸發(fā)StandaloneCoroutine#start(start, coroutine, block);
StandaloneCoroutine的父類(lèi)是AbstractCoroutine,StandaloneCoroutine#start()的實(shí)現(xiàn)在其父類(lèi)中,即AbstractCoroutine#start();
在AbstractCoroutine#start()中,觸發(fā)CoroutineStart#invoke();
CoroutineStart#invoke()的處理邏輯中,根據(jù)調(diào)度器為Dispatchers.Default,調(diào)用協(xié)程體的startCoroutineCancellable()方法;
startCoroutineCancellable()的內(nèi)部處理是一個(gè)鏈?zhǔn)秸{(diào)用:
createCoroutineUnintercepted(..).intercepted().resumeCancellableWith(Result.success(Unit))
createCoroutineUnintercepted()創(chuàng)建一個(gè)協(xié)程體類(lèi)對(duì)象;
intercepted()使用攔截器(調(diào)度器)將協(xié)程體類(lèi)對(duì)象包裝成DispatchedContinuation(DispatchedContinuation代理了協(xié)程體類(lèi)Continuation對(duì)象,并持有調(diào)度器);
調(diào)用DispatchedContinuation#resumeCancellableWith()。
在DispatchedContinuation#resumeCancellableWith()中,使用線程調(diào)度器觸發(fā)dispatcher#dispatch(context, this)進(jìn)行調(diào)度,該調(diào)度器為Dispatchers.Default;
Dispatchers.Default#dispatch()調(diào)度處理中,將DispatchedContinuation分發(fā)到CoroutineScheduler線程池中,由CoroutineScheduler分配一個(gè)線程Worker,最終在Woreder的run()方法中觸發(fā)了DispatchedContinuation的run(),其內(nèi)部實(shí)現(xiàn)是使協(xié)程體Continuation對(duì)象的resumeWithI()得以執(zhí)行,前文中分析到協(xié)程體的執(zhí)行其實(shí)就是resumeWith()方法被調(diào)用,這樣協(xié)程體就可以在執(zhí)行的線程中執(zhí)行了。
協(xié)程啟動(dòng)流程圖:
5.7 三個(gè)協(xié)程對(duì)象總結(jié):
1,第一層協(xié)程對(duì)象AbstractCoroutine,主要處理協(xié)程狀態(tài)和恢復(fù)掛起協(xié)程。
2,第二層對(duì)象BaseContinuationImpl,由編譯器將我們的代碼轉(zhuǎn)換而成,利用狀態(tài)機(jī)實(shí)現(xiàn)代碼分段執(zhí)行和協(xié)程掛起,并代理第一層對(duì)象。在其resumeWith方法中通過(guò)調(diào)用invokeSuspend()執(zhí)行我們的任務(wù)代碼。
注意:resumeWith方法中調(diào)用completion.resumeWith(outcome)是恢復(fù)協(xié)程,這個(gè)completion是AbstractCoroutine(launch對(duì)應(yīng)StandaloneCoroutine或者witchContext對(duì)應(yīng)DispatchedCoroutine),不是BaseContinuationImpl的子類(lèi)。
3,第三層對(duì)象DispatchedContinuation,對(duì)第二層協(xié)程對(duì)象進(jìn)行代理,負(fù)責(zé)使用dispatcher進(jìn)行調(diào)度任務(wù)。
注意:協(xié)程體類(lèi)是持有了一個(gè)AbstractCoroutine,不是繼承
6.協(xié)程掛起
withContext()
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// 返回啟動(dòng)withContext的協(xié)程體
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// 構(gòu)建一個(gè)新的newContext,合并當(dāng)前協(xié)程體以及withContext協(xié)程體的CoroutineContext
val oldContext = uCont.context
val newContext = oldContext + context
// 檢查協(xié)程是否活躍,如果線程處于非活躍的狀態(tài),拋出cancle異常
newContext.checkCompletion()
...
// DispatchedCoroutine也是一個(gè)AbstractCoroutine對(duì)象,負(fù)責(zé)協(xié)程完成的回調(diào),
// 注意這里的Continuation的傳參為uCont,及發(fā)起withContext的協(xié)程對(duì)象
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
// 和協(xié)程啟動(dòng)的流程一樣,啟動(dòng)withContext的協(xié)程
// 注意這里的傳參coroutine為DispatchedCoroutine,它持有需要恢復(fù)的協(xié)程
block.startCoroutineCancellable(coroutine, coroutine)
// 返回結(jié)果為掛起還是完成
coroutine.getResult()
}
}
在withContext()的源碼可以看到,withContext()的協(xié)程體的啟動(dòng)和原有協(xié)程的啟動(dòng)流程是一樣的,DispatchedCoroutin是AbstractCoroutine的一個(gè)子類(lèi),并且在創(chuàng)建DispatchedCoroutin時(shí)的傳參是外層協(xié)程體對(duì)象,這是因?yàn)楫?dāng)withContext()的協(xié)程體完成的時(shí)候需要通過(guò)外層協(xié)程體對(duì)象恢復(fù)當(dāng)前協(xié)程的運(yùn)行。
先看下協(xié)程的掛起coroutine.getResult()的實(shí)現(xiàn)。
// DispatchedCoroutine#getResult
fun getResult(): Any? {
// 返回COROUTINE_SUSPENDED,掛起
if (trySuspend()) return COROUTINE_SUSPENDED
val state = this.state.unboxState()
// 出現(xiàn)異常
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
// 未出現(xiàn)異常結(jié)果返回
return state as T
}
// DispatchedCoroutine#trySuspend
private val _decision = atomic(UNDECIDED)
private fun trySuspend(): Boolean {
_decision.loop { decision ->
when (decision) {
// compareAndSet原子操作,當(dāng)前值與預(yù)期值一致時(shí)返回true,以原子方式更新自身的值
UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true
RESUMED -> return false
else -> error("Already suspended")
}
}
}
是否掛起,結(jié)束協(xié)程運(yùn)行,關(guān)鍵在是否返回COROUTINE_SUSPENDED標(biāo)志,在getResult()方法中的處理邏輯,就是看trySuspend()是否返回true。
trySuspend()方法中,_decision默認(rèn)為UNDECIDED,預(yù)期的參數(shù)值傳參也為UNDECIDED,所以,trySuspend返回true,最終getResult方法返回了COROUTINE_SUSPENDED,協(xié)程被掛起了。
7.協(xié)程恢復(fù)
withContext()啟動(dòng)一個(gè)協(xié)程和launch類(lèi)似,當(dāng)執(zhí)行到BaseContinuationImpl的resumeWith方法,調(diào)用invokeSuspend得到結(jié)果之后,會(huì)調(diào)用內(nèi)部代理的completion.resumeWith(outcome)方法,這個(gè)completion是DispatchedCoroutine。
DispatchedCoroutine是AbstractCoroutine的子類(lèi),當(dāng)協(xié)程完成時(shí)會(huì)調(diào)用它的內(nèi)部方法resumeWith(),內(nèi)部的處理邏輯最后會(huì)觸發(fā)JubSpuuort#afterCompletion(),而在DispatchedCoroutine中重寫(xiě)了afterCompletion()。
private class DispatchedCoroutine<in T>(
context: CoroutineContext,
// 外部需要恢復(fù)的協(xié)程
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
override fun afterCompletion(state: Any?) {
afterResume(state)
}
override fun afterResume(state: Any?) {
// 在getResult()之前,協(xié)程已運(yùn)行結(jié)束,未發(fā)生掛起,不需要恢復(fù)外層協(xié)程
if (tryResume()) return
// 獲取外部協(xié)程的DispatchedContinuation,去恢復(fù)外層協(xié)程
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
}
uCont.intercepted()獲取到外層協(xié)程的DispatchedContinuation,然后調(diào)用resumeCancellableWith方法,使用外層協(xié)程的dispatcher將任務(wù)的執(zhí)行切換到之前的線程中去執(zhí)行。再次調(diào)用到外層協(xié)程的BaseContinuationImpl#resumeWith方法,再次調(diào)用到外層協(xié)程類(lèi)的invokeSuspend方法中,去執(zhí)行剩余代碼。如果能直接結(jié)果就調(diào)用外層協(xié)程的completion.resumeWith(outcome)結(jié)束協(xié)程(completion是外層協(xié)類(lèi)對(duì)象中代理的AbstractCoroutine)