kotlin coroutines 協(xié)程教程(二)關(guān)鍵類分析

原理篇(一)關(guān)鍵類的分析

上面簡(jiǎn)單的介紹了一些用法,但是具體的原理和特點(diǎn),好像還不是很清楚,那么下面就來(lái)介紹一下,一些關(guān)鍵的類,流程和原理。

介紹的相關(guān)的原理基于這行代碼:

    fun coroTest() {
        GlobalScope.launch {
            delay(1000L)//Delays coroutine for a given time without blocking a thread and resumes it after a specified time
            Log.i(CO_TAG, "launch ")
        }
        Log.i(CO_TAG, "----")
    }

然后貼上 launch() 的源碼:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

CoroutineScope

源碼分析

首先,launch() 是 CoroutineScope 的一個(gè)擴(kuò)展函數(shù),

CoroutineScope 簡(jiǎn)單來(lái)說(shuō),就是協(xié)程的作用范圍,每一個(gè) Coroutine Builder,例如 CoroutineScope.launch,都是 CoroutineScope 的擴(kuò)展,并且繼承了其 coroutineContext .

Corotine 提供了全局的 CoroutineScope 也就是 GlobalScope,簡(jiǎn)單看下其代碼:

//CoroutineScope.kt 中
public object GlobalScope : CoroutineScope {
    /**
     * Returns [EmptyCoroutineContext].
     */
    override val coroutineContext: CoroutineContext
        get() = EmptyCoroutineContext
}

//EmptyCorotineContext in CorotineContextImpl.kt 文件中
public object EmptyCoroutineContext : CoroutineContext, Serializable {
    private const val serialVersionUID: Long = 0
    private fun readResolve(): Any = EmptyCoroutineContext

    public override fun <E : Element> get(key: Key<E>): E? = null
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
    public override fun plus(context: CoroutineContext): CoroutineContext = context
    public override fun minusKey(key: Key<*>): CoroutineContext = this
    public override fun hashCode(): Int = 0
    public override fun toString(): String = "EmptyCoroutineContext"
}

簡(jiǎn)單來(lái)說(shuō),GlobalScope 沒(méi)有綁定任何 job,它用于構(gòu)建最頂級(jí)的 coroutines,這些協(xié)程的生命周期跟隨這個(gè) Application,并且在 Application 生命周期結(jié)束之前,不會(huì)被 cancel。

關(guān)鍵函數(shù)分析

CoroutinScope 主要包含了以下擴(kuò)展函數(shù):

