協(xié)程調(diào)度器詳解

協(xié)程和線程的差異

  • 線程的目的是提高CPU資源使用率, 使多個(gè)任務(wù)得以并行的運(yùn)行,是為了服務(wù)于機(jī)器的.
  • 協(xié)程的目的是為了讓多個(gè)任務(wù)之間更好的協(xié)作,主要體現(xiàn)在代碼邏輯上,是為了服務(wù)開發(fā)者 (能提升資源的利用率, 但并不是原始目的)

協(xié)程的核心競(jìng)爭(zhēng)力

簡(jiǎn)化異步并發(fā)任務(wù)。

協(xié)程上下文 CoroutineContext

  • 協(xié)程總是運(yùn)行在一些以 CoroutineContext 類型為代表的上下文中 ,協(xié)程上下文是各種不同元素的集合
  • 集合內(nèi)部的元素Element是根據(jù)key去對(duì)應(yīng)(Map特點(diǎn)),但是不允許重復(fù)(Set特點(diǎn))
  • Element之間可以通過+號(hào)進(jìn)行組合
  • Element有如下四類,共同組成了CoroutineContext
    • Job:協(xié)程的唯一標(biāo)識(shí),用來控制協(xié)程的生命周期(new、activecompleting、completed、cancelling、cancelled)
    • CoroutineDispatcher:指定協(xié)程運(yùn)行的線程(IO、Default、MainUnconfined)
    • CoroutineName: 指定協(xié)程的名稱,默認(rèn)為coroutine
    • CoroutineExceptionHandler: 指定協(xié)程的異常處理器,用來處理未捕獲的異常

它們的關(guān)系如圖所示:

[圖片上傳失敗...(image-9c6306-1637655771397)]

協(xié)程切換線程源碼分析

我們?cè)趨f(xié)程體內(nèi),可能通過withContextlaunch方法簡(jiǎn)單便捷的切換線程,用同步的方式寫異步代碼,這也是kotin協(xié)程的主要優(yōu)勢(shì)之一

示例:

private fun testDispatchers() = runBlocking {

    Log.d(TAG, "main                 : I'm working in thread ${Thread.currentThread().name}")

    launch(Dispatchers.Default) {
        Log.d(TAG, "launch Default       : I'm working in thread ${Thread.currentThread().name}")
    }

    withContext(Dispatchers.Default) {
        Log.d(TAG, "withContext Default  : I'm working in thread ${Thread.currentThread().name} ")
    }
}

輸出結(jié)果為:

[圖片上傳失敗...(image-e89054-1637655771397)]
從輸出結(jié)果可以看出,調(diào)用Dispatch.Default會(huì)由主線程切換到DefaultDispatcher-worker-3線程,而且launchwithContext切換的線程是相同的。

launch方法解析

協(xié)程的發(fā)起方式如下

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    //創(chuàng)建協(xié)程上下文Context
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    //創(chuàng)建一個(gè)獨(dú)立協(xié)程并啟動(dòng)
    coroutine.start(start, coroutine, block)
    return coroutine
}

launch方法主要作用:
1、是創(chuàng)建新的上下文Context
2、創(chuàng)建并啟動(dòng)協(xié)程

組合一個(gè)新的Context

public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
    //根據(jù)傳入的Context 組合成新的上下文
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    //如果發(fā)起的時(shí)候沒有傳入調(diào)度器,則使用默認(rèn)的Default
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug
}

從上述方法中能夠得出,此方法主要是
1、將launch方法傳入的contextCoroutineScope中的context組合起來
2、若combined中沒傳入一個(gè)調(diào)度器 ,則會(huì)默認(rèn)使用Dispatchers.Default調(diào)度器

創(chuàng)建一個(gè)獨(dú)立協(xié)程Coroutine

val coroutine = if (start.isLazy)
    LazyStandaloneCoroutine(newContext, block) else
    StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)

//繼承抽象協(xié)程類
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
   //省略......
}

//AbstractCoroutine類核心源碼
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T){
    initParentJob()
    start(block, receiver, this)
}

// CoroutineStart類核心源碼
public operator fun <T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>)
    when (this) {
        //launch 默認(rèn)為DEFAULT
        CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
        CoroutineStart.ATOMIC -> block.startCoroutine(completion)
        CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
        CoroutineStart.LAZY -> Unit // will start lazily
    }

創(chuàng)建一個(gè)協(xié)程體 Continuation

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion)
            //如果需要?jiǎng)t進(jìn)行攔截處理
            .intercepted()
            //調(diào)用 resumeWith 方法      
            .resumeCancellableWith(Result.success(Unit))
    }

調(diào)用createCoroutineUnintercepted,會(huì)把我們的協(xié)程體即suspend block轉(zhuǎn)換成Continuation

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

//ContinuationImpl類核心源碼
public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }     

