(譯)Android中的Kotlin協(xié)程-基礎(chǔ)

如果英文較好,建議直接閱讀原文

譯文

什么是協(xié)程

基本上,coroutines是輕量級線程,它使得我們可以用串行的方式寫出異步的、非阻塞的代碼。

Android中如何導(dǎo)入Kotlin協(xié)程

根據(jù)Kotlin Coroutines Github repo,我們需要導(dǎo)入kotlinx-coroutines-core和kotlinx-coroutines-android(類似于RxJava的io.reactivex.rxjava2:rxandroid,該庫支持Android主線程,同時保證未捕獲的異??梢栽趹?yīng)用崩潰前輸出日志)。如果項目里使用了RxJava,可以導(dǎo)入kotlinx-coroutines-rx2來同時使用RxJava和協(xié)程,這個庫幫助將RxJava代碼轉(zhuǎn)為協(xié)程。

添加如下代碼導(dǎo)入

implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.2'
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:1.3.2'
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.2"

記得添加最新的Kotlin版本到根build.gradle:

buildscript {
    ext.kotlin_version = '1.3.50'
    repositories {
        jcenter()
        ...
    }
    ...
}

OK,準(zhǔn)備工作已就緒,讓我們開始吧~

內(nèi)容目錄

  1. 掛起函數(shù)(Suspending functions)
  2. 協(xié)程作用域 (Coroutine scope)
    (1) 自定義作用域(CoroutineScope)
    (2) 主作用域(MainScope)
    (3) 全局作用域(GlobalScope)
  3. 協(xié)程上下文(Coroutine context)
    (1) 調(diào)度器(Dispatchers)
    (2) 協(xié)程異常處理器(CoroutineExceptionHandler)
    (3) 任務(wù)(Job)
    — (3.1) 父-子層級(Parent-child hierarchies)
    — (3.2) SupervisorJob v.s. Job
  4. 協(xié)程構(gòu)建器 (Coroutine builder)
    (1) launch
    (2) async
  5. 協(xié)程體(Coroutine body)

協(xié)程基礎(chǔ)

先看看協(xié)程長啥樣:

CoroutineScope(Dispatchers.Main + Job()).launch {
  val user = fetchUser() // A suspending function running in the I/O thread.
  updateUser(user) // Updates UI in the main thread.
}

private suspend fun fetchUser(): User = withContext(Dispatchers.IO) {
  // Fetches the data from server and returns user data.
}

這段代碼在后臺線程拉取服務(wù)器數(shù)據(jù),然后回到主線程更新UI.

1. 掛起函數(shù)(Suspending functions)

掛起函數(shù)是Kotlin協(xié)程中的特殊函數(shù),用關(guān)鍵字suspend定義。掛起函數(shù)可以中斷(suspend)當(dāng)前協(xié)程的執(zhí)行,這意味著它一直等待,直到掛起函數(shù)恢復(fù)(resume)。因為這篇博客關(guān)注協(xié)程的基本概念, Android中的Kotlin協(xié)程-掛起函數(shù)將會討論更多細(xì)節(jié)

我們回過頭來看看上面的代碼,它可以分為4個部分:

suspend functions.png

2. 協(xié)程作用域(Coroutine scope)

為新協(xié)程定義一個作用域。每個協(xié)程構(gòu)建器都是CoroutineScope的拓展,繼承其coroutineContext以自動傳遞上下文對象和取消。

所有的協(xié)程都在協(xié)程作用域里運行,并接受一個CoroutineContext(協(xié)程上下文,后文詳述)作為參數(shù)。有幾個作用域我們可以使用:

(1) CoroutineScope

用自定義的協(xié)程上下文創(chuàng)建作用域。例如,根據(jù)我們的需要,指定線程、父job和異常處理器(the thread, parent job and exception handler):

CoroutineScope(Dispatchers.Main + job + exceptionHandler).launch {
    ...
}

(2) MainScope

