Kotlin協(xié)程原理解析

1. Kotlin協(xié)程作用

Kotlin協(xié)程是一套基于Java Thread的線程框架,最大的特點(diǎn)就是可以1,用同步的方式寫(xiě)出異步代碼,并且2,不阻塞當(dāng)前線程。

2. cps轉(zhuǎn)換

2.1 cps轉(zhuǎn)換示例

//編譯前
private suspend fun testCPS(): String {
     withContext(Dispatchers.IO) {
        return "testCPS"
       }
  }
 
//編譯后
private final Object testCPS(Continuation $completion) {
      return "testCPS";
   }

Kotlin 的編譯器檢測(cè)到 suspend 關(guān)鍵字修飾的函數(shù)后,會(huì)進(jìn)行cps轉(zhuǎn)換,轉(zhuǎn)換點(diǎn):

1,函數(shù)中增加了一個(gè)Continuation類(lèi)型的參數(shù);

2,函數(shù)返回值變?yōu)镺bject(本例String->Object)

注意:這里suspend函數(shù)即使沒(méi)有使用withContext開(kāi)啟一個(gè)協(xié)程,也會(huì)進(jìn)行cps轉(zhuǎn)換。說(shuō)明只要suspend函數(shù),不管開(kāi)不開(kāi)啟協(xié)程,編譯器都會(huì)對(duì)其進(jìn)行cps轉(zhuǎn)換。

2.2 參數(shù)Continuation

public interface Continuation<in T> {
    public val context: CoroutineContext
//      相當(dāng)于 onSuccess     結(jié)果   
//                 ↓         ↓
    public fun resumeWith(result: Result<T>)
}

interface CallBack {
    void onSuccess(String response);
}

1,Continuation :續(xù)體,可以理解為剩余要執(zhí)行的代碼。協(xié)程體中的異步操作被狀態(tài)機(jī)分割成不同的片段,分片段執(zhí)行,執(zhí)行完一部分,剩下的部分叫做續(xù)體。

2,Continuation 是一個(gè)接口,和一般回調(diào)接口定義類(lèi)似,可以判斷,協(xié)程的思想其實(shí)就是回調(diào)。

Continuation 定義看一個(gè)協(xié)程上下文屬性context,一個(gè)方法聲明resumeWith(),用于協(xié)程1,啟動(dòng)(DispatchedContinuation),2,掛起時(shí)恢復(fù)(BaseContinuationImpl),或者3,協(xié)程運(yùn)行完成時(shí)的回調(diào)(AbstractCoroutine);

2.2 返回值 Object

為什么返回值從String->Object?


public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    contract {
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    }
    return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
        ...
        val coroutine = DispatchedCoroutine(newContext, uCont)
        coroutine.initParentJob()
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    }
}

fun getResult(): Any? {
        if (trySuspend()) return COROUTINE_SUSPENDED
        // otherwise, onCompletionInternal was already invoked & invoked tryResume, and the result is in the state
        val state = this.state.unboxState()
        if (state is CompletedExceptionally) throw state.cause
        @Suppress("UNCHECKED_CAST")
        return state as T
    }

可以看出withContext的返回值有兩種:

1,如果當(dāng)前withContext中的異步操作沒(méi)有完成,返回COROUTINE_SUSPENDED,協(xié)程框架根據(jù)這個(gè)字段掛起協(xié)程(其實(shí)就是直接return);

2,如果當(dāng)前withContext中的異步操作已經(jīng)完成,返回對(duì)應(yīng)操作執(zhí)行后的返回值(對(duì)應(yīng)String)

3. 協(xié)程狀態(tài)機(jī)

3.1 協(xié)程狀態(tài)機(jī)


//編譯前
class TestCoroutine {
    private fun startCoroutine() {
        // funTest協(xié)程體
        val funTest: suspend CoroutineScope.() -> Unit = {
            println("funTest")
            suspendFun1()
            suspendFun2()
        }
        GlobalScope.launch(Dispatchers.Default, block = funTest)
    }

    // 掛起函數(shù)
    suspend fun suspendFun1() {
        println("suspendFun1")
    }
    // 掛起函數(shù)
    suspend fun suspendFun2() {
        println("suspendFun2")
    }
}

