
在上一篇文章中,我們學(xué)習(xí)了Kotlin協(xié)程的基本概念。如果你對(duì)Kotlin協(xié)程仍沒(méi)有概念,請(qǐng)先閱讀它。
上次我僅解釋說(shuō)掛起函數(shù)允許我們掛起和等待直到方法恢復(fù)。我們沒(méi)有深入,因?yàn)榧?xì)節(jié)很多,值得單獨(dú)開(kāi)一篇帖子。所以今天,我們?cè)敿?xì)介紹掛起函數(shù)的用法。
目錄
- 什么是掛起函數(shù)?
- 在掛起函數(shù)中調(diào)用阻塞方法
- 回調(diào)和掛起的可取消協(xié)程(SuspendCancellableCoroutine)
(1) resume(value: T)
(2) resumeWithException(exception: Throwable)
(3) cancellableContinuation.cancel() - 在掛起函數(shù)里調(diào)用RxJava
1. 什么是掛起函數(shù)?
我們可以將掛起函數(shù)當(dāng)作可以暫停并在任務(wù)結(jié)束后恢復(fù)的常規(guī)方法,這意味著我們可以開(kāi)啟一個(gè)耗時(shí)任務(wù)然后等待它完成。這就是為什么我們可以用串行的方式來(lái)寫協(xié)程,而無(wú)須回調(diào)或者RxJava。

