Coroutine協(xié)程是kotlin實(shí)現(xiàn)的一種異步執(zhí)行邏輯的方式,相對(duì)與傳統(tǒng)的線程,協(xié)程更加簡(jiǎn)潔,高效,占用資源少。那協(xié)程到底是怎么實(shí)現(xiàn)異步的呢?
線程
在現(xiàn)在的操作系統(tǒng)中,線程是CPU調(diào)度的最少單元。所有的程序邏輯運(yùn)行在線程之上。在Java API中, Thread是實(shí)現(xiàn)線程的基本類。它的內(nèi)部實(shí)現(xiàn)是大量的 JNI 調(diào)用,因?yàn)榫€程的實(shí)現(xiàn)必須由操作系統(tǒng)直接提供支持。在 Android 平臺(tái)上,Thread 的創(chuàng)建過(guò)程中,會(huì)調(diào)用 Linux API 中的 pthread_create 函數(shù),這直接說(shuō)明了 Java 層中的 Thread 和 Linux 系統(tǒng)級(jí)別的中的線程是一一對(duì)應(yīng)的。
線程的問(wèn)題是阻塞與運(yùn)行兩種狀態(tài)之間的切換有相當(dāng)大的資源開(kāi)銷,線程并不是一種輕量級(jí)資源,大量創(chuàng)建線程是對(duì)系統(tǒng)資源的一種消耗,而線程的阻塞調(diào)用會(huì)導(dǎo)致系統(tǒng)中存在大量因阻塞而不運(yùn)行的線程,這對(duì)系統(tǒng)資源是一種極大的浪費(fèi)。
協(xié)程
協(xié)程本質(zhì)上可以認(rèn)為是運(yùn)行在線程上的代碼塊,協(xié)程提供的 掛起 操作會(huì)使協(xié)程暫停執(zhí)行,而不會(huì)導(dǎo)致線程阻塞。而且協(xié)程是一種輕量級(jí)資源,一個(gè)應(yīng)用中即使創(chuàng)建了上千個(gè)協(xié)程也不會(huì)造成太大的負(fù)擔(dān)。
協(xié)程的是通過(guò)’suspend‘修飾符來(lái)修飾需要掛起的方法。suspend并不是Java的API,是kotlin通過(guò)編譯器實(shí)現(xiàn)的。
CPS 變換
被 suspend 修飾符修飾的函數(shù)在編譯期間會(huì)被編譯器做特殊處理,這個(gè)特殊處理的第一步就是做CPS 變換。
CPS (Continuation-passing style)變換是一種編程風(fēng)格,就是將控制流顯式表示為continuation的一種編程風(fēng)格. 簡(jiǎn)單來(lái)理解就是顯式使用函數(shù)表示函數(shù)返回的后續(xù)操作。
例如:
suspend fun <T> foo<T>.await(): T
在編譯期發(fā)生 CPS 變換之后:
fun <T> foo<T>.await(continuation: Continuation<T>): Any?
CPS 變換后的函數(shù)多了一個(gè) Continuation<T> 類型的參數(shù),Continuation就是續(xù)體。源碼:
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
Continuation是一個(gè)抽象的概念,簡(jiǎn)單來(lái)說(shuō)它包裝了協(xié)程在掛起之后應(yīng)該繼續(xù)執(zhí)行的代碼;在編譯的過(guò)程中,一個(gè)完整的協(xié)程被分割切塊成多個(gè)續(xù)體。在 await 函數(shù)的掛起結(jié)束以后,它會(huì)調(diào)用 continuation 參數(shù)的 resumeWith 函數(shù),來(lái)恢復(fù)執(zhí)行 await 函數(shù)后面的代碼。
方法經(jīng)過(guò)CPS 變換之后,返回值類型變成了 Any?,這是因?yàn)檫@個(gè)函數(shù)在發(fā)生變換后,除了要返回它本身的返回值,還要返回一個(gè)標(biāo)記——COROUTINE_SUSPENDED,而這個(gè)返回類型事實(shí)上是返回類型 T 與 COROUTINE_SUSPENDED 的聯(lián)合類型。由于Kotlin 中沒(méi)有聯(lián)合類型,所以只好用最泛化的類型 Any? 來(lái)表示,而 COROUTINE_SUSPENDED 是一個(gè)標(biāo)記,返回它的掛起函數(shù)表示這個(gè)掛起函數(shù)會(huì)發(fā)生事實(shí)上的掛起操作。
狀態(tài)機(jī)
Continuation為了直接支持掛起(即使協(xié)程在掛起點(diǎn)中斷執(zhí)行而在適當(dāng)?shù)臅r(shí)機(jī)在恢復(fù))操作,編譯器在編譯掛起函數(shù)時(shí)會(huì)將函數(shù)體編譯為狀態(tài)機(jī)。主要是為了性能考慮,避免多創(chuàng)建類和對(duì)象。
如:
val a = a()
val y = foo(a).await() // #1
b()
val z = bar(a, y).await() // #2
c(z)
編譯之后生成的偽代碼:
class <anonymous_for_state_machine> extends SuspendLambda<...> {
// 狀態(tài)機(jī)當(dāng)前狀態(tài)
int label = 0
// 協(xié)程的局部變量
A a = null
Y y = null
void resumeWith(Object result) {
if (label == 0) goto L0
if (label == 1) goto L1
if (label == 2) goto L2
else throw IllegalStateException()
L0:
// 這次調(diào)用,result 應(yīng)該為空
a = a()
label = 1
result = foo(a).await(this) // 'this' 作為續(xù)體傳遞
if (result == COROUTINE_SUSPENDED) return // 如果 await 掛起了執(zhí)行則返回
L1:
// 外部代碼傳入 .await() 的結(jié)果恢復(fù)協(xié)程
y = (Y) result
b()
label = 2
result = bar(a, y).await(this) // 'this' 作為續(xù)體傳遞
if (result == COROUTINE_SUSPENDED) return // 如果 await 掛起了執(zhí)行則返回
L2:
// 外部代碼傳入 .await() 的結(jié)果恢復(fù)協(xié)程
Z z = (Z) result
c(z)
label = -1 // 沒(méi)有其他步驟了
return
}
}
一個(gè)掛起函數(shù)會(huì)被編譯成一個(gè)匿名類,匿名類中的一個(gè)函數(shù)實(shí)現(xiàn)了這個(gè)狀態(tài)機(jī)。成員變量 label 代表了當(dāng)前狀態(tài)機(jī)的狀態(tài),每一個(gè)續(xù)體(即掛起點(diǎn)中間的部分以及掛起點(diǎn)與函數(shù)頭尾之間的部分)都各自對(duì)應(yīng)了一個(gè)狀態(tài),當(dāng)函數(shù)運(yùn)行到每個(gè)掛起點(diǎn)時(shí),label 的值都受限會(huì)發(fā)生改變,并且當(dāng)前的續(xù)體(也就是代碼中的this)都會(huì)作為實(shí)參傳遞給發(fā)生了 CPS 變換的掛起函數(shù),如果這個(gè)掛起函數(shù)沒(méi)有發(fā)生事實(shí)上的掛起,函數(shù)繼續(xù)運(yùn)行,如果發(fā)生了事實(shí)上的掛起,則函數(shù)直接 return。
由于 label 記錄了狀態(tài),所以在協(xié)程恢復(fù)的時(shí)候,可以根據(jù)狀態(tài)使用 goto 語(yǔ)句直接跳轉(zhuǎn)至上次的掛起點(diǎn)并向后執(zhí)行,這就是協(xié)程掛起的原理。另外,雖然 Java 中沒(méi)有 goto 語(yǔ)句,但是 class 字節(jié)碼中支持 goto。
續(xù)體攔截器
掛起函數(shù)在恢復(fù)的時(shí)候,理論上可能會(huì)在任何一個(gè)線程上恢復(fù),有時(shí)我們需要限定協(xié)程運(yùn)行在指定的線程,例如在Android中,更新 UI 的操作只能在 UI 主線程中進(jìn)行。
android MainDispatcherLoader的實(shí)現(xiàn):
// Main 調(diào)度器
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// dispatcher 由 loadMainDispatcher() 函數(shù)創(chuàng)建
internal object MainDispatcherLoader {
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
......
}
}
// MainCoroutineDispatcher
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {
@ExperimentalCoroutinesApi
public abstract val immediate: MainCoroutineDispatcher
}
// CoroutineDispatcher
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
......
}
@InternalCoroutinesApi
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
try {
createDispatcher(factories)
} catch (cause: Throwable) {
MissingMainCoroutineDispatcher(cause, hintOnError())
}
/**
* @suppress
*/
@InternalCoroutinesApi
public object MissingMainCoroutineDispatcherFactory : MainDispatcherFactory {
override val loadPriority: Int
get() = -1
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher {
return MissingMainCoroutineDispatcher(null)
}
}
// ContinuationInterceptor(續(xù)體攔截器)
public interface ContinuationInterceptor : CoroutineContext.Element {
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
// Performance optimization for a singleton Key
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (key === Key) this as E else null
// Performance optimization to a singleton Key
public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
if (key === Key) EmptyCoroutineContext else this
}
ContinuationInterceptor,負(fù)責(zé)攔截協(xié)程在恢復(fù)后應(yīng)執(zhí)行的代碼(即續(xù)體)并將其在指定線程或線程池恢復(fù)
在掛起函數(shù)的編譯中,每個(gè)掛起函數(shù)都會(huì)被編譯為一個(gè)實(shí)現(xiàn)了 Continuation 接口的匿名類,而續(xù)體攔截器會(huì)攔截真正掛起協(xié)程的掛起點(diǎn)的續(xù)體。在協(xié)程中調(diào)用掛起函數(shù),掛起函數(shù)不一定會(huì)真正掛起協(xié)程
如:
launch {
val deferred = async {
// 異步邏輯
......
}
......
deferred.await()
......
}
在 deferred.await() 這行執(zhí)行的時(shí)候,如果異步邏輯已經(jīng)執(zhí)行完成并取得了結(jié)果,那 await 函數(shù)會(huì)直接取得結(jié)果,而不會(huì)掛起協(xié)程。相反,如果網(wǎng)絡(luò)請(qǐng)求還未產(chǎn)生結(jié)果,await 函數(shù)就會(huì)使協(xié)程掛起。續(xù)體攔截器只攔截真正發(fā)生掛起的掛起點(diǎn)后的續(xù)體,對(duì)于未發(fā)生掛起的掛起點(diǎn),續(xù)體會(huì)被直接調(diào)用 resumeWith 這一類的函數(shù)而不需要續(xù)攔截器對(duì)它進(jìn)行操作。除此之外,續(xù)體攔截器還會(huì)緩存攔截過(guò)的續(xù)體,并且在不再需要它的時(shí)候調(diào)用 releaseInterceptedContinuation 函數(shù)釋放它。