Go語(yǔ)言與RabbitMQ

RabbitMQ 概述

RabbitMQ是采用Erlang編程語(yǔ)言實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議AMQP (Advanced Message Queuing Protocol)的開源消息代理軟件(消息隊(duì)列中間件

市面上流行的消息隊(duì)列中間件有很多種,而RabbitMQ只是其中比較流行的一種

我們簡(jiǎn)單說(shuō)說(shuō)消息隊(duì)列中間件的作用

  • 解耦
  • 削峰
  • 異步處理
  • 緩存存儲(chǔ)
  • 消息通信
  • 提高系統(tǒng)拓展性

RabbitMQ 特點(diǎn)

  1. 可靠性

    通過(guò)一些機(jī)制例如,持久化,傳輸確認(rèn)等來(lái)確保消息傳遞的可靠性

  2. 拓展性

    多個(gè)RabbitMQ節(jié)點(diǎn)可以組成集群

  3. 高可用性

    隊(duì)列可以在RabbitMQ集群中設(shè)置鏡像,如此一來(lái)即使部分節(jié)點(diǎn)掛掉了,但是隊(duì)列仍然可以使用

  4. 多種協(xié)議

    原生的支持AMQP,也能支持STOMP,MQTT等協(xié)議

  5. 豐富的客戶端

    我們常用的編程語(yǔ)言都支持RabbitMQ

  6. 管理界面

    自帶提供一個(gè)WEB管理界面

  7. 插件機(jī)制

    RabbitMQ 自己提供了很多插件,可以按需要進(jìn)行拓展 Plugins

RabbitMQ基礎(chǔ)概念

總體上看RabbitMQ是一個(gè)生產(chǎn)者和消費(fèi)者的模型, 接收,存儲(chǔ) ,轉(zhuǎn)發(fā)

RabbitMQ_model.jpg

我們看看在RabbitMQ中的幾個(gè)主要概念

  1. Producer (生產(chǎn)者) : 消息的生產(chǎn)者,投遞方

  2. Consumer (消費(fèi)者) : 消息的消費(fèi)者

  3. RabbitMQ Broker (RabbitMQ 代理) : RabbitMQ 服務(wù)節(jié)點(diǎn)(單機(jī)情況中,就是代表RabbitMQ服務(wù)器)

  4. Queue (隊(duì)列) : 在RabbitMQ中Queue是存儲(chǔ)消息數(shù)據(jù)的唯一形式

  5. Binding (綁定) : RabbitMQ中綁定(Binding)是交換機(jī)(exchange)將消息(message)路由給隊(duì)列(queue)所需遵循的規(guī)則。如果要指示交換機(jī)“E”將消息路由給隊(duì)列“Q”,那么“Q”就需要與“E”進(jìn)行綁定。綁定操作需要定義一個(gè)可選的路由鍵(routing key)屬性給某些類型的交換機(jī)。路由鍵的意義在于從發(fā)送給交換機(jī)的眾多消息中選擇出某些消息,將其路由給綁定的隊(duì)列。

  6. RoutingKey (路由鍵) : 消息投遞給交換器,通常會(huì)指定一個(gè) RoutingKey ,通過(guò)這個(gè)路由鍵來(lái)明確消息的路由規(guī)則

    RoutingKey 通常是生產(chǎn)者和消費(fèi)者有協(xié)商一致的key策略,消費(fèi)者就可以合法從生產(chǎn)者手中獲取數(shù)據(jù)。這個(gè)RoutingKey主要當(dāng)Exchange交換機(jī)模式為設(shè)定為direct和topic模式的時(shí)候使用,fanout模式不使用RoutingKey

  7. Exchange (交換機(jī)) : 生產(chǎn)者將消息發(fā)送給交換器(交換機(jī)),再由交換器將消息路由導(dǎo)對(duì)應(yīng)的隊(duì)列中

    交換機(jī)四種類型 : fanout,direct,topic,headers

    1. fanout (扇形交換機(jī)) :

      將發(fā)送到該類型交換機(jī)的消息(message)路由到所有的與該交換機(jī)綁定的隊(duì)列中,如同一個(gè)"扇"狀擴(kuò)散給各個(gè)隊(duì)列

    fanout_exchange.jpg

    fanout類型的交換機(jī)會(huì)忽略RoutingKey的存在,將message直接"廣播"到綁定的所有隊(duì)列中

  1. direct (直連交換機(jī)) :

    根據(jù)消息攜帶的路由鍵(RoutingKey) 將消息投遞到對(duì)應(yīng)的隊(duì)列中

direct_exchange.jpg

direct類型的交換機(jī)(exchange)是RabbitMQ Broker的默認(rèn)類型,它有一個(gè)特別的屬性對(duì)一些簡(jiǎn)單的應(yīng)用來(lái)說(shuō)是非常有用的,在使用這個(gè)類型的Exchange時(shí),可以不必指定routing key的名字,在此類型下創(chuàng)建的Queue有一個(gè)默認(rèn)的routing key,這個(gè)routing key一般同Queue同名.

  1. Topic (主題交換機(jī)) :

    topic類型交換機(jī)在RoutingKeyBindKey 匹配規(guī)則上更加的靈活. 同樣是將消息路由到RoutingKeyBindingKey 相匹配的隊(duì)列中,但是匹配規(guī)則有如下的特點(diǎn) :

    規(guī)則1: RoutingKey 是一個(gè)使用. 的字符串 例如: "go.log.info" , "java.log.error"

    規(guī)則2: BingingKey 也會(huì)一個(gè)使用 . 分割的字符串, 但是在 BindingKey 中可以使用兩種特殊字符 *# ,其中 "*" 用于匹配一個(gè)單詞,"#"用于匹配多規(guī)格單詞(零個(gè)或者多個(gè)單詞)

topic_exchange.jpg

RoutingKey和BindingKey 是一種"模糊匹配" ,那么一個(gè)消息Message可能 會(huì)被發(fā)送到一個(gè)或者多個(gè)隊(duì)列中
無(wú)法匹配的消息將會(huì)被丟棄或者返回者生產(chǎn)者

  1. Headers (頭交換機(jī)):

    Headers類型的交換機(jī)使用的不是很多

    關(guān)于Headers Exchange 摘取一段比較容易理解的解釋 :

    有時(shí)消息的路由操作會(huì)涉及到多個(gè)屬性,此時(shí)使用消息頭就比用路由鍵更容易表達(dá),頭交換機(jī)(headers exchange)就是為此而生的。頭交換機(jī)使用多個(gè)消息屬性來(lái)代替路由鍵建立路由規(guī)則。通過(guò)判斷消息頭的值能否與指定的綁定相匹配來(lái)確立路由規(guī)則。

    我們可以綁定一個(gè)隊(duì)列到頭交換機(jī)上,并給他們之間的綁定使用多個(gè)用于匹配的頭(header)。這個(gè)案例中,消息代理得從應(yīng)用開發(fā)者那兒取到更多一段信息,換句話說(shuō),它需要考慮某條消息(message)是需要部分匹配還是全部匹配。上邊說(shuō)的“更多一段消息”就是"x-match"參數(shù)。當(dāng)"x-match"設(shè)置為“any”時(shí),消息頭的任意一個(gè)值被匹配就可以滿足條件,而當(dāng)"x-match"設(shè)置為“all”的時(shí)候,就需要消息頭的所有值都匹配成功。

    頭交換機(jī)可以視為直連交換機(jī)的另一種表現(xiàn)形式。頭交換機(jī)能夠像直連交換機(jī)一樣工作,不同之處在于頭交換機(jī)的路由規(guī)則是建立在頭屬性值之上,而不是路由鍵。路由鍵必須是一個(gè)字符串,而頭屬性值則沒(méi)有這個(gè)約束,它們甚至可以是整數(shù)或者哈希值(字典)等。

RabbitMQ 工作流程

消息生產(chǎn)流程

  1. 消息生產(chǎn)者連與RabbitMQ Broker 建立一個(gè)連接,建立好了連接之后,開啟一個(gè)信道Channel
  2. 聲明一個(gè)交換機(jī),并設(shè)置其相關(guān)的屬性(交換機(jī)類型,持久化等)
  3. 聲明一個(gè)隊(duì)列并設(shè)置其相關(guān)屬性(排他性,持久化自動(dòng)刪除等)
  4. 通過(guò)路由鍵將交換機(jī)和隊(duì)列綁定起來(lái)
  5. 消息生產(chǎn)者發(fā)送消息給 RabbitMQ Broker , 消息中包含了路由鍵,交換機(jī)等信息,交換機(jī)根據(jù)接收的路由鍵查找匹配對(duì)應(yīng)的隊(duì)列
  6. 查找匹配成功,則將消息存儲(chǔ)到隊(duì)列中
  7. 查找匹配失敗,根據(jù)生產(chǎn)者配置的屬性選擇丟棄或者回退給生產(chǎn)者
  8. 關(guān)閉信道Channel , 關(guān)閉連接

消息消費(fèi)流程

  1. 消息消費(fèi)者連與RabbitMQ Broker 建立一個(gè)連接,建立好了連接之后,開啟一個(gè)信道Channel
  2. 消費(fèi)者向RabbitMQ Broker 請(qǐng)求消費(fèi)者相應(yīng)隊(duì)列中的消息
  3. 等待RabbitMQ Broker 回應(yīng)并投遞相應(yīng)隊(duì)列中的消息,消費(fèi)者接收消息
  4. 消費(fèi)者確認(rèn)(ack) 接收消息, RabbitMQ Broker 消除已經(jīng)確認(rèn)的消息
  5. 關(guān)閉信道Channel ,關(guān)閉連接

Golang 操作RabbitMQ

RabbitMQ 支持我們常見(jiàn)的編程語(yǔ)言,此處我們使用 Golang 來(lái)操作

Golang操作RabbitMQ的前提我們需要有個(gè)RabbitMQ的服務(wù)端,至于RabbitMQ的服務(wù)怎么搭建我們此處就不詳細(xì)描述了.

Golang操作RabbitMQ的客戶端包,網(wǎng)上已經(jīng)有一個(gè)很流行的了,而且也是RabbitMQ官網(wǎng)比較推薦的,不需要我們?cè)購(gòu)念^開始構(gòu)建一個(gè)RabbitMQ的Go語(yǔ)言客戶端包. 詳情

