破解 Kotlin 協(xié)程(3) - 協(xié)程調(diào)度篇

關(guān)鍵詞:Kotlin 異步編程 協(xié)程

上一篇我們知道了協(xié)程啟動(dòng)的幾種模式,也通過(guò)示例認(rèn)識(shí)了 launch 啟動(dòng)協(xié)程的使用方法,本文將延續(xù)這些內(nèi)容從調(diào)度的角度來(lái)進(jìn)一步為大家揭示協(xié)程的奧義。

image

1. 協(xié)程上下文

調(diào)度器本質(zhì)上就是一個(gè)協(xié)程上下文的實(shí)現(xiàn),我們先來(lái)介紹下上下文。

前面我們提到 launch 函數(shù)有三個(gè)參數(shù),第一個(gè)參數(shù)叫 上下文,它的接口類(lèi)型是 CoroutineContext,通常我們見(jiàn)到的上下文的類(lèi)型是 CombinedContext 或者 EmptyCoroutineContext,一個(gè)表示上下文的組合,另一個(gè)表示什么都沒(méi)有。我們來(lái)看下 CoroutineContext 的接口方法:

@SinceKotlin("1.3")
public interface CoroutineContext {
    public operator fun <E : Element> get(key: Key<E>): E?
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R
    public operator fun plus(context: CoroutineContext): CoroutineContext = ...
    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {
        public val key: Key<*>
        ...
    }
}

不知道大家有沒(méi)有發(fā)現(xiàn),它簡(jiǎn)直就是一個(gè)以 Key 為索引的 List

CoroutineContext List
get(Key) get(Int)
plus(CoroutineContext) plus(List)
minusKey(Key) removeAt(Int)

表中的 List.plus(List) 實(shí)際上指的是擴(kuò)展方法 Collection<T>.plus(elements: Iterable<T>): List<T>

CoroutineContext 作為一個(gè)集合,它的元素就是源碼中看到的 Element,每一個(gè) Element 都有一個(gè) key,因此它可以作為元素出現(xiàn),同時(shí)它也是 CoroutineContext 的子接口,因此也可以作為集合出現(xiàn)。

講到這里,大家就會(huì)明白,CoroutineContext 原來(lái)是個(gè)數(shù)據(jù)結(jié)構(gòu)啊。如果大家對(duì)于 List 的遞歸定義比較熟悉的話(huà),那么對(duì)于 CombinedContextEmptyCoroutineContext 也就很容易理解了,例如 scala 的 List是這么定義的:

sealed abstract class List[+A] extends ... {
    ...
    def head: A
    def tail: List[A]
    ...
}

在模式匹配的時(shí)候,List(1,2,3,4) 是可以匹配 x::y 的,x 就是 1,y 則是 List(2,3,4)。

CombinedContext 的定義也非常類(lèi)似:

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable {
    ...
}

只不過(guò)它是反過(guò)來(lái)的,前面是集合,后面是單獨(dú)的一個(gè)元素。我們?cè)趨f(xié)程體里面訪(fǎng)問(wèn)到的 coroutineContext 大多是這個(gè) CombinedContext 類(lèi)型,表示有很多具體的上下文實(shí)現(xiàn)的集合,我們?nèi)绻胍业侥骋粋€(gè)特別的上下文實(shí)現(xiàn),就需要用對(duì)應(yīng)的 Key 來(lái)查找,例如:

suspend fun main(){
    GlobalScope.launch {
        println(coroutineContext[Job]) // "coroutine#1":StandaloneCoroutine{Active}@1ff62014
    }
    println(coroutineContext[Job]) // null,suspend main 雖然也是協(xié)程體,但它是更底層的邏輯,因此沒(méi)有 Job 實(shí)例
}

這里的 Job 實(shí)際上是對(duì)它的 companion object 的引用

public interface Job : CoroutineContext.Element {
    /**
     * Key for [Job] instance in the coroutine context.
     */
    public companion object Key : CoroutineContext.Key<Job> { ... }
    ...
}

所以我們也可以仿照 Thread.currentThread() 來(lái)一個(gè)獲取當(dāng)前 Job 的方法:

suspend inline fun Job.Key.currentJob() = coroutineContext[Job]

suspend fun coroutineJob(){
    GlobalScope.launch {
        log(Job.currentJob())
    }
    log(Job.currentJob())
}