//CoroutineDispatcher類核心源碼
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
      DispatchedContinuation(this, continuation)           

從上述方法可以得出
1.interepted是個(gè)擴(kuò)展方法,最后會(huì)調(diào)用到ContinuationImpl.intercepted方法
2.在intercepted會(huì)利用CoroutineContext,獲取當(dāng)前的調(diào)度器
3.當(dāng)前調(diào)度器是CoroutineDispatcher,最終會(huì)返回一個(gè)DispatchedContinuation,我們也是利用它來實(shí)現(xiàn)線程切換的

調(diào)度處理

//DispatchedContinuation
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}


@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(result: Result<T>) {
    val state = result.toState()
    //判斷是否需要切換線程
    if (dispatcher.isDispatchNeeded(context)) {
        _state = state
        resumeMode = MODE_CANCELLABLE
        //調(diào)用器進(jìn)行切換線程
        dispatcher.dispatch(context, this)
    } else {
        //Unconfined,會(huì)執(zhí)行該方法
        executeUnconfined(state, MODE_CANCELLABLE) {
            if (!resumeCancelled()) {
                resumeUndispatchedWith(result)
            }
        }
    }
}

上述分析可得出
1、判斷是否需要切換線程,如果需要?jiǎng)t調(diào)用dispatcher.dispatch()方法進(jìn)行切換線程
2、如果不需要切換線程 ,則直接在原有線程執(zhí)行。

withContext方法解析

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->

    //創(chuàng)建新的content
    val oldContext = uCont.context
    val newContext = oldContext + context
    
    ......
    
    //創(chuàng)建新的調(diào)度協(xié)程
    val coroutine = DispatchedCoroutine(newContext, uCont)
    //初始化父類Job
    coroutine.initParentJob()
    //開始一個(gè)可以取消的協(xié)程
    block.startCoroutineCancellable(coroutine, coroutine)
    coroutine.getResult()
}

private class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {

    //在complete時(shí)會(huì)會(huì)回調(diào)
    override fun afterCompletion(state: Any?) {
        afterResume(state)
    }

    override fun afterResume(state: Any?) {
        //uCont就是父協(xié)程,context仍是老版context,因此可以切換回原來的線程上
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    }
}

從上述方法可以得出,調(diào)用withContext方法最終也是調(diào)用uCont.intercepted().resumeCancellableWith方法與launch方法最后切換線程是相同的,
這里也說明了上面輸出結(jié)果,為什么二者調(diào)用同一調(diào)度器切換的線程是相同的。
也有不相同的時(shí)候,就是當(dāng)線程DefaultDispatcher-worker-1還沒創(chuàng)建成功的時(shí)候,withContext已經(jīng)需要切換線程時(shí),會(huì)再創(chuàng)建一個(gè)新的線程,如下圖所示

[圖片上傳失敗...(image-3e02ba-1637655771397)]

其切換線程的流程圖為:

[圖片上傳失敗...(image-d7223e-1637655771397)]

CoroutineDispatcher 作用

  • 用于指定協(xié)程的運(yùn)行線程
  • kotlin已經(jīng)內(nèi)置了CoroutineDispatcher的4個(gè)實(shí)現(xiàn),分別為 DispatchersDefaultIO、Main、Unconfined字段

public actual object Dispatchers {

    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    
    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
    
    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    
    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}

[圖片上傳失敗...(image-adf112-1637655771397)]

Dispatchers.Default

Default根據(jù)useCoroutinesScheduler屬性(默認(rèn)為true) 去獲取對(duì)應(yīng)的線程池

  • DefaultScheduler :Kotlin內(nèi)部自己實(shí)現(xiàn)的線程池邏輯
  • CommonPoolJava類庫(kù)中的Executor實(shí)現(xiàn)的線程池邏輯
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
    if (useCoroutinesScheduler) DefaultScheduler else CommonPool
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    .....
}

open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    ......
}
//java類庫(kù)中的Executor實(shí)現(xiàn)線程池邏輯
internal object CommonPool : ExecutorCoroutineDispatcher() {}

如果想使用java類庫(kù)中的線程池該如何使用呢?也就是修改useCoroutinesScheduler屬性為false

internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
    when (value) {
        null, "", "on" -> true
        "off" -> false
        else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
    }
}

internal actual fun systemProp(
    propertyName: String
): String? =
    try {
       //獲取系統(tǒng)屬性
        System.getProperty(propertyName)
    } catch (e: SecurityException) {
        null
    }

從源碼中可以看到,使用過獲取系統(tǒng)屬性拿到的值, 那我們就可以通過修改系統(tǒng)屬性 去改變useCoroutinesScheduler的值,
具體修改方法為

 val properties = Properties()
 properties["kotlinx.coroutines.scheduler"] = "off"
 System.setProperties(properties)