//編譯后
public final class TestCoroutine {
   private final void startCoroutine() {
      Function2 funTest = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            TestCoroutine var10000;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               String var2 = "funTest";
               boolean var3 = false;
               System.out.println(var2);
               var10000 = TestCoroutine.this;
               this.label = 1;
               if (var10000.suspendFun1(this) == var4) {
                  return var4;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            case 2:
               ResultKt.throwOnFailure($result);
               return Unit.INSTANCE;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            var10000 = TestCoroutine.this;
            this.label = 2;
            if (var10000.suspendFun2(this) == var4) {
               return var4;
            } else {
               return Unit.INSTANCE;
            }
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)Dispatchers.getDefault(), (CoroutineStart)null, funTest, 2, (Object)null);
   }

   @Nullable
   public final Object suspendFun1(@NotNull Continuation $completion) {
      System.out.println(""suspendFun1"");
      return Unit.INSTANCE;
   }

   @Nullable
   public final Object suspendFun2(@NotNull Continuation $completion) {
      System.out.println("suspendFun2");
      return Unit.INSTANCE;
   }
}

(其他文章描述)在反編譯的代碼中,協(xié)程體funTest被編譯成一個(gè)繼承SuspendLambda的類(lèi),在類(lèi)中實(shí)現(xiàn)create(),invokeSuspend()兩個(gè)方法:

create()創(chuàng)建了一個(gè)協(xié)程體funTest類(lèi)的實(shí)例;

invokeSuspend()方法執(zhí)行具體的協(xié)程操作。

(我反編譯的代碼)沒(méi)搞懂的地方:

1,只創(chuàng)建了一個(gè)Function2的對(duì)象;

2,create方法創(chuàng)建的是匿名對(duì)象(anonymous constructor);

3,invoke方法是什么作用

基本原理:

協(xié)程體會(huì)被編譯成一個(gè)SuspendLambda的子類(lèi),在這個(gè)類(lèi)的invokeSuspend方法中,協(xié)程體中的suspend方法會(huì)被分割到switch不同的分支中,每個(gè)suspend方法會(huì)被cps機(jī)制轉(zhuǎn)換成帶有一個(gè)Continuation參數(shù)的方法。

通過(guò)一個(gè)label標(biāo)簽控制分支代碼執(zhí)行,label為0,首先會(huì)進(jìn)入第一個(gè)分支,首先將label設(shè)置為下一個(gè)分支的數(shù)值,然后執(zhí)行第一個(gè)suspend方法并傳遞當(dāng)前Continuation,得到返回值,如果是COROUTINE_SUSPENDED,協(xié)程框架就直接return,協(xié)程掛起,當(dāng)?shù)谝粋€(gè)suspend方法執(zhí)行完成,會(huì)回調(diào)Continuation的invokeSuspend方法,進(jìn)入第二個(gè)分支執(zhí)行,以此類(lèi)推執(zhí)行完所有suspend方法。

如果suspend方法直接返回執(zhí)行結(jié)果,那invokeSuspend后面的代碼怎么執(zhí)行?

3.2 SuspendLambda類(lèi)圖

image.png

4 協(xié)程相關(guān)概念

4.1 CoroutineContext

launch函數(shù)是CoroutineScope的一個(gè)擴(kuò)展函數(shù),CoroutineScope只是一個(gè)接口,但是可以通過(guò)CoroutineScope的擴(kuò)展方法進(jìn)行協(xié)程的創(chuàng)建,除了launch函數(shù)還有async函數(shù)。

CoroutineScope除了通過(guò)擴(kuò)展函數(shù)創(chuàng)建協(xié)程還有其它兩個(gè)作用,launch函數(shù)返回一個(gè)Job對(duì)象,可以通過(guò)這個(gè)Job管理協(xié)程,另外CoroutineScope為協(xié)程提供一個(gè)上下文CoroutineContext。

CoroutineContext協(xié)程的上下文,這是一個(gè)數(shù)據(jù)集合接口聲明,協(xié)程中Job、Dispatcher調(diào)度器都可以是它的元素,CoroutineContext有一個(gè)非常好的作用就是我們可以通過(guò)它拿到Job、Dispatcher調(diào)度器等數(shù)據(jù)。

CombinedContext是CoroutineContext接口的具體實(shí)現(xiàn)類(lèi),存在兩個(gè)屬性,其中element是一個(gè)Element,代表集合的元素,left是一個(gè)CoroutineContext,代表鏈表的下一個(gè)節(jié)點(diǎn)。

通過(guò)CoroutineContext#plus可以看出,CoroutineContext的數(shù)據(jù)存儲(chǔ)方式是一個(gè)左向鏈表,鏈表的每一個(gè)節(jié)點(diǎn)是CombinedContext,并且存在攔截器的情況下,攔截器永遠(yuǎn)是鏈表尾部的元素,這樣設(shè)計(jì)目的是因?yàn)閿r截器的使用頻率很高,為了更快的讀取攔截器;

沒(méi)看懂這個(gè)左向鏈表實(shí)現(xiàn),現(xiàn)在只要知道這是個(gè)集合,類(lèi)似list,但是它有一個(gè)left元素始終在表尾,存儲(chǔ)攔截器,

