Go語(yǔ)言使用NSQ消息隊(duì)列

1. 概述

NSQ 是一個(gè)基于Go語(yǔ)言的分布式實(shí)時(shí)消息平臺(tái),它基于MIT開(kāi)源協(xié)議發(fā)布,由bitly公司開(kāi)源出來(lái)的一款簡(jiǎn)單易用的消息中間件

相關(guān)描述如下:

NSQ是一個(gè)實(shí)時(shí)分布式消息傳遞平臺(tái),旨在大規(guī)模運(yùn)行,每天處理數(shù)十億條消息。 它促進(jìn)了沒(méi)有單點(diǎn)故障的分布式和分散式拓?fù)洌瑥亩鴮?shí)現(xiàn)了容錯(cuò)能力和高可用性,并提供了可靠的消息傳遞保證。 查看功能和保證。 從操作上講,NSQ易于配置和部署(所有參數(shù)均在命令行上指定,并且編譯的二進(jìn)制文件不具有運(yùn)行時(shí)相關(guān)性)。 為了獲得最大的靈活性,它與數(shù)據(jù)格式無(wú)關(guān)(消息可以是JSON,MsgPack,協(xié)議緩沖區(qū)或其他任何東西)。 官方提供了Go和Python庫(kù)(以及許多其他客戶端庫(kù)),并且,如果您有興趣構(gòu)建自己的庫(kù),則有一個(gè)協(xié)議規(guī)范。

NSQ的特點(diǎn)

  • 支持水平橫向拓展(無(wú)縫添加更多節(jié)點(diǎn)到集群中)
  • 部署配置容易,自帶集群管理界面(nsqadmin)
  • 提倡分布式拓?fù)?減少單點(diǎn)故障,提高容錯(cuò)
  • 低延遲的消息傳遞
  • 可靠的消交付保障保障
    • 默認(rèn)中消息都在內(nèi)存中, nsq 內(nèi)部機(jī)制保證在程序關(guān)閉時(shí)將隊(duì)列中的數(shù)據(jù)持久化到硬盤,重啟后就會(huì)恢復(fù)。
    • 消息最少被投遞一次

比較知名和常用的消息處理系統(tǒng)還有

RabbitMQ

KafKa

2. 基礎(chǔ)應(yīng)用場(chǎng)景

我們知道一般的消息隊(duì)列(Message Queue) 常用的場(chǎng)景有系統(tǒng)解耦 異步處理 流量削峰 消息通信

3. 相關(guān)文檔

  1. 項(xiàng)目地址 : https://github.com/nsqio/nsq
  2. 項(xiàng)目文檔 英文: https://nsq.io/overview/design.html
  3. 下載地址: https://nsq.io/deployment/installing.html
  4. 客戶端下載地址 : https://nsq.io/clients/client_libraries.html

4.安裝操作

根據(jù)自己的操作平臺(tái)下載解壓即可

  • 根據(jù)自己的操作系統(tǒng)下載對(duì)應(yīng)的壓縮包文件
  • 解壓壓縮文件
  • 進(jìn)入解壓后 bin 目錄中

bin 目錄中我們能看到如下文件

-rwxr-xr-x 1 captain 197121 5515776 8月  28 13:46 nsq_stat.exe*
-rwxr-xr-x 1 captain 197121 5823488 8月  28 13:46 nsq_tail.exe*
-rwxr-xr-x 1 captain 197121 5997568 8月  28 13:46 nsq_to_file.exe*
-rwxr-xr-x 1 captain 197121 5923840 8月  28 13:46 nsq_to_http.exe*
-rwxr-xr-x 1 captain 197121 5903872 8月  28 13:46 nsq_to_nsq.exe*
-rwxr-xr-x 1 captain 197121 8787968 8月  28 13:46 nsqadmin.exe*
-rwxr-xr-x 1 captain 197121 9108992 8月  28 13:46 nsqd.exe*
-rwxr-xr-x 1 captain 197121 8384000 8月  28 13:46 nsqlookupd.exe*
-rwxr-xr-x 1 captain 197121 5639680 8月  28 13:46 to_nsq.exe*

