倉(cāng)頡編程入門(mén):并發(fā)編程

倉(cāng)頡語(yǔ)言實(shí)現(xiàn)了M:N輕量線(xiàn)程模型,支持在少量系統(tǒng)線(xiàn)程之上創(chuàng)建海量用戶(hù)線(xiàn)程,在實(shí)現(xiàn)層面用戶(hù)線(xiàn)程對(duì)應(yīng)協(xié)程,倉(cāng)頡runtime會(huì)自動(dòng)管理和調(diào)度這些協(xié)程。
當(dāng)用戶(hù)線(xiàn)程t做I/O等資源訪(fǎng)問(wèn)操作時(shí),若資源尚未就緒,線(xiàn)程t就會(huì)被runtime掛起等待、并調(diào)入其他線(xiàn)程運(yùn)行,當(dāng)資源就緒后又會(huì)適時(shí)恢復(fù)t的執(zhí)行,高效利用CPU資源,實(shí)現(xiàn)高并發(fā)能力。

創(chuàng)建線(xiàn)程

創(chuàng)建一個(gè)新的倉(cāng)頡線(xiàn)程,可以使用關(guān)鍵字 spawn 并傳遞一個(gè)無(wú)形參的 lambda 表達(dá)式,該 lambda 表達(dá)式即為在新線(xiàn)程中執(zhí)行的代碼。

main() {
    spawn { =>
        println("New thread before sleeping")
        sleep(100 * Duration.millisecond) // sleep for 100ms.
        println("New thread after sleeping")
    }
    println("Main thread")
}

在上面的例子中,新線(xiàn)程會(huì)在主線(xiàn)程結(jié)束時(shí)一起停止,無(wú)論這個(gè)新線(xiàn)程是否已完成運(yùn)行。

訪(fǎng)問(wèn)線(xiàn)程

spawn 表達(dá)式的返回類(lèi)型是 Future<T>,T是線(xiàn)程函數(shù)的返回值類(lèi)型。

public class Future<T> {
    public func get(): T
    public func get(timeout: Duration): T
    public func tryGet(): Option<T>
    public func cancel():Unit
    public prop thread:Thread
}

get()

阻塞當(dāng)前線(xiàn)程,等待并獲取當(dāng)前Future<T> 對(duì)象對(duì)應(yīng)的線(xiàn)程的結(jié)果。

main() {
    let fut: Future<Int64> = spawn {=>
        //睡眠 1 秒
        sleep(1000 * Duration.millisecond)
        return 1
    }

    //等待線(xiàn)程完成
    let result: Int64 = fut.get()
    println(result)  // 1
}

get(timeout: Duration)

阻塞當(dāng)前線(xiàn)程,等待指定時(shí)長(zhǎng)并獲取當(dāng)前Future<T> 對(duì)象對(duì)應(yīng)的線(xiàn)程的返回值。如果相應(yīng)的線(xiàn)程在指定時(shí)間內(nèi)未完成執(zhí)行,則該函數(shù)將拋出異常TimeoutException。

cancel()

給當(dāng)前Future實(shí)例對(duì)應(yīng)的倉(cāng)頡線(xiàn)程發(fā)送取消請(qǐng)求。該方法不會(huì)立即停止線(xiàn)程執(zhí)行,僅發(fā)送請(qǐng)求,相應(yīng)地Future類(lèi)的函數(shù)hasPendingCancellation可用于檢查線(xiàn)程是否存在取消請(qǐng)求,開(kāi)發(fā)者可以通過(guò)該檢查來(lái)自行決定是否提前終止線(xiàn)程以及如何終止線(xiàn)程。

main(): Unit {
    /* 創(chuàng)建線(xiàn)程 */
    let future = spawn {
        while (true) {
            if (Thread.currentThread.hasPendingCancellation) {
                return 0
            }
        }
        return 1
    }
    /* 向線(xiàn)程發(fā)送取消請(qǐng)求 */
    future.cancel()
    let res = future.get()
    println(res)     // 0
}

同步機(jī)制