4.2 CoroutineStart 啟動(dòng)模式

CoroutineStart 是協(xié)程的啟動(dòng)模式,存在以下4種模式:

DEFAULT 立即調(diào)度,可以在執(zhí)行前被取消
LAZY 需要時(shí)才啟動(dòng),需要start、join等函數(shù)觸發(fā)才可進(jìn)行調(diào)度
ATOMIC 立即調(diào)度,協(xié)程肯定會(huì)執(zhí)行,執(zhí)行前不可以被取消
UNDISPATCHED 立即在當(dāng)前線程執(zhí)行,直到遇到第一個(gè)掛起點(diǎn)(可能切線程)

5. 協(xié)程啟動(dòng)


// launch是CoroutineScope的一個(gè)擴(kuò)展函數(shù)  
 public fun CoroutineScope.launch(  
     context: CoroutineContext = EmptyCoroutineContext,  
     start: CoroutineStart = CoroutineStart.DEFAULT,  
     block: suspend CoroutineScope.()
 ): Job {  
     // CoroutineContext創(chuàng)建一個(gè)新的Context
     val newContext = newCoroutineContext(context) 
     // 啟動(dòng)模式的判斷
     val coroutine = if (start.isLazy)  
         LazyStandaloneCoroutine(newContext, block) else  
         StandaloneCoroutine(newContext, active = true)  
     coroutine.start(start, coroutine, block)  
     return coroutine  
 }

launch函數(shù)存在3個(gè)參數(shù):

CoroutineContext 協(xié)程的上下文

CoroutineStart 協(xié)程的啟動(dòng)模式

suspend CoroutineScope.() -> Unit 協(xié)程體

newCoroutineContext()是CoroutineScope的一個(gè)擴(kuò)展方法,它的作用就是將傳參context與CoroutineScope中的CoroutineContext集合合并,并返回一個(gè)新的CoroutineContext,如果傳入Dispatchers.Default,就是將Dispatchers.Default與CoroutineScope中的CoroutineContext合并。

根據(jù)啟動(dòng)模式,構(gòu)建一個(gè)AbstractCoroutine的子類(lèi)(協(xié)程對(duì)象都繼承AbstractCoroutine),如果是默認(rèn)模式,則創(chuàng)建StandaloneCoroutine,并調(diào)用它的start方法。并將StandaloneCoroutine又作為job返回。

5.1 AbstractCoroutine

image.png

AbstractCoroutine繼承或者實(shí)現(xiàn)了JobSupport、Job、Continuation、CoroutineScope。

JobSupport是Job的具體實(shí)現(xiàn),AbstractCoroutine可以作為一個(gè)Job控制協(xié)程的生命周期,同時(shí)實(shí)現(xiàn)Continuation接口,也可以作為一個(gè)Continuation,重寫(xiě)的resmueWith()方法的一個(gè)重要作用是恢復(fù)協(xié)程

AbstractCoroutine#resmueWith
public final override fun resumeWith(result: Result<T>) {  
         val state = makeCompletingOnce(result.toState())  
         // 子協(xié)程未完成,父協(xié)程需要等待子協(xié)程完成之后才可以完成  
         if (state === COMPLETING_WAITING_CHILDREN) return  
         // 子協(xié)程全部執(zhí)行完成或者沒(méi)有子協(xié)程的情況下不需要等待  
         afterResume(state)  
     }  
       
  protected open fun afterResume(state: Any?): Unit = afterCompletion(state)  
    
  // JobSupport#afterCompletion  
  protected open fun afterCompletion(state: Any?) {}  

在AbstractCoroutine#resmueWith中首先根據(jù)JobSupport#makeCompletingOnce返回狀態(tài)判斷,協(xié)程是否處于等待子協(xié)程完成的狀態(tài):

state == COMPLETING_WAITING_CHILDREN 等待子協(xié)程完成,自身才可完成。子協(xié)程完成后,觸發(fā)afterCompletion()

state != COMPLETING_WAITING_CHILDREN 沒(méi)有子協(xié)程或者所有子協(xié)程已經(jīng)完成,自身可以完成,直接觸發(fā)afterCompletion()

協(xié)程對(duì)象可以通過(guò)重寫(xiě)afterCompletion()處理協(xié)程完成之后的操作,下文中的協(xié)程恢復(fù)章節(jié)中,withContext()中DispatchedCoroutine協(xié)程對(duì)象,通過(guò)afterCompletion()恢復(fù)了外層的協(xié)程的運(yùn)行。

