NSQ源碼(前言)-nsq介紹

NSQ是一個go語言實現(xiàn)的消息隊列,每天能夠處理數(shù)億級別的消息,其設(shè)計目標(biāo)是為在分布式環(huán)境下運行的去中心化服務(wù)提供一個強大的基礎(chǔ)架構(gòu)。

nsq文檔

nsq組件

  • nsqd
  • nsqlookupd
  • nsqadmin

nsqd

nsqd is the daemon that receives, queues, and delivers messages to clients.
It can be run standalone but is normally configured in a cluster with nsqlookupd instance(s) (in which case it will announce topics and channels for discovery).
It listens on two TCP ports, one for clients and another for the HTTP API. It can optionally listen on a third port for HTTPS.

  • 可以單獨作為queue部署
  • 也可以結(jié)合nsqlookupd(topic和channel發(fā)現(xiàn))部署,
  • 監(jiān)聽2個端口一個是tcp另外一個是http

nsqadmin

nsqadmin is a Web UI to view aggregated cluster stats in realtime and perform various administrative tasks.
是一個管理員接口,查看狀態(tài)等信息

nsqadmin_screenshot.png

nsqlookupd

nsqlookupd is the daemon that manages topology information. Clients query nsqlookupd to discover nsqd producers for a specific topic and nsqd nodes broadcasts topic and channel information.
There are two interfaces: A TCP interface which is used by nsqd for broadcasts and an HTTP interface for clients to perform discovery and administrative actions.

是管理的拓?fù)湫畔?,并提供了最終一致發(fā)現(xiàn)服務(wù)的守護(hù)進(jìn)程

承諾

  1. messages are not durable (by default)
    默認(rèn)情況下并不持久化消息 --mem-queue-size選項指定最大的queue size,后續(xù)分析選項的作用
  2. messages are delivered at least once
    至少投遞一次,后面我們會分析如何實現(xiàn)
  3. messages received are un-ordered
    消息無序
  4. consumers eventually find all topic producers
    最終一致性

nsq 消息推送流程

Topic
channel
consumer

  • producer向nsqd發(fā)送消息時,指定topic如果topic不存在即創(chuàng)建一個該名稱的topic;
config := nsq.NewConfig()
producer, err := nsq.NewProducer("192.168.200.151:4150", config)
if err != nil {
    log.Fatal(err)
}
defer producer.Stop()
msg := "hello world"
messageBody := []byte(msg)
topicName := "test"

// Synchronously publish a single message to the specified topic.
// Messages can also be sent asynchronously and/or in batches.
err := producer.Publish(topicName, messageBody)
if err != nil {
    log.Fatal(err)
}
  • consumer向nsqlookupd注冊
func (h *messageHandler) HandleMessage(m *go_nsq.Message) error {
    if len(m.Body) == 0 {
        return nil
    }
    log.Println(string(m.Body), m.ID, m.Attempts, m.NSQDAddress, m.Timestamp)
    return nil
}
config := go_nsq.NewConfig()
consumer, err := go_nsq.NewConsumer("test", "test_channel22", config)
if err != nil {
    log.Println(err)
}
defer consumer.Stop()
consumer.AddConcurrentHandlers(&messageHandler{}, 2)
err = consumer.ConnectToNSQLookupd("192.168.200.151:4161")
//err = consumer.ConnectToNSQD("192.168.200.151:4150")
if err != nil {
    log.Fatal(err)
}

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGUSR1)

<-sigChan

consumer通過nsqlookup的找到對應(yīng)topic的所有producers,并獲取producer所在的nsqd,并向nsqd注冊channel和topic;nsq就是通過增加的nsqlookupd來避免SPOF。


架構(gòu)圖

多個consumer注冊到同一個channel,一個 producer向它的本地 nsqd發(fā)送消息,要做到這點,首先要先打開一個連接( NSQ 提供 HTTP API 和 TCP 客戶端 等2種方式連接到 nsqd),然后發(fā)送一個包含 topic和消息主體的發(fā)布命令(pub/mpub/publish),在這種情況下,我們將消息發(fā)布到 topic上,消息會采用多播的方式被拷貝到各個 channel中, 然后通過多個 channel以分散到我們不同需求的 consumer中。


消息推送

從下一節(jié)開始我們分析nsq的源碼,分析是如何實現(xiàn)高性能推送服務(wù),并分析一些參數(shù)的用途。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容