DefaultScheduler的主要實(shí)現(xiàn)都在其父類 ExperimentalCoroutineDispatcher

open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
    
    //省略......
    
    //創(chuàng)建CoroutineScheduler實(shí)例
    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
    
    override val executor: Executorget() = coroutineScheduler

    //此方法也就是上文說到切換線程的方法
    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
            //dispatch方法委托到CoroutineScheduler的dispatch方法
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            ....
        }

    //省略......
    
    //實(shí)現(xiàn)請(qǐng)求阻塞,執(zhí)行IO密集型任務(wù)
    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
    }
    //實(shí)現(xiàn)并發(fā)數(shù)量限制,執(zhí)行CPU密集型任務(wù)
    public fun limited(parallelism: Int): CoroutineDispatcher {
        require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
        require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
        return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
    }
    
   //省略......
}

從上文代碼可以提煉出
1、在ExperimentalCoroutineDispatcher類中創(chuàng)建協(xié)程調(diào)度線程池coroutineScheduler,通過該線程池來管理線程。
2、該類中的dispatch()方法,在協(xié)程切換線程中 dispatcher.dispatch(context, this) 調(diào)用。
3、其中 blocking()方法是執(zhí)行IO密集型任務(wù),limited()方法執(zhí)行CPU密集型任務(wù),
實(shí)現(xiàn)請(qǐng)求數(shù)量限制是調(diào)用LimitingDispatcher 類,其類實(shí)現(xiàn)為

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
    //同步阻塞隊(duì)列
    private val queue = ConcurrentLinkedQueue<Runnable>()
    //cas計(jì)數(shù)
    private val inFlightTasks = atomic(0)
    
    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) {
        var taskToSchedule = block
        while (true) {

            if (inFlight <= parallelism) {
                //LimitingDispatcher的dispatch方法委托給了DefaultScheduler的dispatchWithContext方法
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            }
            ......
        }
    }
}

Dispatchers.IO

先看下Dispatchers.IO 的定義

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
    
    
    Internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
    val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))

IODefaultScheduler中的實(shí)現(xiàn) 是調(diào)用blacking()方法,而blacking()方法最終實(shí)現(xiàn)是LimitingDispatcher類,
所以 從源碼可以看出 Dispatchers.DefaultIO 是在同一個(gè)線程中運(yùn)行的,也就是共用相同的線程池。

DefaultIO 都是共享CoroutineScheduler線程池 ,kotlin內(nèi)部實(shí)現(xiàn)了一套線程池兩種調(diào)度策略,主要是通過dispatch方法中的Mode區(qū)分的

Type Mode
Default NON_BLOCKING
IO PROBABLY_BLOCKING
internal enum class TaskMode {

    //執(zhí)行CPU密集型任務(wù)
    NON_BLOCKING,

    //執(zhí)行IO密集型任務(wù)
    PROBABLY_BLOCKING,
}

//CoroutineScheduler類核心源碼
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
      
      ......
      
     if (task.mode == TaskMode.NON_BLOCKING) {
            signalCpuWork() //Dispatchers.Default
     } else {
            signalBlockingWork() // Dispatchers.IO
     }
}

從上述代碼中可以提煉出的是:
1、signalCpuWork()方法處理CPU密集任務(wù),在該方法中根據(jù)CPU密集型任務(wù)處理策略,創(chuàng)建并管理線程以及執(zhí)行任務(wù)
2、signalBlockingWork()方法處理IO密集任務(wù),在該方法中根據(jù)IO密集型任務(wù)處理策略,創(chuàng)建并管理線程以及執(zhí)行任務(wù)

其處理策略如下圖所示:
[圖片上傳失敗...(image-e24d8-1637655771397)]

Dispatchers.Unconfined

任務(wù)執(zhí)行在默認(rèn)的啟動(dòng)線程。之后由調(diào)用resume的線程決定恢復(fù)協(xié)程的線程

internal object Unconfined : CoroutineDispatcher() {
    //為false為不需要dispatch
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        // 只有當(dāng)調(diào)用yield方法時(shí),Unconfined的dispatch方法才會(huì)被調(diào)用
        // yield() 表示當(dāng)前協(xié)程讓出自己所在的線程給其他協(xié)程運(yùn)行
        val yieldContext = context[YieldContext]
        if (yieldContext != null) {
            yieldContext.dispatcherWasUnconfined = true
            return
        }
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")
    }
}

每一個(gè)協(xié)程都有對(duì)應(yīng)的Continuation實(shí)例,其中的resumeWith用于協(xié)程的恢復(fù),存在于DispatchedContinuation,重點(diǎn)看resumeWith的實(shí)現(xiàn)以及類委托

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>//協(xié)程suspend掛起方法產(chǎn)生的Continuation
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
    .....
    override fun resumeWith(result: Result<T>) {
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_ATOMIC) {
                withCoroutineContext(this.context, countOrElement) {
                    continuation.resumeWith(result)
                }
            }
        }
    }
    ....
}