為UI組件創(chuàng)建一個主作用域。它使用SupervisorJob(),在主線程運行,這意味著如果它的某個子任務(wù)(child job)失敗了,不會影響其他子任務(wù)。

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

(3) GlobalScope

這個作用域不跟任何任務(wù)(job)綁定。它用來啟動頂級協(xié)程,這些協(xié)程可以運行在整個的應(yīng)用生命周期,且永遠不能取消。

3. 協(xié)程上下文(Coroutine context)

協(xié)程總是運行在某個CoroutineContext類型的上下文中。協(xié)程上下文是一系列元素,用來指定線程策略、異常處理器、控制協(xié)程生命周期等??梢杂?a target="_blank">+操作符將這些元素組合起來。

有3種最重要的協(xié)程上下文:調(diào)度器,協(xié)程異常處理器,任務(wù)(Dispatchers,CoroutineExceptionHandler,Job)

(1) 調(diào)度器(Dispatchers)

指定協(xié)程在哪個線程執(zhí)行。協(xié)程可以隨時用withContext()切換線程。

Dispatchers.Default

使用共享的后臺線程緩存池。默認(rèn)情況下,它使用的最大線程數(shù)等于CPU內(nèi)核數(shù),但至少2個。這個線程看起來會像是Thread[DefaultDispatcher-worker-2,5,main].

Dispatchers.IO

跟Dispatchers.Default共享線程,但它數(shù)量受kotlinx.coroutines.io.parallelism限制,默認(rèn)最多是64個線程或CPU內(nèi)核數(shù)(其中的大值)。跟Dispatchers.Default一樣,線程看起來像Thread[DefaultDispatcher-worker-1,5,main].

Dispatchers.Main

等效于主線程。線程看起來像Thread[main,5,main].

Dispatchers.Unconfined

未指定特定線程的協(xié)程分發(fā)器。協(xié)程在當(dāng)前線程執(zhí)行,并讓協(xié)程恢復(fù)到對應(yīng)的suspending function用過的任意線程上。

CoroutineScope(Dispatchers.Unconfined).launch {
    // Writes code here running on Main thread.
    
    delay(1_000)
    // Writes code here running on `kotlinx.coroutines.DefaultExecutor`.
    
    withContext(Dispatchers.IO) { ... }
    // Writes code running on I/O thread.
    
    withContext(Dispatchers.Main) { ... }
    // Writes code running on Main thread.
}

(2) CoroutineExceptionHandler

處理未捕獲的異常。

一般的, 未捕獲異常只會從launch構(gòu)建器創(chuàng)建的協(xié)程中拋出. async構(gòu)建器創(chuàng)建的協(xié)程總是捕獲所有的異常,并在返回的Deferred對象中表示.

例子1:不能通過外層try-catch捕獲IOException()。不能用try-catch包圍整個協(xié)程作用域,否則應(yīng)用還是會崩潰。

try {
  CoroutineScope(Dispatchers.Main).launch {
    doSomething()
  }
} catch (e: IOException) {
  //  無法捕獲IOException()
  Log.d("demo", "try-catch: $e")
}

private suspend fun doSomething() {
  delay(1_000)
  throw IOException()
}

例子2:用CoroutineExceptionHandler捕獲IOException()。除CancellationException外的其他異常,如IOException(),將傳遞給CoroutineExceptionHandler。

// Handles coroutine exception here.
val handler = CoroutineExceptionHandler { _, throwable ->
  Log.d("demo", "handler: $throwable") // Prints "handler: java.io.IOException"
}

CoroutineScope(Dispatchers.Main + handler).launch {
  doSomething()
}

private suspend fun doSomething() {
  delay(1_000)
  throw IOException()
}

例子3:CancellationException()會被忽略。

如果協(xié)程拋出CancellationException,它將會被忽略(因為這是取消運行中的協(xié)程的預(yù)期機制,所以該異常不會傳遞給CoroutineExceptionHandler)(譯注:不會導(dǎo)致崩潰)

