Kotlin Coroutines 協(xié)程其實(shí)挺簡(jiǎn)單
本文收錄于: https://github.com/mengdd/KotlinTutorials
Coroutines概念
Coroutines(協(xié)程), 計(jì)算機(jī)程序組件, 通過(guò)允許任務(wù)掛起和恢復(fù)執(zhí)行, 來(lái)支持非搶占式的多任務(wù). (見(jiàn)Wiki).
協(xié)程主要是為了異步, 非阻塞的代碼. 這個(gè)概念并不是Kotlin特有的, Go, Python等多個(gè)語(yǔ)言中都有支持.
Kotlin Coroutines
Kotlin中用協(xié)程來(lái)做異步和非阻塞任務(wù), 主要優(yōu)點(diǎn)是代碼可讀性好, 不用回調(diào)函數(shù). (用協(xié)程寫(xiě)的異步代碼乍一看很像同步代碼.)
Kotlin對(duì)協(xié)程的支持是在語(yǔ)言級(jí)別的, 在標(biāo)準(zhǔn)庫(kù)中只提供了最低程度的APIs, 然后把很多功能都代理到庫(kù)中.
Kotlin中只加了suspend作為關(guān)鍵字.
async和await不是Kotlin的關(guān)鍵字, 也不是標(biāo)準(zhǔn)庫(kù)的一部分.
比起futures和promises, kotlin中suspending function的概念為異步操作提供了一種更安全和不易出錯(cuò)的抽象.
kotlinx.coroutines是協(xié)程的庫(kù), 為了使用它的核心功能, 項(xiàng)目需要增加kotlinx-coroutines-core的依賴(lài).
Coroutines Basics: 協(xié)程到底是什么?
先上一段官方的demo:
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
fun main() {
GlobalScope.launch { // launch a new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World!") // print after delay
}
println("Hello,") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
這段代碼的輸出:
先打印Hello, 延遲1s之后, 打印World.
對(duì)這段代碼的解釋:
launch開(kāi)始了一個(gè)計(jì)算, 這個(gè)計(jì)算是可掛起的(suspendable), 它在計(jì)算過(guò)程中, 釋放了底層的線(xiàn)程, 當(dāng)協(xié)程執(zhí)行完成, 就會(huì)恢復(fù)(resume).
這種可掛起的計(jì)算就叫做一個(gè)協(xié)程(coroutine). 所以我們可以簡(jiǎn)單地說(shuō)launch開(kāi)始了一個(gè)新的協(xié)程.
注意, 主線(xiàn)程需要等待協(xié)程結(jié)束, 如果注釋掉最后一行的Thread.sleep(2000L), 則只打印Hello, 沒(méi)有World.
協(xié)程和線(xiàn)程的關(guān)系
coroutine(協(xié)程)可以理解為輕量級(jí)的線(xiàn)程. 多個(gè)協(xié)程可以并行運(yùn)行, 互相等待, 互相通信. 協(xié)程和線(xiàn)程的最大區(qū)別就是協(xié)程非常輕量(cheap), 我們可以創(chuàng)建成千上萬(wàn)個(gè)協(xié)程而不必考慮性能.
協(xié)程是運(yùn)行在線(xiàn)程上可以被掛起的運(yùn)算. 可以被掛起, 意味著運(yùn)算可以被暫停, 從線(xiàn)程移除, 存儲(chǔ)在內(nèi)存里. 此時(shí), 線(xiàn)程就可以自由做其他事情. 當(dāng)計(jì)算準(zhǔn)備好繼續(xù)進(jìn)行時(shí), 它會(huì)返回線(xiàn)程(但不一定要是同一個(gè)線(xiàn)程).
默認(rèn)情況下, 協(xié)程運(yùn)行在一個(gè)共享的線(xiàn)程池里, 線(xiàn)程還是存在的, 只是一個(gè)線(xiàn)程可以運(yùn)行多個(gè)協(xié)程, 所以線(xiàn)程沒(méi)必要太多.
調(diào)試
在上面的代碼中加上線(xiàn)程的名字:
fun main() {
GlobalScope.launch {
// launch a new coroutine in background and continue
delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
println("World! + ${Thread.currentThread().name}") // print after delay
}
println("Hello, + ${Thread.currentThread().name}") // main thread continues while coroutine is delayed
Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}
可以在IDE的Edit Configurations中設(shè)置VM options: -Dkotlinx.coroutines.debug, 運(yùn)行程序, 會(huì)在log中打印出代碼運(yùn)行的協(xié)程信息:
Hello, + main
World! + DefaultDispatcher-worker-1 @coroutine#1
suspend function
上面例子中的delay方法是一個(gè)suspend function.
delay()和Thread.sleep()的區(qū)別是: delay()方法可以在不阻塞線(xiàn)程的情況下延遲協(xié)程. (It doesn't block a thread, but only suspends the coroutine itself). 而Thread.sleep()則阻塞了當(dāng)前線(xiàn)程.
所以, suspend的意思就是協(xié)程作用域被掛起了, 但是當(dāng)前線(xiàn)程中協(xié)程作用域之外的代碼不被阻塞.
如果把GlobalScope.launch替換為thread, delay方法下面會(huì)出現(xiàn)紅線(xiàn)報(bào)錯(cuò):
Suspend functions are only allowed to be called from a coroutine or another suspend function
suspend方法只能在協(xié)程或者另一個(gè)suspend方法中被調(diào)用.
在協(xié)程等待的過(guò)程中, 線(xiàn)程會(huì)返回線(xiàn)程池, 當(dāng)協(xié)程等待結(jié)束, 協(xié)程會(huì)在線(xiàn)程池中一個(gè)空閑的線(xiàn)程上恢復(fù). (The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.)
啟動(dòng)協(xié)程
啟動(dòng)一個(gè)新的協(xié)程, 常用的主要有以下幾種方式:
launchasyncrunBlocking
它們被稱(chēng)為coroutine builders. 不同的庫(kù)可以定義其他更多的構(gòu)建方式.
runBlocking: 連接blocking和non-blocking的世界
runBlocking用來(lái)連接阻塞和非阻塞的世界.
runBlocking可以建立一個(gè)阻塞當(dāng)前線(xiàn)程的協(xié)程. 所以它主要被用來(lái)在main函數(shù)中或者測(cè)試中使用, 作為連接函數(shù).
比如前面的例子可以改寫(xiě)成:
fun main() = runBlocking<Unit> {
// start main coroutine
GlobalScope.launch {
// launch a new coroutine in background and continue
delay(1000L)
println("World! + ${Thread.currentThread().name}")
}
println("Hello, + ${Thread.currentThread().name}") // main coroutine continues here immediately
delay(2000L) // delaying for 2 seconds to keep JVM alive
}
最后不再使用Thread.sleep(), 使用delay()就可以了.
程序輸出:
Hello, + main @coroutine#1
World! + DefaultDispatcher-worker-1 @coroutine#2
launch: 返回Job
上面的例子delay了一段時(shí)間來(lái)等待一個(gè)協(xié)程結(jié)束, 不是一個(gè)好的方法.
launch返回Job, 代表一個(gè)協(xié)程, 我們可以用Job的join()方法來(lái)顯式地等待這個(gè)協(xié)程結(jié)束:
fun main() = runBlocking {
val job = GlobalScope.launch {
// launch a new coroutine and keep a reference to its Job
delay(1000L)
println("World! + ${Thread.currentThread().name}")
}
println("Hello, + ${Thread.currentThread().name}")
job.join() // wait until child coroutine completes
}
輸出結(jié)果和上面是一樣的.
Job還有一個(gè)重要的用途是cancel(), 用于取消不再需要的協(xié)程任務(wù).
async: 從協(xié)程返回值
async開(kāi)啟協(xié)程, 返回Deferred<T>, Deferred<T>是Job的子類(lèi), 有一個(gè)await()函數(shù), 可以返回協(xié)程的結(jié)果.
await()也是suspend函數(shù), 只能在協(xié)程之內(nèi)調(diào)用.
fun main() = runBlocking {
// @coroutine#1
println(Thread.currentThread().name)
val deferred: Deferred<Int> = async {
// @coroutine#2
loadData()
}
println("waiting..." + Thread.currentThread().name)
println(deferred.await()) // suspend @coroutine#1
}
suspend fun loadData(): Int {
println("loading..." + Thread.currentThread().name)
delay(1000L) // suspend @coroutine#2
println("loaded!" + Thread.currentThread().name)
return 42
}
運(yùn)行結(jié)果:
main @coroutine#1
waiting...main @coroutine#1
loading...main @coroutine#2
loaded!main @coroutine#2
42
Context, Dispatcher和Scope
看一下launch方法的聲明:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
...
}
其中有幾個(gè)相關(guān)概念我們要了解一下.
協(xié)程總是在一個(gè)context下運(yùn)行, 類(lèi)型是接口CoroutineContext. 協(xié)程的context是一個(gè)索引集合, 其中包含各種元素, 重要元素就有Job和dispatcher. Job代表了這個(gè)協(xié)程, 那么dispatcher是做什么的呢?
構(gòu)建協(xié)程的coroutine builder: launch, async, 都是CoroutineScope類(lèi)型的擴(kuò)展方法. 查看CoroutineScope接口, 其中含有CoroutineContext的引用. scope是什么? 有什么作用呢?
下面我們就來(lái)回答這些問(wèn)題.
Dispatchers和線(xiàn)程
Context中的CoroutineDispatcher可以指定協(xié)程運(yùn)行在什么線(xiàn)程上. 可以是一個(gè)指定的線(xiàn)程, 線(xiàn)程池, 或者不限.
看一個(gè)例子:
fun main() = runBlocking<Unit> {
launch {
// context of the parent, main runBlocking coroutine
println("main runBlocking : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Unconfined) {
// not confined -- will work with main thread
println("Unconfined : I'm working in thread ${Thread.currentThread().name}")
}
launch(Dispatchers.Default) {
// will get dispatched to DefaultDispatcher
println("Default : I'm working in thread ${Thread.currentThread().name}")
}
launch(newSingleThreadContext("MyOwnThread")) {
// will get its own new thread
println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
}
}
運(yùn)行后打印出:
Unconfined : I'm working in thread main
Default : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking : I'm working in thread main
API提供了幾種選項(xiàng):
-
Dispatchers.Default代表使用JVM上的共享線(xiàn)程池, 其大小由CPU核數(shù)決定, 不過(guò)即便是單核也有兩個(gè)線(xiàn)程. 通常用來(lái)做CPU密集型工作, 比如排序或復(fù)雜計(jì)算等. -
Dispatchers.Main指定主線(xiàn)程, 用來(lái)做UI更新相關(guān)的事情. (需要添加依賴(lài), 比如kotlinx-coroutines-android.) 如果我們?cè)谥骶€(xiàn)程上啟動(dòng)一個(gè)新的協(xié)程時(shí), 主線(xiàn)程忙碌, 這個(gè)協(xié)程也會(huì)被掛起, 僅當(dāng)線(xiàn)程有空時(shí)會(huì)被恢復(fù)執(zhí)行. -
Dispatchers.IO: 采用on-demand創(chuàng)建的線(xiàn)程池, 用于網(wǎng)絡(luò)或者是讀寫(xiě)文件的工作. -
Dispatchers.Unconfined: 不指定特定線(xiàn)程, 這是一個(gè)特殊的dispatcher.
如果不明確指定dispatcher, 協(xié)程將會(huì)繼承它被啟動(dòng)的那個(gè)scope的context(其中包含了dispatcher).
在實(shí)踐中, 更推薦使用外部scope的dispatcher, 由調(diào)用方?jīng)Q定上下文. 這樣也方便測(cè)試.
newSingleThreadContext創(chuàng)建了一個(gè)線(xiàn)程來(lái)跑協(xié)程, 一個(gè)專(zhuān)注的線(xiàn)程算是一種昂貴的資源, 在實(shí)際的應(yīng)用中需要被釋放或者存儲(chǔ)復(fù)用.
切換線(xiàn)程還可以用withContext, 可以在指定的協(xié)程context下運(yùn)行代碼, 掛起直到它結(jié)束, 返回結(jié)果.
另一種方式是新啟一個(gè)協(xié)程, 然后用join明確地掛起等待.
在Android這種UI應(yīng)用中, 比較常見(jiàn)的做法是, 頂部協(xié)程用CoroutineDispatchers.Main, 當(dāng)需要在別的線(xiàn)程上做一些事情的時(shí)候, 再明確指定一個(gè)不同的dispatcher.
Scope是什么?
當(dāng)launch, async或runBlocking開(kāi)啟新協(xié)程的時(shí)候, 它們自動(dòng)創(chuàng)建相應(yīng)的scope. 所有的這些方法都有一個(gè)帶receiver的lambda參數(shù), 默認(rèn)的receiver類(lèi)型是CoroutineScope.
IDE會(huì)提示this: CoroutineScope:
launch { /* this: CoroutineScope */
}
當(dāng)我們?cè)?code>runBlocking, launch, 或async的大括號(hào)里面再創(chuàng)建一個(gè)新的協(xié)程的時(shí)候, 自動(dòng)就在這個(gè)scope里創(chuàng)建:
fun main() = runBlocking {
/* this: CoroutineScope */
launch { /* ... */ }
// the same as:
this.launch { /* ... */ }
}
因?yàn)?code>launch是一個(gè)擴(kuò)展方法, 所以上面例子中默認(rèn)的receiver是this.
這個(gè)例子中launch所啟動(dòng)的協(xié)程被稱(chēng)作外部協(xié)程(runBlocking啟動(dòng)的協(xié)程)的child. 這種"parent-child"的關(guān)系通過(guò)scope傳遞: child在parent的scope中啟動(dòng).
協(xié)程的父子關(guān)系:
- 當(dāng)一個(gè)協(xié)程在另一個(gè)協(xié)程的scope中被啟動(dòng)時(shí), 自動(dòng)繼承其context, 并且新協(xié)程的Job會(huì)作為父協(xié)程Job的child.
所以, 關(guān)于scope目前有兩個(gè)關(guān)鍵知識(shí)點(diǎn):
- 我們開(kāi)啟一個(gè)協(xié)程的時(shí)候, 總是在一個(gè)
CoroutineScope里. - Scope用來(lái)管理不同協(xié)程之間的父子關(guān)系和結(jié)構(gòu).
協(xié)程的父子關(guān)系有以下兩個(gè)特性:
- 父協(xié)程被取消時(shí), 所有的子協(xié)程都被取消.
- 父協(xié)程永遠(yuǎn)會(huì)等待所有的子協(xié)程結(jié)束.
值得注意的是, 也可以不啟動(dòng)協(xié)程就創(chuàng)建一個(gè)新的scope. 創(chuàng)建scope可以用工廠方法: MainScope()或CoroutineScope().
coroutineScope()方法也可以創(chuàng)建scope. 當(dāng)我們需要以結(jié)構(gòu)化的方式在suspend函數(shù)內(nèi)部啟動(dòng)新的協(xié)程, 我們創(chuàng)建的新的scope, 自動(dòng)成為suspend函數(shù)被調(diào)用的外部scope的child.
所以上面的父子關(guān)系, 可以進(jìn)一步抽象到, 沒(méi)有parent協(xié)程, 由scope來(lái)管理其中所有的子協(xié)程.
(注意: 實(shí)際上scope會(huì)提供默認(rèn)job, cancel操作是由scope中的job支持的.)
Scope在實(shí)際應(yīng)用中解決什么問(wèn)題呢? 如果我們的應(yīng)用中, 有一個(gè)對(duì)象是有自己的生命周期的, 但是這個(gè)對(duì)象又不是協(xié)程, 比如Android應(yīng)用中的Activity, 其中啟動(dòng)了一些協(xié)程來(lái)做異步操作, 更新數(shù)據(jù)等, 當(dāng)Activity被銷(xiāo)毀的時(shí)候需要取消所有的協(xié)程, 來(lái)避免內(nèi)存泄漏. 我們就可以利用CoroutineScope來(lái)做這件事: 創(chuàng)建一個(gè)CoroutineScope對(duì)象和activity的生命周期綁定, 或者讓activity實(shí)現(xiàn)CoroutineScope接口.
所以, scope的主要作用就是記錄所有的協(xié)程, 并且可以取消它們.
A CoroutineScope keeps track of all your coroutines, and it can cancel all of the coroutines started in it.
Structured Concurrency
這種利用scope將協(xié)程結(jié)構(gòu)化組織起來(lái)的機(jī)制, 被稱(chēng)為"structured concurrency".
好處是:
- scope自動(dòng)負(fù)責(zé)子協(xié)程, 子協(xié)程的生命和scope綁定.
- scope可以自動(dòng)取消所有的子協(xié)程.
- scope自動(dòng)等待所有的子協(xié)程結(jié)束. 如果scope和一個(gè)parent協(xié)程綁定, 父協(xié)程會(huì)等待這個(gè)scope中所有的子協(xié)程完成.
通過(guò)這種結(jié)構(gòu)化的并發(fā)模式: 我們可以在創(chuàng)建top級(jí)別的協(xié)程時(shí), 指定主要的context一次, 所有嵌套的協(xié)程會(huì)自動(dòng)繼承這個(gè)context, 只在有需要的時(shí)候進(jìn)行修改即可.
GlobalScope: daemon
GlobalScope啟動(dòng)的協(xié)程都是獨(dú)立的, 它們的生命只受到application的限制. 即GlobalScope啟動(dòng)的協(xié)程沒(méi)有parent, 和它被啟動(dòng)時(shí)所在的外部的scope沒(méi)有關(guān)系.
launch(Dispatchers.Default) { ... }和GlobalScope.launch { ... }用的dispatcher是一樣的.
GlobalScope啟動(dòng)的協(xié)程并不會(huì)保持進(jìn)程活躍. 它們就像daemon threads(守護(hù)線(xiàn)程)一樣, 如果JVM發(fā)現(xiàn)沒(méi)有其他一般的線(xiàn)程, 就會(huì)關(guān)閉.
Key takeaways
- Coroutine協(xié)程機(jī)制: suspend, resume, 簡(jiǎn)化回調(diào)代碼.
- suspend方法.
- 啟動(dòng)協(xié)程的幾種方法.
- Dispatcher指定線(xiàn)程.
- Structured Concurrency: 依靠scope來(lái)架構(gòu)化管理協(xié)程.
參考
- Coroutine Wiki
- 官方文檔 Overview頁(yè)
- 官方文檔 Coroutines Guide
- Asynchronous Programming Techniques
- Your first coroutine with Kotlin
- Introduction to Coroutines and Channels
- Github: Kotlin/kotlinx.coroutines
- Github: Coroutines Guide
- Github: KEEP: Kotlin Coroutines
第三方博客:
- Coroutines on Android (part I): Getting the background
- Async Operations with Kotlin Coroutines — Part 1
- Kotlin Coroutines Tutorial for Android
- Coroutine Context and Scope
歡迎關(guān)注公眾號(hào): 圣騎士Wind
