Nsq從入門到實踐

當(dāng)nsq跑起來之后, 我們可能會遇到以下問題

  • 分布式部署
  • 處理錯誤(何時requeue)
  • 如何使用golang lib
  • 消息生命周期如何, 如重試/斷線重連邏輯.

抱著不應(yīng)該只停留在入門的態(tài)度, 筆者粗淺的研究了一下這幾個問題, 希望也對有同樣疑問的人有幫助.

分布式部署

注意:
由于NSQ的分布式網(wǎng)絡(luò)結(jié)構(gòu), 必須為每一個NSQD分配單獨的地址(如IP或host)以保證消費者在lookup找到NSQD節(jié)點后能夠正確的連接到對于的NSQD節(jié)點, 這也就意味著需要好好的規(guī)劃NSQD的廣播地址. (下文會說到在Rancher中如何配置NSQD的廣播地址)

NSQ推薦的部署方式是 讓NSQD與TOPIC生產(chǎn)者一一對應(yīng) 既一個生產(chǎn)者一個NSQD.

如: user服務(wù)需要發(fā)送一個Topic為register的消息, 那么就需要為user服務(wù)創(chuàng)建一個NSQD, 廣播地址設(shè)置這臺服務(wù)器的ip, 例如10.0.0.1. 現(xiàn)在消費者端需要連接上Nsqlookupd就能得到生產(chǎn)者NSQD節(jié)點的地址即10.0.0.1.

如果如果想要消除單點故障, 那么我們需要為user服務(wù)再添加一臺服務(wù)器, 并且也在這臺機器上添加一個Nsqd節(jié)點并連接上Nsqlookupd, 廣播地址設(shè)置為這臺服務(wù)器的ip, 如10.0.0.2, 現(xiàn)在消費者通過Nsqlookupd就能得到兩個Nsqd節(jié)點的地址并連接了, 只要有一個服務(wù)器是"活"的, 那么整個系統(tǒng)就能正常使用.

詳細可參考官方文檔的拓撲圖: topology_patterns

topology_patterns

在Rancher中部署

官方文檔提供的分布式部署方式是多主機部署, 所以如何在Rancher中部署就只有自己實踐了.

由于篇幅較多, 所以另起一篇.

NSQ Requeue And Backoff

這兩個概念與作用些許復(fù)雜, 建議結(jié)合官方文檔來看

requeue(重試)

當(dāng)錯誤發(fā)生, 需要重試時就應(yīng)該使用nsq的requeue功能.

backoff(避退)

backoff能降低消費者吞吐量以讓消費者從錯誤中恢復(fù).

當(dāng)消費者在backoff狀態(tài)時, 這個消費者將不再處理任何消息, 直到backoff超時

當(dāng)觸發(fā)backoff時控制臺將打印:

// 進入backoff狀態(tài), RDY設(shè)置為0代表準(zhǔn)備接收0條消息(不接收消息) (協(xié)議詳情看 https://nsq.io/clients/tcp_protocol_spec.html)
WRN    1 [test/test] backing off for 1m4s (backoff level 6), setting all to RDY 0
// 時間到了將設(shè)置RDY為1接收1條消息以測試狀態(tài), 官方將這個狀態(tài)稱為`tests the waters`
WRN    1 [test/test] (DESKTOP-HELJ7V4:4150) backoff timeout expired, sending RDY 1

當(dāng)有多個消費者競爭時, 出錯的消費者應(yīng)當(dāng)主動backoff不再處理消息(以讓出更多的機會給其他消費者).
如果只有一個消費者, 則消費者會等到backoff超時后才開始處理消息(空出時間讓消費者恢復(fù)).

避退是存在于整個消費者上的, 所以消費者每當(dāng)一個消息處理失敗了之后都會增加這個消費者的backoff level. 這會影響這個消費者的處理能力.

到底需不需要用backoff, 就要看業(yè)務(wù)了:

  • 消息是用來更新數(shù)據(jù)庫訂單狀態(tài)的, 這是一個不容易出錯的邏輯, 如果需要requeue則需要backoff讓出優(yōu)先級, 讓其他消費者來做, 盡量以挽救這個訂單.
  • 消息是用來通知第三方(如支付寶支付成功的http回調(diào))的, 一般requeue是發(fā)生在第三方端響應(yīng)不滿足預(yù)期的響應(yīng), 這不是我方消費者的錯誤, 應(yīng)當(dāng)不使用backoff, 避免阻塞消息消費.

參考:

Max in Flight

cnf := nsq.NewConfig()
cfg.MaxInFlight =200

MaxInFlight控制nsqd將多少消息同時發(fā)送給消費者, 默認是1, 意味著同時只有一個消息在被消費者處理, 如果你沒有控制并發(fā)數(shù)量的需求, 建議設(shè)置為CPU的數(shù)量以提高性能.

參考: Question about concurrency and Max in Flight

golang lib

nsq提供golang的client lib. 支持全部特性.

