RabbitMQ 使用教程

Rabbit 簡介

RabbitMQ 是一個(gè)有 Erlang 語言開發(fā)的 AMQP 的開源實(shí)現(xiàn)。
AMQP:Advanced Message Queue 高級消息隊(duì)列協(xié)議。它是應(yīng)用層協(xié)議的一個(gè)開發(fā)標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品,開發(fā)語言等條件限制。

隊(duì)列(Queue)

隊(duì)列是常用的數(shù)據(jù)結(jié)構(gòu)之一,是一種特殊的線性表,特殊之外在與它只允許在表的前端(front)進(jìn)行刪除操作,而在表的后端(rear)進(jìn)行插入操作。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭。

消息隊(duì)列(Message Queue)

消息:計(jì)算機(jī) / 應(yīng)用 間傳送的數(shù)據(jù)單位,可以非常簡單,例如只包含文件字符串,也可以很復(fù)雜,可以包含嵌入對象。
消息隊(duì)列:在消息的傳輸過程中保存消息的容器。
消息傳輸時(shí),先發(fā)送到隊(duì)列,隊(duì)列的主要目的是提供路由并保證消息的傳遞,如果發(fā)送消息時(shí)接收者不可用,消息隊(duì)列會(huì)保留消息,直到可以成功的傳遞它。
可以把消息隊(duì)列理解成快遞公司,你需要寄一個(gè)物件(消息)給你的朋友,快遞公司收到物件會(huì)保證物件送的你的朋友手中,可能存在多次寄送才能送達(dá)成功的情況,比如第一次送過去,但你的朋友不在家。
消息隊(duì)里中間件是分布式系統(tǒng)中重要的組件
解決 應(yīng)用耦合,異步消息,流量削峰 等問題
實(shí)現(xiàn) 高性能,高可用,可伸縮最終一致性

RabbitMQ 概念

6c7db1f271da4f01b2f9033b05afeb3d.jpg
  1. Broke:又稱 server,它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費(fèi)者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸。
  2. Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
  3. Queue:消息的載體,每個(gè)消息都會(huì)被投到一個(gè)或多個(gè)隊(duì)列。
  4. Binding:綁定,它的作用就是把 exchange 和 queue 按照路由規(guī)則綁定起來。
  5. Routing Key:路由關(guān)鍵字,exchange 根據(jù)這個(gè)關(guān)關(guān)鍵字進(jìn)行消息投遞。
  6. vhost:虛擬機(jī),一個(gè) broker 里面可以有多個(gè) vhost ,用作不同用戶的權(quán)限分離。
  7. Producer:消息生產(chǎn)者,就是投遞消息的程序。
  8. Consumer:消息消費(fèi)者,就是接受消息的程序。
  9. Channel:消息通道,在客戶端的每個(gè)連接里,可以建立多個(gè) channel 。幾乎所有的操作都在 channel 中完成。

支持的消息類型

adb765c1fdae43d7a4db9fa3c3bc2f3d.png
1、簡單模式 Simple
bc3ab931857f465a9efff3b87a255254.png

P (produce / publisher):生產(chǎn)者,一個(gè)發(fā)送消息的用戶應(yīng)用程序。
C (consumer):消費(fèi)者,消費(fèi)和接收的意思,消費(fèi)者是一個(gè)主要用來等待接收消息的用戶應(yīng)用程序。
隊(duì)列(紅色區(qū)域):rabbitmq 內(nèi)部類似于郵箱的一個(gè)概念。雖然消息流經(jīng) rabbitmq 和你的應(yīng)用程序,但是它們只能存儲(chǔ)在隊(duì)里中。隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制,實(shí)質(zhì)上是一個(gè)大的消息緩沖區(qū)。許多生產(chǎn)者可以發(fā)送消息多一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。

總之:生產(chǎn)者將消息發(fā)送到隊(duì)里,消費(fèi)者從隊(duì)列中獲取消息,隊(duì)列是存儲(chǔ)消息的緩沖區(qū)。

1、工作模式 Work

工作模式又稱競爭消費(fèi)者模式
主要思想就是避免執(zhí)行資源密集型認(rèn)識(shí)時(shí),必須等待它執(zhí)行完成。相反我們需要稍后完成任務(wù),我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)程獲取任務(wù)并最終執(zhí)行作業(yè)。當(dāng)你運(yùn)行許多消費(fèi)者時(shí),任務(wù)將在他們之間共享,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。

5b910530353b440a8e9dd1b89b05803c.png

P:生產(chǎn)者,任務(wù)的發(fā)布者
C1:消費(fèi)者1
C2:消費(fèi)者2

如何避免消息堆積?

  1. 采用 workqueue,多個(gè)消費(fèi)者監(jiān)聽同一隊(duì)列。
  2. 接收到消息后,通過線程池,異步消費(fèi)。
3、發(fā)布訂閱模式(廣播機(jī)制)(扇型交換機(jī))

