微服務(wù)之間通過(guò)RabbitMQ通信

微服務(wù)之間通過(guò)RabbitMQ通信

微服務(wù)之間是相互獨(dú)立的,不像單個(gè)工程一樣各個(gè)模塊之間可以直接通過(guò)方法調(diào)用實(shí)現(xiàn)通信,相互獨(dú)立的服務(wù)直接一般的通信方式是使用 HTTP協(xié)議、rpc協(xié)議或者使用消息中間件如RabbitMQ``Kafka

image

在這篇文章 使用Golang和MongoDB構(gòu)建微服務(wù) 已經(jīng)實(shí)現(xiàn)了一個(gè)微服務(wù)的應(yīng)用,在文章中已經(jīng)實(shí)現(xiàn)了各個(gè)服務(wù)直接的通信,是使用的 HTTP的形式 ,那各個(gè)服務(wù)之間如何通過(guò) RabbitMQ進(jìn)行消息通信呢,我們現(xiàn)在要實(shí)現(xiàn)一個(gè)功能,就是一個(gè)用戶預(yù)訂電影票的接口,需要服務(wù) User Service(port 8000) 和 服務(wù) Booking Service(port 8003)之間通信,用戶預(yù)訂之后,把預(yù)訂信息寫(xiě)入到 booking的數(shù)據(jù)庫(kù)中

安裝 RabbitMQ

安裝 RabbitMQ 之前需要先安裝 Erlang 的環(huán)境 ,然后下載安裝RabbitMQ ,請(qǐng)選擇對(duì)應(yīng)的版本,安裝完成之后,RabbitMQ在Windows上是作為一個(gè)服務(wù)在后臺(tái)運(yùn)行,關(guān)于 RabbitMQ 的接口如何使用,請(qǐng)參考官網(wǎng)的 教程,有各個(gè)主流語(yǔ)言的實(shí)現(xiàn)我們使用的是Go版本,請(qǐng)下載對(duì)應(yīng)的實(shí)現(xiàn)接口 go get github.com/streadway/amqp

對(duì)RabbitMQ的接口做一下簡(jiǎn)單的封裝

  • 定義一個(gè)接口

messaging/message.go

type IMessageClient interface {
    ConnectToBroker(connectionStr string) error
    PublishToQueue(data []byte, queueName string) error
    SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
    Close()
}

type MessageClient struct {
    conn *amqp.Connection
}
  • 連接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
    if connectionStr == "" {
        panic("the connection str mustnt be null")
    }
    var err error
    m.conn, err = amqp.Dial(connectionStr)
    return err
}
  • 發(fā)布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
    if m.conn == nil {
        panic("before publish you must connect the RabbitMQ first")
    }

    ch, err := m.conn.Channel()
    defer ch.Close()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName,
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "application/json",
            Body:        body,
        },
    )
    failOnError(err, "Failed to publish a message")

    return nil
}
  • 訂閱消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
    ch, err := m.conn.Channel()
    //defer ch.Close()
    failOnError(err, "Failed to open a channel")

    q, err := ch.QueueDeclare(
        queueName,
        false,
        false,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to declare a queue")

    msgs, err := ch.Consume(
        q.Name,
        "",
        true,
        false,
        false,
        false,
        nil,
    )

    failOnError(err, "Failed to register a consumer")
    go consumeLoop(msgs, handlerFunc)
    return nil
}

實(shí)現(xiàn)通信

User Service中定義一個(gè)新的POST接口 /user/{name}/booking,實(shí)現(xiàn)用戶的預(yù)訂功能,預(yù)訂之后,通過(guò)RabbitMQ發(fā)布一個(gè)消息給
Booking Service,Booking Service接收到消息之后,做相應(yīng)的處理(寫(xiě)入數(shù)據(jù)庫(kù))

User Service

  • 初始化 MessageClient

users/controllers/user.go

var client messaging.IMessageClient

func init() {
    client = &messaging.MessageClient{}
    err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Println("connect to rabbitmq error", err)
    }
}
  • 添加新的路由和實(shí)現(xiàn)

routes.go

register("POST", "/user/{name}/booking", controllers.NewBooking, nil)

users/controllers/user.go

func NewBooking(w http.ResponseWriter, r *http.Request) {
    params := mux.Vars(r)
    user_name := params["name"]
    defer r.Body.Close()

    var bookings models.Booking
    body, _ := ioutil.ReadAll(r.Body)
    err := json.Unmarshal(body, &bookings)
    if err != nil {
        fmt.Println("the format body error ", err)
    }
    fmt.Println("user name:", user_name, bookings)
    go notifyMsg(body)
}
  • 用一個(gè)協(xié)程實(shí)現(xiàn)消息的發(fā)布
func notifyMsg(body []byte) {
    err := client.PublishToQueue(body, "new_booking")
    if err != nil {
        fmt.Println("Failed to publis message", err)
    }
}

Booking Service

  • 初始化MessageClient
var client messaging.IMessageClient

func initMessage() {
    client = &messaging.MessageClient{}
    err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
    if err != nil {
        fmt.Println("Failed to connect to RabbitMQ", err)
    }

    err = client.SubscribeToQueue("new_booking", getBooking)
    if err != nil {
        fmt.Println("Failed to comsuer the msg", err)
    }
}

在 web服務(wù)之前啟動(dòng)

func main() {

    initMessage()

    r := routes.NewRouter()
    http.ListenAndServe(":8003", r)

}
  • 接收后的消息處理
func getBooking(delivery amqp.Delivery) {

  var booking models.Booking
    json.Unmarshal(delivery.Body, &booking)
  booking.Id = bson.NewObjectId().Hex()
    dao.Insert("Booking", "BookModel", booking)
    fmt.Println("the booking msg", booking)
}

驗(yàn)證,需要啟動(dòng) User ServiceBooking Service
使用 Postman 發(fā)送對(duì)應(yīng)的數(shù)據(jù)

post 127.0.0.1:8000/user/kevin_woo/booking

{
    "name":"kevin_woo",
    "books":[
        {
            "date":"20180727",
            "movies":["5b4c45d49d5e3e33c4a5b97a"]
        },
        {
            "date":"20180810",
            "movies":["5b4c45ea9d5e3e33c4a5b97b"]
        }
    ]
}

可以看到數(shù)據(jù)庫(kù)已經(jīng)有了一條新的預(yù)訂信息

說(shuō)明,我這里POST的數(shù)據(jù)就是booking數(shù)據(jù)庫(kù)中的結(jié)構(gòu),實(shí)際情況需要對(duì)數(shù)據(jù)進(jìn)行封裝處理,在POST數(shù)據(jù)時(shí),沒(méi)有對(duì)數(shù)據(jù)進(jìn)行驗(yàn)證,
在實(shí)際開(kāi)發(fā)過(guò)程中需要對(duì)各個(gè)數(shù)據(jù)做相應(yīng)的驗(yàn)證,這里主要是看一下 RabbitMQ的消息傳遞處理的過(guò)程

源碼 Github

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容