『譯』Coroutines 與 RxJava 異部機制對比之取消執(zhí)行

概述

上一篇博客中,我們學習了如何在后臺執(zhí)行耗時的運算任務。倘若我們想終止運算呢?

這篇文章就是關于取消執(zhí)行任務的。

取消執(zhí)行任務意味著什么呢?

我們想要能夠取消一個已經被 RxJava 或者 Coroutines 創(chuàng)建的運算執(zhí)行。這個運算或同步或異步。

在 Android 開發(fā)中,這是一個很重要的用例,最常見的場景可能是當 View 正被銷毀的時候。在這個場景下,我們可能通常想要取消真正執(zhí)行的任務,比如:網絡請求、一個耗時的對象初始化,等等。

RxJava

正如上一篇博客一樣,我們打算暫且忽略 RxJava 傳輸流的能力。僅僅思考我們如何使用 RxJava 取消執(zhí)行呢?(注:這里其實有點推鍋的意思,大體意思可以理解為,文章里的代碼不是標準的范例,甚至是違背了 RxJava 的編碼特質的,文章的聚焦點主要是對比 RxJava 和 協(xié)程。

讓我們想像我們使用 間隔操作符 創(chuàng)建一個 定時器 ,代碼如下:

Observable.interval(1, TimeUnit.SECONDS)

當你訂閱這個 observable ,定時器將被觸發(fā),然后它將每秒發(fā)射一個事件到訂閱者。

你如何取消這個定時器呢?
當訂閱定時器(調用 .subscribe() ),它將返回一個 Disposable 對象,代碼如下:

val disposable: Disposable = 
    Observable.interval(1, TimeUnit.SECONDS).subscribe()

你能調用 Disposable 的 dispose() 來取消執(zhí)行。這個 Observable 將結束發(fā)射,如下:

disposable.dispose()

注意事項

如果你手動的創(chuàng)建 Observable,沒有使用任何創(chuàng)建類運算符(如:interval),那么你不需要處理取消運算。(注:通常自定 Observable 是不推薦的,最好都要使用 RxJava 庫已有運算符創(chuàng)建 Observable,當然大神除外,推薦大家可以通過閱讀文章 關于RxJava的Tips & TricksCommon Mistakes in RxJava

如果這個 Observable 可以被取消,我們必須在被訂閱的發(fā)射器調用之前,檢查發(fā)射器是否被處理掉了,如下:

Observable.create<Int> { emitter ->
   for (i in 1..5) {
       if (!emitter.isDisposed) {
           emitter.onNext(i)
       } else {
           break
       }
   }
   emitter.onComplete()
}

上面的代碼可以看到,如果訂閱者被銷毀,我們能跳過這次多余的發(fā)射。根據 Observable.create 的源碼,若我們不跳過,這代碼會繼續(xù)執(zhí)行下去,并且 Observable 會忽略 emitter.onNext(i) 的調用。(注:其實在平時開發(fā)中,取消正在執(zhí)行的 RxJava 異步任務,由于業(yè)務邏輯復雜,通常不止一個 Observable ,更多的還是用到 CompositeDisposable

協(xié)程

協(xié)程本身就是一個運算實例。取消協(xié)程意味著停止它的掛起 lambda 表達式的執(zhí)行任務。

我們能夠使用 Coroutine Job 來取消執(zhí)行任務,它是 Coroutine Context 的一部分。

Coroutine Job 暴露了一個取消協(xié)程任務的方法。當我們期望取消時,就可以調用 cancel() 方法。

例如,Coroutine Builder launch 就返回一個協(xié)程的 Job 接口實例。

val job = launch(CommonPool) {
    // my suspending block
}
...
job.cancel()

如上所示,我們賦值到一個變量中,然后就可以調用取消了。

以上就是從協(xié)程獲取 Job 然后取消它的例子了。那么還能有另一種方式實現嗎?當然,還能通過指定一個 Job 給協(xié)程。這樣可以實現更復雜的業(yè)務邏輯。

一些協(xié)程構造者(如:launch 和 async)能接收一個名為 parent 的參數,你能設置它作為協(xié)程創(chuàng)建的 Job ,代碼如下:

val parentJob = Job()
async(CommonPool, parent = parentJob) {
   // my suspending block
}
parentJob.cancel()

上面實現途徑的好處之一就是,你能共享上面的 parentJob 實例給多個協(xié)程,這樣當你調用 parentJob.cancel() 時,你就能取消所有持有 parentJob 的協(xié)程。

這個方式有點類似于 RxJava 的 CompositeDisposable,調用一次就可以銷毀多個訂閱者。

val parentJob = Job()
val deferred1 = async(CommonPool, parent = parentJob) {
    // my suspending block
}
val deferred2 = async(CommonPool, parent = parentJob) {
    // my suspending block
}
parentJob.cancel() // Cancels both Coroutines

當共享 Job 給不同的協(xié)程時,一定要注意:當你取消一個 Job 后,你必須重新創(chuàng)建它,你不可以再將已取消的 Job 分配給另一個協(xié)程。


有另一種實現方式,那就是通過組合 Coroutine Context。你能使用加號運算符實現它。

val parentJob = Job()
launch(parentJob + CommonPool) {
   // my suspending block
}
parentJob.cancel()

在上面的例子,協(xié)程的上下文結果就是由 parentJobCommonPool 組合而成。這個線程化策略被 CommonPool 定義然后 Job 的值來源于 parentJob。

如果你想了解更多關于組合上下文的內容,你可以閱讀 Kotlin 協(xié)程文檔的 這一章節(jié): lifecycle-and-coroutine-parent-child-hierarchy

注意事項

正如 RxJava 一樣,你必須認真考慮在協(xié)程取消這一場景。

val job = launch(CommonPool) {
    for (i in 1..5) {
        heavyComputation()
    }
}
job.cancel()

如果我們嘗試執(zhí)行這段代碼,它將重復執(zhí)行 5 次耗時運算,由于這段代碼并未做好被取消的準備。

我們要如何改進它呢?

如同在 RxJava 中檢查訂閱者是否存在一樣,我們需要檢查協(xié)程是否活躍。

val job = launch(CommonPool) {
    for (i in 1..5) {
        if (!isActive) { break }
        heavyComputation()
    }
}
job.cancel()

isActive 是 Job 實例的一個內置變量,它能在協(xié)程內被訪問(coroutineContext 是另一個變量)。


一些掛起函數 (suspending functions) 存在于協(xié)程標準庫中,為我們處理取消協(xié)程。讓我們來看看 delay 這個函數。

val job = launch(CommonPool) {
    doSomething()
    delay(300) // It’s going to cancel at this point
    doSomething()
}
job.cancel()

Delay 是一個掛起函數,它能為我們處理取消任務。然而,如果你使用 Thread.sleep 代替 delay ,由于它是阻塞線程的并且沒有掛起協(xié)程,所以它不會被取消。

val job = launch(CommonPool) {
   doSomething()
   Thread.sleep(300) // It’s NOT going to cancel execution
   doSomething()
}
job.cancel()

Thread.sleep 不會讓我們取消任務。它甚至都不是一個掛起函數! 即使我們調用 job.cancel(),協(xié)程也不會被取消。

在上面的例子中 Thread.sleep 你并不會使用它。如果你非常非常需要,那么在阻塞的前后都要檢查協(xié)程是否活躍,如下所示:

val job = launch(CommonPool) {
    doSomething()
    if (!isActive) return
    Thread.sleep(300) // It’s NOT going to cancel execution
    if (!isActive) return
    doSomething()
}
job.cancel()
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容