golang nats[4] request reply模式

請(qǐng)求響應(yīng)模式

無(wú)論是發(fā)布訂閱模式還是queue模式,nats都不能保證消息一定發(fā)送到訂閱方,除非訂閱者發(fā)送一個(gè)響應(yīng)給發(fā)布者。
所以訂閱者發(fā)送一個(gè)回執(zhí)給發(fā)布者,就是請(qǐng)求響應(yīng)模式。

這種模式有什么用?

nats要求訂閱者一定要先完成訂閱,發(fā)布消息后,訂閱者才能收到消息,類似離線消息的模式nats不支持。就算先完成訂閱,后發(fā)送消息,消息發(fā)送方也不知道是否有訂閱者收到了消息,請(qǐng)求響應(yīng)模式就是應(yīng)對(duì)這種情況。

基本流程

A發(fā)送消息,B收到消息,發(fā)送回執(zhí)給A。這就是request reply的基本流程。

基本實(shí)現(xiàn)原理

  • A啟用request模式發(fā)送消息(消息中包含了回執(zhí)信息,replya主題),同步等待回執(zhí)(有超時(shí)時(shí)間)。
  • B收到消息,在消息中取出回執(zhí)信息=replay主題,對(duì)replay主題,主動(dòng)發(fā)送普通消息(消息內(nèi)容可自定義,比如server A上的service1收到msgid=xxxx的消息。)。
  • A在超時(shí)內(nèi)收到消息,確認(rèn)結(jié)束。
  • A在超時(shí)內(nèi)未收到消息,超時(shí)結(jié)束。

注意

  • 因?yàn)锳發(fā)送的消息中包裝了回執(zhí)測(cè)相關(guān)信息,訂閱者B收到消息后,也要主動(dòng)發(fā)送回執(zhí),所以請(qǐng)求響應(yīng)模式,對(duì)雙方都有影響。
  • A發(fā)送消息后,等待B的回執(zhí),需要給A設(shè)置超時(shí)時(shí)間,超時(shí)后,不在等待回執(zhí),直接結(jié)束,效果和不需要回執(zhí)的消息發(fā)送一樣,不在關(guān)心是否有訂閱者收到消息。

兩種模式

request reply有兩種模式:

  • one to one 默認(rèn)模式

1條消息,N個(gè)訂閱者,消息發(fā)送方,僅會(huì)收到一條回執(zhí)記錄(因?yàn)橄l(fā)送方收到回執(zhí)消息后,就自動(dòng)斷開了對(duì)回執(zhí)消息的訂閱。),即使N個(gè)訂閱都都收到了消息。注意:pub/sub和queue模式的不同

  • one to many 非默認(rèn)模式,需要自己實(shí)現(xiàn)

1條消息,N個(gè)訂閱者,消息發(fā)送方,可以自己設(shè)定一個(gè)數(shù)量限制N,接受到N個(gè)回執(zhí)消息后,斷開對(duì)回執(zhí)消息的訂閱。

Server

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "flag"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc *nats.Conn

    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err) {
        }
    }
}

