go 限速與限流

限速方式

  1. 漏桶算法: 講究的是服務(wù)器勻速的去處理并發(fā)請(qǐng)求,但... 為達(dá)到目的居然采用sleep了。簡(jiǎn)單來(lái)說(shuō)服務(wù)器勻速處理請(qǐng)求,超過桶容量會(huì)被舍棄
  2. 令牌桶算法:拿到令牌的請(qǐng)求被處理,否則被舍棄。在桶里面的令牌被拿光了的時(shí)候,此時(shí)就是一邊生產(chǎn)令牌一邊消耗令牌了,這種場(chǎng)景下也勻速了。存峰值,峰值為桶容量 + 消耗此容量所需時(shí)間產(chǎn)生的新token。

概念

  1. 熔斷 與 限速 與 限流,過載 你得清楚它們的意思。
  2. 服務(wù)熔斷:對(duì)上游服務(wù)的保護(hù)。打比方你的 A服務(wù)調(diào)用上游 B服務(wù),并發(fā)來(lái)了,A發(fā)現(xiàn)B返回的數(shù)據(jù)不正常,A于是不再掉B 給它10S 緩沖期,那么不掉B的這10S發(fā)生的過程就是熔斷。像保險(xiǎn)絲
  3. 服務(wù)過載:指的是自身服務(wù)流量過大,這個(gè)時(shí)候需要考慮限速或者限流對(duì)自己進(jìn)行保護(hù)
  4. 限速是限制流量流入的速度,才不會(huì)管你服務(wù)器 hang了沒。 打比方你服務(wù)器正常情況下,10萬(wàn)qps,現(xiàn)在來(lái)了100萬(wàn)流量并將持續(xù)1小時(shí),由于限速的作用并沒有一下打死你的服務(wù)器,然后你的服務(wù)器任然以10萬(wàn)的qps提供服務(wù),過30分鐘后突然db出現(xiàn)慢查詢了,請(qǐng)注意這個(gè)時(shí)候相當(dāng)于你服務(wù)器每秒qps沒有10萬(wàn)了,但是限速限制的還是10萬(wàn),這個(gè)時(shí)候很可能你服務(wù)器即將gg。。。
  5. 限流著重是控制并發(fā)的最大流量。像資源池一樣,并發(fā)到了設(shè)定的100個(gè),那么不再接受請(qǐng)求,除非有請(qǐng)求處理完畢把資源放回了資源池。

基于 tollbooth 實(shí)現(xiàn)

如果要講究開箱機(jī)即用,用這個(gè)開源組件去做http限速你只要按著demo稍微配置下。

  1. 可以配置只針對(duì)GET或POST 類型請(qǐng)求做限制
  2. 可以配置只針對(duì)ip 做限制
  3. 可以配置只針對(duì)請(qǐng)求頭中帶有某種特定標(biāo)識(shí)的請(qǐng)求做限制(不重要的服務(wù)熔斷可以采用它)
  4. 文檔中有g(shù)in,echo等http的使用該組件的文檔
  5. 可以開發(fā)成中間件
  6. 可以做到定時(shí)清理計(jì)數(shù)~
  7. 可以設(shè)置丟棄返回值
    ......
package main

import (
    "github.com/didip/tollbooth"
    "net/http"
)

func HelloHandler(w http.ResponseWriter, req *http.Request) {
    w.Write([]byte("Hello, World!"))
}

func main() {
    // Create a request limiter per handler.
    http.Handle("/", tollbooth.LimitFuncHandler(tollbooth.NewLimiter(1, nil), HelloHandler))
    http.ListenAndServe(":12345", nil)
}

探索 golang.org/x/time/rate

令牌桶這個(gè)算法


image.png

精簡(jiǎn)版:一個(gè)gorontinue定時(shí)往里面塞,所有的請(qǐng)求想要被響應(yīng)必須先去channel取token,沒取到的丟棄。
但感覺golang.org/x/time/rate 實(shí)現(xiàn)方式巧。它是直接通過計(jì)算的一個(gè)計(jì)算算法表達(dá)出token的過程。

使用demo


package main

import (
    "fmt"
    "golang.org/x/time/rate"
    "time"
)

const (
    speed = 1  //每秒執(zhí)行的次數(shù)
    capacity = 10 //桶的容量大小
)

var gameScene = rate.NewLimiter(speed , capacity )

func main() {

    for i:=0;i<100;i++{
        k :=i
        if isGameSceneAllow(){
            fmt.Println("我是被接受的請(qǐng)求",time.Now().Unix(),k)
        }
    }


    //9秒鐘sleep,忽略代碼執(zhí)行時(shí)間,那么將會(huì)產(chǎn)生9個(gè)
    time.Sleep(time.Second * 9)


    //以下打印9個(gè),則證明限速起作用了
    for i:=0;i<100;i++{
        k :=i
        if isGameSceneAllow(){
            fmt.Println("我是被接受的請(qǐng)求2222",time.Now().Unix(),k)
        }
    }

    time.Sleep(time.Second * 9)

}

