共享資源的控制
共享資源可以是共享變量,也可是數(shù)據(jù)庫的共享數(shù)據(jù),如何保證共享資源的正確性至關重要。
1. 鎖模式
類似前面的例子一個商品庫存在一個搶購商品,活動中高并發(fā)可能出現(xiàn)超賣的情況,所以要對商品庫商品共享資源進行加鎖,保證同一個時刻只有一個線程對其進行讀寫。
java中有syncronized關鍵字,但是kotlin沒有,取而代之使用了@syncronized注解和syncronized()來實現(xiàn)相同效果。
除了支持syncronized這種并發(fā)原語,還支持java.util.concurrent.* 下面的并發(fā)工具,比如volatile關鍵字在Kotlin變成了注解:
@Volatile private var running = false
看下syncronized的例子
class Shop {
val goods = hashMapOf<Long,Int>()
init {
goods.put(1,10)
goods.put(2,15)
}
@Synchronized fun buyGoods1(id: Long) {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
fun buyGoods2(id: Long) {
synchronized(this) {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
}
}
對比Java 可以syncronized的方式進行同步加鎖,還可以用Lock的方式來對代碼進行加鎖。改造上面的方法:
val lock = ReentrantLock()
fun buyGoods3(id: Long) {
lock.lock()
try {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
} catch (ex: Exception) {
println("[Exception] is $ex")
} finally {
lock.unlock()
}
}
這種寫法的缺點:
- 若是在同一個類內(nèi)有多個同步方法,將會競爭同一把鎖
- 在加鎖之后,編碼人員容易忘記解鎖操作
- 重復的模板代碼
對這個方法進行改進,提高這個方法的抽象程度:
fun <T> withLock(lock: Lock, action: () -> T): T {
lock.lock()
try {
return action()
} finally {
lock.unlock()
}
}
withLock 傳入一個lock對象的lambda表達式,我們現(xiàn)在可以不用關心對buyGoods進行加鎖,只需要在調(diào)用時候傳入一個lock對象就可以。
fun <T> withLock(lock: Lock, action: () -> T): T {
lock.lock()
try {
return action()
} finally {
lock.unlock()
}
}
val lock = ReentrantLock()
fun buyGoods(id: Long) {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
fun buyGoods4(id: Long) {
withLock(lock, {buyGoods(4)})
}
不同的商家?guī)齑嬷g商品庫存并不會發(fā)生并發(fā)沖突,A商家購買衣服和B商家購買鞋子是可以同時進行的,但是我們現(xiàn)在給商品的buyGoods加鎖,不滿足這個實際場景。因為同一個時刻只能被一個線程調(diào)用,從而導致鎖競爭激烈,引發(fā)崩潰。
解決這個問題核心:如何對并發(fā)時最會發(fā)生沖突的部分進行加鎖。那么我們對上架buyGoods進行加鎖。
class Shop (private var goods: HashMap<Long, Int>) {
private val lock: Lock = ReentrantLock()
fun buyGoods(id: Long) {
lock.withLock {
val stock = goods.getValue(id)
goods.put(id, stock - 1)
}
}
}
class ShopApi {
private val A_goods = hashMapOf<Long,Int>()
private val B_goods = hashMapOf<Long,Int>()
private var shopA: Shop
private var shopB: Shop
init {
A_goods.put(1,10)
A_goods.put(2,15)
B_goods.put(1,20)
B_goods.put(2,10)
shopA = Shop(A_goods)
shopB = Shop(B_goods)
}
fun buyGoods(shopName: String, id: Long) {
when (shopName) {
"A" -> shopA.buyGoods(id)
"B" -> shopB.buyGoods(id)
else -> {}
}
}
}
fun main(args: Array<String>) {
val shopApi = ShopApi()
shopApi.buyGoods("A",1)
shopApi.buyGoods("A",2)
shopApi.buyGoods("B",2)
}
這樣需要初始化多個Shop.(或者運行時初始化,不過就需要考慮線程安全問題)。需要成千上萬商家,使用when來匹配是一個災難。這種方式無法支持異步操作。
改善需要下面幾點:
- 獨立的一個單元,可以有狀態(tài),可以處理邏輯(比如Shop類)
- 每個單元有獨特的標識,且系統(tǒng)最多只有一個實例
- 每個單元可以順序處理邏輯,不會有并發(fā)問題,方法同步是一種方案,線程安全的消息隊列也是一種方案。
- 最好能支持異步操作,處理成功后有返回值。
2. 有狀態(tài)的并行計算單元Actor
最先應用在Erlang中應用,而且現(xiàn)在Actor模型已經(jīng)被應用在生產(chǎn)環(huán)境中,比如Akka(一個基于Actor模型的并發(fā)框架)。很多語言也支持Actor模型,比如Scala,Java,包括Kotlin頁內(nèi)置Actor模型。Actor模型要做的事:
- 另一種思維來解決并發(fā)問題,而不是只有
共享內(nèi)存這一種方式; - 提高鎖抽象的程度,盡量不再業(yè)務中出現(xiàn)鎖,減少因為使用鎖出現(xiàn)的問題,比如死鎖;
- 為解決分布式并發(fā)問題提供一個更好的思路
舉個例子,兩個人只知道對方的地址,他們交流,給對方傳遞消息,不能用網(wǎng)絡電話類通信,只通過信件傳遞消息。很像現(xiàn)實中郵政系統(tǒng),只需要把信投遞到相應的信箱中,不需要知道如何具體處理送達過程,也可能收到回復,相當消息反饋。把這個例子的信件相當與Actor中的消息,Actor與Actor之間只能通過消息通信。

Actor模式不同的人之間信息是不可變的,你不能發(fā)出這封信件后又去修改它的內(nèi)容,同時接收信件的人是從他的信箱里有序的處理消息,Actor內(nèi)部狀態(tài)將不再有線程安全問題。
實現(xiàn)一個簡化版方案例子,將購物消息發(fā)給商家Actor,商家Actor進行減庫存操作,并返回一個唯一的訂單號,支持查詢當前庫存。
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.UntypedAbstractActor
import akka.pattern.Patterns
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.*
class ShopActor(val stocks: HashMap<Long, Int>) : UntypedAbstractActor() {
var orderNumber = 1L
override fun onReceive(message: Any?) {
when (message) {
is Action.Buy -> {
val stock = stocks.getValue(message.id)
if (stock > message.amount) {
stocks.plus(Pair(message.id, stock - message.amount))
sender.tell(orderNumber, self)
orderNumber++
} else {
sender.tell("low stocks", self)
}
}
is Action.GetStock -> {
sender.tell(stocks.get(message.id), self)
}
}
}
}
sealed class Action {
data class BuyOrInit(val id: Long, val userId: Long, val amount: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
data class Buy(val id: Long, val userId: Long, val amount: Long) : Action()
data class GetStock(val id: Long) : Action()
data class GetStockOrInit(val id: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
}
class ManageActor : UntypedAbstractActor() {
override fun onReceive(message: Any?) {
when (message) {
is Action.BuyOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.Buy(message.id, message.userId, message.amount), context)
is Action.GetStockOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.GetStock(message.id), context)
}
}
fun getOrInit(shopName: String, stocks: Map<Long, Int>): ActorRef {
return context.findChild("shop-actor-${shopName}").orElseGet { context.actorOf(Props.create(ShopActor::class.java, stocks), "shop-actor-${shopName}") }
}
}
fun main(args: Array<String>) {
val stocksA = hashMapOf(Pair(1L, 10), Pair(2L, 5), Pair(3L, 20))
val stocksB = hashMapOf(Pair(1L, 15), Pair(2L, 8), Pair(3L, 30))
val actorSystem = ActorSystem.apply("shop-system") //
val manageActor = actorSystem.actorOf(Props.create(ManageActor::class.java), "manage-actor")
val timeout = Timeout(Duration.create(3, "seconds"))
val resA = Patterns.ask(manageActor, Action.GetStockOrInit(1L, "A", stocksA), timeout)
val stock = Await.result(resA, timeout.duration())
println("the stock is ${stock}")
val resB = Patterns.ask(manageActor, Action.BuyOrInit(2L, 1L, 1,"B", stocksB), timeout)
val orderNumber = Await.result(resB, timeout.duration())
println("the orderNumber is ${orderNumber}")
}
shopActor 內(nèi)部有兩個狀態(tài),stocks 和 orderNumber 分別代表庫存和訂單號.
定義sealed class 表示用戶請求行為,ShopActor內(nèi)部有onReceive方法,根據(jù)用戶不同請求做不同處理。
ManagerActor負責管理和初始化ShopActor。
下面是這個例子的理解圖例