我們可以通過(guò)指定上下文為協(xié)程添加一些特性,一個(gè)很好的例子就是為協(xié)程添加名稱(chēng),方便調(diào)試:

GlobalScope.launch(CoroutineName("Hello")) {
    ...
}

如果有多個(gè)上下文需要添加,直接用 + 就可以了:

GlobalScope.launch(Dispatchers.Main + CoroutineName("Hello")) {
    ...
}

Dispatchers.Main 是調(diào)度器的一個(gè)實(shí)現(xiàn),不用擔(dān)心,我們很快就會(huì)認(rèn)識(shí)它了。

2. 協(xié)程攔截器

費(fèi)了好大勁兒說(shuō)完上下文,這里就要說(shuō)一個(gè)比較特殊的存在了——攔截器。

public interface ContinuationInterceptor : CoroutineContext.Element {
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>
    
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...
}

攔截器也是一個(gè)上下文的實(shí)現(xiàn)方向,攔截器可以左右你的協(xié)程的執(zhí)行,同時(shí)為了保證它的功能的正確性,協(xié)程上下文集合永遠(yuǎn)將它放在最后面,這真可謂是天選之子了。

它攔截協(xié)程的方法也很簡(jiǎn)單,因?yàn)閰f(xié)程的本質(zhì)就是回調(diào) + “黑魔法”,而這個(gè)回調(diào)就是被攔截的 Continuation 了。用過(guò) OkHttp 的小伙伴一下就興奮了,攔截器我常用的啊,OkHttp 用攔截器做緩存,打日志,還可以模擬請(qǐng)求,協(xié)程攔截器也是一樣的道理。調(diào)度器就是基于攔截器實(shí)現(xiàn)的,換句話(huà)說(shuō)調(diào)度器就是攔截器的一種。

我們可以自己定義一個(gè)攔截器放到我們的協(xié)程上下文中,看看會(huì)發(fā)生什么。

class MyContinuationInterceptor: ContinuationInterceptor{
    override val key = ContinuationInterceptor
    override fun <T> interceptContinuation(continuation: Continuation<T>) = MyContinuation(continuation)
}

class MyContinuation<T>(val continuation: Continuation<T>): Continuation<T> {
    override val context = continuation.context
    override fun resumeWith(result: Result<T>) {
        log("<MyContinuation> $result" )
        continuation.resumeWith(result)
    }
}

我們只是在回調(diào)處打了一行日志。接下來(lái)我們把用例拿出來(lái):

suspend fun main() {
    GlobalScope.launch(MyContinuationInterceptor()) {
        log(1)
        val job = async {
            log(2)
            delay(1000)
            log(3)
            "Hello"
        }
        log(4)
        val result = job.await()
        log("5. $result")
    }.join()
    log(6)
}

這可能是迄今而止我們給出的最復(fù)雜的例子了,不過(guò)請(qǐng)大家不要被它嚇到,它依然很簡(jiǎn)單。我們通過(guò) launch 啟動(dòng)了一個(gè)協(xié)程,為它指定了我們自己的攔截器作為上下文,緊接著在其中用 async 啟動(dòng)了一個(gè)協(xié)程,asynclaunch 從功能上是同等類(lèi)型的函數(shù),它們都被稱(chēng)作協(xié)程的 Builder 函數(shù),不同之處在于 async 啟動(dòng)的 Job 也就是實(shí)際上的 Deferred 可以有返回結(jié)果,可以通過(guò) await 方法獲取。

可想而知,result 的值就是 Hello。那么這段程序運(yùn)行的結(jié)果如何呢?

15:31:55:989 [main] <MyContinuation> Success(kotlin.Unit)  // ①
15:31:55:992 [main] 1
15:31:56:000 [main] <MyContinuation> Success(kotlin.Unit) // ②
15:31:56:000 [main] 2
15:31:56:031 [main] 4
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(kotlin.Unit) // ③
15:31:57:029 [kotlinx.coroutines.DefaultExecutor] 3
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] <MyContinuation> Success(Hello) // ④
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 5. Hello
15:31:57:031 [kotlinx.coroutines.DefaultExecutor] 6

“// ①” 不是程序輸出的內(nèi)容,僅為后續(xù)講解方便而做的標(biāo)注。

