Kotlin協(xié)程實現(xiàn)原理:ContinuationInterceptor&CoroutineDispatcher

今天我們來聊聊Kotlin的協(xié)程Coroutine。

如果你還沒有接觸過協(xié)程,推薦你先閱讀這篇入門級文章What? 你還不知道Kotlin Coroutine?

如果你已經(jīng)接觸過協(xié)程,但對協(xié)程的原理存在疑惑,那么在閱讀本篇文章之前推薦你先閱讀下面的文章,這樣能讓你更全面更順暢的理解這篇文章。

Kotlin協(xié)程實現(xiàn)原理:Suspend&CoroutineContext

Kotlin協(xié)程實現(xiàn)原理:CoroutineScope&Job

如果你已經(jīng)接觸過協(xié)程,相信你都有過以下幾個疑問:

  1. 協(xié)程到底是個什么東西?
  2. 協(xié)程的suspend有什么作用,工作原理是怎樣的?
  3. 協(xié)程中的一些關(guān)鍵名稱(例如:Job、CoroutineDispatcherCoroutineContextCoroutineScope)它們之間到底是怎么樣的關(guān)系?
  4. 協(xié)程的所謂非阻塞式掛起與恢復又是什么?
  5. 協(xié)程的內(nèi)部實現(xiàn)原理是怎么樣的?
  6. ...

接下來的一些文章試著來分析一下這些疑問,也歡迎大家一起加入來討論。

ContinuationInterceptor

看到Interceptor相信第一印象應(yīng)該就是攔截器,例如在Okhttp中被廣泛應(yīng)用。自然在協(xié)程中ContinuationInterceptor的作用也是用來做攔截協(xié)程的。

下面來看下它的實現(xiàn)。

public interface ContinuationInterceptor : CoroutineContext.Element {
    /**
     * The key that defines *the* context interceptor.
     */
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>

    /**
     * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
     * This function is invoked by coroutines framework when needed and the resulting continuations are
     * cached internally per each instance of the original [continuation].
     *
     * This function may simply return original [continuation] if it does not want to intercept this particular continuation.
     *
     * When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation]
     * with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously
     * returned a different continuation instance.
     */
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

    ...
}

只給出了關(guān)鍵部分,ContinuationInterceptor繼承于CoroutineContext.Element,所以它也是CoroutineContext,同時提供了interceptContinuation方法,先記住這個方法后續(xù)會用到。

大家是否還記得在Kotlin協(xié)程實現(xiàn)原理系列的第一篇文章中,我們分析了CoroutineContext的內(nèi)部結(jié)構(gòu),當時提到了它的plus方法,就是下面這段代碼

public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element ->
            val removed = acc.minusKey(element.key)
            if (removed === EmptyCoroutineContext) element else {
                // make sure interceptor is always last in the context (and thus is fast to get when present)
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed.minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)
                }
            }
        }

在這里第一次看到了ContinuationInterceptor的身影,當時核心是為了分析CoroutineContext,所以只是提了plus方法每次都會將ContinuationInterceptor添加到拼接鏈的尾部。

不知道有沒有老鐵想過這個問題,為什么要每次新加入一個CoroutineContext都要調(diào)整ContinuationInterceptor的位置,并將它添加到尾部?

這里其實涉及到兩點。

其中一點是由于CombinedContext的結(jié)構(gòu)決定的。它有兩個元素分別是leftelement。而left類似于前驅(qū)節(jié)點,它是一個前驅(qū)集合,而element只是一個純碎的CoroutineContext,而它的get方法每次都是從element開始進行查找對應(yīng)KeyCoroutineContext對象;沒有匹配到才會去left集合中進行遞歸查找。

所以為了加快查找ContinuationInterceptor類型的實例,才將它加入到拼接鏈的尾部,對應(yīng)的就是element

另一個原因是ContinuationInterceptor使用的很頻繁,因為每次創(chuàng)建協(xié)程都會去嘗試查找當前協(xié)程的CoroutineContext中是否存在ContinuationInterceptor。例如我們通過launch來看協(xié)程的啟動。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