1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,每一個(gè)消費(fèi)者都有自己的隊(duì)列,生產(chǎn)者沒有將消息直接發(fā)送到隊(duì)列,而是發(fā)送到了交換機(jī),每個(gè)隊(duì)列都要綁定到交換機(jī),生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者獲取的目的。(如果沒有任何隊(duì)列與 Exchange 綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!

b2cb58d7654d474fadf58d916d2ebcc6.png

交換機(jī)(Exchange)【藍(lán)色圓圈x】

一方面接收生產(chǎn)者發(fā)送的消息。另一方面知道如何處理消息,如遞交個(gè)某個(gè)特別隊(duì)列,遞交給所有隊(duì)列,還是將消息丟棄。到底如何操作,取決于交換機(jī)的類型。Exchange 只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力。

Exchange 類型有以下三種:

1、Direct Exchange(直連型交換機(jī)):
根據(jù)消息攜帶的路由鍵將消息投遞給對應(yīng)隊(duì)列。
大致流程為:有一個(gè)隊(duì)列綁定到一個(gè)直連交換機(jī)上,同時(shí)賦予一個(gè)路由鍵 key。然后當(dāng)一個(gè)消息攜帶著路由鍵 key,這個(gè)消息通過生產(chǎn)者發(fā)送給交換機(jī)時(shí),交換機(jī)就會(huì)根據(jù)這個(gè)路由值 key 去尋找綁定值也是 key 的隊(duì)列。
2、Fanout Exchange (扇型交換機(jī))
這個(gè)交換機(jī)沒有路由鍵概念,就算你綁定了路由鍵也是無效的。這個(gè)交換機(jī)在接收到消息后,會(huì)直接轉(zhuǎn)發(fā)到綁定到它上面的所有隊(duì)列。
3、Topic Exchange (主題交換機(jī))
這個(gè)交換機(jī)其實(shí)跟直連交換機(jī)流程差不多,但是它的特點(diǎn)就是在它的路由鍵和綁定鍵之間有規(guī)則

(星號(hào) *)用來表示一個(gè)單詞(必須出現(xiàn)一個(gè)單詞)
(井號(hào) #)用來表示任意數(shù)量的單詞

舉例:
隊(duì)列Q1 綁定鍵為 .TT.
隊(duì)列Q2 綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊(duì)列Q1將會(huì)收到
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊(duì)列Q2將會(huì)收到

當(dāng)一個(gè)隊(duì)列的綁定鍵為 "#" 的時(shí)候,這個(gè)隊(duì)列將會(huì)無視消息的路由鍵,接收所有的消息。
當(dāng)"*" 和 "#" 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有直連交換機(jī)的行為。
所以主題交換機(jī)也就實(shí)現(xiàn)了扇型交換機(jī)的功能,和直連交換機(jī)的功能。
除了以上常用的交換機(jī),還有 Header Exchange(頭交換機(jī)),Default Exchange(默認(rèn)交換機(jī))和 (Dead Letter Exchange)(死信交換機(jī))

4、路由模式(直連交換機(jī))

在廣播模式中,生產(chǎn)者發(fā)布消息,所有消費(fèi)者都可以獲取所有消息。
在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到 Direct 類型的 Exchange 發(fā)送消息時(shí),也必須指定消息的 routing key。


0576d00349b04de9b67bdb42a3912221.png

P:生產(chǎn)者,向 Exchange 發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè) routing key。
X:Exchange (交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 routing key 完全匹配的隊(duì)列
C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息。
C2:消費(fèi)者,其所在隊(duì)列指定了需要 routing key 為 info,error,warning 的消息。

在這種情況下,一個(gè)消息在發(fā)布時(shí)指定了路由鍵為 error 將只會(huì)被 c1 消耗。路由鍵為 info,error, warning 的消息都將被 c2 消耗,其他消息都將被丟失。

5、主題模式(Topic )(主題交換機(jī))

Topic 類型的 Exchange 與 Direct 相比,都是可以根據(jù) RoutingKey 把消息路由到不同的隊(duì)列。只不過 Topic 類型 Exchange 可以讓隊(duì)列在綁定 Routing key 的時(shí)候使用通配符。


7bb76ef2d5fc485a85a4a6049b74ae3d.png

消息持久化

目的是為了避免消息丟失
消息丟失:

  • 消費(fèi)者的 ACK 機(jī)制??梢苑乐瓜M(fèi)者丟失消息。

消費(fèi)者領(lǐng)取消息后,還沒執(zhí)行操作就掛掉了?或者拋出了異常?消息消費(fèi)失敗,但是 RabbitMQ 無從得知,這樣消息就丟失了!
因此,RabbitMQ 有一個(gè) ACK 機(jī)制。當(dāng)消費(fèi)者獲取消息后,會(huì)向 RabbitMQ 發(fā)送回執(zhí) ACK,告知消息已經(jīng)被接收。不過這種回執(zhí) ACK 分兩種情況:

  1. 自動(dòng) ACK : 消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送 ACK
  2. 手動(dòng) ACK:消息接收后,不會(huì)發(fā)送 ACK ,需要手動(dòng)調(diào)用。
  • 如果在消費(fèi)者消費(fèi)之前,MQ 就宕機(jī)了,消息就沒了。

消息持久化的前提是:隊(duì)列,Exchange 都持久化。

  • Exchange 持久化:
// 獲取通道
Channel channel = connection.createChannel();
// 聲明 exchange ,指定類型為 topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true)
// 消息內(nèi)容
  • 隊(duì)列持久化
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, true, false, false, false)
  • 消息持久化
// 發(fā)送消息,并且指定 routing key 為 insert 代表新增
channel.basicPublish(EXCHANGE_NAME, routing_key="item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [商品服務(wù):] Sent ‘ " + message + " ’ ");

代碼示例

1、引入依賴
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.5.13</version>
</dependency>
2、修改配置文件
spring:
  rabbitmq:
    host: 127.0.0.1 #ip
    port: 5672      #端口
    username: guest #賬號(hào)
    password: guest #密碼
    virtualHost:    #鏈接的虛擬主機(jī)
    addresses: 127.0.0.1:5672     #多個(gè)以逗號(hào)分隔,與host功能一樣。
    requestedHeartbeat: 60 #指定心跳超時(shí),單位秒,0為不指定;默認(rèn)60s
    publisherConfirms: true  #發(fā)布確認(rèn)機(jī)制是否啟用
    publisherReturns: #發(fā)布返回是否啟用
    connectionTimeout: #鏈接超時(shí)。單位ms。0表示無窮大不超時(shí)
    ### ssl相關(guān)
    ssl:
      enabled: #是否支持ssl
      keyStore: #指定持有SSL certificate的key store的路徑
      keyStoreType: #key store類型 默認(rèn)PKCS12
      keyStorePassword: #指定訪問key store的密碼
      trustStore: #指定持有SSL certificates的Trust store
      trustStoreType: #默認(rèn)JKS
      trustStorePassword: #訪問密碼
      algorithm: #ssl使用的算法,例如,TLSv1.1
      verifyHostname: #是否開啟hostname驗(yàn)證
    ### cache相關(guān)
    cache:
      channel: 
        size: #緩存中保持的channel數(shù)量
        checkoutTimeout: #當(dāng)緩存數(shù)量被設(shè)置時(shí),從緩存中獲取一個(gè)channel的超時(shí)時(shí)間,單位毫秒;如果為0,則總是創(chuàng)建一個(gè)新channel
      connection:
        mode: #連接工廠緩存模式:CHANNEL 和 CONNECTION
        size: #緩存的連接數(shù),只有是CONNECTION模式時(shí)生效
    ### listener
    listener:
       type: #兩種類型,SIMPLE,DIRECT
       ## simple類型
       simple:
         concurrency: #最小消費(fèi)者數(shù)量
         maxConcurrency: #最大的消費(fèi)者數(shù)量
         transactionSize: #指定一個(gè)事務(wù)處理的消息數(shù)量,最好是小于等于prefetch的數(shù)量
         missingQueuesFatal: #是否停止容器當(dāng)容器中的隊(duì)列不可用
         ## 與direct相同配置部分
         autoStartup: #是否自動(dòng)啟動(dòng)容器
         acknowledgeMode: #表示消息確認(rèn)方式,其有三種配置方式,分別是none、manual和auto;默認(rèn)auto
         prefetch: #指定一個(gè)請求能處理多少個(gè)消息,如果有事務(wù)的話,必須大于等于transaction數(shù)量
         defaultRequeueRejected: #決定被拒絕的消息是否重新入隊(duì);默認(rèn)是true(與參數(shù)acknowledge-mode有關(guān)系)
         idleEventInterval: #container events發(fā)布頻率,單位ms
         ##重試機(jī)制
         retry: 
           stateless: #有無狀態(tài)
           enabled:  #是否開啟
           maxAttempts: #最大重試次數(shù),默認(rèn)3
           initialInterval: #重試間隔
           multiplier: #對于上一次重試的乘數(shù)
           maxInterval: #最大重試時(shí)間間隔
       direct:
         consumersPerQueue: #每個(gè)隊(duì)列消費(fèi)者數(shù)量
         missingQueuesFatal:
         #...其余配置看上方公共配置
     ## template相關(guān)
     template:
       mandatory: #是否啟用強(qiáng)制信息;默認(rèn)false
       receiveTimeout: #`receive()`接收方法超時(shí)時(shí)間
       replyTimeout: #`sendAndReceive()`超時(shí)時(shí)間
       exchange: #默認(rèn)的交換機(jī)
       routingKey: #默認(rèn)的路由
       defaultReceiveQueue: #默認(rèn)的接收隊(duì)列
       ## retry重試相關(guān)
       retry: 
         enabled: #是否開啟
         maxAttempts: #最大重試次數(shù)
         initialInterval: #重試間隔
         multiplier: #失敗間隔乘數(shù)
         maxInterval: #最大間隔

簡單模式

1、配置類
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * 簡單模式配置
 */
@Configuration
public class RabbitMqConfig {
    
    public static final String QUEUE_NAME = "my_queue"; //隊(duì)列名稱

    /**
     * Queue(隊(duì)列)
     */
    @Bean
    public Queue simpleQueue() {
       /**
         * name:隊(duì)列名稱。
         * durable:是否持久化,默認(rèn)是false,持久化隊(duì)列(內(nèi)部會(huì)有一個(gè)actualName: 隊(duì)列的真實(shí)名稱,默認(rèn)用name參數(shù),如果name為空,則根據(jù)規(guī)則生成一個(gè))
         * exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級高于durable
         * autoDelete:是否自動(dòng)刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除。
         * arguments:設(shè)置隊(duì)列的屬性參數(shù)
         *  1、x-message-ttl:消息的過期時(shí)間,單位:毫秒;
         *  2、x-expires:隊(duì)列過期時(shí)間,隊(duì)列在多長時(shí)間未被訪問將被刪除,單位:毫秒;
         *  3、x-max-length:隊(duì)列最大長度,超過該最大值,則將從隊(duì)列頭部開始刪除消息;
         *  4、x-max-length-bytes:隊(duì)列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過該閾值則從隊(duì)列頭部開始刪除消息;
         *  5、x-overflow:設(shè)置隊(duì)列溢出行為。這決定了當(dāng)達(dá)到隊(duì)列的最大長度時(shí)消息會(huì)發(fā)生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊(duì)列類型僅支持drop-head;
         *  6、x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊(duì)列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
         *  7、x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)置,則使用消息的原來的路由鍵值
         *  8、x-single-active-consumer:表示隊(duì)列是否是單一活動(dòng)消費(fèi)者,true時(shí),注冊的消費(fèi)組內(nèi)只有一個(gè)消費(fèi)者消費(fèi)消息,其他被忽略,false時(shí)消息循環(huán)分發(fā)給所有消費(fèi)者(默認(rèn)false)
         *  9、x-max-priority:隊(duì)列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊(duì)列將不支持消息優(yōu)先級;
         * 10、x-queue-mode(Lazy mode):將隊(duì)列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置,隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息;
         * 11、x-queue-master-locator:在集群模式下設(shè)置鏡像隊(duì)列的主節(jié)點(diǎn)信息。
         */
        return new Queue(QUEUE_NAME, true, false, false, null);
    }
}
2、消費(fèi)者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;

/**
 * 簡單模式
 * 消息消費(fèi)者
 */
@Component
@RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
//@RabbitListener(queuesToDeclare = @Queue("simple_queue"))  //如果simple_queue隊(duì)列不存在,則創(chuàng)建simple_queue隊(duì)列。默認(rèn)隊(duì)列是持久化,非獨(dú)占式的
public class SimpleConsumer {

    //消費(fèi)者如果監(jiān)聽到消息隊(duì)列有消息傳入,則會(huì)自動(dòng)消費(fèi)
    @RabbitHandler
    public void receive(Map message) {
        System.out.println("簡單模式 -> 消費(fèi)者收到map類型消息  : " + testMessage.toString());
    }
    
    @RabbitHandler
    public void receive2(String message) {
        System.out.println("簡單模式 -> 消費(fèi)者收到string類型消息  : " + testMessage.toString());
    }
}

@RabbitListener 注解屬性的作用:
queuesToDeclare:如果 simple_queue 隊(duì)列不存在,則會(huì)自動(dòng)創(chuàng)建 simple_queue 隊(duì)列。默認(rèn)隊(duì)列是持久化,非獨(dú)占式的。
queues:里面的隊(duì)列必須存在,否則就會(huì)報(bào)錯(cuò)。
@RabbitListener(queues = {"simple_queue2"}),如果隊(duì)列 simple_queue2 不存在,那么啟動(dòng)消費(fèi)者就會(huì)報(bào)錯(cuò)
注意:
@RabbitListener 既可以標(biāo)記在類上,也可以標(biāo)記在方法上
標(biāo)記在類上:需配合 @RabbitHandler 注解一起使用。當(dāng)有收到消息的時(shí)候,就交給@RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型(入?yún)㈩愋瓦M(jìn)行決定)
標(biāo)記在方法上:就有指定的方法進(jìn)行處理