大家可能就要奇怪了,你不是說(shuō) Continuation 是回調(diào)么,這里面回調(diào)調(diào)用也就一次?。?code>await 那里),怎么日志打印了四次呢?

別慌,我們按順序給大家介紹。

首先,所有協(xié)程啟動(dòng)的時(shí)候,都會(huì)有一次 Continuation.resumeWith 的操作,這一次操作對(duì)于調(diào)度器來(lái)說(shuō)就是一次調(diào)度的機(jī)會(huì),我們的協(xié)程有機(jī)會(huì)調(diào)度到其他線(xiàn)程的關(guān)鍵之處就在于此。 ①、② 兩處都是這種情況。

其次,delay 是掛起點(diǎn),1000ms 之后需要繼續(xù)調(diào)度執(zhí)行該協(xié)程,因此就有了 ③ 處的日志。

最后,④ 處的日志就很容易理解了,正是我們的返回結(jié)果。

可能有朋友還會(huì)有疑問(wèn),我并沒(méi)有在攔截器當(dāng)中切換線(xiàn)程,為什么從 ③ 處開(kāi)始有了線(xiàn)程切換的操作?這個(gè)切換線(xiàn)程的邏輯源自于 delay,在 JVM 上 delay 實(shí)際上是在一個(gè) ScheduledExcecutor 里面添加了一個(gè)延時(shí)任務(wù),因此會(huì)發(fā)生線(xiàn)程切換;而在 JavaScript 環(huán)境中則是基于 setTimeout,如果運(yùn)行在 Nodejs 上,delay 就不會(huì)切線(xiàn)程了,畢竟人家是單線(xiàn)程的。

如果我們?cè)跀r截器當(dāng)中自己處理了線(xiàn)程切換,那么就實(shí)現(xiàn)了自己的一個(gè)簡(jiǎn)單的調(diào)度器,大家有興趣可以自己去嘗試。

思考:攔截器可以有多個(gè)嗎?

3. 調(diào)度器

3.1 概述

有了前面的基礎(chǔ),我們對(duì)于調(diào)度器的介紹就變得水到渠成了。

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    ...
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    ...
}

它本身是協(xié)程上下文的子類(lèi),同時(shí)實(shí)現(xiàn)了攔截器的接口, dispatch 方法會(huì)在攔截器的方法 interceptContinuation 中調(diào)用,進(jìn)而實(shí)現(xiàn)協(xié)程的調(diào)度。所以如果我們想要實(shí)現(xiàn)自己的調(diào)度器,繼承這個(gè)類(lèi)就可以了,不過(guò)通常我們都用現(xiàn)成的,它們定義在 Dispatchers 當(dāng)中:

val Default: CoroutineDispatcher
val Main: MainCoroutineDispatcher
val Unconfined: CoroutineDispatcher

這個(gè)類(lèi)的定義涉及到了 Kotlin MPP 的支持,因此你在 Jvm 版本當(dāng)中還會(huì)看到 val IO: CoroutineDispatcher,在 js 和 native 當(dāng)中就只有前面提到的這三個(gè)了(對(duì) Jvm 好偏心吶)。

Jvm Js Native
Default 線(xiàn)程池 主線(xiàn)程循環(huán) 主線(xiàn)程循環(huán)
Main UI 線(xiàn)程 與 Default 相同 與 Default 相同
Unconfined 直接執(zhí)行 直接執(zhí)行 直接執(zhí)行
IO 線(xiàn)程池 -- --
  • IO 僅在 Jvm 上有定義,它基于 Default 調(diào)度器背后的線(xiàn)程池,并實(shí)現(xiàn)了獨(dú)立的隊(duì)列和限制,因此協(xié)程調(diào)度器從 Default 切換到 IO 并不會(huì)觸發(fā)線(xiàn)程切換。
  • Main 主要用于 UI 相關(guān)程序,在 Jvm 上包括 Swing、JavaFx、Android,可將協(xié)程調(diào)度到各自的 UI 線(xiàn)程上。
  • Js 本身就是單線(xiàn)程的事件循環(huán),與 Jvm 上的 UI 程序比較類(lèi)似。

3.2 編寫(xiě) UI 相關(guān)程序

