Kotlin協(xié)程

Kotlin協(xié)程是一種編程思想,其中一個(gè)比較重要的應(yīng)用場景就是線程控制。
以往我們?cè)谛枰l(fā)的時(shí)候,往往會(huì)用到j(luò)ava 的Executor和Android 的AsyncTask, 這樣會(huì)導(dǎo)致跟直接使用Thead一樣的困難和麻煩
1、線程什么時(shí)候結(jié)束
2、線程間的互相通信
3、多個(gè)線程的管理

然后就有了比較出名的Rxjava,它采用[Obervable]的編程范式的鏈?zhǔn)秸{(diào)用方式,解決了callback比較多的難題。而同樣Kotlin 協(xié)程也能夠解決這個(gè)問題,而且它以同步的方式寫異步代碼,從代碼角度來看結(jié)構(gòu)更清晰。

舉個(gè)例子:

coroutineScope.launch(Dispatcher.Main){
val user = withContext(Dispatcher.IO){
        api.login() // IO 線程執(zhí)行請(qǐng)求
}
nameTv.text = user.name // 主線程更新ui
}

是不是比較簡單,另外Kotlin協(xié)程還可以處理并行的請(qǐng)求處理,比如

coroutineScope.launch(Dispatcher.Main){
val avatar = async { api.getAvatar(user)}
val logo = async { api.getLogo(user)}
show(avatar.await(), logo.await())
}

async函數(shù)返回的是Deferred類型,意思是延時(shí),稍后拿到結(jié)果,取結(jié)果需調(diào)用Deferred.await()方法。

suspend 函數(shù)
suspend標(biāo)注的函數(shù)是一個(gè)耗時(shí)函數(shù),該函數(shù)只能在kotlin協(xié)程函數(shù)或者是另外一個(gè)suspend函數(shù)里面被調(diào)用。而真正實(shí)施線程切換的是withContext(Dispatchers.IO)。

這里先介紹一下Dispatchers.IO是個(gè)什么東西

public val IO: CoroutineDispatcher = DefaultScheduler.IO
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

CoroutineDispatcher 是所有Coroutine Dispatcher的父類

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// 判斷是否需求分發(fā)
 public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
//分發(fā)runnable
 public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//創(chuàng)建一個(gè)DispatchedContinuation
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)

 public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other
}
 val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )

這里知道Dispatchers.IO是個(gè)CoroutineDispatcher的實(shí)例就行了,具體它是如何實(shí)現(xiàn)線程間的調(diào)度的我們稍后再說。
我們先看啟動(dòng)一個(gè)協(xié)程的過程

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
//調(diào)用內(nèi)部的plus方法生成一個(gè)新的CoroutineContext
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
    override fun handleJobException(exception: Throwable): Boolean {
        handleCoroutineException(context, exception)
        return true
    }
}
//AbstractCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
// 調(diào)用CoroutineStart里面的invoke方法
        start(block, receiver, this)
    }
//CoroutineStart
 public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }

public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))

}
//1、
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R, completion: Continuation<T>
): Continuation<Unit> {
//createCoroutineUnintercepted 創(chuàng)建一個(gè)Continuation對(duì)象,就是我們反編譯的時(shí)候的那個(gè)create方法
    return if (this is BaseContinuationImpl) create(receiver, completion) else // ...
}
 @NotNull
   public final Continuation create(@NotNull FlowCollector $this$create, Object it, @NotNull Continuation continuation) {
      PlantListViewModel$$special$$inlined$flatMapLatest$1 var4 = new PlantListViewModel$$special$$inlined$flatMapLatest$1(continuation, this.$plantRepository$inlined);
      var4.p$ = $this$create;
      var4.p$0 = it;
      return var4;
   }
//2、
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this
//ContinuationImpl
  public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
//CoroutineDispatcher
   public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
