信號量是并發(fā)編程中比較常見的一種同步機(jī)制,它會保持資源計(jì)數(shù)器一直在0-N(N表示權(quán)重值大小,在用戶初始化時(shí)指定)之間。當(dāng)用戶獲取的時(shí)候會減少一會,使用完畢后再恢復(fù)過來。當(dāng)遇到請求時(shí)資源不夠的情況下,將會進(jìn)入休眠狀態(tài)以等待其它進(jìn)程釋放資源。
在 Golang 官方擴(kuò)展庫中為我們提供了一個(gè)基于權(quán)重的信號量 semaphore 并發(fā)原語。
你可以將下面的參數(shù) n 理解為資源權(quán)重總和,表示每次獲取時(shí)的權(quán)重;也可以理解為資源數(shù)量,表示每次獲取時(shí)必須一次性獲取的資源數(shù)量。為了理解方便,這里直接將其理解為資源數(shù)量。
數(shù)據(jù)結(jié)構(gòu)
semaphore.Weighted 結(jié)構(gòu)體
type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}
// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
w := &Weighted{size: n}
return w
}
// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
}
一個(gè) watier 就表示一個(gè)請求,其中n表示這次請求的資源數(shù)量(權(quán)重)。
使用 NewWeighted() 函數(shù)創(chuàng)建一個(gè)并發(fā)訪問的最大資源數(shù),這里 n 表示資源個(gè)數(shù)。
Weighted 字段說明
-
size表示最大資源數(shù)量,取走時(shí)會減少,釋放時(shí)會增加 -
cur計(jì)數(shù)器,記錄當(dāng)前已使用資源數(shù),值范圍[0 - size] -
mu鎖 -
waiters當(dāng)前處于等待休眠的請求者goroutine,每個(gè)請求者請求的資源數(shù)量可能不一樣,只有在請求時(shí),可用資源數(shù)量不足時(shí)請求者才會進(jìn)入請求鏈表,每個(gè)請求者表示一個(gè)goroutine
計(jì)數(shù)器 cur 會隨著資源的獲取和釋放而變化,那么為什么要引入數(shù)量(權(quán)重)這個(gè)概念呢?
方法列表
方法
-
NewWighted方法用來創(chuàng)建一類資源,參數(shù)n資源表示最大可用資源總個(gè)數(shù); -
Acquire獲取指定個(gè)數(shù)的資源,如果當(dāng)前沒有空閑資源可用,當(dāng)前請求者goroutine將陷入休眠狀態(tài); -
Release釋放資源 -
TryAcquire同Acquire一樣,但當(dāng)無空閑資源將直接返回false,而不阻塞。
獲取 Acquire 和 TryAcquire
對于獲取資源有兩種方法,分別為 Acquire() 和 TryAcquire(),兩者的區(qū)別我們上面已介紹過。
在獲取和釋放資源前必須先加全局鎖。
獲取資源時(shí)根據(jù)空閑資源情況,可分為三種:
- 有空閑資源可用,將返回
nil,表示成功 - 請求資源數(shù)量超出了初始化時(shí)指定的總數(shù)量,這個(gè)肯定永遠(yuǎn)也不可能執(zhí)行成功的,所以直接返回
ctx.Err() - 當(dāng)前空閑資源數(shù)量不足,需要等待其它goroutine對資源進(jìn)行釋放才可以運(yùn)行,這時(shí)將當(dāng)前請求者goroutine放入等待隊(duì)列。 這里再根據(jù)情況而定,具體見 select 判斷
// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
// 有可用資源,直接成功返回nil
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}
// 請求資源權(quán)重遠(yuǎn)遠(yuǎn)超出了設(shè)置的最大權(quán)重和,失敗返回 ctx.Err()
if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}
// 有部分資源可用,將請求者放在等待隊(duì)列(頭部),并通過select 實(shí)現(xiàn)通知其它waiters
ready := make(chan struct{})
w := waiter{n: n, ready: ready}
// 放入鏈表尾部,并返回放入的元素
elem := s.waiters.PushBack(w)
s.mu.Unlock()
select {
case <-ctx.Done():
// 收到外面的控制信號
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancelation.
// 如果在用戶取消之前已經(jīng)獲取了資源,則直接忽略這個(gè)信號,返回nil表示成功
err = nil
default:
// 收到控制信息,且還沒有獲取到資源,就直接將原來添加的 waiter 刪除
isFront := s.waiters.Front() == elem
// 則將其從鏈接刪除 上面 ctx.Done()
s.waiters.Remove(elem)
// 如果當(dāng)前元素正好位于鏈表最前面,且還存在可用的資源,就通知其它waiters
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err
case <-ready:
return nil
}
}
注意上面在select邏輯語句上面有一次加解鎖的操作,在 select 里面由于是全局鎖所以還需要再次加鎖。
根據(jù)可用計(jì)數(shù)器信息,可分三種情況:
- 對于 TryAcquire() 就比較簡單了,就是一個(gè)可用資源數(shù)量的判斷,數(shù)量夠用表示成功返回
true,否則false,此方法并不會進(jìn)行阻塞,而是直接返回。
// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}
釋放 Release
對于釋放也很簡單,就是將已使用資源數(shù)量(計(jì)數(shù)器)進(jìn)行更新減少,并通知其它 waiters。
// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}
通知機(jī)制
通過 for 循環(huán)從鏈表頭部開始頭部依次遍歷出鏈表中的所有waiter,并更新計(jì)數(shù)器 Weighted.cur,同時(shí)將其從鏈表中刪除,直到遇到 空閑資源數(shù)量 < watier.n 為止。
func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}
w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}
s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}
可以看到如果一個(gè)鏈表里有多個(gè)等待者,其中一個(gè)等待者需要的資源(權(quán)重)比較多的時(shí)候,當(dāng)前 watier 會出現(xiàn)長時(shí)間的阻塞(即使當(dāng)前可用資源足夠其它waiter執(zhí)行,期間會有一些資源浪費(fèi)), 直到有足夠的資源可以讓這個(gè)等待者執(zhí)行,然后繼續(xù)執(zhí)行它后面的等待者。
使用示例
官方文檔提供了一個(gè)基于信號量的典型的“工作池”模式,見https://pkg.go.dev/golang.org/x/sync/semaphore#example-package-WorkerPool,演示了如何通過信號量控制一定數(shù)量的 goroutine 并發(fā)工作。
這是一個(gè)通過信號量實(shí)現(xiàn)并發(fā)對 考拉茲猜想的示例,對1-32之間的數(shù)字進(jìn)行計(jì)算,并打印32個(gè)符合結(jié)果的值。
package main
import (
"context"
"fmt"
"log"
"runtime"
"golang.org/x/sync/semaphore"
)
// Example_workerPool demonstrates how to use a semaphore to limit the number of
// goroutines working on parallel tasks.
//
// This use of a semaphore mimics a typical “worker pool” pattern, but without
// the need to explicitly shut down idle workers when the work is done.
func main() {
ctx := context.TODO()
// 權(quán)重值為邏輯cpu個(gè)數(shù)
var (
maxWorkers = runtime.GOMAXPROCS(0)
sem = semaphore.NewWeighted(int64(maxWorkers))
out = make([]int, 32)
)
// Compute the output using up to maxWorkers goroutines at a time.
for i := range out {
// When maxWorkers goroutines are in flight, Acquire blocks until one of the
// workers finishes.
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
go func(i int) {
defer sem.Release(1)
out[i] = collatzSteps(i + 1)
}(i)
}
// 如果使用了 errgroup 原語則不需要下面這段語句
if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
}
fmt.Println(out)
}
// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
if n <= 0 {
panic("nonpositive input")
}
for ; n > 1; steps++ {
if steps < 0 {
panic("too many steps")
}
if n%2 == 0 {
n /= 2
continue
}
const maxInt = int(^uint(0) >> 1)
if n > (maxInt-1)/3 {
panic("overflow")
}
n = 3*n + 1
}
return steps
}
上面先聲明了總權(quán)重值為邏輯CPU數(shù)量,每次 for 循環(huán)都會調(diào)用一次 sem.Acquire(ctx, 1), 即表示最多每個(gè)CPU可運(yùn)行一個(gè) goroutine,如果當(dāng)前權(quán)重值不足的話,其它groutine將處于阻塞狀態(tài),這里共循環(huán)32次,即阻塞數(shù)量最大為 32-maxWorkers。
每獲取成功一個(gè)權(quán)重就會執(zhí)行g(shù)o匿名函數(shù),并在函數(shù)結(jié)束時(shí)釋放權(quán)重。為了保證每次for循環(huán)都會正常結(jié)束,最后調(diào)用了 sem.Acquire(ctx, int64(maxWorkers)) ,表示最后一次執(zhí)行必須獲取的權(quán)重值為 maxWorkers。當(dāng)然如果使用 errgroup 同步原語的話,這一步可以省略掉
以下為使用 errgroup 的方法
func main() {
ctx := context.TODO()
var (
maxWorkers = runtime.GOMAXPROCS(0)
sem = semaphore.NewWeighted(int64(maxWorkers))
out = make([]int, 32)
)
group, _ := errgroup.WithContext(context.Background())
for i := range out {
if err := sem.Acquire(ctx, 1); err != nil {
log.Printf("Failed to acquire semaphore: %v", err)
break
}
group.Go(func() error {
go func(i int) {
defer sem.Release(1)
out[i] = collatzSteps(i + 1)
}(i)
return nil
})
}
// 這里會阻塞,直到所有g(shù)oroutine都執(zhí)行完畢
if err := group.Wait(); err != nil {
fmt.Println(err)
}
fmt.Println(out)
}
轉(zhuǎn)自 https://blog.haohtml.com/archives/25563