Go消費隊列Queue并監(jiān)聽變化

溫馨提示:覺得有幫助的話, 給我點贊哦~

1. Go消費隊列實現(xiàn)

package libs

import (
    "sync"
    "time"
)

// QueueItem 隊列元素
type QueueItem = interface{}

// QueueConsumer 隊列消費者
type QueueConsumer func(QueueItem)

type Queue struct {
    sync.RWMutex
    // 隊列元素
    items []QueueItem
    // 當前并發(fā)數(shù)
    count int
    // 并發(fā)控制
    max int
    // 是否運行
    running bool
}

// Enqueue 入隊
func (q *Queue) Enqueue(v ...QueueItem) {
    q.Lock()
    q.items = append(q.items, v...)
    q.Unlock()
}

// Dequeue 出隊
func (q *Queue) Dequeue() QueueItem {
    q.Lock()
    defer q.Unlock()

    if len(q.items) == 0 {
        return nil
    }

    // 取出第一個元素
    v := q.items[0]
    // 調(diào)整隊列
    q.items = q.items[1:]

    return v
}

// Size 隊列大小
func (q *Queue) Size() int {
    q.RLock()
    defer q.RUnlock()

    return len(q.items)
}

// Done 完成
func (q *Queue) Done() {
    q.Lock()
    defer q.Unlock()

    if q.count > 0 {
        q.count--
    }
}

// Max 設置最大并發(fā)消費
func (q *Queue) Max(max int) {
    if !q.running {
        q.max = max
    }
}

// Run 運行隊列
func (q *Queue) Run(consumer QueueConsumer) {
    if q.running {
        return
    }

    q.running = true

    ticker := time.NewTicker(time.Millisecond)

    // 監(jiān)聽隊列
    for range ticker.C {
        q.worker(consumer)
    }
}

func (q *Queue) worker(consumer QueueConsumer) {
    q.RLock()
    // 剩余協(xié)程數(shù)
    num := q.max - q.count
    q.RUnlock()

    if num > 0 {
        for i := 0;i < num;i++ {
            if v := q.Dequeue(); v != nil {
                q.Lock()
                q.count++
                q.Unlock()

                go consumer(v)
            }
        }
    }
}

func NewQueue() *Queue {
    return &Queue{
        max: 2,
        running: false,
    }
}

2. 測試例子

// queue_test.go
func TestQueue(t *testing.T) {
    q := NewQueue()

    items := []QueueItem{1,2,3,4,5,6,7,8,9, 0}

       // 初始生成數(shù)據(jù)
    q.Enqueue(items...)

    ticker := time.NewTicker(time.Second)

    go func() {
               // 模擬生成數(shù)據(jù)
        for range ticker.C {
            q.Enqueue(items...)
        }
    }()

       // 隊列消費者
       consumer := func(item QueueItem) {
              t.Logf("=>: %v", item)

              // 注意: 消費完成要調(diào)用
              q.Done()
    }

    q.Run(consumer)
}

溫馨提示:覺得有幫助的話, 給我點贊哦~

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容