Golang并發(fā)同步原語之-信號量Semaphore

信號量是并發(fā)編程中比較常見的一種同步機(jī)制,它會保持資源計(jì)數(shù)器一直在0-NN表示權(quán)重值大小,在用戶初始化時(shí)指定)之間。當(dāng)用戶獲取的時(shí)候會減少一會,使用完畢后再恢復(fù)過來。當(dāng)遇到請求時(shí)資源不夠的情況下,將會進(jìn)入休眠狀態(tài)以等待其它進(jìn)程釋放資源。

在 Golang 官方擴(kuò)展庫中為我們提供了一個(gè)基于權(quán)重的信號量 semaphore 并發(fā)原語。

你可以將下面的參數(shù) n 理解為資源權(quán)重總和,表示每次獲取時(shí)的權(quán)重;也可以理解為資源數(shù)量,表示每次獲取時(shí)必須一次性獲取的資源數(shù)量。為了理解方便,這里直接將其理解為資源數(shù)量。

數(shù)據(jù)結(jié)構(gòu)

semaphore.Weighted 結(jié)構(gòu)體

type waiter struct {
    n     int64
    ready chan<- struct{} // Closed when semaphore acquired.
}

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
func NewWeighted(n int64) *Weighted {
    w := &Weighted{size: n}
    return w
}

// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
    size    int64
    cur     int64
    mu      sync.Mutex
    waiters list.List
}

一個(gè) watier 就表示一個(gè)請求,其中n表示這次請求的資源數(shù)量(權(quán)重)。

使用 NewWeighted() 函數(shù)創(chuàng)建一個(gè)并發(fā)訪問的最大資源數(shù),這里 n 表示資源個(gè)數(shù)。

Weighted 字段說明

  • size 表示最大資源數(shù)量,取走時(shí)會減少,釋放時(shí)會增加
  • cur 計(jì)數(shù)器,記錄當(dāng)前已使用資源數(shù),值范圍[0 - size]
  • mu
  • waiters 當(dāng)前處于等待休眠的請求者goroutine,每個(gè)請求者請求的資源數(shù)量可能不一樣,只有在請求時(shí),可用資源數(shù)量不足時(shí)請求者才會進(jìn)入請求鏈表,每個(gè)請求者表示一個(gè)goroutine

計(jì)數(shù)器 cur 會隨著資源的獲取和釋放而變化,那么為什么要引入數(shù)量(權(quán)重)這個(gè)概念呢?

方法列表

方法

  • NewWighted 方法用來創(chuàng)建一類資源,參數(shù) n 資源表示最大可用資源總個(gè)數(shù);
  • Acquire 獲取指定個(gè)數(shù)的資源,如果當(dāng)前沒有空閑資源可用,當(dāng)前請求者goroutine將陷入休眠狀態(tài);
  • Release 釋放資源
  • TryAcquireAcquire 一樣,但當(dāng)無空閑資源將直接返回false,而不阻塞。

獲取 Acquire 和 TryAcquire

對于獲取資源有兩種方法,分別為 Acquire()TryAcquire(),兩者的區(qū)別我們上面已介紹過。

在獲取和釋放資源前必須先加全局鎖。

獲取資源時(shí)根據(jù)空閑資源情況,可分為三種:

  • 有空閑資源可用,將返回nil,表示成功
  • 請求資源數(shù)量超出了初始化時(shí)指定的總數(shù)量,這個(gè)肯定永遠(yuǎn)也不可能執(zhí)行成功的,所以直接返回 ctx.Err()
  • 當(dāng)前空閑資源數(shù)量不足,需要等待其它goroutine對資源進(jìn)行釋放才可以運(yùn)行,這時(shí)將當(dāng)前請求者goroutine放入等待隊(duì)列。 這里再根據(jù)情況而定,具體見 select 判斷
// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
    // 有可用資源,直接成功返回nil
    s.mu.Lock()
    if s.size-s.cur >= n && s.waiters.Len() == 0 {
        s.cur += n
        s.mu.Unlock()
        return nil
    }

    // 請求資源權(quán)重遠(yuǎn)遠(yuǎn)超出了設(shè)置的最大權(quán)重和,失敗返回 ctx.Err()
    if n > s.size {
        // Don't make other Acquire calls block on one that's doomed to fail.
        s.mu.Unlock()
        <-ctx.Done()
        return ctx.Err()
    }

    // 有部分資源可用,將請求者放在等待隊(duì)列(頭部),并通過select 實(shí)現(xiàn)通知其它waiters
    ready := make(chan struct{})
    w := waiter{n: n, ready: ready}
    // 放入鏈表尾部,并返回放入的元素
    elem := s.waiters.PushBack(w)
    s.mu.Unlock()

    select {
    case <-ctx.Done():
        // 收到外面的控制信號
        err := ctx.Err()
        s.mu.Lock()
        select {
        case <-ready:
            // Acquired the semaphore after we were canceled.  Rather than trying to
            // fix up the queue, just pretend we didn't notice the cancelation.
            // 如果在用戶取消之前已經(jīng)獲取了資源,則直接忽略這個(gè)信號,返回nil表示成功
            err = nil
        default:
            // 收到控制信息,且還沒有獲取到資源,就直接將原來添加的 waiter 刪除
            isFront := s.waiters.Front() == elem

            // 則將其從鏈接刪除 上面 ctx.Done()
            s.waiters.Remove(elem)

            // 如果當(dāng)前元素正好位于鏈表最前面,且還存在可用的資源,就通知其它waiters
            if isFront && s.size > s.cur {
                s.notifyWaiters()
            }
        }
        s.mu.Unlock()
        return err

    case <-ready:
        return nil
    }
}

注意上面在select邏輯語句上面有一次加解鎖的操作,在 select 里面由于是全局鎖所以還需要再次加鎖。

根據(jù)可用計(jì)數(shù)器信息,可分三種情況:

  1. 對于 TryAcquire() 就比較簡單了,就是一個(gè)可用資源數(shù)量的判斷,數(shù)量夠用表示成功返回 true ,否則 false,此方法并不會進(jìn)行阻塞,而是直接返回。
// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
    s.mu.Lock()
    success := s.size-s.cur >= n && s.waiters.Len() == 0
    if success {
        s.cur += n
    }
    s.mu.Unlock()
    return success
}

釋放 Release

對于釋放也很簡單,就是將已使用資源數(shù)量(計(jì)數(shù)器)進(jìn)行更新減少,并通知其它 waiters。

// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
    s.mu.Lock()
    s.cur -= n
    if s.cur < 0 {
        s.mu.Unlock()
        panic("semaphore: released more than held")
    }
    s.notifyWaiters()
    s.mu.Unlock()
}

通知機(jī)制

通過 for 循環(huán)從鏈表頭部開始頭部依次遍歷出鏈表中的所有waiter,并更新計(jì)數(shù)器 Weighted.cur,同時(shí)將其從鏈表中刪除,直到遇到 空閑資源數(shù)量 < watier.n 為止。

func (s *Weighted) notifyWaiters() {
    for {
        next := s.waiters.Front()
        if next == nil {
            break // No more waiters blocked.
        }

        w := next.Value.(waiter)
        if s.size-s.cur < w.n {
            // Not enough tokens for the next waiter.  We could keep going (to try to
            // find a waiter with a smaller request), but under load that could cause
            // starvation for large requests; instead, we leave all remaining waiters
            // blocked.
            //
            // Consider a semaphore used as a read-write lock, with N tokens, N
            // readers, and one writer.  Each reader can Acquire(1) to obtain a read
            // lock.  The writer can Acquire(N) to obtain a write lock, excluding all
            // of the readers.  If we allow the readers to jump ahead in the queue,
            // the writer will starve — there is always one token available for every
            // reader.
            break
        }

        s.cur += w.n
        s.waiters.Remove(next)
        close(w.ready)
    }
}

可以看到如果一個(gè)鏈表里有多個(gè)等待者,其中一個(gè)等待者需要的資源(權(quán)重)比較多的時(shí)候,當(dāng)前 watier 會出現(xiàn)長時(shí)間的阻塞(即使當(dāng)前可用資源足夠其它waiter執(zhí)行,期間會有一些資源浪費(fèi)), 直到有足夠的資源可以讓這個(gè)等待者執(zhí)行,然后繼續(xù)執(zhí)行它后面的等待者。

使用示例

