Kotlin(十九)協(xié)程初探究<2>

共享資源的控制

共享資源可以是共享變量,也可是數(shù)據(jù)庫的共享數(shù)據(jù),如何保證共享資源的正確性至關重要。

1. 鎖模式

類似前面的例子一個商品庫存在一個搶購商品,活動中高并發(fā)可能出現(xiàn)超賣的情況,所以要對商品庫商品共享資源進行加鎖,保證同一個時刻只有一個線程對其進行讀寫。
java中有syncronized關鍵字,但是kotlin沒有,取而代之使用了@syncronized注解和syncronized()來實現(xiàn)相同效果。

除了支持syncronized這種并發(fā)原語,還支持java.util.concurrent.* 下面的并發(fā)工具,比如volatile關鍵字在Kotlin變成了注解:

@Volatile private var running = false

看下syncronized的例子

class Shop {
    val goods = hashMapOf<Long,Int>()

    init {
        goods.put(1,10)
        goods.put(2,15)
    }

    @Synchronized fun buyGoods1(id: Long) {
        val stock = goods.getValue(id)
        goods.put(id, stock - 1)
    }

    fun buyGoods2(id: Long) {
        synchronized(this) {
            val stock = goods.getValue(id)
            goods.put(id, stock - 1)
        }
    }
}

對比Java 可以syncronized的方式進行同步加鎖,還可以用Lock的方式來對代碼進行加鎖。改造上面的方法:

    val lock = ReentrantLock()

    fun buyGoods3(id: Long) {
        lock.lock()
        try {
            val stock = goods.getValue(id)
            goods.put(id, stock - 1)
        } catch (ex: Exception) {
            println("[Exception] is $ex")
        } finally {
            lock.unlock()
        }
    }

這種寫法的缺點:

  • 若是在同一個類內(nèi)有多個同步方法,將會競爭同一把鎖
  • 在加鎖之后,編碼人員容易忘記解鎖操作
  • 重復的模板代碼

對這個方法進行改進,提高這個方法的抽象程度:

fun <T> withLock(lock: Lock, action: () -> T): T {
    lock.lock()
    try {
        return action()
    } finally {
        lock.unlock()
    }
}

withLock 傳入一個lock對象的lambda表達式,我們現(xiàn)在可以不用關心對buyGoods進行加鎖,只需要在調(diào)用時候傳入一個lock對象就可以。

fun <T> withLock(lock: Lock, action: () -> T): T {
    lock.lock()
    try {
        return action()
    } finally {
        lock.unlock()
    }
}

val lock = ReentrantLock()

fun buyGoods(id: Long) {
        val stock = goods.getValue(id)
        goods.put(id, stock - 1)
}

fun buyGoods4(id: Long) {
        withLock(lock, {buyGoods(4)})
}

不同的商家?guī)齑嬷g商品庫存并不會發(fā)生并發(fā)沖突,A商家購買衣服和B商家購買鞋子是可以同時進行的,但是我們現(xiàn)在給商品的buyGoods加鎖,不滿足這個實際場景。因為同一個時刻只能被一個線程調(diào)用,從而導致鎖競爭激烈,引發(fā)崩潰。

解決這個問題核心:如何對并發(fā)時最會發(fā)生沖突的部分進行加鎖。那么我們對上架buyGoods進行加鎖。

class Shop (private var goods: HashMap<Long, Int>) {
    private val lock: Lock = ReentrantLock()

    fun buyGoods(id: Long) {
        lock.withLock {
            val stock = goods.getValue(id)
            goods.put(id, stock - 1)
        }
    }
}

class ShopApi {
    private val A_goods = hashMapOf<Long,Int>()
    private val B_goods = hashMapOf<Long,Int>()

    private var shopA: Shop
    private var shopB: Shop

    init {
        A_goods.put(1,10)
        A_goods.put(2,15)
        B_goods.put(1,20)
        B_goods.put(2,10)
        shopA = Shop(A_goods)
        shopB = Shop(B_goods)
    }

    fun buyGoods(shopName: String, id: Long) {
        when (shopName) {
            "A" -> shopA.buyGoods(id)
            "B" -> shopB.buyGoods(id)
            else -> {}
        }
    }
}

fun main(args: Array<String>) {
    val shopApi = ShopApi()
    shopApi.buyGoods("A",1)
    shopApi.buyGoods("A",2)
    shopApi.buyGoods("B",2)
}

這樣需要初始化多個Shop.(或者運行時初始化,不過就需要考慮線程安全問題)。需要成千上萬商家,使用when來匹配是一個災難。這種方式無法支持異步操作。

改善需要下面幾點:

  • 獨立的一個單元,可以有狀態(tài),可以處理邏輯(比如Shop類)
  • 每個單元有獨特的標識,且系統(tǒng)最多只有一個實例
  • 每個單元可以順序處理邏輯,不會有并發(fā)問題,方法同步是一種方案,線程安全的消息隊列也是一種方案。
  • 最好能支持異步操作,處理成功后有返回值。

