nsq_to_file創(chuàng)建消費(fèi)者,讀取nsq消息并寫入文件中,支持topic的模糊匹配和實(shí)時(shí)更新,以及消息落文件的定時(shí)rotate等設(shè)置。研究nsq_to_file的代碼有助于理解nsq消息隊(duì)列的工作流程。
創(chuàng)建消費(fèi)者的代碼為:
func newConsumerFileLogger(topic string, cfg *nsq.Config) (*ConsumerFileLogger, error) {
f, err := NewFileLogger(*gzipEnabled, *gzipLevel, *filenameFormat, topic)
if err != nil {
return nil, err
}
consumer, err := nsq.NewConsumer(topic, *channel, cfg)
if err != nil {
return nil, err
}
consumer.AddHandler(f)
err = consumer.ConnectToNSQDs(nsqdTCPAddrs)
if err != nil {
log.Fatal(err)
}
err = consumer.ConnectToNSQLookupds(lookupdHTTPAddrs)
if err != nil {
log.Fatal(err)
}
return &ConsumerFileLogger{
C: consumer,
F: f,
}, nil
}
其中調(diào)用了go-nsq庫(kù)。在go-nsq中通過(guò)consumer.ConnectToNSQLookupds,調(diào)用nsqlookupd的/lookup接口查詢當(dāng)前的所有生產(chǎn)者。然后連接每一個(gè)nsqd,并訂閱相應(yīng)的channel(不指定就默認(rèn)為"nsq_to_file",如果nsqd
不存在該channel會(huì)自動(dòng)創(chuàng)建)。
當(dāng)與nsqd的TCP連接收到消息時(shí),go-nsq中會(huì)回調(diào)消費(fèi)者的HandleMessage函數(shù)。
func (f *FileLogger) HandleMessage(m *nsq.Message) error {
m.DisableAutoResponse()
f.logChan <- m
return nil
}
func (f *FileLogger) router(r *nsq.Consumer) {
pos := 0
output := make([]*nsq.Message, *maxInFlight)
sync := false
ticker := time.NewTicker(time.Duration(30) * time.Second)
closing := false
closeFile := false
exit := false
for {
select {
case <-r.StopChan:
sync = true
closeFile = true
exit = true
case <-f.termChan:
ticker.Stop()
r.Stop()
sync = true
closing = true
case <-f.hupChan:
sync = true
closeFile = true
case <-ticker.C:
if f.needsFileRotate() {
if *skipEmptyFiles {
closeFile = true
} else {
f.updateFile()
}
}
sync = true
case m := <-f.logChan:
if f.needsFileRotate() {
f.updateFile()
sync = true
}
_, err := f.writer.Write(m.Body)
if err != nil {
log.Fatalf("ERROR: writing message to disk - %s", err)
}
_, err = f.writer.Write([]byte("\n"))
if err != nil {
log.Fatalf("ERROR: writing newline to disk - %s", err)
}
output[pos] = m
pos++
if pos == cap(output) {
sync = true
}
}
if closing || sync || r.IsStarved() {
if pos > 0 {
log.Printf("syncing %d records to disk", pos)
err := f.Sync()
if err != nil {
log.Fatalf("ERROR: failed syncing messages - %s", err)
}
for pos > 0 {
pos--
m := output[pos]
m.Finish()
output[pos] = nil
}
}
sync = false
}
if closeFile {
f.Close()
closeFile = false
}
if exit {
close(f.ExitChan)
break
}
}
}