golang nsq消費(fèi)者時(shí)間過長,防止與nsqd服務(wù)斷開連接

在使用nsq消費(fèi)時(shí),發(fā)現(xiàn)如果HandleMessage方法執(zhí)行時(shí)間超過2分鐘,在此方法運(yùn)行完后,就會(huì) 拋出 IO error - EOF 錯(cuò)誤。測試發(fā)現(xiàn),當(dāng)超過2分鐘時(shí),nsqadmin 后臺(tái)顯示的節(jié)點(diǎn)未連接。在查看github Issues時(shí),有人提出過,于是做了測試,發(fā)現(xiàn)一切正常。(在實(shí)際開發(fā)中,不建議把慢執(zhí)行放在消費(fèi)服務(wù)里,因?yàn)镠andleMessage方法不執(zhí)行完,是不會(huì)消費(fèi)下一個(gè)隊(duì)列消息的)

func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
    // msg.NSQDAddress的地址是[--broadcast-address所指的ip] --broadcast-address=nsqd
    fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
    done := make(chan int)
    go func() {
        t := time.Tick(time.Second * 10)
        for {
            select {
            case <-t:
                fmt.Println("touch.")
                msg.Touch()
            case <-done:
                return
            }
        }
    }()
    // 測試sleep500秒
    time.Sleep(time.Second * 500) 
    done <- 1

    /*//msg.DisableAutoResponse()  // 禁用自動(dòng)提交
    // err := 執(zhí)行業(yè)務(wù)邏輯
    if err != nil {
        //msg.Requeue(time.Second * 5) // 重新放入隊(duì)列消費(fèi)
    } else {
        //msg.Finish() // 手動(dòng)提交消費(fèi)完成,移出隊(duì)列
    }*/
    return
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容