5. NSQ服務(wù)端基礎(chǔ)組件介紹

5.1 nsqd

nsqd是一個(gè)守護(hù)進(jìn)程負(fù)責(zé)接收,排隊(duì),消息傳遞 到客戶端。 它可以獨(dú)立運(yùn)行,但通常由nsqlookupd實(shí)例的群集中配置(在這種情況下,它將能聲明topics和發(fā)現(xiàn)channel)。 它偵聽(tīng)兩個(gè)TCP端口,一個(gè)偵聽(tīng)客戶端,另一個(gè)偵聽(tīng)HTTP API。 它可以選擇在第三個(gè)端口上偵聽(tīng)HTTPS。

5.2 nsqlookupd

nsqlookupd 是管理拓?fù)湫畔⒌氖刈o(hù)程序。 客戶端查詢nsqlookupd以發(fā)現(xiàn)特定 topicnsqd 生產(chǎn)者和 nsqd 節(jié)點(diǎn)廣播topicchannel信息。
有兩個(gè)接口:

nsqd用于廣播的TCP接口

客戶端(nsqadmin)執(zhí)行發(fā)現(xiàn)和管理操作的HTTP接口

5.3 nsqadmin

nsqadmin 是一套 WEB管理界面,用來(lái)匯集集群的實(shí)時(shí)統(tǒng)計(jì),并執(zhí)行不同的管理任務(wù)。

重點(diǎn)提示:

NSQ還有許多功能組件,我們只介紹這三個(gè)(nsqd nsqlookupd nsqadmin)最常用和主要的

NSQ的所有組件都可以通過(guò)參數(shù) -- help 查看相關(guān)配置

nsqdnsqlookupd 都有對(duì)應(yīng)的http API ,需要使用的時(shí)候查看文檔即可

6.操作NSQ

6.1 安裝客戶端

根據(jù)不同的開(kāi)發(fā)語(yǔ)言選擇不同的客戶端

我們是使用Golang操作所以采用NSQ的官方提供客戶端 go-nsq

go get -u github.com/nsqio/go-nsq

6.1 單機(jī)啟動(dòng)nsqd

默認(rèn)啟動(dòng)的nsqd 監(jiān)聽(tīng) HTTP對(duì)應(yīng)的4151端口和TCP對(duì)應(yīng)的4150端口

$ ./nsqd
[nsqd] 2019/11/10 13:41:29.575014 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 13:41:29.593002 INFO: ID: 825
[nsqd] 2019/11/10 13:41:29.597000 INFO: TOPIC(topic_demo): created
[nsqd] 2019/11/10 13:41:29.599998 INFO: TOPIC(topic_demo): new channel(aa)
[nsqd] 2019/11/10 13:41:29.599998 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 13:41:29.644973 INFO: HTTP: listening on [::]:4151
[nsqd] 2019/11/10 13:41:29.644973 INFO: TCP: listening on [::]:4150

我們同樣可以指定端口

$ ./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081"
[nsqd] 2019/11/10 14:05:40.726849 INFO: nsqd v1.2.0 (built w/go1.12.9)
[nsqd] 2019/11/10 14:05:40.745838 INFO: ID: 825
[nsqd] 2019/11/10 14:05:40.747836 INFO: NSQ: persisting topic/channel metadata to nsqd.dat
[nsqd] 2019/11/10 14:05:40.788814 INFO: TCP: listening on [::]:8081
[nsqd] 2019/11/10 14:05:40.788814 INFO: HTTP: listening on [::]:8080

這樣我們就啟動(dòng)了一個(gè)nsqd 的實(shí)例

在NSQ中有兩個(gè)非常重要的概念 topicChannel

我們看一下文檔中的描述:

每個(gè)nsqd實(shí)例旨在一次處理多個(gè)數(shù)據(jù)流。這些數(shù)據(jù)流稱為“topics”,一個(gè)topic具有1個(gè)或多個(gè)“channels”。每個(gè)channel都會(huì)收到topic所有消息的副本,實(shí)際上下游的服務(wù)是通過(guò)對(duì)應(yīng)的channel來(lái)消費(fèi)topic消息。