3、生產(chǎn)者
/**
 * 簡單模式
 * 消息生產(chǎn)者
 */
@Controller
@RequestMapping(value = "simple")
public class SimpleProducer {

    @Autowired
    RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,這提供了接收/發(fā)送等等方法

    /**
     * 向rabbitMq發(fā)送消息
     */
    @RequestMapping(value = "sendMsg")
    @ResponseBody
    public String send(String messageId,String messageData) {
        SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        String createTime = simpleDateFormat.format(new Date());
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //將消息發(fā)送到隊(duì)列my_queue中
        rabbitTemplate.convertAndSend("my_queue", map);
        //receive將接收到消息
        System.out.println("rabbitMQ 簡單模式消息發(fā)送成功!");
        return "true";
    }
}
消息手動(dòng)確認(rèn)

1、yml 文件添加配置

listener:
  simple:
    concurrency: 1
    max-concurrency: 1
    acknowledge-mode: manual
    prefetch: 1
消費(fèi)者代碼修改
/**
 * 簡單模式
 * 消息消費(fèi)者
 */
@Component
public class SimpleConsumer {

    @RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
    public void process(Message message, Channel channel) throws IOException {
        String str = new String(message.getBody());
        JSONObject msgData = (JSONObject) JSON.parse(str);
        Object messageId = msgData.get("messageId");
        if (null==messageId || messageId.toString().equals("")) {
            /**
             * 有異常就拒收消息
             * basicNack(long deliveryTag, boolean multiple, boolean requeue)
             * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
             * multiple:為true的話就是批量確認(rèn)
             * requeue:true將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
             *         false將消息丟棄
             */
            channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
            System.out.println("簡單模式 -> 消費(fèi)者拒收消息 : " + msgData.toString());
        }else {
            /**
             * 沒有異常就確認(rèn)消息
             * basicAck(long deliveryTag, boolean multiple)
             * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
             * multiple:為true的話就是批量確認(rèn)
             */
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("簡單模式 -> 消費(fèi)者收到消息  : " + msgData.toString());
        }
    }
}
消息確認(rèn)機(jī)制