在并發(fā)編程中,如果缺少同步機(jī)制來(lái)保護(hù)多個(gè)線(xiàn)程共享的變量,很容易會(huì)出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題。
例如以下案例,當(dāng)創(chuàng)建1000個(gè)線(xiàn)程對(duì)一個(gè)變量加1時(shí),理想輸出應(yīng)該是1000,然而實(shí)際每次執(zhí)行結(jié)果都不是1000并且值都不一樣。

import std.collection.*
var count = 0
main() {
    let list = ArrayList<Future<Int64>>()
     for (_ in 0..1000) {
        let fut= spawn {
            sleep(Duration.millisecond)
            count++
            return count
        }
        list.add(fut)
    }
    for (f in list) {
        f.get()
    }
    println("count = ${count}") // count = 993
}

倉(cāng)頡編程語(yǔ)言提供三種常見(jiàn)的同步機(jī)制來(lái)確保數(shù)據(jù)的線(xiàn)程安全:原子操作、互斥鎖和條件變量。

原子操作 Atomic

倉(cāng)頡提供整數(shù)類(lèi)型、Bool 類(lèi)型和引用類(lèi)型的原子操作。
整數(shù)類(lèi)型的原子操作支持基本的讀寫(xiě)、交換以及算術(shù)運(yùn)算操作:

操作 功能
load 讀取
store 寫(xiě)入
swap 交換,返回交換前的值
compareAndSwap 比較再交換,交換成功返回 true,否則返回 false
fetchAdd 加法,返回執(zhí)行加操作之前的值
fetchSub 減法,返回執(zhí)行減操作之前的值
fetchAnd 與,返回執(zhí)行與操作之前的值
fetchOr 或,返回執(zhí)行或操作之前的值
fetchXor 異或,返回執(zhí)行異或操作之前的值

Bool類(lèi)型引用類(lèi)型的原子操作只提供讀寫(xiě)和交換操作:

操作 功能
load 讀取
store 寫(xiě)入
swap 交換,返回交換前的值
compareAndSwap 比較再交換,交換成功返回 true,否則返回 false

將以上案例使用原子操作修改如下:

import std.collection.*
import std.sync.*
var count = AtomicInt64(0);
main() {
    let list = ArrayList<Future<Int64>>()
     for (_ in 0..1000) {
        let fut= spawn {
            sleep(Duration.millisecond)
            return count.fetchAdd(1)
        }
        list.add(fut)
    }
    for (f in list) {
        f.get()
    }
    println("count = ${count.load()}")  //count = 1000
}

可重入互斥鎖Mutex

可重入互斥鎖的作用是對(duì)臨界區(qū)加以保護(hù),使得任意時(shí)刻最多只有一個(gè)線(xiàn)程能夠執(zhí)行臨界區(qū)的代碼。當(dāng)一個(gè)線(xiàn)程試圖獲取一個(gè)已被其他線(xiàn)程持有的鎖時(shí),該線(xiàn)程會(huì)被阻塞,直到鎖被釋放,該線(xiàn)程才會(huì)被喚醒,可重入是指線(xiàn)程獲取該鎖后可再次獲得該鎖。
使用可重入互斥鎖時(shí),必須牢記兩條規(guī)則:
1.在訪(fǎng)問(wèn)共享數(shù)據(jù)之前,必須嘗試獲取鎖
2.處理完共享數(shù)據(jù)后,必須釋放鎖,以便其他線(xiàn)程可以獲得鎖。

import std.sync.*
import std.collection.*
var count: Int64 = 0
let mtx = Mutex()
main() {
    let list = ArrayList<Future<Unit>>()
    for (_ in 0..1000) {
        let fut = spawn {
            sleep(Duration.millisecond) 
            mtx.lock()  // 獲取鎖
            count++
            mtx.unlock() // 釋放鎖
        }
        list.add(fut)
    }
    for (f in list) {
        f.get()
    }
    println("count = ${count}")  //count = 1000
}

Condition