topicchannel不是預(yù)先配置的。topic在首次使用時(shí)創(chuàng)建,方法是將其發(fā)布到指定topic,或者訂閱指定topic上的channel。channel是通過(guò)訂閱指定的channel在第一次使用時(shí)創(chuàng)建的。

topicchannel都相互獨(dú)立地緩沖數(shù)據(jù),防止緩慢的消費(fèi)者導(dǎo)致其他chennel的積壓(同樣適用于topic級(jí)別)。

channel可以并且通常會(huì)連接多個(gè)客戶端。假設(shè)所有連接的客戶端都處于準(zhǔn)備接收消息的狀態(tài),則每條消息將被傳遞到隨機(jī)客戶端。例如:

topic-channel.gif

總而言之,消息是從topic->channel多播的(每個(gè)channel都接收該topic的所有消息的副本),但從channel-> 消息消費(fèi)者 均勻分發(fā)(每個(gè)消費(fèi)者都接收該頻道的一部分消息)。

6.1.1 單NSQ的使用

編寫一個(gè)消息生產(chǎn)者
nsq_single_product.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
    "time"
)
func main() {
    nsqAddr := "127.0.0.1:8081"
    conf :=nsq.NewConfig()
    p ,err := nsq.NewProducer(nsqAddr,conf)
    if err != nil {
        fmt.Println(err)
        return
    }
    for  {
        message := "message :"+ time.Now().Format("2006-01-02 15:04:05")
        fmt.Println(message)
        // 發(fā)送消息
        p.Publish("topic-demo1",[]byte(message))
        time.Sleep(time.Second)
    }

}

編寫一個(gè)消息消費(fèi)者

nsq_single_consumer.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

type NewHandler struct{}

func (m *NewHandler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
}
func MyConsumers(topic, channel, addr string) {
    conf := nsq.NewConfig()
    new_consumer, err := nsq.NewConsumer(topic, channel, conf)
    if err != nil {

    }
    // 接收消息
    new_handler := &NewHandler{}
    new_consumer.AddHandler(new_handler)
    err = new_consumer.ConnectToNSQD(addr)
    if err != nil {

    }
}
func main() {
    addr := "127.0.0.1:8081"
    go MyConsumers("topic-demo1", "channel-aa", addr)
    // 模擬多個(gè)從多個(gè)channel去消息
    go MyConsumers("topic-demo1", "channel-bb", addr)
    select {}
}

6.1.2 通過(guò)nsqadmin查看

啟動(dòng)nsqadmin

nsqadmin 的web界面默認(rèn)監(jiān)聽(tīng)了 4171端口

$ ./nsqadmin --nsqd-http-address="127.0.0.1:8080"
[nsqadmin] 2019/11/10 16:06:15.842033 INFO: nsqadmin v1.2.0 (built w/go1.12.9)
[nsqadmin] 2019/11/10 16:06:15.858026 INFO: HTTP: listening on [::]:4171

我們?cè)诘刂窓谥休斎?/p>

http://127.0.0.1:4171/

就能看看管理界面

nsqadmin
nsqadmin
6.1.3 NSQ的單點(diǎn)結(jié)構(gòu)
nsq.png

6.3 NSQ集群

6.3.1 啟動(dòng)NSQ各組件

構(gòu)建一個(gè)NSQ的基礎(chǔ)拓?fù)浣Y(jié)構(gòu)

我們可以簡(jiǎn)單的說(shuō)nsqlookupd 是用來(lái)管理nsqd實(shí)例節(jié)點(diǎn)的

第一步
啟動(dòng)nsqlookupd

啟動(dòng)的nsqlookupd 采用了默認(rèn)配置 通過(guò)參數(shù) --help 查看配置項(xiàng)

$ ./nsqlookupd
[nsqlookupd] 2019/11/10 16:40:55.968588 INFO: nsqlookupd v1.2.0 (built w/go1.12.9)
[nsqlookupd] 2019/11/10 16:40:55.983580 INFO: HTTP: listening on [::]:4161
[nsqlookupd] 2019/11/10 16:40:55.984579 INFO: TCP: listening on [::]:4160

