隊(duì)列訂閱模式
此模式中,訂閱者要指定兩個(gè)屬性,主題和隊(duì)列(queue,其實(shí)就是隊(duì)列名稱)
注意:下面所有前提=必須訂閱同一個(gè)主題
發(fā)布消息后,N個(gè)具有同樣的主題和queue的訂閱者,只有一個(gè)會(huì)收到消息。(random算法)
說(shuō)明:queue=工作組,工作組中有N個(gè)worker,發(fā)布消息后,同一個(gè)工作組中,僅有一個(gè)worker會(huì)收到消息。
相同主題,不同queue的訂閱者之間,不符合上面的描述。這種情況下,可以把同一個(gè)queue的訂閱者們,當(dāng)成一個(gè)訂閱者來(lái)處理,這樣就和普通的發(fā)布訂閱模式一樣了。
主題subj1,queue=q1的訂閱者有sub1-q1,sub2-q1,sub3-q1
主題subj1,queue=q2的訂閱者有sub1-q2,sub2-q2,sub3-q2
一個(gè)主題,兩組訂閱者,每組訂閱者中各有3個(gè)訂閱者。
對(duì)sub1發(fā)布消息,q1,q2兩個(gè)組都會(huì)收到消息(發(fā)布訂閱模式),q1,q2每個(gè)組中,分別僅有一個(gè)訂閱者會(huì)收到消息(queue模式)
server
package main
import (
"github.com/nats-io/go-nats"
"log"
"flag"
)
const (
//url = "nats://192.168.3.125:4222"
url = nats.DefaultURL
)
var (
nc *nats.Conn
err error
)
func init() {
if nc, err = nats.Connect(url); checkErr(err) {
//
}
}
func main() {
var (
servername = flag.String("servername", "y", "name for server")
queueGroup = flag.String("group", "", "group name for Subscribe")
subj = flag.String("subj", "", "subject name")
)
flag.Parse()
log.Println(*servername, *queueGroup, *subj)
startService(*subj, *servername+" worker1", *queueGroup)
startService(*subj, *servername+" worker2", *queueGroup)
startService(*subj, *servername+" worker3", *queueGroup)
select {}
}
//receive message
func startService(subj, name, queue string) {
go async(nc, subj, name, queue)
}
func async(nc *nats.Conn, subj, name, queue string) {
nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
log.Println(name, "Received a message From Async : ", string(msg.Data))
})
}
func checkErr(err error) bool {
if err != nil {
log.Println(err)
return false
}
return true
}
client
package main
import (
"github.com/nats-io/go-nats"
"log"
"strconv"
"github.com/pborman/uuid"
"flag"
"time"
)
const (
//url = "nats://192.168.3.125:4222"
url = nats.DefaultURL
)
var (
nc *nats.Conn
err error
)
func init() {
if nc, err = nats.Connect(url); checkErr(err) {
//
}
}
func main() {
var (
subj = flag.String("subj", "", "subject name")
)
flag.Parse()
log.Println(*subj)
startClient(*subj)
time.Sleep(time.Second)
}
//send message to server
func startClient(subj string) {
for i := 0; i < 1; i++ {
id := uuid.New()
log.Println(id)
nc.Publish(subj, []byte(id+" Sun "+strconv.Itoa(i)))
nc.Publish(subj, []byte(id+" Rain "+strconv.Itoa(i)))
nc.Publish(subj, []byte(id+" Fog "+strconv.Itoa(i)))
nc.Publish(subj, []byte(id+" Cloudy "+strconv.Itoa(i)))
}
}
func checkErr(err error) bool {
if err != nil {
log.Println(err)
return false
}
return true
}
啟動(dòng)server A queue=g1,訂閱主題=weather
./main -servername=A -group=g1 -subj=weather
2018/08/18 11:32:16 A g1 weather
啟動(dòng)server B queue=g1,訂閱主題=weather
./main -servername=B -group=g1 -subj=weather
2018/08/18 11:32:21 B g1 weather
發(fā)送消息
./main -subj=weather
2018/08/18 11:32:24 weather
2018/08/18 11:32:24 3005ae7c-85ab-42d3-ad09-d44688d129ad
結(jié)果 server A收到消息
2018/08/18 11:32:24 A worker3 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Rain 0
2018/08/18 11:32:24 A worker2 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Sun 0
結(jié)果 server B收到消息
2018/08/18 11:32:24 B worker3 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Fog 0
2018/08/18 11:32:24 B worker3 Received a message From Async : 3005ae7c-85ab-42d3-ad09-d44688d129ad Cloudy 0
主題相同,queue不同
啟動(dòng)server c queue=test,訂閱主題=weather
> ./main -servername=C -group=test -subj=weather
2018/08/18 11:37:43 C test weather
發(fā)消息
./main -subj=weather
2018/08/18 11:37:47 weather
2018/08/18 11:37:47 b4e201dd-ea4a-4ec3-aa45-99489695f0c2
Server c 收到了全部消息
2018/08/18 11:37:47 C worker1 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0
2018/08/18 11:37:47 C worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 C worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 C worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0
Server A 收到3條消息
2018/08/18 11:37:47 A worker1 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Rain 0
2018/08/18 11:37:47 A worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Sun 0
2018/08/18 11:37:47 A worker3 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Cloudy 0
Server B 收到1條消息
2018/08/18 11:37:47 B worker2 Received a message From Async : b4e201dd-ea4a-4ec3-aa45-99489695f0c2 Fog 0
總結(jié):queue模式,在分發(fā)消息時(shí),進(jìn)行負(fù)載均衡,隨機(jī)發(fā)送給同一組中的任意一個(gè)訂閱者,可以隨時(shí)增加刪除訂閱者,配合響應(yīng)的監(jiān)控?cái)?shù)據(jù)和統(tǒng)計(jì)數(shù)據(jù),對(duì)下游的業(yè)務(wù)進(jìn)行自動(dòng)伸縮。
提高系統(tǒng)的可用性,避免業(yè)務(wù)在單點(diǎn)處理導(dǎo)致系統(tǒng)瓶頸。
栗子:
比如用戶登錄,對(duì)login主題發(fā)送消息,積分系統(tǒng)訂閱了login主題,收到login的消息后,對(duì)用戶的積分進(jìn)行處理。為了保證積分處理的高可用,可以使用相同的queue=score,啟動(dòng)多個(gè)積分處理服務(wù)。
監(jiān)控積分業(yè)務(wù)的處理時(shí)間,如果某個(gè)積分處理服務(wù),業(yè)務(wù)執(zhí)行時(shí)間過(guò)長(zhǎng)(比如由于某些/某類用戶的特殊情況,積分算法不同等),造成了消息積壓,不能及時(shí)處理。
在積分系統(tǒng)的下游仍有處理能力的時(shí)候(比如依賴下游的某個(gè)接口,此接口的處理能力依然是正常的),可以自動(dòng)啟動(dòng)多個(gè)積分處理服務(wù),訂閱主題login,queue=score,分散計(jì)算壓力。
如果是下游的處理能力受限,則可能要進(jìn)行限流處理,不但不能啟動(dòng)多個(gè)積分處理服務(wù),還要限制積分業(yè)務(wù)的處理速度。