go get github.com/streadway/amqp

項(xiàng)目目錄

___lib
______commonFunc.go
___producer.go
___comsumer.go

commonFunc.go

package lib

import (
    "github.com/streadway/amqp"
    "log"
)
// RabbitMQ連接函數(shù)
func RabbitMQConn() (conn *amqp.Connection,err error){
    // RabbitMQ分配的用戶名稱
    var user string = "admin"
    // RabbitMQ用戶的密碼
    var pwd string = "123456"
    // RabbitMQ Broker 的ip地址
    var host string = "192.168.230.132"
    // RabbitMQ Broker 監(jiān)聽(tīng)的端口
    var port string = "5672"
    url := "amqp://"+user+":"+pwd+"@"+host+":"+port+"/"
    // 新建一個(gè)連接
    conn,err =amqp.Dial(url)
    // 返回連接和錯(cuò)誤
    return
}
// 錯(cuò)誤處理函數(shù)
func ErrorHanding(err error, msg string){
    if err != nil{
        log.Fatalf("%s: %s", msg, err)
    }
}

基礎(chǔ)隊(duì)列使用

簡(jiǎn)單隊(duì)列模式是RabbitMQ的常規(guī)用法,簡(jiǎn)單理解就是消息生產(chǎn)者發(fā)送消息給一個(gè)隊(duì)列,然后消息的消息的消費(fèi)者從隊(duì)列中讀取消息