AbstractCoroutine#start()
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {  
       ...  
       //block :協(xié)程體  //receiver:協(xié)程對(duì)象  //this:AbstractCoroutine(也是協(xié)程對(duì)象)
       start(block, receiver, this)  
   }  
     
CoroutineStart#invoke
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =  
    when (this) {  
        //completion:start傳過(guò)來(lái)的AbstractCoroutine  
        DEFAULT -> block.startCoroutineCancellable(receiver, completion)  
        ATOMIC -> block.startCoroutine(receiver, completion)  
        UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion) 
        LAZY -> Unit // will start lazily  
    }  
    
    
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

AbstractCoroutine#start()調(diào)用了CoroutineStart的invoke()方法,然后根據(jù)啟動(dòng)模式調(diào)用block對(duì)應(yīng)的協(xié)程啟動(dòng)方法,block.startCoroutineCancellable(receiver, completion) 中是一個(gè)鏈?zhǔn)秸{(diào)用流程。

createCoroutineUnintercepted() 
 public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(  
     completion: Continuation<T>  
 ): Continuation<Unit> {  
     // probeCompletion :AbstractCoroutine  
     val probeCompletion = probeCoroutineCreated(completion)  
     return if (this is BaseContinuationImpl)  
         create(probeCompletion)  
     else  
         createCoroutineFromSuspendFunction(probeCompletion) {  
             (this as Function1<Continuation<T>, Any?>).invoke(it)  
         }  
 }  
 
 
// continuation:AbstractCoroutine  
public final Continuation<Unit> create(Object obj, Continuation<?> continuation) {  
         ...  
         MainActivity$startCoroutine$funTest$1 mainActivity$startCoroutine$funTest$1 = new MainActivity$startCoroutine$funTest$1(this.this$0, continuation);  
         return mainActivity$startCoroutine$funTest$1;  
     }  

createCoroutineUnintercepted中通過(guò)調(diào)用create(probeCompletion)創(chuàng)建了一個(gè)協(xié)程體類(lèi)的對(duì)象。

createCoroutineUnintercepted()是一個(gè)擴(kuò)展函數(shù),通過(guò)協(xié)程體block調(diào)用,所以源碼中this is BaseContinuationImpl的判斷中this指協(xié)程體類(lèi),編譯章節(jié)中協(xié)程體被編譯成SuspendLambda的子類(lèi);這里的create函數(shù)就是SuspendLambda的子類(lèi)中的create函數(shù)。

這里的block對(duì)象和協(xié)程體類(lèi)對(duì)象是什么關(guān)系?有什么區(qū)別?

注意看下構(gòu)造函數(shù)的參數(shù)continuation,這里continuation就是AbstractCoroutine,在協(xié)程體類(lèi)的繼承鏈中,這個(gè)continuation一直傳遞到了BaseContinuationImpl父類(lèi)中,用于后續(xù)協(xié)程的恢復(fù)。

注意:這里將AbstractCoroutine對(duì)象傳遞給了協(xié)程類(lèi)對(duì)象,進(jìn)行了第一層代理。

繼續(xù)分析intercepted()

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =  
     (this as? ContinuationImpl)?.intercepted() ?: this  
     
ContinuationImpl#intercepted
public fun intercepted(): Continuation<Any?> =  
        intercepted  
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } 
       
CoroutineDispatcher#interceptContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        //this是Dispatcher,continuation是協(xié)程體類(lèi)對(duì)象
        DispatchedContinuation(this, continuation)

intercepted()協(xié)程體類(lèi)對(duì)象轉(zhuǎn)換成ContinuationImpl,然后調(diào)用了ContinuationImpl的intercepted方法,intercepted方法中調(diào)用context[ContinuationInterceptor] 從協(xié)程類(lèi)對(duì)象的CoroutineContext集合中取到調(diào)度器CoroutineDispatcher(這個(gè)CoroutineContext是launch是構(gòu)建的,并傳遞到StandaloneCoroutine對(duì)象中),并調(diào)用調(diào)度器CoroutineDispatcher的interceptContinuation(),interceptContinuation()的作用是將協(xié)程體Continuation對(duì)象包裝成一個(gè)DispatchedContinuation。

注意:這里將協(xié)程類(lèi)對(duì)象傳遞給了DispatchedContinuationd對(duì)象,進(jìn)行了第二層代理

5.2 CoroutineDispatcher

CoroutineDispatcher 
public abstract class CoroutineDispatcher :  
     AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {  
     ...  
     // 是否需要線程調(diào)度  
     public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true  
    // 線程調(diào)度,讓一個(gè)runable對(duì)象在指定線程運(yùn)行  
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)  
            // 將協(xié)程體對(duì)象continuation封裝為一個(gè)DispatchedContinuation對(duì)象  
     public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =  
         DispatchedContinuation(this, continuation)     
            ...  
 }  

