Kotlin Coroutines(協(xié)程) 完全解析(五),協(xié)程的并發(fā)

Kotlin Coroutines(協(xié)程) 完全解析系列:

Kotlin Coroutines(協(xié)程) 完全解析(一),協(xié)程簡介

Kotlin Coroutines(協(xié)程) 完全解析(二),深入理解協(xié)程的掛起、恢復(fù)與調(diào)度

Kotlin Coroutines(協(xié)程) 完全解析(三),封裝異步回調(diào)、協(xié)程間關(guān)系及協(xié)程的取消

Kotlin Coroutines(協(xié)程) 完全解析(四),協(xié)程的異常處理

Kotlin Coroutines(協(xié)程) 完全解析(五),協(xié)程的并發(fā)

本文基于 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1

通過前面幾篇文章可以明白協(xié)程就是可以掛起和恢復(fù)執(zhí)行的運(yùn)算邏輯,掛起函數(shù)用狀態(tài)機(jī)的方式用掛起點(diǎn)將協(xié)程的運(yùn)算邏輯拆分為不同的片段,每次運(yùn)行協(xié)程執(zhí)行的不同的邏輯片段。所以協(xié)程在運(yùn)行時(shí)只是線程中的一塊代碼,線程的并發(fā)處理方式都可以用在協(xié)程上。不過協(xié)程還提供兩種特有的方式,一是不阻塞線程的互斥鎖Mutex,一是通過 ThreadLocal 實(shí)現(xiàn)的協(xié)程局部數(shù)據(jù)。

1. Mutex

線程中鎖都是阻塞式,在沒有獲取鎖時(shí)無法執(zhí)行其他邏輯,而協(xié)程可以通過掛起函數(shù)解決這個(gè),沒有獲取鎖就掛起協(xié)程,獲取后再恢復(fù)協(xié)程,協(xié)程掛起時(shí)線程并沒有阻塞可以執(zhí)行其他邏輯。這種互斥鎖就是Mutex,它與synchronized關(guān)鍵字有些類似,還提供了withLock擴(kuò)展函數(shù),替代常用的mutex.lock; try {...} finally { mutex.unlock() }:

fun main(args: Array<String>) = runBlocking<Unit> {
    val mutex = Mutex()
    var counter = 0
    repeat(10000) {
        GlobalScope.launch {
            mutex.withLock {
                counter ++
            }
        }
    }
    println("The final count is $counter")
}

Mutex的使用比較簡單,不過需要注意的是多個(gè)協(xié)程競爭的應(yīng)該是同一個(gè)Mutex互斥鎖。

2. 協(xié)程局部數(shù)據(jù)

線程中可以使用ThreadLocal作為線程局部數(shù)據(jù),每個(gè)線程中的數(shù)據(jù)都是獨(dú)立的。協(xié)程中可以通過ThreadLocal.asContextElement()擴(kuò)展函數(shù)實(shí)現(xiàn)協(xié)程局部數(shù)據(jù),每次協(xié)程切換會恢復(fù)之前的值。先看下面的示例:

fun main(args: Array<String>) = runBlocking<Unit> {
    val threadLocal = ThreadLocal<String>().apply { set("Init") }
    printlnValue(threadLocal)
    val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
        printlnValue(threadLocal)
        threadLocal.set("launch changed")
        printlnValue(threadLocal)
        yield()
        printlnValue(threadLocal)
    }
    job.join()
    printlnValue(threadLocal)
}

private fun printlnValue(threadLocal: ThreadLocal<String>) {
    println("${Thread.currentThread()} thread local value: ${threadLocal.get()}")
}

輸出如下:

Thread[main,5,main] thread local value: Init
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch
Thread[DefaultDispatcher-worker-1,5,main] thread local value: launch changed
Thread[DefaultDispatcher-worker-2,5,main] thread local value: launch
Thread[main,5,main] thread local value: Init

上面的輸出有個(gè)疑問的地方,為什么執(zhí)行yield()掛起函數(shù)后 threadLocal 的值不是launch changed而變回了launch?

下面直接分析源碼:

// 注意這里 value 的默認(rèn)值是 ThreadLocal 當(dāng)前值
public fun <T> ThreadLocal<T>.asContextElement(value: T = get()): ThreadContextElement<T> =
    ThreadLocalElement(value, this)

