(譯)Android中的Kotlin協(xié)程-掛起函數(shù)

原文

掛起函數(shù)原理

在上一篇文章中,我們學(xué)習(xí)了Kotlin協(xié)程的基本概念。如果你對(duì)Kotlin協(xié)程仍沒(méi)有概念,請(qǐng)先閱讀它。

上次我僅解釋說(shuō)掛起函數(shù)允許我們掛起和等待直到方法恢復(fù)。我們沒(méi)有深入,因?yàn)榧?xì)節(jié)很多,值得單獨(dú)開(kāi)一篇帖子。所以今天,我們?cè)敿?xì)介紹掛起函數(shù)的用法。

目錄

  1. 什么是掛起函數(shù)?
  2. 在掛起函數(shù)中調(diào)用阻塞方法
  3. 回調(diào)和掛起的可取消協(xié)程(SuspendCancellableCoroutine)
    (1) resume(value: T)
    (2) resumeWithException(exception: Throwable)
    (3) cancellableContinuation.cancel()
  4. 在掛起函數(shù)里調(diào)用RxJava

1. 什么是掛起函數(shù)?

我們可以將掛起函數(shù)當(dāng)作可以暫停并在任務(wù)結(jié)束后恢復(fù)的常規(guī)方法,這意味著我們可以開(kāi)啟一個(gè)耗時(shí)任務(wù)然后等待它完成。這就是為什么我們可以用串行的方式來(lái)寫協(xié)程,而無(wú)須回調(diào)或者RxJava。


什么是掛起函數(shù).png

掛起函數(shù)只能在協(xié)程中調(diào)用。掛起函數(shù)跟普通函數(shù)的使用一樣,只是它會(huì)掛起當(dāng)前協(xié)程的執(zhí)行。例如,delay()是一個(gè)內(nèi)建的掛起函數(shù)。感謝Android Studio的貼心提醒,我們可以從左側(cè)欄的箭頭icon知道delay()是一個(gè)掛起函數(shù)。當(dāng)我們?cè)趨f(xié)程里調(diào)用delay(1_000)的時(shí)候,它會(huì)中斷1s執(zhí)行,不會(huì)阻塞線程,然后在1s后回到協(xié)程繼續(xù)執(zhí)行doSomething()方法。

掛起函數(shù)咋定義?suspend來(lái)幫忙。只需在常規(guī)方法前加上suspend,阻塞線程的繁重任務(wù)就能變成非阻塞方法嗎?答案是大大的NO。雖然官方文檔提到“通過(guò)調(diào)用其他掛起函數(shù),它會(huì)掛起代碼的執(zhí)行而不會(huì)阻塞當(dāng)前線程?!?,但我們?nèi)孕杩紤]掛起函數(shù)運(yùn)行的Dispatchers(調(diào)度器)。

如果你只是在普通方法前加上supend,IDE會(huì)警告“冗余的supend修飾符。

// IDE warning: "Redundant 'suspend' modifier".
private suspend fun doSomething() {
    // do some heavy tasks
}

最簡(jiǎn)單且正確的方式是用withContext()包裹任務(wù),并指定恰當(dāng)?shù)膁ispatchers(調(diào)度器)。例如,如果繁重任務(wù)是計(jì)算密集的,那我們應(yīng)該將它包裹在withContext(Dispatchers.default)里。請(qǐng)見(jiàn)上一篇帖子

private suspend fun doSomething() {
    withContext(Dispatchers.Default) {
        // do some heavy tasks
    }
}

2. 在掛起函數(shù)里調(diào)用阻塞方法

將耗時(shí)任務(wù)放進(jìn)掛起函數(shù)是個(gè)好主意。例如,通過(guò)網(wǎng)絡(luò)任務(wù)獲取用戶數(shù)據(jù)然后更新UI是一件常事。最大的問(wèn)題是網(wǎng)絡(luò)請(qǐng)求這類繁重任務(wù)會(huì)阻塞主線程。為了避免ANR,我們將該任務(wù)放到后臺(tái)線程,接下來(lái)一件煩人的事是不能在后臺(tái)線程更新UI,于是我們使用Activity.runOnUiThread(Runnable)甚至Handler來(lái)實(shí)現(xiàn)這一點(diǎn)。
Umm..對(duì)Android開(kāi)發(fā)者而言,維護(hù)大量這樣的任務(wù)并非易事。幸運(yùn)的是,Kotlin協(xié)程來(lái)了。

