Kotlin 協(xié)程學(xué)習(xí)筆記

一、Kotlin 協(xié)程概念

Kotlin 協(xié)程提供了一種全新處理并發(fā)的方式,你可以在 Android 平臺(tái)上使用它來(lái)簡(jiǎn)化異步執(zhí)行的代碼。協(xié)程從 Kotlin 1.3 版本開(kāi)始引入,但這一概念在編程世界誕生的黎明之際就有了,最早使用協(xié)程的編程語(yǔ)言可以追溯到 1967 年的 Simula 語(yǔ)言。在過(guò)去幾年間,協(xié)程這個(gè)概念發(fā)展勢(shì)頭迅猛,現(xiàn)已經(jīng)被諸多主流編程語(yǔ)言采用,比如 Javascript、C#、Python、Ruby 以及 Go 等。Kotlin 協(xié)程是基于來(lái)自其他語(yǔ)言的既定概念
Goggle 官方推薦將 Kotlin 協(xié)程作為在 Android 上進(jìn)行異步編程的解決方案,值得關(guān)注的功能點(diǎn)包括:

  • 輕量:你可以在單個(gè)線程上運(yùn)行多個(gè)協(xié)程,因?yàn)閰f(xié)程支持掛起,不會(huì)使正在運(yùn)行協(xié)程的線程阻塞。掛起比阻塞節(jié)省內(nèi)存,且支持多個(gè)并行操作
  • 內(nèi)存泄露更少:使用結(jié)構(gòu)化并發(fā)機(jī)制在一個(gè)作用域內(nèi)執(zhí)行多個(gè)操作
  • 內(nèi)置取消支持:取消功能會(huì)自動(dòng)通過(guò)正在運(yùn)行的協(xié)程層次結(jié)構(gòu)傳播
  • Jetpack 集成:許多 Jetpack 庫(kù)都包含提供全面協(xié)程支持的擴(kuò)展。某些庫(kù)還提供自己的協(xié)程作用域,可供你用于結(jié)構(gòu)化并發(fā)

引入依賴:

    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutine_version"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$coroutine_version"

二、協(xié)程練手

協(xié)程可以稱為輕量級(jí)線程。Kotlin 協(xié)程在 CoroutineScope 的上下文中通過(guò) launch、async 等協(xié)程構(gòu)造器(CoroutineBuilder)來(lái)聲明并啟動(dòng)

fun main() {
    GlobalScope.launch(context = Dispatchers.IO) {
        //延時(shí)一秒
        delay(1000)
        log("launch")
    }
    //主動(dòng)休眠兩秒,防止JVM過(guò)快退出
    Thread.sleep(2000)
    log("end")
}

private fun log(msg: Any?) = println("[${Thread.currentThread().name}] $msg")
[DefaultDispatcher-worker-1 @coroutine#1] launch
[main] end

在上面的例子中,通過(guò) GlobalScope(即全局作用域)啟動(dòng)了一個(gè)協(xié)程,在延遲一秒后輸出一行日志。從輸出結(jié)果可以看出來(lái),啟動(dòng)的協(xié)程是運(yùn)行在協(xié)程內(nèi)部的線程池中。雖然從表現(xiàn)結(jié)果上來(lái)看,啟動(dòng)一個(gè)協(xié)程類似于我們直接使用 Thread 來(lái)執(zhí)行耗時(shí)任務(wù),但實(shí)際上協(xié)程和線程有著本質(zhì)上的區(qū)別。通過(guò)使用協(xié)程,可以極大的提高線程的并發(fā)效率,避免以往的嵌套回調(diào)地獄,極大提高了代碼的可讀性

以上代碼就涉及到了協(xié)程的四個(gè)基礎(chǔ)概念:

  • suspend function。即掛起函數(shù),delay 函數(shù)就是協(xié)程庫(kù)提供的一個(gè)用于實(shí)現(xiàn)非阻塞式延時(shí)的掛起函數(shù)
  • CoroutineScope。即協(xié)程作用域,GlobalScope 是 CoroutineScope 的一個(gè)實(shí)現(xiàn)類,用于指定協(xié)程的作用范圍,可用于管理多個(gè)協(xié)程的生命周期,所有協(xié)程都需要通過(guò) CoroutineScope 來(lái)啟動(dòng)
  • CoroutineContext。即協(xié)程上下文,包含多種類型的配置參數(shù)。Dispatchers.IO 就是 CoroutineContext 這個(gè)抽象概念的一種實(shí)現(xiàn),用于指定協(xié)程的運(yùn)行載體,即用于指定協(xié)程要運(yùn)行在哪類線程上
  • CoroutineBuilder。即協(xié)程構(gòu)建器,協(xié)程在 CoroutineScope 的上下文中通過(guò) launch、async 等協(xié)程構(gòu)建器來(lái)進(jìn)行聲明并啟動(dòng)。launch、async 等均被聲明 CoroutineScope 的擴(kuò)展方法

三、suspend function

如果上述例子試圖直接在 GlobalScope 外調(diào)用 delay() 函數(shù)的話,IDE 就會(huì)提示一個(gè)錯(cuò)誤:Suspend function 'delay' should be called only from a coroutine or another suspend function。意思是:delay() 函數(shù)是一個(gè)掛起函數(shù),只能由協(xié)程或者由其它掛起函數(shù)來(lái)調(diào)用

delay() 函數(shù)就使用了 suspend 進(jìn)行修飾,用 suspend 修飾的函數(shù)就是掛起函數(shù)

public suspend fun delay(timeMillis: Long)

讀者在網(wǎng)上看關(guān)于協(xié)程的文章的時(shí)候,應(yīng)該經(jīng)常會(huì)看到這么一句話:掛起函數(shù)不會(huì)阻塞其所在線程,而是會(huì)將協(xié)程掛起,在特定的時(shí)候才再恢復(fù)協(xié)程

對(duì)于這句話我的理解是:delay() 函數(shù)類似于 Java 中的 Thread.sleep(),而之所以說(shuō) delay() 函數(shù)是非阻塞的,是因?yàn)樗蛦渭兊木€程休眠有著本質(zhì)的區(qū)別。協(xié)程是運(yùn)行于線程上的,一個(gè)線程可以運(yùn)行多個(gè)(幾千上萬(wàn)個(gè))協(xié)程。線程的調(diào)度行為是由操作系統(tǒng)來(lái)管理的,而協(xié)程的調(diào)度行為是可以由開(kāi)發(fā)者來(lái)指定并由編譯器來(lái)實(shí)現(xiàn)的,協(xié)程能夠細(xì)粒度地控制多個(gè)任務(wù)的執(zhí)行時(shí)機(jī)和執(zhí)行線程,當(dāng)某個(gè)特定的線程上的所有協(xié)程被 suspend 后,該線程便可騰出資源去處理其他任務(wù)

例如,當(dāng)在 ThreadA 上運(yùn)行的 CoroutineA 調(diào)用了delay(1000L)函數(shù)指定延遲一秒后再運(yùn)行,ThreadA 會(huì)轉(zhuǎn)而去執(zhí)行 CoroutineB,等到一秒后再來(lái)繼續(xù)執(zhí)行 CoroutineA。所以,ThreadA 并不會(huì)因?yàn)?CoroutineA 的延時(shí)而阻塞,而是能繼續(xù)去執(zhí)行其它任務(wù),所以掛起函數(shù)并不會(huì)阻塞其所在線程,這樣就極大地提高了線程的并發(fā)靈活度,最大化了線程的利用效率。而如果是使用Thread.sleep()的話,線程就只能干等著而不能去執(zhí)行其它任務(wù),降低了線程的利用效率

四、suspend function 的掛起與恢復(fù)

協(xié)程在常規(guī)函數(shù)的基礎(chǔ)上添加了兩項(xiàng)操作用于處理長(zhǎng)時(shí)間運(yùn)行的任務(wù)。在invoke(或 call)和return之外,協(xié)程添加了suspendresume

  • suspend 用于暫停執(zhí)行當(dāng)前協(xié)程,并保存所有局部變量
  • resume 用于讓已暫停的協(xié)程從暫停處繼續(xù)執(zhí)行

suspend 函數(shù)只能由其它 suspend 函數(shù)調(diào)用,或者是由協(xié)程來(lái)調(diào)用

