
今天我們來聊聊Kotlin的協(xié)程Coroutine。
如果你還沒有接觸過協(xié)程,推薦你先閱讀這篇入門級文章What? 你還不知道Kotlin Coroutine?
如果你已經(jīng)接觸過協(xié)程,但對協(xié)程的原理存在疑惑,那么在閱讀本篇文章之前推薦你先閱讀下面的文章,這樣能讓你更全面更順暢的理解這篇文章。
Kotlin協(xié)程實現(xiàn)原理:Suspend&CoroutineContext
Kotlin協(xié)程實現(xiàn)原理:CoroutineScope&Job
如果你已經(jīng)接觸過協(xié)程,相信你都有過以下幾個疑問:
- 協(xié)程到底是個什么東西?
- 協(xié)程的
suspend有什么作用,工作原理是怎樣的? - 協(xié)程中的一些關(guān)鍵名稱(例如:
Job、Coroutine、Dispatcher、CoroutineContext與CoroutineScope)它們之間到底是怎么樣的關(guān)系? - 協(xié)程的所謂非阻塞式掛起與恢復又是什么?
- 協(xié)程的內(nèi)部實現(xiàn)原理是怎么樣的?
- ...
接下來的一些文章試著來分析一下這些疑問,也歡迎大家一起加入來討論。
ContinuationInterceptor
看到Interceptor相信第一印象應(yīng)該就是攔截器,例如在Okhttp中被廣泛應(yīng)用。自然在協(xié)程中ContinuationInterceptor的作用也是用來做攔截協(xié)程的。
下面來看下它的實現(xiàn)。
public interface ContinuationInterceptor : CoroutineContext.Element {
/**
* The key that defines *the* context interceptor.
*/
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
* This function is invoked by coroutines framework when needed and the resulting continuations are
* cached internally per each instance of the original [continuation].
*
* This function may simply return original [continuation] if it does not want to intercept this particular continuation.
*
* When the original [continuation] completes, coroutine framework invokes [releaseInterceptedContinuation]
* with the resulting continuation if it was intercepted, that is if `interceptContinuation` had previously
* returned a different continuation instance.
*/
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
}
只給出了關(guān)鍵部分,ContinuationInterceptor繼承于CoroutineContext.Element,所以它也是CoroutineContext,同時提供了interceptContinuation方法,先記住這個方法后續(xù)會用到。
大家是否還記得在Kotlin協(xié)程實現(xiàn)原理系列的第一篇文章中,我們分析了CoroutineContext的內(nèi)部結(jié)構(gòu),當時提到了它的plus方法,就是下面這段代碼
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
在這里第一次看到了ContinuationInterceptor的身影,當時核心是為了分析CoroutineContext,所以只是提了plus方法每次都會將ContinuationInterceptor添加到拼接鏈的尾部。
不知道有沒有老鐵想過這個問題,為什么要每次新加入一個CoroutineContext都要調(diào)整ContinuationInterceptor的位置,并將它添加到尾部?
這里其實涉及到兩點。
其中一點是由于CombinedContext的結(jié)構(gòu)決定的。它有兩個元素分別是left與element。而left類似于前驅(qū)節(jié)點,它是一個前驅(qū)集合,而element只是一個純碎的CoroutineContext,而它的get方法每次都是從element開始進行查找對應(yīng)Key的CoroutineContext對象;沒有匹配到才會去left集合中進行遞歸查找。
所以為了加快查找ContinuationInterceptor類型的實例,才將它加入到拼接鏈的尾部,對應(yīng)的就是element。
另一個原因是ContinuationInterceptor使用的很頻繁,因為每次創(chuàng)建協(xié)程都會去嘗試查找當前協(xié)程的CoroutineContext中是否存在ContinuationInterceptor。例如我們通過launch來看協(xié)程的啟動。
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
如果你使用launch的默認參數(shù),那么此時的Coroutine就是StandaloneCoroutine,然后調(diào)用start方法啟動協(xié)程。
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
initParentJob()
start(block, receiver, this)
}
在start中進入了CoroutineStart,對應(yīng)的就是下面這段代碼
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this) {
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
因為我們使用的是默認參數(shù),所以這里對應(yīng)的就是CoroutineStart.DEFAULT,最終來到block.startCoroutineCancellable
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellable(Unit)
}
在這里我們終于看到了intercepted。
首先通過createCoroutineUnintercepted來創(chuàng)建一個協(xié)程(內(nèi)部具體如何創(chuàng)建的這篇文章先不說,后續(xù)文章會單獨分析),然后再調(diào)用了intercepted方法進行攔截操作,最后再resumeCancellable,這個方法最終調(diào)用的就是Continuation的resumeWith方法,即啟動協(xié)程。
所以每次啟動協(xié)程都會自動回調(diào)一次resumeWith方法。
今天的主題是ContinuationInterceptor所以我們直接看intercepted。
public expect fun <T> Continuation<T>.intercepted(): Continuation<T>
發(fā)現(xiàn)它是一個expect方法,它會根據(jù)不同平臺實現(xiàn)不同的邏輯。因為我們是Android所以直接看Android上的actual的實現(xiàn)
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
最終來到ContinuationImpl的intercepted方法
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
在這里看到了熟悉的context,獲取到ContinuationInterceptor實例,并且調(diào)用它的interceptContinuation方法返回一個處理過的Continuation。
多次調(diào)用
intercepted,對應(yīng)的interceptContinuation只會調(diào)用一次。
所以ContinuationInterceptor的攔截是通過interceptContinuation方法進行的。既然已經(jīng)明白了它的攔截方式,我們自己來手動寫一個攔截器來驗證一下。
val interceptor = object : ContinuationInterceptor {
override val key: CoroutineContext.Key<*> = ContinuationInterceptor
override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> {
println("intercept todo something. change run to thread")
return object : Continuation<T> by continuation {
override fun resumeWith(result: Result<T>) {
println("create new thread")
thread {
continuation.resumeWith(result)
}
}
}
}
}
println(Thread.currentThread().name)
lifecycleScope.launch(interceptor) {
println("launch start. current thread: ${Thread.currentThread().name}")
withContext(Dispatchers.Main) {
println("new continuation todo something in the main thread. current thread: ${Thread.currentThread().name}")
}
launch {
println("new continuation todo something. current thread: ${Thread.currentThread().name}")
}
println("launch end. current thread: ${Thread.currentThread().name}")
}
這里簡單實現(xiàn)了一個ContinuationInterceptor,如果攔截成功就會輸出interceptContinuation中對應(yīng)的語句。下面是程序運行后的輸出日志。
main
// 第一次launch
intercept todo something. change run to thread
create new thread
launch start. current thread: Thread-2
new continuation todo something in the main thread. current thread: main
create new thread
// 第二次launch
intercept todo something. change run to thread
create new thread
launch end. current thread: Thread-7
new continuation todo something. current thread: Thread-8
分析一下上面的日志,首先程序運行在main線程,通過lifecycleScope.launch啟動協(xié)程并將我們自定義的intercetpor加入到CoroutineContext中;然后在啟動的過程中發(fā)現(xiàn)我們自定義的interceptor攔截成功了,同時將原本在main線程運行的程序切換到了新的thread線程。同時第二次launch的時候也攔截成功。
到這里就已經(jīng)可以證明我們上面對ContinuationInterceptor理解是正確的,它可以在協(xié)程啟動的時候進行攔截操作。
下面我們繼續(xù)看日志,發(fā)現(xiàn)withContext并沒有攔截成功,這是為什么呢?注意看Dispatchers.Main。這也是接下來需要分析的內(nèi)容。
另外還有一點,如果細心的老鐵就會發(fā)現(xiàn),launch start與launch end所處的線程不一樣,這是因為在withContext結(jié)束之后,它內(nèi)部還會進行一次線程恢復,將自身所處的main線程切換到之前的線程,但為什么又與之前launch start的線程不同呢?
大家不要忘了,協(xié)程每一個掛起后的恢復都是通過回調(diào)resumeWith進行的,然而外部launch協(xié)程我們進行了攔截,在它返回的Continuation的resumeWith回調(diào)中總是會創(chuàng)建新的thread。所以發(fā)生這種情況也就不奇怪了,這是我們攔截的效果。
整體再來看這個例子,它是不是像一個簡易版的協(xié)程的線程切換呢?
CoroutineDispatcher
現(xiàn)在我們來看Dispatchers.Main,為什么它會導致我們攔截失敗呢?要探究原因沒有直接看源碼更加直接有效的。
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
主要看它的類型,它返回的是MainCoroutineDispatcher,然后再看它是什么
public abstract class MainCoroutineDispatcher : CoroutineDispatcher() {}
發(fā)現(xiàn)MainCoroutineDispatcher繼承于CoroutineDispatcher,主角登場了,但還不夠我們繼續(xù)看CoroutineDispatcher是什么
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
}
真想已經(jīng)浮出水面了,原來CoroutineDispatcher實現(xiàn)了ContinuationInterceptor,說明CoroutineDispatcher也具有攔截器的功能。然后再結(jié)合CoroutineContext的性質(zhì),就很好解釋為什么我們自定義的攔截器沒有生效。
原因就是它與我們自定義的攔截器一樣都實現(xiàn)了ContinuationInterceptor接口,一旦使用Dispatchers.Main就會替換掉我們自定義的攔截器。
因果關(guān)系弄明白了現(xiàn)在就好辦了。我們已經(jīng)知道它具有攔截功能,再來看CoroutineDispatcher提供的另外幾個方法isDispatchNeeded與dispatch。
我們可以大膽猜測,isDispatchNeeded就是判斷是否需要分發(fā),然后dispatch就是如何進行分發(fā),接下來我們來驗證一下。
ContinuationInterceptor重要的方法就是interceptContinuation,在CoroutineDispatcher中直接返回了DispatchedContinuation對象,它是一個Continuation類型。那么自然重點就是它的resumeWith方法。
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC_DEFAULT
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC_DEFAULT) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
這里我們看到了isDispatchNeeded與dispatch方法,如果不需要分發(fā)自然是直接調(diào)用原始的continuation對象的resumeWith方法,也就沒有什么類似于線程的切換。
那什么時候isDispatcheNeeded為true呢?這就要看它的dispatcer是什么。
由于現(xiàn)在我們是拿Dispatchers.Main作分析。所以這里我直接告訴你們它的dispatcher是HandlerContext
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
/**
* Creates [CoroutineDispatcher] for the given Android [handler].
*
* @param handler a handler.
* @param name an optional name for debugging.
*/
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
return !invokeImmediately || Looper.myLooper() != handler.looper
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
...
}
它繼承于HandlerDispatcher,而HandlerDispatcher繼承于MainCoroutineDispatcher。
條件都符合,我們直接看isDispatchNeeded方法返回true的邏輯。
首先通過invokeImmediately判斷,它代表當前線程是否與自身的線程相同,如何你外部使用者能夠保證這一點,就可以直接使用Dispatcher.Main.immediate來避免進行線程的切換邏輯。當然為了保證外部的判斷失敗,最后也會通過Looper.myLooper() != handler.looper來進行校正。對于Dispatchers.Main這個的handle.looper自然是主線程的looper。
如果不能保證則invokeImmediately為false,直接進行線程切換。然后進入dispatch方法,下面是Dispatchers.Main中dispatch的處理邏輯。
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
這個再熟悉不過了,因為這個時候的handler.post就是代表向主線程推送消息,此時的block將會在主線程進行調(diào)用。
這樣線程的切換就完成。
所以綜上來看,CoroutineDispatcher為協(xié)程提供了一個線程切換的統(tǒng)一判斷與執(zhí)行標準。
首先在協(xié)程進行啟動的時候通過攔截器的方式進行攔截,對應(yīng)的方法是interceptContinuation,然后返回一個具有切換線程功能的Continuation,在每次進行resumeWith的時候,內(nèi)部再通過isDispatchNeeded進行判斷當前協(xié)程的運行是否需要切換線程。如果需要則調(diào)用dispatch進行線程的切換,保證協(xié)程的正確運行。
如果我要自定義協(xié)程線程的切換邏輯,就可以通過繼承于CoroutineDispatcher來實現(xiàn),將它的核心方法進行自定義即可。
當然,如果你是在Android中使用協(xié)程,那基本上是不需要自定義線程的切換邏輯。因為kotlin已經(jīng)為我們提供了日常所需的Dispatchers。主要有四種分別為:
-
Dispatchers.Default: 適合在主線程之外執(zhí)行占用大量CPU資源的工作 -
Dispatchers.Main:Android主線程 -
Dispatchers.Unconfined: 它不會切換線程,只是啟動一個協(xié)程進行掛起,至于恢復之后所在的線程完全由調(diào)用它恢復的協(xié)程控制。 -
Dispatchers.IO: 適合在主線程之外執(zhí)行磁盤或網(wǎng)絡(luò)I/O
最后我們再來簡單提一下withContext。
withContext
CoroutineDispatcher雖然能夠提供線程的切換,但這只是單方向的,因為它沒有提供線程的恢復。
試想一下,我們有個網(wǎng)絡(luò)請求,我們通過CoroutineDispatcher將線程切換到Dispatchers.IO,當拿到請求成功的數(shù)據(jù)之后,所在的線程還是IO線程,這樣并不能有利于我們UI操作。所以為了解決這個問題kotlin提供了withContext,它不僅能夠接受CoroutineDispatcher來幫助我們切換線程,同時在執(zhí)行完畢之后還會幫助我們將之前切換掉的線程進恢復,保證協(xié)程運行的連貫性。這也是為什么官方推薦使用withContext進行協(xié)程線程的切換的原因。
而withContext的線程恢復原理是它內(nèi)部生成了一個DispatchedCoroutine,保存切換線程時的CoroutineContext與切換之前的Continuation,最后在onCompletionInternal進行恢復。
internal override fun onCompletionInternal(state: Any?, mode: Int, suppressed: Boolean) {
if (state is CompletedExceptionally) {
val exception = if (mode == MODE_IGNORE) state.cause else recoverStackTrace(state.cause, uCont)
uCont.resumeUninterceptedWithExceptionMode(exception, mode)
} else {
uCont.resumeUninterceptedMode(state as T, mode)
}
}
這個uCont就是切換線程之前的Continuation。具體實現(xiàn)就不在這分析了,感興趣的老鐵可以自己翻一翻源碼。
本篇文章主要介紹了ContinuationInterceptor作用與如何攔截協(xié)程的,同時也分析了CoroutineDispatcher內(nèi)部結(jié)構(gòu),進一步剖析了協(xié)程線程切換的原理。希望對學習協(xié)程的伙伴們能夠有所幫助,敬請期待后續(xù)的協(xié)程分析。
項目
android_startup: 提供一種在應(yīng)用啟動時能夠更加簡單、高效的方式來初始化組件,優(yōu)化啟動速度。不僅支持Jetpack App Startup的全部功能,還提供額外的同步與異步等待、線程控制與多進程支持等功能。
AwesomeGithub: 基于Github客戶端,純練習項目,支持組件化開發(fā),支持賬戶密碼與認證登陸。使用Kotlin語言進行開發(fā),項目架構(gòu)是基于Jetpack&DataBinding的MVVM;項目中使用了Arouter、Retrofit、Coroutine、Glide、Dagger與Hilt等流行開源技術(shù)。
flutter_github: 基于Flutter的跨平臺版本Github客戶端,與AwesomeGithub相對應(yīng)。
android-api-analysis: 結(jié)合詳細的Demo來全面解析Android相關(guān)的知識點, 幫助讀者能夠更快的掌握與理解所闡述的要點。
daily_algorithm: 每日一算法,由淺入深,歡迎加入一起共勉。