本著不重復(fù)造輪子原則, 我也想盡大可能的使用nsq lib里的代碼邏輯來實現(xiàn)需求, 但有些需求它實現(xiàn)不了:

  • 自定義requeue的等待時間
  • 判斷某錯誤是否應(yīng)該重試: 對于不應(yīng)該重試的錯誤(如參數(shù)有誤), 應(yīng)該直接FIN, 而不是REQ.

我也只好自己寫代碼了.

先看看它原有的幾個邏輯

// Handler is the message processing interface for Consumer
//
// Implement this interface for handlers that return whether or not message
// processing completed successfully.
//
// When the return value is nil Consumer will automatically handle FINishing.
//
// When the returned value is non-nil Consumer will automatically handle REQueing.
type Handler interface {
    HandleMessage(message *Message) error
}

消息自動重試(REQ)與完成(FIN):

func (r *Consumer) handlerLoop(handler Handler) {
    r.log(LogLevelDebug, "starting Handler")

    for {
        message, ok := <-r.incomingMessages
        if !ok {
            goto exit
        }

        if r.shouldFailMessage(message, handler) {
            message.Finish()
            continue
        }

        err := handler.HandleMessage(message)
        if err != nil {
            r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
            if !message.IsAutoResponseDisabled() {
                message.Requeue(-1)
            }
            continue
        }

        if !message.IsAutoResponseDisabled() {
            message.Finish()
        }
    }

exit:
    r.log(LogLevelDebug, "stopping Handler")
    if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
        r.exit()
    }
}

判斷失敗:

func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
    // message passed the max number of attempts
    if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
        r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
            message.ID, message.Attempts)

        logger, ok := handler.(FailedMessageLogger)
        if ok {
            logger.LogFailedMessage(message)
        }

        return true
    }
    return false
}

看懂它, 并根據(jù)需求改進它.

優(yōu)化requeue邏輯

可以看到當(dāng)handler返回的error不為空時, nsq將自動requeue, 這種重試是很方便但是也有壞處.

使用這個重試機制的壞處是:

  • 不能自定義requeue的等待時間(默認等待時間=config.DefaultRequeueDelay*Attempts)
  • 會在控制臺打印一個ERR(不能自定義格式, 而且有一些err不應(yīng)該打印到控制臺), 這點可能有潔癖的開發(fā)者受不了.
  • 一些錯誤不應(yīng)該重試, 如入?yún)⒉缓戏? 再怎么重試也是徒勞. 這時候應(yīng)該直接失敗.

我建議不要使用這個err機制, 而應(yīng)當(dāng)手動使用msg.Requeue(-1)或者msg.RequeueWithoutBackoff(-1) 來顯式requeue.

我的做法是再包裹一次Handler, 在閉包內(nèi)部自定義錯誤邏輯. 我就不寫上我亂糟糟的代碼了, 您能實現(xiàn)得更簡單.

nsqadmin

nsqadmin 提供一個web頁面來管理nsq的消息/Topic/Channel.

Lookup

loopup

我們知道還沒有生產(chǎn)者產(chǎn)生消息時(比如剛剛才部署), topic不存在, 這時如果有消費者連接上nsqlookup就會一直報錯 topic not found, 為了避免這個報錯, 就可以在Create Topic/Channel欄目中預(yù)先創(chuàng)建topic和channel.

比如下圖是添加了名為test的topic.

可以看到提示說這個topic當(dāng)前不活躍, 也就是只在nsqloopup新建了topic但是沒有在任何nsqd里生產(chǎn). 這個提示在nsq開始發(fā)送第一個消息后消失.

如何保證消息被至少投遞一次

重試

在Handler中返回一個錯誤就會觸發(fā)重試, 重試的消息被存儲在nsq的Deferred隊列, 一定延時后消費者會再次收到此消息.

斷線恢復(fù)

發(fā)送給消費者的消息總會被nsq先存儲在InFlight隊列, 消費者處理完消息需要給nsq發(fā)送FIN消息, 這時nsq才算完成了消息的投遞.

如果消費者沒有發(fā)送FIN給nsq的話(如斷線了)會出現(xiàn)什么情況? 在nsq后臺有一個專門的協(xié)程處理InFlight隊列, 當(dāng)消息超過了一定時間還沒有被FIN 則會重新加入隊列發(fā)送給其他消費者.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,554評論 19 139
  • 關(guān)于Mongodb的全面總結(jié) MongoDB的內(nèi)部構(gòu)造《MongoDB The Definitive Guide》...
    中v中閱讀 32,302評論 2 89
  • 如果智慧是成功的彼岸,讀書則恰似一葉扁舟,滿載希望與你共同駛向前方;如果智慧是快樂的門扇,讀書則是一枚鑰匙,打開心...
    S從心閱讀 372評論 0 1
  • 類目系統(tǒng)是整個電商系統(tǒng)中的基礎(chǔ)部分,比較容易被忽略,但確實是比較重要的一個部分,目前淘寶或者京東的商品數(shù)量已經(jīng)有...
    DearNicole閱讀 19,127評論 18 53
  • 一般情況下,右鍵拖動子view到父view上,xcode彈出選框如下: 我們可以看到,spacing只能關(guān)聯(lián)到 T...
    jezong閱讀 561評論 0 0

友情鏈接更多精彩內(nèi)容