1、自動(dòng)確認(rèn)
這也是默認(rèn)的消息確認(rèn)情況。AcknowledgeMode.NONE
RabbitMQ 成功將消息發(fā)出(即將消息成功寫入 TCP Socket)中立即認(rèn)為本次投遞已經(jīng)被正確處理,不管消費(fèi)者是否成功處理本次投遞。
所以這種情況如果消費(fèi)端消費(fèi)邏輯拋出異常,也就是消費(fèi)端沒有處理成功這條消息,那么就相當(dāng)于丟失了消息。一般這種情況我們都是使用 try catch 捕捉異常后,打印日志用于追蹤數(shù)據(jù),這樣找出對應(yīng)數(shù)據(jù)再做后續(xù)處理。
2、手動(dòng)確認(rèn)
這個(gè)比較關(guān)鍵,也是我們配置接收消息確認(rèn)機(jī)制時(shí),多數(shù)選擇的模式。
消費(fèi)者收到消息后,手動(dòng)調(diào)用 basic.ack / basic.nack / basic.reject 后,RabbitMQ 收到這些消息后,才認(rèn)為本次投遞成功。

  • basic.ack 用于肯定確認(rèn)
  • basic.nack 用于否定確認(rèn)(注意:這是 AMQP 0-9-1 的 RabbitMQ擴(kuò)展)
    -basic.reject 用于否定確認(rèn),但與 basic.nack 相比有一個(gè)限制:一次只能拒絕單條消息。