掛起函數(shù)只能在協(xié)程中調(diào)用。掛起函數(shù)跟普通函數(shù)的使用一樣,只是它會(huì)掛起當(dāng)前協(xié)程的執(zhí)行。例如,delay()是一個(gè)內(nèi)建的掛起函數(shù)。感謝Android Studio的貼心提醒,我們可以從左側(cè)欄的箭頭icon知道delay()是一個(gè)掛起函數(shù)。當(dāng)我們?cè)趨f(xié)程里調(diào)用delay(1_000)的時(shí)候,它會(huì)中斷1s執(zhí)行,不會(huì)阻塞線程,然后在1s后回到協(xié)程繼續(xù)執(zhí)行doSomething()方法。
掛起函數(shù)咋定義?suspend來(lái)幫忙。只需在常規(guī)方法前加上suspend,阻塞線程的繁重任務(wù)就能變成非阻塞方法嗎?答案是大大的NO。雖然官方文檔提到“通過(guò)調(diào)用其他掛起函數(shù),它會(huì)掛起代碼的執(zhí)行而不會(huì)阻塞當(dāng)前線程?!?,但我們?nèi)孕杩紤]掛起函數(shù)運(yùn)行的Dispatchers(調(diào)度器)。
如果你只是在普通方法前加上supend,IDE會(huì)警告“冗余的supend修飾符。
// IDE warning: "Redundant 'suspend' modifier".
private suspend fun doSomething() {
// do some heavy tasks
}
最簡(jiǎn)單且正確的方式是用withContext()包裹任務(wù),并指定恰當(dāng)?shù)膁ispatchers(調(diào)度器)。例如,如果繁重任務(wù)是計(jì)算密集的,那我們應(yīng)該將它包裹在withContext(Dispatchers.default)里。請(qǐng)見(jiàn)上一篇帖子
private suspend fun doSomething() {
withContext(Dispatchers.Default) {
// do some heavy tasks
}
}
2. 在掛起函數(shù)里調(diào)用阻塞方法
將耗時(shí)任務(wù)放進(jìn)掛起函數(shù)是個(gè)好主意。例如,通過(guò)網(wǎng)絡(luò)任務(wù)獲取用戶數(shù)據(jù)然后更新UI是一件常事。最大的問(wèn)題是網(wǎng)絡(luò)請(qǐng)求這類繁重任務(wù)會(huì)阻塞主線程。為了避免ANR,我們將該任務(wù)放到后臺(tái)線程,接下來(lái)一件煩人的事是不能在后臺(tái)線程更新UI,于是我們使用Activity.runOnUiThread(Runnable)甚至Handler來(lái)實(shí)現(xiàn)這一點(diǎn)。
Umm..對(duì)Android開(kāi)發(fā)者而言,維護(hù)大量這樣的任務(wù)并非易事。幸運(yùn)的是,Kotlin協(xié)程來(lái)了。
MainScope().launch {
val user = fetchUser() // Waits for fetching user in I/O thread.
updateUser(user) // Updates UI in main thread.
}
private suspend fun fetchUser(): User = withContext(Dispatchers.IO) {
fetchUserFromServerBlocking()
}
private fun fetchUserFromServerBlocking(): User {
// Do I/O execution.
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
class User
這些代碼片段在數(shù)據(jù)拉取后更新UI。更重要的是,網(wǎng)絡(luò)任務(wù)不會(huì)阻塞主線程,它在工作線程中執(zhí)行,因?yàn)槲覀冇?code>withContext(Dispatchers.IO)切了線程。
3. 回調(diào)和掛起的可取消協(xié)程(SuspendCancellableCoroutine)
假定我們有一個(gè)線上的Android項(xiàng)目。我們使用了大量異步任務(wù)讀取數(shù)據(jù)庫(kù)或者從服務(wù)器拉取數(shù)據(jù)。使用回調(diào)是在主線程處理數(shù)據(jù)的一個(gè)可能的方法?,F(xiàn)在,怎么把回調(diào)任務(wù)轉(zhuǎn)為協(xié)程呢?suspendCancellableCoroutine來(lái)了。
SuspendCancellableCoroutine返回一個(gè)CancellableContinuation對(duì)象供我們resume、resumeWithException,以及在協(xié)程取消時(shí)拋出CancellationException異常。(有一個(gè)類似的方法叫suspendCoroutine,它倆的區(qū)別是suspendCoroutine不能被Job.cancel()取消)
CancellableContinuation
我們可以在suspendCancellableCoroutine里執(zhí)行一個(gè)代碼塊,它具有一個(gè)CancellableContinuation參數(shù)。CancellableContinuation有3種用法:
(1) resume(value: T):
恢復(fù)相應(yīng)協(xié)程的執(zhí)行,傳遞 [value]作為掛起點(diǎn)的返回值。
MainScope().launch {
try {
val user = fetchUser()
updateUser(user)
} catch (exception: Exception) {
// Use try-catch or CoroutinesExceptionHandler to handle exceptions.
}
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
// Invokes this line since the callback onSuccess() is invoked.
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
cancellableContinuation.resumeWithException(exception)
}
})
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
// Invokes onSuccess() with user data.
callback.onSuccess(User())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
(2) resumeWithException(exception: Throwable)
恢復(fù)相應(yīng)協(xié)程的執(zhí)行,以便[exeption]在上一個(gè)掛起點(diǎn)后重新拋出。
MainScope().launch {
try {
val user = fetchUser()
updateUser(user)
} catch (exception: Exception) {
// Use try-catch or CoroutinesExceptionHandler to handle exceptions.
Log.d("demo", "$exception") // Prints "java.io.IOException".
}
// If we handle exception in try-catch, we can still do something after it.
doSomething()
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
// Invokes this line since the callback onFailure() is invoked.
cancellableContinuation.resumeWithException(exception)
}
})
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
// Invokes onFailure() callback with "IOException()".
callback.onFailure(IOException())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
在上面的示例代碼中,當(dāng)我們調(diào)用CancellableContinuation.resumeWithException(user)時(shí),fetchUser()就會(huì)拋出[exception]異常。
updateUser(user)不會(huì)被調(diào)用,而try-catch將會(huì)處理該異常。try-catch后面的代碼塊將會(huì)繼續(xù)執(zhí)行。
(3) cancellableContinuation.cancel()
雖然Kotlin沒(méi)有受檢異常,我們?nèi)孕枰趖ry-catch中處理所有的異常。否則,應(yīng)用將會(huì)崩潰。但仍有一個(gè)特殊異常我想在這里分享,那就是CancellationException,它會(huì)在我們調(diào)用cancellableContinuation.cancel()時(shí)拋出。
MainScope().launch {
try {
val user = fetchUser()
updateUser(user)
} catch (exception: Exception) {
// Handles exceptions here.
// Prints "java.util.concurrent.CancellationException: Continuation
// CancellableContinuation(DispatchedContinuation[Main, Continuation at
// com.mutant.coroutinestest.MainActivity$onCreate$1.invokeSuspend
// (MainActivity.kt:22)@5af0f84]){Active}@65c036d was cancelled normally".
Log.d("demo", "$exception")
}
// If we handle exception in try-catch, we can still do something after it.
doSomething()
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
cancellableContinuation.resumeWithException(exception)
}
})
// We call "contiuation.cancel()" to cancel this suspend function.
cancellableContinuation.cancel()
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
callback.onSuccess(User())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
即使我們不處理CancellationException,它也不會(huì)導(dǎo)致崩潰。更多信息,請(qǐng)參考此文。但它隨后的代碼不會(huì)被執(zhí)行。
MainScope().launch {
val user = fetchUser()
updateUser(user)
// If we dont't handle CancellationException, this job would be cancelled.
canNOTBeExecuted()
}
// Fetches the user data from server.
private suspend fun fetchUser(): User = suspendCancellableCoroutine {
cancellableContinuation ->
fetchUserFromNetwork(object : Callback {
override fun onSuccess(user: User) {
cancellableContinuation.resume(user)
}
override fun onFailure(exception: Exception) {
cancellableContinuation.resumeWithException(exception)
}
})
// We call "contiuation.cancel()" to cancel this suspend function.
cancellableContinuation.cancel()
}
private fun fetchUserFromNetwork(callback: Callback) {
Thread {
Thread.sleep(3_000)
callback.onSuccess(User())
}.start()
}
private fun updateUser(user: User) {
// Updates UI with [User] data.
}
interface Callback {
fun onSuccess(user: User)
fun onFailure(exception: Exception)
}
class User
在掛起函數(shù)中調(diào)用RxJava
如果我們的項(xiàng)目中用了RxJava怎么辦?有一個(gè)庫(kù)叫kotlinx-coroutines-rx2,它可以將RxJava轉(zhuǎn)化為協(xié)程。用下列代碼將它導(dǎo)入:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.2"
下列是所有的協(xié)程構(gòu)建器:

例如,如果我們用了RaJava的Single,那么Single.await()幫我們將RxJava轉(zhuǎn)為suspendCancellableCoroutine。

正如上面代碼展示的,await()拓展函數(shù)將成功的情況傳遞給cancellableContinuation.resume(),而將失敗的情況傳遞給cancellableContinuation.resumeWithException()。
讓我們實(shí)現(xiàn)我們的示例代碼:
MainScope().launch {
CoroutineScope(Dispatchers.Main).launch {
val user = fetchUserFromServer().await()
updateUser(user)
}
}
private fun fetchUserFromServer(): Single<User> =
Single.create<User> {
Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
Thread.sleep(3_000)
it.onSuccess(User())
Log.d("demo", "(2) fetchUserFromServer onSuccess, ${Thread.currentThread()}")
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
private fun updateUser(user: User) {
Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}
class User
日志將是:
D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onSuccess, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (3) updateUser, Thread[main,5,main]
fetchUserFromServer().await()代碼掛起協(xié)程的執(zhí)行,一直等待,直到RxJava返回結(jié)果。
如果RxJava的Single失敗了,并且返回了一個(gè)異常怎么辦呢?
oroutineScope(Dispatchers.Main).launch {
try {
val user = fetchUserFromServer().await()
updateUser(user)
} catch (e: Exception) {
Log.d("demo", "(4) {$e}, ${Thread.currentThread()}")
}
}
private fun fetchUserFromServer(): Single<User> =
Single.create<User> {
Log.d("demo", "(1) fetchUserFromServer start, ${Thread.currentThread()}")
Thread.sleep(3_000)
it.onError(IOException())
Log.d("demo", "(2) fetchUserFromServer onError, ${Thread.currentThread()}")
}.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
private fun updateUser(user: User) {
Log.d("demo", "(3) updateUser, ${Thread.currentThread()}")
}
class User
那么異常將在try-catch中處理。日志如下:
D/demo: (1) fetchUserFromServer start, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (2) fetchUserFromServer onError, Thread[RxCachedThreadScheduler-1,5,main]
D/demo: (4) {java.io.IOException}, Thread[main,5,main]
對(duì)于RxJava的* Maybe, Observable*,都有相應(yīng)的拓展函數(shù)供我們使用。盡管在你的代碼中嘗試它們。

這就是今天的全部。感謝閱讀。我希望這些文章能幫你更加了解掛起函數(shù),并有助于在你的項(xiàng)目中實(shí)現(xiàn)它。如果你有任何疑問(wèn)或建議,歡迎留言。再見(jiàn)。????