// Handles coroutine exception here.
val handler = CoroutineExceptionHandler { _, throwable ->
  // Won't print the log because the exception is "CancellationException()".
  Log.d("demo", "handler: $throwable")
}

CoroutineScope(Dispatchers.Main + handler).launch {
  doSomething()
}

private suspend fun doSomething() {
  delay(1_000)
  throw CancellationException()
}

例子4:用invokeOnCompletion可以獲取所有異常信息。

CancellationException不會傳遞給CoroutineExceptionHandler,但當(dāng)該異常發(fā)生時,如果我們想打印出某些信息,可以使用invokeOnCompletion來獲取。

val job = CoroutineScope(Dispatchers.Main).launch {
  doSomething()
}

job.invokeOnCompletion {
    val error = it ?: return@invokeOnCompletion
    // Prints "invokeOnCompletion: java.util.concurrent.CancellationException".
    Log.d("demo", "invokeOnCompletion: $error")
  }
}

private suspend fun doSomething() {
  delay(1_000)
  throw CancellationException()
}

(3) Job

控制協(xié)程的生命周期。一個協(xié)程有如下狀態(tài):

job狀態(tài).png

查詢job的當(dāng)前狀態(tài)很簡單,用Job.isActive。

狀態(tài)流圖是:

job狀態(tài)流圖.png
  1. 協(xié)程工作時job是active態(tài)的
  2. job發(fā)生異常時將會變成cancelling. 一個job可以隨時用cancel方法取消,這個強制使它立刻變?yōu)閏ancelling態(tài)
  3. 當(dāng)job工作完成時,會變成cancelled態(tài)
  4. 父job會維持在completingcancelling態(tài)直到所有子job完成。注意completing是一種內(nèi)部狀態(tài),對外部來說,completing態(tài)的job仍然是active的。
(3.1) Parent-child hierarchies(父-子層級)

弄明白狀態(tài)后,我門還必須知道父-子層級是如何工作的。假設(shè)我們寫了如下代碼:

val parentJob = Job()
val childJob1 = CoroutineScope(parentJob).launch {
    val childJob2 = launch { ... }
    val childJob3 = launch { ... }
}

則其父子層級會長這樣:

job父子層級.png

我們可以改變父job,像這樣:

val parentJob1 = Job()
val parentJob2 = Job()
val childJob1 = CoroutineScope(parentJob1).launch {
    val childJob2 = launch { ... }
    val childJob3 = launch(parentJob2) { ... }
}

則父子層級會長這樣:

job父子層級2.png

基于以上知識,我們需要知道如下一些重要概念:

  • 父job取消將立即導(dǎo)致所有子job取消

    val parentJob = Job()
    CoroutineScope(Dispatchers.Main + parentJob).launch {
        val childJob = launch {
            delay(5_000)
            
            // This function won't be executed because its parentJob is 
            // already cancelled after 1 sec. 
            canNOTBeExcecuted()
        }
        launch {
            delay(1_000)
            parentJob.cancel() // Cancels parent job after 1 sec.
        }
    }
    
  • 當(dāng)某個子job因為除CancellationException外的異常而失敗或取消時,會立刻導(dǎo)致所有父job和其他子job取消。但如果是CancellationException,則除該job的子job外的其他jobs不會受到影響。

例子1:如果拋出CancellationException,只有childJob1下的job被取消。

val parentJob = Job()
CoroutineScope(Dispatchers.Main + parentJob).launch {
  val childJob1 = launch {
    val childOfChildJob1 = launch {
      delay(2_000)
      // This function won't be executed since childJob1 is cancelled.
      canNOTBeExecuted()
    }
    delay(1_000)
    
    // Cancel childJob1.
    cancel()
  }

  val childJob2 = launch {
    delay(2_000)
    canDoSomethinghHere()
  }

  delay(3_000)
  canDoSomethinghHere()
}

例子2:如果某個子job拋出IOException,則所有關(guān)聯(lián)job都會被取消

