限速方式
- 漏桶算法: 講究的是服務(wù)器勻速的去處理并發(fā)請(qǐng)求,但... 為達(dá)到目的居然采用sleep了。簡(jiǎn)單來(lái)說(shuō)服務(wù)器勻速處理請(qǐng)求,超過桶容量會(huì)被舍棄
- 令牌桶算法:拿到令牌的請(qǐng)求被處理,否則被舍棄。在桶里面的令牌被拿光了的時(shí)候,此時(shí)就是一邊生產(chǎn)令牌一邊消耗令牌了,這種場(chǎng)景下也勻速了。存峰值,峰值為桶容量 + 消耗此容量所需時(shí)間產(chǎn)生的新token。
概念
- 熔斷 與 限速 與 限流,過載 你得清楚它們的意思。
- 服務(wù)熔斷:對(duì)上游服務(wù)的保護(hù)。打比方你的 A服務(wù)調(diào)用上游 B服務(wù),并發(fā)來(lái)了,A發(fā)現(xiàn)B返回的數(shù)據(jù)不正常,A于是不再掉B 給它10S 緩沖期,那么不掉B的這10S發(fā)生的過程就是熔斷。像保險(xiǎn)絲
- 服務(wù)過載:指的是自身服務(wù)流量過大,這個(gè)時(shí)候需要考慮限速或者限流對(duì)自己進(jìn)行保護(hù)
- 限速是限制流量流入的速度,才不會(huì)管你服務(wù)器 hang了沒。 打比方你服務(wù)器正常情況下,10萬(wàn)qps,現(xiàn)在來(lái)了100萬(wàn)流量并將持續(xù)1小時(shí),由于限速的作用并沒有一下打死你的服務(wù)器,然后你的服務(wù)器任然以10萬(wàn)的qps提供服務(wù),過30分鐘后突然db出現(xiàn)慢查詢了,
請(qǐng)注意這個(gè)時(shí)候相當(dāng)于你服務(wù)器每秒qps沒有10萬(wàn)了,但是限速限制的還是10萬(wàn),這個(gè)時(shí)候很可能你服務(wù)器即將gg。。。 - 限流著重是控制并發(fā)的最大流量。像資源池一樣,并發(fā)到了設(shè)定的100個(gè),那么不再接受請(qǐng)求,除非有請(qǐng)求處理完畢把資源放回了資源池。
基于 tollbooth 實(shí)現(xiàn)
如果要講究開箱機(jī)即用,用這個(gè)開源組件去做http限速你只要按著demo稍微配置下。
- 可以配置只針對(duì)GET或POST 類型請(qǐng)求做限制
- 可以配置只針對(duì)ip 做限制
- 可以配置只針對(duì)請(qǐng)求頭中帶有某種特定標(biāo)識(shí)的請(qǐng)求做限制(不重要的服務(wù)熔斷可以采用它)
- 文檔中有g(shù)in,echo等http的使用該組件的文檔
- 可以開發(fā)成中間件
- 可以做到定時(shí)清理計(jì)數(shù)~
- 可以設(shè)置丟棄返回值
......
package main
import (
"github.com/didip/tollbooth"
"net/http"
)
func HelloHandler(w http.ResponseWriter, req *http.Request) {
w.Write([]byte("Hello, World!"))
}
func main() {
// Create a request limiter per handler.
http.Handle("/", tollbooth.LimitFuncHandler(tollbooth.NewLimiter(1, nil), HelloHandler))
http.ListenAndServe(":12345", nil)
}
探索 golang.org/x/time/rate
令牌桶這個(gè)算法