第二步

添加nsqd 實(shí)例

與前面的啟動(dòng)不同,需要帶上參數(shù) -lookupd-tcp-address

添加第一個(gè)實(shí)例

./nsqd -http-address="0.0.0.0:8080" -tcp-address="0.0.0.0:8081" -lookupd-tcp-address="127.0.0.1:4160"

添加第二個(gè)實(shí)例

 ./nsqd -http-address="0.0.0.0:8090" -tcp-address="0.0.0.0:8091" -lookupd-tcp-address="127.0.0.1:4160"

第三步

啟動(dòng)nsqadmin

與前面的也不同了需要帶上參數(shù) -lookupd-http-address

$ ./nsqadmin -lookupd-http-address="127.0.0.1:4161"

在瀏覽器中訪問(wèn)nsqadmin

nsqadmin-nodes
nsqadmin-lookup
6.3.2 NSQ的拓?fù)浣Y(jié)構(gòu)
nsqlookupd
  1. 在集群模式中,消息生產(chǎn)方發(fā)送消息給任意一個(gè)nsqd 實(shí)例都不影響
  2. 消息的消費(fèi)者需要通過(guò)nsqlookupd 查詢nsqd的地址后才能獲取消息
  3. 增加nsqd 節(jié)點(diǎn)完全不影響其他的節(jié)點(diǎn)
6.3.3 Golang使用NSQ代碼示例

消息生產(chǎn)者

nsq_cluster_product.go

package main

import (
    "bufio"
    "fmt"
    "github.com/nsqio/go-nsq"
    "log"
    "os"
    "strings"
)

var pro *nsq.Producer

func NewPro(addr string) (err error) {
    conf := nsq.NewConfig()
    pro, err = nsq.NewProducer(addr, conf)
    if err != nil {
        log.Println(err)
        return err
    }
    return nil
}
func main() {
    nsqAddr := "127.0.0.1:8091"
    err := NewPro(nsqAddr)
    if err != nil {
        fmt.Println(err)
        return
    }else{
        fmt.Println("connect 127.0.0.1:8091 success")
    }
    // 讀取標(biāo)準(zhǔn)輸入
    reader := bufio.NewReader(os.Stdin)
    for {
        // 讀取所有內(nèi)容直到遇見(jiàn)回車(\n)
        data, err := reader.ReadString('\n')
        if err != nil {
            fmt.Println("read data from stdin is field : ", err)
            continue
        }
        // 當(dāng)輸入q的時(shí)候退出
        data = strings.TrimSpace(data)
        if strings.ToUpper(data) == "Q" {
            break
        }
        err = pro.Publish("topic-demo1", []byte(data))
        if err != nil {
            fmt.Println("nsq publish is field ", err)
            continue
        }
    }
    fmt.Println("exit !")
}

消息消費(fèi)者

nsq_cluster_consumer.go

package main

import (
    "fmt"
    "github.com/nsqio/go-nsq"
)

type Handler struct{}

func (m *Handler) HandleMessage(msg *nsq.Message) (err error) {
    addr := msg.NSQDAddress
    message := string(msg.Body)
    fmt.Println(addr, message)
    return
}
func NewConsumers(t string, c string, addr string) error {
    conf := nsq.NewConfig()
    nc, err := nsq.NewConsumer(t, c, conf)
    if err != nil {
        fmt.Println("create consumer failed err ", err)
        return err
    }
    consumer := &Handler{}
    nc.AddHandler(consumer)
    // 連接nsqlookupd
    if err:= nc.ConnectToNSQLookupd(addr);err!=nil{
        fmt.Println("connect nsqlookupd failed ", err)
        return err
    }
    return nil
}
func main() {
    // 這是nsqlookupd的地址
    addr := "127.0.0.1:4161"
    err := NewConsumers("topic-demo1", "channel-aa", addr)
    if err != nil {
        fmt.Println("new nsq consumer failed", err)
        return
    }
    select {}
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容