internal class ThreadLocalElement<T>(
    private val value: T,
    private val threadLocal: ThreadLocal<T>
) : ThreadContextElement<T> {
    override val key: CoroutineContext.Key<*> = ThreadLocalKey(threadLocal)

    override fun updateThreadContext(context: CoroutineContext): T {
        val oldState = threadLocal.get()
// 設(shè)置 threadLocal 的值為 value 前先保存了之前的值
        threadLocal.set(value)
        return oldState
    }

    override fun restoreThreadContext(context: CoroutineContext, oldState: T) {
// 將 threadLocal 修改為之前保存的值
        threadLocal.set(oldState)
    }
    ...
}

// 協(xié)程啟動和恢復(fù)都會用此函數(shù)包裝,在 Dispatched.run()、DisptchedContinuation.resumeWith() 、
// DisptchedContinuation.resumeUndispatched() 等協(xié)程啟動和恢復(fù)的地方都可以發(fā)現(xiàn)此函數(shù)的蹤影
internal actual inline fun <T> withCoroutineContext(context: CoroutineContext, countOrElement: Any?, block: () -> T): T {
// updateThreadContext() 函數(shù)會調(diào)用到 ThreadContextElement.updateThreadContext(context)
// oldValue 是 threadLocal 之前的值
    val oldValue = updateThreadContext(context, countOrElement)
    try {
        return block()
    } finally {
// restoreThreadContext() 函數(shù)會調(diào)用到 ThreadContextElement.restoreThreadContext(context, oldValue)
        restoreThreadContext(context, oldValue)
    }
}

根據(jù)上面的源碼和斷點(diǎn)調(diào)試,可以發(fā)現(xiàn)協(xié)程的啟動和恢復(fù)都會執(zhí)行一次ThreadContextElement.updateThreadContext(context)ThreadContextElement.restoreThreadContext(context, oldValue),現(xiàn)在再分析一次上面的代碼運(yùn)行:

fun main(args: Array<String>) = runBlocking<Unit> {
    val threadLocal = ThreadLocal<String>().apply { set("Init") }
// 此時(shí)在 main 線程,threadLocal 的值為 Init
    printlnValue(threadLocal)
    val job = GlobalScope.launch(threadLocal.asContextElement("launch")) {
// 啟動協(xié)程后,切換到 DefaultDispatcher-worker-1 線程,threadLocal 在該線程的值為 null
        // 調(diào)用 updateThreadContext() 設(shè)置 threadLocal 的值為 launch,保存之前的為 null
        printlnValue(threadLocal)
        // 在 DefaultDispatcher-worker-1 線程,修改 threadLocal 的值為 launch changed
        threadLocal.set("launch changed")
        printlnValue(threadLocal)
        // yield() 掛起函數(shù)會掛起當(dāng)前協(xié)程,并將協(xié)程分發(fā)到 Dispatcher.Default 的隊(duì)列中等待恢復(fù)
        // 掛起協(xié)程后調(diào)用 restoreThreadContext() 修改 threadLocal 為 null
        yield()
// 恢復(fù)協(xié)程后,此時(shí)在 DefaultDispatcher-worker-2 線程,threadLocal 的值為 null
        // 再次調(diào)用 updateThreadContext() 設(shè)置 threadLocal 的值為 launch,保存之前的為 null
        printlnValue(threadLocal)
        // 結(jié)束協(xié)程后,restoreThreadContext() 修改 threadLocal 為 null
    }
    job.join()
    // 此時(shí)已經(jīng)從 DefaultDispatcher-worker-2 線程切換回 main 線程,main 線程中的 threadlocal 沒有修改過,還是為 Init
    printlnValue(threadLocal)
}

private fun printlnValue(threadLocal: ThreadLocal<String>) {
    println("${Thread.currentThread()} thread local value: ${threadLocal.get()}")
}

所以 ThreadContextElement 并不能跟蹤所有ThreadLocal對象的訪問,而且每次掛起時(shí)更新的值將丟失。最重要的牢記它的原理:啟動和恢復(fù)時(shí)保存ThreadLocal在當(dāng)前線程的值,并修改為 value,掛起和結(jié)束時(shí)修改當(dāng)前線程ThreadLocal的值為之前保存的值。

3. 已有線程同步方式

協(xié)程中的并發(fā)與線程的并發(fā)大部分是相同的,所以本篇文章應(yīng)該是目前為止該系列文章中最容易理解的一篇,本系列Kotlin Coroutines(協(xié)程) 完全解析暫時(shí)就到這里,后面待 select 表達(dá)式、Channel、Actor 等實(shí)驗(yàn)性內(nèi)容正式發(fā)布后繼續(xù)解析,還有在 Android 項(xiàng)目中協(xié)程的實(shí)際運(yùn)用,敬請期待。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容