golang nats[3] queue模式

隊(duì)列訂閱模式

此模式中,訂閱者要指定兩個(gè)屬性,主題和隊(duì)列(queue,其實(shí)就是隊(duì)列名稱)

注意:下面所有前提=必須訂閱同一個(gè)主題

發(fā)布消息后,N個(gè)具有同樣的主題和queue的訂閱者,只有一個(gè)會(huì)收到消息。(random算法)

說(shuō)明:queue=工作組,工作組中有N個(gè)worker,發(fā)布消息后,同一個(gè)工作組中,僅有一個(gè)worker會(huì)收到消息。

相同主題,不同queue的訂閱者之間,不符合上面的描述。這種情況下,可以把同一個(gè)queue的訂閱者們,當(dāng)成一個(gè)訂閱者來(lái)處理,這樣就和普通的發(fā)布訂閱模式一樣了。

主題subj1,queue=q1的訂閱者有sub1-q1,sub2-q1,sub3-q1
主題subj1,queue=q2的訂閱者有sub1-q2,sub2-q2,sub3-q2
一個(gè)主題,兩組訂閱者,每組訂閱者中各有3個(gè)訂閱者。
對(duì)sub1發(fā)布消息,q1,q2兩個(gè)組都會(huì)收到消息(發(fā)布訂閱模式),q1,q2每個(gè)組中,分別僅有一個(gè)訂閱者會(huì)收到消息(queue模式)

server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc  *nats.Conn
    err error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
    }
}

func main() {
    var (
        servername = flag.String("servername", "y", "name for server")
        queueGroup = flag.String("group", "", "group name for Subscribe")
        subj       = flag.String("subj", "", "subject name")
    )
    flag.Parse()

    log.Println(*servername, *queueGroup, *subj)
    startService(*subj, *servername+" worker1", *queueGroup)
    startService(*subj, *servername+" worker2", *queueGroup)
    startService(*subj, *servername+" worker3", *queueGroup)

    select {}
}

//receive message
func startService(subj, name, queue string) {
    go async(nc, subj, name, queue)
}

func async(nc *nats.Conn, subj, name, queue string) {
    nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
        log.Println(name, "Received a message From Async : ", string(msg.Data))
    })
}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "strconv"
    "github.com/pborman/uuid"
    "flag"
    "time"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc  *nats.Conn
    err error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
    }
}

func main() {
    var (
        subj = flag.String("subj", "", "subject name")
    )
    flag.Parse()
    log.Println(*subj)
    startClient(*subj)

    time.Sleep(time.Second)
}

//send message to server
func startClient(subj string) {
    for i := 0; i < 1; i++ {
        id := uuid.New()
        log.Println(id)
        nc.Publish(subj, []byte(id+" Sun "+strconv.Itoa(i)))
        nc.Publish(subj, []byte(id+" Rain "+strconv.Itoa(i)))
        nc.Publish(subj, []byte(id+" Fog "+strconv.Itoa(i)))
        nc.Publish(subj, []byte(id+" Cloudy "+strconv.Itoa(i)))
    }
}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

啟動(dòng)server A queue=g1,訂閱主題=weather

./main -servername=A -group=g1 -subj=weather
2018/08/18 11:32:16 A g1 weather

啟動(dòng)server B queue=g1,訂閱主題=weather

./main -servername=B -group=g1 -subj=weather
2018/08/18 11:32:21 B g1 weather

發(fā)送消息

./main -subj=weather
2018/08/18 11:32:24 weather
2018/08/18 11:32:24 3005ae7c-85ab-42d3-ad09-d44688d129ad

結(jié)果 server A收到消息

2018/08/18 11:32:24 A worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Rain 0
2018/08/18 11:32:24 A worker2 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Sun 0

結(jié)果 server B收到消息

2018/08/18 11:32:24 B worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Fog 0
2018/08/18 11:32:24 B worker3 Received a message From Async :  3005ae7c-85ab-42d3-ad09-d44688d129ad Cloudy 0

主題相同,queue不同

啟動(dòng)server c queue=test,訂閱主題=weather

> ./main -servername=C -group=test -subj=weather
2018/08/18 11:37:43 C test weather

發(fā)消息

./main -subj=weather
2018/08/18 11:37:47 weather
2018/08/18 11:37:47 b4e201dd-ea4a-4ec3-aa45-99489695f0c2

Server c 收到了全部消息

2018/08/18 11:37:47 C worker1 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 C worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0

Server A 收到3條消息

2018/08/18 11:37:47 A worker1 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 A worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 A worker3 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0

Server B 收到1條消息

2018/08/18 11:37:47 B worker2 Received a message From Async :  b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0

總結(jié):queue模式,在分發(fā)消息時(shí),進(jìn)行負(fù)載均衡,隨機(jī)發(fā)送給同一組中的任意一個(gè)訂閱者,可以隨時(shí)增加刪除訂閱者,配合響應(yīng)的監(jiān)控?cái)?shù)據(jù)和統(tǒng)計(jì)數(shù)據(jù),對(duì)下游的業(yè)務(wù)進(jìn)行自動(dòng)伸縮。

提高系統(tǒng)的可用性,避免業(yè)務(wù)在單點(diǎn)處理導(dǎo)致系統(tǒng)瓶頸。

栗子:
比如用戶登錄,對(duì)login主題發(fā)送消息,積分系統(tǒng)訂閱了login主題,收到login的消息后,對(duì)用戶的積分進(jìn)行處理。為了保證積分處理的高可用,可以使用相同的queue=score,啟動(dòng)多個(gè)積分處理服務(wù)。
監(jiān)控積分業(yè)務(wù)的處理時(shí)間,如果某個(gè)積分處理服務(wù),業(yè)務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng)(比如由于某些/某類用戶的特殊情況,積分算法不同等),造成了消息積壓,不能及時(shí)處理。

在積分系統(tǒng)的下游仍有處理能力的時(shí)候(比如依賴下游的某個(gè)接口,此接口的處理能力依然是正常的),可以自動(dòng)啟動(dòng)多個(gè)積分處理服務(wù),訂閱主題login,queue=score,分散計(jì)算壓力。
如果是下游的處理能力受限,則可能要進(jìn)行限流處理,不但不能啟動(dòng)多個(gè)積分處理服務(wù),還要限制積分業(yè)務(wù)的處理速度。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 你是生命中最美的遇見(jiàn) 不用為了
    淋濕了空傘下的等待閱讀 254評(píng)論 0 0
  • 今天在畫這張圖的時(shí)候,有點(diǎn)震撼,也有點(diǎn)驚嘆,一生就這樣了,除了有時(shí)間真短暫的想法,更多的感覺(jué)的緊迫感,得好好做規(guī)劃...
    啊小小宋閱讀 1,681評(píng)論 3 5
  • 周衛(wèi)平焦點(diǎn)網(wǎng)絡(luò)第十期堅(jiān)持分享第85天。 今天決定把陽(yáng)臺(tái)徹底的清理一下,把不用的,臟的等等都清理清理,把灰塵和垃圾都...
    心所安處閱讀 160評(píng)論 0 0

友情鏈接更多精彩內(nèi)容