《Kotlin極簡教程》正式上架:
點(diǎn)擊這里 > 去京東商城購買閱讀
點(diǎn)擊這里 > 去天貓商城購買閱讀
非常感謝您親愛的讀者,大家請(qǐng)多支持!??!有任何問題,歡迎隨時(shí)與我交流~
9.8 掛起函數(shù)的組合執(zhí)行
本節(jié)我們介紹掛起函數(shù)組合的各種方法。
9.8.1 按默認(rèn)順序執(zhí)行
假設(shè)我們有兩個(gè)在別處定義的掛起函數(shù):
suspend fun doJob1(): Int {
println("Doing Job1 ...")
delay(1000L) // 此處模擬我們的工作代碼
println("Job1 Done")
return 10
}
suspend fun doJob2(): Int {
println("Doing Job2 ...")
delay(1000L) // 此處模擬我們的工作代碼
println("Job2 Done")
return 20
}
如果需要依次調(diào)用它們, 我們只需要使用正常的順序調(diào)用, 因?yàn)閰f(xié)同中的代碼 (就像在常規(guī)代碼中一樣) 是默認(rèn)的順序執(zhí)行。下面的示例通過測(cè)量執(zhí)行兩個(gè)掛起函數(shù)所需的總時(shí)間來演示:
fun testSequential() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = doJob1()
val two = doJob2()
println("[testSequential] 最終結(jié)果: ${one + two}")
}
println("[testSequential] Completed in $time ms")
}
執(zhí)行上面的代碼,我們將得到輸出:
Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最終結(jié)果: 30
[testSequential] Completed in 6023 ms
可以看出,我們的代碼是跟普通的代碼一樣順序執(zhí)行下去。
9.8.2 使用async異步并發(fā)執(zhí)行
上面的例子中,如果在調(diào)用 doJob1 和 doJob2 之間沒有時(shí)序上的依賴關(guān)系, 并且我們希望通過同時(shí)并發(fā)地執(zhí)行這兩個(gè)函數(shù)來更快地得到答案, 那該怎么辦呢?這個(gè)時(shí)候,我們就可以使用async來實(shí)現(xiàn)異步。代碼示例如下:
fun testAsync() = runBlocking<Unit> {
val time = measureTimeMillis {
val one = async(CommonPool) { doJob1() }
val two = async(CommonPool) { doJob2() }
println("最終結(jié)果: ${one.await() + two.await()}")
}
println("Completed in $time ms")
}
如果跟上面同步的代碼一起執(zhí)行對(duì)比,我們可以看到如下輸出:
Doing Job1 ...
Job1 Done
Doing Job2 ...
Job2 Done
[testSequential] 最終結(jié)果: 30
[testSequential] Completed in 6023 ms
Doing Job1 ...
Doing Job2 ...
Job1 Done
Job2 Done
[testAsync] 最終結(jié)果: 30
[testAsync] Completed in 3032 ms
我們可以看出,使用async函數(shù),我們的兩個(gè)Job并發(fā)的執(zhí)行了,并發(fā)花的時(shí)間要比順序的執(zhí)行的要快將近兩倍。因?yàn)?,我們有兩個(gè)任務(wù)在并發(fā)的執(zhí)行。
從概念上講, async跟launch類似, 它啟動(dòng)一個(gè)協(xié)程, 它與其他協(xié)程并發(fā)地執(zhí)行。
不同之處在于, launch返回一個(gè)任務(wù)Job對(duì)象, 不帶任何結(jié)果值;而async返回一個(gè)延遲任務(wù)對(duì)象Deferred,一種輕量級(jí)的非阻塞性future, 它表示后面會(huì)提供結(jié)果。
在上面的示例代碼中,我們使用Deferred調(diào)用 await() 函數(shù)來獲得其最終結(jié)果。另外,延遲任務(wù)Deferred也是Job類型, 它繼承自Job,所以它也有isActive、isCompleted屬性,也有join()、cancel()函數(shù),因此我們也可以在需要時(shí)取消它。Deferred接口定義如下:
public interface Deferred<out T> : Job {
val isCompletedExceptionally: Boolean
val isCancelled: Boolean
public suspend fun await(): T
public fun <R> registerSelectAwait(select: SelectInstance<R>, block: suspend (T) -> R)
public fun getCompleted(): T
@Deprecated(message = "Use `isActive`", replaceWith = ReplaceWith("isActive"))
public val isComputing: Boolean get() = isActive
}
其中,常用的屬性和函數(shù)說明如下:
| 名稱 | 說明 |
|---|---|
| isCompletedExceptionally | 當(dāng)協(xié)程在計(jì)算過程中有異常failed 或被取消,返回true。 這也意味著isActive等于 false ,同時(shí) isCompleted等于 true
|
| isCancelled | 如果當(dāng)前延遲任務(wù)被取消,返回true |
| suspend fun await() | 等待此延遲任務(wù)完成,而不阻塞線程;如果延遲任務(wù)完成, 則返回結(jié)果值或引發(fā)相應(yīng)的異常。 |
延遲任務(wù)對(duì)象Deferred的狀態(tài)與對(duì)應(yīng)的屬性值如下表所示:
| 狀態(tài) | isActive | isCompleted | isCompletedExceptionally | isCancelled |
|---|---|---|---|---|
| New (可選初始狀態(tài)) | false |
false |
false |
false |
| Active (默認(rèn)初始狀態(tài)) | true |
false |
false |
false |
| Resolved (最終狀態(tài)) | false |
true |
false |
false |
| Failed (最終狀態(tài)) | false |
true |
true |
false |
| Cancelled (最終狀態(tài)) | false |
true |
true |
true |
9.9 協(xié)程上下文與調(diào)度器
到這里,我們已經(jīng)看到了下面這些啟動(dòng)協(xié)程的方式:
launch(CommonPool) {...}
async(CommonPool) {...}
run(NonCancellable) {...}
這里的CommonPool 和 NonCancellable 是協(xié)程上下文(coroutine contexts)。本小節(jié)我們簡單介紹一下自定義協(xié)程上下文。
9.9.1 調(diào)度和線程
協(xié)程上下文包括一個(gè)協(xié)程調(diào)度程序, 它可以指定由哪個(gè)線程來執(zhí)行協(xié)程。調(diào)度器可以將協(xié)程的執(zhí)行調(diào)度到一個(gè)線程池,限制在特定的線程中;也可以不作任何限制,讓它無約束地運(yùn)行。請(qǐng)看下面的示例:
fun testDispatchersAndThreads() = runBlocking {
val jobs = arrayListOf<Job>()
jobs += launch(Unconfined) {
// 未作限制 -- 將會(huì)在 main thread 中執(zhí)行
println("Unconfined: I'm working in thread ${Thread.currentThread()}")
}
jobs += launch(context) {
// 父協(xié)程的上下文 : runBlocking coroutine
println("context: I'm working in thread ${Thread.currentThread()}")
}
jobs += launch(CommonPool) {
// 調(diào)度指派給 ForkJoinPool.commonPool
println("CommonPool: I'm working in thread ${Thread.currentThread()}")
}
jobs += launch(newSingleThreadContext("MyOwnThread")) {
// 將會(huì)在這個(gè)協(xié)程自己的新線程中執(zhí)行
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread()}")
}
jobs.forEach { it.join() }
}
運(yùn)行上面的代碼,我們將得到以下輸出 (可能按不同的順序):
Unconfined: I'm working in thread Thread[main,5,main]
CommonPool: I'm working in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
newSingleThreadContext: I'm working in thread Thread[MyOwnThread,5,main]
context: I'm working in thread Thread[main,5,main]
從上面的結(jié)果,我們可以看出:
使用無限制的Unconfined上下文的協(xié)程運(yùn)行在主線程中;
繼承了 runBlocking {...} 的context的協(xié)程繼續(xù)在主線程中執(zhí)行;
而CommonPool在ForkJoinPool.commonPool中;
我們使用newSingleThreadContext函數(shù)新建的協(xié)程上下文,該協(xié)程運(yùn)行在自己的新線程Thread[MyOwnThread,5,main]中。
另外,我們還可以在使用 runBlocking的時(shí)候顯式指定上下文, 同時(shí)使用 run 函數(shù)來更改協(xié)程的上下文:
fun log(msg: String) = println("${Thread.currentThread()} $msg")
fun testRunBlockingWithSpecifiedContext() = runBlocking {
log("$context")
log("${context[Job]}")
log("開始")
val ctx1 = newSingleThreadContext("線程A")
val ctx2 = newSingleThreadContext("線程B")
runBlocking(ctx1) {
log("Started in Context1")
run(ctx2) {
log("Working in Context2")
}
log("Back to Context1")
}
log("結(jié)束")
}
運(yùn)行輸出:
Thread[main,5,main] [BlockingCoroutine{Active}@b1bc7ed, EventLoopImpl@7cd84586]
Thread[main,5,main] BlockingCoroutine{Active}@b1bc7ed
Thread[main,5,main] 開始
Thread[線程A,5,main] Started in Context1
Thread[線程B,5,main] Working in Context2
Thread[線程A,5,main] Back to Context1
Thread[main,5,main] 結(jié)束
9.9.2 父子協(xié)程
當(dāng)我們使用協(xié)程A的上下文啟動(dòng)另一個(gè)協(xié)程B時(shí), B將成為A的子協(xié)程。當(dāng)父協(xié)程A任務(wù)被取消時(shí), B以及它的所有子協(xié)程都會(huì)被遞歸地取消。代碼示例如下:
fun testChildrenCoroutine()= runBlocking<Unit> {
val request = launch(CommonPool) {
log("ContextA1: ${context}")
val job1 = launch(CommonPool) {
println("job1: 獨(dú)立的協(xié)程上下文!")
delay(1000)
println("job1: 不會(huì)受到request.cancel()的影響")
}
// 繼承父上下文:request的context
val job2 = launch(context) {
log("ContextA2: ${context}")
println("job2: 是request coroutine的子協(xié)程")
delay(1000)
println("job2: 當(dāng)request.cancel(),job2也會(huì)被取消")
}
job1.join()
job2.join()
}
delay(500)
request.cancel()
delay(1000)
println("main: Who has survived request cancellation?")
}
運(yùn)行輸出: