3.協(xié)程的調(diào)度(2)

協(xié)程上下文源代碼

public interface CoroutineContext {
    //從該上下文返回具有給定[鍵]的元素或' null '
    public operator fun <E : Element> get(key: Key<E>): E?
    //從左到右添加元素
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    //刪除此上下文
    public operator fun plus(context: CoroutineContext): CoroutineContext =
    if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
        context.fold(this) { acc, element ->
            val removed = acc.minusKey(element.key)
            if (removed === EmptyCoroutineContext) element else {
                // make sure interceptor is always last in the context (and thus is fast to get when present)
                val interceptor = removed[ContinuationInterceptor]
                if (interceptor == null) CombinedContext(removed, element) else {
                    val left = removed.minusKey(ContinuationInterceptor)
                    if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                        CombinedContext(CombinedContext(left, element), interceptor)
                }
            }
        }
    //返回包含來自此上下文的元素的上下文
    public fun minusKey(key: Key<*>): CoroutineContext
    //上下文元素的鍵
    public interface Key<E : Element>
    //上下文的一個元素,本身就是一個單例的協(xié)程上下文
    public interface Element : CoroutineContext {
    /**
     * A key of this coroutine context element.
     */
    public val key: Key<*>

    public override operator fun <E : Element> get(key: Key<E>): E? =
        @Suppress("UNCHECKED_CAST")
        if (this.key == key) this as E else null

    public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
        operation(initial, this)

    public override fun minusKey(key: Key<*>): CoroutineContext =
        if (this.key == key) EmptyCoroutineContext else this
    }
}

通過源碼我們可以看出協(xié)程上下文是一個跟list類似的數(shù)據(jù)結(jié)構(gòu)
CoroutineContext 是元素Element的集合,每一個Element都有一個key,同時Element 又實現(xiàn)了CoroutineContext 的接口,它自身也是一個協(xié)程的上下文,因此也可以作為集合出現(xiàn)。
協(xié)程上下文關(guān)鍵的幾個子類


clipboard.png

1.協(xié)程攔截器

refrofit接口定義

suspend fun getMessage3(@Query("city") city: String): WeatherEntity

轉(zhuǎn)化java代碼

Object getMessage3(@Query("city") @NotNull String var1, @NotNull Continuation var2);

我們大膽猜測協(xié)程的本質(zhì)就是回調(diào) + “黑魔法
如何查看Continuation在線程調(diào)度過程中做了些什么,這時候就要利用攔截器
調(diào)度器就是基于攔截器實現(xiàn)的

public interface ContinuationInterceptor : CoroutineContext.Element {
    //上下文攔截器的鍵
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    //攔截操作,Continuation很重要
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    //釋放攔截器
    public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
        /* do nothing by default */
    }

    // 重寫get方法
    public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? =
        @Suppress("UNCHECKED_CAST")
        if (key === Key) this as E else null

    // 重寫minusKey方法
    public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext =
        if (key === Key) EmptyCoroutineContext else this
}
Continuation的源碼
public interface Continuation<in T> {
    /**
     * 與此Continuation相對應(yīng)的協(xié)程上下文
     */
    public val context: CoroutineContext

    /**
     * 繼續(xù)執(zhí)行相應(yīng)的協(xié)程,將一個成功或失敗的[result]作為最后一個掛起點的返回值
     */
    public fun resumeWith(result: Result<T>)
}

我們可以自定義攔截器,看一下具體調(diào)用

class MyContinuation<T>(private val continuation: Continuation<T>) :Continuation<T>{
    override val context=continuation.context
    override fun resumeWith(result: Result<T>) {
        log("MyContinuation:$result")
        continuation.resumeWith(result)
    }
}
class MyContinuationInterceptor: ContinuationInterceptor {
    override val key=ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =MyContinuation(continuation)
}
suspend fun main(){
    GlobalScope.launch(MyContinuationInterceptor()) {
        log(1)
        val async = async {
            log(2)
            delay(1000)
            log(3)
            "hello"
        }
        log(4)
        val result = async.await()
        log("5---$result")
    }.join()
    log(6)
}

打印結(jié)果

22:25:29:804 [main] MyContinuation:Success(kotlin.Unit)   //①
22:25:29:819 [main] 1
22:25:29:850 [main] MyContinuation:Success(kotlin.Unit)  //②
22:25:29:850 [main] 2
22:25:29:897 [main] 4
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] MyContinuation:Success(kotlin.Unit)//③
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 3
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] MyContinuation:Success(hello)//④
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 5---hello
22:25:30:897 [kotlinx.coroutines.DefaultExecutor] 6

打印結(jié)果分析
所有協(xié)程啟動都會有一次resumeWith,所以打?、?,Result<T>此時為Success(kotlin.Unit)
由于是join,所以打印1
async 又啟動一個協(xié)程,所以打?、赗esult<T>,Result<T>此時為Success(kotlin.Unit)
打印2,delay函數(shù)是掛起函數(shù)
打印4
delay函數(shù)的掛起函數(shù)恢復(fù)繼續(xù),打?、?,Result<T>此時為Success(kotlin.Unit)
打印3
async.await()是掛起函數(shù),打印④,Result<T>此時為Success(hello)
打印5---hello
打印6
思考為什么從③處開始線程切換

public suspend fun delay(timeMillis: Long) {
    if (timeMillis <= 0) return // don't delay
    return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
        cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
    }
}

2.協(xié)程調(diào)度器