以下示例展示了一項(xiàng)任務(wù)(假設(shè) get 方法是一個(gè)網(wǎng)絡(luò)請(qǐng)求任務(wù))的簡(jiǎn)單協(xié)程實(shí)現(xiàn):

suspend fun fetchDocs() {                             // Dispatchers.Main
    val result = get("https://developer.android.com") // Dispatchers.IO for `get`
    show(result)                                      // Dispatchers.Main
}
suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

在上面的示例中,get() 仍在主線程上被調(diào)用,但它會(huì)在啟動(dòng)網(wǎng)絡(luò)請(qǐng)求之前暫停協(xié)程。get() 主體內(nèi)通過(guò)調(diào)用 withContext(Dispatchers.IO) 創(chuàng)建了一個(gè)在 IO 線程池中運(yùn)行的代碼塊,在該塊內(nèi)的任何代碼都始終通過(guò) IO 調(diào)度器執(zhí)行。當(dāng)網(wǎng)絡(luò)請(qǐng)求完成后,get() 會(huì)恢復(fù)已暫停的協(xié)程,使得主線程協(xié)程可以直接拿到網(wǎng)絡(luò)請(qǐng)求結(jié)果而不用使用回調(diào)來(lái)通知主線程。Retrofit 就是以這種方式來(lái)實(shí)現(xiàn)對(duì)協(xié)程的支持的

Kotlin 使用堆棧幀管理要運(yùn)行哪個(gè)函數(shù)以及所有局部變量。暫停協(xié)程時(shí),系統(tǒng)會(huì)復(fù)制并保存當(dāng)前的堆棧幀以供稍后使用?;謴?fù)時(shí),會(huì)將堆棧幀從其保存位置復(fù)制回來(lái),然后函數(shù)再次開(kāi)始運(yùn)行。即使代碼可能看起來(lái)像普通的順序阻塞請(qǐng)求,協(xié)程也能確保網(wǎng)絡(luò)請(qǐng)求避免阻塞主線程

在主線程進(jìn)行的暫停協(xié)程恢復(fù)協(xié)程的兩個(gè)操作,既實(shí)現(xiàn)了將耗時(shí)任務(wù)交由后臺(tái)線程完成,保障了主線程安全,又以同步代碼的方式完成了實(shí)際上的多線程異步調(diào)用??梢哉f(shuō),在 Android 平臺(tái)上協(xié)程主要就用來(lái)解決兩個(gè)問(wèn)題:

  1. 處理耗時(shí)任務(wù) (Long running tasks),這種任務(wù)常常會(huì)阻塞住主線程
  2. 保證主線程安全 (Main-safety) ,即確保安全地從主線程調(diào)用任何 suspend 函數(shù)

五、CoroutineScope

CoroutineScope 即協(xié)程作用域,用于對(duì)協(xié)程進(jìn)行追蹤。如果我們啟動(dòng)了多個(gè)協(xié)程但是沒(méi)有一個(gè)可以對(duì)其進(jìn)行統(tǒng)一管理的途徑的話,那么就會(huì)導(dǎo)致我們的代碼臃腫雜亂,甚至發(fā)生內(nèi)存泄露或者任務(wù)泄露。為了確保所有的協(xié)程都會(huì)被追蹤,Kotlin 不允許在沒(méi)有使用 CoroutineScope 的情況下啟動(dòng)新的協(xié)程。CoroutineScope 可被看作是一個(gè)具有超能力的 ExecutorService 的輕量級(jí)版本。它能啟動(dòng)新的協(xié)程,同時(shí)這個(gè)協(xié)程還具備上文所說(shuō)的 suspend 和 resume 的優(yōu)勢(shì)

所有的協(xié)程都需要通過(guò) CoroutineScope 來(lái)啟動(dòng),它會(huì)跟蹤它使用 launchasync 創(chuàng)建的所有協(xié)程,你可以隨時(shí)調(diào)用 scope.cancel() 取消正在運(yùn)行的協(xié)程。CoroutineScope 本身并不運(yùn)行協(xié)程,它只是確保你不會(huì)失去對(duì)協(xié)程的追蹤,即使協(xié)程被掛起也是如此。在 Android 中,某些 KTX 庫(kù)為某些生命周期類提供了自己的 CoroutineScope。例如,ViewModelviewModelScope,LifecyclelifecycleScope

CoroutineScope 大體上可以分為三種:

  • GlobalScope。即全局協(xié)程作用域,在這個(gè)范圍內(nèi)啟動(dòng)的協(xié)程可以一直運(yùn)行直到應(yīng)用停止運(yùn)行。GlobalScope 本身不會(huì)阻塞當(dāng)前線程,且啟動(dòng)的協(xié)程相當(dāng)于守護(hù)線程,不會(huì)阻止 JVM 結(jié)束運(yùn)行
  • runBlocking。一個(gè)頂層函數(shù),和 GlobalScope 不一樣,它會(huì)阻塞當(dāng)前線程直到其內(nèi)部所有相同作用域的協(xié)程執(zhí)行結(jié)束
  • 自定義 CoroutineScope??捎糜趯?shí)現(xiàn)主動(dòng)控制協(xié)程的生命周期范圍,對(duì)于 Android 開(kāi)發(fā)來(lái)說(shuō)最大意義之一就是可以避免內(nèi)存泄露

1、GlobalScope

GlobalScope 屬于全局作用域,這意味著通過(guò) GlobalScope 啟動(dòng)的協(xié)程的生命周期只受整個(gè)應(yīng)用程序的生命周期的限制,只要整個(gè)應(yīng)用程序還在運(yùn)行且協(xié)程的任務(wù)還未結(jié)束,協(xié)程就可以一直運(yùn)行

GlobalScope 不會(huì)阻塞其所在線程,所以以下代碼中主線程的日志會(huì)早于 GlobalScope 內(nèi)部輸出日志。此外,GlobalScope 啟動(dòng)的協(xié)程相當(dāng)于守護(hù)線程,不會(huì)阻止 JVM 結(jié)束運(yùn)行,所以如果將主線程的休眠時(shí)間改為三百毫秒的話,就不會(huì)看到 launch A 輸出日志

