當(dāng)nsq跑起來之后, 我們可能會遇到以下問題
- 分布式部署
- 處理錯誤(何時requeue)
- 如何使用golang lib
- 消息生命周期如何, 如重試/斷線重連邏輯.
抱著不應(yīng)該只停留在入門的態(tài)度, 筆者粗淺的研究了一下這幾個問題, 希望也對有同樣疑問的人有幫助.
分布式部署
注意:
由于NSQ的分布式網(wǎng)絡(luò)結(jié)構(gòu), 必須為每一個NSQD分配單獨的地址(如IP或host)以保證消費者在lookup找到NSQD節(jié)點后能夠正確的連接到對于的NSQD節(jié)點, 這也就意味著需要好好的規(guī)劃NSQD的廣播地址. (下文會說到在Rancher中如何配置NSQD的廣播地址)
NSQ推薦的部署方式是 讓NSQD與TOPIC生產(chǎn)者一一對應(yīng) 既一個生產(chǎn)者一個NSQD.
如: user服務(wù)需要發(fā)送一個Topic為register的消息, 那么就需要為user服務(wù)創(chuàng)建一個NSQD, 廣播地址設(shè)置這臺服務(wù)器的ip, 例如10.0.0.1. 現(xiàn)在消費者端需要連接上Nsqlookupd就能得到生產(chǎn)者NSQD節(jié)點的地址即10.0.0.1.
如果如果想要消除單點故障, 那么我們需要為user服務(wù)再添加一臺服務(wù)器, 并且也在這臺機器上添加一個Nsqd節(jié)點并連接上Nsqlookupd, 廣播地址設(shè)置為這臺服務(wù)器的ip, 如10.0.0.2, 現(xiàn)在消費者通過Nsqlookupd就能得到兩個Nsqd節(jié)點的地址并連接了, 只要有一個服務(wù)器是"活"的, 那么整個系統(tǒng)就能正常使用.
詳細可參考官方文檔的拓撲圖: topology_patterns