CoroutineDispatcher的作用是進(jìn)行任務(wù)的線程切換。

image.png

CoroutineDispatcher實(shí)現(xiàn)了ContinuationInterceptor,代表是一個(gè)攔截器;

實(shí)現(xiàn)CoroutineContext接口,存儲(chǔ)在CoroutineContext的left節(jié)點(diǎn)。

CoroutineContext[ContinuationInterceptor]就可以在CoroutineContext集合中獲取到攔截器。

為啥要通過(guò)攔截器去代理Continuation,直接使用DispatchedContinuation包裝不就行了嗎??

5.3 DispatchedContinuation

接著分析resumeCancellableWith(Result.success(Unit), onCancellation)

DispatchedContinuation 
internal class DispatchedContinuation<in T>(  
     // 調(diào)度器  
     @JvmField val dispatcher: CoroutineDispatcher,  
     // 協(xié)程體Continuation對(duì)象  
     @JvmField val continuation: Continuation<T>  
 ) : DispatchedTask<T>(MODE_ATOMIC_DEFAULT), CoroutineStackFrame, Continuation<T> by continuation {  
      
     // 使用delegate存儲(chǔ)當(dāng)前對(duì)象  
     override val delegate: Continuation<T>  
         get() = this  
         
     // ATOMIC啟動(dòng)模式  
     override fun resumeWith(result: Result<T>) {  
         val context = continuation.context  
         val state = result.toState()  
         // 是否需要線程調(diào)度  
         if (dispatcher.isDispatchNeeded(context)) {  
             _state = state  
             resumeMode = MODE_ATOMIC_DEFAULT  
             // dispatch 調(diào)度線程,第二個(gè)參數(shù)是一個(gè)Runnable類(lèi)型,這里傳參this也就是DispatchedContinuation自身  
             // DispatchedContinuation實(shí)際上也是一個(gè)Runnable對(duì)象,調(diào)用調(diào)度器的dispatch方法之后就可以使這個(gè)runnable在指定的線程運(yùn)行了  
             dispatcher.dispatch(context, this)  
         } else {  
             executeUnconfined(state, MODE_ATOMIC_DEFAULT) {  
                 withCoroutineContext(this.context, countOrElement) {  
                     // 不需要調(diào)度,執(zhí)行協(xié)程體的resumeWith  
                     continuation.resumeWith(result)  
                 }  
             }  
         }  
     }  
      // 默認(rèn)啟動(dòng)模式  
      inline fun resumeCancellableWith(result: Result<T>) {  
         val state = result.toState()  
         if (dispatcher.isDispatchNeeded(context)) {  
             _state = state  
             resumeMode = MODE_CANCELLABLE  
             dispatcher.dispatch(context, this)  
         } else {  
             executeUnconfined(state, MODE_CANCELLABLE) {  
                 if (!resumeCancelled()) {  
                     resumeUndispatchedWith(result)  
                 }  
             }  
         }  
     }  
 }  
image.png

DispatchedContinuation 代理協(xié)程體類(lèi)對(duì)象(SuspendLambda)并持有線程調(diào)度器(CoroutineDispatcher),它的作用就是使用線程調(diào)度器將協(xié)程體調(diào)度到指定的線程執(zhí)行。

DispatchedContinuation也實(shí)現(xiàn)了Continuation接口,并重寫(xiě)resumeWith(),首先
根據(jù)dispatcher.isDispatchNeeded(context)判斷需要線程切換:

1.如果需要線程調(diào)度,則調(diào)用dispatcher#dispatch進(jìn)行調(diào)度,而dispatch()的第二個(gè)參數(shù)是一個(gè)runnable對(duì)象(這里傳參為this,即DispatchedContinuation對(duì)象本身,DispatchedContinuation同時(shí)還實(shí)現(xiàn)了Runnable接口),這個(gè)runnable就會(huì)運(yùn)行在調(diào)度的線程上;

2.不需要調(diào)度則直接調(diào)用協(xié)程體類(lèi)continuation對(duì)象的resumeWith(),前面的章節(jié)中提到,協(xié)程體的運(yùn)行就是協(xié)程體類(lèi)Continuation對(duì)象的resumeWith()被觸發(fā),所以這里就會(huì)讓協(xié)程體在當(dāng)前線程運(yùn)行;

另外還有一個(gè)方法resumeCancellableWith(),它和resumeWith()的實(shí)現(xiàn)很類(lèi)似,在不同的啟動(dòng)模式下調(diào)度線程的方法調(diào)用不同。比如默認(rèn)的啟動(dòng)模式調(diào)用resumeCancellableWith(),ATOMIC啟動(dòng)模式則調(diào)用resumeWith()。