Condition 實(shí)例由互斥鎖創(chuàng)建,一個(gè)互斥鎖可以創(chuàng)建多個(gè) Condition 實(shí)例。Condition 可以使線(xiàn)程阻塞并等待來(lái)自另一個(gè)線(xiàn)程的信號(hào)以恢復(fù)執(zhí)行。
調(diào)用 Condition 接口的 wait、notify 或 notifyAll 方法前,需要確保當(dāng)前線(xiàn)程已經(jīng)持有綁定的鎖。
調(diào)用 Condition 接口的 wait方法包含如下動(dòng)作:
1.添加當(dāng)前線(xiàn)程到對(duì)應(yīng)鎖的等待隊(duì)列中;
2.阻塞當(dāng)前線(xiàn)程,同時(shí)完全釋放該鎖,并記錄鎖的重入次數(shù);
3.等待某個(gè)其他線(xiàn)程使用同一個(gè) Condition 實(shí)例的 notify 或 notifyAll 方法向該線(xiàn)程發(fā)出信號(hào);
4.當(dāng)前線(xiàn)程被喚醒后,會(huì)自動(dòng)嘗試重新獲取鎖,且持有鎖的重入狀態(tài)與第 2 步記錄的重入次數(shù)相同;但是如果嘗試獲取鎖失敗,則當(dāng)前線(xiàn)程會(huì)阻塞在該鎖上。

import std.sync.*
let mtx = Mutex()
let condition = synchronized(mtx) {
    mtx.condition()
}
var flag: Bool = true
main(): Int64 {
    let fut = spawn {
        mtx.lock()
        while (flag) {
            println("執(zhí)行1")
            condition.wait()
            println("執(zhí)行4")
        }
        mtx.unlock()
    }
    sleep(10 * Duration.millisecond)
    mtx.lock()
    println("執(zhí)行2")
    flag = false
    mtx.unlock()
    mtx.lock()
    println("執(zhí)行3")
    condition.notifyAll()
    mtx.unlock()
    fut.get()
    return 0
}

synchronized 關(guān)鍵字

倉(cāng)頡編程語(yǔ)言提供一個(gè) synchronized 關(guān)鍵字,搭配 Lock 一起使用,可以在其后跟隨的作用域內(nèi)自動(dòng)進(jìn)行加鎖解鎖操作,用來(lái)解決類(lèi)似的問(wèn)題。
一個(gè)線(xiàn)程在進(jìn)入 synchronized 修飾的代碼塊之前,會(huì)自動(dòng)獲取 Lock 實(shí)例對(duì)應(yīng)的鎖,如果無(wú)法獲取鎖,則當(dāng)前線(xiàn)程被阻塞;
一個(gè)線(xiàn)程在退出 synchronized 修飾的代碼塊之前,會(huì)自動(dòng)釋放該 Lock 實(shí)例的鎖。

import std.sync.*
import std.collection.*
var count: Int64 = 0
var mtx: Mutex = Mutex()
main() {
    let list = ArrayList<Future<Unit>>()
    for (i in 0..10) {
        let fut = spawn {
            while (true) {
                synchronized(mtx) {
                    count = count + 1
                    break
                }
            }
        }
        list.add(fut)
    }
    for (f in list) {
        f.get()
    }
    println("count = ${count}")  //count = 10
}

ThreadLocal

使用 core 包中的 ThreadLocal 可以創(chuàng)建并使用線(xiàn)程局部變量,每一個(gè)線(xiàn)程都有它獨(dú)立的一個(gè)存儲(chǔ)空間來(lái)保存這些線(xiàn)程局部變量。因此,在每個(gè)線(xiàn)程可以安全地訪(fǎng)問(wèn)他們各自的線(xiàn)程局部變量,而不受其他線(xiàn)程的影響。

import std.sync.*
main() {
    let tl = ThreadLocal<Int64>()
    let fut = spawn {
        tl.set(0)
        var t=0
        for(_ in 0..1000){
            t++
        }
        tl.set(t) 
        println("tl in spawn1 = ${tl.get().getOrThrow()}") //1000
    }
      let fut2 = spawn {
        tl.set(0)
        var t=0
        for(_ in 0..10){
            t++
        }
        tl.set(t) 
        println("tl in spawn2 = ${tl.get().getOrThrow()}") //10
    }
    fut.get()
    fut2.get()
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容