在Rancher中部署
官方文檔提供的分布式部署方式是多主機部署, 所以如何在Rancher中部署就只有自己實踐了.
由于篇幅較多, 所以另起一篇.
NSQ Requeue And Backoff
這兩個概念與作用些許復(fù)雜, 建議結(jié)合官方文檔來看
requeue(重試)
當(dāng)錯誤發(fā)生, 需要重試時就應(yīng)該使用nsq的requeue功能.
backoff(避退)
backoff能降低消費者吞吐量以讓消費者從錯誤中恢復(fù).
當(dāng)消費者在backoff狀態(tài)時, 這個消費者將不再處理任何消息, 直到backoff超時
當(dāng)觸發(fā)backoff時控制臺將打印:
// 進入backoff狀態(tài), RDY設(shè)置為0代表準(zhǔn)備接收0條消息(不接收消息) (協(xié)議詳情看 https://nsq.io/clients/tcp_protocol_spec.html)
WRN 1 [test/test] backing off for 1m4s (backoff level 6), setting all to RDY 0
// 時間到了將設(shè)置RDY為1接收1條消息以測試狀態(tài), 官方將這個狀態(tài)稱為`tests the waters`
WRN 1 [test/test] (DESKTOP-HELJ7V4:4150) backoff timeout expired, sending RDY 1
當(dāng)有多個消費者競爭時, 出錯的消費者應(yīng)當(dāng)主動backoff不再處理消息(以讓出更多的機會給其他消費者).
如果只有一個消費者, 則消費者會等到backoff超時后才開始處理消息(空出時間讓消費者恢復(fù)).
避退是存在于整個消費者上的, 所以消費者每當(dāng)一個消息處理失敗了之后都會增加這個消費者的backoff level. 這會影響這個消費者的處理能力.
到底需不需要用backoff, 就要看業(yè)務(wù)了:
- 消息是用來更新數(shù)據(jù)庫訂單狀態(tài)的, 這是一個不容易出錯的邏輯, 如果需要requeue則需要backoff讓出優(yōu)先級, 讓其他消費者來做, 盡量以挽救這個訂單.
- 消息是用來通知第三方(如支付寶支付成功的http回調(diào))的, 一般requeue是發(fā)生在第三方端響應(yīng)不滿足預(yù)期的響應(yīng), 這不是我方消費者的錯誤, 應(yīng)當(dāng)不使用backoff, 避免阻塞消息消費.
參考:
Max in Flight
cnf := nsq.NewConfig()
cfg.MaxInFlight =200
MaxInFlight控制nsqd將多少消息同時發(fā)送給消費者, 默認是1, 意味著同時只有一個消息在被消費者處理, 如果你沒有控制并發(fā)數(shù)量的需求, 建議設(shè)置為CPU的數(shù)量以提高性能.
參考: Question about concurrency and Max in Flight
golang lib
nsq提供golang的client lib. 支持全部特性.
本著不重復(fù)造輪子原則, 我也想盡大可能的使用nsq lib里的代碼邏輯來實現(xiàn)需求, 但有些需求它實現(xiàn)不了:
- 自定義requeue的等待時間
- 判斷某錯誤是否應(yīng)該重試: 對于不應(yīng)該重試的錯誤(如參數(shù)有誤), 應(yīng)該直接FIN, 而不是REQ.
我也只好自己寫代碼了.
先看看它原有的幾個邏輯
// Handler is the message processing interface for Consumer
//
// Implement this interface for handlers that return whether or not message
// processing completed successfully.
//
// When the return value is nil Consumer will automatically handle FINishing.
//
// When the returned value is non-nil Consumer will automatically handle REQueing.
type Handler interface {
HandleMessage(message *Message) error
}
消息自動重試(REQ)與完成(FIN):
func (r *Consumer) handlerLoop(handler Handler) {
r.log(LogLevelDebug, "starting Handler")
for {
message, ok := <-r.incomingMessages
if !ok {
goto exit
}
if r.shouldFailMessage(message, handler) {
message.Finish()
continue
}
err := handler.HandleMessage(message)
if err != nil {
r.log(LogLevelError, "Handler returned error (%s) for msg %s", err, message.ID)
if !message.IsAutoResponseDisabled() {
message.Requeue(-1)
}
continue
}
if !message.IsAutoResponseDisabled() {
message.Finish()
}
}
exit:
r.log(LogLevelDebug, "stopping Handler")
if atomic.AddInt32(&r.runningHandlers, -1) == 0 {
r.exit()
}
}
判斷失敗:
func (r *Consumer) shouldFailMessage(message *Message, handler interface{}) bool {
// message passed the max number of attempts
if r.config.MaxAttempts > 0 && message.Attempts > r.config.MaxAttempts {
r.log(LogLevelWarning, "msg %s attempted %d times, giving up",
message.ID, message.Attempts)
logger, ok := handler.(FailedMessageLogger)
if ok {
logger.LogFailedMessage(message)
}
return true
}
return false
}
看懂它, 并根據(jù)需求改進它.
優(yōu)化requeue邏輯
可以看到當(dāng)handler返回的error不為空時, nsq將自動requeue, 這種重試是很方便但是也有壞處.
使用這個重試機制的壞處是:
- 不能自定義requeue的等待時間(默認等待時間=config.DefaultRequeueDelay*Attempts)
- 會在控制臺打印一個ERR(不能自定義格式, 而且有一些err不應(yīng)該打印到控制臺), 這點可能有潔癖的開發(fā)者受不了.
- 一些錯誤不應(yīng)該重試, 如入?yún)⒉缓戏? 再怎么重試也是徒勞. 這時候應(yīng)該
直接失敗.
我建議不要使用這個err機制, 而應(yīng)當(dāng)手動使用msg.Requeue(-1)或者msg.RequeueWithoutBackoff(-1) 來顯式requeue.
我的做法是再包裹一次Handler, 在閉包內(nèi)部自定義錯誤邏輯. 我就不寫上我亂糟糟的代碼了, 您能實現(xiàn)得更簡單.
nsqadmin
nsqadmin 提供一個web頁面來管理nsq的消息/Topic/Channel.
Lookup

我們知道還沒有生產(chǎn)者產(chǎn)生消息時(比如剛剛才部署), topic不存在, 這時如果有消費者連接上nsqlookup就會一直報錯 topic not found, 為了避免這個報錯, 就可以在Create Topic/Channel欄目中預(yù)先創(chuàng)建topic和channel.
比如下圖是添加了名為test的topic.

可以看到提示說這個topic當(dāng)前不活躍, 也就是只在nsqloopup新建了topic但是沒有在任何nsqd里生產(chǎn). 這個提示在nsq開始發(fā)送第一個消息后消失.
如何保證消息被至少投遞一次
重試
在Handler中返回一個錯誤就會觸發(fā)重試, 重試的消息被存儲在nsq的Deferred隊列, 一定延時后消費者會再次收到此消息.
斷線恢復(fù)
發(fā)送給消費者的消息總會被nsq先存儲在InFlight隊列, 消費者處理完消息需要給nsq發(fā)送FIN消息, 這時nsq才算完成了消息的投遞.
如果消費者沒有發(fā)送FIN給nsq的話(如斷線了)會出現(xiàn)什么情況? 在nsq后臺有一個專門的協(xié)程處理InFlight隊列, 當(dāng)消息超過了一定時間還沒有被FIN 則會重新加入隊列發(fā)送給其他消費者.