MainScope().launch {
  val user = fetchUser() // Waits for fetching user in I/O thread.
  updateUser(user) // Updates UI in main thread.
}

private suspend fun fetchUser(): User = withContext(Dispatchers.IO) {
  fetchUserFromServerBlocking()
}

private fun fetchUserFromServerBlocking(): User {
  // Do I/O execution.
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

class User

這些代碼片段在數(shù)據(jù)拉取后更新UI。更重要的是,網(wǎng)絡(luò)任務(wù)不會(huì)阻塞主線程,它在工作線程中執(zhí)行,因?yàn)槲覀冇?code>withContext(Dispatchers.IO)切了線程。

3. 回調(diào)和掛起的可取消協(xié)程(SuspendCancellableCoroutine)

假定我們有一個(gè)線上的Android項(xiàng)目。我們使用了大量異步任務(wù)讀取數(shù)據(jù)庫(kù)或者從服務(wù)器拉取數(shù)據(jù)。使用回調(diào)是在主線程處理數(shù)據(jù)的一個(gè)可能的方法?,F(xiàn)在,怎么把回調(diào)任務(wù)轉(zhuǎn)為協(xié)程呢?suspendCancellableCoroutine來(lái)了。

SuspendCancellableCoroutine返回一個(gè)CancellableContinuation對(duì)象供我們resumeresumeWithException,以及在協(xié)程取消時(shí)拋出CancellationException異常。(有一個(gè)類似的方法叫suspendCoroutine,它倆的區(qū)別是suspendCoroutine不能被Job.cancel()取消)

CancellableContinuation

我們可以在suspendCancellableCoroutine里執(zhí)行一個(gè)代碼塊,它具有一個(gè)CancellableContinuation參數(shù)。CancellableContinuation有3種用法:

(1) resume(value: T):

恢復(fù)相應(yīng)協(xié)程的執(zhí)行,傳遞 [value]作為掛起點(diǎn)的返回值。

MainScope().launch {
  try {
    val user = fetchUser()
    updateUser(user)
  } catch (exception: Exception) {
    // Use try-catch or CoroutinesExceptionHandler to handle exceptions.
  }
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      // Invokes this line since the callback onSuccess() is invoked.
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      cancellableContinuation.resumeWithException(exception)
    }
  })
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    
    // Invokes onSuccess() with user data.
    callback.onSuccess(User())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

(2) resumeWithException(exception: Throwable)
恢復(fù)相應(yīng)協(xié)程的執(zhí)行,以便[exeption]在上一個(gè)掛起點(diǎn)后重新拋出。

MainScope().launch {
  try {
    val user = fetchUser()
    updateUser(user)
  } catch (exception: Exception) {
    // Use try-catch or CoroutinesExceptionHandler to handle exceptions.
    Log.d("demo", "$exception") // Prints "java.io.IOException".
  }
  
  // If we handle exception in try-catch, we can still do something after it.
  doSomething()
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      // Invokes this line since the callback onFailure() is invoked.
      cancellableContinuation.resumeWithException(exception)
    }
  })
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    
    // Invokes onFailure() callback with "IOException()".
    callback.onFailure(IOException())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

在上面的示例代碼中,當(dāng)我們調(diào)用CancellableContinuation.resumeWithException(user)時(shí),fetchUser()就會(huì)拋出[exception]異常。
updateUser(user)不會(huì)被調(diào)用,而try-catch將會(huì)處理該異常。try-catch后面的代碼塊將會(huì)繼續(xù)執(zhí)行。
(3) cancellableContinuation.cancel()
雖然Kotlin沒(méi)有受檢異常,我們?nèi)孕枰趖ry-catch中處理所有的異常。否則,應(yīng)用將會(huì)崩潰。但仍有一個(gè)特殊異常我想在這里分享,那就是CancellationException,它會(huì)在我們調(diào)用cancellableContinuation.cancel()時(shí)拋出。

MainScope().launch {
  try {
    val user = fetchUser()
    updateUser(user)
  } catch (exception: Exception) {
    // Handles exceptions here.
    // Prints "java.util.concurrent.CancellationException: Continuation 
    // CancellableContinuation(DispatchedContinuation[Main, Continuation at 
    // com.mutant.coroutinestest.MainActivity$onCreate$1.invokeSuspend
    // (MainActivity.kt:22)@5af0f84]){Active}@65c036d was cancelled normally".
    Log.d("demo", "$exception")
  }
  
  // If we handle exception in try-catch, we can still do something after it.
  doSomething()
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      cancellableContinuation.resumeWithException(exception)
    }
  })
  
  // We call "contiuation.cancel()" to cancel this suspend function.
  cancellableContinuation.cancel()
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    callback.onSuccess(User())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