Kotlin 的用戶(hù)絕大多數(shù)都是 Android 開(kāi)發(fā)者,大家對(duì) UI 的開(kāi)發(fā)需求還是比較大的。我們舉一個(gè)很常見(jiàn)的場(chǎng)景,點(diǎn)擊一個(gè)按鈕做點(diǎn)兒異步的操作再回調(diào)刷新 UI:

getUserBtn.setOnClickListener { 
    getUser { user ->
        handler.post {
            userNameView.text = user.name
        }
    }
}

我們簡(jiǎn)單得給出 getUser 函數(shù)的聲明:

typealias Callback = (User) -> Unit

fun getUser(callback: Callback){
    ...
}

由于 getUser 函數(shù)需要切到其他線(xiàn)程執(zhí)行,因此回調(diào)通常也會(huì)在這個(gè)非 UI 的線(xiàn)程中調(diào)用,所以為了確保 UI 正確被刷新,我們需要用 handler.post 切換到 UI 線(xiàn)程。上面的寫(xiě)法就是我們最古老的寫(xiě)法了。

后來(lái)又有了 RxJava,那么事情開(kāi)始變得有趣了起來(lái):

fun getUserObservable(): Observable<User> {
    return Observable.create<User> { emitter ->
        getUser {
            emitter.onNext(it)
        }
    }
}

于是點(diǎn)擊按鈕的事件可以這么寫(xiě):

getUserBtn.setOnClickListener {
    getUserObservable()
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe { user ->
                userNameView.text = user.name
            }
}

其實(shí) RxJava 在線(xiàn)程切換上的表現(xiàn)是非常優(yōu)秀的,也正是如此,很多人甚至用它只是為了切線(xiàn)程方便!

那么我們現(xiàn)在把這段代碼過(guò)渡到協(xié)程的寫(xiě)法:

suspend fun getUserCoroutine() = suspendCoroutine<User> {
    continuation ->
    getUser {
        continuation.resume(it)
    }
}

按鈕點(diǎn)擊時(shí),我們可以:

getUserBtn.setOnClickListener {
    GlobalScope.launch(Dispatchers.Main) {
        userNameView.text = getUserCoroutine().name
    }
}

大家也可以用 anko-coroutines 當(dāng)中的 View.onClick 擴(kuò)展,這樣我們就無(wú)需自己在這里用 launch 啟動(dòng)協(xié)程了。有關(guān) Anko 對(duì)協(xié)程的支持,我們后面專(zhuān)門(mén)安排一篇文章介紹。

這里又有大家沒(méi)見(jiàn)過(guò)的內(nèi)容啦,suspendCoroutine 這個(gè)方法并不是幫我們啟動(dòng)協(xié)程的,它運(yùn)行在協(xié)程當(dāng)中并且?guī)臀覀儷@取到當(dāng)前協(xié)程的 Continuation 實(shí)例,也就是拿到回調(diào),方便后面我們調(diào)用它的 resume 或者 resumeWithException 來(lái)返回結(jié)果或者拋出異常。

如果你重復(fù)調(diào)用 resume 或者 resumeWithException 會(huì)收獲一枚 IllegalStateException,仔細(xì)想想這是為什么。

對(duì)比前面的 RxJava 的做法,你會(huì)發(fā)現(xiàn)這段代碼其實(shí)很容易理解,你甚至?xí)l(fā)現(xiàn)協(xié)程的使用場(chǎng)景與 RxJava 竟是如此的相似。這里我們用到了 Dispatchers.Main 來(lái)確保 launch 啟動(dòng)的協(xié)程在調(diào)度時(shí)始終調(diào)度到 UI 線(xiàn)程,那么下面我們來(lái)看看 Dispatchers.Main 的具體實(shí)現(xiàn)。

在 Jvm 上,Main 的實(shí)現(xiàn)也比較有意思:

internal object MainDispatcherLoader {
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher {
        return try {
            val factories = MainDispatcherFactory::class.java.let { clz ->
                ServiceLoader.load(clz, clz.classLoader).toList()
            }
            factories.maxBy { it.loadPriority }?.tryCreateDispatcher(factories)
                ?: MissingMainCoroutineDispatcher(null)
        } catch (e: Throwable) {
            MissingMainCoroutineDispatcher(e)
        }
    }
}

