消息中間件的pull與push
消息中間件的實(shí)現(xiàn)無(wú)非兩種套路,一種讓客戶端pull,典型的比如kafka便是如此,而另一種則是push,也就是讓客戶端不需要做任何操作,只需要連接上服務(wù)端便可以源源不斷收到服務(wù)端的推送,典型的代表就是我們今天介紹的nsq。
pull的優(yōu)勢(shì)在于客戶端可以自己做流控,比如客戶端想什么時(shí)候pull就什么時(shí)候pull,不會(huì)因?yàn)榉?wù)端的強(qiáng)迫而接受,但劣勢(shì)也很明顯,如果服務(wù)端的生產(chǎn)速度很慢,客戶端需要不斷的輪詢會(huì)讓cpu處于繁忙且無(wú)用的狀態(tài)。
push的優(yōu)勢(shì)則在于能夠不受限于客戶端的速度,可以讓服務(wù)端更快的、批量的把數(shù)據(jù)push給客戶端,因此大部分push實(shí)現(xiàn)的消息中間件都是屬于內(nèi)存型,而nsq比較特殊,它實(shí)際上是內(nèi)存+磁盤的一個(gè)消息中間件。
push流的nsq如何做流控
上面也說(shuō)了,pull流的優(yōu)勢(shì)在于可以讓客戶端自由控制消息的速度,但是push流不一樣,push流不管客戶端是否多繁忙都會(huì)推送消息,如果沒(méi)有一個(gè)流控機(jī)制,很容易讓客戶端最終因?yàn)橄M(fèi)速度跟不上導(dǎo)致產(chǎn)生各種性能問(wèn)題。nsq其實(shí)也考慮到這一點(diǎn),于是采用了一個(gè)RDY的狀態(tài)字段來(lái)表示流控。簡(jiǎn)單來(lái)說(shuō),就是客戶端連接上nsqd之后,會(huì)告訴nsqd它的可接受的消息數(shù)量是多少,每當(dāng)nsqd給客戶端推送一條消息這個(gè)RDY就會(huì)減一,而客戶端消費(fèi)完畢并且發(fā)送一個(gè)FIN之后,這個(gè)RDY又會(huì)加一(其實(shí)這個(gè)設(shè)計(jì)有點(diǎn)類似tcp中的用來(lái)控制流量的窗口機(jī)制)
go-nsq客戶端
我們來(lái)參考一下golang官方實(shí)現(xiàn)的nsq客戶端是如何控制這個(gè)rdy的。
首先編寫一個(gè)客戶端:
type Customter struct {}
func (c *Customter) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive: ", string(msg.Body))
return nil
}
func main() {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second
customer, err := nsq.NewConsumer("test", "t1", cfg)
if err != nil {
log.Panic(err)
}
customer.AddHandler(&Customter{})
if err := customer.ConnectToNSQD("127.0.0.1:4161"); err != nil {
log.Panic(err)
}
select {}
}
跳進(jìn)源碼,可以看到go-nsq的Consumer結(jié)構(gòu)體有一個(gè)字段connections:
// github.com/nsqio/go-nsq/customer.go
type Consumer struct {
.....
connections map[string]*Conn
....
}
=
當(dāng)我們上面的demo調(diào)用ConnectToNSQD的時(shí)候,這個(gè)connections的map會(huì)寫入對(duì)應(yīng)的nsqd addr作為key,連接成功的Conn作為value:
r.connections[addr] = conn
for _, c := range r.conns() {
r.maybeUpdateRDY(c)
}
上面代碼表示會(huì)遍歷這個(gè)Customer的所有nsqd conn(customer可以同時(shí)連接多個(gè)nsqd),然后調(diào)用maybeUpdateRDY這個(gè)方法:
// 當(dāng)剩余rdy的數(shù)量等于1,或者少于最近一次的rdycount的25%,就調(diào)整這個(gè)rdycount,這個(gè)rdycount就取用戶設(shè)置的MaxInFlight
if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
conn, count, remain, lastRdyCount)
r.updateRDY(conn, count)
} else {
由此我們可以得知,nsqd的客戶端在連接nsqd的時(shí)候就會(huì)設(shè)置一個(gè)初始的rdycount。當(dāng)然,在連接成功之后,也會(huì)有一個(gè)gorountine后臺(tái)不斷去調(diào)整這個(gè)rdycount:
func (r *Consumer) rdyLoop() {
redistributeTicker := time.NewTicker(r.config.RDYRedistributeInterval)
for {
select {
case <-redistributeTicker.C:
r.redistributeRDY()
case <-r.exitChan:
goto exit
}
}
exit:
redistributeTicker.Stop()
r.log(LogLevelInfo, "rdyLoop exiting")
r.wg.Done()
}