public actual object Dispatchers {
   
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
  
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

可以看到Default和IO 底層還是使用了線程池進(jìn)行調(diào)度;Main使用了handler進(jìn)行調(diào)度。

5.4 DispatchedTask

internal abstract class DispatchedTask<in T>(  
     @JvmField public var resumeMode: Int  
 ) : SchedulerTask() {  
 ’  
     // 在DispatchedContinuation中重寫(xiě)了該屬性,delegate實(shí)際是指DispatchedContinuation對(duì)象  
     internal abstract val delegate: Continuation<T>  
   
     public final override fun run() {  
             ...  
             val delegate = delegate as DispatchedContinuation<T>  
             // 通過(guò)delegate拿到原始協(xié)程體Continuation對(duì)象  
             val continuation = delegate.continuation  
             ...  
             // 調(diào)用協(xié)程體類(lèi)對(duì)象的resume  
             continuation.resume(getSuccessfulResult(state))  
             ...  
     }  
 }     

 // Continuation的擴(kuò)展方法,觸發(fā)Continuation內(nèi)的方法resumeWith  
 public inline fun <T> Continuation<T>.resume(value: T): Unit =  
     resumeWith(Result.success(value)) 

DispatchedTask的run方法中調(diào)用了協(xié)程體類(lèi)對(duì)象的resume方法,間接調(diào)用了BaseContinuationImpl的resumeWith

注意:這里是協(xié)程體類(lèi)的resumeWith被執(zhí)行了,不是DispatchedContinuation

5.5 BaseContinuationImpl

internal abstract class BaseContinuationImpl(  
     // completion:實(shí)參是一個(gè)AbstractCoroutine  
     public val completion: Continuation<Any?>?  
 ) : Continuation<Any?>, CoroutineStackFrame, Serializable {  
     public final override fun resumeWith(result: Result<Any?>) {  
         var current = this  
         var param = result  
         while (true) {  
             probeCoroutineResumed(current)  
             with(current) {  
                 val completion = completion!!   
                 val outcome: Result<Any?> =  
                     try {  
                         // 調(diào)用invokeSuspend方法,協(xié)程體真正開(kāi)始執(zhí)行  
                         val outcome = invokeSuspend(param)  
                         // invokeSuspend方法返回值為COROUTINE_SUSPENDED,resumeWith方法被return,結(jié)束執(zhí)行,說(shuō)明執(zhí)行了掛起操作  
                         if (outcome === COROUTINE_SUSPENDED) return  
                         // 協(xié)程體執(zhí)行成功的結(jié)果  
                         Result.success(outcome)  
                     } catch (exception: Throwable) {  
                         // 協(xié)程體出現(xiàn)異常的結(jié)果  
                         Result.failure(exception)  
                     }  
                 releaseIntercepted() // this state machine instance is terminating  
                  
                 if (completion is BaseContinuationImpl) {               
                     current = completion  
                     param = outcome  
                 } else {  
                     // 在示例代碼中,completion是一個(gè)AbstractCoroutine,是指launch函數(shù)創(chuàng)建的StandaloneCoroutine  
                     completion.resumeWith(outcome)  
                     return  
                 }  
             }  
         }  
     } 
     
    protected abstract fun invokeSuspend(result: Result<Any?>): Any?  
    
 }

BaseContinuationImpl定義了一個(gè)抽象方法invokeSuspend(),并重寫(xiě)了Continuation的resumeWith(),并在其中調(diào)用invokeSuspend(),具體實(shí)現(xiàn)就是SuspendLambda的invokeSuspend(),invokeSuspend()方法中便是具體協(xié)程任務(wù)。

invokeSuspend()和invoke啥關(guān)系??

5.6 協(xié)程啟動(dòng)流程總結(jié):

以調(diào)度器為Dispatchers.Default,啟動(dòng)模式為CoroutineStart.DEFAULT為例:

  1. CoroutineScope#launch()創(chuàng)建一個(gè)協(xié)程,在其內(nèi)部實(shí)現(xiàn)中根據(jù)啟動(dòng)模式為CoroutineStart.DEFAULT,創(chuàng)建一個(gè)StandaloneCoroutine協(xié)程對(duì)象,并觸發(fā)StandaloneCoroutine#start(start, coroutine, block);

  2. StandaloneCoroutine的父類(lèi)是AbstractCoroutine,StandaloneCoroutine#start()的實(shí)現(xiàn)在其父類(lèi)中,即AbstractCoroutine#start();

  3. 在AbstractCoroutine#start()中,觸發(fā)CoroutineStart#invoke();

  4. CoroutineStart#invoke()的處理邏輯中,根據(jù)調(diào)度器為Dispatchers.Default,調(diào)用協(xié)程體的startCoroutineCancellable()方法;

  5. startCoroutineCancellable()的內(nèi)部處理是一個(gè)鏈?zhǔn)秸{(diào)用:

createCoroutineUnintercepted(..).intercepted().resumeCancellableWith(Result.success(Unit))

createCoroutineUnintercepted()創(chuàng)建一個(gè)協(xié)程體類(lèi)對(duì)象;
intercepted()使用攔截器(調(diào)度器)將協(xié)程體類(lèi)對(duì)象包裝成DispatchedContinuation(DispatchedContinuation代理了協(xié)程體類(lèi)Continuation對(duì)象,并持有調(diào)度器);
調(diào)用DispatchedContinuation#resumeCancellableWith()。

  1. 在DispatchedContinuation#resumeCancellableWith()中,使用線程調(diào)度器觸發(fā)dispatcher#dispatch(context, this)進(jìn)行調(diào)度,該調(diào)度器為Dispatchers.Default;

  2. Dispatchers.Default#dispatch()調(diào)度處理中,將DispatchedContinuation分發(fā)到CoroutineScheduler線程池中,由CoroutineScheduler分配一個(gè)線程Worker,最終在Woreder的run()方法中觸發(fā)了DispatchedContinuation的run(),其內(nèi)部實(shí)現(xiàn)是使協(xié)程體Continuation對(duì)象的resumeWithI()得以執(zhí)行,前文中分析到協(xié)程體的執(zhí)行其實(shí)就是resumeWith()方法被調(diào)用,這樣協(xié)程體就可以在執(zhí)行的線程中執(zhí)行了。

協(xié)程啟動(dòng)流程圖:

協(xié)程啟動(dòng)流程圖.png

5.7 三個(gè)協(xié)程對(duì)象總結(jié):

image.png
image.png

1,第一層協(xié)程對(duì)象AbstractCoroutine,主要處理協(xié)程狀態(tài)和恢復(fù)掛起協(xié)程。

2,第二層對(duì)象BaseContinuationImpl,由編譯器將我們的代碼轉(zhuǎn)換而成,利用狀態(tài)機(jī)實(shí)現(xiàn)代碼分段執(zhí)行和協(xié)程掛起,并代理第一層對(duì)象。在其resumeWith方法中通過(guò)調(diào)用invokeSuspend()執(zhí)行我們的任務(wù)代碼。

注意:resumeWith方法中調(diào)用completion.resumeWith(outcome)是恢復(fù)協(xié)程,這個(gè)completion是AbstractCoroutine(launch對(duì)應(yīng)StandaloneCoroutine或者witchContext對(duì)應(yīng)DispatchedCoroutine),不是BaseContinuationImpl的子類(lèi)。

3,第三層對(duì)象DispatchedContinuation,對(duì)第二層協(xié)程對(duì)象進(jìn)行代理,負(fù)責(zé)使用dispatcher進(jìn)行調(diào)度任務(wù)。

注意:協(xié)程體類(lèi)是持有了一個(gè)AbstractCoroutine,不是繼承

6.協(xié)程掛起

withContext()

 public suspend fun <T> withContext(  
     context: CoroutineContext,  
     block: suspend CoroutineScope.() -> T  
 ): T {  
     contract {  
         callsInPlace(block, InvocationKind.EXACTLY_ONCE)  
     }  
     // 返回啟動(dòng)withContext的協(xié)程體  
     return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->  
         // 構(gòu)建一個(gè)新的newContext,合并當(dāng)前協(xié)程體以及withContext協(xié)程體的CoroutineContext  
         val oldContext = uCont.context  
         val newContext = oldContext + context  
         // 檢查協(xié)程是否活躍,如果線程處于非活躍的狀態(tài),拋出cancle異常  
         newContext.checkCompletion()  
         ...  
         // DispatchedCoroutine也是一個(gè)AbstractCoroutine對(duì)象,負(fù)責(zé)協(xié)程完成的回調(diào),  
         // 注意這里的Continuation的傳參為uCont,及發(fā)起withContext的協(xié)程對(duì)象  
         val coroutine = DispatchedCoroutine(newContext, uCont)  
         coroutine.initParentJob()  
                  // 和協(xié)程啟動(dòng)的流程一樣,啟動(dòng)withContext的協(xié)程  
         // 注意這里的傳參coroutine為DispatchedCoroutine,它持有需要恢復(fù)的協(xié)程  
         block.startCoroutineCancellable(coroutine, coroutine)  
         // 返回結(jié)果為掛起還是完成  
         coroutine.getResult()  
     }  
 }  

