溫馨提示:覺得有幫助的話, 給我點贊哦~
1. Go消費隊列實現(xiàn)
package libs
import (
"sync"
"time"
)
// QueueItem 隊列元素
type QueueItem = interface{}
// QueueConsumer 隊列消費者
type QueueConsumer func(QueueItem)
type Queue struct {
sync.RWMutex
// 隊列元素
items []QueueItem
// 當前并發(fā)數(shù)
count int
// 并發(fā)控制
max int
// 是否運行
running bool
}
// Enqueue 入隊
func (q *Queue) Enqueue(v ...QueueItem) {
q.Lock()
q.items = append(q.items, v...)
q.Unlock()
}
// Dequeue 出隊
func (q *Queue) Dequeue() QueueItem {
q.Lock()
defer q.Unlock()
if len(q.items) == 0 {
return nil
}
// 取出第一個元素
v := q.items[0]
// 調(diào)整隊列
q.items = q.items[1:]
return v
}
// Size 隊列大小
func (q *Queue) Size() int {
q.RLock()
defer q.RUnlock()
return len(q.items)
}
// Done 完成
func (q *Queue) Done() {
q.Lock()
defer q.Unlock()
if q.count > 0 {
q.count--
}
}
// Max 設置最大并發(fā)消費
func (q *Queue) Max(max int) {
if !q.running {
q.max = max
}
}
// Run 運行隊列
func (q *Queue) Run(consumer QueueConsumer) {
if q.running {
return
}
q.running = true
ticker := time.NewTicker(time.Millisecond)
// 監(jiān)聽隊列
for range ticker.C {
q.worker(consumer)
}
}
func (q *Queue) worker(consumer QueueConsumer) {
q.RLock()
// 剩余協(xié)程數(shù)
num := q.max - q.count
q.RUnlock()
if num > 0 {
for i := 0;i < num;i++ {
if v := q.Dequeue(); v != nil {
q.Lock()
q.count++
q.Unlock()
go consumer(v)
}
}
}
}
func NewQueue() *Queue {
return &Queue{
max: 2,
running: false,
}
}
2. 測試例子
// queue_test.go
func TestQueue(t *testing.T) {
q := NewQueue()
items := []QueueItem{1,2,3,4,5,6,7,8,9, 0}
// 初始生成數(shù)據(jù)
q.Enqueue(items...)
ticker := time.NewTicker(time.Second)
go func() {
// 模擬生成數(shù)據(jù)
for range ticker.C {
q.Enqueue(items...)
}
}()
// 隊列消費者
consumer := func(item QueueItem) {
t.Logf("=>: %v", item)
// 注意: 消費完成要調(diào)用
q.Done()
}
q.Run(consumer)
}
溫馨提示:覺得有幫助的話, 給我點贊哦~