當(dāng)多個(gè)消費(fèi)者訂閱同一個(gè)隊(duì)列的時(shí)候,隊(duì)列中的消息是平均分?jǐn)偨o多個(gè)消費(fèi)者處理

定義一個(gè)消息的生產(chǎn)者

producer.go

package main

import (
    "encoding/json"
    "log"
    "myDemo/rabbitmq_demo/lib"

    "github.com/streadway/amqp"
)
type simpleDemo struct {
    Name string `json:"name"`
    Addr string `json:"addr"`
}
func main() {
    // 連接RabbitMQ服務(wù)器
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
    // 關(guān)閉連接
    defer conn.Close()
    // 新建一個(gè)通道
    ch, err := conn.Channel()
    lib.ErrorHanding(err, "Failed to open a channel")
    // 關(guān)閉通道
    defer ch.Close()
    // 聲明或者創(chuàng)建一個(gè)隊(duì)列用來(lái)保存消息
    q, err := ch.QueueDeclare(
        // 隊(duì)列名稱
        "simple:queue", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    lib.ErrorHanding(err, "Failed to declare a queue")
    data := simpleDemo{
        Name: "Tom",
        Addr: "Beijing",
    }
    dataBytes,err := json.Marshal(data)
    if err != nil{
        lib.ErrorHanding(err,"struct to json failed")
    }
    err = ch.Publish(
        "",     // exchange
        q.Name, // routing key
        false,  // mandatory
        false,  // immediate
        amqp.Publishing{
            ContentType: "text/plain",
            Body:        dataBytes,
        })
    log.Printf(" [x] Sent %s", dataBytes)
    lib.ErrorHanding(err, "Failed to publish a message")
}

定義一個(gè)消息的消費(fèi)者

comsumer.go

package main

import (
    "log"
    "myDemo/rabbitmq_demo/lib"
)

func main() {
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err,"failed to connect to RabbitMQ")
    defer conn.Close()
    ch, err := conn.Channel()
    lib.ErrorHanding(err,"failed to open a channel")
    defer ch.Close()
    q, err := ch.QueueDeclare(
        "simple:queue", // name
        false,   // durable
        false,   // delete when unused
        false,   // exclusive
        false,   // no-wait
        nil,     // arguments
    )
    lib.ErrorHanding(err,"Failed to declare a queue")
    // 定義一個(gè)消費(fèi)者
    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        true,   // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    lib.ErrorHanding(err,"Failed to register a consume")
    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    select {}
}

