Kotlin-通過async與await實現(xiàn)高效并發(fā)

/*
掛起函數(shù)的組合
*/
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val value1 = intValue1()
val value2 = intValue2()
println("$value1 + $value2 = ${value1 + value2}")
}
println("total time: $elapsedTime")
}
private suspend fun intValue1(): Int {
delay(1000)
return 15
}
private suspend fun intValue2(): Int {
delay(2000)
return 20
}
RUN> ????????????
15 + 20 = 35 total time: 3010 Process finished with exit code 0總耗時3秒多(2+1),說明程序是串行執(zhí)行的
這倆函數(shù)之間是沒有任何依賴關(guān)系的,那如果有一種機制能在執(zhí)行intValue1的同時,也能并行的執(zhí)行intValue2,那最終總的執(zhí)行效率是不是會大幅度提升呢?
使用 async 并發(fā)

/*
使用async與await實現(xiàn)并發(fā)
從概念上來說,async就像是launch一樣。它會開啟一個單獨的協(xié)程,這個協(xié)程是個輕量級線程,可以與其他協(xié)程并發(fā)工作。區(qū)別在于,launch
會返回一個Job,但是Job并不會持有任何結(jié)果值,而async會返回一個Deferred,這是一個輕量級的非阻塞的future,它代表一個promise,可以
在稍后提供一個結(jié)果值。
可以通過在一個deferred值上調(diào)用.await()方法來獲取最終的結(jié)果值,Deferred也是個Job,因此可以在需要時對其進行取消
*/
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val value1 = async { intValue1() }
val value2 = async { intValue2() }
val result1 = value1.await()
val result2 = value2.await()
println("$result1 + $result2 = ${result1 + result2}")
}
println("total time: $elapsedTime")
}
private suspend fun intValue1(): Int {
delay(2000)
return 15
}
private suspend fun intValue2(): Int {
delay(1000)
return 20
}
RUN> ????????????
15 + 20 = 35 total time: 2021 Process finished with exit code 0總耗時2秒多,說明程序是并行執(zhí)行的
image-20210615151457111
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

Deferred就是一個Job

惰性啟動的 async

async是接收三個參數(shù)的,另外兩個參數(shù)有默認值:
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}

Default -- immediately schedules the coroutine for execution according to its context.
根據(jù)上下文會立即調(diào)用協(xié)程執(zhí)行
也就是上一次咱們做的這個async中的代碼塊是立即會執(zhí)行的,不是延時執(zhí)行的,但是??!在實際場景中可以會有遇到async時不希望立刻就來執(zhí)行,而是這個執(zhí)行的時機由程序員在某個條件下來觸發(fā),此時這個CoroutineStart參數(shù)就需要改變一下不能用默認參數(shù)了,需要用它了:LAZY
/**
* Starts the coroutine lazily, only when it is needed.
*
* See the documentation for the corresponding coroutine builders for details
* (like [launch][CoroutineScope.launch] and [async][CoroutineScope.async]).
*
* If coroutine [Job] is cancelled before it even had a chance to start executing, then it will not start its
* execution at all, but will complete with an exception.
*/
LAZY,
接下來就是要來啟動延遲的async進行執(zhí)行了
/*
關(guān)于async的延遲執(zhí)行
我們可以通過將async方法的start參數(shù)設(shè)置為CoroutineStart.LAZY來實現(xiàn)協(xié)程的延遲執(zhí)行。
在這種情況下,協(xié)程會在兩種場景下去執(zhí)行:調(diào)用Deferred的await方法,或是調(diào)用Job的start方法。
*/
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val value1 = async(start = CoroutineStart.LAZY) { intValue1() }
val value2 = async(start = CoroutineStart.LAZY) { intValue2() }
println("hello world")
// Thread.sleep(3000)
delay(3000L)
// 嘗試注釋掉如下兩行代碼
value1.start()
value2.start()
val result1 = value1.await()
val result2 = value2.await()
println("$result1 + $result2 = ${result1 + result2}")
}
println("total time: $elapsedTime")
}
private suspend fun intValue1(): Int {
delay(2000)
return 15
}
private suspend fun intValue2(): Int {
delay(1000)
return 20
}
RUN> ????????????
hello world 15 + 20 = 35 total time: 5046delay(3000L)+intValue1(2000) = 5秒,程序是并行執(zhí)行的

fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
val value1 = async(start = CoroutineStart.LAZY) { intValue1() }
val value2 = async(start = CoroutineStart.LAZY) { intValue2() }
println("hello world")
// Thread.sleep(3000)
delay(3000L)
// 嘗試注釋掉如下兩行代碼
// value1.start()
// value2.start()
val result1 = value1.await()
val result2 = value2.await()
println("$result1 + $result2 = ${result1 + result2}")
}
println("total time: $elapsedTime")
}
private suspend fun intValue1(): Int {
delay(2000)
return 15
}
private suspend fun intValue2(): Int {
delay(1000)
return 20
}
RUN> ????????????
hello world 15 + 20 = 35 total time: 6027 Process finished with exit code 0delay(3000L)+intValue1(2000) +intValue2(1000) 程序串行執(zhí)行
其實根源是:
image-20210615152635899
所以,如果將async變成了lazy了之后需要特別的注意!??!如果使用不當(dāng)?shù)臅r候跟不用async沒任何區(qū)別了,這樣就失去了async的使用意義了。
async ?格的函數(shù)

/*
異步風(fēng)格的函數(shù)
*/
fun main() {
val elapsedTime = measureTimeMillis {
val value1 = intValue1Async()
val value2 = intValue2Async()
runBlocking {
println("the answer is: ${value1.await() + value2.await()}")
}
}
println("total time: $elapsedTime")
}
private suspend fun intValue1(): Int {
delay(2000)
return 15
}
private suspend fun intValue2(): Int {
delay(3000)
return 20
}
fun intValue1Async() = GlobalScope.async {
intValue1()
}
fun intValue2Async() = GlobalScope.async {
intValue2()
}
RUN> ????????????
the answer is: 35 total time: 3161 Process finished with exit code 0image-20210615153228945
結(jié)構(gòu)化并發(fā)程序開發(fā):
/*
使用async進行結(jié)構(gòu)化并發(fā)程序開發(fā)
*/
fun main() = runBlocking {
val elapsedTime = measureTimeMillis {
println("the answer is: ${intSum()}")
}
println("total time: $elapsedTime")
}
private suspend fun intSum(): Int = coroutineScope {
val value1 = async { intValue1() }
val value2 = async { intValue2() }
value1.await() + value2.await()
}
private suspend fun intValue1(): Int {
delay(2000)
return 15
}
private suspend fun intValue2(): Int {
delay(3000)
return 20
}
RUN> ????????????
the answer is: 35 total time: 3019 Process finished with exit code 0
取消始終通過協(xié)程的層次結(jié)構(gòu)來進行傳遞:

/*
關(guān)于父子協(xié)程的異常與取消問題
協(xié)程的取消總是會沿著協(xié)程層次體系向上進行傳播
*/
fun main() = runBlocking<Unit> {
try {
failureComputation()
} finally {
println("Computation failed")
}
}
private suspend fun failureComputation(): Int = coroutineScope {
val value1 = async<Int> {
try {
delay(9000000)
50
} finally {
println("value1 was cancelled")
}
}
val value2 = async<Int> {
Thread.sleep(2000)
println("value2 throws an exception")
throw Exception()
}
value1.await() + value2.await()
}
RUN> ????????????
value2 throws an exception value1 was cancelled Computation failed Exception in thread "main" java.lang.Exception at com.shengsiyuan.coroutines3.HelloKotlin6Kt$failureComputation$2$value2$1.invokeSuspend(HelloKotlin6.kt:37) at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33) at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106) at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274) at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:84) at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59) at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source) at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38) at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source) at com.shengsiyuan.coroutines3.HelloKotlin6Kt.main(HelloKotlin6.kt:14) at com.shengsiyuan.coroutines3.HelloKotlin6Kt.main(HelloKotlin6.kt) Process finished with exit code 1image-20210615154029585
“協(xié)程的取消總是會沿著協(xié)程層次體系向上進行傳播。”,以上就是解決在async中如果出異常的一個比較好的解決方案。