public abstract class CoroutineDispatcher :AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
      //如果需要執(zhí)行diapatch方法,返回true
      public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
      //由給定的上下文執(zhí)行Runnable代碼塊在另外的線程中,此方法不會立即執(zhí)行
      public abstract fun dispatch(context: CoroutineContext, block: Runnable)
      @InternalCoroutinesApi
      public open fun dispatchYield(context: CoroutineContext, block: Runnable) = dispatch(context, block)
      public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =DispatchedContinuation(this, continuation)
      @InternalCoroutinesApi
      public override fun releaseInterceptedContinuation(continuation: Continuation<*>) {
      (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild()
     }
     public operator fun plus(other: CoroutineDispatcher) = other
     override fun toString(): String = "$classSimpleName@$hexAddress"
}

它本身是協(xié)程上下文的子類,同時實現(xiàn)了攔截器的接口, dispatch 方法會在攔截器的方法 interceptContinuation 中調(diào)用,進而實現(xiàn)協(xié)程的調(diào)度
kotlin在Android中提供以下四種CoroutineDispatcher

public actual object Dispatchers {
    @JvmStatic
    //協(xié)程默認的調(diào)度器,是線程池,默認等于cpu內(nèi)核的數(shù)量,最少兩個
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
    @JvmStatic
    //Android當中的主線程即ui線程
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
    @JvmStatic
    //無指定派發(fā)線程,會根據(jù)運行時的上線文環(huán)境決定
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
    @JvmStatic
    //用于執(zhí)行阻塞線程的IO線程池,默認限制為64或者cpu內(nèi)核數(shù)量(取最大)
    public val IO: CoroutineDispatcher = DefaultScheduler.IO
}

也可以自定義CoroutineDispatcher

suspend fun main(){
    val dispatcher =
        Executors.newSingleThreadExecutor { r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
    GlobalScope.launch(dispatcher) {
        log(1)
    }.join()
    log(2)
    //由于這個線程池是我們自己創(chuàng)建的,因此我們需要在合適的時候關(guān)閉它
    dispatcher.close()
}

協(xié)程如果運行在多線程中一樣會有兩個問題
1.線程切換的開銷問題
2.多線程安全問題
舉例多線程開銷問題

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    GlobalScope.launch (dispatcher){
        log(1)
        val async = async {
            log(2)
            delay(1000)
            log(3)
            "hello"
        }
        log(4)
        val result = async.await()
        log("5$result")
    }.join()
    log(6)
}

打印

21:52:54:234 [pool-1-thread-1] 1
21:52:54:287 [pool-1-thread-1] 4
21:52:54:313 [pool-1-thread-2] 2
21:52:55:339 [pool-1-thread-3] 3
21:52:55:352 [pool-1-thread-4] 5hello
21:52:55:352 [pool-1-thread-4] 6

線程切了四次,掛起函數(shù)的繼續(xù)操作都會切換線程
所以我們在實際開發(fā)中要根據(jù)具體情況選用合適的CoroutineDispatcher
舉例多線程安全問題

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var i=0
    dispatcher.use {
        List(1000000){
            GlobalScope.launch(dispatcher) {
                i++
            }
        }.forEach {
            it.join()
        }
    }
    log(i)
    dispatcher.close()
}

打印
22:14:56:583 [main] 999881
解決方案1 單線程的CoroutineDispatcher操作數(shù)據(jù)(其他邏輯可以放在多線程,數(shù)據(jù)操作放在單線程比如此處的i++)
解決方案2 使用kotlin中線程安全的數(shù)據(jù)結(jié)構(gòu)

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var i= AtomicInteger()
    dispatcher.use {
        List(1000000){
            GlobalScope.launch(dispatcher) {
                i.incrementAndGet()
            }
        }.forEach {
            it.join()
        }
    }
    log(i)
    dispatcher.close()
}

與此相關(guān)的數(shù)據(jù)結(jié)構(gòu)


clipboard.png

解決方案3 利用互斥鎖

suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var i=0
    val mutex = Mutex()
    dispatcher.use {
        List(1000000){
            GlobalScope.launch(dispatcher) {
                // 用鎖保護每次自增
                mutex.withLock {
                    i++
                }
            }
        }.forEach {
            it.join()
        }
    }
    log(i)
    dispatcher.close()
}

解決方案4 協(xié)程方式處理線程安全的actor,actor 在高負載下比鎖更有效

// 計數(shù)器 Actor 的各種類型
sealed class CounterMsg
object IncCounter : CounterMsg() // 遞增計數(shù)器的單向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 攜帶回復(fù)的請求
// 這個函數(shù)啟動一個新的計數(shù)器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 狀態(tài)
    for (msg in channel) { // 即將到來消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}
suspend fun main(){
    val dispatcher = Executors.newFixedThreadPool(10).asCoroutineDispatcher()
    var scope= CoroutineScope(EmptyCoroutineContext)
    scope.launch {
        val counter = counterActor() // 創(chuàng)建該 actor
        dispatcher.use {
            List(1000000){
                GlobalScope.launch(dispatcher) {
                    counter.send(IncCounter)
                }
            }.forEach {
                it.join()
            }
        }
        // 發(fā)送一條消息以用來從一個 actor 中獲取計數(shù)值
        val response = CompletableDeferred<Int>()
        counter.send(GetCounter(response))
        log("Counter = ${response.await()}")
        counter.close() // 關(guān)閉該actor
        dispatcher.close()
    }.join()
}

第四種稍微偏難,了解即可,本質(zhì)還是用到kotlin協(xié)程里面的SendChannel

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容