val parentJob = Job()
val handler = CoroutineExceptionHandler { _, throwable ->
  Log.d("demo", "handler: $throwable") // Prints "handler: java.io.IOException"
}

CoroutineScope(Dispatchers.Main + parentJob + handler).launch {
  val childJob1 = launch {
    delay(1_000)
    // Throws any exception "other than CancellationException" after 1 sec.
    throw IOException() 
  }

  val childJob2 = launch {
    delay(2_000)
    // The other child job: this function won't be executed.
    canNOTBExecuted()
  }

  delay(3_000)
  // Parent job: this function won't be executed.
  canNOTBExecuted()
}
  • cancelChildren(): 父job可以取消它的所有子job(遞歸到它們的子job)而不取消自己。注意:如果一個job已取消,則它不能再作為父job運行協(xié)程了。

如果我們用Job.cancel(),父job將會變成cancelled(當(dāng)前是Cancelling),當(dāng)其所有子job都cancelled后,父job會成為cancelled態(tài)。

val parentJob = Job()
val childJob = CoroutineScope(Dispatchers.Main + parentJob).launch {
  delay(1_000)
  
  // This function won't be executed because its parent is cancelled.
  canNOTBeExecuted()
}

parentJob.cancel()

// Prints "JobImpl{Cancelling}@199d143", parent job status becomes "cancelling".
// And will be "cancelled" after all the child job is cancelled.
Log.d("demo", "$parentJob")

而如果我們用Job.cancelChildren(),父job將會變?yōu)锳ctive態(tài),我們?nèi)匀豢梢杂盟鼇磉\行其他協(xié)程。

val parentJob = Job()
val childJob = CoroutineScope(Dispatchers.Main + parentJob).launch {
  delay(1_000)
  
  // This function won't be executed because its parent job is cancelled.
  canNOTBeExecuted()
}

// Only children are cancelled, the parent job won't be cancelled.
parentJob.cancelChildren()

// Prints "JobImpl{Active}@199d143", parent job is still active.
Log.d("demo", "$parentJob")

val childJob2 = CoroutineScope(Dispatchers.Main + parentJob).launch {
  delay(1_000)
  
  // Since the parent job is still active, we could use it to run child job 2.
  canDoSomethingHere()
}
(3.2) SupervisorJob v.s. Job

supervisor job的子job可以獨立失敗,而不影響其他子job。

正如前文提到的,如果我們用Job()作為父job,當(dāng)某個子job失敗時將會導(dǎo)致所有子job取消。

val parentJob = Job()
val handler = CoroutineExceptionHandler { _, _ -> }
val scope = CoroutineScope(Dispatchers.Default + parentJob + handler)
val childJob1 = scope.launch {
    delay(1_000)
    // ChildJob1 fails with the IOException().
    throw IOException()
}

val childJob2 = scope.launch {
    delay(2_000)
    // This line won't be executed due to childJob1 failure.
    canNOTBeExecuted()
}

如果我們使用SupervisorJob()作為父job,則其中一個子job取消時不會影響其他子jobs。

val parentJob = SupervisorJob()
val handler = CoroutineExceptionHandler { _, _ -> }
val scope = CoroutineScope(Dispatchers.Default + parentJob + handler)
val childJob1 = scope.launch {
    delay(1_000)
    // ChildJob1 fails with the IOException().
    throw IOException()
}

val childJob2 = scope.launch {
    delay(2_000)
    // Since we use SupervisorJob() as parent job, the failure of
    // childJob1 won't affect other child jobs. This function will be 
    // executed.
    canDoSomethinghHere()
}

4. 協(xié)程構(gòu)建器(Coroutines Builder)

(1) launch

啟動一個新協(xié)程,不會阻塞當(dāng)前線程,返回一個指向當(dāng)前協(xié)程的Job引用。

(2) async and await