在withContext()的源碼可以看到,withContext()的協(xié)程體的啟動(dòng)和原有協(xié)程的啟動(dòng)流程是一樣的,DispatchedCoroutin是AbstractCoroutine的一個(gè)子類(lèi),并且在創(chuàng)建DispatchedCoroutin時(shí)的傳參是外層協(xié)程體對(duì)象,這是因?yàn)楫?dāng)withContext()的協(xié)程體完成的時(shí)候需要通過(guò)外層協(xié)程體對(duì)象恢復(fù)當(dāng)前協(xié)程的運(yùn)行。

先看下協(xié)程的掛起coroutine.getResult()的實(shí)現(xiàn)。

 // DispatchedCoroutine#getResult  
  fun getResult(): Any? {  
        // 返回COROUTINE_SUSPENDED,掛起  
        if (trySuspend()) return COROUTINE_SUSPENDED  
        val state = this.state.unboxState()  
        // 出現(xiàn)異常  
        if (state is CompletedExceptionally) throw state.cause  
        @Suppress("UNCHECKED_CAST")  
        // 未出現(xiàn)異常結(jié)果返回  
        return state as T  
    }  
    
  // DispatchedCoroutine#trySuspend  
  private val _decision = atomic(UNDECIDED)  
  private fun trySuspend(): Boolean {  
        _decision.loop { decision ->  
            when (decision) {  
                // compareAndSet原子操作,當(dāng)前值與預(yù)期值一致時(shí)返回true,以原子方式更新自身的值  
                UNDECIDED -> if (this._decision.compareAndSet(UNDECIDED, SUSPENDED)) return true  
                RESUMED -> return false  
                else -> error("Already suspended")  
            }  
        }  
    } 