2. 有狀態(tài)的并行計算單元Actor

最先應用在Erlang中應用,而且現(xiàn)在Actor模型已經(jīng)被應用在生產(chǎn)環(huán)境中,比如Akka(一個基于Actor模型的并發(fā)框架)。很多語言也支持Actor模型,比如Scala,Java,包括Kotlin頁內(nèi)置Actor模型。Actor模型要做的事:

  • 另一種思維來解決并發(fā)問題,而不是只有共享內(nèi)存這一種方式;
  • 提高鎖抽象的程度,盡量不再業(yè)務中出現(xiàn)鎖,減少因為使用鎖出現(xiàn)的問題,比如死鎖;
  • 為解決分布式并發(fā)問題提供一個更好的思路

舉個例子,兩個人只知道對方的地址,他們交流,給對方傳遞消息,不能用網(wǎng)絡電話類通信,只通過信件傳遞消息。很像現(xiàn)實中郵政系統(tǒng),只需要把信投遞到相應的信箱中,不需要知道如何具體處理送達過程,也可能收到回復,相當消息反饋。把這個例子的信件相當與Actor中的消息,Actor與Actor之間只能通過消息通信。

截屏2021-03-01 11.53.16.png

Actor模式不同的人之間信息是不可變的,你不能發(fā)出這封信件后又去修改它的內(nèi)容,同時接收信件的人是從他的信箱里有序的處理消息,Actor內(nèi)部狀態(tài)將不再有線程安全問題。

實現(xiàn)一個簡化版方案例子,將購物消息發(fā)給商家Actor,商家Actor進行減庫存操作,并返回一個唯一的訂單號,支持查詢當前庫存。

import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.UntypedAbstractActor
import akka.pattern.Patterns
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import java.util.*

class ShopActor(val stocks: HashMap<Long, Int>) : UntypedAbstractActor() {
    var orderNumber = 1L
    override fun onReceive(message: Any?) {
        when (message) {
            is Action.Buy -> {
                val stock = stocks.getValue(message.id)
                if (stock > message.amount) {
                    stocks.plus(Pair(message.id, stock - message.amount))
                    sender.tell(orderNumber, self)
                    orderNumber++
                } else {
                    sender.tell("low stocks", self)
                }
            }
            is Action.GetStock -> {
                sender.tell(stocks.get(message.id), self)
            }
        }
    }
}

sealed class Action {
    data class BuyOrInit(val id: Long, val userId: Long, val amount: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
    data class Buy(val id: Long, val userId: Long, val amount: Long) : Action()
    data class GetStock(val id: Long) : Action()
    data class GetStockOrInit(val id: Long, val shopName: String, val stocks: Map<Long, Int>) : Action()
}

class ManageActor : UntypedAbstractActor() {
    override fun onReceive(message: Any?) {
        when (message) {
            is Action.BuyOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.Buy(message.id, message.userId, message.amount), context)
            is Action.GetStockOrInit -> getOrInit(message.shopName,message.stocks).forward(Action.GetStock(message.id), context)
        }
    }

    fun getOrInit(shopName: String, stocks: Map<Long, Int>): ActorRef {
        return context.findChild("shop-actor-${shopName}").orElseGet { context.actorOf(Props.create(ShopActor::class.java, stocks), "shop-actor-${shopName}") }
    }

}

fun main(args: Array<String>) {
    val stocksA = hashMapOf(Pair(1L, 10), Pair(2L, 5), Pair(3L, 20))
    val stocksB = hashMapOf(Pair(1L, 15), Pair(2L, 8), Pair(3L, 30))
    val actorSystem = ActorSystem.apply("shop-system") //
    val manageActor = actorSystem.actorOf(Props.create(ManageActor::class.java), "manage-actor")
    val timeout = Timeout(Duration.create(3, "seconds"))

    val resA = Patterns.ask(manageActor, Action.GetStockOrInit(1L, "A", stocksA), timeout)
    val stock = Await.result(resA, timeout.duration())
    println("the stock is ${stock}")

    val resB = Patterns.ask(manageActor, Action.BuyOrInit(2L, 1L, 1,"B", stocksB), timeout)
    val orderNumber = Await.result(resB, timeout.duration())
    println("the orderNumber is ${orderNumber}")


}

shopActor 內(nèi)部有兩個狀態(tài),stocks 和 orderNumber 分別代表庫存和訂單號.
定義sealed class 表示用戶請求行為,ShopActor內(nèi)部有onReceive方法,根據(jù)用戶不同請求做不同處理。
ManagerActor負責管理和初始化ShopActor。

相關CQRS架構(gòu)和一些理解參考

下面是這個例子的理解圖例

截屏2021-03-01 12.45.54.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容