actor fun <E> CoroutineScope.actor( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, start: CoroutineStart = CoroutineStart.DEFAULT, onCompletion: CompletionHandler? = null, block: suspend ActorScope<E>.() -> Unit): SendChannel<E>Launches new coroutine that is receiving messages from its mailbox channel and returns a reference to its mailbox channel as a SendChannel. The resulting object can be used to send messages to this coroutine.
async fun <T> CoroutineScope.async( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> T): Deferred<T>Creates new coroutine and returns its future result as an implementation of Deferred. The running coroutine is cancelled when the resulting deferred is cancelled.
broadcast fun <E> CoroutineScope.broadcast( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 1, start: CoroutineStart = CoroutineStart.LAZY, onCompletion: CompletionHandler? = null, block: suspend ProducerScope<E>.() -> Unit): BroadcastChannel<E>Launches new coroutine to produce a stream of values by sending them to a broadcast channel and returns a reference to the coroutine as a BroadcastChannel. The resulting object can be used to subscribe to elements produced by this coroutine.
cancel fun CoroutineScope.cancel(): UnitCancels this scope, including its job and all its children. Throws IllegalStateExceptionif the scope does not have a job in it.
launch fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit): JobLaunches new coroutine without blocking current thread and returns a reference to the coroutine as a Job. The coroutine is cancelled when the resulting job is cancelled.
newCoroutineContext fun CoroutineScope.newCoroutineContext( context: CoroutineContext): CoroutineContextCreates context for the new coroutine. It installs Dispatchers.Default when no other dispatcher nor ContinuationInterceptor is specified, and adds optional support for debugging facilities (when turned on).
plus operator fun CoroutineScope.plus( context: CoroutineContext): CoroutineScopeAdds the specified coroutine context to this scope, overriding existing elements in the current scope’s context with the corresponding keys.
produce fun <E> CoroutineScope.produce( context: CoroutineContext = EmptyCoroutineContext, capacity: Int = 0, block: suspend ProducerScope<E>.() -> Unit): ReceiveChannel<E>Launches new coroutine to produce a stream of values by sending them to a channel and returns a reference to the coroutine as a ReceiveChannel. This resulting object can be used to receive elements produced by this coroutine.
  • actor{} 會(huì)啟動(dòng)一個(gè)能夠接收 Message 的 Coroutine,也就是 SendChannel.你可以通過(guò)SendChannel.send() 方法給其它協(xié)程發(fā)送消息或者結(jié)果,或者通過(guò) SendChannel.offer() 發(fā)送消息,兩者區(qū)別在于 offer() 在隊(duì)列滿的時(shí)候,會(huì)返回失敗,而 send() 則不會(huì)。對(duì)應(yīng)接收消息的是 ReceiveChannel。
  • async{} 會(huì)啟動(dòng)一個(gè)新的協(xié)程,并且返回一個(gè) Deferred(它是一個(gè) future 的實(shí)現(xiàn)),你可以通過(guò) Deferred.await() 異步獲取結(jié)果。
  • broadcast{} 和 actor 類似,只不過(guò),BroadcastChannel 是一種一對(duì)多的訂閱關(guān)系。
  • launch{} 啟動(dòng)一個(gè)新的協(xié)程,并且返回一個(gè) Job 對(duì)象,你可以通過(guò)這個(gè) Job 取消協(xié)程
  • plus 也就是 + ,通過(guò)該操作,你可以給該 CoroutineScope 關(guān)聯(lián)一個(gè) CoroutineContext,并且如果存在同一個(gè) key 會(huì)替換之前的 CoroutineContext。
  • newConroutinContext 會(huì)給當(dāng)前 CoroutineScop 創(chuàng)建一個(gè) ,如果不存在 Dispatcher 或者 ContinuationInterceptor 則會(huì)給這個(gè) Context 關(guān)聯(lián)默認(rèn)的 Dispatcher.Default。
  • produce{} 啟動(dòng)一個(gè)新的協(xié)程,并且返回 receiveChannel。

官方說(shuō)明鏈接 CoroutineScope

CoroutineContext

協(xié)程是運(yùn)行在 CoroutinesContext 的一些集合里面,根據(jù)官方文檔的意思

* Persistent context for the coroutine. It is an indexed set of [Element] instances.
* An indexed set is a mix between a set and a map.
* Every element in this set has a unique [Key]. Keys are compared _by reference_.

也就是說(shuō),CoroutineContext 是 coroutine 的運(yùn)行的 Context,它是 Element 實(shí)例的集合,這種集合介于 set 和map 之間,每一個(gè) Element 都有一個(gè)和對(duì)象引用相關(guān)的 key,作為 Element 的唯一標(biāo)志。

簡(jiǎn)單看下其源碼:

public interface CoroutineContext {
    /**
     * Returns the element with the given [key] from this context or `null`.
     * Keys are compared _by reference_, that is to get an element from the context the reference to its actual key
     * object must be presented to this function.
     通過(guò)key 獲得對(duì)應(yīng)的CoroutineContext
     */
    public operator fun <E : Element> get(key: Key<E>): E?

    /**
     * Accumulates entries of this context starting with [initial] value and applying [operation]
     * from left to right to current accumulator value and each element of this context.
     使用 initial 作為初始值,operation 作為累加操作
     */
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    /**
     * Returns a context containing elements from this context, but without an element with
     * the specified [key]. Keys are compared _by reference_, that is to remove an element from the context
     * the reference to its actual key object must be presented to this function.
     */
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     * Keys in the context are compared _by reference_.
     */
    public interface Key<E : Element>