//3、DispatchedContinuation
public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) {
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)
}

 inline fun resumeCancellableWith(
        result: Result<T>,
        noinline onCancellation: ((cause: Throwable) -> Unit)?
    ) {
        val state = result.toState(onCancellation)
//這里判斷是否需要切換線程
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
      //調(diào)用對(duì)應(yīng)的調(diào)度器,分發(fā)事件,實(shí)現(xiàn)線程切換
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled(state)) {
//不需要切換
                    resumeUndispatchedWith(result)
                }
            }
        }
    }
 inline fun resumeUndispatchedWith(result: Result<T>) {
        withCoroutineContext(context, countOrElement) {
            continuation.resumeWith(result)
        }
    }

continuation.resumeWith()調(diào)用的是SuspendLambda.resumeWith,然后它調(diào)用的是父類的BaseContinuationImpl里面的resumeWith

//BaseContinuationImpl
 public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
/// 調(diào)用SuspendLambda.invokeSuspend方法
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

協(xié)程的線程調(diào)度

主線程

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
//MainDispatcherLoader
 
 val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
 private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
           // 初始化factories,List<MainDispatcherFactory>
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
        } catch (e: Throwable) {
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        }
    }
public fun MainDispatcherFactory.tryCreateDispatcher(factories: List<MainDispatcherFactory>): MainCoroutineDispatcher =
    try {
        createDispatcher(factories)
    } catch (cause: Throwable) {
        createMissingDispatcher(cause, hintOnError())
    }
//AndroidDispatcherFactory
 override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))
//HandlerContext

internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
    public constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)
//invokeImmediately 默認(rèn)為false
    override fun isDispatchNeeded(context: CoroutineContext): Boolean {
        return !invokeImmediately || Looper.myLooper() != handler.looper
    }
// 用handler分發(fā)事件
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        handler.post(block)
    }

    override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
        val block = Runnable {
            with(continuation) { resumeUndispatched(Unit) }
        }
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        continuation.invokeOnCancellation { handler.removeCallbacks(block) }
    }

    override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle {
        handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
        return object : DisposableHandle {
            override fun dispose() {
                handler.removeCallbacks(block)
            }
        }
    }

主線程的線程調(diào)度,使用的是handler進(jìn)行消息的分發(fā)。

Dispatchers.IO

//DefaultScheduler
 val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
//ExperimentalCoroutineDispatcher
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE,
        schedulerName: String = DEFAULT_SCHEDULER_NAME
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)

    @Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
    public constructor(
        corePoolSize: Int = CORE_POOL_SIZE,
        maxPoolSize: Int = MAX_POOL_SIZE
    ) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)

    override val executor: Executor
        get() = coroutineScheduler

    // This is variable for test purposes, so that we can reinitialize from clean state
    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try {
//分發(fā)事件
            coroutineScheduler.dispatch(block)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatch(context, block)
        }

    override fun dispatchYield(context: CoroutineContext, block: Runnable): Unit =
        try {
            coroutineScheduler.dispatch(block, tailDispatch = true)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.dispatchYield(context, block)
        }
//...
    internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
        try {
            coroutineScheduler.dispatch(block, context, tailDispatch)
        } catch (e: RejectedExecutionException) {
            // CoroutineScheduler only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            // TaskContext shouldn't be lost here to properly invoke before/after task
            DefaultExecutor.enqueue(coroutineScheduler.createTask(block, context))
        }
    }

    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
}
// CoroutineScheduler 繼承至Executor 線程池
internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
 override fun execute(command: Runnable) = dispatch(command)

fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
        trackTask() // this is needed for virtual time support
// task是繼承runnable
        val task = createTask(block, taskContext)
        // try to submit the task to the local queue and act depending on the result
        // Worker 繼承 Thread, 里面有一個(gè)local queue
        val currentWorker = currentWorker()