func isGameSceneAllow()(b bool){
    if gameScene.Allow() == false {
        return
    }
    b =true
    return
}

go-zero 結(jié)合redis+lua 做的分布式限速控制

文檔地址:https://www.yuque.com/tal-tech/go-zero/gobn7v
github: https://github.com/tal-tech/go-zero

package main

import (
    "flag"
    "fmt"
    "log"
    "runtime"
    "strconv"
    "sync"
    "sync/atomic"
    "time"

    "github.com/tal-tech/go-zero/core/limit"
    "github.com/tal-tech/go-zero/core/stores/redis"
)

const seconds = 5

var (
    rdx     = flag.String("redis", "localhost:6379", "the redis, default localhost:6379")
    rdxType = flag.String("redisType", "node", "the redis type, default node")
    rdxPass = flag.String("redisPass", "", "the redis password")
    rdxKey  = flag.String("redisKey", "rate", "the redis key, default rate")
    threads = flag.Int("threads", runtime.NumCPU(), "the concurrent threads, default to cores")
)

func main() {
    flag.Parse()

    store := redis.NewRedis(*rdx, *rdxType, *rdxPass)
    fmt.Println(store.Ping())
    lmt := limit.NewPeriodLimit(seconds, 5, store, *rdxKey)
    timer := time.NewTimer(time.Second * seconds)
    quit := make(chan struct{})
    defer timer.Stop()
    go func() {
        <-timer.C
        close(quit)
    }()

    var allowed, denied int32
    var wait sync.WaitGroup
    for i := 0; i < *threads; i++ {
        wait.Add(1)
        go func() {
            for {
                select {
                case <-quit:
                    wait.Done()
                    return
                default:
                    if v, err := lmt.Take(strconv.FormatInt(int64(i), 10)); err == nil && v == limit.Allowed {
                        atomic.AddInt32(&allowed, 1)
                    } else if err != nil {
                        log.Fatal(err)
                    } else {
                        atomic.AddInt32(&denied, 1)
                    }
                }
            }
        }()
    }

    wait.Wait()
    fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/seconds)
}

限流

著重點(diǎn)是去限制你的服務(wù)器并發(fā)處理請(qǐng)求的能力。打比方你的服務(wù)器最多同時(shí)處理1萬(wàn)個(gè)請(qǐng)求,它的出現(xiàn)就是同時(shí)處理1萬(wàn)個(gè)請(qǐng)求,請(qǐng)求處理完畢資源就會(huì)被釋放,就可以讓新的流量進(jìn)入。

package main

import (
    "log"
    "net/http"
    "text/template"
    "time"

    "github.com/julienschmidt/httprouter"
)

type middleWareHandler struct {
    r *httprouter.Router
    l *ConnLimiter
}

//NewMiddleWareHandler ...
func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {
    m := middleWareHandler{}
    m.r = r
    m.l = NewConnLimiter(cc)
    return m
}

func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    if !m.l.GetConn() {
        defer func() { recover() }()
        log.Panicln("Too many requests")
        return
    }
    m.r.ServeHTTP(w, r)
    defer m.l.ReleaseConn()
}

//RegisterHandlers ...
func RegisterHandlers() *httprouter.Router {
    router := httprouter.New()
    router.GET("/ce", ce)
    return router
}

func ce(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
    //為了演示效果這塊設(shè)置了等待
    time.Sleep(time.Second * 100)
    t, _ := template.ParseFiles("./videos/ce.html")
    t.Execute(w, nil)
}

func main() {
    r := RegisterHandlers()
    //里面的參數(shù)2為設(shè)置的最大流量
    mh := NewMiddleWareHandler(r, 2)
    http.ListenAndServe(":9000", mh)
}


//ConnLimiter 定義一個(gè)結(jié)構(gòu)體
type ConnLimiter struct {
    concurrentConn int
    bucket         chan int
}

//NewConnLimiter ...
func NewConnLimiter(cc int) *ConnLimiter {
    return &ConnLimiter{
        concurrentConn: cc,
        bucket:         make(chan int, cc),
    }
}

//GetConn 獲取通道里面的值
func (cl *ConnLimiter) GetConn() bool {
    if len(cl.bucket) >= cl.concurrentConn {
        log.Printf("Reached the rate limitation.")
        return false
    }

    cl.bucket <- 1
    return true
}

//ReleaseConn 釋放通道里面的值
func (cl *ConnLimiter) ReleaseConn() {
    c := <-cl.bucket
    log.Printf("New connction coming: %d", c)
}

文獻(xiàn)

golang版本實(shí)現(xiàn)限速參考
算法介紹
tollbooth 一個(gè)開箱即用的限速項(xiàng)目
uber漏銅
限速

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容