func main() {
    var (
        servername = flag.String("servername", "Y", "name for server")
        queueGroup = flag.String("group", "", "group name for Subscribe")
        subj       = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()

    mode := "queue"
    if *queueGroup == "" {
        mode = "pub/sub"
    }
    log.Printf("Server[%v] Subscribe Subject[%v] in [%v]Mode", *servername, *subj, mode)

    startService(*subj, *servername+" worker1", *queueGroup)
    startService(*subj, *servername+" worker2", *queueGroup)
    startService(*subj, *servername+" worker3", *queueGroup)

    nc.Flush()
    select {}
}

//receive message
func startService(subj, name, queue string) {
    go async(nc, subj, name, queue)
}

func async(nc *nats.Conn, subj, name, queue string) {
    replyMsg := name + " Received a msg"
    if queue == "" {
        nc.Subscribe(subj, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    } else {
        nc.QueueSubscribe(subj, queue, func(msg *nats.Msg) {
            nc.Publish(msg.Reply, []byte(replyMsg))
            log.Println(name, "Received a message From Async : ", string(msg.Data))
        })
    }

}

func checkErr(err error) bool {
    if err != nil {
        log.Println(err)
        return false
    }
    return true
}

Client

package main

import (
    "github.com/nats-io/go-nats"
    "log"
    "github.com/pborman/uuid"
    "flag"
    "time"
)

const (
    //url   = "nats://192.168.3.125:4222"
    url = nats.DefaultURL
)

var (
    nc         *nats.Conn
    encodeConn *nats.EncodedConn
    err        error
)

func init() {
    if nc, err = nats.Connect(url); checkErr(err, func() {

    }) {
        //
        if encodeConn, err = nats.NewEncodedConn(nc, nats.JSON_ENCODER);
            checkErr(err, func() {

            }) {

        }
    }
}

func main() {
    var (
        subj = flag.String("subj", "yasenagat", "subject name")
    )
    flag.Parse()
    log.Println(*subj)
    startClient(*subj)

    time.Sleep(time.Second)
}

//send message to server
func startClient(subj string) {
    for i := 0; i < 3; i++ {
        id := uuid.New()
        log.Println(id)
        if msg, err := nc.Request(subj, []byte(id+" hello"), time.Second); checkErr(err, func() {
            // handle err
        }) {
            log.Println(string(msg.Data))
        }
    }
}

func checkErr(err error, errFun func()) bool {
    if err != nil {
        log.Println(err)
        errFun()
        return false
    }
    return true
}

pub/sub模式啟動(dòng)

$ ./main
2018/08/18 18:54:10 Server[Y] Subscribe Subject[yasenagat] in [pub/sub]Mode
2018/08/18 18:54:26 Y worker2 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  b035d7c2-e7e9-4337-bb8a-a23ec85fc31a hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  2d8dfe75-8fee-4b4c-8599-1824638dfa8c hello
2018/08/18 18:54:26 Y worker2 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker1 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello
2018/08/18 18:54:26 Y worker3 Received a message From Async :  fe9f773a-129b-4919-9bc4-c8a4571fef6e hello

發(fā)送消息

$ ./main
2018/08/18 18:54:26 yasenagat
2018/08/18 18:54:26 b035d7c2-e7e9-4337-bb8a-a23ec85fc31a
2018/08/18 18:54:26 Y worker3 Received a msg
2018/08/18 18:54:26 2d8dfe75-8fee-4b4c-8599-1824638dfa8c
2018/08/18 18:54:26 Y worker2 Received a msg
2018/08/18 18:54:26 fe9f773a-129b-4919-9bc4-c8a4571fef6e
2018/08/18 18:54:26 Y worker2 Received a msg

queue模式啟動(dòng)

$ ./main -group=test
2018/08/18 19:14:31 Server[Y] Subscribe Subject[yasenagat] in [queue]Mode
2018/08/18 19:14:33 Y worker2 Received a message From Async :  4ecf2728-b3a7-4181-893a-aefde3bc8d2e hello Y worker2 Received a msg
2018/08/18 19:14:33 Y worker3 Received a message From Async :  4e7f1363-9a47-4705-b87a-4aaeb80164f0 hello Y worker3 Received a msg
2018/08/18 19:14:33 Y worker2 Received a message From Async :  38b1f74b-8a3b-46ba-a10e-62e50efbc127 hello Y worker2 Received a msg

發(fā)送消息

$ ./main
2018/08/18 19:14:33 yasenagat
2018/08/18 19:14:33 4ecf2728-b3a7-4181-893a-aefde3bc8d2e
2018/08/18 19:14:33 Y worker2 Received a msg
2018/08/18 19:14:33 4e7f1363-9a47-4705-b87a-4aaeb80164f0
2018/08/18 19:14:33 Y worker3 Received a msg
2018/08/18 19:14:33 38b1f74b-8a3b-46ba-a10e-62e50efbc127
2018/08/18 19:14:33 Y worker2 Received a msg

queue模式下,發(fā)送3條消息,3個(gè)訂閱者有相同的queue,每條消息只有一個(gè)訂閱者收到。

pub/sub模式下,發(fā)送3條消息,3個(gè)訂閱者都收到3條消息,一共9條。

總結(jié):

回執(zhí)主要解決:訂閱者是否收到消息的問(wèn)題、有多少個(gè)訂閱者收到消息的問(wèn)題。(不是具體業(yè)務(wù)是否執(zhí)行完成的回執(zhí)!)
基于事件的架構(gòu)模式可以構(gòu)建于消息機(jī)制之上,依賴消息機(jī)制。異步調(diào)用的其中一種實(shí)現(xiàn)方式,就是基于事件模式。異步調(diào)用又是分布式系統(tǒng)中常見(jiàn)的任務(wù)處理方式。

業(yè)務(wù)模式

  • 業(yè)務(wù)A發(fā)送eventA給事件中心,等待回執(zhí)
  • 事件中心告知A收到了消息,開始對(duì)外發(fā)送廣播
  • 訂閱者B訂閱了eventA主題
  • 事件中心對(duì)eventA主題發(fā)送廣播,等待回執(zhí)
  • B收到消息,告知事件中心,收到eventA,開始執(zhí)行任務(wù)taskA
  • B異步執(zhí)行完taskA,通知事件中心taskAComplete,等待回執(zhí)
  • 事件中心發(fā)送回執(zhí)給B,對(duì)外發(fā)送廣播,taskAComplete
  • ........

如果超時(shí),未能收到回執(zhí),需要回執(zhí)信息的確認(rèn)方可以主動(dòng)調(diào)用相關(guān)接口,查詢?nèi)蝿?wù)執(zhí)行狀態(tài),根據(jù)任務(wù)狀態(tài)做后續(xù)的處理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 點(diǎn)擊查看原文 Web SDK 開發(fā)手冊(cè) SDK 概述 網(wǎng)易云信 SDK 為 Web 應(yīng)用提供一個(gè)完善的 IM 系統(tǒng)...
    layjoy閱讀 14,391評(píng)論 0 15
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,694評(píng)論 19 139
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,211評(píng)論 2 11
  • 今天早晨,處在霧霾中的北京朋友發(fā)來(lái)一張照片: 是褚時(shí)健的橙子,於是我們有了以下的對(duì)話: "褚橙啊,我還沒(méi)吃過(guò),好吃...
    MZ_梅枝閱讀 464評(píng)論 0 0
  • ——舍與得,珍惜現(xiàn)在擁有 電影《人在囧途泰囧》有三位主...
    行走在學(xué)習(xí)的路上閱讀 1,073評(píng)論 4 51

友情鏈接更多精彩內(nèi)容