程序封裝
package rabbitmq
import (
"fmt"
"github.com/streadway/amqp"
"time"
"sync"
)
// 定義全局變量,指針類(lèi)型
var mqConn *amqp.Connection
var mqChan *amqp.Channel
// 定義生產(chǎn)者接口
type Producer interface {
MsgContent() string
}
// 定義接收者接口
type Receiver interface {
Consumer([]byte) error
}
// 定義RabbitMQ對(duì)象
type RabbitMQ struct {
connection *amqp.Connection
channel *amqp.Channel
queueName string // 隊(duì)列名稱(chēng)
routingKey string // key名稱(chēng)
exchangeName string // 交換機(jī)名稱(chēng)
exchangeType string // 交換機(jī)類(lèi)型
producerList []Producer
receiverList []Receiver
mu sync.RWMutex
}
// 定義隊(duì)列交換機(jī)對(duì)象
type QueueExchange struct {
QuName string // 隊(duì)列名稱(chēng)
RtKey string // key值
ExName string // 交換機(jī)名稱(chēng)
ExType string // 交換機(jī)類(lèi)型
}
// 鏈接rabbitMQ
func (r *RabbitMQ)mqConnect() {
var err error
RabbitUrl := fmt.Sprintf("amqp://%s:%s@%s:%d/", "guest", "guest", "******", 5673)
mqConn, err = amqp.Dial(RabbitUrl)
r.connection = mqConn // 賦值給RabbitMQ對(duì)象
if err != nil {
fmt.Printf("MQ打開(kāi)鏈接失敗:%s \n", err)
}
mqChan, err = mqConn.Channel()
r.channel = mqChan // 賦值給RabbitMQ對(duì)象
if err != nil {
fmt.Printf("MQ打開(kāi)管道失敗:%s \n", err)
}
}
// 關(guān)閉RabbitMQ連接
func (r *RabbitMQ)mqClose() {
// 先關(guān)閉管道,再關(guān)閉鏈接
err := r.channel.Close()
if err != nil {
fmt.Printf("MQ管道關(guān)閉失敗:%s \n", err)
}
err = r.connection.Close()
if err != nil {
fmt.Printf("MQ鏈接關(guān)閉失敗:%s \n", err)
}
}
// 創(chuàng)建一個(gè)新的操作對(duì)象
func New(q *QueueExchange) *RabbitMQ {
return &RabbitMQ{
queueName:q.QuName,
routingKey:q.RtKey,
exchangeName: q.ExName,
exchangeType: q.ExType,
}
}
// 啟動(dòng)RabbitMQ客戶端,并初始化
func (r *RabbitMQ) Start() {
// 開(kāi)啟監(jiān)聽(tīng)生產(chǎn)者發(fā)送任務(wù)
for _, producer := range r.producerList {
go r.listenProducer(producer)
}
// 開(kāi)啟監(jiān)聽(tīng)接收者接收任務(wù)
for _, receiver := range r.receiverList {
go r.listenReceiver(receiver)
}
time.Sleep(1*time.Second)
}
// 注冊(cè)發(fā)送指定隊(duì)列指定路由的生產(chǎn)者
func (r *RabbitMQ) RegisterProducer(producer Producer) {
r.producerList = append(r.producerList, producer)
}
// 發(fā)送任務(wù)
func (r *RabbitMQ) listenProducer(producer Producer) {
// 驗(yàn)證鏈接是否正常,否則重新鏈接
if r.channel == nil {
r.mqConnect()
}
// 用于檢查隊(duì)列是否存在,已經(jīng)存在不需要重復(fù)聲明
_, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil)
if err != nil{
// 隊(duì)列不存在,聲明隊(duì)列
// name:隊(duì)列名稱(chēng);durable:是否持久化,隊(duì)列存盤(pán),true服務(wù)重啟后信息不會(huì)丟失,影響性能;autoDelete:是否自動(dòng)刪除;noWait:是否非阻塞,
// true為是,不等待RMQ返回信息;args:參數(shù),傳nil即可;exclusive:是否設(shè)置排他
_, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注冊(cè)隊(duì)列失敗:%s \n", err)
return
}
}
// 隊(duì)列綁定
err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true,nil)
if err != nil {
fmt.Printf("MQ綁定隊(duì)列失敗:%s \n", err)
return
}
// 用于檢查交換機(jī)是否存在,已經(jīng)存在不需要重復(fù)聲明
err = r.channel.ExchangeDeclarePassive(r.exchangeName, r.exchangeType, true, false, false, true, nil)
if err != nil{
// 注冊(cè)交換機(jī)
// name:交換機(jī)名稱(chēng),kind:交換機(jī)類(lèi)型,durable:是否持久化,隊(duì)列存盤(pán),true服務(wù)重啟后信息不會(huì)丟失,影響性能;autoDelete:是否自動(dòng)刪除;
// noWait:是否非阻塞, true為是,不等待RMQ返回信息;args:參數(shù),傳nil即可; internal:是否為內(nèi)部
err = r.channel.ExchangeDeclare(r.exchangeName, r.exchangeType, true, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注冊(cè)交換機(jī)失敗:%s \n", err)
return
}
}
// 發(fā)送任務(wù)消息
err = r.channel.Publish(r.exchangeName, r.routingKey, false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(producer.MsgContent()),
})
if err != nil {
fmt.Printf("MQ任務(wù)發(fā)送失敗:%s \n", err)
return
}
}
// 注冊(cè)接收指定隊(duì)列指定路由的數(shù)據(jù)接收者
func (r *RabbitMQ) RegisterReceiver(receiver Receiver) {
r.mu.Lock()
r.receiverList = append(r.receiverList, receiver)
r.mu.Unlock()
}
// 監(jiān)聽(tīng)接收者接收任務(wù)
func (r *RabbitMQ) listenReceiver(receiver Receiver) {
// 處理結(jié)束關(guān)閉鏈接
defer r.mqClose()
// 驗(yàn)證鏈接是否正常
if r.channel == nil {
r.mqConnect()
}
// 用于檢查隊(duì)列是否存在,已經(jīng)存在不需要重復(fù)聲明
_, err := r.channel.QueueDeclarePassive(r.queueName, true,false,false,true,nil)
if err != nil{
// 隊(duì)列不存在,聲明隊(duì)列
// name:隊(duì)列名稱(chēng);durable:是否持久化,隊(duì)列存盤(pán),true服務(wù)重啟后信息不會(huì)丟失,影響性能;autoDelete:是否自動(dòng)刪除;noWait:是否非阻塞,
// true為是,不等待RMQ返回信息;args:參數(shù),傳nil即可;exclusive:是否設(shè)置排他
_, err = r.channel.QueueDeclare(r.queueName, true, false, false, true, nil)
if err != nil {
fmt.Printf("MQ注冊(cè)隊(duì)列失敗:%s \n", err)
return
}
}
// 綁定任務(wù)
err = r.channel.QueueBind(r.queueName, r.routingKey, r.exchangeName, true, nil)
if err != nil {
fmt.Printf("綁定隊(duì)列失敗:%s \n", err)
return
}
// 獲取消費(fèi)通道,確保rabbitMQ一個(gè)一個(gè)發(fā)送消息
err = r.channel.Qos(1, 0, true)
msgList, err := r.channel.Consume(r.queueName, "", false, false, false, false, nil)
if err != nil {
fmt.Printf("獲取消費(fèi)通道異常:%s \n", err)
return
}
for msg := range msgList {
// 處理數(shù)據(jù)
err := receiver.Consumer(msg.Body)
if err!=nil {
err = msg.Ack(true)
if err != nil {
fmt.Printf("確認(rèn)消息未完成異常:%s \n", err)
return
}
}else {
// 確認(rèn)消息,必須為false
err = msg.Ack(false)
if err != nil {
fmt.Printf("確認(rèn)消息完成異常:%s \n", err)
return
}
return
}
}
}
使用方法
package main
import (
"fmt"
"test/rabbitmq"
)
type TestPro struct {
msgContent string
}
// 實(shí)現(xiàn)發(fā)送者
func (t *TestPro) MsgContent() string {
return t.msgContent
}
// 實(shí)現(xiàn)接收者
func (t *TestPro) Consumer(dataByte []byte) error {
fmt.Println(string(dataByte))
return nil
}
func main() {
msg := fmt.Sprintf("這是測(cè)試任務(wù)")
t := &TestPro{
msg,
}
queueExchange := &rabbitmq.QueueExchange{
"test.rabbit",
"rabbit.key",
"test.rabbit.mq",
"direct",
}
mq := rabbitmq.New(queueExchange)
mq.RegisterProducer(t)
mq.RegisterReceiver(t)
mq.Start()
}