    /**
     * An element of the [CoroutineContext]. An element of the coroutine context is a singleton context by itself.
     */
    public interface Element : CoroutineContext {
        /**
         * A key of this coroutine context element.
         */
        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

簡(jiǎn)單介紹下這里相關(guān)的類以及函數(shù):

相關(guān)類

  1. Element:CoroutineContext 的一個(gè)元素,任何一個(gè) Coroutine context 都是自己唯一的單例。
  2. Key: 作為CoroutineContext 的key,這些 Key 都是通過(guò)對(duì)象引用比較的,也就是說(shuō),同一個(gè)對(duì)象的key 才是相同的。這里也就是提供一種泛型的能力,使子類的CoroutineContext 的key,可以是任何繼承自 CoroutineContext 的類實(shí)例。
  3. CoroutineContext:如之前的介紹

相關(guān)函數(shù)

  1. public operator fun <E : Element> get(key: Key<E>): E? 根據(jù)給定的 Key 獲取特定的 CoroutineContext
  2. public fun <R> fold(initial: R, operation: (R, Element) -> R): R 把傳入的 initial 作為初始變量,通過(guò)傳入的 operation 操作,累次操作 Context 中的 Element。
  3. public operator fun plus(context: CoroutineContext): CoroutineContext 返回一個(gè)包含了當(dāng)前 Context 和傳入的 CoroutineContext 包含的所有的 Element 的一個(gè) Context。如果是兩者都存在的 Element 會(huì)被刪除掉。
  4. public fun minusKey(key: Key<*>): CoroutineContext 根據(jù)傳入的 key 返回該Context 存在的但是不包含傳入的 Keys 的或集。

具體怎樣去理解這些內(nèi)容呢?我們結(jié)合 CoroutineContextImpl.kt 文件里面提供的幾個(gè) Context 來(lái)理解一下:

例如 EmptyCoroutineContext

public object EmptyCoroutineContext : CoroutineContext, Serializable {
    private const val serialVersionUID: Long = 0
    private fun readResolve(): Any = EmptyCoroutineContext
    //EmptyCoroutineContext 不包含一個(gè)任何一個(gè) Element
    public override fun <E : Element> get(key: Key<E>): E? = null
    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R = initial
    public override fun plus(context: CoroutineContext): CoroutineContext = context
    public override fun minusKey(key: Key<*>): CoroutineContext = this
    public override fun hashCode(): Int = 0
    public override fun toString(): String = "EmptyCoroutineContext"
}

CoroutineDispatcher

簡(jiǎn)單用法

CoroutineDispatcher 每一個(gè)CoroutineContext 都包含一個(gè) CoroutineDispatcher ,它設(shè)置了對(duì)應(yīng)的協(xié)程使用一個(gè)或者多個(gè)線程,協(xié)程調(diào)度器可以將協(xié)程的執(zhí)行局限在指定的線程中,調(diào)度它運(yùn)行在線程池中或讓它不受限的運(yùn)行。

先來(lái)看下具體怎么使用它吧,下面是一個(gè)簡(jiǎn)單的例子:

    fun dispatchTest() {
        runBlocking {
            launch {
                doLog("in main thread")
            }
            launch(Dispatchers.Unconfined) {
                doLog("in Unconfined thread 1")
            }

            launch(Dispatchers.Default) {
                doLog(" in child thread 2")
            }

            launch(newSingleThreadContext("child thread3")) {
                doLog("in new thread")
            }

        }
    }

然后控制臺(tái)輸出如下:

01-12 17:20:48.212 14843-14843/com.yy.yylite.kotlinshare I/Context: in Unconfined thread 1
01-12 17:20:48.213 14843-14950/com.yy.yylite.kotlinshare I/Context:  in child thread 2
01-12 17:20:48.215 14843-14843/com.yy.yylite.kotlinshare I/Context: in main thread
01-12 17:20:48.215 14843-14953/com.yy.yylite.kotlinshare I/Context: in new thread

所以我們可以指定一個(gè)協(xié)程在特定的子線程執(zhí)行。

如果直接調(diào)用 launch,不指定Dispatcher,那么它使用的是啟動(dòng)它的 CoroutineScope 的Context 以及 Dispatch。

常見(jiàn)的 Dispatcher 如下:

