微服務(wù)之間通過(guò)RabbitMQ通信
微服務(wù)之間是相互獨(dú)立的,不像單個(gè)工程一樣各個(gè)模塊之間可以直接通過(guò)方法調(diào)用實(shí)現(xiàn)通信,相互獨(dú)立的服務(wù)直接一般的通信方式是使用 HTTP協(xié)議、rpc協(xié)議或者使用消息中間件如RabbitMQ``Kafka等

在這篇文章 使用Golang和MongoDB構(gòu)建微服務(wù) 已經(jīng)實(shí)現(xiàn)了一個(gè)微服務(wù)的應(yīng)用,在文章中已經(jīng)實(shí)現(xiàn)了各個(gè)服務(wù)直接的通信,是使用的 HTTP的形式 ,那各個(gè)服務(wù)之間如何通過(guò) RabbitMQ進(jìn)行消息通信呢,我們現(xiàn)在要實(shí)現(xiàn)一個(gè)功能,就是一個(gè)用戶預(yù)訂電影票的接口,需要服務(wù) User Service(port 8000) 和 服務(wù) Booking Service(port 8003)之間通信,用戶預(yù)訂之后,把預(yù)訂信息寫(xiě)入到 booking的數(shù)據(jù)庫(kù)中
安裝 RabbitMQ
安裝 RabbitMQ 之前需要先安裝 Erlang 的環(huán)境 ,然后下載安裝RabbitMQ ,請(qǐng)選擇對(duì)應(yīng)的版本,安裝完成之后,RabbitMQ在Windows上是作為一個(gè)服務(wù)在后臺(tái)運(yùn)行,關(guān)于 RabbitMQ 的接口如何使用,請(qǐng)參考官網(wǎng)的 教程,有各個(gè)主流語(yǔ)言的實(shí)現(xiàn)我們使用的是Go版本,請(qǐng)下載對(duì)應(yīng)的實(shí)現(xiàn)接口 go get github.com/streadway/amqp
對(duì)RabbitMQ的接口做一下簡(jiǎn)單的封裝
- 定義一個(gè)接口
messaging/message.go
type IMessageClient interface {
ConnectToBroker(connectionStr string) error
PublishToQueue(data []byte, queueName string) error
SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error
Close()
}
type MessageClient struct {
conn *amqp.Connection
}
- 連接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error {
if connectionStr == "" {
panic("the connection str mustnt be null")
}
var err error
m.conn, err = amqp.Dial(connectionStr)
return err
}
- 發(fā)布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error {
if m.conn == nil {
panic("before publish you must connect the RabbitMQ first")
}
ch, err := m.conn.Channel()
defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
err = ch.Publish(
"",
q.Name,
false,
false,
amqp.Publishing{
ContentType: "application/json",
Body: body,
},
)
failOnError(err, "Failed to publish a message")
return nil
}
- 訂閱消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error {
ch, err := m.conn.Channel()
//defer ch.Close()
failOnError(err, "Failed to open a channel")
q, err := ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
failOnError(err, "Failed to declare a queue")
msgs, err := ch.Consume(
q.Name,
"",
true,
false,
false,
false,
nil,
)
failOnError(err, "Failed to register a consumer")
go consumeLoop(msgs, handlerFunc)
return nil
}
實(shí)現(xiàn)通信
在 User Service中定義一個(gè)新的POST接口 /user/{name}/booking,實(shí)現(xiàn)用戶的預(yù)訂功能,預(yù)訂之后,通過(guò)RabbitMQ發(fā)布一個(gè)消息給
Booking Service,Booking Service接收到消息之后,做相應(yīng)的處理(寫(xiě)入數(shù)據(jù)庫(kù))
User Service
- 初始化
MessageClient
users/controllers/user.go
var client messaging.IMessageClient
func init() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("connect to rabbitmq error", err)
}
}
- 添加新的路由和實(shí)現(xiàn)
routes.go
register("POST", "/user/{name}/booking", controllers.NewBooking, nil)
users/controllers/user.go
func NewBooking(w http.ResponseWriter, r *http.Request) {
params := mux.Vars(r)
user_name := params["name"]
defer r.Body.Close()
var bookings models.Booking
body, _ := ioutil.ReadAll(r.Body)
err := json.Unmarshal(body, &bookings)
if err != nil {
fmt.Println("the format body error ", err)
}
fmt.Println("user name:", user_name, bookings)
go notifyMsg(body)
}
- 用一個(gè)協(xié)程實(shí)現(xiàn)消息的發(fā)布
func notifyMsg(body []byte) {
err := client.PublishToQueue(body, "new_booking")
if err != nil {
fmt.Println("Failed to publis message", err)
}
}
Booking Service
- 初始化MessageClient
var client messaging.IMessageClient
func initMessage() {
client = &messaging.MessageClient{}
err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/")
if err != nil {
fmt.Println("Failed to connect to RabbitMQ", err)
}
err = client.SubscribeToQueue("new_booking", getBooking)
if err != nil {
fmt.Println("Failed to comsuer the msg", err)
}
}
在 web服務(wù)之前啟動(dòng)
func main() {
initMessage()
r := routes.NewRouter()
http.ListenAndServe(":8003", r)
}
- 接收后的消息處理
func getBooking(delivery amqp.Delivery) {
var booking models.Booking
json.Unmarshal(delivery.Body, &booking)
booking.Id = bson.NewObjectId().Hex()
dao.Insert("Booking", "BookModel", booking)
fmt.Println("the booking msg", booking)
}
驗(yàn)證,需要啟動(dòng) User Service 和 Booking Service
使用 Postman 發(fā)送對(duì)應(yīng)的數(shù)據(jù)
post 127.0.0.1:8000/user/kevin_woo/booking
{
"name":"kevin_woo",
"books":[
{
"date":"20180727",
"movies":["5b4c45d49d5e3e33c4a5b97a"]
},
{
"date":"20180810",
"movies":["5b4c45ea9d5e3e33c4a5b97b"]
}
]
}
可以看到數(shù)據(jù)庫(kù)已經(jīng)有了一條新的預(yù)訂信息
說(shuō)明,我這里POST的數(shù)據(jù)就是booking數(shù)據(jù)庫(kù)中的結(jié)構(gòu),實(shí)際情況需要對(duì)數(shù)據(jù)進(jìn)行封裝處理,在POST數(shù)據(jù)時(shí),沒(méi)有對(duì)數(shù)據(jù)進(jìn)行驗(yàn)證,
在實(shí)際開(kāi)發(fā)過(guò)程中需要對(duì)各個(gè)數(shù)據(jù)做相應(yīng)的驗(yàn)證,這里主要是看一下 RabbitMQ的消息傳遞處理的過(guò)程