Kotlin-通過async與await實現(xiàn)高效并發(fā)

Kotlin-通過async與await實現(xiàn)高效并發(fā)

image-20210615150418309
/*
    掛起函數(shù)的組合
 */

fun main() = runBlocking {
    val elapsedTime = measureTimeMillis {
        val value1 = intValue1()
        val value2 = intValue2()

        println("$value1 + $value2 = ${value1 + value2}")
    }

    println("total time: $elapsedTime")
}



private suspend fun intValue1(): Int {
    delay(1000)
    return 15
}

private suspend fun intValue2(): Int {
    delay(2000)
    return 20
}

RUN> ????????????

15 + 20 = 35
total time: 3010

Process finished with exit code 0

總耗時3秒多(2+1),說明程序是串行執(zhí)行的

這倆函數(shù)之間是沒有任何依賴關(guān)系的,那如果有一種機制能在執(zhí)行intValue1的同時,也能并行的執(zhí)行intValue2,那最終總的執(zhí)行效率是不是會大幅度提升呢?

使用 async 并發(fā)

image-20210615150802387
/*
    使用async與await實現(xiàn)并發(fā)

    從概念上來說,async就像是launch一樣。它會開啟一個單獨的協(xié)程,這個協(xié)程是個輕量級線程,可以與其他協(xié)程并發(fā)工作。區(qū)別在于,launch
    會返回一個Job,但是Job并不會持有任何結(jié)果值,而async會返回一個Deferred,這是一個輕量級的非阻塞的future,它代表一個promise,可以
    在稍后提供一個結(jié)果值。

    可以通過在一個deferred值上調(diào)用.await()方法來獲取最終的結(jié)果值,Deferred也是個Job,因此可以在需要時對其進行取消
 */

fun main() = runBlocking {
    val elapsedTime = measureTimeMillis {
        val value1 = async { intValue1() }
        val value2 = async { intValue2() }


        val result1 = value1.await()
        val result2 = value2.await()

        println("$result1 + $result2 = ${result1 + result2}")
    }

    println("total time: $elapsedTime")

}

private suspend fun intValue1(): Int {
    delay(2000)
    return 15
}

private suspend fun intValue2(): Int {
    delay(1000)
    return 20
}

RUN> ????????????

15 + 20 = 35
total time: 2021

Process finished with exit code 0

總耗時2秒多,說明程序是并行執(zhí)行的

image-20210615151457111
public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
image-20210615151137938

Deferred就是一個Job

image-20210615151233515

惰性啟動的 async

image-20210615151610474

async是接收三個參數(shù)的,另外兩個參數(shù)有默認值:

public fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T> {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyDeferredCoroutine(newContext, block) else
        DeferredCoroutine<T>(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}
image-20210615151755225

Default -- immediately schedules the coroutine for execution according to its context.

根據(jù)上下文會立即調(diào)用協(xié)程執(zhí)行

也就是上一次咱們做的這個async中的代碼塊是立即會執(zhí)行的,不是延時執(zhí)行的,但是??!在實際場景中可以會有遇到async時不希望立刻就來執(zhí)行,而是這個執(zhí)行的時機由程序員在某個條件下來觸發(fā),此時這個CoroutineStart參數(shù)就需要改變一下不能用默認參數(shù)了,需要用它了:LAZY

/**
     * Starts the coroutine lazily, only when it is needed.
     *
     * See the documentation for the corresponding coroutine builders for details
     * (like [launch][CoroutineScope.launch] and [async][CoroutineScope.async]).
     *
     * If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
     * execution at all, but will complete with an exception.
     */
LAZY,

接下來就是要來啟動延遲的async進行執(zhí)行了

/*
    關(guān)于async的延遲執(zhí)行

    我們可以通過將async方法的start參數(shù)設(shè)置為CoroutineStart.LAZY來實現(xiàn)協(xié)程的延遲執(zhí)行。
    在這種情況下,協(xié)程會在兩種場景下去執(zhí)行:調(diào)用Deferred的await方法,或是調(diào)用Job的start方法。

 */
fun main() = runBlocking {
    val elapsedTime = measureTimeMillis {
        val value1 = async(start = CoroutineStart.LAZY) { intValue1() }
        val value2 = async(start = CoroutineStart.LAZY) { intValue2() }

        println("hello world")
//        Thread.sleep(3000)
        delay(3000L)
        // 嘗試注釋掉如下兩行代碼
        value1.start()
        value2.start()
        val result1 = value1.await()
        val result2 = value2.await()

        println("$result1 + $result2 = ${result1 + result2}")
    }
    println("total time: $elapsedTime")
}


private suspend fun intValue1(): Int {
    delay(2000)
    return 15
}

private suspend fun intValue2(): Int {
    delay(1000)
    return 20
}

RUN> ????????????

hello world
15 + 20 = 35
total time: 5046

delay(3000L)+intValue1(2000) = 5秒,程序是并行執(zhí)行的

image-20210615152334555
fun main() = runBlocking {
    val elapsedTime = measureTimeMillis {
        val value1 = async(start = CoroutineStart.LAZY) { intValue1() }
        val value2 = async(start = CoroutineStart.LAZY) { intValue2() }

        println("hello world")
//        Thread.sleep(3000)
        delay(3000L)
        // 嘗試注釋掉如下兩行代碼
//        value1.start()
//        value2.start()
        val result1 = value1.await()
        val result2 = value2.await()

        println("$result1 + $result2 = ${result1 + result2}")
    }
    println("total time: $elapsedTime")
}


private suspend fun intValue1(): Int {
    delay(2000)
    return 15
}

private suspend fun intValue2(): Int {
    delay(1000)
    return 20
}

RUN> ????????????

hello world
15 + 20 = 35
total time: 6027

Process finished with exit code 0

delay(3000L)+intValue1(2000) +intValue2(1000) 程序串行執(zhí)行

其實根源是:

image-20210615152635899

所以,如果將async變成了lazy了之后需要特別的注意!??!如果使用不當(dāng)?shù)臅r候跟不用async沒任何區(qū)別了,這樣就失去了async的使用意義了。