官方文檔提供了一個(gè)基于信號量的典型的“工作池”模式,見https://pkg.go.dev/golang.org/x/sync/semaphore#example-package-WorkerPool,演示了如何通過信號量控制一定數(shù)量的 goroutine 并發(fā)工作。

這是一個(gè)通過信號量實(shí)現(xiàn)并發(fā)對 考拉茲猜想的示例,對1-32之間的數(shù)字進(jìn)行計(jì)算,并打印32個(gè)符合結(jié)果的值。

package main

import (
    "context"
    "fmt"
    "log"
    "runtime"

    "golang.org/x/sync/semaphore"
)

// Example_workerPool demonstrates how to use a semaphore to limit the number of
// goroutines working on parallel tasks.
//
// This use of a semaphore mimics a typical “worker pool” pattern, but without
// the need to explicitly shut down idle workers when the work is done.
func main() {
    ctx := context.TODO()

    // 權(quán)重值為邏輯cpu個(gè)數(shù)
    var (
        maxWorkers = runtime.GOMAXPROCS(0)
        sem        = semaphore.NewWeighted(int64(maxWorkers))
        out        = make([]int, 32)
    )

    // Compute the output using up to maxWorkers goroutines at a time.
    for i := range out {
        // When maxWorkers goroutines are in flight, Acquire blocks until one of the
        // workers finishes.
        if err := sem.Acquire(ctx, 1); err != nil {
            log.Printf("Failed to acquire semaphore: %v", err)
            break
        }

        go func(i int) {
            defer sem.Release(1)
            out[i] = collatzSteps(i + 1)
        }(i)
    }

    // 如果使用了 errgroup 原語則不需要下面這段語句
    if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
        log.Printf("Failed to acquire semaphore: %v", err)
    }

    fmt.Println(out)

}

// collatzSteps computes the number of steps to reach 1 under the Collatz
// conjecture. (See https://en.wikipedia.org/wiki/Collatz_conjecture.)
func collatzSteps(n int) (steps int) {
    if n <= 0 {
        panic("nonpositive input")
    }

    for ; n > 1; steps++ {
        if steps < 0 {
            panic("too many steps")
        }

        if n%2 == 0 {
            n /= 2
            continue
        }

        const maxInt = int(^uint(0) >> 1)
        if n > (maxInt-1)/3 {
            panic("overflow")
        }
        n = 3*n + 1
    }

    return steps
}

上面先聲明了總權(quán)重值為邏輯CPU數(shù)量,每次 for 循環(huán)都會調(diào)用一次 sem.Acquire(ctx, 1), 即表示最多每個(gè)CPU可運(yùn)行一個(gè) goroutine,如果當(dāng)前權(quán)重值不足的話,其它groutine將處于阻塞狀態(tài),這里共循環(huán)32次,即阻塞數(shù)量最大為 32-maxWorkers

每獲取成功一個(gè)權(quán)重就會執(zhí)行g(shù)o匿名函數(shù),并在函數(shù)結(jié)束時(shí)釋放權(quán)重。為了保證每次for循環(huán)都會正常結(jié)束,最后調(diào)用了 sem.Acquire(ctx, int64(maxWorkers)) ,表示最后一次執(zhí)行必須獲取的權(quán)重值為 maxWorkers。當(dāng)然如果使用 errgroup 同步原語的話,這一步可以省略掉

以下為使用 errgroup 的方法

func main() {
    ctx := context.TODO()
    var (
        maxWorkers = runtime.GOMAXPROCS(0)
        sem        = semaphore.NewWeighted(int64(maxWorkers))
        out        = make([]int, 32)
    )

    group, _ := errgroup.WithContext(context.Background())
    for i := range out {
        if err := sem.Acquire(ctx, 1); err != nil {
            log.Printf("Failed to acquire semaphore: %v", err)
            break
        }
        group.Go(func() error {
            go func(i int) {
                defer sem.Release(1)
                out[i] = collatzSteps(i + 1)
            }(i)
            return nil
        })
    }

    // 這里會阻塞,直到所有g(shù)oroutine都執(zhí)行完畢
    if err := group.Wait(); err != nil {
        fmt.Println(err)
    }
    fmt.Println(out)
}

轉(zhuǎn)自 https://blog.haohtml.com/archives/25563

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容