  • Dispatchers.Default:默認(rèn)的 Dispatcher,會(huì)在jvm 層級(jí)共享線程池,會(huì)創(chuàng)建等于 cpu 核數(shù)的線程數(shù)目,但是始終大于等于2,因?yàn)?cpu 是非常珍貴的資源所以使用 Default Dispatcher 是非常合理的。
  • Dispatchers.IO :用于IO 操作的Dispatcher,是按需創(chuàng)建的線程池。
  • Dispatcher.Main:主線程的 Dispatcher
  • Dispatcher.UnConfined:不確定的 Dispatcher,會(huì)在調(diào)用的地方創(chuàng)建 Coroutine,但是會(huì)在任意線程執(zhí)行 Coroutine,取決于第一個(gè)掛起的協(xié)程的返回結(jié)果。

關(guān)于 Dispatcher.UnConfined 我們?cè)趺慈ダ斫饽??先看一個(gè)例子:

/**
 * 非限定的 Dispatcher
 */
fun testDispatcher() = runBlocking {
    launch(Dispatchers.Unconfined) {
        doLog("start coroutine")
        delay(500)
        doLog("after delay")
    }

    launch {
        doLog("in main start coroutine ")
        delay(1000)
        doLog("in main after delay")
    }
    
}

然后控制臺(tái)輸出結(jié)果如下:

01-19 10:22:19.785 30584-30584/com.yy.yylite.kotlinshare I/Context: start coroutine
01-19 10:22:19.788 30584-30584/com.yy.yylite.kotlinshare I/Context: in main start coroutine 
01-19 10:22:20.286 30584-30723/com.yy.yylite.kotlinshare I/Context: after delay
01-19 10:22:20.788 30584-30584/com.yy.yylite.kotlinshare I/Context: in main after delay

也就是說(shuō),Unconfined 類型的 Dispatcher最終分發(fā)的線程,是不確定的。

源碼分析

協(xié)程執(zhí)行在特定的 CoroutineContext,而CoroutineContext 中總是有一個(gè)特定的 Dispatcher,負(fù)責(zé)分發(fā)協(xié)程,根據(jù)官網(wǎng)對(duì) CoroutineDispatcher 的說(shuō)明如下:

Coroutine context includes a coroutine dispatcher (see [CoroutineDispatcher]) that determines what thread or threads
the corresponding coroutine uses for its execution. Coroutine dispatcher can confine coroutine execution
to a specific thread, dispatch it to a thread pool, or let it run unconfined.

簡(jiǎn)單來(lái)說(shuō),CoroutineDispatcher 決定了當(dāng)前 Coroutine 在哪個(gè)線程或者哪幾個(gè)線程中執(zhí)行,可能是某個(gè)特定的線程,也可能是分發(fā)到線程池或者是 unconfined。

繼承結(jié)構(gòu)

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {}

也就是說(shuō) CoroutineDispatcher 既是一種 Element 也是 ContinuationInterceptor。

其存在多個(gè)子類,如下:

image

其中 ExecutorCoroutineDispatcher 是基于線程池 Executor 來(lái)分發(fā)任務(wù)的,并且每一個(gè)線程池,需要Dispatcher 自己本身去關(guān)閉。

MainCoroutinCoroutineDispatcher 是在主線程分發(fā)的特殊 Dispatcher,你可以通過(guò) Dispatcher.Mian 獲得,它是一個(gè) Object,可以被直接訪問(wèn)。

