【翻譯】kotlin協(xié)程核心庫文檔(六)—— 共享的可變狀態(tài)和并發(fā)

github原文地址

原創(chuàng)翻譯,轉載請保留或注明出處:http://www.itdecent.cn/p/01d26fbc9b80

共享的可變狀態(tài)和并發(fā)


協(xié)程可用多線程調度器(比如默認的 CommonPool )并發(fā)執(zhí)行。這樣就可以提出所有常見的并發(fā)問題。主要的問題是同步訪問共享的可變狀態(tài)。協(xié)程領域對這個問題的一些解決方案類似于多線程領域中的解決方案,但其他解決方案則是獨一無二的。

問題

我們啟動一千個協(xié)程,它們都做一千次相同的動作(總計100萬次執(zhí)行)。我們同時會測量它們的完成時間,以便進一步的比較:

suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
    val n = 1000 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(n) {
            launch(context) {
                repeat(k) { action() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${n * k} actions in $time ms")
}

我們從一個非常簡單的動作開始:在多線程 CommonPool 上下文遞增一個共享的可變變量。

var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

獲取完整代碼 here

這段代碼最后打印出什么結果?它不太可能打印出“Counter = 1000000”,因為一千個協(xié)程從多個線程同時遞增計數器而且沒有做同步并發(fā)處理。

注意:如果你的運行機器使用兩個或者更少的cpu,那么你總是會看到1000000,因為CommonPool在這種情況下只會在一個線程中運行。要重現(xiàn)這個問題,可以做如下的變動:

val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
        counter++
    }
    println("Counter = $counter")
}

獲取完整代碼 here

沒有發(fā)揮作用的volatile

有一種常見的誤解:volatile 可以解決并發(fā)問題。讓我們嘗試一下:

@Volatile // in Kotlin `volatile` is an annotation 
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter++
    }
    println("Counter = $counter")
}

獲取完整代碼 here

這段代碼運行速度更慢了,但我們仍然沒有得到 “Counter = 1000000”,因為 volatile 變量保證可線性化(這是“原子”的技術術語)讀取和寫入變量,但在大量動作(在我們的示例中即“遞增”操作)發(fā)生時并不提供原子性。

線程安全的數據結構

一種對線程、協(xié)程都有效的常規(guī)解決方法,就是使用線程安全(也稱為同步的、可線性化、原子)的數據結構,它為需要在共享狀態(tài)上執(zhí)行的相應操作提供所有必需的同步處理。在簡單的計數器場景中,我們可以使用具有 incrementAndGet 原子操作的AtomicInteger 類:

var counter = AtomicInteger()

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        counter.incrementAndGet()
    }
    println("Counter = ${counter.get()}")
}

獲取完整代碼 here

這是針對此類特定問題的最快解決方案。它適用于普通計數器、集合、隊列和其他標準數據結構以及它們的基本操作。然而,它并不容易擴展為應對復雜狀態(tài)、或復雜操作沒有現(xiàn)成的線程安全實現(xiàn)的情況。

以細粒度限制線程

限制線程是解決共享可變狀態(tài)問題的一種方案,其中對特定共享狀態(tài)的所有訪問權都限制在單個線程中。它通常應用于UI程序中:所有UI狀態(tài)都局限于單個事件分發(fā)線程或應用主線程中。這在協(xié)程中很容易實現(xiàn),通過使用一個單線程上下文:

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) { // run each coroutine in CommonPool
        withContext(counterContext) { // but confine each increment to the single-threaded context
            counter++
        }
    }
    println("Counter = $counter")
}

獲取完整代碼 here

這段代碼運行非常緩慢,因為它進行了細粒度的線程限制。每個增量操作都得使用 withContext 塊從多線程 CommonPool 上下文切換到單線程上下文。

以粗粒度限制線程

在實踐中,線程限制是在大段代碼中執(zhí)行的,例如:狀態(tài)更新類業(yè)務邏輯中大部分都是限于單線程中。下面的示例演示了這種情況,在單線程上下文中運行每個協(xié)程。

val counterContext = newSingleThreadContext("CounterContext")
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(counterContext) { // run each coroutine in the single-threaded context
        counter++
    }
    println("Counter = $counter")
}

獲取完整代碼 here

這段代碼運行更快而且打印出了正確的結果。

互斥

該問題的互斥解決方案是使用永遠不會同時執(zhí)行的關鍵代碼塊來保護共享狀態(tài)的所有修改。在阻塞的世界中,你通常會使用 synchronized 或者 ReentrantLock 。在協(xié)程中的替代品叫做 Mutex 。它具有 lockunlock 方法,可以隔離關鍵的部分。關鍵的區(qū)別在于 Mutex.lock() 是一個掛起函數,它不會阻塞線程。

還有 withLock 擴展函數,可以方便的替代常用的 mutex.lock(); try { ... } finally { mutex.unlock() } 模式:

val mutex = Mutex()
var counter = 0

fun main(args: Array<String>) = runBlocking<Unit> {
    massiveRun(CommonPool) {
        mutex.withLock {
            counter++
        }
    }
    println("Counter = $counter")
}

獲取完整代碼 here

此示例中鎖是細粒度的,因此會付出一些代價。但是對于某些必須定期修改共享狀態(tài)的場景,它是一個不錯的選擇,但是沒有自然線程可以限制此狀態(tài)。

Actors

一個 actor 是由若干元素組成的一個實體:一個協(xié)程、它的狀態(tài)受限封裝在此協(xié)程中、以及一個與其他協(xié)程通信的 channel 。一個簡單的 actor 可以簡單的寫成一個函數,但是一個擁有復雜狀態(tài)的 actor 更適合由類來表示。

有一個 actor 協(xié)程構建器,它可以方便地將 actor 的郵箱 channel 組合到其作用域中(用來接收消息)、組合發(fā)送 channel 與結果集對象,這樣對 actor 的單個引用就可以作為其句柄持有。

使用 actor 的第一步是定一個 actor 要處理的消息類。Kotlin 的 sealed classes 密封類很適合這種場景。我們使用 IncCounter 消息(用來遞增計數器)和 GetCounter 消息(用來獲取值)來定義 CounterMsg 密封類。后者需要發(fā)送回復。CompletableDeferred 通信原語表示未來可知(傳達)的單個值,此處用于此目的。

// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter        
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply

接下來我們定義一個函數,使用 actor 協(xié)程構建器來啟動一個 actor:

// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
    var counter = 0 // actor state
    for (msg in channel) { // iterate over incoming messages
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

主函數代碼很簡單:

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor() // create the actor
    massiveRun(CommonPool) {
        counter.send(IncCounter)
    }
    // send a message to get a counter value from an actor
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close() // shutdown the actor
}

獲取完整代碼 here

actor 本身執(zhí)行所處上下文的正確性無關緊要。一個 actor 是一個協(xié)程,而一個協(xié)程是按順序執(zhí)行的,因此將狀態(tài)限制到特定協(xié)程可以解決共享可變狀態(tài)的問題。實際上,actor 可以修改自己的私有狀態(tài),但只能通過消息互相影響(避免任何鎖定)。

actor 在高負載下比鎖更有效,因為在這種情況下它總是有工作要做,而且根本不需要切換到不同的上下文。

注意, actor 協(xié)程構建器是 produce 協(xié)程構建器的雙重構件。一個 actor 與它接收消息的 channel 相關聯(lián),而一個 producer 與它發(fā)送元素的 channel 相關聯(lián)。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容