在 Android 當(dāng)中,協(xié)程框架通過(guò)注冊(cè) AndroidDispatcherFactory 使得 Main 最終被賦值為 HandlerDispatcher 的實(shí)例,有興趣的可以去看下 kotlinx-coroutines-android 的源碼實(shí)現(xiàn)。

注意前面對(duì)于 RxJava 和協(xié)程的實(shí)現(xiàn),我們都沒(méi)有考慮異常和取消的問(wèn)題。有關(guān)異常和取消的話(huà)題,我們會(huì)在后面的文章中詳細(xì)介紹。

3.3 綁定到任意線(xiàn)程的調(diào)度器

調(diào)度器的目的就是切線(xiàn)程,你不要想著我在 dispatch 的時(shí)候根據(jù)自己的心情來(lái)隨機(jī)調(diào)用,那你是在害你自己(不怕各位笑話(huà),這樣的代碼我還真寫(xiě)過(guò),僅供娛樂(lè))。那么問(wèn)題就簡(jiǎn)單了,我們只要提供線(xiàn)程,調(diào)度器就應(yīng)該很方便的創(chuàng)建出來(lái):

suspend fun main() {
    val myDispatcher= Executors.newSingleThreadExecutor{ r -> Thread(r, "MyThread") }.asCoroutineDispatcher()
    GlobalScope.launch(myDispatcher) {
        log(1)
    }.join()
    log(2)
}

輸出的信息就表明協(xié)程運(yùn)行在我們自己的線(xiàn)程上。

16:10:57:130 [MyThread] 1
16:10:57:136 [MyThread] 2

不過(guò)請(qǐng)大家注意,由于這個(gè)線(xiàn)程池是我們自己創(chuàng)建的,因此我們需要在合適的時(shí)候關(guān)閉它,不然的話(huà):

image

我們可以通過(guò)主動(dòng)關(guān)閉線(xiàn)程池或者調(diào)用:

myDispatcher.close()

來(lái)結(jié)束它的生命周期,再次運(yùn)行程序就會(huì)正常退出了。

image

當(dāng)然有人會(huì)說(shuō)你創(chuàng)建的線(xiàn)程池的線(xiàn)程不是 daemon 的,所以主線(xiàn)程結(jié)束時(shí) Jvm 不會(huì)停止運(yùn)行。說(shuō)的沒(méi)錯(cuò),但該釋放的還是要及時(shí)釋放,如果你只是在程序的整個(gè)生命周期當(dāng)中短暫的用了一下這個(gè)調(diào)度器,那么一直不關(guān)閉它對(duì)應(yīng)的線(xiàn)程池豈不是會(huì)有線(xiàn)程泄露嗎?這就很尷尬了。

Kotlin 協(xié)程設(shè)計(jì)者也特別害怕大家注意不到這一點(diǎn),還特地廢棄了兩個(gè) API 并且開(kāi)了一個(gè) issue 說(shuō)我們要重做這套 API,這兩個(gè)可憐的家伙是誰(shuí)呢?

廢棄的兩個(gè)基于線(xiàn)程池創(chuàng)建調(diào)度器的 API

fun newSingleThreadContext(name: String): ExecutorCoroutineDispatcher
fun newFixedThreadPoolContext(nThreads: Int, name: String): ExecutorCoroutineDispatcher

這二者可以很方便的創(chuàng)建綁定到特定線(xiàn)程的調(diào)度器,但過(guò)于簡(jiǎn)潔的 API 似乎會(huì)讓人忘記它的風(fēng)險(xiǎn)。Kotlin 一向不愛(ài)做這種不清不楚的事兒,所以您呢,還是像我們這一節(jié)例子當(dāng)中那樣自己去構(gòu)造線(xiàn)程池吧,這樣好歹自己忘了關(guān)閉也怨不著別人(哈哈哈)。

其實(shí)在多個(gè)線(xiàn)程上運(yùn)行協(xié)程,線(xiàn)程總是這樣切來(lái)切去其實(shí)并不會(huì)顯得很輕量級(jí),例如下面的例子就是比較可怕的了:

Executors.newFixedThreadPool(10)
        .asCoroutineDispatcher().use { dispatcher ->
            GlobalScope.launch(dispatcher) {
                log(1)
                val job = async {
                    log(2)
                    delay(1000)
                    log(3)
                    "Hello"
                }
                log(4)
                val result = job.await()
                log("5. $result")
            }.join()
            log(6)
        }