//先加入到local queue里面
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) {
// 加入到local queue失敗,就加入到global queue里面
            if (!addToGlobalQueue(notAdded)) {
                // Global queue is closed in the last step of close/shutdown -- no more tasks should be accepted
                throw RejectedExecutionException("$schedulerName was terminated")
            }
        }
        val skipUnpark = tailDispatch && currentWorker != null
        // Checking 'task' instead of 'notAdded' is completely okay
        if (task.mode == TASK_NON_BLOCKING) {
            if (skipUnpark) return
            signalCpuWork()
        } else {
            // Increment blocking tasks anyway
            signalBlockingWork(skipUnpark = skipUnpark)
        }
    }
}

Dispatchers.IO復(fù)雜一點(diǎn),內(nèi)部實(shí)現(xiàn)了線程池Executor,通過線程池進(jìn)行事件的分發(fā),原理也是類似的。
到此,Dispatchers.IO也分析完畢。

最后,說一下協(xié)程容易搞錯(cuò)的一些問題
1). 協(xié)程支持并發(fā)嗎?開啟多個(gè)協(xié)程改變某一個(gè)值,對(duì)值有什么影響?有什么辦法可以避免?
首先,協(xié)程肯定是可以支持并發(fā)的,Dispatchers.IO以及Dispatchers.Default內(nèi)部都是采用線程池來執(zhí)行的,默認(rèn)的核心線程數(shù)是CUP的核數(shù)。既然支持并發(fā),那對(duì)同時(shí)對(duì)某個(gè)值進(jìn)行賦值,肯定會(huì)導(dǎo)致該值混亂。那么遇到這種情況,我們有幾種辦法處理

  1. 可以采用單線程的模式,比如Dispatchers.Unconfined,或者withContext()的方式啟動(dòng)協(xié)程。雖然launch()和withContext()都可以開啟一個(gè)協(xié)程,但是launch是并行啟動(dòng),而withContext卻是串行的。用async{}.await()也是可以的,await()是一個(gè)掛起函數(shù),效果類似withContext().
  2. 使用Mutex,也就是加鎖機(jī)制,和java的synchronized一樣。使用mutex.withLock{*}實(shí)現(xiàn)數(shù)據(jù)同步。當(dāng)然用Atomic也是可行的。

2). 啟動(dòng)協(xié)程的方式有l(wèi)aunch 、withContext、async三種,各有什么區(qū)別?
launch是一個(gè)非suspend函數(shù),也就是說它是非阻塞式的,可以并行執(zhí)行
withContext是一個(gè)suspend函數(shù),所以它是串行的
async不使用await()的時(shí)候跟launch一樣,使用await()的時(shí)候跟withContext一樣,因?yàn)閍wait是一個(gè)suspend函數(shù)。async適用于多個(gè)并行任務(wù),而且需要等待返回結(jié)果的情況下。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • [TOC] 簡介 Coroutines are computer program components that ...
    Whyn閱讀 6,176評(píng)論 5 15
  • 為什么要搞出和用協(xié)程呢 是節(jié)省CPU,避免系統(tǒng)內(nèi)核級(jí)的線程頻繁切換,造成的CPU資源浪費(fèi)。好鋼用在刀刃上。而協(xié)程是...
    靜默的小貓閱讀 907評(píng)論 0 2
  • 協(xié)程是輕量級(jí)的線程。 kotlin協(xié)程是kotlin的擴(kuò)展庫(kotlinx.coroutines)。 線程在An...
    付小影子閱讀 6,626評(píng)論 0 4
  • 在今年的三月份,我因?yàn)樾枰獮轫?xiàng)目搭建一個(gè)新的網(wǎng)絡(luò)請(qǐng)求框架開始接觸 Kotlin 協(xié)程。那時(shí)我司項(xiàng)目中同時(shí)存在著兩種...
    Android開發(fā)指南閱讀 1,004評(píng)論 0 2
  • 我是黑夜里大雨紛飛的人啊 1 “又到一年六月,有人笑有人哭,有人歡樂有人憂愁,有人驚喜有人失落,有的覺得收獲滿滿有...
    陌忘宇閱讀 8,889評(píng)論 28 54

友情鏈接更多精彩內(nèi)容