原理篇(一)關(guān)鍵類的分析
上面簡(jiǎn)單的介紹了一些用法,但是具體的原理和特點(diǎn),好像還不是很清楚,那么下面就來(lái)介紹一下,一些關(guān)鍵的類,流程和原理。
介紹的相關(guān)的原理基于這行代碼:
fun coroTest() {
GlobalScope.launch {
delay(1000L)//Delays coroutine for a given time without blocking a thread and resumes it after a specified time
Log.i(CO_TAG, "launch ")
}
Log.i(CO_TAG, "----")
}
然后貼上 launch() 的源碼:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
CoroutineScope
源碼分析
首先,launch() 是 CoroutineScope 的一個(gè)擴(kuò)展函數(shù),
CoroutineScope 簡(jiǎn)單來(lái)說(shuō),就是協(xié)程的作用范圍,每一個(gè) Coroutine Builder,例如 CoroutineScope.launch,都是 CoroutineScope 的擴(kuò)展,并且繼承了其 coroutineContext .
Corotine 提供了全局的 CoroutineScope 也就是 GlobalScope,簡(jiǎn)單看下其代碼:
//CoroutineScope.kt 中
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
//EmptyCorotineContext in CorotineContextImpl.kt 文件中
public object EmptyCoroutineContext : CoroutineContext, Serializable {
private const val serialVersionUID: Long = 0
private fun readResolve(): Any = EmptyCoroutineContext
public override fun <E : Element> get(key: Key<E>): E? = null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
public override fun plus(context: CoroutineContext): CoroutineContext = context
public override fun minusKey(key: Key<*>): CoroutineContext = this
public override fun hashCode(): Int = 0
public override fun toString(): String = "EmptyCoroutineContext"
}
簡(jiǎn)單來(lái)說(shuō),GlobalScope 沒(méi)有綁定任何 job,它用于構(gòu)建最頂級(jí)的 coroutines,這些協(xié)程的生命周期跟隨這個(gè) Application,并且在 Application 生命周期結(jié)束之前,不會(huì)被 cancel。
關(guān)鍵函數(shù)分析
CoroutinScope 主要包含了以下擴(kuò)展函數(shù):
| actor |
fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit): SendChannel<E>Launches new coroutine that is receiving messages from its mailbox channel and returns a reference to its mailbox channel as a SendChannel. The resulting object can be used to send messages to this coroutine. |
|---|---|
| async |
fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T): Deferred<T>Creates new coroutine and returns its future result as an implementation of Deferred. The running coroutine is cancelled when the resulting deferred is cancelled. |
| broadcast |
fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, block: suspend ProducerScope<E>.() -> Unit): BroadcastChannel<E>Launches new coroutine to produce a stream of values by sending them to a broadcast channel and returns a reference to the coroutine as a BroadcastChannel. The resulting object can be used to subscribe to elements produced by this coroutine. |
| cancel |
fun CoroutineScope.cancel(): UnitCancels this scope, including its job and all its children. Throws IllegalStateExceptionif the scope does not have a job in it. |
| launch |
fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): JobLaunches new coroutine without blocking current thread and returns a reference to the coroutine as a Job. The coroutine is cancelled when the resulting job is cancelled. |
| newCoroutineContext |
fun CoroutineScope.newCoroutineContext( context: CoroutineContext): CoroutineContextCreates context for the new coroutine. It installs Dispatchers.Default when no other dispatcher nor ContinuationInterceptor is specified, and adds optional support for debugging facilities (when turned on). |
| plus |
operator fun CoroutineScope.plus( context: CoroutineContext): CoroutineScopeAdds the specified coroutine context to this scope, overriding existing elements in the current scope’s context with the corresponding keys. |
| produce |
fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>Launches new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine. |
- actor{} 會(huì)啟動(dòng)一個(gè)能夠接收 Message 的 Coroutine,也就是 SendChannel.你可以通過(guò)SendChannel.send() 方法給其它協(xié)程發(fā)送消息或者結(jié)果,或者通過(guò) SendChannel.offer() 發(fā)送消息,兩者區(qū)別在于 offer() 在隊(duì)列滿的時(shí)候,會(huì)返回失敗,而 send() 則不會(huì)。對(duì)應(yīng)接收消息的是 ReceiveChannel。
- async{} 會(huì)啟動(dòng)一個(gè)新的協(xié)程,并且返回一個(gè) Deferred(它是一個(gè) future 的實(shí)現(xiàn)),你可以通過(guò) Deferred.await() 異步獲取結(jié)果。
- broadcast{} 和 actor 類似,只不過(guò),BroadcastChannel 是一種一對(duì)多的訂閱關(guān)系。
- launch{} 啟動(dòng)一個(gè)新的協(xié)程,并且返回一個(gè) Job 對(duì)象,你可以通過(guò)這個(gè) Job 取消協(xié)程
- plus 也就是 + ,通過(guò)該操作,你可以給該 CoroutineScope 關(guān)聯(lián)一個(gè) CoroutineContext,并且如果存在同一個(gè) key 會(huì)替換之前的 CoroutineContext。
- newConroutinContext 會(huì)給當(dāng)前 CoroutineScop 創(chuàng)建一個(gè) ,如果不存在 Dispatcher 或者 ContinuationInterceptor 則會(huì)給這個(gè) Context 關(guān)聯(lián)默認(rèn)的 Dispatcher.Default。
- produce{} 啟動(dòng)一個(gè)新的協(xié)程,并且返回 receiveChannel。
官方說(shuō)明鏈接 CoroutineScope
CoroutineContext
協(xié)程是運(yùn)行在 CoroutinesContext 的一些集合里面,根據(jù)官方文檔的意思
* Persistent context for the coroutine. It is an indexed set of [Element] instances.
* An indexed set is a mix between a set and a map.
* Every element in this set has a unique [Key]. Keys are compared _by reference_.
也就是說(shuō),CoroutineContext 是 coroutine 的運(yùn)行的 Context,它是 Element 實(shí)例的集合,這種集合介于 set 和map 之間,每一個(gè) Element 都有一個(gè)和對(duì)象引用相關(guān)的 key,作為 Element 的唯一標(biāo)志。
簡(jiǎn)單看下其源碼:
public interface CoroutineContext {
/**
* Returns the element with the given [key] from this context or `null`.
* Keys are compared _by reference_, that is to get an element from the context the reference to its actual key
* object must be presented to this function.
通過(guò)key 獲得對(duì)應(yīng)的CoroutineContext
*/
public operator fun <E : Element> get(key: Key<E>): E?
/**
* Accumulates entries of this context starting with [initial] value and applying [operation]
* from left to right to current accumulator value and each element of this context.
使用 initial 作為初始值,operation 作為累加操作
*/
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
/**
* Returns a context containing elements from this context and elements from other [context].
* The elements from this context with the same key as in the other one are dropped.
*/
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
/**
* Returns a context containing elements from this context, but without an element with
* the specified [key]. Keys are compared _by reference_, that is to remove an element from the context
* the reference to its actual key object must be presented to this function.
*/
public fun minusKey(key: Key<*>): CoroutineContext
/**
* Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
* Keys in the context are compared _by reference_.
*/
public interface Key<E : Element>
/**
* An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
*/
public interface Element : CoroutineContext {
/**
* A key of this coroutine context element.
*/
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
}
簡(jiǎn)單介紹下這里相關(guān)的類以及函數(shù):
相關(guān)類
- Element:CoroutineContext 的一個(gè)元素,任何一個(gè) Coroutine context 都是自己唯一的單例。
- Key: 作為CoroutineContext 的key,這些 Key 都是通過(guò)對(duì)象引用比較的,也就是說(shuō),同一個(gè)對(duì)象的key 才是相同的。這里也就是提供一種泛型的能力,使子類的CoroutineContext 的key,可以是任何繼承自 CoroutineContext 的類實(shí)例。
- CoroutineContext:如之前的介紹
相關(guān)函數(shù)
- public operator fun <E : Element> get(key: Key<E>): E? 根據(jù)給定的 Key 獲取特定的 CoroutineContext
- public fun <R> fold(initial: R, operation: (R, Element) -> R): R 把傳入的 initial 作為初始變量,通過(guò)傳入的 operation 操作,累次操作 Context 中的 Element。
- public operator fun plus(context: CoroutineContext): CoroutineContext 返回一個(gè)包含了當(dāng)前 Context 和傳入的 CoroutineContext 包含的所有的 Element 的一個(gè) Context。如果是兩者都存在的 Element 會(huì)被刪除掉。
- public fun minusKey(key: Key<*>): CoroutineContext 根據(jù)傳入的 key 返回該Context 存在的但是不包含傳入的 Keys 的或集。
具體怎樣去理解這些內(nèi)容呢?我們結(jié)合 CoroutineContextImpl.kt 文件里面提供的幾個(gè) Context 來(lái)理解一下:
例如 EmptyCoroutineContext
public object EmptyCoroutineContext : CoroutineContext, Serializable {
private const val serialVersionUID: Long = 0
private fun readResolve(): Any = EmptyCoroutineContext
//EmptyCoroutineContext 不包含一個(gè)任何一個(gè) Element
public override fun <E : Element> get(key: Key<E>): E? = null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
public override fun plus(context: CoroutineContext): CoroutineContext = context
public override fun minusKey(key: Key<*>): CoroutineContext = this
public override fun hashCode(): Int = 0
public override fun toString(): String = "EmptyCoroutineContext"
}
CoroutineDispatcher
簡(jiǎn)單用法
CoroutineDispatcher 每一個(gè)CoroutineContext 都包含一個(gè) CoroutineDispatcher ,它設(shè)置了對(duì)應(yīng)的協(xié)程使用一個(gè)或者多個(gè)線程,協(xié)程調(diào)度器可以將協(xié)程的執(zhí)行局限在指定的線程中,調(diào)度它運(yùn)行在線程池中或讓它不受限的運(yùn)行。
先來(lái)看下具體怎么使用它吧,下面是一個(gè)簡(jiǎn)單的例子:
fun dispatchTest() {
runBlocking {
launch {
doLog("in main thread")
}
launch(Dispatchers.Unconfined) {
doLog("in Unconfined thread 1")
}
launch(Dispatchers.Default) {
doLog(" in child thread 2")
}
launch(newSingleThreadContext("child thread3")) {
doLog("in new thread")
}
}
}
然后控制臺(tái)輸出如下:
01-12 17:20:48.212 14843-14843/com.yy.yylite.kotlinshare I/Context: in Unconfined thread 1
01-12 17:20:48.213 14843-14950/com.yy.yylite.kotlinshare I/Context: in child thread 2
01-12 17:20:48.215 14843-14843/com.yy.yylite.kotlinshare I/Context: in main thread
01-12 17:20:48.215 14843-14953/com.yy.yylite.kotlinshare I/Context: in new thread
所以我們可以指定一個(gè)協(xié)程在特定的子線程執(zhí)行。
如果直接調(diào)用 launch,不指定Dispatcher,那么它使用的是啟動(dòng)它的 CoroutineScope 的Context 以及 Dispatch。
常見(jiàn)的 Dispatcher 如下:
- Dispatchers.Default:默認(rèn)的 Dispatcher,會(huì)在jvm 層級(jí)共享線程池,會(huì)創(chuàng)建等于 cpu 核數(shù)的線程數(shù)目,但是始終大于等于2,因?yàn)?cpu 是非常珍貴的資源所以使用 Default Dispatcher 是非常合理的。
- Dispatchers.IO :用于IO 操作的Dispatcher,是按需創(chuàng)建的線程池。
- Dispatcher.Main:主線程的 Dispatcher
- Dispatcher.UnConfined:不確定的 Dispatcher,會(huì)在調(diào)用的地方創(chuàng)建 Coroutine,但是會(huì)在任意線程執(zhí)行 Coroutine,取決于第一個(gè)掛起的協(xié)程的返回結(jié)果。
關(guān)于 Dispatcher.UnConfined 我們?cè)趺慈ダ斫饽??先看一個(gè)例子:
/**
* 非限定的 Dispatcher
*/
fun testDispatcher() = runBlocking {
launch(Dispatchers.Unconfined) {
doLog("start coroutine")
delay(500)
doLog("after delay")
}
launch {
doLog("in main start coroutine ")
delay(1000)
doLog("in main after delay")
}
}
然后控制臺(tái)輸出結(jié)果如下:
01-19 10:22:19.785 30584-30584/com.yy.yylite.kotlinshare I/Context: start coroutine
01-19 10:22:19.788 30584-30584/com.yy.yylite.kotlinshare I/Context: in main start coroutine
01-19 10:22:20.286 30584-30723/com.yy.yylite.kotlinshare I/Context: after delay
01-19 10:22:20.788 30584-30584/com.yy.yylite.kotlinshare I/Context: in main after delay
也就是說(shuō),Unconfined 類型的 Dispatcher最終分發(fā)的線程,是不確定的。
源碼分析
協(xié)程執(zhí)行在特定的 CoroutineContext,而CoroutineContext 中總是有一個(gè)特定的 Dispatcher,負(fù)責(zé)分發(fā)協(xié)程,根據(jù)官網(wǎng)對(duì) CoroutineDispatcher 的說(shuō)明如下:
Coroutine context includes a coroutine dispatcher (see [CoroutineDispatcher]) that determines what thread or threads
the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined.
簡(jiǎn)單來(lái)說(shuō),CoroutineDispatcher 決定了當(dāng)前 Coroutine 在哪個(gè)線程或者哪幾個(gè)線程中執(zhí)行,可能是某個(gè)特定的線程,也可能是分發(fā)到線程池或者是 unconfined。
繼承結(jié)構(gòu)
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {}
也就是說(shuō) CoroutineDispatcher 既是一種 Element 也是 ContinuationInterceptor。
其存在多個(gè)子類,如下:

其中 ExecutorCoroutineDispatcher 是基于線程池 Executor 來(lái)分發(fā)任務(wù)的,并且每一個(gè)線程池,需要Dispatcher 自己本身去關(guān)閉。
MainCoroutinCoroutineDispatcher 是在主線程分發(fā)的特殊 Dispatcher,你可以通過(guò) Dispatcher.Mian 獲得,它是一個(gè) Object,可以被直接訪問(wèn)。
關(guān)鍵函數(shù)分析
/**
* 是否需要將協(xié)程執(zhí)行分發(fā)到其它線程,如果是 true 則表示需要,false 表示不需要;一般都是返回 true
*/
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
/**
* Dispatches execution of a runnable [block] onto another thread in the given [context].
將協(xié)程的執(zhí)行代碼,也就是 Runnable 分發(fā)到指定 Context 的線程
*/
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
/**
* Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
返回一封裝了原始 continuation 的 continuation,實(shí)現(xiàn)可以攔截所有 Coroutin 的 resumtions
*/
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
從上面的介紹,我們可以大概的知道,dispatch() 類似java Executor 里面的 void execute(Runnable command);方法,幫助執(zhí)行子線程的方法。
ExecutorCoroutineDispatcher
ExecutorCoroutineDispatcher 是 CoroutineDispatcher 的子類,主要用于在子線程分發(fā) Coroutine 的場(chǎng)景,源碼比較簡(jiǎn)單,直接看吧:
public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
/**
* Closes this coroutine dispatcher and shuts down its executor.
*
* It may throw an exception if this dispatcher is global and cannot be closed.
*/
public abstract override fun close()
/**
* Underlying executor of current [CoroutineDispatcher].
*/
public abstract val executor: Executor
}
這里包含了一個(gè) Executor 線程池類的變量,然后就是定義了一個(gè) close() 方法,用于關(guān)閉線程池。需要注意的是,該類間接繼承了 CoroutineContext 類,所以存在 cancel() 以及 cancelChildren() 方法,用于取消當(dāng)前 Job 或者該 Context 的Children Job。
該類的子類是 ExecutorCoroutineDispatcherBase,看下面分析。
ExecutorCoroutineDispatcherBase
繼承結(jié)構(gòu)
internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {}
除了繼承ExecutorCoroutineDispatcher 之外,還實(shí)現(xiàn)了Delay 接口,Delay 接口如下:
public interface Delay {
/**
掛起協(xié)程,并且不會(huì)阻塞當(dāng)前線程
*/
suspend fun delay(time: Long) {
if (time <= 0) return // don't delay
return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
}
/**
*/
fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>)
/**
*/
fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block)
}
Delay 接口,使得Dispatcher 具備任務(wù)調(diào)度的能力,例如 delay(time: Long),可以掛起協(xié)程,并且不會(huì)阻塞當(dāng)前線程。
直接看ExecutorCoroutineDispatcherBase源碼:
//internal 修飾,表示包內(nèi)可見(jiàn)
internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {
private var removesFutureOnCancellation: Boolean = false
internal fun initFutureCancellation() {
removesFutureOnCancellation = removeFutureOnCancel(executor)
}
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(timeSource.wrapTask(block))
} catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.enqueue(block)
}
}
/*
* removesFutureOnCancellation is required to avoid memory leak.
* On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
* On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
*/
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val future = if (removesFutureOnCancellation) {
scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
} else {
null
}
// If everything went fine and the scheduling attempt was not rejected -- use it
if (future != null) {
continuation.cancelFutureOnCancellation(future)
return
}
// Otherwise fallback to default executor
DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val future = if (removesFutureOnCancellation) {
scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
} else {
null
}
return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
}
private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
return try {
(executor as? ScheduledExecutorService)?.schedule(block, time, unit)
} catch (e: RejectedExecutionException) {
null
}
}
override fun close() {
(executor as? ExecutorService)?.shutdown()
}
override fun toString(): String = executor.toString()
override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
override fun hashCode(): Int = System.identityHashCode(executor)
}
首先看下 dispatch() 方法:
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
executor.execute(timeSource.wrapTask(block))
} catch (e: RejectedExecutionException) {
timeSource.unTrackTask()
DefaultExecutor.enqueue(block)
}
}
直接調(diào)用了 executor.execute() 執(zhí)行封裝好的 Runnable,也就是之前通過(guò) launch 或者 async 傳入的代碼塊封裝的,不過(guò)這之前會(huì)先用 timeSource.wrapTask(block),包裹一下這個(gè) Runnable,那么在 wrapTask() 里面做了什么操作?其實(shí),沒(méi)做啥,相反,我們看到了其它一些代碼:
internal object DefaultTimeSource : TimeSource {
override fun currentTimeMillis(): Long = System.currentTimeMillis()
override fun nanoTime(): Long = System.nanoTime()
override fun wrapTask(block: Runnable): Runnable = block
override fun trackTask() {}
override fun unTrackTask() {}
override fun registerTimeLoopThread() {}
override fun unregisterTimeLoopThread() {}
override fun parkNanos(blocker: Any, nanos: Long) {
LockSupport.parkNanos(blocker, nanos)
}
override fun unpark(thread: Thread) {
LockSupport.unpark(thread)
}
}
internal var timeSource: TimeSource = DefaultTimeSource
也就是說(shuō),Coroutine 提供的阻塞功能是通過(guò)LockSupport 掛起的。
CoroutineDispatcher 選擇
創(chuàng)建 Dispatcher 的時(shí)候會(huì)創(chuàng)建一個(gè) CoroutineScheduler,代碼如下:
//第一次調(diào)用 Dispatcher.Default 會(huì)執(zhí)行此方法
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
//接著會(huì)根據(jù)useCoroutinesScheduler 創(chuàng)建 DefaultScheduler 或者 CommonPool
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
//接著這里創(chuàng)建 CoroutineScheduler,既是一個(gè) Dispatcher 也是一個(gè) Executor
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
這里存在兩種 Dispatcher(也是線程池Executor),那么各自存在什么特點(diǎn)同時(shí)又有什么不同呢?
CoroutineStart
通過(guò) launch{} 等創(chuàng)建一個(gè)協(xié)程的時(shí)候,你也可以傳入一個(gè) CoroutineStart 枚舉值,這個(gè)枚舉值參數(shù)定義了 CoroutineBuilder 的執(zhí)行 Coroutine 的時(shí)機(jī),具體的時(shí)機(jī)由以下幾種:
- DEFAULTE:會(huì)根據(jù)該 Coroutine 依賴的 Context 立刻執(zhí)行該 Coroutine
- LAZY:按需執(zhí)行 Coroutine,僅僅在你調(diào)用了 Job.start() 或者 Job.await() 之后會(huì)執(zhí)行
- ATOMIC: 原子操作類型,也就是說(shuō)會(huì)根據(jù)依賴的Context 執(zhí)行 Coroutine,但是該 Coroutine 不可取消。對(duì)比 DEFAULT 類型,它不不可取消,不能通過(guò) job.cancel() 去取消的。
- UNDISPATCHED:會(huì)在當(dāng)前線程第一個(gè)掛起點(diǎn)執(zhí)行 Coroutine
關(guān)于 Lazy 模式,簡(jiǎn)單例子如下:
fun testLazy() {
doLog("testLazy")
val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
doLog("I'm begin")
delay(1000)
doLog("I'm finish")
}
runBlocking {
delay(1000)
doLog("Job start")
job.start()
}
}
設(shè)置該 launch 為 CoroutineStart 為 Lazy,則 Coroutine 會(huì)在 Job.start() 之后執(zhí)行。
輸入結(jié)果如下:
01-14 12:30:51.128 22558-22558/com.yy.yylite.kotlinshare I/Context: testLazy
01-14 12:30:52.137 22558-22558/com.yy.yylite.kotlinshare I/Context: Job start
01-14 12:30:52.139 22558-22607/com.yy.yylite.kotlinshare I/Context: I'm begin
01-14 12:30:53.142 22558-22609/com.yy.yylite.kotlinshare I/Context: I'm finish
簡(jiǎn)單看下源碼:
//launch{} 源碼中 Coroutine.launch()
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//Builder.common.kt 文件中
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override val cancelsParent: Boolean get() = true
override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
}
private class LazyStandaloneCoroutine(
parentContext: CoroutineContext,
block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
private var block: (suspend CoroutineScope.() -> Unit)? = block
override fun onStart() {
val block = checkNotNull(this.block) { "Already started" }
this.block = null
block.startCoroutineCancellable(this, this)
}
}
也就是說(shuō),默認(rèn)創(chuàng)建的是 StandaloneCoroutine(newContext,active = true),Lazy 模式創(chuàng)建的是 LazyStandaloneCoroutine(newContext, block) 其 active 是 false。
CoroutineExceptionHandler
/**
* An optional element in the coroutine context to handle uncaught exceptions.
*
* Normally, uncaught exceptions can only result from coroutines created using [launch][CoroutineScope.launch] builder.
* A coroutine that was created using [async][CoroutineScope.async] always catches all its exceptions and represents them
* in the resulting [Deferred] object.
*
* By default, when no handler is installed, uncaught exception are handled in the following way:
* * If exception is [CancellationException] then it is ignored
* (because that is the supposed mechanism to cancel the running coroutine)
* * Otherwise:
* * if there is a [Job] in the context, then [Job.cancel] is invoked;
* * Otherwise, all instances of [CoroutineExceptionHandler] found via [ServiceLoader]
* * and current thread's [Thread.uncaughtExceptionHandler] are invoked.
**/
默認(rèn)的情況下,使用 launch{} 會(huì)默認(rèn)使用一個(gè) CoroutineExceptionHandler,但是如果是 async 的話,你會(huì)通過(guò) Deferred 獲得失敗的情況。
默認(rèn)情況下,CoroutineExceptionHandler 會(huì)處理一些異常情況。
下一章節(jié),我會(huì)介紹一下 launch 的原理和流程