協(xié)程和線程的差異
- 線程的目的是提高CPU資源使用率, 使多個(gè)任務(wù)得以并行的運(yùn)行,是為了服務(wù)于機(jī)器的.
- 協(xié)程的目的是為了讓多個(gè)任務(wù)之間更好的協(xié)作,主要體現(xiàn)在代碼邏輯上,是為了服務(wù)開發(fā)者 (能提升資源的利用率, 但并不是原始目的)
協(xié)程的核心競(jìng)爭(zhēng)力
簡(jiǎn)化異步并發(fā)任務(wù)。
協(xié)程上下文 CoroutineContext
- 協(xié)程總是運(yùn)行在一些以
CoroutineContext類型為代表的上下文中 ,協(xié)程上下文是各種不同元素的集合 - 集合內(nèi)部的元素
Element是根據(jù)key去對(duì)應(yīng)(Map特點(diǎn)),但是不允許重復(fù)(Set特點(diǎn)) -
Element之間可以通過+號(hào)進(jìn)行組合 -
Element有如下四類,共同組成了CoroutineContext-
Job:協(xié)程的唯一標(biāo)識(shí),用來控制協(xié)程的生命周期(new、active、completing、completed、cancelling、cancelled) -
CoroutineDispatcher:指定協(xié)程運(yùn)行的線程(IO、Default、Main、Unconfined) -
CoroutineName: 指定協(xié)程的名稱,默認(rèn)為coroutine -
CoroutineExceptionHandler: 指定協(xié)程的異常處理器,用來處理未捕獲的異常
-
它們的關(guān)系如圖所示:
[圖片上傳失敗...(image-9c6306-1637655771397)]
協(xié)程切換線程源碼分析
我們?cè)趨f(xié)程體內(nèi),可能通過withContext與launch方法簡(jiǎn)單便捷的切換線程,用同步的方式寫異步代碼,這也是kotin協(xié)程的主要優(yōu)勢(shì)之一
示例:
private fun testDispatchers() = runBlocking {
Log.d(TAG, "main : I'm working in thread ${Thread.currentThread().name}")
launch(Dispatchers.Default) {
Log.d(TAG, "launch Default : I'm working in thread ${Thread.currentThread().name}")
}
withContext(Dispatchers.Default) {
Log.d(TAG, "withContext Default : I'm working in thread ${Thread.currentThread().name} ")
}
}
輸出結(jié)果為:
[圖片上傳失敗...(image-e89054-1637655771397)]
從輸出結(jié)果可以看出,調(diào)用Dispatch.Default會(huì)由主線程切換到DefaultDispatcher-worker-3線程,而且launch和withContext切換的線程是相同的。
launch方法解析
協(xié)程的發(fā)起方式如下
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job {
//創(chuàng)建協(xié)程上下文Context
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//創(chuàng)建一個(gè)獨(dú)立協(xié)程并啟動(dòng)
coroutine.start(start, coroutine, block)
return coroutine
}
launch方法主要作用:
1、是創(chuàng)建新的上下文Context
2、創(chuàng)建并啟動(dòng)協(xié)程
組合一個(gè)新的Context
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//根據(jù)傳入的Context 組合成新的上下文
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
//如果發(fā)起的時(shí)候沒有傳入調(diào)度器,則使用默認(rèn)的Default
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
}
從上述方法中能夠得出,此方法主要是
1、將launch方法傳入的context與CoroutineScope中的context組合起來
2、若combined中沒傳入一個(gè)調(diào)度器 ,則會(huì)默認(rèn)使用Dispatchers.Default調(diào)度器
創(chuàng)建一個(gè)獨(dú)立協(xié)程Coroutine
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.start(start, coroutine, block)
//繼承抽象協(xié)程類
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
//省略......
}
//AbstractCoroutine類核心源碼
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T){
initParentJob()
start(block, receiver, this)
}
// CoroutineStart類核心源碼
public operator fun <T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>)
when (this) {
//launch 默認(rèn)為DEFAULT
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
CoroutineStart.ATOMIC -> block.startCoroutine(completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
CoroutineStart.LAZY -> Unit // will start lazily
}
創(chuàng)建一個(gè)協(xié)程體 Continuation
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion) {
createCoroutineUnintercepted(receiver, completion)
//如果需要?jiǎng)t進(jìn)行攔截處理
.intercepted()
//調(diào)用 resumeWith 方法
.resumeCancellableWith(Result.success(Unit))
}
調(diào)用createCoroutineUnintercepted,會(huì)把我們的協(xié)程體即suspend block轉(zhuǎn)換成Continuation
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
(this as? ContinuationImpl)?.intercepted() ?: this
//ContinuationImpl類核心源碼
public fun intercepted(): Continuation<Any?> =
intercepted
?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also { intercepted = it }
//CoroutineDispatcher類核心源碼
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
從上述方法可以得出
1.interepted是個(gè)擴(kuò)展方法,最后會(huì)調(diào)用到ContinuationImpl.intercepted方法
2.在intercepted會(huì)利用CoroutineContext,獲取當(dāng)前的調(diào)度器
3.當(dāng)前調(diào)度器是CoroutineDispatcher,最終會(huì)返回一個(gè)DispatchedContinuation,我們也是利用它來實(shí)現(xiàn)線程切換的
調(diào)度處理
//DispatchedContinuation
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>) = when (this) {
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
}
@Suppress("NOTHING_TO_INLINE")
inline fun resumeCancellableWith(result: Result<T>) {
val state = result.toState()
//判斷是否需要切換線程
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_CANCELLABLE
//調(diào)用器進(jìn)行切換線程
dispatcher.dispatch(context, this)
} else {
//Unconfined,會(huì)執(zhí)行該方法
executeUnconfined(state, MODE_CANCELLABLE) {
if (!resumeCancelled()) {
resumeUndispatchedWith(result)
}
}
}
}
上述分析可得出
1、判斷是否需要切換線程,如果需要?jiǎng)t調(diào)用dispatcher.dispatch()方法進(jìn)行切換線程
2、如果不需要切換線程 ,則直接在原有線程執(zhí)行。
withContext方法解析
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
//創(chuàng)建新的content
val oldContext = uCont.context
val newContext = oldContext + context
......
//創(chuàng)建新的調(diào)度協(xié)程
val coroutine = DispatchedCoroutine(newContext, uCont)
//初始化父類Job
coroutine.initParentJob()
//開始一個(gè)可以取消的協(xié)程
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
private class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T>
) : ScopeCoroutine<T>(context, uCont) {
//在complete時(shí)會(huì)會(huì)回調(diào)
override fun afterCompletion(state: Any?) {
afterResume(state)
}
override fun afterResume(state: Any?) {
//uCont就是父協(xié)程,context仍是老版context,因此可以切換回原來的線程上
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
}
}
從上述方法可以得出,調(diào)用withContext方法最終也是調(diào)用uCont.intercepted().resumeCancellableWith方法與launch方法最后切換線程是相同的,
這里也說明了上面輸出結(jié)果,為什么二者調(diào)用同一調(diào)度器切換的線程是相同的。
也有不相同的時(shí)候,就是當(dāng)線程DefaultDispatcher-worker-1還沒創(chuàng)建成功的時(shí)候,withContext已經(jīng)需要切換線程時(shí),會(huì)再創(chuàng)建一個(gè)新的線程,如下圖所示
[圖片上傳失敗...(image-3e02ba-1637655771397)]
其切換線程的流程圖為:
[圖片上傳失敗...(image-d7223e-1637655771397)]
CoroutineDispatcher 作用
- 用于指定協(xié)程的運(yùn)行線程
-
kotlin已經(jīng)內(nèi)置了CoroutineDispatcher的4個(gè)實(shí)現(xiàn),分別為Dispatchers的Default、IO、Main、Unconfined字段
public actual object Dispatchers {
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
@JvmStatic
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
}
[圖片上傳失敗...(image-adf112-1637655771397)]
Dispatchers.Default
Default根據(jù)useCoroutinesScheduler屬性(默認(rèn)為true) 去獲取對(duì)應(yīng)的線程池
-
DefaultScheduler :Kotlin內(nèi)部自己實(shí)現(xiàn)的線程池邏輯 -
CommonPool:Java類庫(kù)中的Executor實(shí)現(xiàn)的線程池邏輯
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
.....
}
open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
......
}
//java類庫(kù)中的Executor實(shí)現(xiàn)線程池邏輯
internal object CommonPool : ExecutorCoroutineDispatcher() {}
如果想使用java類庫(kù)中的線程池該如何使用呢?也就是修改useCoroutinesScheduler屬性為false
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
internal actual fun systemProp(
propertyName: String
): String? =
try {
//獲取系統(tǒng)屬性
System.getProperty(propertyName)
} catch (e: SecurityException) {
null
}
從源碼中可以看到,使用過獲取系統(tǒng)屬性拿到的值, 那我們就可以通過修改系統(tǒng)屬性 去改變useCoroutinesScheduler的值,
具體修改方法為
val properties = Properties()
properties["kotlinx.coroutines.scheduler"] = "off"
System.setProperties(properties)
DefaultScheduler的主要實(shí)現(xiàn)都在其父類 ExperimentalCoroutineDispatcher 中
open class ExperimentalCoroutineDispatcher(
private val corePoolSize: Int,
private val maxPoolSize: Int,
private val idleWorkerKeepAliveNs: Long,
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
public constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
//省略......
//創(chuàng)建CoroutineScheduler實(shí)例
private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override val executor: Executorget() = coroutineScheduler
//此方法也就是上文說到切換線程的方法
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
//dispatch方法委托到CoroutineScheduler的dispatch方法
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
....
}
//省略......
//實(shí)現(xiàn)請(qǐng)求阻塞,執(zhí)行IO密集型任務(wù)
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
}
//實(shí)現(xiàn)并發(fā)數(shù)量限制,執(zhí)行CPU密集型任務(wù)
public fun limited(parallelism: Int): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
}
//省略......
}
從上文代碼可以提煉出
1、在ExperimentalCoroutineDispatcher類中創(chuàng)建協(xié)程調(diào)度線程池coroutineScheduler,通過該線程池來管理線程。
2、該類中的dispatch()方法,在協(xié)程切換線程中 dispatcher.dispatch(context, this) 調(diào)用。
3、其中 blocking()方法是執(zhí)行IO密集型任務(wù),limited()方法執(zhí)行CPU密集型任務(wù),
實(shí)現(xiàn)請(qǐng)求數(shù)量限制是調(diào)用LimitingDispatcher 類,其類實(shí)現(xiàn)為
private class LimitingDispatcher(
private val dispatcher: ExperimentalCoroutineDispatcher,
private val parallelism: Int,
private val name: String?,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
//同步阻塞隊(duì)列
private val queue = ConcurrentLinkedQueue<Runnable>()
//cas計(jì)數(shù)
private val inFlightTasks = atomic(0)
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
if (inFlight <= parallelism) {
//LimitingDispatcher的dispatch方法委托給了DefaultScheduler的dispatchWithContext方法
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
......
}
}
}
Dispatchers.IO
先看下Dispatchers.IO 的定義
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
Internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
IO在DefaultScheduler中的實(shí)現(xiàn) 是調(diào)用blacking()方法,而blacking()方法最終實(shí)現(xiàn)是LimitingDispatcher類,
所以 從源碼可以看出 Dispatchers.Default和IO 是在同一個(gè)線程中運(yùn)行的,也就是共用相同的線程池。
而Default和IO 都是共享CoroutineScheduler線程池 ,kotlin內(nèi)部實(shí)現(xiàn)了一套線程池兩種調(diào)度策略,主要是通過dispatch方法中的Mode區(qū)分的
| Type | Mode |
|---|---|
| Default | NON_BLOCKING |
| IO | PROBABLY_BLOCKING |
internal enum class TaskMode {
//執(zhí)行CPU密集型任務(wù)
NON_BLOCKING,
//執(zhí)行IO密集型任務(wù)
PROBABLY_BLOCKING,
}
//CoroutineScheduler類核心源碼
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
......
if (task.mode == TaskMode.NON_BLOCKING) {
signalCpuWork() //Dispatchers.Default
} else {
signalBlockingWork() // Dispatchers.IO
}
}
從上述代碼中可以提煉出的是:
1、signalCpuWork()方法處理CPU密集任務(wù),在該方法中根據(jù)CPU密集型任務(wù)處理策略,創(chuàng)建并管理線程以及執(zhí)行任務(wù)
2、signalBlockingWork()方法處理IO密集任務(wù),在該方法中根據(jù)IO密集型任務(wù)處理策略,創(chuàng)建并管理線程以及執(zhí)行任務(wù)
其處理策略如下圖所示:
[圖片上傳失敗...(image-e24d8-1637655771397)]
Dispatchers.Unconfined
任務(wù)執(zhí)行在默認(rèn)的啟動(dòng)線程。之后由調(diào)用resume的線程決定恢復(fù)協(xié)程的線程
internal object Unconfined : CoroutineDispatcher() {
//為false為不需要dispatch
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 只有當(dāng)調(diào)用yield方法時(shí),Unconfined的dispatch方法才會(huì)被調(diào)用
// yield() 表示當(dāng)前協(xié)程讓出自己所在的線程給其他協(xié)程運(yùn)行
val yieldContext = context[YieldContext]
if (yieldContext != null) {
yieldContext.dispatcherWasUnconfined = true
return
}
throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
"If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
"isDispatchNeeded and dispatch calls.")
}
}
每一個(gè)協(xié)程都有對(duì)應(yīng)的Continuation實(shí)例,其中的resumeWith用于協(xié)程的恢復(fù),存在于DispatchedContinuation,重點(diǎn)看resumeWith的實(shí)現(xiàn)以及類委托
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>//協(xié)程suspend掛起方法產(chǎn)生的Continuation
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation {
.....
override fun resumeWith(result: Result<T>) {
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context)) {
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
} else {
executeUnconfined(state, MODE_ATOMIC) {
withCoroutineContext(this.context, countOrElement) {
continuation.resumeWith(result)
}
}
}
}
....
}
通過isDispatchNeeded(是否需要dispatch,Unconfined=false,default,IO=true)判斷做不同處理
-
true:調(diào)用協(xié)程的CoroutineDispatcher的dispatch方法 -
false:調(diào)用executeUnconfined方法
private inline fun DispatchedContinuation<*>.executeUnconfined(
contState: Any?, mode: Int, doYield: Boolean = false,
block: () -> Unit
): Boolean {
assert { mode != MODE_UNINITIALIZED }
val eventLoop = ThreadLocalEventLoop.eventLoop
if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
return if (eventLoop.isUnconfinedLoopActive) {
_state = contState
resumeMode = mode
eventLoop.dispatchUnconfined(this)
true
} else {
runUnconfinedEventLoop(eventLoop, block = block)
false
}
}
從threadlocal中取出eventLoop(eventLoop和當(dāng)前線程相關(guān)),判斷是否在執(zhí)行Unconfined任務(wù)
- 如果在執(zhí)行則調(diào)用
EventLoop的dispatchUnconfined方法把Unconfined任務(wù)放進(jìn)EventLoop中 - 如果沒有在執(zhí)行則直接執(zhí)行
internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
eventLoop: EventLoop,
block: () -> Unit
) {
eventLoop.incrementUseCount(unconfined = true)
try {
block()
while (true) {
if (!eventLoop.processUnconfinedEvent()) break
}
} catch (e: Throwable) {
handleFatalException(e, null)
} finally {
eventLoop.decrementUseCount(unconfined = true)
}
}
- 執(zhí)行
block()代碼塊,即上文提到的resumeWith() - 調(diào)用
processUnconfinedEvent()方法實(shí)現(xiàn)執(zhí)行剩余的Unconfined任務(wù),直到全部執(zhí)行完畢跳出循環(huán)
EventLoop是CoroutineDispatcher的一個(gè)子類
internal abstract class EventLoop : CoroutineDispatcher() {
.....
//雙端隊(duì)列實(shí)現(xiàn)存放Unconfined任務(wù)
private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null
//從隊(duì)列的頭部移出Unconfined任務(wù)執(zhí)行
public fun processUnconfinedEvent(): Boolean {
val queue = unconfinedQueue ?: return false
val task = queue.removeFirstOrNull() ?: return false
task.run()
return true
}
//把Unconfined任務(wù)放進(jìn)隊(duì)列的尾部
public fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?:
ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it }
queue.addLast(task)
}
.....
}
內(nèi)部通過雙端隊(duì)列實(shí)現(xiàn)存放Unconfined任務(wù)
-
EventLoop的dispatchUnconfined方法用于把Unconfined任務(wù)放進(jìn)隊(duì)列的尾部 -
processUnconfinedEvent方法用于從隊(duì)列的頭部移出Unconfined任務(wù)執(zhí)行
Dispatchers.Main
kotlin在JVM上的實(shí)現(xiàn) Android就需要引入kotlinx-coroutines-android庫(kù),它里面有Android對(duì)應(yīng)的Dispatchers.Main實(shí)現(xiàn),
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher {
return try {
val factories = if (FAST_SERVICE_LOADER_ENABLED) {
FastServiceLoader.loadMainDispatcherFactory()
} else {
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
}
factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
?: MissingMainCoroutineDispatcher(null)
} catch (e: Throwable) {
// Service loader can throw an exception as well
MissingMainCoroutineDispatcher(e)
}
}
internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> {
val clz = MainDispatcherFactory::class.java
if (!ANDROID_DETECTED) {
return load(clz, clz.classLoader)
}
return try {
val result = ArrayList<MainDispatcherFactory>(2)
createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply { result.add(this) }
createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply { result.add(this) }
result
} catch (e: Throwable) {
// Fallback to the regular SL in case of any unexpected exception
load(clz, clz.classLoader)
}
}
從上文代碼中主要功能是通過反射獲取AndroidDispatcherFactory 然后根據(jù)加載的優(yōu)先級(jí) 去創(chuàng)建Dispatcher
internal class AndroidDispatcherFactory : MainDispatcherFactory {
override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main")
override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
}
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
......
override fun dispatch(context: CoroutineContext, block: Runnable) {
handler.post(block)
}
......
}
從上文代碼中可以提煉出以下信息:createDispatcher調(diào)用HandlerContext類,通過調(diào)用Looper.getMainLooper()獲取handler ,最終通過handler來實(shí)現(xiàn)在主線程中運(yùn)行.
可以得出Dispatchers.Main其實(shí)就是把任務(wù)通過Handler運(yùn)行在Android主線程中的。
總結(jié)
1、Dispatchers.Default,切換線程執(zhí)行CPU密集型任務(wù)
2、Dispatchers.IO,切換線程執(zhí)行IO密集型任務(wù)
3、Dispatchers.Unconfined,任務(wù)執(zhí)行在默認(rèn)的啟動(dòng)線程
4、Dispatchers.Main,切換線程到主線程