kotlin Coroutine原理

Coroutine協(xié)程是kotlin實(shí)現(xiàn)的一種異步執(zhí)行邏輯的方式,相對(duì)與傳統(tǒng)的線程,協(xié)程更加簡(jiǎn)潔,高效,占用資源少。那協(xié)程到底是怎么實(shí)現(xiàn)異步的呢?

線程

在現(xiàn)在的操作系統(tǒng)中,線程是CPU調(diào)度的最少單元。所有的程序邏輯運(yùn)行在線程之上。在Java API中, Thread是實(shí)現(xiàn)線程的基本類。它的內(nèi)部實(shí)現(xiàn)是大量的 JNI 調(diào)用,因?yàn)榫€程的實(shí)現(xiàn)必須由操作系統(tǒng)直接提供支持。在 Android 平臺(tái)上,Thread 的創(chuàng)建過(guò)程中,會(huì)調(diào)用 Linux API 中的 pthread_create 函數(shù),這直接說(shuō)明了 Java 層中的 Thread 和 Linux 系統(tǒng)級(jí)別的中的線程是一一對(duì)應(yīng)的。

線程的問(wèn)題是阻塞與運(yùn)行兩種狀態(tài)之間的切換有相當(dāng)大的資源開(kāi)銷,線程并不是一種輕量級(jí)資源,大量創(chuàng)建線程是對(duì)系統(tǒng)資源的一種消耗,而線程的阻塞調(diào)用會(huì)導(dǎo)致系統(tǒng)中存在大量因阻塞而不運(yùn)行的線程,這對(duì)系統(tǒng)資源是一種極大的浪費(fèi)。

協(xié)程

協(xié)程本質(zhì)上可以認(rèn)為是運(yùn)行在線程上的代碼塊,協(xié)程提供的 掛起 操作會(huì)使協(xié)程暫停執(zhí)行,而不會(huì)導(dǎo)致線程阻塞。而且協(xié)程是一種輕量級(jí)資源,一個(gè)應(yīng)用中即使創(chuàng)建了上千個(gè)協(xié)程也不會(huì)造成太大的負(fù)擔(dān)。

協(xié)程的是通過(guò)’suspend‘修飾符來(lái)修飾需要掛起的方法。suspend并不是Java的API,是kotlin通過(guò)編譯器實(shí)現(xiàn)的。

CPS 變換

被 suspend 修飾符修飾的函數(shù)在編譯期間會(huì)被編譯器做特殊處理,這個(gè)特殊處理的第一步就是做CPS 變換。

CPS (Continuation-passing style)變換是一種編程風(fēng)格,就是將控制流顯式表示為continuation的一種編程風(fēng)格. 簡(jiǎn)單來(lái)理解就是顯式使用函數(shù)表示函數(shù)返回的后續(xù)操作。

例如:

suspend fun <T> foo<T>.await(): T

在編譯期發(fā)生 CPS 變換之后:

fun <T> foo<T>.await(continuation: Continuation<T>): Any?

CPS 變換后的函數(shù)多了一個(gè) Continuation<T> 類型的參數(shù),Continuation就是續(xù)體。源碼:

interface Continuation<in T> {
   val context: CoroutineContext
   fun resumeWith(result: Result<T>)
}

Continuation是一個(gè)抽象的概念,簡(jiǎn)單來(lái)說(shuō)它包裝了協(xié)程在掛起之后應(yīng)該繼續(xù)執(zhí)行的代碼;在編譯的過(guò)程中,一個(gè)完整的協(xié)程被分割切塊成多個(gè)續(xù)體。在 await 函數(shù)的掛起結(jié)束以后,它會(huì)調(diào)用 continuation 參數(shù)的 resumeWith 函數(shù),來(lái)恢復(fù)執(zhí)行 await 函數(shù)后面的代碼。

方法經(jīng)過(guò)CPS 變換之后,返回值類型變成了 Any?,這是因?yàn)檫@個(gè)函數(shù)在發(fā)生變換后,除了要返回它本身的返回值,還要返回一個(gè)標(biāo)記——COROUTINE_SUSPENDED,而這個(gè)返回類型事實(shí)上是返回類型 T 與 COROUTINE_SUSPENDED 的聯(lián)合類型。由于Kotlin 中沒(méi)有聯(lián)合類型,所以只好用最泛化的類型 Any? 來(lái)表示,而 COROUTINE_SUSPENDED 是一個(gè)標(biāo)記,返回它的掛起函數(shù)表示這個(gè)掛起函數(shù)會(huì)發(fā)生事實(shí)上的掛起操作。

狀態(tài)機(jī)

Continuation為了直接支持掛起(即使協(xié)程在掛起點(diǎn)中斷執(zhí)行而在適當(dāng)?shù)臅r(shí)機(jī)在恢復(fù))操作,編譯器在編譯掛起函數(shù)時(shí)會(huì)將函數(shù)體編譯為狀態(tài)機(jī)。主要是為了性能考慮,避免多創(chuàng)建類和對(duì)象。

如:

val a = a()
val y = foo(a).await() // #1
b()
val z = bar(a, y).await() //  #2
c(z)

編譯之后生成的偽代碼:

class <anonymous_for_state_machine> extends SuspendLambda<...> {
    // 狀態(tài)機(jī)當(dāng)前狀態(tài)
    int label = 0
    
    // 協(xié)程的局部變量
    A a = null
    Y y = null
    
