概述
在上一篇博客中,我們學習了如何在后臺執(zhí)行耗時的運算任務。倘若我們想終止運算呢?
這篇文章就是關于取消執(zhí)行任務的。
取消執(zhí)行任務意味著什么呢?
我們想要能夠取消一個已經被 RxJava 或者 Coroutines 創(chuàng)建的運算執(zhí)行。這個運算或同步或異步。
在 Android 開發(fā)中,這是一個很重要的用例,最常見的場景可能是當 View 正被銷毀的時候。在這個場景下,我們可能通常想要取消真正執(zhí)行的任務,比如:網絡請求、一個耗時的對象初始化,等等。
RxJava
正如上一篇博客一樣,我們打算暫且忽略 RxJava 傳輸流的能力。僅僅思考我們如何使用 RxJava 取消執(zhí)行呢?(注:這里其實有點推鍋的意思,大體意思可以理解為,文章里的代碼不是標準的范例,甚至是違背了 RxJava 的編碼特質的,文章的聚焦點主要是對比 RxJava 和 協(xié)程。)
讓我們想像我們使用 間隔操作符 創(chuàng)建一個 定時器 ,代碼如下:
Observable.interval(1, TimeUnit.SECONDS)
當你訂閱這個 observable ,定時器將被觸發(fā),然后它將每秒發(fā)射一個事件到訂閱者。
你如何取消這個定時器呢?
當訂閱定時器(調用 .subscribe() ),它將返回一個 Disposable 對象,代碼如下:
val disposable: Disposable =
Observable.interval(1, TimeUnit.SECONDS).subscribe()
你能調用 Disposable 的 dispose() 來取消執(zhí)行。這個 Observable 將結束發(fā)射,如下:
disposable.dispose()
注意事項
如果你手動的創(chuàng)建 Observable,沒有使用任何創(chuàng)建類運算符(如:interval),那么你不需要處理取消運算。(注:通常自定 Observable 是不推薦的,最好都要使用 RxJava 庫已有運算符創(chuàng)建 Observable,當然大神除外,推薦大家可以通過閱讀文章 關于RxJava的Tips & Tricks 和 Common Mistakes in RxJava )
如果這個 Observable 可以被取消,我們必須在被訂閱的發(fā)射器調用之前,檢查發(fā)射器是否被處理掉了,如下:
Observable.create<Int> { emitter ->
for (i in 1..5) {
if (!emitter.isDisposed) {
emitter.onNext(i)
} else {
break
}
}
emitter.onComplete()
}
上面的代碼可以看到,如果訂閱者被銷毀,我們能跳過這次多余的發(fā)射。根據 Observable.create 的源碼,若我們不跳過,這代碼會繼續(xù)執(zhí)行下去,并且 Observable 會忽略 emitter.onNext(i) 的調用。(注:其實在平時開發(fā)中,取消正在執(zhí)行的 RxJava 異步任務,由于業(yè)務邏輯復雜,通常不止一個 Observable ,更多的還是用到 CompositeDisposable)
協(xié)程
協(xié)程本身就是一個運算實例。取消協(xié)程意味著停止它的掛起 lambda 表達式的執(zhí)行任務。
我們能夠使用 Coroutine Job 來取消執(zhí)行任務,它是 Coroutine Context 的一部分。
Coroutine Job 暴露了一個取消協(xié)程任務的方法。當我們期望取消時,就可以調用 cancel() 方法。
例如,Coroutine Builder launch 就返回一個協(xié)程的 Job 接口實例。
val job = launch(CommonPool) {
// my suspending block
}
...
job.cancel()
如上所示,我們賦值到一個變量中,然后就可以調用取消了。
以上就是從協(xié)程獲取 Job 然后取消它的例子了。那么還能有另一種方式實現嗎?當然,還能通過指定一個 Job 給協(xié)程。這樣可以實現更復雜的業(yè)務邏輯。
一些協(xié)程構造者(如:launch 和 async)能接收一個名為 parent 的參數,你能設置它作為協(xié)程創(chuàng)建的 Job ,代碼如下:
val parentJob = Job()
async(CommonPool, parent = parentJob) {
// my suspending block
}
parentJob.cancel()
上面實現途徑的好處之一就是,你能共享上面的 parentJob 實例給多個協(xié)程,這樣當你調用 parentJob.cancel() 時,你就能取消所有持有 parentJob 的協(xié)程。
這個方式有點類似于 RxJava 的 CompositeDisposable,調用一次就可以銷毀多個訂閱者。
val parentJob = Job()
val deferred1 = async(CommonPool, parent = parentJob) {
// my suspending block
}
val deferred2 = async(CommonPool, parent = parentJob) {
// my suspending block
}
parentJob.cancel() // Cancels both Coroutines
當共享 Job 給不同的協(xié)程時,一定要注意:當你取消一個 Job 后,你必須重新創(chuàng)建它,你不可以再將已取消的 Job 分配給另一個協(xié)程。
有另一種實現方式,那就是通過組合 Coroutine Context。你能使用加號運算符實現它。
val parentJob = Job()
launch(parentJob + CommonPool) {
// my suspending block
}
parentJob.cancel()
在上面的例子,協(xié)程的上下文結果就是由 parentJob 和 CommonPool 組合而成。這個線程化策略被 CommonPool 定義然后 Job 的值來源于 parentJob。
如果你想了解更多關于組合上下文的內容,你可以閱讀 Kotlin 協(xié)程文檔的 這一章節(jié): lifecycle-and-coroutine-parent-child-hierarchy
注意事項
正如 RxJava 一樣,你必須認真考慮在協(xié)程取消這一場景。
val job = launch(CommonPool) {
for (i in 1..5) {
heavyComputation()
}
}
job.cancel()
如果我們嘗試執(zhí)行這段代碼,它將重復執(zhí)行 5 次耗時運算,由于這段代碼并未做好被取消的準備。
我們要如何改進它呢?
如同在 RxJava 中檢查訂閱者是否存在一樣,我們需要檢查協(xié)程是否活躍。
val job = launch(CommonPool) {
for (i in 1..5) {
if (!isActive) { break }
heavyComputation()
}
}
job.cancel()
isActive 是 Job 實例的一個內置變量,它能在協(xié)程內被訪問(coroutineContext 是另一個變量)。
一些掛起函數 (suspending functions) 存在于協(xié)程標準庫中,為我們處理取消協(xié)程。讓我們來看看 delay 這個函數。
val job = launch(CommonPool) {
doSomething()
delay(300) // It’s going to cancel at this point
doSomething()
}
job.cancel()
Delay 是一個掛起函數,它能為我們處理取消任務。然而,如果你使用 Thread.sleep 代替 delay ,由于它是阻塞線程的并且沒有掛起協(xié)程,所以它不會被取消。
val job = launch(CommonPool) {
doSomething()
Thread.sleep(300) // It’s NOT going to cancel execution
doSomething()
}
job.cancel()
Thread.sleep 不會讓我們取消任務。它甚至都不是一個掛起函數! 即使我們調用 job.cancel(),協(xié)程也不會被取消。
在上面的例子中 Thread.sleep 你并不會使用它。如果你非常非常需要,那么在阻塞的前后都要檢查協(xié)程是否活躍,如下所示:
val job = launch(CommonPool) {
doSomething()
if (!isActive) return
Thread.sleep(300) // It’s NOT going to cancel execution
if (!isActive) return
doSomething()
}
job.cancel()