是否掛起,結(jié)束協(xié)程運(yùn)行,關(guān)鍵在是否返回COROUTINE_SUSPENDED標(biāo)志,在getResult()方法中的處理邏輯,就是看trySuspend()是否返回true。

trySuspend()方法中,_decision默認(rèn)為UNDECIDED,預(yù)期的參數(shù)值傳參也為UNDECIDED,所以,trySuspend返回true,最終getResult方法返回了COROUTINE_SUSPENDED,協(xié)程被掛起了。

image.png

7.協(xié)程恢復(fù)

withContext()啟動(dòng)一個(gè)協(xié)程和launch類(lèi)似,當(dāng)執(zhí)行到BaseContinuationImpl的resumeWith方法,調(diào)用invokeSuspend得到結(jié)果之后,會(huì)調(diào)用內(nèi)部代理的completion.resumeWith(outcome)方法,這個(gè)completion是DispatchedCoroutine。

image.png

DispatchedCoroutine是AbstractCoroutine的子類(lèi),當(dāng)協(xié)程完成時(shí)會(huì)調(diào)用它的內(nèi)部方法resumeWith(),內(nèi)部的處理邏輯最后會(huì)觸發(fā)JubSpuuort#afterCompletion(),而在DispatchedCoroutine中重寫(xiě)了afterCompletion()。

private class DispatchedCoroutine<in T>(  
     context: CoroutineContext,  
     // 外部需要恢復(fù)的協(xié)程  
     uCont: Continuation<T>  
 ) : ScopeCoroutine<T>(context, uCont) {  
   
     override fun afterCompletion(state: Any?) {  
         afterResume(state)  
     }  
   
     override fun afterResume(state: Any?) {  
         // 在getResult()之前,協(xié)程已運(yùn)行結(jié)束,未發(fā)生掛起,不需要恢復(fù)外層協(xié)程  
         if (tryResume()) return   
         // 獲取外部協(xié)程的DispatchedContinuation,去恢復(fù)外層協(xié)程  
         uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))  
     }  
 }

uCont.intercepted()獲取到外層協(xié)程的DispatchedContinuation,然后調(diào)用resumeCancellableWith方法,使用外層協(xié)程的dispatcher將任務(wù)的執(zhí)行切換到之前的線程中去執(zhí)行。再次調(diào)用到外層協(xié)程的BaseContinuationImpl#resumeWith方法,再次調(diào)用到外層協(xié)程類(lèi)的invokeSuspend方法中,去執(zhí)行剩余代碼。如果能直接結(jié)果就調(diào)用外層協(xié)程的completion.resumeWith(outcome)結(jié)束協(xié)程(completion是外層協(xié)類(lèi)對(duì)象中代理的AbstractCoroutine)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過(guò)簡(jiǎn)信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容