channel.basicReject(deliveryTag, true):拒絕消費(fèi)當(dāng)前消息
第一個(gè)參數(shù)是當(dāng)前消息在隊(duì)列中的索引
第二參數(shù)傳入 true ,就是將數(shù)據(jù)重新丟會(huì)隊(duì)列里,那么下次還會(huì)消費(fèi)這個(gè)消息。
第二參數(shù)傳入 false,就是告訴服務(wù)器,我們已經(jīng)知道這條消息數(shù)據(jù)了,因?yàn)橐恍┰蚓芙^它,而且服務(wù)器也把這個(gè)消息丟掉就行,下次不想再消費(fèi)這條消息了。
使用拒絕后重新入列這個(gè)確認(rèn)模式要謹(jǐn)慎,因?yàn)橐话愣际浅霈F(xiàn)異常的時(shí)候,catch 異常再拒絕入列,選擇是否重新入列。如果使用不當(dāng)會(huì)導(dǎo)致一些每次都被你重入列的消息一直消費(fèi) - 入列 - 消費(fèi) - 入列 這樣循環(huán),會(huì)導(dǎo)致消息積壓。

channel.basicNack(delivertTag, false, true):否定消費(fèi)確認(rèn)
第一個(gè)參數(shù):是當(dāng)前消息在隊(duì)列中的索引
第二個(gè)參數(shù):指是否針對多條消息,如果是 true,也就是說一次性針對當(dāng)前通道的消息 tagID 小于當(dāng)前這條消息的,都拒絕確認(rèn)。
第三個(gè)參數(shù):指是否重新入列,也就是指不確認(rèn)的消息是否重新丟回到隊(duì)列里面去。
同樣使用不確認(rèn)后重新入列這個(gè)確認(rèn)模式要謹(jǐn)慎,因?yàn)檫@里也可能考慮不周出現(xiàn)消息一直被重新丟回去的情況,導(dǎo)致積壓。

4、消息回調(diào)

ConfirmCallback:當(dāng)消息到達(dá)交換機(jī)觸發(fā)回調(diào)
ReturnsCallback:消息(帶路由鍵 routingKey) 到達(dá)交換機(jī),與交換機(jī)的所有綁定鍵進(jìn)行匹配,觸發(fā)回調(diào)。
若要使用消息回調(diào):

  1. 修改配置
publisher-confirm-type: correlated
publisher-returns: true
  1. 設(shè)置 mandatory
    ···
    設(shè)置rabbitTemplate的mandatory為true 或者在配置中設(shè)置 rabbitmq.template.mandatory=true
配置文件新增代碼:

@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//設(shè)置開啟Mandatory,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強(qiáng)制調(diào)用回調(diào)函數(shù)
rabbitTemplate.setMandatory(true);
//確認(rèn)消息發(fā)送到交換機(jī)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("=======================> ConfirmCallback <=======================");
System.out.println("ConfirmCallback ===>"+"相關(guān)數(shù)據(jù):"+correlationData);
System.out.println("ConfirmCallback ===>"+"確認(rèn)情況:"+ack);
System.out.println("ConfirmCallback ===>"+"原因:"+cause);
}
});
//確認(rèn)消息已發(fā)送到隊(duì)列
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
Message message = returnedMessage.getMessage();
int replyCode = returnedMessage.getReplyCode();
String replyText = returnedMessage.getReplyText();
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
System.out.println("=======================> ReturnsCallback <=======================");
System.out.println("ReturnCallback ===>"+"消息:"+message.toString());
System.out.println("ReturnCallback ===>"+"回應(yīng)碼:"+replyCode);
System.out.println("ReturnCallback ===>"+"回應(yīng)信息:"+replyText);
System.out.println("ReturnCallback ===>"+"交換機(jī):"+exchange);
System.out.println("ReturnCallback ===>"+"路由鍵:"+routingKey);
}
});
return rabbitTemplate;
}

###### 工作模式
運(yùn)行許多消費(fèi)者,任務(wù)在他們之間共享,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。設(shè)置 prefetchCount 值為1。這告訴 RabbitMQ 一次不要向消費(fèi)者發(fā)送多于一條消息。
換句話說,不要向消費(fèi)者發(fā)送新消息,直到它處理并確認(rèn)了前一個(gè)消息。相反,它會(huì)將其分派給不是忙碌 的下一個(gè)消費(fèi)者。
當(dāng)有多個(gè)消費(fèi)者時(shí),我們的消息會(huì)被哪個(gè)消費(fèi)者消費(fèi)呢?我們又該如何均衡消費(fèi)者消費(fèi)消息的多少呢?
主要有兩種模式:
輪詢模式的分發(fā): 一個(gè)消費(fèi)者一條,按均分配(關(guān)閉手動(dòng)應(yīng)答,開啟自動(dòng)應(yīng)答)
公平分發(fā):根據(jù)消費(fèi)者的消費(fèi)能力進(jìn)行公平分發(fā),處理快的處理的多,處理慢的處理的少;按勞分配(關(guān)閉自動(dòng)應(yīng)答,開啟手動(dòng)應(yīng)答)
1、 yml 文件添加 prefetch

listener:
simple:
prefetch: 1

2、創(chuàng)建兩個(gè)消費(fèi)者(生產(chǎn)者,配置類不變)

/**

  • 簡單模式

  • 消息消費(fèi)者
    */
    @Component
    public class SimpleConsumer {

    @RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
    public void process(Message message, Channel channel) throws IOException, InterruptedException {
    String str = new String(message.getBody());
    JSONObject msgData = (JSONObject) JSON.parse(str);
    Object messageId = msgData.get("messageId");
    if (null==messageId || messageId.toString().equals("")) {
    /**
    * 有異常就拒收消息
    * basicNack(long deliveryTag, boolean multiple, boolean requeue)
    * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
    * multiple:為true的話就是批量確認(rèn)
    * requeue:true將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
    * false將消息丟棄
    /
    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    System.out.println("簡單模式 -> 消費(fèi)者拒收消息 : " + msgData.toString());
    }else {
    /
    *
    * 沒有異常就確認(rèn)消息
    * basicAck(long deliveryTag, boolean multiple)
    * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
    * multiple:為true的話就是批量確認(rèn)
    */
    Thread.sleep(2000);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("簡單模式 process -> 消費(fèi)者收到消息 : " + msgData.toString());
    }
    }

    @RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
    public void process2(Message message, Channel channel) throws IOException, InterruptedException {
    String str = new String(message.getBody());
    JSONObject msgData = (JSONObject) JSON.parse(str);
    Object messageId = msgData.get("messageId");
    if (null==messageId || messageId.toString().equals("")) {
    /**
    * 有異常就拒收消息
    * basicNack(long deliveryTag, boolean multiple, boolean requeue)
    * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
    * multiple:為true的話就是批量確認(rèn)
    * requeue:true將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
    * false將消息丟棄
    /
    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
    System.out.println("簡單模式 -> 消費(fèi)者拒收消息 : " + msgData.toString());
    }else {
    /
    *
    * 沒有異常就確認(rèn)消息
    * basicAck(long deliveryTag, boolean multiple)
    * deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
    * multiple:為true的話就是批量確認(rèn)
    */
    Thread.sleep(5000);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("簡單模式 process2 -> 消費(fèi)者收到消息 : " + msgData.toString());
    }
    }
    }


#### 訂閱模型-Fanout
###### 配置類

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • 訂閱模式-fanout

  • 扇型交換機(jī)

  • 創(chuàng)建三個(gè)隊(duì)列 :fanout_queue1 fanout_queue2 fanout_queue3

  • 將三個(gè)隊(duì)列都綁定在交換機(jī) fanout_exchange 上

  • 因?yàn)槭巧刃徒粨Q機(jī), 路由鍵無需配置,配置也不起作用
    */
    @Configuration
    public class FanoutRabbitMqConfig {
    //隊(duì)列名稱常量
    public static final String QUEUE_NAME1 = "fanout_queue1";
    public static final String QUEUE_NAME2 = "fanout_queue2";
    public static final String QUEUE_NAME3 = "fanout_queue3";
    //交換機(jī)名稱常量
    public static final String EXCHANGE_NAME = "fanout_exchange";

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue fanoutQueue1() {
      return new Queue(QUEUE_NAME1, true, false, false, null);
      }

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue fanoutQueue2() {
      return new Queue(QUEUE_NAME2, true, false, false, null);
      }

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue fanoutQueue3() {
      return new Queue(QUEUE_NAME3, true, false, false, null);
      }

    /**

    • 聲明交換機(jī)
      */
      @Bean
      FanoutExchange fanoutExchange() {
      return new FanoutExchange(EXCHANGE_NAME,true,false,null);
      }

    /**

    • 交換機(jī)隊(duì)列綁定
      */
      @Bean
      Binding bindingExchange1() {
      return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
      }

    @Bean
    Binding bindingExchange2() {
    return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
    }

    @Bean
    Binding bindingExchange3() {
    return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
    }
    }

