倉(cāng)頡語(yǔ)言實(shí)現(xiàn)了M:N輕量線(xiàn)程模型,支持在少量系統(tǒng)線(xiàn)程之上創(chuàng)建海量用戶(hù)線(xiàn)程,在實(shí)現(xiàn)層面用戶(hù)線(xiàn)程對(duì)應(yīng)協(xié)程,倉(cāng)頡runtime會(huì)自動(dòng)管理和調(diào)度這些協(xié)程。
當(dāng)用戶(hù)線(xiàn)程t做I/O等資源訪(fǎng)問(wèn)操作時(shí),若資源尚未就緒,線(xiàn)程t就會(huì)被runtime掛起等待、并調(diào)入其他線(xiàn)程運(yùn)行,當(dāng)資源就緒后又會(huì)適時(shí)恢復(fù)t的執(zhí)行,高效利用CPU資源,實(shí)現(xiàn)高并發(fā)能力。
創(chuàng)建線(xiàn)程
創(chuàng)建一個(gè)新的倉(cāng)頡線(xiàn)程,可以使用關(guān)鍵字 spawn 并傳遞一個(gè)無(wú)形參的 lambda 表達(dá)式,該 lambda 表達(dá)式即為在新線(xiàn)程中執(zhí)行的代碼。
main() {
spawn { =>
println("New thread before sleeping")
sleep(100 * Duration.millisecond) // sleep for 100ms.
println("New thread after sleeping")
}
println("Main thread")
}
在上面的例子中,新線(xiàn)程會(huì)在主線(xiàn)程結(jié)束時(shí)一起停止,無(wú)論這個(gè)新線(xiàn)程是否已完成運(yùn)行。
訪(fǎng)問(wèn)線(xiàn)程
spawn 表達(dá)式的返回類(lèi)型是 Future<T>,T是線(xiàn)程函數(shù)的返回值類(lèi)型。
public class Future<T> {
public func get(): T
public func get(timeout: Duration): T
public func tryGet(): Option<T>
public func cancel():Unit
public prop thread:Thread
}
get()
阻塞當(dāng)前線(xiàn)程,等待并獲取當(dāng)前Future<T> 對(duì)象對(duì)應(yīng)的線(xiàn)程的結(jié)果。
main() {
let fut: Future<Int64> = spawn {=>
//睡眠 1 秒
sleep(1000 * Duration.millisecond)
return 1
}
//等待線(xiàn)程完成
let result: Int64 = fut.get()
println(result) // 1
}
get(timeout: Duration)
阻塞當(dāng)前線(xiàn)程,等待指定時(shí)長(zhǎng)并獲取當(dāng)前Future<T> 對(duì)象對(duì)應(yīng)的線(xiàn)程的返回值。如果相應(yīng)的線(xiàn)程在指定時(shí)間內(nèi)未完成執(zhí)行,則該函數(shù)將拋出異常TimeoutException。
cancel()
給當(dāng)前Future實(shí)例對(duì)應(yīng)的倉(cāng)頡線(xiàn)程發(fā)送取消請(qǐng)求。該方法不會(huì)立即停止線(xiàn)程執(zhí)行,僅發(fā)送請(qǐng)求,相應(yīng)地Future類(lèi)的函數(shù)hasPendingCancellation可用于檢查線(xiàn)程是否存在取消請(qǐng)求,開(kāi)發(fā)者可以通過(guò)該檢查來(lái)自行決定是否提前終止線(xiàn)程以及如何終止線(xiàn)程。
main(): Unit {
/* 創(chuàng)建線(xiàn)程 */
let future = spawn {
while (true) {
if (Thread.currentThread.hasPendingCancellation) {
return 0
}
}
return 1
}
/* 向線(xiàn)程發(fā)送取消請(qǐng)求 */
future.cancel()
let res = future.get()
println(res) // 0
}
同步機(jī)制
在并發(fā)編程中,如果缺少同步機(jī)制來(lái)保護(hù)多個(gè)線(xiàn)程共享的變量,很容易會(huì)出現(xiàn)數(shù)據(jù)競(jìng)爭(zhēng)問(wèn)題。
例如以下案例,當(dāng)創(chuàng)建1000個(gè)線(xiàn)程對(duì)一個(gè)變量加1時(shí),理想輸出應(yīng)該是1000,然而實(shí)際每次執(zhí)行結(jié)果都不是1000并且值都不一樣。
import std.collection.*
var count = 0
main() {
let list = ArrayList<Future<Int64>>()
for (_ in 0..1000) {
let fut= spawn {
sleep(Duration.millisecond)
count++
return count
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count}") // count = 993
}
倉(cāng)頡編程語(yǔ)言提供三種常見(jiàn)的同步機(jī)制來(lái)確保數(shù)據(jù)的線(xiàn)程安全:原子操作、互斥鎖和條件變量。
原子操作 Atomic
倉(cāng)頡提供整數(shù)類(lèi)型、Bool 類(lèi)型和引用類(lèi)型的原子操作。
整數(shù)類(lèi)型的原子操作支持基本的讀寫(xiě)、交換以及算術(shù)運(yùn)算操作:
| 操作 | 功能 |
|---|---|
| load | 讀取 |
| store | 寫(xiě)入 |
| swap | 交換,返回交換前的值 |
| compareAndSwap | 比較再交換,交換成功返回 true,否則返回 false |
| fetchAdd | 加法,返回執(zhí)行加操作之前的值 |
| fetchSub | 減法,返回執(zhí)行減操作之前的值 |
| fetchAnd | 與,返回執(zhí)行與操作之前的值 |
| fetchOr | 或,返回執(zhí)行或操作之前的值 |
| fetchXor | 異或,返回執(zhí)行異或操作之前的值 |
Bool類(lèi)型和引用類(lèi)型的原子操作只提供讀寫(xiě)和交換操作:
| 操作 | 功能 |
|---|---|
| load | 讀取 |
| store | 寫(xiě)入 |
| swap | 交換,返回交換前的值 |
| compareAndSwap | 比較再交換,交換成功返回 true,否則返回 false |
將以上案例使用原子操作修改如下:
import std.collection.*
import std.sync.*
var count = AtomicInt64(0);
main() {
let list = ArrayList<Future<Int64>>()
for (_ in 0..1000) {
let fut= spawn {
sleep(Duration.millisecond)
return count.fetchAdd(1)
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count.load()}") //count = 1000
}
可重入互斥鎖Mutex
可重入互斥鎖的作用是對(duì)臨界區(qū)加以保護(hù),使得任意時(shí)刻最多只有一個(gè)線(xiàn)程能夠執(zhí)行臨界區(qū)的代碼。當(dāng)一個(gè)線(xiàn)程試圖獲取一個(gè)已被其他線(xiàn)程持有的鎖時(shí),該線(xiàn)程會(huì)被阻塞,直到鎖被釋放,該線(xiàn)程才會(huì)被喚醒,可重入是指線(xiàn)程獲取該鎖后可再次獲得該鎖。
使用可重入互斥鎖時(shí),必須牢記兩條規(guī)則:
1.在訪(fǎng)問(wèn)共享數(shù)據(jù)之前,必須嘗試獲取鎖;
2.處理完共享數(shù)據(jù)后,必須釋放鎖,以便其他線(xiàn)程可以獲得鎖。
import std.sync.*
import std.collection.*
var count: Int64 = 0
let mtx = Mutex()
main() {
let list = ArrayList<Future<Unit>>()
for (_ in 0..1000) {
let fut = spawn {
sleep(Duration.millisecond)
mtx.lock() // 獲取鎖
count++
mtx.unlock() // 釋放鎖
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count}") //count = 1000
}
Condition
Condition 實(shí)例由互斥鎖創(chuàng)建,一個(gè)互斥鎖可以創(chuàng)建多個(gè) Condition 實(shí)例。Condition 可以使線(xiàn)程阻塞并等待來(lái)自另一個(gè)線(xiàn)程的信號(hào)以恢復(fù)執(zhí)行。
調(diào)用 Condition 接口的 wait、notify 或 notifyAll 方法前,需要確保當(dāng)前線(xiàn)程已經(jīng)持有綁定的鎖。
調(diào)用 Condition 接口的 wait方法包含如下動(dòng)作:
1.添加當(dāng)前線(xiàn)程到對(duì)應(yīng)鎖的等待隊(duì)列中;
2.阻塞當(dāng)前線(xiàn)程,同時(shí)完全釋放該鎖,并記錄鎖的重入次數(shù);
3.等待某個(gè)其他線(xiàn)程使用同一個(gè) Condition 實(shí)例的 notify 或 notifyAll 方法向該線(xiàn)程發(fā)出信號(hào);
4.當(dāng)前線(xiàn)程被喚醒后,會(huì)自動(dòng)嘗試重新獲取鎖,且持有鎖的重入狀態(tài)與第 2 步記錄的重入次數(shù)相同;但是如果嘗試獲取鎖失敗,則當(dāng)前線(xiàn)程會(huì)阻塞在該鎖上。
import std.sync.*
let mtx = Mutex()
let condition = synchronized(mtx) {
mtx.condition()
}
var flag: Bool = true
main(): Int64 {
let fut = spawn {
mtx.lock()
while (flag) {
println("執(zhí)行1")
condition.wait()
println("執(zhí)行4")
}
mtx.unlock()
}
sleep(10 * Duration.millisecond)
mtx.lock()
println("執(zhí)行2")
flag = false
mtx.unlock()
mtx.lock()
println("執(zhí)行3")
condition.notifyAll()
mtx.unlock()
fut.get()
return 0
}
synchronized 關(guān)鍵字
倉(cāng)頡編程語(yǔ)言提供一個(gè) synchronized 關(guān)鍵字,搭配 Lock 一起使用,可以在其后跟隨的作用域內(nèi)自動(dòng)進(jìn)行加鎖解鎖操作,用來(lái)解決類(lèi)似的問(wèn)題。
一個(gè)線(xiàn)程在進(jìn)入 synchronized 修飾的代碼塊之前,會(huì)自動(dòng)獲取 Lock 實(shí)例對(duì)應(yīng)的鎖,如果無(wú)法獲取鎖,則當(dāng)前線(xiàn)程被阻塞;
一個(gè)線(xiàn)程在退出 synchronized 修飾的代碼塊之前,會(huì)自動(dòng)釋放該 Lock 實(shí)例的鎖。
import std.sync.*
import std.collection.*
var count: Int64 = 0
var mtx: Mutex = Mutex()
main() {
let list = ArrayList<Future<Unit>>()
for (i in 0..10) {
let fut = spawn {
while (true) {
synchronized(mtx) {
count = count + 1
break
}
}
}
list.add(fut)
}
for (f in list) {
f.get()
}
println("count = ${count}") //count = 10
}
ThreadLocal
使用 core 包中的 ThreadLocal 可以創(chuàng)建并使用線(xiàn)程局部變量,每一個(gè)線(xiàn)程都有它獨(dú)立的一個(gè)存儲(chǔ)空間來(lái)保存這些線(xiàn)程局部變量。因此,在每個(gè)線(xiàn)程可以安全地訪(fǎng)問(wèn)他們各自的線(xiàn)程局部變量,而不受其他線(xiàn)程的影響。
import std.sync.*
main() {
let tl = ThreadLocal<Int64>()
let fut = spawn {
tl.set(0)
var t=0
for(_ in 0..1000){
t++
}
tl.set(t)
println("tl in spawn1 = ${tl.get().getOrThrow()}") //1000
}
let fut2 = spawn {
tl.set(0)
var t=0
for(_ in 0..10){
t++
}
tl.set(t)
println("tl in spawn2 = ${tl.get().getOrThrow()}") //10
}
fut.get()
fut2.get()
}