工作隊(duì)列

工作隊(duì)列也稱為 任務(wù)隊(duì)列 任務(wù)隊(duì)列是為了避免等待執(zhí)行一些耗時(shí)的任務(wù),而是將需要執(zhí)行的任務(wù)封裝為消息發(fā)送給工作隊(duì)列,后臺(tái)運(yùn)行的工作進(jìn)程將任務(wù)消息取出來(lái)并執(zhí)行相關(guān)任務(wù) , 多個(gè)后臺(tái)工作進(jìn)程同時(shí)間進(jìn)行,那么任務(wù)在他們之間共享

work-queue.png

我們定義一個(gè)任務(wù)的生產(chǎn)者,用于生產(chǎn)任務(wù)消息

task.go

package main

import (
    "github.com/streadway/amqp"
    "log"
    "myDemo/rabbitmq_demo/lib"
    "os"
    "strings"
)

func bodyFrom(args []string) string {
    var s string
    if (len(args) < 2) || os.Args[1] == "" {
        s = "no task"
    } else {
        s = strings.Join(args[1:], " ")
    }
    return s
}
func main() {
    // 連接RabbitMQ服務(wù)器
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
    // 關(guān)閉連接
    defer conn.Close()
    // 新建一個(gè)通道
    ch, err := conn.Channel()
    lib.ErrorHanding(err, "Failed to open a channel")
    // 關(guān)閉通道
    defer ch.Close()
    // 聲明或者創(chuàng)建一個(gè)隊(duì)列用來(lái)保存消息
    q, err := ch.QueueDeclare(
        // 隊(duì)列名稱
        "task:queue", // name
        false,          // durable
        false,          // delete when unused
        false,          // exclusive
        false,          // no-wait
        nil,            // arguments
    )
    lib.ErrorHanding(err, "Failed to declare a queue")
    body := bodyFrom(os.Args)
    err = ch.Publish(
        "",
        q.Name,
        false,
        false,
        amqp.Publishing{
            ContentType: "text/plain",
            // 將消息標(biāo)記為持久消息
            DeliveryMode: amqp.Persistent,
            Body:         []byte(body),
        })
    lib.ErrorHanding(err, "Failed to publish a message")
    log.Printf("sent %s", body)
}

定義一個(gè)工作者,用于消費(fèi)掉任務(wù)消息

worker.go

package main

import (
    "log"
    "myDemo/rabbitmq_demo/lib"
)

func main() {
    conn, err := lib.RabbitMQConn()
    lib.ErrorHanding(err, "Failed to connect to RabbitMQ")
    defer conn.Close()

    ch, err := conn.Channel()
    lib.ErrorHanding(err, "Failed to open a channel")
    defer ch.Close()

    q, err := ch.QueueDeclare(
        "task:queue", // name
        false,         // durable
        false,        // delete when unused
        false,        // exclusive
        false,        // no-wait
        nil,          // arguments
    )
    lib.ErrorHanding(err, "Failed to declare a queue")
    // 將預(yù)取計(jì)數(shù)器設(shè)置為1
    // 在并行處理中將消息分配給不同的工作進(jìn)程
    err = ch.Qos(
        1,     // prefetch count
        0,     // prefetch size
        false, // global
    )
    lib.ErrorHanding(err, "Failed to set QoS")

    msgs, err := ch.Consume(
        q.Name, // queue
        "",     // consumer
        false,  // auto-ack
        false,  // exclusive
        false,  // no-local
        false,  // no-wait
        nil,    // args
    )
    lib.ErrorHanding(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        for d := range msgs {
            log.Printf("Received a message: %s", d.Body)
            log.Printf("Done")
            d.Ack(false)
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever
}

測(cè)試

#shell1
go run task.go
#shell2
go run worker.go
#shell3
go run worker.go

RabbitMQ 的用法很多,詳情參看官網(wǎng)文檔

參考資料

https://www.rabbitmq.com/getstarted.html
http://rabbitmq.mr-ping.com/
https://github.com/streadway/amqp
https://blog.csdn.net/u013256816/category_6532725.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容