    void resumeWith(Object result) {
        if (label == 0) goto L0
        if (label == 1) goto L1
        if (label == 2) goto L2
        else throw IllegalStateException()
        
      L0:
        // 這次調(diào)用,result 應(yīng)該為空
        a = a()
        label = 1
        result = foo(a).await(this) // 'this' 作為續(xù)體傳遞
        if (result == COROUTINE_SUSPENDED) return // 如果 await 掛起了執(zhí)行則返回
      L1:
        // 外部代碼傳入 .await() 的結(jié)果恢復(fù)協(xié)程 
        y = (Y) result
        b()
        label = 2
        result = bar(a, y).await(this) // 'this' 作為續(xù)體傳遞
        if (result == COROUTINE_SUSPENDED) return // 如果 await 掛起了執(zhí)行則返回
      L2:
        // 外部代碼傳入 .await() 的結(jié)果恢復(fù)協(xié)程
        Z z = (Z) result
        c(z)
        label = -1 // 沒(méi)有其他步驟了
        return
    }          
}    

一個(gè)掛起函數(shù)會(huì)被編譯成一個(gè)匿名類,匿名類中的一個(gè)函數(shù)實(shí)現(xiàn)了這個(gè)狀態(tài)機(jī)。成員變量 label 代表了當(dāng)前狀態(tài)機(jī)的狀態(tài),每一個(gè)續(xù)體(即掛起點(diǎn)中間的部分以及掛起點(diǎn)與函數(shù)頭尾之間的部分)都各自對(duì)應(yīng)了一個(gè)狀態(tài),當(dāng)函數(shù)運(yùn)行到每個(gè)掛起點(diǎn)時(shí),label 的值都受限會(huì)發(fā)生改變,并且當(dāng)前的續(xù)體(也就是代碼中的this)都會(huì)作為實(shí)參傳遞給發(fā)生了 CPS 變換的掛起函數(shù),如果這個(gè)掛起函數(shù)沒(méi)有發(fā)生事實(shí)上的掛起,函數(shù)繼續(xù)運(yùn)行,如果發(fā)生了事實(shí)上的掛起,則函數(shù)直接 return。

由于 label 記錄了狀態(tài),所以在協(xié)程恢復(fù)的時(shí)候,可以根據(jù)狀態(tài)使用 goto 語(yǔ)句直接跳轉(zhuǎn)至上次的掛起點(diǎn)并向后執(zhí)行,這就是協(xié)程掛起的原理。另外,雖然 Java 中沒(méi)有 goto 語(yǔ)句,但是 class 字節(jié)碼中支持 goto。

續(xù)體攔截器

掛起函數(shù)在恢復(fù)的時(shí)候,理論上可能會(huì)在任何一個(gè)線程上恢復(fù),有時(shí)我們需要限定協(xié)程運(yùn)行在指定的線程,例如在Android中,更新 UI 的操作只能在 UI 主線程中進(jìn)行。

android MainDispatcherLoader的實(shí)現(xiàn):

// Main 調(diào)度器
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

// dispatcher 由 loadMainDispatcher() 函數(shù)創(chuàng)建
internal object MainDispatcherLoader {
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        ......
    }
}

// MainCoroutineDispatcher
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {

    @ExperimentalCoroutinesApi
    public abstract val immediate: MainCoroutineDispatcher
}

// CoroutineDispatcher
public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    
    ......
}

@InternalCoroutinesApi
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
    try {
        createDispatcher(factories)
    } catch (cause: Throwable) {
        MissingMainCoroutineDispatcher(cause, hintOnError())
    }

/**
 * @suppress
 */
@InternalCoroutinesApi
public object MissingMainCoroutineDispatcherFactory : MainDispatcherFactory {
    override val loadPriority: Int
        get() = -1

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
        return MissingMainCoroutineDispatcher(null)
    }
}

// ContinuationInterceptor(續(xù)體攔截器)
public interface ContinuationInterceptor : CoroutineContext.Element {

    companion object Key : CoroutineContext.Key<ContinuationInterceptor>

    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>

    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }

    // Performance optimization for a singleton Key
    public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
        @Suppress("UNCHECKED_CAST")
        if (key === Key) this as E else null

    // Performance optimization to a singleton Key
    public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
        if (key === Key) EmptyCoroutineContext else this
}

ContinuationInterceptor,負(fù)責(zé)攔截協(xié)程在恢復(fù)后應(yīng)執(zhí)行的代碼(即續(xù)體)并將其在指定線程或線程池恢復(fù)

在掛起函數(shù)的編譯中,每個(gè)掛起函數(shù)都會(huì)被編譯為一個(gè)實(shí)現(xiàn)了 Continuation 接口的匿名類,而續(xù)體攔截器會(huì)攔截真正掛起協(xié)程的掛起點(diǎn)的續(xù)體。在協(xié)程中調(diào)用掛起函數(shù),掛起函數(shù)不一定會(huì)真正掛起協(xié)程

如:

launch {
    val deferred = async {
        // 異步邏輯
        ......
    }
    ......
    deferred.await()
    ......
}

在 deferred.await() 這行執(zhí)行的時(shí)候,如果異步邏輯已經(jīng)執(zhí)行完成并取得了結(jié)果,那 await 函數(shù)會(huì)直接取得結(jié)果,而不會(huì)掛起協(xié)程。相反,如果網(wǎng)絡(luò)請(qǐng)求還未產(chǎn)生結(jié)果,await 函數(shù)就會(huì)使協(xié)程掛起。續(xù)體攔截器只攔截真正發(fā)生掛起的掛起點(diǎn)后的續(xù)體,對(duì)于未發(fā)生掛起的掛起點(diǎn),續(xù)體會(huì)被直接調(diào)用 resumeWith 這一類的函數(shù)而不需要續(xù)攔截器對(duì)它進(jìn)行操作。除此之外,續(xù)體攔截器還會(huì)緩存攔截過(guò)的續(xù)體,并且在不再需要它的時(shí)候調(diào)用 releaseInterceptedContinuation 函數(shù)釋放它。

參考:Kotlin 協(xié)程設(shè)計(jì)文檔

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容