async ?格的函數(shù)

image-20210615152955681
/*
    異步風(fēng)格的函數(shù)
 */


fun main() {
    val elapsedTime = measureTimeMillis {
        val value1 = intValue1Async()
        val value2 = intValue2Async()

        runBlocking {
            println("the answer is: ${value1.await() + value2.await()}")
        }
    }

    println("total time: $elapsedTime")
}


private suspend fun intValue1(): Int {
    delay(2000)
    return 15
}

private suspend fun intValue2(): Int {
    delay(3000)
    return 20
}

fun intValue1Async() = GlobalScope.async {
    intValue1()
}

fun intValue2Async() = GlobalScope.async {
    intValue2()
}

RUN> ????????????

the answer is: 35
total time: 3161

Process finished with exit code 0
image-20210615153228945

結(jié)構(gòu)化并發(fā)程序開發(fā):

/*
    使用async進行結(jié)構(gòu)化并發(fā)程序開發(fā)
 */

fun main() = runBlocking {
    val elapsedTime = measureTimeMillis {
        println("the answer is: ${intSum()}")
    }

    println("total time: $elapsedTime")
}


private suspend fun intSum(): Int = coroutineScope {
    val value1 = async { intValue1() }
    val value2 = async { intValue2() }

    value1.await() + value2.await()
}


private suspend fun intValue1(): Int {
    delay(2000)
    return 15
}
private suspend fun intValue2(): Int {
    delay(3000)
    return 20
}

RUN> ????????????

the answer is: 35
total time: 3019

Process finished with exit code 0

取消始終通過協(xié)程的層次結(jié)構(gòu)來進行傳遞:

image-20210615153625963
/*
    關(guān)于父子協(xié)程的異常與取消問題

    協(xié)程的取消總是會沿著協(xié)程層次體系向上進行傳播
 */

fun main() = runBlocking<Unit> {
    try {
        failureComputation()
    } finally {
        println("Computation failed")
    }
}


private suspend fun failureComputation(): Int = coroutineScope {
    val value1 = async<Int> {
        try {
            delay(9000000)
            50
        } finally {
            println("value1 was cancelled")
        }
    }

    val value2 = async<Int> {
        Thread.sleep(2000)
        println("value2 throws an exception")

        throw Exception()
    }
    value1.await() + value2.await()
}

RUN> ????????????

value2 throws an exception
value1 was cancelled
Computation failed
Exception in thread "main" java.lang.Exception
  at com.shengsiyuan.coroutines3.HelloKotlin6Kt$failureComputation$2$value2$1.invokeSuspend(HelloKotlin6.kt:37)
  at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
  at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
  at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
  at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84)
  at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
  at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
  at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
  at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
  at com.shengsiyuan.coroutines3.HelloKotlin6Kt.main(HelloKotlin6.kt:14)
  at com.shengsiyuan.coroutines3.HelloKotlin6Kt.main(HelloKotlin6.kt)

Process finished with exit code 1
image-20210615154029585

協(xié)程的取消總是會沿著協(xié)程層次體系向上進行傳播。”,以上就是解決在async中如果出異常的一個比較好的解決方案。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容