通過isDispatchNeeded(是否需要dispatchUnconfined=false,defaultIO=true)判斷做不同處理

  • true:調(diào)用協(xié)程的CoroutineDispatcherdispatch方法
  • false:調(diào)用executeUnconfined方法
private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean {
    assert { mode != MODE_UNINITIALIZED }
    val eventLoop = ThreadLocalEventLoop.eventLoop
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    return if (eventLoop.isUnconfinedLoopActive) {
        _state = contState
        resumeMode = mode
        eventLoop.dispatchUnconfined(this)
        true
    } else {
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    }
}

threadlocal中取出eventLoopeventLoop和當(dāng)前線程相關(guān)),判斷是否在執(zhí)行Unconfined任務(wù)

  1. 如果在執(zhí)行則調(diào)用EventLoopdispatchUnconfined方法把Unconfined任務(wù)放進(jìn)EventLoop
  2. 如果沒有在執(zhí)行則直接執(zhí)行
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
    eventLoop: EventLoop,
    block: () -> Unit
) {
    eventLoop.incrementUseCount(unconfined = true)
    try {
        block()
        while (true) {
            if (!eventLoop.processUnconfinedEvent()) break
        }
    } catch (e: Throwable) {
        handleFatalException(e, null)
    } finally {
        eventLoop.decrementUseCount(unconfined = true)
    }
}

  1. 執(zhí)行block()代碼塊,即上文提到的resumeWith()
  2. 調(diào)用processUnconfinedEvent()方法實(shí)現(xiàn)執(zhí)行剩余的Unconfined任務(wù),直到全部執(zhí)行完畢跳出循環(huán)
    EventLoopCoroutineDispatcher的一個(gè)子類
internal abstract class EventLoop : CoroutineDispatcher() {
    .....
    //雙端隊(duì)列實(shí)現(xiàn)存放Unconfined任務(wù)
    private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
    //從隊(duì)列的頭部移出Unconfined任務(wù)執(zhí)行
    public fun processUnconfinedEvent(): Boolean {
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
    }
    //把Unconfined任務(wù)放進(jìn)隊(duì)列的尾部
    public fun dispatchUnconfined(task: DispatchedTask<*>) {
        val queue = unconfinedQueue ?:
            ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
        queue.addLast(task)
    }
    .....
}

內(nèi)部通過雙端隊(duì)列實(shí)現(xiàn)存放Unconfined任務(wù)

  1. EventLoopdispatchUnconfined方法用于把Unconfined任務(wù)放進(jìn)隊(duì)列的尾部
  2. processUnconfinedEvent方法用于從隊(duì)列的頭部移出Unconfined任務(wù)執(zhí)行

Dispatchers.Main

kotlinJVM上的實(shí)現(xiàn) Android就需要引入kotlinx-coroutines-android庫(kù),它里面有Android對(duì)應(yīng)的Dispatchers.Main實(shí)現(xiàn),

   public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
   
     @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = if (FAST_SERVICE_LOADER_ENABLED) {
                FastServiceLoader.loadMainDispatcherFactory()
            } else {
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            }
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: MissingMainCoroutineDispatcher(null)
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            MissingMainCoroutineDispatcher(e)
        }
    }
    
    internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
        val clz = MainDispatcherFactory::class.java
        if (!ANDROID_DETECTED) {
            return load(clz, clz.classLoader)
        }

        return try {
            val result = ArrayList<MainDispatcherFactory>(2)
            createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
            createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
            result
        } catch (e: Throwable) {
            // Fallback to the regular SL in case of any unexpected exception
            load(clz, clz.classLoader)
        }
    }

從上文代碼中主要功能是通過反射獲取AndroidDispatcherFactory 然后根據(jù)加載的優(yōu)先級(jí) 去創(chuàng)建Dispatcher

internal class AndroidDispatcherFactory : MainDispatcherFactory {

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")

    override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2
}
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
   
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

   ......

    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }

    ......
}

從上文代碼中可以提煉出以下信息:createDispatcher調(diào)用HandlerContext類,通過調(diào)用Looper.getMainLooper()獲取handler ,最終通過handler來實(shí)現(xiàn)在主線程中運(yùn)行.
可以得出Dispatchers.Main其實(shí)就是把任務(wù)通過Handler運(yùn)行在Android主線程中的。

總結(jié)

1、Dispatchers.Default,切換線程執(zhí)行CPU密集型任務(wù)
2、Dispatchers.IO,切換線程執(zhí)行IO密集型任務(wù)
3、Dispatchers.Unconfined,任務(wù)執(zhí)行在默認(rèn)的啟動(dòng)線程
4、Dispatchers.Main,切換線程到主線程

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容