如果你使用launch的默認參數(shù),那么此時的Coroutine就是StandaloneCoroutine,然后調(diào)用start方法啟動協(xié)程。

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}

start中進入了CoroutineStart,對應(yīng)的就是下面這段代碼

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
    when (this) {
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }

因為我們使用的是默認參數(shù),所以這里對應(yīng)的就是CoroutineStart.DEFAULT,最終來到block.startCoroutineCancellable

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
    }

在這里我們終于看到了intercepted。

首先通過createCoroutineUnintercepted來創(chuàng)建一個協(xié)程(內(nèi)部具體如何創(chuàng)建的這篇文章先不說,后續(xù)文章會單獨分析),然后再調(diào)用了intercepted方法進行攔截操作,最后再resumeCancellable,這個方法最終調(diào)用的就是ContinuationresumeWith方法,即啟動協(xié)程。

所以每次啟動協(xié)程都會自動回調(diào)一次resumeWith方法。

今天的主題是ContinuationInterceptor所以我們直接看intercepted。

public expect fun <T> Continuation<T>.intercepted(): Continuation<T>

發(fā)現(xiàn)它是一個expect方法,它會根據(jù)不同平臺實現(xiàn)不同的邏輯。因為我們是Android所以直接看Android上的actual的實現(xiàn)

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

最終來到ContinuationImplintercepted方法

public fun intercepted(): Continuation<Any?> =
    intercepted
        ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
            .also { intercepted = it }

在這里看到了熟悉的context,獲取到ContinuationInterceptor實例,并且調(diào)用它的interceptContinuation方法返回一個處理過的Continuation。

多次調(diào)用intercepted,對應(yīng)的interceptContinuation只會調(diào)用一次。

所以ContinuationInterceptor的攔截是通過interceptContinuation方法進行的。既然已經(jīng)明白了它的攔截方式,我們自己來手動寫一個攔截器來驗證一下。

val interceptor = object : ContinuationInterceptor {
 
    override val key: CoroutineContext.Key<*> = ContinuationInterceptor
 
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
        println("intercept todo something. change run to thread")
        return object : Continuation<T> by continuation {
            override fun resumeWith(result: Result<T>) {
                println("create new thread")
                thread {
                    continuation.resumeWith(result)
                }
            }
        }
    }

}
 
println(Thread.currentThread().name)
 
lifecycleScope.launch(interceptor) {
    println("launch start. current thread: ${Thread.currentThread().name}")
    
    withContext(Dispatchers.Main) {
        println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
    }
    
    launch {
        println("new continuation todo something. current thread: ${Thread.currentThread().name}")
    }
    
    println("launch end. current thread: ${Thread.currentThread().name}")
}

這里簡單實現(xiàn)了一個ContinuationInterceptor,如果攔截成功就會輸出interceptContinuation中對應(yīng)的語句。下面是程序運行后的輸出日志。

main
// 第一次launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8

分析一下上面的日志,首先程序運行在main線程,通過lifecycleScope.launch啟動協(xié)程并將我們自定義的intercetpor加入到CoroutineContext中;然后在啟動的過程中發(fā)現(xiàn)我們自定義的interceptor攔截成功了,同時將原本在main線程運行的程序切換到了新的thread線程。同時第二次launch的時候也攔截成功。

到這里就已經(jīng)可以證明我們上面對ContinuationInterceptor理解是正確的,它可以在協(xié)程啟動的時候進行攔截操作。

下面我們繼續(xù)看日志,發(fā)現(xiàn)withContext并沒有攔截成功,這是為什么呢?注意看Dispatchers.Main。這也是接下來需要分析的內(nèi)容。

另外還有一點,如果細心的老鐵就會發(fā)現(xiàn),launch startlaunch end所處的線程不一樣,這是因為在withContext結(jié)束之后,它內(nèi)部還會進行一次線程恢復,將自身所處的main線程切換到之前的線程,但為什么又與之前launch start的線程不同呢?