這里面除了 delay 那里有一次不可避免的線(xiàn)程切換外,其他幾處協(xié)程掛起點(diǎn)的繼續(xù)操作(Continuation.resume)都會(huì)切線(xiàn)程:

16:28:04:771 [pool-1-thread-1] 1
16:28:04:779 [pool-1-thread-1] 4
16:28:04:779 [pool-1-thread-2] 2
16:28:05:790 [pool-1-thread-3] 3
16:28:05:793 [pool-1-thread-4] 5. Hello
16:28:05:794 [pool-1-thread-4] 6

如果我們的線(xiàn)程池只開(kāi) 1 個(gè)線(xiàn)程,那么這里所有的輸出都將在這唯一的線(xiàn)程中打?。?/p>

16:40:14:685 [pool-1-thread-1] 1
16:40:14:706 [pool-1-thread-1] 4
16:40:14:710 [pool-1-thread-1] 2
16:40:15:723 [pool-1-thread-1] 3
16:40:15:725 [pool-1-thread-1] 5. Hello
16:40:15:725 [pool-1-thread-1] 6

對(duì)比這二者,10個(gè)線(xiàn)程的情況線(xiàn)程切換次數(shù)最少 3次,而 1 個(gè)線(xiàn)程的情況則只要 delay 1000ms 之后恢復(fù)執(zhí)行的時(shí)候那一次。只是多兩次線(xiàn)程切換,到底會(huì)有多大影響呢?我在我自己的 2015 款 mbp 上對(duì)于兩種不同的情況分別循環(huán)運(yùn)行 100 次,得到的平均時(shí)間如下:

線(xiàn)程數(shù) 10 1
耗時(shí)ms 1006.00 1004.97

注意,為了測(cè)試的公平性,在運(yùn)行 100 次循環(huán)之前已經(jīng)做好了預(yù)熱,確保所有類(lèi)都已經(jīng)加載。測(cè)試結(jié)果僅供參考。

也就是說(shuō)多兩次線(xiàn)程切換平均能多出 1ms 的耗時(shí)。生產(chǎn)環(huán)境當(dāng)中的代碼當(dāng)然會(huì)更復(fù)雜,如果這樣用線(xiàn)程池去調(diào)度,結(jié)果可想而知。

實(shí)際上通常我們只需要在一個(gè)線(xiàn)程當(dāng)中處理自己的業(yè)務(wù)邏輯,只有一些耗時(shí)的 IO 才需要切換到 IO 線(xiàn)程中處理,所以好的做法可以參考 UI 對(duì)應(yīng)的調(diào)度器,自己通過(guò)線(xiàn)程池定義調(diào)度器的做法本身沒(méi)什么問(wèn)題,但最好只用一個(gè)線(xiàn)程,因?yàn)槎嗑€(xiàn)程除了前面說(shuō)的線(xiàn)程切換的開(kāi)銷(xiāo)外,還有線(xiàn)程安全的問(wèn)題。

3.4 線(xiàn)程安全問(wèn)題

Js 和 Native 的并發(fā)模型與 Jvm 不同,Jvm 暴露了線(xiàn)程 API 給用戶(hù),這也使得協(xié)程的調(diào)度可以由用戶(hù)更靈活的選擇。越多的自由,意味著越多的代價(jià),我們?cè)?Jvm 上面編寫(xiě)協(xié)程代碼時(shí)需要明白一點(diǎn)的是,線(xiàn)程安全問(wèn)題在調(diào)度器不同的協(xié)程之間仍然存在。

好的做法,就像我們前面一節(jié)提到的,盡量把自己的邏輯控制在一個(gè)線(xiàn)程之內(nèi),這樣一方面節(jié)省了線(xiàn)程切換的開(kāi)銷(xiāo),另一方面還可以避免線(xiàn)程安全問(wèn)題,兩全其美。

如果大家在協(xié)程代碼中使用鎖之類(lèi)的并發(fā)工具就反而增加了代碼的復(fù)雜度,對(duì)此我的建議是大家在編寫(xiě)協(xié)程代碼時(shí)盡量避免對(duì)外部作用域的可變變量進(jìn)行引用,盡量使用參數(shù)傳遞而非對(duì)全局變量進(jìn)行引用。

