NSQ Source Code Analysis (2) - Message Push and Subscription in nsqd

NSQ pushes messages to its consumers. Since NSQ is built on memory plus diskqueue and cannot tolerate a large message backlog, the push model is a sensible fit.

In the previous post we saw that a message published to a given topic is copied and delivered to every channel under that topic, where it is stored in the channel's memoryMsgChan or backend.

Message Subscription and Push

The two files that matter most for message push are nsqd/protocol_v2.go and nsqd/client_v2.go.

When a client establishes a TCP connection to the nsqd process, the code calls protocolV2.IOLoop and creates a new clientV2 struct. IOLoop then starts a goroutine running the messagePump method.

Each TCP connection therefore has two goroutines: the one running IOLoop receives the client's requests, while the one running messagePump handles the data and pushes messages out to the client through the clientV2.

protocol_v2 as a whole is a fairly classic TCP protocol implementation. Whenever a new TCP connection is established, the server creates a client_v2 object and starts a protocol_v2.messagePump goroutine; a client only ever subscribes to one channel. IOLoop receives commands from the client, replies to them, and communicates with the other components (including protocol_v2.messagePump) through Go channels. For details, see the source: github.com/nsqio/nsq/nsqd/protocol_v2.go

The message-push logic we are interested in lives in messagePump:

func (p *protocolV2) messagePump(client *clientV2, startedChan chan bool) {
    var err error
    var buf bytes.Buffer
    var memoryMsgChan chan *Message
    var backendMsgChan chan []byte
    var subChannel *Channel
    // NOTE: `flusherChan` is used to bound message latency for
    // the pathological case of a channel on a low volume topic
    // with >1 clients having >1 RDY counts
    var flusherChan <-chan time.Time
    var sampleRate int32

    subEventChan := client.SubEventChan
    identifyEventChan := client.IdentifyEventChan
    outputBufferTicker := time.NewTicker(client.OutputBufferTimeout)
    heartbeatTicker := time.NewTicker(client.HeartbeatInterval)
    heartbeatChan := heartbeatTicker.C
    msgTimeout := client.MsgTimeout

    // v2 opportunistically buffers data to clients to reduce write system calls
    // we force flush in two cases:
    //    1. when the client is not ready to receive messages
    //    2. we're buffered and the channel has nothing left to send us
    //       (ie. we would block in this loop anyway)
    //
    flushed := true

    // signal to the goroutine that started the messagePump
    // that we've started up
    close(startedChan)

    for {
        // IsReadyForMessages checks whether the number of in-flight messages
        // has exceeded the client's RDY count; if so, stop pulling messages
        // to push and force a flush instead.
        if subChannel == nil || !client.IsReadyForMessages() {
            // the client is not ready to receive messages...
            memoryMsgChan = nil
            backendMsgChan = nil
            flusherChan = nil
            // force flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        } else if flushed {
            // last iteration we flushed...
            // do not select on the flusher ticker channel
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = nil
        } else {
            // select on the flusher ticker channel, too
            memoryMsgChan = subChannel.memoryMsgChan
            backendMsgChan = subChannel.backend.ReadChan()
            flusherChan = outputBufferTicker.C
        }

        select {
        case <-flusherChan: // ticker chan: guarantees a periodic flush
            client.writeLock.Lock()
            err = client.Flush()
            client.writeLock.Unlock()
            if err != nil {
                goto exit
            }
            flushed = true
        case <-client.ReadyStateChan:
            // continue to the next iteration and re-check the ready state
        case subChannel = <-subEventChan:
            // a SUB command delivered the subscribed Channel; start pushing.
            // you can't SUB anymore (a client subscribes to exactly one channel)
            subEventChan = nil
        case identifyData := <-identifyEventChan:
            // IDENTIFY handling elided here; the real code uses identifyData
            // to adjust buffering, heartbeat, sampleRate and msgTimeout
            _ = identifyData
        case <-heartbeatChan://heartbeat check
            err = p.Send(client, frameTypeResponse, heartbeatBytes)
            if err != nil {
                goto exit
            }
        case b := <-backendMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }

            msg, err := decodeMessage(b)
            if err != nil {
                p.ctx.nsqd.logf(LOG_ERROR, "failed to decode message - %s", err)
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage() //add mem count
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case msg := <-memoryMsgChan:
            if sampleRate > 0 && rand.Int31n(100) > sampleRate {
                continue
            }
            msg.Attempts++

            subChannel.StartInFlightTimeout(msg, client.ID, msgTimeout)
            client.SendingMessage()
            err = p.SendMessage(client, msg, &buf)
            if err != nil {
                goto exit
            }
            flushed = false
        case <-client.ExitChan:
            goto exit
        }
    }

exit:
    p.ctx.nsqd.logf(LOG_INFO, "PROTOCOL(V2): [%s] exiting messagePump", client)
    heartbeatTicker.Stop()
    outputBufferTicker.Stop()
    if err != nil {
        p.ctx.nsqd.logf(LOG_ERROR, "PROTOCOL(V2): [%s] messagePump error - %s", client, err)
    }
}

First, the client sends a SUB command to subscribe to a channel under a topic. In protocol_v2.go, protocolV2.SUB sends the Channel to clientV2.go's client.SubEventChan. messagePump then records it as subChannel and starts pushing messages from the subscribed channel.

At the top of the loop, memoryMsgChan and backendMsgChan are pointed at the subscribed channel so the select can receive its messages. When a message arrives, two things happen: (1) the channel's StartInFlightTimeout is called, which adds the message to InFlightPqueue, a priority queue (min-heap) keyed by timeout that holds messages sent to clients but not yet acknowledged; (2) some client counters such as InFlightMessageCount are updated, and comparing InFlightMessageCount against RDY decides whether pushing continues.

When the client successfully consumes a message, it sends a FIN command carrying the message ID. The client decrements InFlightMessageCount, the message is removed from the channel's in-flight data, and a signal is sent on ReadyStateChan; if the server had stopped pushing because of the RDY limit, that signal prompts it to re-check whether it can resume.
If consumption fails, the client sends a REQ command instead; the channel removes the message from its in-flight data and re-queues it into the channel.
But what if the client never responds to a message at all?

Design and Implementation of Message Timeouts

nsqd.go contains another important piece of the implementation: queueScanLoop. Every QueueScanInterval, it randomly selects QueueScanSelectionCount channels from a locally cached channel list and hands them to a pool of worker goroutines maintained by resizePool. This implementation borrows Redis's probabilistic expiration algorithm.

Compare section 9.6 of "Redis Design and Implementation" on Redis's expired-key deletion strategy, which combines two approaches:

  1. Lazy deletion. Whenever a client reads or writes a key, Redis checks whether it has expired and deletes it if so.
  2. Periodic deletion. Instead of traversing the whole DB, Redis sweeps its databases over multiple passes within a time budget, randomly checking the expiry times of a subset of keys from each database's expires dictionary and deleting those that have expired.

An nsqd channel has two queues that need periodic checking: the InFlightQueue and the DeferredQueue. If either has work to do, the channel is considered dirty.
Every 100ms by default (QueueScanInterval), nsqd randomly selects 20 channels (QueueScanSelectionCount) and sends them to the workCh channel.
Every 5s (QueueScanRefreshInterval), queueScanLoop calls resizePool. resizePool maintains what is effectively a fixed-size goroutine pool, with idealPoolSize = min(AllChannelNum * 0.25, QueueScanWorkerPoolMax). Each worker goroutine, for every channel it receives from workCh, calls channel.go/channel.processInFlightQueue and channel.go/channel.processDeferredQueue; any change marks that channel's scan as dirty.
Once all 20 selected channels have been scanned, the ratio dirtyNum / QueueScanSelectionCount is checked; if it exceeds the configured threshold QueueScanDirtyPercent, the next scan starts immediately instead of waiting for the interval.
Why re-run resizePool every 5s? It gives the pool a chance to adjust its goroutine count to the latest channel count. Since poolSize is a field on NSQD, i.e. global state, each call does not create a new pool; it resizes the existing one toward idealSize. This code is also a classic worth studying: how to implement a goroutine pool in Go, especially one whose size must be adjusted dynamically.

// resizePool adjusts the size of the pool of queueScanWorker goroutines
//
//  1 <= pool <= min(num * 0.25, QueueScanWorkerPoolMax)
//
func (n *NSQD) resizePool(num int, workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    idealPoolSize := int(float64(num) * 0.25)
    if idealPoolSize < 1 {
        idealPoolSize = 1
    } else if idealPoolSize > n.getOpts().QueueScanWorkerPoolMax {
        idealPoolSize = n.getOpts().QueueScanWorkerPoolMax
    }
    for {
        if idealPoolSize == n.poolSize {
            break
        } else if idealPoolSize < n.poolSize {
            // contract
            closeCh <- 1
            n.poolSize--
        } else {
            // expand
            n.waitGroup.Wrap(func() {
                n.queueScanWorker(workCh, responseCh, closeCh)
            })
            n.poolSize++
        }
    }
}

// queueScanWorker receives work (in the form of a channel) from queueScanLoop
// and processes the deferred and in-flight queues
func (n *NSQD) queueScanWorker(workCh chan *Channel, responseCh chan bool, closeCh chan int) {
    for {
        select {
        case c := <-workCh:
            now := time.Now().UnixNano()
            dirty := false
            if c.processInFlightQueue(now) {
                dirty = true
            }
            if c.processDeferredQueue(now) {
                dirty = true
            }
            responseCh <- dirty
        case <-closeCh:
            return
        }
    }
}

// queueScanLoop runs in a single goroutine to process in-flight and deferred
// priority queues. It manages a pool of queueScanWorker (configurable max of
// QueueScanWorkerPoolMax (default: 4)) that process channels concurrently.
//
// It copies Redis's probabilistic expiration algorithm: it wakes up every
// QueueScanInterval (default: 100ms) to select a random QueueScanSelectionCount
// (default: 20) channels from a locally cached list (refreshed every
// QueueScanRefreshInterval (default: 5s)).
//
// If either of the queues had work to do the channel is considered "dirty".
//
// If QueueScanDirtyPercent (default: 25%) of the selected channels were dirty,
// the loop continues without sleep.
func (n *NSQD) queueScanLoop() {
    workCh := make(chan *Channel, n.getOpts().QueueScanSelectionCount)
    responseCh := make(chan bool, n.getOpts().QueueScanSelectionCount)
    closeCh := make(chan int)

    workTicker := time.NewTicker(n.getOpts().QueueScanInterval)
    refreshTicker := time.NewTicker(n.getOpts().QueueScanRefreshInterval)

    channels := n.channels()
    n.resizePool(len(channels), workCh, responseCh, closeCh)

    for {
        select {
        case <-workTicker.C:
            if len(channels) == 0 {
                continue
            }
        case <-refreshTicker.C:
            channels = n.channels()
            n.resizePool(len(channels), workCh, responseCh, closeCh)
            continue
        case <-n.exitChan:
            goto exit
        }

        num := n.getOpts().QueueScanSelectionCount
        if num > len(channels) {
            num = len(channels)
        }

    loop:
        for _, i := range util.UniqRands(num, len(channels)) {
            workCh <- channels[i]
        }

        numDirty := 0
        for i := 0; i < num; i++ {
            if <-responseCh {
                numDirty++
            }
        }

        if float64(numDirty)/float64(num) > n.getOpts().QueueScanDirtyPercent {
            goto loop
        }
    }

exit:
    n.logf(LOG_INFO, "QUEUESCAN: closing")
    close(closeCh)
    workTicker.Stop()
    refreshTicker.Stop()
}

channel.go/channel.processInFlightQueue is fairly straightforward: it pops messages from the channel's InFlightPqueue whose timeout is earlier than the given time, from earliest to latest, performs some consistency bookkeeping, and puts each one back into the channel (also triggering TryUpdateReadyState). processDeferredQueue is similar.
Combining probabilistic selection with a bounded worker pool, nsqd cleans up in-memory messages that timed out without client acknowledgment and re-queues them, guaranteeing deliverability (at the cost of possible duplicate consumption).
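The sweep itself can be sketched like this. It is a simplified model of the pop-until-not-expired loop (plain sorted slice instead of nsqd's priority queue, and the real method also detaches the client association and updates stats):

```go
package main

import (
	"fmt"
	"sort"
)

type msg struct {
	id       string
	deadline int64
}

// sweepInFlight pops every message whose deadline is <= now from the
// front of the in-flight queue (kept sorted by deadline, earliest
// first) and appends it to requeue for redelivery. It returns whether
// any work was done, i.e. whether this scan was "dirty".
func sweepInFlight(inFlight *[]msg, requeue *[]msg, now int64) bool {
	dirty := false
	for len(*inFlight) > 0 && (*inFlight)[0].deadline <= now {
		m := (*inFlight)[0]
		*inFlight = (*inFlight)[1:]
		*requeue = append(*requeue, m) // back into the channel for redelivery
		dirty = true
	}
	return dirty
}

func main() {
	inFlight := []msg{{"a", 10}, {"b", 20}, {"c", 30}}
	sort.Slice(inFlight, func(i, j int) bool { return inFlight[i].deadline < inFlight[j].deadline })
	var requeue []msg
	dirty := sweepInFlight(&inFlight, &requeue, 25)
	fmt.Println(dirty, len(requeue), len(inFlight)) // true 2 1
}
```

Because the queue is ordered by deadline, the loop stops at the first unexpired message, so a scan of a mostly-healthy channel is cheap.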

Classic Go Concurrency

Notice that many clients may be selecting on the same channel's memoryMsgChan and backend ReadChan. Since a message sent on a Go channel is received by exactly one listener, this mechanism provides a degree of consumer load balancing.
The NSQ codebase is well suited for studying good practices of Go concurrency with goroutines, channels, and mutexes.