大家不要忘了,協(xié)程每一個掛起后的恢復都是通過回調(diào)resumeWith進行的,然而外部launch協(xié)程我們進行了攔截,在它返回的ContinuationresumeWith回調(diào)中總是會創(chuàng)建新的thread。所以發(fā)生這種情況也就不奇怪了,這是我們攔截的效果。

整體再來看這個例子,它是不是像一個簡易版的協(xié)程的線程切換呢?

CoroutineDispatcher

現(xiàn)在我們來看Dispatchers.Main,為什么它會導致我們攔截失敗呢?要探究原因沒有直接看源碼更加直接有效的。

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

主要看它的類型,它返回的是MainCoroutineDispatcher,然后再看它是什么

public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}

發(fā)現(xiàn)MainCoroutineDispatcher繼承于CoroutineDispatcher,主角登場了,但還不夠我們繼續(xù)看CoroutineDispatcher是什么

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    
    public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
    
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

真想已經(jīng)浮出水面了,原來CoroutineDispatcher實現(xiàn)了ContinuationInterceptor,說明CoroutineDispatcher也具有攔截器的功能。然后再結(jié)合CoroutineContext的性質(zhì),就很好解釋為什么我們自定義的攔截器沒有生效。

原因就是它與我們自定義的攔截器一樣都實現(xiàn)了ContinuationInterceptor接口,一旦使用Dispatchers.Main就會替換掉我們自定義的攔截器。

因果關(guān)系弄明白了現(xiàn)在就好辦了。我們已經(jīng)知道它具有攔截功能,再來看CoroutineDispatcher提供的另外幾個方法isDispatchNeededdispatch。

我們可以大膽猜測,isDispatchNeeded就是判斷是否需要分發(fā),然后dispatch就是如何進行分發(fā),接下來我們來驗證一下。

ContinuationInterceptor重要的方法就是interceptContinuation,在CoroutineDispatcher中直接返回了DispatchedContinuation對象,它是一個Continuation類型。那么自然重點就是它的resumeWith方法。

override fun resumeWith(result: Result<T>) {
    val context = continuation.context
    val state = result.toState()
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_ATOMIC_DEFAULT
        dispatcher.dispatch(context, this)
    } else {
        executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
            withCoroutineContext(this.context, countOrElement) {
                continuation.resumeWith(result)
            }
        }
    }
}

這里我們看到了isDispatchNeededdispatch方法,如果不需要分發(fā)自然是直接調(diào)用原始的continuation對象的resumeWith方法,也就沒有什么類似于線程的切換。

那什么時候isDispatcheNeededtrue呢?這就要看它的dispatcer是什么。

由于現(xiàn)在我們是拿Dispatchers.Main作分析。所以這里我直接告訴你們它的dispatcherHandlerContext

override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
    HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    /**
     * Creates [CoroutineDispatcher] for the given Android [handler].
     *
     * @param handler a handler.
     * @param name an optional name for debugging.
     */
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also { _immediate = it }

    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }
    ...
}

它繼承于HandlerDispatcher,而HandlerDispatcher繼承于MainCoroutineDispatcher。

條件都符合,我們直接看isDispatchNeeded方法返回true的邏輯。

首先通過invokeImmediately判斷,它代表當前線程是否與自身的線程相同,如何你外部使用者能夠保證這一點,就可以直接使用Dispatcher.Main.immediate來避免進行線程的切換邏輯。當然為了保證外部的判斷失敗,最后也會通過Looper.myLooper() != handler.looper來進行校正。對于Dispatchers.Main這個的handle.looper自然是主線程的looper。

如果不能保證則invokeImmediatelyfalse,直接進行線程切換。然后進入dispatch方法,下面是Dispatchers.Maindispatch的處理邏輯。

override fun dispatch(context: CoroutineContext, block: Runnable) {
    handler.post(block)
}

這個再熟悉不過了,因為這個時候的handler.post就是代表向主線程推送消息,此時的block將會在主線程進行調(diào)用。

這樣線程的切換就完成。

所以綜上來看,CoroutineDispatcher為協(xié)程提供了一個線程切換的統(tǒng)一判斷與執(zhí)行標準。