即使我們不處理CancellationException,它也不會(huì)導(dǎo)致崩潰。更多信息,請(qǐng)參考此文。但它隨后的代碼不會(huì)被執(zhí)行。

MainScope().launch {
  val user = fetchUser()
  updateUser(user)
  
  // If we dont't handle CancellationException, this job would be cancelled.
  canNOTBeExecuted()
}

// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine { 
cancellableContinuation ->
  fetchUserFromNetwork(object : Callback {
    override fun onSuccess(user: User) {
      cancellableContinuation.resume(user)
    }

    override fun onFailure(exception: Exception) {
      cancellableContinuation.resumeWithException(exception)
    }
  })
  
  // We call "contiuation.cancel()" to cancel this suspend function.
  cancellableContinuation.cancel()
}

private fun fetchUserFromNetwork(callback: Callback) {
  Thread {
    Thread.sleep(3_000)
    callback.onSuccess(User())
  }.start()
}

private fun updateUser(user: User) {
  // Updates UI with [User] data.
}

interface Callback {
  fun onSuccess(user: User)
  fun onFailure(exception: Exception)
}

class User

在掛起函數(shù)中調(diào)用RxJava

如果我們的項(xiàng)目中用了RxJava怎么辦?有一個(gè)庫(kù)叫kotlinx-coroutines-rx2,它可以將RxJava轉(zhuǎn)化為協(xié)程。用下列代碼將它導(dǎo)入:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.2"

下列是所有的協(xié)程構(gòu)建器:

coroutine builders

例如,如果我們用了RaJava的Single,那么Single.await()幫我們將RxJava轉(zhuǎn)為suspendCancellableCoroutine
RxJava transform.png

正如上面代碼展示的,await()拓展函數(shù)將成功的情況傳遞給cancellableContinuation.resume(),而將失敗的情況傳遞給cancellableContinuation.resumeWithException()。
讓我們實(shí)現(xiàn)我們的示例代碼:

MainScope().launch {
  CoroutineScope(Dispatchers.Main).launch {
    val user = fetchUserFromServer().await()
    updateUser(user)
  }
}

private fun fetchUserFromServer(): Single<User> =
  Single.create<User> {
    Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
    Thread.sleep(3_000)
    it.onSuccess(User())
    Log.d("demo", "(2) fetchUserFromServer onSuccess, ${Thread.currentThread()}")
  }.subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())

private fun updateUser(user: User) {
  Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}

class User

日志將是:

D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onSuccess, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (3) updateUser, Thread[main,5,main]

fetchUserFromServer().await()代碼掛起協(xié)程的執(zhí)行,一直等待,直到RxJava返回結(jié)果。
如果RxJava的Single失敗了,并且返回了一個(gè)異常怎么辦呢?

oroutineScope(Dispatchers.Main).launch {
  try {
    val user = fetchUserFromServer().await()
    updateUser(user)
  } catch (e: Exception) {
    Log.d("demo", "(4) {$e}, ${Thread.currentThread()}")
  }
}

private fun fetchUserFromServer(): Single<User> =
  Single.create<User> {
    Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
    Thread.sleep(3_000)
    it.onError(IOException())
    Log.d("demo", "(2) fetchUserFromServer onError, ${Thread.currentThread()}")
  }.subscribeOn(Schedulers.io())
      .observeOn(AndroidSchedulers.mainThread())

private fun updateUser(user: User) {
  Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}

class User

那么異常將在try-catch中處理。日志如下:

D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onError, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (4) {java.io.IOException}, Thread[main,5,main]

對(duì)于RxJava的* Maybe, Observable*,都有相應(yīng)的拓展函數(shù)供我們使用。盡管在你的代碼中嘗試它們。

RxJava suspending extensions.png

這就是今天的全部。感謝閱讀。我希望這些文章能幫你更加了解掛起函數(shù),并有助于在你的項(xiàng)目中實(shí)現(xiàn)它。如果你有任何疑問(wèn)或建議,歡迎留言。再見(jiàn)。????

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容