以下是一個(gè)錯(cuò)誤的例子,大家很容易就能想明白:

suspend fun main(){
    var i = 0
    Executors.newFixedThreadPool(10)
            .asCoroutineDispatcher().use { dispatcher ->
                List(1000000) {
                    GlobalScope.launch(dispatcher) {
                        i++
                    }
                }.forEach {
                    it.join()
                }
            }
    log(i)
}

輸出的結(jié)果:

16:59:28:080 [main] 999593

4. suspend main 函數(shù)如何調(diào)度?

上一篇文章我們提到了 suspend main 會(huì)啟動(dòng)一個(gè)協(xié)程,我們示例中的協(xié)程都是它的子協(xié)程,可是這個(gè)最外層的協(xié)程到底是怎么來(lái)的呢?

我們先給出一個(gè)例子:

suspend fun main() {
    log(1)
    GlobalScope.launch {
        log(2)
    }.join()
    log(3)
}

它等價(jià)于下面的寫(xiě)法:

fun main() {
    runSuspend {
        log(1)
        GlobalScope.launch {
            log(2)
        }.join()
        log(3)
    }
}

那你說(shuō)這個(gè) runSuspend 又是何妨神圣?它是 Kotlin 標(biāo)準(zhǔn)庫(kù)的一個(gè)方法,注意它不是 kotlinx.coroutines 當(dāng)中的,它實(shí)際上屬于更底層的 API 了。

internal fun runSuspend(block: suspend () -> Unit) {
    val run = RunSuspend()
    block.startCoroutine(run)
    run.await()
}

而這里面的 RunSuspend 則是 Continuation 的實(shí)現(xiàn):

private class RunSuspend : Continuation<Unit> {
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    var result: Result<Unit>? = null

    override fun resumeWith(result: Result<Unit>) = synchronized(this) {
        this.result = result
        (this as Object).notifyAll()
    }

    fun await() = synchronized(this) {
        while (true) {
            when (val result = this.result) {
                null -> (this as Object).wait()
                else -> {
                    result.getOrThrow() // throw up failure
                    return
                }
            }
        }
    }
}

它的上下文是空的,因此 suspend main 啟動(dòng)的協(xié)程并不會(huì)有任何調(diào)度行為。

通過(guò)這個(gè)例子我們可以知道,實(shí)際上啟動(dòng)一個(gè)協(xié)程只需要有一個(gè) lambda 表達(dá)式就可以了,想當(dāng)年 Kotlin 1.1 剛發(fā)布的時(shí)候,我寫(xiě)了一系列的教程都是以標(biāo)準(zhǔn)庫(kù) API 為基礎(chǔ)的,后來(lái)發(fā)現(xiàn)標(biāo)準(zhǔn)庫(kù)的 API 也許真的不是給我們用的,所以看看就好。

上述代碼在標(biāo)準(zhǔn)庫(kù)當(dāng)中被修飾為 internal,因此我們無(wú)法直接使用它們。不過(guò)你可以把 RunSuspend.kt 當(dāng)中的內(nèi)容復(fù)制到你的工程當(dāng)中,這樣你就可以直接使用啦,其中的 var result: Result<Unit>? = null 可能會(huì)報(bào)錯(cuò),沒(méi)關(guān)系,改成 private var result: Result<Unit>? = null 就可以了。

5. 小結(jié)

在這篇文章當(dāng)中,我們介紹了協(xié)程上下文,介紹了攔截器,進(jìn)而最終引出了我們的調(diào)度器,截止目前,我們還有異常處理、協(xié)程取消、Anko 對(duì)協(xié)程的支持等話(huà)題沒(méi)有講到,如果大家有協(xié)程相關(guān)想了解的話(huà)題,可以留言哈~


歡迎關(guān)注 Kotlin 中文社區(qū)!

中文官網(wǎng):https://www.kotlincn.net/

中文官方博客:https://www.kotliner.cn/

公眾號(hào):Kotlin

知乎專(zhuān)欄:Kotlin

CSDN:Kotlin中文社區(qū)

掘金:Kotlin中文社區(qū)

簡(jiǎn)書(shū):Kotlin中文社區(qū)

開(kāi)發(fā)者頭條:Kotlin中文社區(qū)

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容