關(guān)鍵函數(shù)分析

/**
 * 是否需要將協(xié)程執(zhí)行分發(fā)到其它線程,如果是 true 則表示需要,false 表示不需要;一般都是返回 true
 */
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true

/**
 * Dispatches execution of a runnable [block] onto another thread in the given [context].
   將協(xié)程的執(zhí)行代碼,也就是 Runnable 分發(fā)到指定 Context 的線程
 */
public abstract fun dispatch(context: CoroutineContext, block: Runnable)

    /**
     * Returns continuation that wraps the original [continuation], thus intercepting all resumptions.
    返回一封裝了原始 continuation 的 continuation,實(shí)現(xiàn)可以攔截所有 Coroutin 的 resumtions
     */
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

從上面的介紹,我們可以大概的知道,dispatch() 類似java Executor 里面的 void execute(Runnable command);方法,幫助執(zhí)行子線程的方法。

ExecutorCoroutineDispatcher

ExecutorCoroutineDispatcher 是 CoroutineDispatcher 的子類,主要用于在子線程分發(fā) Coroutine 的場(chǎng)景,源碼比較簡(jiǎn)單,直接看吧:

public abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
    /**
     * Closes this coroutine dispatcher and shuts down its executor.
     *
     * It may throw an exception if this dispatcher is global and cannot be closed.
     */
    public abstract override fun close()

    /**
     * Underlying executor of current [CoroutineDispatcher].
     */
    public abstract val executor: Executor
}

這里包含了一個(gè) Executor 線程池類的變量,然后就是定義了一個(gè) close() 方法,用于關(guān)閉線程池。需要注意的是,該類間接繼承了 CoroutineContext 類,所以存在 cancel() 以及 cancelChildren() 方法,用于取消當(dāng)前 Job 或者該 Context 的Children Job。

該類的子類是 ExecutorCoroutineDispatcherBase,看下面分析。

ExecutorCoroutineDispatcherBase

繼承結(jié)構(gòu)

internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {}

除了繼承ExecutorCoroutineDispatcher 之外,還實(shí)現(xiàn)了Delay 接口,Delay 接口如下:

public interface Delay {
    /**
       掛起協(xié)程,并且不會(huì)阻塞當(dāng)前線程
     */
    suspend fun delay(time: Long) {
        if (time <= 0) return // don't delay
        return suspendCancellableCoroutine { scheduleResumeAfterDelay(time, it) }
    }
    /**
     */
    fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>)

    /**
     */
    fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle =
        DefaultDelay.invokeOnTimeout(timeMillis, block)
}

Delay 接口,使得Dispatcher 具備任務(wù)調(diào)度的能力,例如 delay(time: Long),可以掛起協(xié)程,并且不會(huì)阻塞當(dāng)前線程。

直接看ExecutorCoroutineDispatcherBase源碼:

//internal 修飾,表示包內(nèi)可見(jiàn)
internal abstract class ExecutorCoroutineDispatcherBase : ExecutorCoroutineDispatcher(), Delay {

    private var removesFutureOnCancellation: Boolean = false

