Come and Meet Kotlin Coroutine
Tags of Kotlin Coroutine
Kotlin協(xié)程可以被理解為一種輕量級(jí)的線程,它具有掛起和恢復(fù)的特點(diǎn),可以將我們從異步編程的回調(diào)陷阱中解放出來
下面我們一一來看給協(xié)程貼上的標(biāo)簽如何理解:
-
掛起和恢復(fù)
- 掛起函數(shù)(suspend function)
協(xié)程最吸引人的特點(diǎn)就在協(xié)程的掛起和恢復(fù)特性上,通過這個(gè)特性我們能夠像編寫同步代碼一樣簡化異步回調(diào)。這種特性在Kotlin語言層面表現(xiàn)為
suspend關(guān)鍵字:// suspend function suspend fun function1() { delay(1000L) println("suspend function1") } // normal function fun function2() { // delay(2000L) not satisfy structural concurrency println("suspend function2") } // type check: val funcVal1: suspend () -> Unit = ::function1 val funcVal2: () -> Unit = ::function2相比普通的函數(shù),
suspend函數(shù)可以理解為一種新的函數(shù)類型。- 協(xié)程構(gòu)建器(Coroutine Builder)
launchasyncrunBlocking是三種常見的協(xié)程構(gòu)建器,我們從函數(shù)簽名上【感性】地認(rèn)識(shí)一下他們的區(qū)別:// launch public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job // aysnc public fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T ): Deferred<T> // runBlocking public fun <T> runBlocking( context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T ): T
我們通常可以使用launch啟動(dòng)一個(gè)協(xié)程,他的返回值Job可以用于控制這個(gè)協(xié)程的生命周期, async可以看做是一個(gè)升級(jí)版的launch,他的block的返回值會(huì)被放在Deferred中。Deferred是Job的子類,可以通過await方法獲取返回值:
fun main() = runBlocking {
val job = launch {
println("execute of job")
"execute of job" // launch中的block不考慮返回值,lambda的返回值會(huì)被忽略
}
val deferred = async {
println("execute of deferred")
"result of deferred"
}
println(deferred.await())
Unit
}
/**
execute of job
execute of deferred
result of deferred
**/
launch和async默認(rèn)都是寫了之后立刻啟動(dòng)(這一點(diǎn)非常重要,aysnc并不需要await觸發(fā)執(zhí)行),可以通過調(diào)整CoroutineStart參數(shù)變更啟動(dòng)方式:
fun main() = runBlocking {
val lazyJob = launch(start = CoroutineStart.LAZY) {
println("execute now!")
}
println("before lazy starts")
// 通過delay先讓父協(xié)程掛起,明顯去別處launch沒有立刻執(zhí)行
println("parent sleeps")
delay(1000L)
println("parent wakes up")
lazyJob.start()
Unit
}
/**
before lazy starts
parent sleeps
parent wakes up
execute now!
**/
- 理解掛起和恢復(fù)
下面我分別在兩個(gè)suspend函數(shù)和兩個(gè)由launch發(fā)起的協(xié)程中delay兩秒,請(qǐng)問main函數(shù)執(zhí)行完成分別需要幾秒?
- suspend函數(shù)
fun main() = runBlocking {
getUserInfo()
getFriendList()
}
suspend fun getUserInfo() {
println("getUserInfo: start time: ${System.currentTimeMillis()}")
delay(2000L)
println("getUserInfo: end time: ${System.currentTimeMillis()}")
logX("suspend function1")
}
suspend fun getFriendList() {
println("getFriendList: start time: ${System.currentTimeMillis()}")
delay(2000L)
println("getFriendList end time: ${System.currentTimeMillis()}")
logX("suspend function2")
}
- Launch
fun main() = runBlocking {
launch {
println("launch1: start time: ${System.currentTimeMillis()}")
delay(2000L)
println("launch1: end time: ${System.currentTimeMillis()}")
logX("launch1")
}
launch {
println("launch2: start time: ${System.currentTimeMillis()}")
delay(2000L)
println("launch2: end time: ${System.currentTimeMillis()}")
logX("launch2")
}
Unit
}
答案揭曉時(shí)刻:
suspend函數(shù)需要4秒,launch需要2秒。我們來看看掛起函數(shù)和launch的執(zhí)行模型:

suspend函數(shù)和launch這類的協(xié)程構(gòu)建器是有本質(zhì)上的不同的,suspend函數(shù)在Kotlin編譯器的作用下會(huì)變成一個(gè)自動(dòng)機(jī),而launch這類都不是suspend,他們其實(shí)是將任務(wù)【分發(fā)】到線程池(在JVM平臺(tái)上)上實(shí)現(xiàn)的執(zhí)行。
suspend和協(xié)程構(gòu)建器的結(jié)合之處就在await上:
public suspend fun await(): T
await是一個(gè)掛起函數(shù),后續(xù)的流程會(huì)像上圖以上被掛起,我們來看這個(gè)例子:
fun main() = runBlocking {
val def = async {
println("async starts")
delay(2000L)
println("async end")
"hello world"
}
println("message from main")
println(def.await())
println("end of story")
}
/**
message from main
async starts
async end
hello world // end of story的輸出被掛起到await執(zhí)行完成再恢復(fù)
end of story
**/
suspend函數(shù)到自動(dòng)機(jī)的轉(zhuǎn)換在最后一節(jié)會(huì)說明。Kotlin Coroutine狹義的協(xié)程指的是通過構(gòu)建器啟動(dòng)的協(xié)程,后文不再說明。
-
輕量級(jí)的線程
- 如何理解【輕量級(jí)】
在下面的代碼中我們開啟了很多個(gè)協(xié)程,但是等量的線程會(huì)OOM
fun main() = runBlocking { repeat(1000_000_000) { launch { //常見的協(xié)程 delay(1000000) } } delay(10000L) }- Kotlin Coroutine VS Thread
協(xié)程本身是運(yùn)行在線程池上的:
fun main() = runBlocking {
logX("main ")
val job = launch(Dispatchers.IO) {
logX("launch 1")
}
}
/**
================================
main
Thread:main @coroutine#1
================================
================================
launch 1
Thread:DefaultDispatcher-worker-1 @coroutine#2
================================
**/
Dispatchers就可以指定運(yùn)行的線程池。

Structured Concurrency
結(jié)構(gòu)化并發(fā)的思想貫穿于Kotlin coroutine的始終,我通過一句話概述:控制協(xié)程執(zhí)行的范圍。這個(gè)范圍使用CoroutineScope實(shí)現(xiàn)。因?yàn)樯厦娴拇a都運(yùn)行在runBlocking中,傳入?yún)?shù)的時(shí)候直接將block設(shè)置為CoroutineScope的擴(kuò)展lambda,所以不需要再指定scope:
// runBlocking
public fun <T> runBlocking(
context: CoroutineContext = EmptyCoroutineContext,
block: suspend CoroutineScope.() -> T
): T
包括suspend函數(shù)也需要運(yùn)行在scope中,否則就會(huì)在編譯期報(bào)錯(cuò)。
Suspend Function : A CPS Transformation
Kotlin編譯器會(huì)對(duì)掛起函數(shù)進(jìn)行轉(zhuǎn)換,如圖所示:

這種轉(zhuǎn)換在Kotlin中被稱為CPS(continuation-passing-style)轉(zhuǎn)換,Continuation可以理解為是存儲(chǔ)了中間過程的Callback。下面我們具體看一個(gè)例子:
要注意什么?
編譯后新增加的匿名內(nèi)部類:
TestContinuation看【掛起】和【恢復(fù)】的邏輯:
invokeSuspend
下面代碼將編譯前的掛起函數(shù)和編譯后的掛起函數(shù)進(jìn)行了一個(gè)比較,在編譯后的testCoroutine中增加了一個(gè)新的匿名內(nèi)部類,TestContinuation,其中記錄了獲取的結(jié)果的信息,同時(shí)注意看invokeSuspend方法,這個(gè)方法有點(diǎn)像遞歸,最后還會(huì)調(diào)用到自身,但是會(huì)走不同的狀態(tài)機(jī)的分支邏輯:
// 編譯前的代碼
suspend fun testCoroutine() {
log("start")
val user = getUserInfo()
log(user)
val friendList = getFriendList(user)
log(friendList)
val feedList = getFeedList(friendList)
log(feedList)
}
// 編譯后的代碼
fun testCoroutine(completion: Continuation<Any?>): Any? {
// TestContinuation本質(zhì)上是匿名內(nèi)部類
class TestContinuation(completion: Continuation<Any?>?) : ContinuationImpl(completion) {
// 表示協(xié)程狀態(tài)機(jī)當(dāng)前的狀態(tài)
var label: Int = 0
// 三個(gè)變量,對(duì)應(yīng)原函數(shù)的三個(gè)變量
lateinit var user: String
lateinit var friendList: String
lateinit var feedList: String
// result 接收協(xié)程的運(yùn)行結(jié)果
var result = continuation.result
// suspendReturn 接收掛起函數(shù)的返回值
var suspendReturn: Any? = null
// CoroutineSingletons 是個(gè)枚舉類
// COROUTINE_SUSPENDED 代表當(dāng)前函數(shù)被掛起了
val sFlag = CoroutineSingletons.COROUTINE_SUSPENDED
// invokeSuspend 是協(xié)程的關(guān)鍵
// 它最終會(huì)調(diào)用 testCoroutine(this) 開啟協(xié)程狀態(tài)機(jī)
// 狀態(tài)機(jī)相關(guān)代碼就是后面的 when 語句
// 協(xié)程的本質(zhì),可以說就是 CPS + 狀態(tài)機(jī)
override fun invokeSuspend(_result: Result<Any?>): Any? {
result = _result
label = label or Int.Companion.MIN_VALUE
return testCoroutine(this)
}
}
// ...
val continuation = if (completion is TestContinuation) {
completion
} else {
// 作為參數(shù)
// ↓
TestContinuation(completion)
}
}
testCoroutine運(yùn)行的邏輯如下:
協(xié)程狀態(tài)機(jī)的核心邏輯反編譯后的偽代碼如下:
when (continuation.label) {
0 -> {
// 檢測異常
throwOnFailure(result)
log("start")
// 將 label 置為 1,準(zhǔn)備進(jìn)入下一次狀態(tài)
continuation.label = 1
// 執(zhí)行 getUserInfo
suspendReturn = getUserInfo(continuation)
// 判斷是否掛起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
1 -> {
throwOnFailure(result)
// 獲取 user 值
user = result as String
log(user)
// 準(zhǔn)備進(jìn)入下一個(gè)狀態(tài)
continuation.label = 2
// 執(zhí)行 getFriendList
suspendReturn = getFriendList(user, continuation)
// 判斷是否掛起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
2 -> {
throwOnFailure(result)
user = continuation.mUser as String
// 獲取 friendList 的值
friendList = result as String
log(friendList)
// 準(zhǔn)備進(jìn)入下一個(gè)狀態(tài)
continuation.label = 3
// 執(zhí)行 getFeedList
suspendReturn = getFeedList(user, friendList, continuation)
// 判斷是否掛起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
3 -> {
throwOnFailure(result)
user = continuation.mUser as String
friendList = continuation.mFriendList as String
feedList = continuation.result as String
log(feedList)
loop = false
}
}
我們來捋一下其中的順序,最開始先構(gòu)建一個(gè)TestContinuation的實(shí)例,注意,Continuation的這個(gè)實(shí)例是三個(gè)掛起函數(shù)的公共參數(shù)。
- getUserInfo
開始時(shí)label = 0, 此時(shí)進(jìn)入邏輯,先進(jìn)行異常的檢查,設(shè)置下一次的入口label=1,執(zhí)行getUserInfo:
when (continuation.label) {
0 -> {
// ...
continuation.label = 1
// 執(zhí)行 getUserInfo
suspendReturn = getUserInfo(continuation)
// 判斷是否掛起
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}
}
// ...
}
在Kotlin編譯器CPS轉(zhuǎn)換之后的getUserInfo方法中,因?yàn)閭魅肓薱ontinuation參數(shù),需要再執(zhí)行一次Continuation#invokeSuspend,這個(gè)方法同時(shí)也將結(jié)果記錄在了result處
override fun invokeSuspend(_result: Result<Any?>): Any? {
result = _result
label = label or Int.Companion.MIN_VALUE
return testCoroutine(this)
}
相當(dāng)于【遞歸】地執(zhí)行一次這樣的邏輯(個(gè)人認(rèn)為這個(gè)邏輯和傳遞事件的分發(fā)有點(diǎn)相似)。此時(shí)getUserInfo執(zhí)行完成返回的結(jié)果是CoroutineSingletons.COROUTINE_SUSPEND,所以繼續(xù)執(zhí)行下個(gè)when的case。
后面的結(jié)果其他的掛起函數(shù)的執(zhí)行過程都差不多。具體過程如圖所示:

通過這個(gè)狀態(tài)機(jī)的分析能夠讓我們更加深刻的理解掛起函數(shù)中【掛起】和【恢復(fù)】的本質(zhì):其實(shí)就是基于狀態(tài)機(jī)的回調(diào)函數(shù),但是這種回調(diào)函數(shù)的執(zhí)行邏輯是Kotlin編譯器自動(dòng)生成的,大大減少了我們的腦力消耗。
需要注意的是,以上的掛起函數(shù)都是【真正的】掛起函數(shù),suspend function中都帶有掛起的操作,但是Kotlin編譯器在進(jìn)行CPS轉(zhuǎn)換的時(shí)候只認(rèn)supsend關(guān)鍵字,對(duì)于偽suspend函數(shù),走else分支,節(jié)省開銷:
if (suspendReturn == sFlag) {
return suspendReturn
} else {
result = suspendReturn
//go to next state
}