《Kotin 極簡教程》第9章 輕量級(jí)線程:協(xié)程(2)


《Kotlin極簡教程》正式上架:

點(diǎn)擊這里 > 去京東商城購買閱讀

點(diǎn)擊這里 > 去天貓商城購買閱讀

非常感謝您親愛的讀者,大家請(qǐng)多支持!??!有任何問題,歡迎隨時(shí)與我交流~


9.8 掛起函數(shù)的組合執(zhí)行

本節(jié)我們介紹掛起函數(shù)組合的各種方法。

9.8.1 按默認(rèn)順序執(zhí)行

假設(shè)我們有兩個(gè)在別處定義的掛起函數(shù):

    suspend fun doJob1(): Int {
        println("Doing Job1 ...")
        delay(1000L) // 此處模擬我們的工作代碼
        println("Job1 Done")
        return 10
    }

    suspend fun doJob2(): Int {
        println("Doing Job2 ...")
        delay(1000L) // 此處模擬我們的工作代碼
        println("Job2 Done")
        return 20
    }

如果需要依次調(diào)用它們, 我們只需要使用正常的順序調(diào)用, 因?yàn)閰f(xié)同中的代碼 (就像在常規(guī)代碼中一樣) 是默認(rèn)的順序執(zhí)行。下面的示例通過測(cè)量執(zhí)行兩個(gè)掛起函數(shù)所需的總時(shí)間來演示:

    fun testSequential() = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = doJob1()
            val two = doJob2()
            println("[testSequential] 最終結(jié)果: ${one + two}")
        }
        println("[testSequential] Completed in $time ms")
    }

執(zhí)行上面的代碼,我們將得到輸出:

Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最終結(jié)果: 30
[testSequential] Completed in 6023 ms

可以看出,我們的代碼是跟普通的代碼一樣順序執(zhí)行下去。

9.8.2 使用async異步并發(fā)執(zhí)行

上面的例子中,如果在調(diào)用 doJob1 和 doJob2 之間沒有時(shí)序上的依賴關(guān)系, 并且我們希望通過同時(shí)并發(fā)地執(zhí)行這兩個(gè)函數(shù)來更快地得到答案, 那該怎么辦呢?這個(gè)時(shí)候,我們就可以使用async來實(shí)現(xiàn)異步。代碼示例如下:

    fun testAsync() = runBlocking<Unit> {
        val time = measureTimeMillis {
            val one = async(CommonPool) { doJob1() }
            val two = async(CommonPool) { doJob2() }
            println("最終結(jié)果: ${one.await() + two.await()}")
        }
        println("Completed in $time ms")
    }

如果跟上面同步的代碼一起執(zhí)行對(duì)比,我們可以看到如下輸出:

Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最終結(jié)果: 30
[testSequential] Completed in 6023 ms
Doing Job1 ...
Doing Job2 ...
Job1 Done
Job2 Done
[testAsync] 最終結(jié)果: 30
[testAsync] Completed in 3032 ms

我們可以看出,使用async函數(shù),我們的兩個(gè)Job并發(fā)的執(zhí)行了,并發(fā)花的時(shí)間要比順序的執(zhí)行的要快將近兩倍。因?yàn)?,我們有兩個(gè)任務(wù)在并發(fā)的執(zhí)行。

從概念上講, async跟launch類似, 它啟動(dòng)一個(gè)協(xié)程, 它與其他協(xié)程并發(fā)地執(zhí)行。

不同之處在于, launch返回一個(gè)任務(wù)Job對(duì)象, 不帶任何結(jié)果值;而async返回一個(gè)延遲任務(wù)對(duì)象Deferred,一種輕量級(jí)的非阻塞性future, 它表示后面會(huì)提供結(jié)果。

在上面的示例代碼中,我們使用Deferred調(diào)用 await() 函數(shù)來獲得其最終結(jié)果。另外,延遲任務(wù)Deferred也是Job類型, 它繼承自Job,所以它也有isActive、isCompleted屬性,也有join()、cancel()函數(shù),因此我們也可以在需要時(shí)取消它。Deferred接口定義如下:

public interface Deferred<out T> : Job {
    val isCompletedExceptionally: Boolean
    val isCancelled: Boolean
    public suspend fun await(): T
    public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
    public fun getCompleted(): T
    @Deprecated(message = "Use `isActive`", replaceWith = ReplaceWith("isActive"))
    public val isComputing: Boolean get() = isActive
}

其中,常用的屬性和函數(shù)說明如下:

名稱 說明
isCompletedExceptionally 當(dāng)協(xié)程在計(jì)算過程中有異常failed 或被取消,返回true。 這也意味著isActive等于 false ,同時(shí) isCompleted等于 true
isCancelled 如果當(dāng)前延遲任務(wù)被取消,返回true
suspend fun await() 等待此延遲任務(wù)完成,而不阻塞線程;如果延遲任務(wù)完成, 則返回結(jié)果值或引發(fā)相應(yīng)的異常。

延遲任務(wù)對(duì)象Deferred的狀態(tài)與對(duì)應(yīng)的屬性值如下表所示:

狀態(tài) isActive isCompleted isCompletedExceptionally isCancelled
New (可選初始狀態(tài)) false false false false
Active (默認(rèn)初始狀態(tài)) true false false false
Resolved (最終狀態(tài)) false true false false
Failed (最終狀態(tài)) false true true false
Cancelled (最終狀態(tài)) false true true true

9.9 協(xié)程上下文與調(diào)度器

到這里,我們已經(jīng)看到了下面這些啟動(dòng)協(xié)程的方式:

launch(CommonPool) {...}
async(CommonPool) {...}
run(NonCancellable) {...}

這里的CommonPool 和 NonCancellable 是協(xié)程上下文(coroutine contexts)。本小節(jié)我們簡單介紹一下自定義協(xié)程上下文。

9.9.1 調(diào)度和線程

協(xié)程上下文包括一個(gè)協(xié)程調(diào)度程序, 它可以指定由哪個(gè)線程來執(zhí)行協(xié)程。調(diào)度器可以將協(xié)程的執(zhí)行調(diào)度到一個(gè)線程池,限制在特定的線程中;也可以不作任何限制,讓它無約束地運(yùn)行。請(qǐng)看下面的示例:

    fun testDispatchersAndThreads() = runBlocking {
        val jobs = arrayListOf<Job>()
        jobs += launch(Unconfined) {
            // 未作限制 -- 將會(huì)在 main thread 中執(zhí)行
            println("Unconfined: I'm working in thread ${Thread.currentThread()}")
        }
        jobs += launch(context) {
            // 父協(xié)程的上下文 : runBlocking coroutine
            println("context: I'm working in thread ${Thread.currentThread()}")
        }
        jobs += launch(CommonPool) {
            // 調(diào)度指派給 ForkJoinPool.commonPool
            println("CommonPool: I'm working in thread ${Thread.currentThread()}")
        }
        jobs += launch(newSingleThreadContext("MyOwnThread")) {
            // 將會(huì)在這個(gè)協(xié)程自己的新線程中執(zhí)行
            println("newSingleThreadContext: I'm working in thread ${Thread.currentThread()}")
        }
        jobs.forEach { it.join() }
    }

運(yùn)行上面的代碼,我們將得到以下輸出 (可能按不同的順序):

Unconfined: I'm working in thread Thread[main,5,main]
CommonPool: I'm working in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
newSingleThreadContext: I'm working in thread Thread[MyOwnThread,5,main]
context: I'm working in thread Thread[main,5,main]

從上面的結(jié)果,我們可以看出:
使用無限制的Unconfined上下文的協(xié)程運(yùn)行在主線程中;
繼承了 runBlocking {...} 的context的協(xié)程繼續(xù)在主線程中執(zhí)行;
而CommonPool在ForkJoinPool.commonPool中;
我們使用newSingleThreadContext函數(shù)新建的協(xié)程上下文,該協(xié)程運(yùn)行在自己的新線程Thread[MyOwnThread,5,main]中。

另外,我們還可以在使用 runBlocking的時(shí)候顯式指定上下文, 同時(shí)使用 run 函數(shù)來更改協(xié)程的上下文:

    fun log(msg: String) = println("${Thread.currentThread()} $msg")

    fun testRunBlockingWithSpecifiedContext() = runBlocking {
        log("$context")
        log("${context[Job]}")
        log("開始")

        val ctx1 = newSingleThreadContext("線程A")
        val ctx2 = newSingleThreadContext("線程B")
        runBlocking(ctx1) {
            log("Started in Context1")
            run(ctx2) {
                log("Working in Context2")
            }
            log("Back to Context1")
        }
        log("結(jié)束")
    }

運(yùn)行輸出:

Thread[main,5,main] [BlockingCoroutine{Active}@b1bc7ed, EventLoopImpl@7cd84586]
Thread[main,5,main] BlockingCoroutine{Active}@b1bc7ed
Thread[main,5,main] 開始
Thread[線程A,5,main] Started in Context1
Thread[線程B,5,main] Working in Context2
Thread[線程A,5,main] Back to Context1
Thread[main,5,main] 結(jié)束

9.9.2 父子協(xié)程

當(dāng)我們使用協(xié)程A的上下文啟動(dòng)另一個(gè)協(xié)程B時(shí), B將成為A的子協(xié)程。當(dāng)父協(xié)程A任務(wù)被取消時(shí), B以及它的所有子協(xié)程都會(huì)被遞歸地取消。代碼示例如下:

    fun testChildrenCoroutine()= runBlocking<Unit> {
        val request = launch(CommonPool) {
            log("ContextA1: ${context}")

            val job1 = launch(CommonPool) {
                println("job1: 獨(dú)立的協(xié)程上下文!")
                delay(1000)
                println("job1: 不會(huì)受到request.cancel()的影響")
            }
            // 繼承父上下文:request的context
            val job2 = launch(context) {
                log("ContextA2: ${context}")
                println("job2: 是request coroutine的子協(xié)程")
                delay(1000)
                println("job2: 當(dāng)request.cancel(),job2也會(huì)被取消")
            }
            job1.join()
            job2.join()
        }
        delay(500)
        request.cancel()
        delay(1000)
        println("main: Who has survived request cancellation?")
    }

運(yùn)行輸出:

還有 59% 的精彩內(nèi)容
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容