    internal fun initFutureCancellation() {
        removesFutureOnCancellation = removeFutureOnCancel(executor)
    }

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        try {
            executor.execute(timeSource.wrapTask(block))
        } catch (e: RejectedExecutionException) {
            timeSource.unTrackTask()
            DefaultExecutor.enqueue(block)
        }
    }

    /*
     * removesFutureOnCancellation is required to avoid memory leak.
     * On Java 7+ we reflectively invoke ScheduledThreadPoolExecutor.setRemoveOnCancelPolicy(true) and we're fine.
     * On Java 6 we're scheduling time-based coroutines to our own thread safe heap which supports cancellation.
     */
    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val future = if (removesFutureOnCancellation) {
            scheduleBlock(ResumeUndispatchedRunnable(this, continuation), timeMillis, TimeUnit.MILLISECONDS)
        } else {
            null
        }
        // If everything went fine and the scheduling attempt was not rejected -- use it
        if (future != null) {
            continuation.cancelFutureOnCancellation(future)
            return
        }
        // Otherwise fallback to default executor
        DefaultExecutor.scheduleResumeAfterDelay(timeMillis, continuation)
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
        val future = if (removesFutureOnCancellation) {
            scheduleBlock(block, timeMillis, TimeUnit.MILLISECONDS)
        } else {
            null
        }

        return if (future != null ) DisposableFutureHandle(future) else DefaultExecutor.invokeOnTimeout(timeMillis, block)
    }

    private fun scheduleBlock(block: Runnable, time: Long, unit: TimeUnit): ScheduledFuture<*>? {
        return try {
            (executor as? ScheduledExecutorService)?.schedule(block, time, unit)
        } catch (e: RejectedExecutionException) {
            null
        }
    }

    override fun close() {
        (executor as? ExecutorService)?.shutdown()
    }

    override fun toString(): String = executor.toString()
    override fun equals(other: Any?): Boolean = other is ExecutorCoroutineDispatcherBase && other.executor === executor
    override fun hashCode(): Int = System.identityHashCode(executor)
}

首先看下 dispatch() 方法:

override fun dispatch(context: CoroutineContext, block: Runnable) {
    try {
        executor.execute(timeSource.wrapTask(block))
    } catch (e: RejectedExecutionException) {
        timeSource.unTrackTask()
        DefaultExecutor.enqueue(block)
    }
}

直接調(diào)用了 executor.execute() 執(zhí)行封裝好的 Runnable,也就是之前通過(guò) launch 或者 async 傳入的代碼塊封裝的,不過(guò)這之前會(huì)先用 timeSource.wrapTask(block),包裹一下這個(gè) Runnable,那么在 wrapTask() 里面做了什么操作?其實(shí),沒(méi)做啥,相反,我們看到了其它一些代碼:

internal object DefaultTimeSource : TimeSource {
    override fun currentTimeMillis(): Long = System.currentTimeMillis()
    override fun nanoTime(): Long = System.nanoTime()
    override fun wrapTask(block: Runnable): Runnable = block
    override fun trackTask() {}
    override fun unTrackTask() {}
    override fun registerTimeLoopThread() {}
    override fun unregisterTimeLoopThread() {}

    override fun parkNanos(blocker: Any, nanos: Long) {
        LockSupport.parkNanos(blocker, nanos)
    }

    override fun unpark(thread: Thread) {
        LockSupport.unpark(thread)
    }
}

internal var timeSource: TimeSource = DefaultTimeSource

也就是說(shuō),Coroutine 提供的阻塞功能是通過(guò)LockSupport 掛起的。

CoroutineDispatcher 選擇

創(chuàng)建 Dispatcher 的時(shí)候會(huì)創(chuàng)建一個(gè) CoroutineScheduler,代碼如下:

//第一次調(diào)用 Dispatcher.Default 會(huì)執(zhí)行此方法
 public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

//接著會(huì)根據(jù)useCoroutinesScheduler 創(chuàng)建 DefaultScheduler 或者 CommonPool
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool

//接著這里創(chuàng)建 CoroutineScheduler,既是一個(gè) Dispatcher 也是一個(gè) Executor
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

這里存在兩種 Dispatcher(也是線程池Executor),那么各自存在什么特點(diǎn)同時(shí)又有什么不同呢?

CoroutineStart

