取號(hào)隊(duì)列模型解決長(zhǎng)連接順序發(fā)送問(wèn)題

取號(hào)隊(duì)列顧名思義是取號(hào)排隊(duì)系統(tǒng)的實(shí)現(xiàn),取號(hào)排隊(duì)系統(tǒng)無(wú)論是現(xiàn)實(shí)空間還是計(jì)算機(jī)空間都是非常場(chǎng)景的一種模型,例如餐廳取號(hào)點(diǎn)餐,這里借用這個(gè)模型完美的可以在并發(fā)不加鎖情況下解決異步發(fā)送數(shù)據(jù)包順序化

  • 取號(hào):顧客可以不阻塞的獲取號(hào)碼和點(diǎn)餐,長(zhǎng)連接發(fā)送數(shù)據(jù)可以不- 阻塞發(fā)送(會(huì)緩存一部分?jǐn)?shù)據(jù))

  • 叫號(hào):顧客由餐廳前臺(tái)統(tǒng)一按照號(hào)碼順序叫號(hào),長(zhǎng)連接根據(jù)緩存數(shù)據(jù)序號(hào)順序發(fā)送

package utakenumberqueue

import (
    "context"
    "fmt"
    "git.umu.work/be/goframework/logger"
    "time"
)

type EndNumberValue struct{}

// TakeNumberQueue 取號(hào)隊(duì)列
type TakeNumberQueue struct {
    offset  int32
    queue   map[int32]interface{}
    out     chan interface{}
    exit    chan struct{}
    timer   *time.Timer
    timeout time.Duration
}

func NewTaskNumberQueue(ctx context.Context, size int, timeout time.Duration) *TakeNumberQueue {
    timer := time.NewTimer(timeout)
    tq := &TakeNumberQueue{
        offset:  0,
        queue:   make(map[int32]interface{}, 100),
        out:     make(chan interface{}, size),
        exit:    make(chan struct{}),
        timer:   timer,
        timeout: timeout,
    }
    go func() {
        for {
            select {
            case <-timer.C:
                close(tq.exit)
                err := tq.Close(ctx)
                if err != nil {
                    logger.GetLogger(ctx).Warn(err.Error())
                }
            }
        }
    }()

    return tq
}

func (t *TakeNumberQueue) checkIsEnd(ctx context.Context, value interface{}) bool {
    select {
    case <-t.exit:
        return true
    default:
        _, ok := value.(EndNumberValue)
        if ok {
            return true
        }
    }

    return false
}

// TakeNumber 取號(hào)
func (t *TakeNumberQueue) TakeNumber(ctx context.Context, number int32, value interface{}) error {
    t.timer.Reset(t.timeout)
    if number == t.offset {
        isEnd := t.checkIsEnd(ctx, value)
        if isEnd {
            t.timer.Stop()
            close(t.out)
            return nil
        }
        t.out <- value
        t.offset += 1
        ok := true
        // 自旋
        var v interface{}
        for ok {
            v, ok = t.queue[t.offset]
            if ok {
                isEnd = t.checkIsEnd(ctx, value)
                if isEnd {
                    logger.GetLogger(ctx).Info("task number queue is closed")
                    close(t.out)
                    return nil
                }
                t.out <- v
                t.offset += 1
            }
        }
    } else if number < t.offset {
        logger.GetLogger(ctx).Debug(fmt.Sprintf("number %+v is less than offset %+v", number, t.offset))
        isEnd := t.checkIsEnd(ctx, value)
        if isEnd {
            logger.GetLogger(ctx).Info("task number queue is closed")
            close(t.out)
            return nil
        }
        t.out <- value
    } else {
        t.queue[number] = value
    }

    return nil
}

// Call 叫號(hào)
func (t *TakeNumberQueue) Call(ctx context.Context) (<-chan interface{}, error) {
    return t.out, nil
}

func (t *TakeNumberQueue) TakeEndNumber(ctx context.Context, number int32) error {
    logger.GetLogger(ctx).Info(fmt.Sprintf("task number queue end number is %+v", number))
    var end EndNumberValue = struct{}{}
    err := t.TakeNumber(ctx, number, end)
    if err != nil {
        return err
    }

    return nil
}

// Close 關(guān)閉,退出queue
func (t *TakeNumberQueue) Close(ctx context.Context) error {
    closeOutTimer := time.NewTimer(10 * time.Second)
    select {
    case _, ok := <-t.out:
        if ok {
            close(t.out)
        }
    case _, ok := <-t.exit:
        if ok {
            close(t.exit)
        }
    case <-closeOutTimer.C:
        closeOutTimer.Stop()
    }

    return nil
}

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容