image.png
精簡(jiǎn)版:一個(gè)gorontinue定時(shí)往里面塞,所有的請(qǐng)求想要被響應(yīng)必須先去channel取token,沒取到的丟棄。
但感覺golang.org/x/time/rate 實(shí)現(xiàn)方式巧。它是直接通過計(jì)算的一個(gè)計(jì)算算法表達(dá)出token的過程。
使用demo
package main
import (
"fmt"
"golang.org/x/time/rate"
"time"
)
const (
speed = 1 //每秒執(zhí)行的次數(shù)
capacity = 10 //桶的容量大小
)
var gameScene = rate.NewLimiter(speed , capacity )
func main() {
for i:=0;i<100;i++{
k :=i
if isGameSceneAllow(){
fmt.Println("我是被接受的請(qǐng)求",time.Now().Unix(),k)
}
}
//9秒鐘sleep,忽略代碼執(zhí)行時(shí)間,那么將會(huì)產(chǎn)生9個(gè)
time.Sleep(time.Second * 9)
//以下打印9個(gè),則證明限速起作用了
for i:=0;i<100;i++{
k :=i
if isGameSceneAllow(){
fmt.Println("我是被接受的請(qǐng)求2222",time.Now().Unix(),k)
}
}
time.Sleep(time.Second * 9)
}
func isGameSceneAllow()(b bool){
if gameScene.Allow() == false {
return
}
b =true
return
}
go-zero 結(jié)合redis+lua 做的分布式限速控制
文檔地址:https://www.yuque.com/tal-tech/go-zero/gobn7v
github: https://github.com/tal-tech/go-zero
package main
import (
"flag"
"fmt"
"log"
"runtime"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/tal-tech/go-zero/core/limit"
"github.com/tal-tech/go-zero/core/stores/redis"
)
const seconds = 5
var (
rdx = flag.String("redis", "localhost:6379", "the redis, default localhost:6379")
rdxType = flag.String("redisType", "node", "the redis type, default node")
rdxPass = flag.String("redisPass", "", "the redis password")
rdxKey = flag.String("redisKey", "rate", "the redis key, default rate")
threads = flag.Int("threads", runtime.NumCPU(), "the concurrent threads, default to cores")
)
func main() {
flag.Parse()
store := redis.NewRedis(*rdx, *rdxType, *rdxPass)
fmt.Println(store.Ping())
lmt := limit.NewPeriodLimit(seconds, 5, store, *rdxKey)
timer := time.NewTimer(time.Second * seconds)
quit := make(chan struct{})
defer timer.Stop()
go func() {
<-timer.C
close(quit)
}()
var allowed, denied int32
var wait sync.WaitGroup
for i := 0; i < *threads; i++ {
wait.Add(1)
go func() {
for {
select {
case <-quit:
wait.Done()
return
default:
if v, err := lmt.Take(strconv.FormatInt(int64(i), 10)); err == nil && v == limit.Allowed {
atomic.AddInt32(&allowed, 1)
} else if err != nil {
log.Fatal(err)
} else {
atomic.AddInt32(&denied, 1)
}
}
}
}()
}
wait.Wait()
fmt.Printf("allowed: %d, denied: %d, qps: %d\n", allowed, denied, (allowed+denied)/seconds)
}
限流
著重點(diǎn)是去限制你的服務(wù)器并發(fā)處理請(qǐng)求的能力。打比方你的服務(wù)器最多同時(shí)處理1萬(wàn)個(gè)請(qǐng)求,它的出現(xiàn)就是同時(shí)處理1萬(wàn)個(gè)請(qǐng)求,請(qǐng)求處理完畢資源就會(huì)被釋放,就可以讓新的流量進(jìn)入。
package main
import (
"log"
"net/http"
"text/template"
"time"
"github.com/julienschmidt/httprouter"
)
type middleWareHandler struct {
r *httprouter.Router
l *ConnLimiter
}
//NewMiddleWareHandler ...
func NewMiddleWareHandler(r *httprouter.Router, cc int) http.Handler {
m := middleWareHandler{}
m.r = r
m.l = NewConnLimiter(cc)
return m
}
func (m middleWareHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if !m.l.GetConn() {
defer func() { recover() }()
log.Panicln("Too many requests")
return
}
m.r.ServeHTTP(w, r)
defer m.l.ReleaseConn()
}
//RegisterHandlers ...
func RegisterHandlers() *httprouter.Router {
router := httprouter.New()
router.GET("/ce", ce)
return router
}
func ce(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
//為了演示效果這塊設(shè)置了等待
time.Sleep(time.Second * 100)
t, _ := template.ParseFiles("./videos/ce.html")
t.Execute(w, nil)
}
func main() {
r := RegisterHandlers()
//里面的參數(shù)2為設(shè)置的最大流量
mh := NewMiddleWareHandler(r, 2)
http.ListenAndServe(":9000", mh)
}
//ConnLimiter 定義一個(gè)結(jié)構(gòu)體
type ConnLimiter struct {
concurrentConn int
bucket chan int
}
//NewConnLimiter ...
func NewConnLimiter(cc int) *ConnLimiter {
return &ConnLimiter{
concurrentConn: cc,
bucket: make(chan int, cc),
}
}
//GetConn 獲取通道里面的值
func (cl *ConnLimiter) GetConn() bool {
if len(cl.bucket) >= cl.concurrentConn {
log.Printf("Reached the rate limitation.")
return false
}
cl.bucket <- 1
return true
}
//ReleaseConn 釋放通道里面的值
func (cl *ConnLimiter) ReleaseConn() {
c := <-cl.bucket
log.Printf("New connction coming: %d", c)
}
文獻(xiàn)
golang版本實(shí)現(xiàn)限速參考
算法介紹
tollbooth 一個(gè)開箱即用的限速項(xiàng)目
uber漏銅
限速