通過(guò) launch{} 等創(chuàng)建一個(gè)協(xié)程的時(shí)候,你也可以傳入一個(gè) CoroutineStart 枚舉值,這個(gè)枚舉值參數(shù)定義了 CoroutineBuilder 的執(zhí)行 Coroutine 的時(shí)機(jī),具體的時(shí)機(jī)由以下幾種:

  • DEFAULTE:會(huì)根據(jù)該 Coroutine 依賴的 Context 立刻執(zhí)行該 Coroutine
  • LAZY:按需執(zhí)行 Coroutine,僅僅在你調(diào)用了 Job.start() 或者 Job.await() 之后會(huì)執(zhí)行
  • ATOMIC: 原子操作類型,也就是說(shuō)會(huì)根據(jù)依賴的Context 執(zhí)行 Coroutine,但是該 Coroutine 不可取消。對(duì)比 DEFAULT 類型,它不不可取消,不能通過(guò) job.cancel() 去取消的。
  • UNDISPATCHED:會(huì)在當(dāng)前線程第一個(gè)掛起點(diǎn)執(zhí)行 Coroutine

關(guān)于 Lazy 模式,簡(jiǎn)單例子如下:

fun testLazy() {
    doLog("testLazy")
    val job = GlobalScope.launch(start = CoroutineStart.LAZY) {
        doLog("I'm begin")
        delay(1000)
        doLog("I'm finish")
    }

    runBlocking {
        delay(1000)
        doLog("Job start")
        job.start()
    }
}

設(shè)置該 launch 為 CoroutineStart 為 Lazy,則 Coroutine 會(huì)在 Job.start() 之后執(zhí)行。

輸入結(jié)果如下:

01-14 12:30:51.128 22558-22558/com.yy.yylite.kotlinshare I/Context: testLazy
01-14 12:30:52.137 22558-22558/com.yy.yylite.kotlinshare I/Context: Job start
01-14 12:30:52.139 22558-22607/com.yy.yylite.kotlinshare I/Context: I'm begin
01-14 12:30:53.142 22558-22609/com.yy.yylite.kotlinshare I/Context: I'm finish

簡(jiǎn)單看下源碼:

//launch{} 源碼中 Coroutine.launch()
val coroutine = if (start.isLazy)
    LazyStandaloneCoroutine(newContext, block) else
    StandaloneCoroutine(newContext, active = true)
//Builder.common.kt 文件中
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override val cancelsParent: Boolean get() = true
    override fun handleJobException(exception: Throwable) = handleExceptionViaHandler(parentContext, exception)
}

private class LazyStandaloneCoroutine(
    parentContext: CoroutineContext,
    block: suspend CoroutineScope.() -> Unit
) : StandaloneCoroutine(parentContext, active = false) {
    private var block: (suspend CoroutineScope.() -> Unit)? = block

    override fun onStart() {
        val block = checkNotNull(this.block) { "Already started" }
        this.block = null
        block.startCoroutineCancellable(this, this)
    }
}

也就是說(shuō),默認(rèn)創(chuàng)建的是 StandaloneCoroutine(newContext,active = true),Lazy 模式創(chuàng)建的是 LazyStandaloneCoroutine(newContext, block) 其 active 是 false。

CoroutineExceptionHandler

/**
 * An optional element in the coroutine context to handle uncaught exceptions.
 *
 * Normally, uncaught exceptions can only result from coroutines created using [launch][CoroutineScope.launch] builder.
 * A coroutine that was created using [async][CoroutineScope.async] always catches all its exceptions and represents them
 * in the resulting [Deferred] object.
 *
 * By default, when no handler is installed, uncaught exception are handled in the following way:
 * * If exception is [CancellationException] then it is ignored
 *   (because that is the supposed mechanism to cancel the running coroutine)
 * * Otherwise:
 *     * if there is a [Job] in the context, then [Job.cancel] is invoked;
 *     * Otherwise, all instances of [CoroutineExceptionHandler] found via [ServiceLoader]
 *     * and current thread's [Thread.uncaughtExceptionHandler] are invoked.
 **/

默認(rèn)的情況下,使用 launch{} 會(huì)默認(rèn)使用一個(gè) CoroutineExceptionHandler,但是如果是 async 的話,你會(huì)通過(guò) Deferred 獲得失敗的情況。

默認(rèn)情況下,CoroutineExceptionHandler 會(huì)處理一些異常情況。
下一章節(jié),我會(huì)介紹一下 launch 的原理和流程

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容