NSQ學(xué)習(xí):流控的實(shí)現(xiàn)

消息中間件的pull與push

消息中間件的實(shí)現(xiàn)無(wú)非兩種套路,一種讓客戶端pull,典型的比如kafka便是如此,而另一種則是push,也就是讓客戶端不需要做任何操作,只需要連接上服務(wù)端便可以源源不斷收到服務(wù)端的推送,典型的代表就是我們今天介紹的nsq。
pull的優(yōu)勢(shì)在于客戶端可以自己做流控,比如客戶端想什么時(shí)候pull就什么時(shí)候pull,不會(huì)因?yàn)榉?wù)端的強(qiáng)迫而接受,但劣勢(shì)也很明顯,如果服務(wù)端的生產(chǎn)速度很慢,客戶端需要不斷的輪詢會(huì)讓cpu處于繁忙且無(wú)用的狀態(tài)。
push的優(yōu)勢(shì)則在于能夠不受限于客戶端的速度,可以讓服務(wù)端更快的、批量的把數(shù)據(jù)push給客戶端,因此大部分push實(shí)現(xiàn)的消息中間件都是屬于內(nèi)存型,而nsq比較特殊,它實(shí)際上是內(nèi)存+磁盤的一個(gè)消息中間件。

push流的nsq如何做流控

上面也說(shuō)了,pull流的優(yōu)勢(shì)在于可以讓客戶端自由控制消息的速度,但是push流不一樣,push流不管客戶端是否多繁忙都會(huì)推送消息,如果沒(méi)有一個(gè)流控機(jī)制,很容易讓客戶端最終因?yàn)橄M(fèi)速度跟不上導(dǎo)致產(chǎn)生各種性能問(wèn)題。nsq其實(shí)也考慮到這一點(diǎn),于是采用了一個(gè)RDY的狀態(tài)字段來(lái)表示流控。簡(jiǎn)單來(lái)說(shuō),就是客戶端連接上nsqd之后,會(huì)告訴nsqd它的可接受的消息數(shù)量是多少,每當(dāng)nsqd給客戶端推送一條消息這個(gè)RDY就會(huì)減一,而客戶端消費(fèi)完畢并且發(fā)送一個(gè)FIN之后,這個(gè)RDY又會(huì)加一(其實(shí)這個(gè)設(shè)計(jì)有點(diǎn)類似tcp中的用來(lái)控制流量的窗口機(jī)制)

go-nsq客戶端

我們來(lái)參考一下golang官方實(shí)現(xiàn)的nsq客戶端是如何控制這個(gè)rdy的。
首先編寫一個(gè)客戶端:

type Customter struct {}

func (c *Customter) HandleMessage(msg *nsq.Message) error {
    fmt.Println("receive: ", string(msg.Body))
    return nil
}

func main() {
    cfg := nsq.NewConfig()
    cfg.LookupdPollInterval = time.Second
    customer, err := nsq.NewConsumer("test", "t1", cfg)
    if err != nil {
        log.Panic(err)
    }
    customer.AddHandler(&Customter{})
    if err := customer.ConnectToNSQD("127.0.0.1:4161"); err != nil {
        log.Panic(err)
    }
    select {}
}

跳進(jìn)源碼,可以看到go-nsq的Consumer結(jié)構(gòu)體有一個(gè)字段connections

// github.com/nsqio/go-nsq/customer.go
type Consumer struct {
..... 
connections        map[string]*Conn
....
}
=

當(dāng)我們上面的demo調(diào)用ConnectToNSQD的時(shí)候,這個(gè)connections的map會(huì)寫入對(duì)應(yīng)的nsqd addr作為key,連接成功的Conn作為value:

r.connections[addr] = conn
for _, c := range r.conns() {
    r.maybeUpdateRDY(c)
}

上面代碼表示會(huì)遍歷這個(gè)Customer的所有nsqd conn(customer可以同時(shí)連接多個(gè)nsqd),然后調(diào)用maybeUpdateRDY這個(gè)方法:

    // 當(dāng)剩余rdy的數(shù)量等于1,或者少于最近一次的rdycount的25%,就調(diào)整這個(gè)rdycount,這個(gè)rdycount就取用戶設(shè)置的MaxInFlight
    if remain <= 1 || remain < (lastRdyCount/4) || (count > 0 && count < remain) {
        r.log(LogLevelDebug, "(%s) sending RDY %d (%d remain from last RDY %d)",
            conn, count, remain, lastRdyCount)
        r.updateRDY(conn, count)
    } else {

由此我們可以得知,nsqd的客戶端在連接nsqd的時(shí)候就會(huì)設(shè)置一個(gè)初始的rdycount。當(dāng)然,在連接成功之后,也會(huì)有一個(gè)gorountine后臺(tái)不斷去調(diào)整這個(gè)rdycount

func (r *Consumer) rdyLoop() {
    redistributeTicker := time.NewTicker(r.config.RDYRedistributeInterval)

    for {
        select {
        case <-redistributeTicker.C:
            r.redistributeRDY()
        case <-r.exitChan:
            goto exit
        }
    }

exit:
    redistributeTicker.Stop()
    r.log(LogLevelInfo, "rdyLoop exiting")
    r.wg.Done()
}

參考鏈接

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 1. 介紹 最近在研究一些消息中間件,常用的MQ如RabbitMQ,ActiveMQ,Kafka等。NSQ是一個(gè)基...
    aoho閱讀 9,090評(píng)論 1 16
  • 一、簡(jiǎn)歷準(zhǔn)備 1、個(gè)人技能 (1)自定義控件、UI設(shè)計(jì)、常用動(dòng)畫特效 自定義控件 ①為什么要自定義控件? Andr...
    lucas777閱讀 5,388評(píng)論 2 54
  • 目錄 一 、常用消息中間件支持模型二、消費(fèi)端Push模型優(yōu)缺點(diǎn)三、消費(fèi)端Pull模型優(yōu)缺點(diǎn)四、兩種模型在實(shí)際場(chǎng)景中...
    longxingxiu閱讀 10,942評(píng)論 4 10
  • 58同城高性能移動(dòng)PUSH推送平臺(tái)架構(gòu)演進(jìn)之路 58同城作為中國(guó)最大的生活服務(wù)平臺(tái),涵蓋了房產(chǎn)、招聘、二手、二手車...
    meng_philip123閱讀 3,234評(píng)論 3 38
  • 心微動(dòng) 奈何情已遠(yuǎn) 物也非 人也非 花來(lái)花落 是是非非 往日不可追 今日種種 踏雪尋梅 你在我心中 似水無(wú)痕 一條...
    雨心田閱讀 1,267評(píng)論 18 46

友情鏈接更多精彩內(nèi)容