###### 生產(chǎn)者
聲明 Exchange,不再聲明 Queue
發(fā)送消息到 Exchange,不再發(fā)送到 Queue

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.SimpleDateFormat;
import java.util.Date;

@Controller
@RequestMapping(value = "fanout")
public class FanoutProducer {

@Autowired
RabbitTemplate rabbitTemplate;

/**
 * 向rabbitMq發(fā)送消息
 */
@RequestMapping(value = "sendMsg")
@ResponseBody
public String send(String messageId,String messageData) {
    SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String createTime = simpleDateFormat.format(new Date());
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("messageId",messageId);
    jsonObject.put("messageData",messageData);
    jsonObject.put("createTime",createTime);
    //將消息發(fā)送到隊(duì)列my_queue中
    rabbitTemplate.convertAndSend("fanout_exchange",null, jsonObject.toString());
    System.out.println("rabbitMQ 簡單模式消息發(fā)送成功!");
    return "true";
}

}

###### 消費(fèi)者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class FanoutReceiver {

@RabbitListener(queues = "fanout_queue1")
public void receive1(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("fanout_queue1消費(fèi)者收到消息  : " +msgData);
}

@RabbitListener(queues = "fanout_queue2")
public void receive2(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("fanout_queue2消費(fèi)者收到消息  : " +msgData);
}

@RabbitListener(queues = "fanout_queue3")
public void receive3(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("fanout_queue3消費(fèi)者收到消息  : " +msgData);
}

}

###### 發(fā)送消息后臺(tái)結(jié)果

rabbitMQ 簡單模式消息發(fā)送成功!
fanout_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
fanout_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
fanout_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
rabbitMQ 簡單模式消息發(fā)送成功!
fanout_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}
fanout_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}
fanout_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}

#### 訂閱模型-Direct
###### 配置類

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • 訂閱模式-direct

  • 直連交換機(jī)

  • 創(chuàng)建三個(gè)隊(duì)列 :direct_queue1 direct_queue2 direct_queue3

  • 將三個(gè)隊(duì)列都綁定在交換機(jī) direct_exchange 上
    */
    @Configuration
    public class DirectRabbitMqConfig {
    //隊(duì)列名稱常量
    public static final String QUEUE_NAME1 = "direct_queue1";
    public static final String QUEUE_NAME2 = "direct_queue2";
    public static final String QUEUE_NAME3 = "direct_queue3";
    //交換機(jī)名稱常量
    public static final String EXCHANGE_NAME = "direct_exchange";

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue directQueue1() {
      return new Queue(QUEUE_NAME1, true, false, false, null);
      }

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue directQueue2() {
      return new Queue(QUEUE_NAME2, true, false, false, null);
      }

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue directQueue3() {
      return new Queue(QUEUE_NAME3, true, false, false, null);
      }

    /**

    • 聲明交換機(jī)
      */
      @Bean
      DirectExchange directExchange() {
      return new DirectExchange(EXCHANGE_NAME,true,false,null);
      }

    /**

    • 交換機(jī)隊(duì)列綁定
      */
      @Bean
      Binding bindingDirectExchange1() {
      return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");
      }

    @Bean
    Binding bindingDirectExchange2() {
    return BindingBuilder.bind(directQueue2()).to(directExchange()).with("error");
    }

    @Bean
    Binding bindingDirectExchange3() {
    return BindingBuilder.bind(directQueue3()).to(directExchange()).with("warn");
    }
    }

###### 生產(chǎn)者

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.SimpleDateFormat;
import java.util.Date;

@Controller
@RequestMapping(value = "direct")
public class DirectProducer {

@Autowired
RabbitTemplate rabbitTemplate;

/**
 * 向rabbitMq發(fā)送消息
 */
@RequestMapping(value = "sendMsg")
@ResponseBody
public String send(String messageId,String messageData,String routingKey) {
    SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String createTime = simpleDateFormat.format(new Date());
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("messageId",messageId);
    jsonObject.put("messageData",messageData);
    jsonObject.put("createTime",createTime);
    //將消息發(fā)送到交換機(jī)
    rabbitTemplate.convertAndSend("direct_exchange",routingKey, jsonObject.toString());
    System.out.println("rabbitMQ 簡單模式消息發(fā)送成功!");
    return "true";
}

}

###### 消費(fèi)者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class DirectReceiver {