fun main() {
    log("start")
    GlobalScope.launch {
        launch {
            delay(400)
            log("launch A")
        }
        launch {
            delay(300)
            log("launch B")
        }
        log("GlobalScope")
    }
    log("end")
    Thread.sleep(500)
}
[main] start
[main] end
[DefaultDispatcher-worker-1 @coroutine#1] GlobalScope
[DefaultDispatcher-worker-3 @coroutine#3] launch B
[DefaultDispatcher-worker-3 @coroutine#2] launch A

GlobalScope.launch 會(huì)創(chuàng)建一個(gè)頂級(jí)協(xié)程,盡管它很輕量級(jí),但在運(yùn)行時(shí)還是會(huì)消耗一些內(nèi)存資源,且可以一直運(yùn)行直到整個(gè)應(yīng)用程序停止(只要任務(wù)還未結(jié)束),這可能會(huì)導(dǎo)致內(nèi)存泄露,所以在日常開(kāi)發(fā)中應(yīng)該謹(jǐn)慎使用 GlobalScope

2、runBlocking

也可以使用 runBlocking 這個(gè)頂層函數(shù)來(lái)啟動(dòng)協(xié)程,runBlocking 函數(shù)的第二個(gè)參數(shù)即協(xié)程的執(zhí)行體,該參數(shù)被聲明為 CoroutineScope 的擴(kuò)展函數(shù),因此執(zhí)行體就包含了一個(gè)隱式的 CoroutineScope,所以在 runBlocking 內(nèi)部可以來(lái)直接啟動(dòng)協(xié)程

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T

runBlocking 的一個(gè)方便之處就是:只有當(dāng)內(nèi)部相同作用域的所有協(xié)程都運(yùn)行結(jié)束后,聲明在 runBlocking 之后的代碼才能執(zhí)行,即 runBlocking 會(huì)阻塞其所在線程

看以下代碼。runBlocking 內(nèi)部啟動(dòng)的兩個(gè)協(xié)程會(huì)各自做耗時(shí)操作,從輸出結(jié)果可以看出來(lái)兩個(gè)協(xié)程還是在交叉并發(fā)執(zhí)行,且 runBlocking 會(huì)等到兩個(gè)協(xié)程都執(zhí)行結(jié)束后才會(huì)退出,外部的日志輸出結(jié)果有明確的先后順序。即 runBlocking 內(nèi)部啟動(dòng)的協(xié)程是非阻塞式的,但 runBlocking 阻塞了其所在線程。此外,runBlocking 只會(huì)等待相同作用域的協(xié)程完成才會(huì)退出,而不會(huì)等待 GlobalScope 等其它作用域啟動(dòng)的協(xié)程

所以說(shuō),runBlocking 本身帶有阻塞線程的意味,但其內(nèi)部運(yùn)行的協(xié)程又是非阻塞的,讀者需要意會(huì)這兩者的區(qū)別

fun main() {
    log("start")
    runBlocking {
        launch {
            repeat(3) {
                delay(100)
                log("launchA - $it")
            }
        }
        launch {
            repeat(3) {
                delay(100)
                log("launchB - $it")
            }
        }
        GlobalScope.launch {
            repeat(3) {
                delay(120)
                log("GlobalScope - $it")
            }
        }
    }
    log("end")
}
[main] start
[main] launchA - 0
[main] launchB - 0
[DefaultDispatcher-worker-1] GlobalScope - 0
[main] launchA - 1
[main] launchB - 1
[DefaultDispatcher-worker-1] GlobalScope - 1
[main] launchA - 2
[main] launchB - 2
[main] end

基于是否會(huì)阻塞線程的區(qū)別,以下代碼中 runBlocking 會(huì)早于 GlobalScope 輸出日志

fun main() {
    GlobalScope.launch(Dispatchers.IO) {
        delay(600)
        log("GlobalScope")
    }
    runBlocking {
        delay(500)
        log("runBlocking")
    }
    //主動(dòng)休眠兩百毫秒,使得和 runBlocking 加起來(lái)的延遲時(shí)間多于六百毫秒
    Thread.sleep(200)
    log("after sleep")
}
[main] runBlocking
[DefaultDispatcher-worker-1] GlobalScope
[main] after sleep

3、coroutineScope

coroutineScope 函數(shù)用于創(chuàng)建一個(gè)獨(dú)立的協(xié)程作用域,直到所有啟動(dòng)的協(xié)程都完成后才結(jié)束自身。runBlockingcoroutineScope 看起來(lái)很像,因?yàn)樗鼈兌夹枰却鋬?nèi)部所有相同作用域的協(xié)程結(jié)束后才會(huì)結(jié)束自己。兩者的主要區(qū)別在于 runBlocking 方法會(huì)阻塞當(dāng)前線程,而 coroutineScope不會(huì)阻塞線程,而是會(huì)掛起并釋放底層線程以供其它協(xié)程使用。由于這個(gè)差別,runBlocking 是一個(gè)普通函數(shù),而 coroutineScope 是一個(gè)掛起函數(shù)

fun main() = runBlocking {
    launch {
        delay(100)
        log("Task from runBlocking")
    }
    coroutineScope {
        launch {
            delay(500)
            log("Task from nested launch")
        }
        delay(100)
        log("Task from coroutine scope")
    }
    log("Coroutine scope is over")
}
[main] Task from coroutine scope
[main] Task from runBlocking
[main] Task from nested launch
[main] Coroutine scope is over

4、supervisorScope

supervisorScope 函數(shù)用于創(chuàng)建一個(gè)使用了 SupervisorJob 的 coroutineScope,該作用域的特點(diǎn)就是拋出的異常不會(huì)連鎖取消同級(jí)協(xié)程和父協(xié)程

fun main() = runBlocking {
    launch {
        delay(100)
        log("Task from runBlocking")
    }
    supervisorScope {
        launch {
            delay(500)
            log("Task throw Exception")
            throw Exception("failed")
        }
        launch {
            delay(600)
            log("Task from nested launch")
        }
    }
    log("Coroutine scope is over")
}
[main @coroutine#2] Task from runBlocking
[main @coroutine#3] Task throw Exception
[main @coroutine#4] Task from nested launch
[main @coroutine#1] Coroutine scope is over

5、自定義 CoroutineScope

假設(shè)我們?cè)?Activity 中先后啟動(dòng)了多個(gè)協(xié)程用于執(zhí)行異步耗時(shí)操作,那么當(dāng) Activity 退出時(shí),必須取消所有協(xié)程以避免內(nèi)存泄漏。我們可以通過(guò)保留每一個(gè) Job 引用然后在 onDestroy方法里來(lái)手動(dòng)取消,但這種方式相當(dāng)來(lái)說(shuō)會(huì)比較繁瑣和低效。kotlinx.coroutines 提供了 CoroutineScope 來(lái)管理多個(gè)協(xié)程的生命周期

我們可以通過(guò)創(chuàng)建與 Activity 生命周期相關(guān)聯(lián)的協(xié)程作用域的實(shí)例來(lái)管理協(xié)程的生命周期。CoroutineScope 的實(shí)例可以通過(guò) CoroutineScope()MainScope() 的工廠函數(shù)來(lái)構(gòu)建。前者創(chuàng)建通用作用域,后者創(chuàng)建 UI 應(yīng)用程序的作用域并使用 Dispatchers.Main 作為默認(rèn)的調(diào)度器

class Activity {

    private val mainScope = MainScope()

    fun onCreate() {
        mainScope.launch {
            repeat(5) {
                delay(1000L * it)
            }
        }
    }

    fun onDestroy() {
        mainScope.cancel()
    }

}

或者,我們可以通過(guò)委托模式來(lái)讓 Activity 實(shí)現(xiàn) CoroutineScope 接口,從而可以在 Activity 內(nèi)直接啟動(dòng)協(xié)程而不必顯示地指定它們的上下文,并且在 onDestroy()中自動(dòng)取消所有協(xié)程

class Activity : CoroutineScope by CoroutineScope(Dispatchers.Default) {

    fun onCreate() {
        launch {
            repeat(5) {
                delay(200L * it)
                log(it)
            }
        }
        log("Activity Created")
    }

    fun onDestroy() {
        cancel()
        log("Activity Destroyed")
    }

}

從輸出結(jié)果可以看出,當(dāng)回調(diào)了onDestroy()方法后協(xié)程就不會(huì)再輸出日志了

fun main() = runBlocking {
    val activity = Activity()
    activity.onCreate()
    delay(1000)
    activity.onDestroy()
    delay(1000)
}
[main @coroutine#1] Activity Created
[DefaultDispatcher-worker-1 @coroutine#2] 0
[DefaultDispatcher-worker-1 @coroutine#2] 1
[DefaultDispatcher-worker-1 @coroutine#2] 2
[main @coroutine#1] Activity Destroyed

已取消的作用域無(wú)法再創(chuàng)建協(xié)程。因此,僅當(dāng)控制其生命周期的類被銷毀時(shí),才應(yīng)調(diào)用 scope.cancel()。例如,使用 viewModelScope 時(shí),ViewModel 類會(huì)在 ViewModel 的 onCleared() 方法中自動(dòng)取消作用域

六、CoroutineBuilder

1、launch

看下 launch 函數(shù)的方法簽名。launch 是一個(gè)作用于 CoroutineScope 的擴(kuò)展函數(shù),用于在不阻塞當(dāng)前線程的情況下啟動(dòng)一個(gè)協(xié)程,并返回對(duì)該協(xié)程任務(wù)的引用,即 Job 對(duì)象

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job
復(fù)制代碼

launch 函數(shù)共包含三個(gè)參數(shù):

  1. context。用于指定協(xié)程的上下文
  2. start。用于指定協(xié)程的啟動(dòng)方式。默認(rèn)值為 CoroutineStart.DEFAULT,即協(xié)程會(huì)在聲明的同時(shí)就立即進(jìn)入等待調(diào)度的狀態(tài),即可以立即執(zhí)行的狀態(tài)??梢酝ㄟ^(guò)將其設(shè)置為CoroutineStart.LAZY來(lái)實(shí)現(xiàn)延遲啟動(dòng),即懶加載
  3. block。用于傳遞協(xié)程的執(zhí)行體,即希望交由協(xié)程執(zhí)行的任務(wù)

可以看到 launchA 和 launchB 是并行交叉執(zhí)行的

fun main() = runBlocking {
    val launchA = launch {
        repeat(3) {
            delay(100)
            log("launchA - $it")
        }
    }
    val launchB = launch {
        repeat(3) {
            delay(100)
            log("launchB - $it")
        }
    }
}
[main] launchA - 0
[main] launchB - 0
[main] launchA - 1
[main] launchB - 1
[main] launchA - 2
[main] launchB - 2

2、Job

Job 是協(xié)程的句柄。使用 launchasync 創(chuàng)建的每個(gè)協(xié)程都會(huì)返回一個(gè) Job 實(shí)例,該實(shí)例唯一標(biāo)識(shí)協(xié)程并管理其生命周期。Job 是一個(gè)接口類型,這里列舉 Job 幾個(gè)比較有用的屬性和函數(shù)

    //當(dāng) Job 處于活動(dòng)狀態(tài)時(shí)為 true
    //如果 Job 未被取消或沒(méi)有失敗,則均處于 active 狀態(tài)
    public val isActive: Boolean

    //當(dāng) Job 正常結(jié)束或者由于異常結(jié)束,均返回 true
    public val isCompleted: Boolean

    //當(dāng) Job 被主動(dòng)取消或者由于異常結(jié)束,均返回 true
    public val isCancelled: Boolean

    //啟動(dòng) Job
    //如果此調(diào)用的確啟動(dòng)了 Job,則返回 true
    //如果 Job 調(diào)用前就已處于 started 或者是 completed 狀態(tài),則返回 false 
    public fun start(): Boolean

    //用于取消 Job,可同時(shí)通過(guò)傳入 Exception 來(lái)標(biāo)明取消原因
    public fun cancel(cause: CancellationException? = null)

    //阻塞等待直到此 Job 結(jié)束運(yùn)行
    public suspend fun join()

    //當(dāng) Job 結(jié)束運(yùn)行時(shí)(不管由于什么原因)回調(diào)此方法,可用于接收可能存在的運(yùn)行異常
    public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle

Job 具有以下幾種狀態(tài)值,每種狀態(tài)對(duì)應(yīng)的屬性值各不相同

State isActive isCompleted isCancelled
New (optional initial state) false false false
Active (default initial state) true false false
Completing (transient state) true false false
Cancelling (transient state) false false true
Cancelled (final state) false true true
Completed (final state) false true false
fun main() {
    //將協(xié)程設(shè)置為延遲啟動(dòng)
    val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
        for (i in 0..100) {
            //每循環(huán)一次均延遲一百毫秒
            delay(100)
        }
    }
    job.invokeOnCompletion {
        log("invokeOnCompletion:$it")
    }
    log("1. job.isActive:${job.isActive}")
    log("1. job.isCancelled:${job.isCancelled}")
    log("1. job.isCompleted:${job.isCompleted}")

    job.start()

    log("2. job.isActive:${job.isActive}")
    log("2. job.isCancelled:${job.isCancelled}")
    log("2. job.isCompleted:${job.isCompleted}")

    //休眠四百毫秒后再主動(dòng)取消協(xié)程
    Thread.sleep(400)
    job.cancel(CancellationException("test"))

    //休眠四百毫秒防止JVM過(guò)快停止導(dǎo)致 invokeOnCompletion 來(lái)不及回調(diào)
    Thread.sleep(400)

    log("3. job.isActive:${job.isActive}")
    log("3. job.isCancelled:${job.isCancelled}")
    log("3. job.isCompleted:${job.isCompleted}")
}
[main] 1. job.isActive:false
[main] 1. job.isCancelled:false
[main] 1. job.isCompleted:false
[main] 2. job.isActive:true
[main] 2. job.isCancelled:false
[main] 2. job.isCompleted:false
[DefaultDispatcher-worker-2] invokeOnCompletion:java.util.concurrent.CancellationException: test
[main] 3. job.isActive:false
[main] 3. job.isCancelled:true
[main] 3. job.isCompleted:true

3、async

看下 async 函數(shù)的方法簽名。async 也是一個(gè)作用于 CoroutineScope 的擴(kuò)展函數(shù),和 launch 的區(qū)別主要就在于:async 可以返回協(xié)程的執(zhí)行結(jié)果,而 launch 不行

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

通過(guò)await()方法可以拿到 async 協(xié)程的執(zhí)行結(jié)果,可以看到兩個(gè)協(xié)程的總耗時(shí)是遠(yuǎn)少于七秒的,總耗時(shí)基本等于耗時(shí)最長(zhǎng)的協(xié)程

fun main() {
    val time = measureTimeMillis {
        runBlocking {
            val asyncA = async {
                delay(3000)
                1
            }
            val asyncB = async {
                delay(4000)
                2
            }
            log(asyncA.await() + asyncB.await())
        }
    }
    log(time)
}
[main] 3
[main] 4070

由于 launch 和 async 僅能夠在 CouroutineScope 中使用,所以任何創(chuàng)建的協(xié)程都會(huì)被該 scope 追蹤。Kotlin 禁止創(chuàng)建不能夠被追蹤的協(xié)程,從而避免協(xié)程泄漏

4、async 的錯(cuò)誤用法

修改下上述代碼,可以發(fā)現(xiàn)兩個(gè)協(xié)程的總耗時(shí)就會(huì)變?yōu)槠呙胱笥?/p>

fun main() {
    val time = measureTimeMillis {
        runBlocking {
            val asyncA = async(start = CoroutineStart.LAZY) {
                delay(3000)
                1
            }
            val asyncB = async(start = CoroutineStart.LAZY) {
                delay(4000)
                2
            }
            log(asyncA.await() + asyncB.await())
        }
    }
    log(time)
}
[main] 3
[main] 7077

會(huì)造成這不同區(qū)別是因?yàn)?CoroutineStart.LAZY不會(huì)主動(dòng)啟動(dòng)協(xié)程,而是直到調(diào)用async.await()或者async.satrt()后才會(huì)啟動(dòng)(即懶加載模式),所以asyncA.await() + asyncB.await()會(huì)導(dǎo)致兩個(gè)協(xié)程其實(shí)是在順序執(zhí)行。而默認(rèn)值 CoroutineStart.DEFAULT 參數(shù)會(huì)使得協(xié)程在聲明的同時(shí)就被啟動(dòng)了(實(shí)際上還需要等待被調(diào)度執(zhí)行,但可以看做是立即就執(zhí)行了),所以即使 async.await()會(huì)阻塞當(dāng)前線程直到協(xié)程返回結(jié)果值,但兩個(gè)協(xié)程其實(shí)都是處于運(yùn)行狀態(tài),所以總耗時(shí)就是四秒左右

此時(shí)可以通過(guò)先調(diào)用start()再調(diào)用await()來(lái)實(shí)現(xiàn)第一個(gè)例子的效果

asyncA.start()
asyncB.start()
log(asyncA.await() + asyncB.await())

5、async 并行分解

suspend 函數(shù)啟動(dòng)的所有協(xié)程都必須在該函數(shù)返回結(jié)果時(shí)停止,因此你可能需要保證這些協(xié)程在返回結(jié)果之前完成。借助 Kotlin 中的結(jié)構(gòu)化并發(fā)機(jī)制,你可以定義用于啟動(dòng)一個(gè)或多個(gè)協(xié)程的 coroutineScope。然后,你可以使用 await()(針對(duì)單個(gè)協(xié)程)或 awaitAll()(針對(duì)多個(gè)協(xié)程)保證這些協(xié)程在從函數(shù)返回結(jié)果之前完成

例如,假設(shè)我們定義一個(gè)用于異步獲取兩個(gè)文檔的 coroutineScope。通過(guò)對(duì)每個(gè)延遲引用調(diào)用 await(),我們可以保證這兩項(xiàng) async 操作在返回值之前完成:

    suspend fun fetchTwoDocs() =
        coroutineScope {
            val deferredOne = async { fetchDoc(1) }
            val deferredTwo = async { fetchDoc(2) }
            deferredOne.await()
            deferredTwo.await()
    }

你還可以對(duì)集合使用 awaitAll(),如以下示例所示:

suspend fun fetchTwoDocs() =        // called on any Dispatcher (any thread, possibly Main)
    coroutineScope {
        val deferreds = listOf(     // fetch two docs at the same time
            async { fetchDoc(1) },  // async returns a result for the first doc
            async { fetchDoc(2) }   // async returns a result for the second doc
        )
        deferreds.awaitAll()        // use awaitAll to wait for both network requests
}

雖然 fetchTwoDocs() 使用 async 啟動(dòng)新協(xié)程,但該函數(shù)使用 awaitAll() 等待啟動(dòng)的協(xié)程完成后才會(huì)返回結(jié)果。不過(guò)請(qǐng)注意,即使我們沒(méi)有調(diào)用 awaitAll(),coroutineScope 構(gòu)建器也會(huì)等到所有新協(xié)程都完成后才恢復(fù)名為 fetchTwoDocs 的協(xié)程。此外,coroutineScope 會(huì)捕獲協(xié)程拋出的所有異常,并將其傳送回調(diào)用方

6、Deferred

async 函數(shù)的返回值是一個(gè) Deferred 對(duì)象。Deferred 是一個(gè)接口類型,繼承于 Job 接口,所以 Job 包含的屬性和方法 Deferred 都有,其主要就是在 Job 的基礎(chǔ)上擴(kuò)展了 await()方法

七、CoroutineContext

CoroutineContext 使用以下元素集定義協(xié)程的行為:

  • Job:控制協(xié)程的生命周期
  • CoroutineDispatcher:將工作分派到適當(dāng)?shù)木€程
  • CoroutineName:協(xié)程的名稱,可用于調(diào)試
  • CoroutineExceptionHandler:處理未捕獲的異常

1、Job

協(xié)程中的 Job 是其上下文 CoroutineContext 中的一部分,可以通過(guò) coroutineContext[Job] 表達(dá)式從上下文中獲取到

以下兩個(gè) log 語(yǔ)句雖然是運(yùn)行在不同的協(xié)程上,但是其指向的 Job 其實(shí)是同個(gè)對(duì)象

fun main() = runBlocking {
    val job = launch {
        log("My job is ${coroutineContext[Job]}")
    }
    log("My job is $job")
}
[main @coroutine#1] My job is "coroutine#2":StandaloneCoroutine{Active}@75a1cd57
[main @coroutine#2] My job is "coroutine#2":StandaloneCoroutine{Active}@75a1cd57

實(shí)際上 CoroutineScope 的 isActive 這個(gè)擴(kuò)展屬性只是 coroutineContext[Job]?.isActive == true 的一種簡(jiǎn)便寫法

public val CoroutineScope.isActive: Boolean
    get() = coroutineContext[Job]?.isActive ?: true

2、CoroutineDispatcher

CoroutineContext 包含一個(gè) CoroutineDispatcher(協(xié)程調(diào)度器)用于指定執(zhí)行協(xié)程的目標(biāo)載體,即運(yùn)行于哪個(gè)線程。CoroutineDispatcher 可以將協(xié)程的執(zhí)行操作限制在特定線程上,也可以將其分派到線程池中,或者讓它無(wú)限制地運(yùn)行。所有的協(xié)程構(gòu)造器(如 launch 和 async)都接受一個(gè)可選參數(shù),即 CoroutineContext ,該參數(shù)可用于顯式指定要?jiǎng)?chuàng)建的協(xié)程和其它上下文元素所要使用的 CoroutineDispatcher

要在主線程之外運(yùn)行代碼,可以讓 Kotlin 協(xié)程在 Default 或 IO 調(diào)度程序上執(zhí)行工作。在 Kotlin 中,所有協(xié)程都必須在 CoroutineDispatcher 中運(yùn)行,即使它們?cè)谥骶€程上運(yùn)行也是如此。協(xié)程可以自行暫停,而 CoroutineDispatcher 負(fù)責(zé)將其恢復(fù)

Kotlin 協(xié)程庫(kù)提供了四個(gè) Dispatcher 用于指定在何處運(yùn)行協(xié)程,大部分情況下我們只會(huì)接觸以下三個(gè):

  • Dispatchers.Main - 使用此調(diào)度程序可在 Android 主線程上運(yùn)行協(xié)程。此調(diào)度程序只能用于與界面交互和執(zhí)行快速工作。示例包括調(diào)用 suspend 函數(shù)、運(yùn)行 Android 界面框架操作,以及更新 LiveData 對(duì)象
  • Dispatchers.IO - 此調(diào)度程序經(jīng)過(guò)了專門優(yōu)化,適合在主線程之外執(zhí)行磁盤或網(wǎng)絡(luò) I/O。示例包括使用 Room 組件、從文件中讀取數(shù)據(jù)或向文件中寫入數(shù)據(jù),以及運(yùn)行任何網(wǎng)絡(luò)操作
  • Dispatchers.Default - 此調(diào)度程序經(jīng)過(guò)了專門優(yōu)化,適合在主線程之外執(zhí)行占用大量 CPU 資源的工作。用例示例包括對(duì)列表排序和解析 JSON
fun main() = runBlocking<Unit> {
    launch {
        log("main runBlocking")
    }
    launch(Dispatchers.Default) {
        log("Default")
    }
    launch(Dispatchers.IO) {
        log("IO")
    }
    launch(newSingleThreadContext("MyOwnThread")) {
        log("newSingleThreadContext")
    }
}
[DefaultDispatcher-worker-1 @coroutine#3] Default
[DefaultDispatcher-worker-2 @coroutine#4] IO
[MyOwnThread @coroutine#5] newSingleThreadContext
[main @coroutine#2] main runBlocking

當(dāng) launch {...} 在不帶參數(shù)的情況下使用時(shí),它從外部的協(xié)程作用域繼承上下文和調(diào)度器,即和 runBlocking 保持一致。而在 GlobalScope 中啟動(dòng)協(xié)程時(shí)默認(rèn)使用的調(diào)度器是 Dispatchers.default,并使用共享的后臺(tái)線程池,因此 launch(Dispatchers.default){...}GlobalScope.launch{...} 是使用相同的調(diào)度器。newSingleThreadContext 用于為協(xié)程專門創(chuàng)建一個(gè)新的線程來(lái)運(yùn)行,專用線程是一種成本非常昂貴的資源,在實(shí)際的應(yīng)用程序中必須在不再需要時(shí)釋放掉,或者存儲(chǔ)在頂級(jí)變量中以便在整個(gè)應(yīng)用程序中進(jìn)行重用

3、withContext

對(duì)于以下代碼,get方法內(nèi)使用withContext(Dispatchers.IO) 創(chuàng)建了一個(gè)指定在 IO 線程池中運(yùn)行的代碼塊,該區(qū)間內(nèi)的任何代碼都始終通過(guò) IO 線程來(lái)執(zhí)行。由于 withContext 方法本身就是一個(gè)掛起函數(shù),因此 get 方法也必須定義為掛起函數(shù)

suspend fun fetchDocs() {                      // Dispatchers.Main
    val result = get("developer.android.com")  // Dispatchers.Main
    show(result)                               // Dispatchers.Main
}

suspend fun get(url: String) =                 // Dispatchers.Main
    withContext(Dispatchers.IO) {              // Dispatchers.IO (main-safety block)
        /* perform network IO here */          // Dispatchers.IO (main-safety block)
    }                                          // Dispatchers.Main
}

借助協(xié)程,你可以細(xì)粒度地來(lái)調(diào)度線程。由于withContext()支持讓你在不引入回調(diào)的情況下控制任何代碼的執(zhí)行線程池,因此你可以將其應(yīng)用于非常小的函數(shù),例如從數(shù)據(jù)庫(kù)中讀取數(shù)據(jù)或執(zhí)行網(wǎng)絡(luò)請(qǐng)求。一種不錯(cuò)的做法是使用 withContext() 來(lái)確保每個(gè)函數(shù)都是主線程安全的,這意味著,你可以從主線程調(diào)用每個(gè)函數(shù)。這樣,調(diào)用方就從不需要考慮應(yīng)該使用哪個(gè)線程來(lái)執(zhí)行函數(shù)了

在前面的示例中,fetchDocs() 方法在主線程上執(zhí)行;不過(guò),它可以安全地調(diào)用 get方法,這樣會(huì)在后臺(tái)執(zhí)行網(wǎng)絡(luò)請(qǐng)求。由于協(xié)程支持 suspendresume,因此 withContext 塊完成后,主線程上的協(xié)程會(huì)立即根據(jù) get 結(jié)果恢復(fù)

與基于回調(diào)的等效實(shí)現(xiàn)相比,withContext() 不會(huì)增加額外的開(kāi)銷。此外在某些情況下,還可以優(yōu)化 withContext() 調(diào)用,使其超越基于回調(diào)的等效實(shí)現(xiàn)。例如,如果某個(gè)函數(shù)對(duì)一個(gè)網(wǎng)絡(luò)進(jìn)行十次調(diào)用,你可以使用外部 withContext() 讓 Kotlin 只切換一次線程。這樣,即使網(wǎng)絡(luò)庫(kù)多次使用 withContext(),它也會(huì)留在同一調(diào)度程序上,并避免切換線程。此外,Kotlin 還優(yōu)化了 Dispatchers.DefaultDispatchers.IO 之間的切換,以盡可能避免線程切換

利用一個(gè)使用線程池的調(diào)度程序(例如 Dispatchers.IODispatchers.Default)不能保證代碼塊一直在同一線程上從上到下執(zhí)行。在某些情況下,Kotlin 協(xié)程在 suspendresume 后可能會(huì)將執(zhí)行工作移交給另一個(gè)線程。這意味著,對(duì)于整個(gè) withContext() 塊,線程局部變量可能并不指向同一個(gè)值

4、CoroutineName

CoroutineName 用于為協(xié)程指定一個(gè)名字,方便調(diào)試和定位問(wèn)題

fun main() = runBlocking<Unit>(CoroutineName("RunBlocking")) {
    log("start")
    launch(CoroutineName("MainCoroutine")) {
        launch(CoroutineName("Coroutine#A")) {
            delay(400)
            log("launch A")
        }
        launch(CoroutineName("Coroutine#B")) {
            delay(300)
            log("launch B")
        }
    }
}
[main @RunBlocking#1] start
[main @Coroutine#B#4] launch B
[main @Coroutine#A#3] launch A

5、CoroutineExceptionHandler

在下文的異常處理會(huì)講到

6、組合上下文元素

有時(shí)我們需要為協(xié)程上下文定義多個(gè)元素,那就可以用 + 運(yùn)算符。例如,我們可以同時(shí)為協(xié)程指定 Dispatcher 和 CoroutineName

fun main() = runBlocking<Unit> {
    launch(Dispatchers.Default + CoroutineName("test")) {
        log("Hello World")
    }
}
[DefaultDispatcher-worker-1 @test#2] Hello World

此外,由于 CoroutineContext 是由一組元素組成的,所以加號(hào)右側(cè)的元素會(huì)覆蓋加號(hào)左側(cè)的元素,進(jìn)而組成新創(chuàng)建的 CoroutineContext。比如,(Dispatchers.Main, "name") + (Dispatchers.IO) = (Dispatchers.IO, "name")

八、取消協(xié)程

如果用戶退出某個(gè)啟動(dòng)了協(xié)程的 Activity/Fragment 的話,那么大部分情況下就應(yīng)該取消所有協(xié)程

job.cancel()就用于取消協(xié)程,job.join()用于阻塞等待協(xié)程運(yùn)行結(jié)束。因?yàn)?cancel() 函數(shù)調(diào)用后會(huì)馬上返回而不是等待協(xié)程結(jié)束后再返回,所以此時(shí)協(xié)程不一定就是已經(jīng)停止運(yùn)行了。如果需要確保協(xié)程結(jié)束運(yùn)行后再執(zhí)行后續(xù)代碼,就需要調(diào)用 join() 方法來(lái)阻塞等待。也可以通過(guò)調(diào)用 Job 的擴(kuò)展函數(shù) cancelAndJoin() 來(lái)完成相同操作,它結(jié)合了 canceljoin兩個(gè)操作

fun main() = runBlocking {
    val job = launch {
        repeat(1000) { i ->
            log("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    job.cancel()
    job.join()
    log("main: Now I can quit.")
}
[main] job: I'm sleeping 0 ...
[main] job: I'm sleeping 1 ...
[main] job: I'm sleeping 2 ...
[main] main: I'm tired of waiting!
[main] main: Now I can quit.

1、協(xié)程可能無(wú)法取消

并不是所有協(xié)程都可以響應(yīng)取消操作,協(xié)程的取消操作是需要協(xié)作(cooperative)完成的,協(xié)程必須協(xié)作才能取消。協(xié)程庫(kù)中的所有掛起函數(shù)都是可取消的,它們?cè)谶\(yùn)行時(shí)會(huì)檢查協(xié)程是否被取消了,并在取消時(shí)拋出 CancellationException 從而結(jié)束整個(gè)任務(wù)。但如果協(xié)程正在執(zhí)行計(jì)算任務(wù)并且未檢查是否已處于取消狀態(tài)的話,就無(wú)法取消協(xié)程

所以即使以下代碼主動(dòng)取消了協(xié)程,協(xié)程也只會(huì)在完成既定循環(huán)后才結(jié)束運(yùn)行,因?yàn)閰f(xié)程沒(méi)有在每次循環(huán)前先進(jìn)行檢查,導(dǎo)致任務(wù)不受取消操作的影響

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) {
            if (System.currentTimeMillis() >= nextPrintTime) {
                log("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    job.cancelAndJoin()
    log("main: Now I can quit.")
}
[DefaultDispatcher-worker-1] job: I'm sleeping 0 ...
[DefaultDispatcher-worker-1] job: I'm sleeping 1 ...
[DefaultDispatcher-worker-1] job: I'm sleeping 2 ...
[main] main: I'm tired of waiting!
[DefaultDispatcher-worker-1] job: I'm sleeping 3 ...
[DefaultDispatcher-worker-1] job: I'm sleeping 4 ...
[main] main: Now I can quit.

為了實(shí)現(xiàn)取消協(xié)程的目的,就需要為上述代碼加上判斷協(xié)程是否還處于可運(yùn)行狀態(tài)的邏輯,當(dāng)不可運(yùn)行時(shí)就主動(dòng)退出協(xié)程。isActive 是 CoroutineScope 的擴(kuò)展屬性,就用于判斷協(xié)程是否還處于可運(yùn)行狀態(tài)

fun main() = runBlocking {
    val startTime = System.currentTimeMillis()
    val job = launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 5) {
            if (isActive) {
                if (System.currentTimeMillis() >= nextPrintTime) {
                    log("job: I'm sleeping ${i++} ...")
                    nextPrintTime += 500L
                }
            } else {
                return@launch
            }
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    job.cancelAndJoin()
    log("main: Now I can quit.")
}

取消協(xié)程這個(gè)操作類似于在 Java 中調(diào)用Thread.interrupt()方法來(lái)向線程發(fā)起中斷請(qǐng)求,這兩個(gè)操作都不會(huì)強(qiáng)制停止協(xié)程和線程,外部只是相當(dāng)于發(fā)起一個(gè)停止運(yùn)行的請(qǐng)求,需要依靠協(xié)程和線程響應(yīng)請(qǐng)求后主動(dòng)停止運(yùn)行。Kotlin 和 Java 之所以均沒(méi)有提供一個(gè)可以直接強(qiáng)制停止協(xié)程或線程的方法,是因?yàn)檫@個(gè)操作可能會(huì)帶來(lái)各種意想不到的情況。在停止協(xié)程和線程的時(shí)候,它們可能還持有著某些排他性資源(例如:鎖,數(shù)據(jù)庫(kù)鏈接),如果強(qiáng)制性地停止,它們持有的鎖就會(huì)一直無(wú)法得到釋放,導(dǎo)致其他協(xié)程和線程一直無(wú)法得到目標(biāo)資源,最終可能導(dǎo)致線程死鎖。所以Thread.stop()方法目前也是處于廢棄狀態(tài),Java 官方并沒(méi)有提供可靠的停止線程的方法

2、用 finally 釋放資源

可取消的掛起函數(shù)在取消時(shí)會(huì)拋出 CancellationException,可以依靠try {...} finally {...} 或者 Kotlin 的 use 函數(shù)在取消協(xié)程后釋放持有的資源

fun main() = runBlocking {
    val job = launch {
        try {
            repeat(1000) { i ->
                log("job: I'm sleeping $i ...")
                delay(500L)
            }
        } catch (e: Throwable) {
            log(e.message)
        } finally {
            log("job: I'm running finally")
        }
    }
    delay(1300L)
    log("main: I'm tired of waiting!")
    job.cancelAndJoin()
    log("main: Now I can quit.")
}
[main] job: I'm sleeping 0 ...
[main] job: I'm sleeping 1 ...
[main] job: I'm sleeping 2 ...
[main] main: I'm tired of waiting!
[main] StandaloneCoroutine was cancelled
[main] job: I'm running finally
[main] main: Now I can quit.

3、NonCancellable

如果在上一個(gè)例子中的 finally 塊中再調(diào)用掛起函數(shù)的話,將會(huì)導(dǎo)致拋出 CancellationException,因?yàn)榇藭r(shí)協(xié)程已經(jīng)被取消了。通常我們并不會(huì)遇到這種情況,因?yàn)槌R?jiàn)的資源釋放操作都是非阻塞的,且不涉及任何掛起函數(shù)。但在極少數(shù)情況下我們需要在取消的協(xié)程中再調(diào)用掛起函數(shù),此時(shí)可以使用 withContext 函數(shù)和 NonCancellable上下文將相應(yīng)的代碼包裝在 withContext(NonCancellable) {...} 代碼塊中,NonCancellable 就用于創(chuàng)建一個(gè)無(wú)法取消的協(xié)程作用域

fun main() = runBlocking {
    log("start")
    val launchA = launch {
        try {
            repeat(5) {
                delay(50)
                log("launchA-$it")
            }
        } finally {
            delay(50)
            log("launchA isCompleted")
        }
    }
    val launchB = launch {
        try {
            repeat(5) {
                delay(50)
                log("launchB-$it")
            }
        } finally {
            withContext(NonCancellable) {
                delay(50)
                log("launchB isCompleted")
            }
        }
    }
    //延時(shí)一百毫秒,保證兩個(gè)協(xié)程都已經(jīng)被啟動(dòng)了
    delay(200)
    launchA.cancel()
    launchB.cancel()
    log("end")
}
[main] start
[main] launchA-0
[main] launchB-0
[main] launchA-1
[main] launchB-1
[main] launchA-2
[main] launchB-2
[main] end
[main] launchB isCompleted

4、父協(xié)程和子協(xié)程

當(dāng)一個(gè)協(xié)程在另外一個(gè)協(xié)程的協(xié)程作用域中啟動(dòng)時(shí),它將通過(guò) CoroutineScope.coroutineContext 繼承其上下文,新啟動(dòng)的協(xié)程就被稱為子協(xié)程,子協(xié)程的 Job 將成為父協(xié)程 Job 的子 Job。父協(xié)程總是會(huì)等待其所有子協(xié)程都完成后才結(jié)束自身,所以父協(xié)程不必顯式跟蹤它啟動(dòng)的所有子協(xié)程,也不必使用 Job.join 在末尾等待子協(xié)程完成

所以雖然 parentJob 啟動(dòng)的三個(gè)子協(xié)程的延時(shí)時(shí)間各不相同,但它們最終都會(huì)打印出日志

fun main() = runBlocking {
    val parentJob = launch {
        repeat(3) { i ->
            launch {
                delay((i + 1) * 200L)
                log("Coroutine $i is done")
            }
        }
        log("request: I'm done and I don't explicitly join my children that are still active")
    }
}
[main @coroutine#2] request: I'm done and I don't explicitly join my children that are still active
[main @coroutine#3] Coroutine 0 is done
[main @coroutine#4] Coroutine 1 is done
[main @coroutine#5] Coroutine 2 is done

5、傳播取消操作

一般情況下,協(xié)程的取消操作會(huì)通過(guò)協(xié)程的層次結(jié)構(gòu)來(lái)進(jìn)行傳播。如果取消父協(xié)程或者父協(xié)程拋出異常,那么子協(xié)程都會(huì)被取消。而如果子協(xié)程被取消,則不會(huì)影響同級(jí)協(xié)程和父協(xié)程,但如果子協(xié)程拋出異常則也會(huì)導(dǎo)致同級(jí)協(xié)程和父協(xié)程被取消

對(duì)于以下代碼,子協(xié)程 jon1 被取消并不影響子協(xié)程 jon2 和父協(xié)程繼續(xù)運(yùn)行,但父協(xié)程被取消后子協(xié)程都會(huì)被遞歸取消

fun main() = runBlocking {
    val request = launch {
        val job1 = launch {
            repeat(10) {
                delay(300)
                log("job1: $it")
                if (it == 2) {
                    log("job1 canceled")
                    cancel()
                }
            }
        }
        val job2 = launch {
            repeat(10) {
                delay(300)
                log("job2: $it")
            }
        }
    }
    delay(1600)
    log("parent job canceled")
    request.cancel()
    delay(1000)
}
[main @coroutine#3] job1: 0
[main @coroutine#4] job2: 0
[main @coroutine#3] job1: 1
[main @coroutine#4] job2: 1
[main @coroutine#3] job1: 2
[main @coroutine#3] job1 canceled
[main @coroutine#4] job2: 2
[main @coroutine#4] job2: 3
[main @coroutine#4] job2: 4
[main @coroutine#1] parent job canceled

6、withTimeout

withTimeout 函數(shù)用于指定協(xié)程的運(yùn)行超時(shí)時(shí)間,如果超時(shí)則會(huì)拋出 TimeoutCancellationException,從而令協(xié)程結(jié)束運(yùn)行

fun main() = runBlocking {
    log("start")
    val result = withTimeout(300) {
        repeat(5) {
            delay(100)
        }
        200
    }
    log(result)
    log("end")
}
[main] start
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 300 ms
    at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
    at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156)
    at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
    at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
    at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69)
    at java.lang.Thread.run(Thread.java:748)

withTimeout方法拋出的 TimeoutCancellationException 是 CancellationException 的子類,之前我們并未在輸出日志上看到關(guān)于 CancellationException 這類異常的堆棧信息,這是因?yàn)閷?duì)于一個(gè)已取消的協(xié)程來(lái)說(shuō),CancellationException 被認(rèn)為是觸發(fā)協(xié)程結(jié)束的正常原因。但對(duì)于withTimeout方法來(lái)說(shuō),拋出異常是其上報(bào)超時(shí)情況的一種手段,所以該異常不會(huì)被協(xié)程內(nèi)部消化掉

如果不希望因?yàn)楫惓?dǎo)致協(xié)程結(jié)束,可以改用withTimeoutOrNull方法,如果超時(shí)就會(huì)返回 null

九、異常處理

當(dāng)一個(gè)協(xié)程由于異常而運(yùn)行失敗時(shí),它會(huì)傳播這個(gè)異常并傳遞給它的父協(xié)程。接下來(lái),父協(xié)程會(huì)進(jìn)行下面幾步操作:

  • 取消它自己的子級(jí)
  • 取消它自己
  • 將異常傳播并傳遞給它的父級(jí)

異常會(huì)到達(dá)層級(jí)的根部,而且當(dāng)前 CoroutineScope 所啟動(dòng)的所有協(xié)程都會(huì)被取消,但協(xié)程并非都是一發(fā)現(xiàn)異常就執(zhí)行以上流程,launch 和 async 在處理異常方面有著很大的差異

launch 將異常視為未捕獲異常,類似于 Java 的 Thread.uncaughtExceptionHandler,當(dāng)發(fā)現(xiàn)異常時(shí)就會(huì)馬上拋出。async 期望最終是通過(guò)調(diào)用 await 來(lái)獲取結(jié)果 (或者異常),所以默認(rèn)情況下它不會(huì)拋出異常。這意味著如果使用 async 啟動(dòng)新的協(xié)程,它會(huì)靜默地將異常丟棄,直到調(diào)用 async.await() 才會(huì)得到目標(biāo)值或者拋出存在的異常

例如,以下代碼中 launchA 拋出的異常會(huì)先連鎖導(dǎo)致 launchB 也被取消(拋出 JobCancellationException),然后再導(dǎo)致父協(xié)程 BlockingCoroutine 也被取消

fun main() = runBlocking {
    val launchA = launch {
        delay(1000)
        1 / 0
    }
    val launchB = launch {
        try {
            delay(1300)
            log("launchB")
        } catch (e: CancellationException) {
            e.printStackTrace()
        }
    }
    launchA.join()
    launchB.join()
}
kotlinx.coroutines.JobCancellationException: Parent job is Cancelling; job=BlockingCoroutine{Cancelling}@5eb5c224
Caused by: java.lang.ArithmeticException: / by zero
    at coroutines.CoroutinesMainKt$main$1$launchA$1.invokeSuspend(CoroutinesMain.kt:11)
    ···
Exception in thread "main" java.lang.ArithmeticException: / by zero
    at coroutines.CoroutinesMainKt$main$1$launchA$1.invokeSuspend(CoroutinesMain.kt:11)
    ···

1、CoroutineExceptionHandler

如果不想將所有的異常信息都打印到控制臺(tái)上,那么可以使用 CoroutineExceptionHandler 作為協(xié)程的上下文元素之一,在這里進(jìn)行自定義日志記錄或異常處理,它類似于對(duì)線程使用 Thread.uncaughtExceptionHandler。但是,CoroutineExceptionHandler 只會(huì)在預(yù)計(jì)不會(huì)由用戶處理的異常上調(diào)用,因此在 async 中使用它沒(méi)有任何效果,當(dāng) async 內(nèi)部發(fā)生了異常且沒(méi)有捕獲時(shí),那么調(diào)用 async.await() 依然會(huì)導(dǎo)致應(yīng)用崩潰

以下代碼只會(huì)捕獲到 launch 拋出的異常

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("Caught $exception")
    }
    val job = GlobalScope.launch(handler) {
        throw AssertionError()
    }
    val deferred = GlobalScope.async(handler) {
        throw ArithmeticException()
    }
    joinAll(job, deferred)
}
[DefaultDispatcher-worker-2] Caught java.lang.AssertionError

2、SupervisorJob

由于異常導(dǎo)致的取消在協(xié)程中是一種雙向關(guān)系,會(huì)在整個(gè)協(xié)程層次結(jié)構(gòu)中傳播,但如果我們需要的是單向取消該怎么實(shí)現(xiàn)呢?

例如,假設(shè)在 Activity 中啟動(dòng)了多個(gè)協(xié)程,如果單個(gè)協(xié)程所代表的子任務(wù)失敗了,此時(shí)并不一定需要連鎖終止整個(gè) Activity 內(nèi)部的所有其它協(xié)程任務(wù),即此時(shí)希望子協(xié)程的異常不會(huì)傳播給同級(jí)協(xié)程和父協(xié)程。而當(dāng) Activity 退出后,父協(xié)程的異常(即 CancellationException)又應(yīng)該連鎖傳播給所有子協(xié)程,終止所有子協(xié)程

可以使用 SupervisorJob 來(lái)實(shí)現(xiàn)上述效果,它類似于常規(guī)的 Job,唯一的區(qū)別就是取消操作只會(huì)向下傳播,一個(gè)子協(xié)程的運(yùn)行失敗不會(huì)影響到其他子協(xié)程

例如,以下示例中 firstChild 拋出的異常不會(huì)導(dǎo)致 secondChild 被取消,但當(dāng) supervisor 被取消時(shí) secondChild 也被同時(shí)取消了

fun main() = runBlocking {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
            log("First child is failing")
            throw AssertionError("First child is cancelled")
        }
        val secondChild = launch {
            firstChild.join()
            log("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                log("Second child is cancelled because supervisor is cancelled")
            }
        }
        firstChild.join()
        log("Cancelling supervisor")
        //取消所有協(xié)程
        supervisor.cancel()
        secondChild.join()
    }
}
[main] First child is failing
[main] First child is cancelled: true, but second one is still active
[main] Cancelling supervisor
[main] Second child is cancelled because supervisor is cancelled

但是,如果異常沒(méi)有被處理且 CoroutineContext 沒(méi)有包含一個(gè) CoroutineExceptionHandler 的話,異常會(huì)到達(dá)默認(rèn)線程的 ExceptionHandler。在 JVM 中,異常會(huì)被打印在控制臺(tái);而在 Android 中,無(wú)論異常在那個(gè) Dispatcher 中發(fā)生,都會(huì)直接導(dǎo)致應(yīng)用崩潰。所以如果上述例子中移除了 firstChild 包含的 CoroutineExceptionHandler 的話,就會(huì)導(dǎo)致 Android 應(yīng)用崩潰

?? 未被捕獲的異常一定會(huì)被拋出,無(wú)論使用的是哪種 Job

十、Android KTX

Android KTX 是包含在 Android Jetpack 及其他 Android 庫(kù)中的一組 Kotlin 擴(kuò)展程序。KTX 擴(kuò)展程序可以為 Jetpack、Android 平臺(tái)及其他 API 提供簡(jiǎn)潔的慣用 Kotlin 代碼。為此,這些擴(kuò)展程序利用了多種 Kotlin 語(yǔ)言功能,其中就包括了對(duì) Kotlin 協(xié)程的支持

1、ViewModel KTX

ViewModel KTX 庫(kù)提供了一個(gè) viewModelScope,用于在 ViewModel 啟動(dòng)協(xié)程,該作用域的生命周期和 ViewModel 相等,當(dāng) ViewModel 回調(diào)了 onCleared()方法后會(huì)自動(dòng)取消所有當(dāng)前 ViewModel 中的所有協(xié)程

引入依賴:

    dependencies {
        implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:2.2.0"
    }

例如,以下 fetchDocs() 方法內(nèi)就依靠 viewModelScope 啟動(dòng)了一個(gè)協(xié)程,用于在后臺(tái)線程發(fā)起網(wǎng)絡(luò)請(qǐng)求

class MyViewModel : ViewModel() {
    
    fun fetchDocs() {
        viewModelScope.launch {
            val result = get("https://developer.android.com")
            show(result)
        }
    }

    suspend fun get(url: String) = withContext(Dispatchers.IO) { /* ... */ }

}

2、Lifecycle KTX

Lifecycle KTX 為每個(gè) Lifecycle 對(duì)象定義了一個(gè) LifecycleScope,該作用域具有生命周期安全的保障,在此范圍內(nèi)啟動(dòng)的協(xié)程會(huì)在 Lifecycle 被銷毀時(shí)同時(shí)取消,可以使用 lifecycle.coroutineScopelifecycleOwner.lifecycleScope 屬性來(lái)拿到該 CoroutineScope

引入依賴:

    dependencies {
        implementation "androidx.lifecycle:lifecycle-runtime-ktx:2.2.0"
    }

以下示例演示了如何使用 lifecycleOwner.lifecycleScope 異步創(chuàng)建預(yù)計(jì)算文本:

class MyFragment: Fragment() {
    
    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)
        viewLifecycleOwner.lifecycleScope.launch {
            val params = TextViewCompat.getTextMetricsParams(textView)
            val precomputedText = withContext(Dispatchers.Default) {
                PrecomputedTextCompat.create(longTextContent, params)
            }
            TextViewCompat.setPrecomputedText(textView, precomputedText)
        }
    }
    
}

3、LiveData KTX

使用 LiveData 時(shí),你可能需要異步計(jì)算值。例如,你可能需要檢索用戶的偏好設(shè)置并將其傳送給界面。在這些情況下,LiveData KTX 提供了一個(gè) liveData 構(gòu)建器函數(shù),該函數(shù)會(huì)調(diào)用 suspend 函數(shù)并將結(jié)果賦值給 LiveData

引入依賴:

    dependencies {
        implementation "androidx.lifecycle:lifecycle-livedata-ktx:2.2.0"
    }

在以下示例中,loadUser() 是在其他地方聲明的 suspend 函數(shù)。 你可以使用 liveData 構(gòu)建器函數(shù)異步調(diào)用 loadUser(),然后使用 emit() 來(lái)發(fā)出結(jié)果:

val user: LiveData<User> = liveData {
    val data = database.loadUser() // loadUser is a suspend function.
    emit(data)
}

十一、轉(zhuǎn)自 掘金

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容