github原文地址
原創(chuàng)翻譯,轉載請保留或注明出處:http://www.itdecent.cn/p/01d26fbc9b80
共享的可變狀態(tài)和并發(fā)
協(xié)程可用多線程調度器(比如默認的 CommonPool )并發(fā)執(zhí)行。這樣就可以提出所有常見的并發(fā)問題。主要的問題是同步訪問共享的可變狀態(tài)。協(xié)程領域對這個問題的一些解決方案類似于多線程領域中的解決方案,但其他解決方案則是獨一無二的。
問題
我們啟動一千個協(xié)程,它們都做一千次相同的動作(總計100萬次執(zhí)行)。我們同時會測量它們的完成時間,以便進一步的比較:
suspend fun massiveRun(context: CoroutineContext, action: suspend () -> Unit) {
val n = 1000 // number of coroutines to launch
val k = 1000 // times an action is repeated by each coroutine
val time = measureTimeMillis {
val jobs = List(n) {
launch(context) {
repeat(k) { action() }
}
}
jobs.forEach { it.join() }
}
println("Completed ${n * k} actions in $time ms")
}
我們從一個非常簡單的動作開始:在多線程 CommonPool 上下文遞增一個共享的可變變量。
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼最后打印出什么結果?它不太可能打印出“Counter = 1000000”,因為一千個協(xié)程從多個線程同時遞增計數器而且沒有做同步并發(fā)處理。
注意:如果你的運行機器使用兩個或者更少的cpu,那么你總是會看到1000000,因為
CommonPool在這種情況下只會在一個線程中運行。要重現(xiàn)這個問題,可以做如下的變動:
val mtContext = newFixedThreadPoolContext(2, "mtPool") // explicitly define context with two threads
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(mtContext) { // use it instead of CommonPool in this sample and below
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
沒有發(fā)揮作用的volatile
有一種常見的誤解:volatile 可以解決并發(fā)問題。讓我們嘗試一下:
@Volatile // in Kotlin `volatile` is an annotation
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼運行速度更慢了,但我們仍然沒有得到 “Counter = 1000000”,因為 volatile 變量保證可線性化(這是“原子”的技術術語)讀取和寫入變量,但在大量動作(在我們的示例中即“遞增”操作)發(fā)生時并不提供原子性。
線程安全的數據結構
一種對線程、協(xié)程都有效的常規(guī)解決方法,就是使用線程安全(也稱為同步的、可線性化、原子)的數據結構,它為需要在共享狀態(tài)上執(zhí)行的相應操作提供所有必需的同步處理。在簡單的計數器場景中,我們可以使用具有 incrementAndGet 原子操作的AtomicInteger 類:
var counter = AtomicInteger()
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
counter.incrementAndGet()
}
println("Counter = ${counter.get()}")
}
獲取完整代碼 here
這是針對此類特定問題的最快解決方案。它適用于普通計數器、集合、隊列和其他標準數據結構以及它們的基本操作。然而,它并不容易擴展為應對復雜狀態(tài)、或復雜操作沒有現(xiàn)成的線程安全實現(xiàn)的情況。
以細粒度限制線程
限制線程是解決共享可變狀態(tài)問題的一種方案,其中對特定共享狀態(tài)的所有訪問權都限制在單個線程中。它通常應用于UI程序中:所有UI狀態(tài)都局限于單個事件分發(fā)線程或應用主線程中。這在協(xié)程中很容易實現(xiàn),通過使用一個單線程上下文:
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) { // run each coroutine in CommonPool
withContext(counterContext) { // but confine each increment to the single-threaded context
counter++
}
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼運行非常緩慢,因為它進行了細粒度的線程限制。每個增量操作都得使用 withContext 塊從多線程 CommonPool 上下文切換到單線程上下文。
以粗粒度限制線程
在實踐中,線程限制是在大段代碼中執(zhí)行的,例如:狀態(tài)更新類業(yè)務邏輯中大部分都是限于單線程中。下面的示例演示了這種情況,在單線程上下文中運行每個協(xié)程。
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(counterContext) { // run each coroutine in the single-threaded context
counter++
}
println("Counter = $counter")
}
獲取完整代碼 here
這段代碼運行更快而且打印出了正確的結果。
互斥
該問題的互斥解決方案是使用永遠不會同時執(zhí)行的關鍵代碼塊來保護共享狀態(tài)的所有修改。在阻塞的世界中,你通常會使用 synchronized 或者 ReentrantLock 。在協(xié)程中的替代品叫做 Mutex 。它具有 lock 和 unlock 方法,可以隔離關鍵的部分。關鍵的區(qū)別在于 Mutex.lock() 是一個掛起函數,它不會阻塞線程。
還有 withLock 擴展函數,可以方便的替代常用的 mutex.lock(); try { ... } finally { mutex.unlock() } 模式:
val mutex = Mutex()
var counter = 0
fun main(args: Array<String>) = runBlocking<Unit> {
massiveRun(CommonPool) {
mutex.withLock {
counter++
}
}
println("Counter = $counter")
}
獲取完整代碼 here
此示例中鎖是細粒度的,因此會付出一些代價。但是對于某些必須定期修改共享狀態(tài)的場景,它是一個不錯的選擇,但是沒有自然線程可以限制此狀態(tài)。
Actors
一個 actor 是由若干元素組成的一個實體:一個協(xié)程、它的狀態(tài)受限封裝在此協(xié)程中、以及一個與其他協(xié)程通信的 channel 。一個簡單的 actor 可以簡單的寫成一個函數,但是一個擁有復雜狀態(tài)的 actor 更適合由類來表示。
有一個 actor 協(xié)程構建器,它可以方便地將 actor 的郵箱 channel 組合到其作用域中(用來接收消息)、組合發(fā)送 channel 與結果集對象,這樣對 actor 的單個引用就可以作為其句柄持有。
使用 actor 的第一步是定一個 actor 要處理的消息類。Kotlin 的 sealed classes 密封類很適合這種場景。我們使用 IncCounter 消息(用來遞增計數器)和 GetCounter 消息(用來獲取值)來定義 CounterMsg 密封類。后者需要發(fā)送回復。CompletableDeferred 通信原語表示未來可知(傳達)的單個值,此處用于此目的。
// Message types for counterActor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message to increment counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // a request with reply
接下來我們定義一個函數,使用 actor 協(xié)程構建器來啟動一個 actor:
// This function launches a new counter actor
fun counterActor() = actor<CounterMsg> {
var counter = 0 // actor state
for (msg in channel) { // iterate over incoming messages
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
主函數代碼很簡單:
fun main(args: Array<String>) = runBlocking<Unit> {
val counter = counterActor() // create the actor
massiveRun(CommonPool) {
counter.send(IncCounter)
}
// send a message to get a counter value from an actor
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // shutdown the actor
}
獲取完整代碼 here
actor 本身執(zhí)行所處上下文的正確性無關緊要。一個 actor 是一個協(xié)程,而一個協(xié)程是按順序執(zhí)行的,因此將狀態(tài)限制到特定協(xié)程可以解決共享可變狀態(tài)的問題。實際上,actor 可以修改自己的私有狀態(tài),但只能通過消息互相影響(避免任何鎖定)。
actor 在高負載下比鎖更有效,因為在這種情況下它總是有工作要做,而且根本不需要切換到不同的上下文。
注意, actor 協(xié)程構建器是 produce 協(xié)程構建器的雙重構件。一個 actor 與它接收消息的 channel 相關聯(lián),而一個 producer 與它發(fā)送元素的 channel 相關聯(lián)。