@RabbitListener(queues = "direct_queue1")
public void receive1(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("direct_queue1消費(fèi)者收到消息  : " +msgData);
}

@RabbitListener(queues = "direct_queue2")
public void receive2(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("direct_queue2消費(fèi)者收到消息  : " +msgData);
}

@RabbitListener(queues = "direct_queue3")
public void receive3(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("direct_queue3消費(fèi)者收到消息  : " +msgData);
}

}

###### 發(fā)送消息后臺(tái)結(jié)果

消息發(fā)送成功!路由鍵:warn
direct_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-07 11:31:45","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:info
direct_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-07 11:31:54","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:error
direct_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-07 11:31:59","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:error1 (消息被丟棄)

#### 訂閱模型-Topic
###### 配置類

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • 訂閱模式-topic

  • 主題交換機(jī)

  • 創(chuàng)建三個(gè)隊(duì)列 :topic_queue1 topic_queue2 topic_queue3

  • 將三個(gè)隊(duì)列都綁定在交換機(jī) topic_exchange 上
    */
    @Configuration
    public class TopicRabbitMqConfig {
    //隊(duì)列名稱常量
    public static final String QUEUE_NAME1 = "topic_queue1";
    public static final String QUEUE_NAME2 = "topic_queue2";
    public static final String QUEUE_NAME3 = "topic_queue3";
    //交換機(jī)名稱常量
    public static final String EXCHANGE_NAME = "topic_exchange";

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue topicQueue1() {
      return new Queue(QUEUE_NAME1, true, false, false, null);
      }

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue topicQueue2() {
      return new Queue(QUEUE_NAME2, true, false, false, null);
      }

    /**

    • 聲明隊(duì)列 Queue
      */
      @Bean
      public Queue topicQueue3() {
      return new Queue(QUEUE_NAME3, true, false, false, null);
      }

    /**

    • 聲明交換機(jī)
      */
      @Bean
      TopicExchange topicExchange() {
      return new TopicExchange(EXCHANGE_NAME,true,false,null);
      }

    /**

    • 交換機(jī)隊(duì)列綁定
    • 路由鍵使用通配符
      */
      @Bean
      Binding bindingTopicExchange1() {
      //消息攜帶的路由鍵是以"topic."開頭,就會(huì)分發(fā)到該隊(duì)列
      return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");
      }

    @Bean
    Binding bindingTopicExchange2() {
    //消息攜帶的路由鍵是包含.topic.,就會(huì)分發(fā)到該隊(duì)列
    return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(".topic.");
    }

    @Bean
    Binding bindingTopicExchange3() {
    //消息攜帶的路由鍵是以".topic"結(jié)尾,就會(huì)分發(fā)到該隊(duì)列
    return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("#.topic");
    }
    }

###### 生產(chǎn)者

import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

import java.text.SimpleDateFormat;
import java.util.Date;

@Controller
@RequestMapping(value = "topic")
public class TopicProducer {

@Autowired
RabbitTemplate rabbitTemplate;

/**
 * 向rabbitMq發(fā)送消息
 */
@RequestMapping(value = "sendMsg")
@ResponseBody
public String send(String messageId,String messageData,String routingKey) {
    SimpleDateFormat simpleDateFormat  = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    String createTime = simpleDateFormat.format(new Date());
    JSONObject jsonObject = new JSONObject();
    jsonObject.put("messageId",messageId);
    jsonObject.put("messageData",messageData);
    jsonObject.put("createTime",createTime);
    //將消息發(fā)送到隊(duì)列my_queue中
    rabbitTemplate.convertAndSend("topic_exchange",routingKey, jsonObject.toString());
    System.out.println("消息發(fā)送成功!路由鍵:"+routingKey);
    return "true";
}

}

###### 消費(fèi)者

import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class TopicReceiver {

@RabbitListener(queues = "topic_queue1")
public void receive1(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("topic_queue1消費(fèi)者收到消息  : " +msgData);
}

@RabbitListener(queues = "topic_queue2")
public void receive2(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("topic_queue2消費(fèi)者收到消息  : " +msgData);
}

@RabbitListener(queues = "topic_queue3")
public void receive3(String msgData, Message message, Channel channel) throws IOException {
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    System.out.println("topic_queue3消費(fèi)者收到消息  : " +msgData);
}

}

###### 發(fā)送消息后臺(tái)結(jié)果

消息發(fā)送成功!路由鍵:topic.one
topic_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:05:17","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:A.topic.B
topic_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:05:46","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:C.topic
topic_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:06:02","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:SFC.topic.AFBGB
topic_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:06:52","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:S.D.FC.topic.A.S.D.F (消息被丟棄)

#### 一個(gè)簡單的消息推送接收的流程
![微信截圖_20230216160930.png](https://upload-images.jianshu.io/upload_images/7399010-4a1c61239e36c5d2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

生產(chǎn)者產(chǎn)生消息,將消息推送到中間方框里面也就是rabbitMq的服務(wù)器,然后經(jīng)過服務(wù)器里面的交換機(jī)、隊(duì)列等各種關(guān)系將數(shù)據(jù)處理入列后,最終由右邊的消費(fèi)者獲取對應(yīng)監(jiān)聽的消息進(jìn)行消耗處理。

















?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容