首先在協(xié)程進行啟動的時候通過攔截器的方式進行攔截,對應(yīng)的方法是interceptContinuation,然后返回一個具有切換線程功能的Continuation,在每次進行resumeWith的時候,內(nèi)部再通過isDispatchNeeded進行判斷當前協(xié)程的運行是否需要切換線程。如果需要則調(diào)用dispatch進行線程的切換,保證協(xié)程的正確運行。

如果我要自定義協(xié)程線程的切換邏輯,就可以通過繼承于CoroutineDispatcher來實現(xiàn),將它的核心方法進行自定義即可。

當然,如果你是在Android中使用協(xié)程,那基本上是不需要自定義線程的切換邏輯。因為kotlin已經(jīng)為我們提供了日常所需的Dispatchers。主要有四種分別為:

  1. Dispatchers.Default: 適合在主線程之外執(zhí)行占用大量CPU資源的工作
  2. Dispatchers.Main: Android主線程
  3. Dispatchers.Unconfined: 它不會切換線程,只是啟動一個協(xié)程進行掛起,至于恢復之后所在的線程完全由調(diào)用它恢復的協(xié)程控制。
  4. Dispatchers.IO: 適合在主線程之外執(zhí)行磁盤或網(wǎng)絡(luò)I/O

最后我們再來簡單提一下withContext。

withContext

CoroutineDispatcher雖然能夠提供線程的切換,但這只是單方向的,因為它沒有提供線程的恢復。

試想一下,我們有個網(wǎng)絡(luò)請求,我們通過CoroutineDispatcher將線程切換到Dispatchers.IO,當拿到請求成功的數(shù)據(jù)之后,所在的線程還是IO線程,這樣并不能有利于我們UI操作。所以為了解決這個問題kotlin提供了withContext,它不僅能夠接受CoroutineDispatcher來幫助我們切換線程,同時在執(zhí)行完畢之后還會幫助我們將之前切換掉的線程進恢復,保證協(xié)程運行的連貫性。這也是為什么官方推薦使用withContext進行協(xié)程線程的切換的原因。

withContext的線程恢復原理是它內(nèi)部生成了一個DispatchedCoroutine,保存切換線程時的CoroutineContext與切換之前的Continuation,最后在onCompletionInternal進行恢復。

internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
    if (state is CompletedExceptionally) {
        val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
        uCont.resumeUninterceptedWithExceptionMode(exception, mode)
    } else {
        uCont.resumeUninterceptedMode(state as T, mode)
    }
}

這個uCont就是切換線程之前的Continuation。具體實現(xiàn)就不在這分析了,感興趣的老鐵可以自己翻一翻源碼。

本篇文章主要介紹了ContinuationInterceptor作用與如何攔截協(xié)程的,同時也分析了CoroutineDispatcher內(nèi)部結(jié)構(gòu),進一步剖析了協(xié)程線程切換的原理。希望對學習協(xié)程的伙伴們能夠有所幫助,敬請期待后續(xù)的協(xié)程分析。

項目

android_startup: 提供一種在應(yīng)用啟動時能夠更加簡單、高效的方式來初始化組件,優(yōu)化啟動速度。不僅支持Jetpack App Startup的全部功能,還提供額外的同步與異步等待、線程控制與多進程支持等功能。

AwesomeGithub: 基于Github客戶端,純練習項目,支持組件化開發(fā),支持賬戶密碼與認證登陸。使用Kotlin語言進行開發(fā),項目架構(gòu)是基于Jetpack&DataBindingMVVM;項目中使用了Arouter、Retrofit、Coroutine、Glide、DaggerHilt等流行開源技術(shù)。

flutter_github: 基于Flutter的跨平臺版本Github客戶端,與AwesomeGithub相對應(yīng)。

android-api-analysis: 結(jié)合詳細的Demo來全面解析Android相關(guān)的知識點, 幫助讀者能夠更快的掌握與理解所闡述的要點。

daily_algorithm: 每日一算法,由淺入深,歡迎加入一起共勉。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容