async協(xié)程構(gòu)建器是CoroutineScope的拓展方法。它創(chuàng)建一個協(xié)程,并以Deferred實現(xiàn)來返回它的未來結(jié)果,這是一個非阻塞的可取消future——一個帶結(jié)果的Job。

Async協(xié)程搭配await使用:不阻塞當(dāng)前線程的前提下持續(xù)等待結(jié)果,并在可延遲的任務(wù)完成后恢復(fù)(resume),返回結(jié)果,或者如果deferred被取消了,拋出相應(yīng)的異常。

下列代碼展示了兩個suspending functions的串行調(diào)用。在fetchDataFromServerOne()和fetchDataFromServerTwo()中,我們做了一些耗時任務(wù),分別耗時1秒。在launch構(gòu)建器里調(diào)用它們,會發(fā)現(xiàn)最終的耗時是它們的和:2秒。

override fun onCreate(savedInstanceState: Bundle?) {
  ...

  val scope = MainScope()
  scope.launch {
    val time = measureTimeMillis {
      val one = fetchDataFromServerOne()
      val two = fetchDataFromServerTwo()
      Log.d("demo", "The sum is ${one + two}")
    }
    Log.d("demo", "Completed in $time ms")
  }
}

private suspend fun fetchDataFromServerOne(): Int {
  Log.d("demo", "fetchDataFromServerOne()")
  delay(1_000)
  return 1
}
  
private suspend fun fetchDataFromServerTwo(): Int {
  Log.d("demo", "fetchDataFromServerTwo()")
  delay(1_000)
  return 2
}

日志是:

2019-12-09 00:00:34.547 D/demo: fetchDataFromServerOne()
2019-12-09 00:00:35.553 D/demo: fetchDataFromServerTwo()
2019-12-09 00:00:36.555 D/demo: The sum is 3
2019-12-09 00:00:36.555 D/demo: Completed in 2008 ms

耗時是兩個suspending functions延時的和。該協(xié)程在fetchDataFromServerOne()結(jié)束前會中斷(suspend),然后執(zhí)行fetchDataFromServerTwo()。

如果我們想同時運行兩個方法以減少耗時呢?Async閃亮登場!Async和launch很像。它啟動一個可以和其他協(xié)程同時運行的新協(xié)程,返回Deferred引用——一個帶返回值的Job。

public interface Deferred<out T> : Job {
  public suspend fun await(): T
  ...
}

在Deferred上調(diào)用await()獲取結(jié)果,例如:

override fun onCreate(savedInstanceState: Bundle?) {
  ...
  
  val scope = MainScope()
  scope.launch {
    val time = measureTimeMillis {
      val one = async { fetchDataFromServerOne() }
      val two = async { fetchDataFromServerTwo() }
      Log.d("demo", "The sum is ${one.await() + two.await()}")
    }
    
    // Function one and two will run asynchrously,
    // so the time cost will be around 1 sec only. 
    Log.d("demo", "Completed in $time ms")
  }
}

private suspend fun fetchDataFromServerOne(): Int {
  Log.d("demo", "fetchDataFromServerOne()")
  delay(1_000)
  return 1
}

private suspend fun fetchDataFromServerTwo(): Int {
  Log.d("demo", "fetchDataFromServerTwo()")
  Thread.sleep(1_000)
  return 2
}

日志是:

2019-12-08 23:52:01.714 D/demo: fetchDataFromServerOne()
2019-12-08 23:52:01.718 D/demo: fetchDataFromServerTwo()
2019-12-08 23:52:02.722 D/demo: The sum is 3
2019-12-08 23:52:02.722 D/demo: Completed in 1133 ms

5. 協(xié)程體(Coroutine body)

在CoroutineScope中運行的代碼,包括常規(guī)函數(shù)或掛起函數(shù)——掛起函數(shù)在結(jié)束前會中斷協(xié)程,下篇博客將會詳述。

今天就到這里啦。下篇博客將會深入介紹掛起函數(shù)及其用法。 Android中的Kotlin協(xié)程-掛起函數(shù).

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容