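// Package utakenumberqueue implements a take-a-number queue. As the name
// suggests, it models a take-a-number queuing system, which is a very common
// pattern in both the physical world and in computing (for example, taking a
// number to order food at a restaurant). The model is borrowed here with the
// aim of putting asynchronously sent data packets back into order under
// concurrency without locking.
//
// Take a number: a customer can take a number and place an order without
// blocking; likewise, a long-lived connection can send data without blocking
// (part of the data is buffered).
//
// Call a number: the restaurant's front desk calls customers one by one in
// number order; likewise, the long-lived connection sends the buffered data in
// sequence-number order.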
package utakenumberqueue
import (
"context"
"fmt"
"git.umu.work/be/goframework/logger"
"time"
)
// EndNumberValue is the sentinel value type that marks the end of the queue.
// checkIsEnd recognises it with a type assertion, and TakeEndNumber submits one
// through TakeNumber.
type EndNumberValue struct{}
// TakeNumberQueue is a take-a-number queue: every value is submitted together
// with a sequence number, and values that arrive ahead of their turn are parked
// until the preceding numbers have been submitted.
type TakeNumberQueue struct {
// offset is the number TakeNumber compares incoming numbers against; it is
// incremented each time an in-order value is sent to out.
offset int32
// queue parks values whose number is greater than offset, keyed by number.
queue map[int32]interface{}
// out is the channel handed to consumers by Call.
out chan interface{}
// exit is closed by the goroutine started in NewTaskNumberQueue once timer
// fires; checkIsEnd treats a closed exit as end-of-queue.
exit chan struct{}
// timer is re-armed with timeout at the start of every TakeNumber call.
timer *time.Timer
// timeout is the duration used to create and re-arm timer.
timeout time.Duration
}
// NewTaskNumberQueue creates a TakeNumberQueue and starts its timeout watchdog.
//
// ctx is used by the watchdog for logging and is passed on to Close. size is
// the buffer capacity of the output channel returned by Call. timeout is the
// period after which the watchdog fires; TakeNumber re-arms the timer on every
// call, so it acts as an inactivity timeout.
//
// When the timer fires, the watchdog closes the exit channel, calls Close and
// terminates.
func NewTaskNumberQueue(ctx context.Context, size int, timeout time.Duration) *TakeNumberQueue {
	timer := time.NewTimer(timeout)
	tq := &TakeNumberQueue{
		offset:  0,
		queue:   make(map[int32]interface{}, 100),
		out:     make(chan interface{}, size),
		exit:    make(chan struct{}),
		timer:   timer,
		timeout: timeout,
	}
	go func() {
		// Handle exactly one expiry. TakeNumber keeps re-arming the timer, so
		// looping here would eventually observe a second expiry and panic by
		// closing the already-closed exit channel.
		// NOTE(review): if the timer is stopped before it ever fires (TakeNumber
		// stops it when it sees the end marker), this goroutine stays blocked.
		<-timer.C
		close(tq.exit)
		if err := tq.Close(ctx); err != nil {
			logger.GetLogger(ctx).Warn(err.Error())
		}
	}()
	return tq
}
// checkIsEnd reports whether the queue should stop: either the exit channel
// has already been closed, or value is an EndNumberValue marker.
func (t *TakeNumberQueue) checkIsEnd(ctx context.Context, value interface{}) bool {
	// Non-blocking probe of exit: a closed channel is always ready to receive.
	select {
	case <-t.exit:
		return true
	default:
	}
	_, isEndMarker := value.(EndNumberValue)
	return isEndMarker
}
// TakeNumber submits value under the given sequence number.
//
// The inactivity timer is re-armed on every call. Then, depending on how
// number compares with the current offset:
//   - number > offset: value is parked in the queue until its turn comes.
//   - number < offset: the number has already been passed; value is sent to
//     the output channel straight away.
//   - number == offset: value is sent to the output channel, offset advances,
//     and every parked value with a consecutive number is sent as well.
//
// Whenever the value about to be sent is the end marker, or the exit channel
// has been closed, the timer is stopped, the output channel is closed and
// nothing further is sent. Sends block while the output channel is full.
//
// The returned error is always nil.
func (t *TakeNumberQueue) TakeNumber(ctx context.Context, number int32, value interface{}) error {
	t.timer.Reset(t.timeout)

	switch {
	case number > t.offset:
		// Ahead of its turn: park it until the preceding numbers arrive.
		t.queue[number] = value

	case number < t.offset:
		logger.GetLogger(ctx).Debug(fmt.Sprintf("number %+v is less than offset %+v", number, t.offset))
		if t.checkIsEnd(ctx, value) {
			logger.GetLogger(ctx).Info("task number queue is closed")
			t.timer.Stop()
			close(t.out)
			return nil
		}
		t.out <- value

	default:
		if t.checkIsEnd(ctx, value) {
			t.timer.Stop()
			close(t.out)
			return nil
		}
		t.out <- value
		t.offset++

		// Drain every parked value whose number is now due.
		for {
			v, ok := t.queue[t.offset]
			if !ok {
				break
			}
			// Drop the entry so the map does not grow without bound.
			delete(t.queue, t.offset)
			// Check the parked value itself, not the caller's value, so that a
			// parked end marker is recognised instead of being sent as data.
			if t.checkIsEnd(ctx, v) {
				logger.GetLogger(ctx).Info("task number queue is closed")
				t.timer.Stop()
				close(t.out)
				return nil
			}
			t.out <- v
			t.offset++
		}
	}
	return nil
}
// Call returns the receive-only view of the output channel on which submitted
// values are delivered. The returned error is always nil.
func (t *TakeNumberQueue) Call(ctx context.Context) (<-chan interface{}, error) {
	var called <-chan interface{} = t.out
	return called, nil
}
// TakeEndNumber logs the end number and submits an EndNumberValue marker under
// it via TakeNumber, returning whatever TakeNumber returns.
func (t *TakeNumberQueue) TakeEndNumber(ctx context.Context, number int32) error {
	log := logger.GetLogger(ctx)
	log.Info(fmt.Sprintf("task number queue end number is %+v", number))
	return t.TakeNumber(ctx, number, EndNumberValue{})
}
// Close makes a best-effort attempt to shut the queue down and always returns
// nil.
//
// It waits for whichever happens first:
//   - a receive on the output channel succeeds: if a value was received the
//     output channel is closed;
//   - the exit channel has been closed: nothing more is done;
//   - 10 seconds elapse: Close gives up.
func (t *TakeNumberQueue) Close(ctx context.Context) error {
	closeOutTimer := time.NewTimer(10 * time.Second)
	// Release the timer on every return path, not only after it has fired.
	defer closeOutTimer.Stop()

	select {
	case _, ok := <-t.out:
		// NOTE(review): this receive removes and discards one pending value
		// from the output channel; consumers will never see it.
		if ok {
			// ok == true does not prove the channel is still open: a closed
			// channel that still holds buffered values also yields ok == true.
			// Guard the close so that case does not panic.
			func() {
				defer func() { _ = recover() }()
				close(t.out)
			}()
		}
	case <-t.exit:
		// Nothing in the visible code sends on exit, so this receive only
		// succeeds once exit has been closed; closing it again would panic.
	case <-closeOutTimer.C:
	}
	return nil
}