Rabbit 簡介
RabbitMQ 是一個(gè)有 Erlang 語言開發(fā)的 AMQP 的開源實(shí)現(xiàn)。
AMQP:Advanced Message Queue 高級消息隊(duì)列協(xié)議。它是應(yīng)用層協(xié)議的一個(gè)開發(fā)標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì),基于此協(xié)議的客戶端與消息中間件可傳遞消息,并不受產(chǎn)品,開發(fā)語言等條件限制。
隊(duì)列(Queue)
隊(duì)列是常用的數(shù)據(jù)結(jié)構(gòu)之一,是一種特殊的線性表,特殊之外在與它只允許在表的前端(front)進(jìn)行刪除操作,而在表的后端(rear)進(jìn)行插入操作。進(jìn)行插入操作的端稱為隊(duì)尾,進(jìn)行刪除操作的端稱為隊(duì)頭。
消息隊(duì)列(Message Queue)
消息:計(jì)算機(jī) / 應(yīng)用 間傳送的數(shù)據(jù)單位,可以非常簡單,例如只包含文件字符串,也可以很復(fù)雜,可以包含嵌入對象。
消息隊(duì)列:在消息的傳輸過程中保存消息的容器。
消息傳輸時(shí),先發(fā)送到隊(duì)列,隊(duì)列的主要目的是提供路由并保證消息的傳遞,如果發(fā)送消息時(shí)接收者不可用,消息隊(duì)列會(huì)保留消息,直到可以成功的傳遞它。
可以把消息隊(duì)列理解成快遞公司,你需要寄一個(gè)物件(消息)給你的朋友,快遞公司收到物件會(huì)保證物件送的你的朋友手中,可能存在多次寄送才能送達(dá)成功的情況,比如第一次送過去,但你的朋友不在家。
消息隊(duì)里中間件是分布式系統(tǒng)中重要的組件
解決 應(yīng)用耦合,異步消息,流量削峰 等問題
實(shí)現(xiàn) 高性能,高可用,可伸縮 和 最終一致性
RabbitMQ 概念

- Broke:又稱 server,它提供一種傳輸服務(wù),它的角色就是維護(hù)一條從生產(chǎn)者到消費(fèi)者的路線,保證數(shù)據(jù)能按照指定的方式進(jìn)行傳輸。
- Exchange:消息交換機(jī),它指定消息按什么規(guī)則,路由到哪個(gè)隊(duì)列。
- Queue:消息的載體,每個(gè)消息都會(huì)被投到一個(gè)或多個(gè)隊(duì)列。
- Binding:綁定,它的作用就是把 exchange 和 queue 按照路由規(guī)則綁定起來。
- Routing Key:路由關(guān)鍵字,exchange 根據(jù)這個(gè)關(guān)關(guān)鍵字進(jìn)行消息投遞。
- vhost:虛擬機(jī),一個(gè) broker 里面可以有多個(gè) vhost ,用作不同用戶的權(quán)限分離。
- Producer:消息生產(chǎn)者,就是投遞消息的程序。
- Consumer:消息消費(fèi)者,就是接受消息的程序。
- Channel:消息通道,在客戶端的每個(gè)連接里,可以建立多個(gè) channel 。幾乎所有的操作都在 channel 中完成。
支持的消息類型

1、簡單模式 Simple

P (produce / publisher):生產(chǎn)者,一個(gè)發(fā)送消息的用戶應(yīng)用程序。
C (consumer):消費(fèi)者,消費(fèi)和接收的意思,消費(fèi)者是一個(gè)主要用來等待接收消息的用戶應(yīng)用程序。
隊(duì)列(紅色區(qū)域):rabbitmq 內(nèi)部類似于郵箱的一個(gè)概念。雖然消息流經(jīng) rabbitmq 和你的應(yīng)用程序,但是它們只能存儲(chǔ)在隊(duì)里中。隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制,實(shí)質(zhì)上是一個(gè)大的消息緩沖區(qū)。許多生產(chǎn)者可以發(fā)送消息多一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。
總之:生產(chǎn)者將消息發(fā)送到隊(duì)里,消費(fèi)者從隊(duì)列中獲取消息,隊(duì)列是存儲(chǔ)消息的緩沖區(qū)。
1、工作模式 Work
工作模式又稱競爭消費(fèi)者模式
主要思想就是避免執(zhí)行資源密集型認(rèn)識(shí)時(shí),必須等待它執(zhí)行完成。相反我們需要稍后完成任務(wù),我們將任務(wù)封裝為消息并將其發(fā)送到隊(duì)列。在后臺(tái)運(yùn)行的工作進(jìn)程獲取任務(wù)并最終執(zhí)行作業(yè)。當(dāng)你運(yùn)行許多消費(fèi)者時(shí),任務(wù)將在他們之間共享,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。

P:生產(chǎn)者,任務(wù)的發(fā)布者
C1:消費(fèi)者1
C2:消費(fèi)者2
如何避免消息堆積?
- 采用 workqueue,多個(gè)消費(fèi)者監(jiān)聽同一隊(duì)列。
- 接收到消息后,通過線程池,異步消費(fèi)。
3、發(fā)布訂閱模式(廣播機(jī)制)(扇型交換機(jī))
1個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,每一個(gè)消費(fèi)者都有自己的隊(duì)列,生產(chǎn)者沒有將消息直接發(fā)送到隊(duì)列,而是發(fā)送到了交換機(jī),每個(gè)隊(duì)列都要綁定到交換機(jī),生產(chǎn)者發(fā)送的消息,經(jīng)過交換機(jī)到達(dá)隊(duì)列,實(shí)現(xiàn)一個(gè)消息被多個(gè)消費(fèi)者獲取的目的。(如果沒有任何隊(duì)列與 Exchange 綁定,或者沒有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!)

交換機(jī)(Exchange)【藍(lán)色圓圈x】
一方面接收生產(chǎn)者發(fā)送的消息。另一方面知道如何處理消息,如遞交個(gè)某個(gè)特別隊(duì)列,遞交給所有隊(duì)列,還是將消息丟棄。到底如何操作,取決于交換機(jī)的類型。Exchange 只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力。
Exchange 類型有以下三種:
1、Direct Exchange(直連型交換機(jī)):
根據(jù)消息攜帶的路由鍵將消息投遞給對應(yīng)隊(duì)列。
大致流程為:有一個(gè)隊(duì)列綁定到一個(gè)直連交換機(jī)上,同時(shí)賦予一個(gè)路由鍵 key。然后當(dāng)一個(gè)消息攜帶著路由鍵 key,這個(gè)消息通過生產(chǎn)者發(fā)送給交換機(jī)時(shí),交換機(jī)就會(huì)根據(jù)這個(gè)路由值 key 去尋找綁定值也是 key 的隊(duì)列。
2、Fanout Exchange (扇型交換機(jī))
這個(gè)交換機(jī)沒有路由鍵概念,就算你綁定了路由鍵也是無效的。這個(gè)交換機(jī)在接收到消息后,會(huì)直接轉(zhuǎn)發(fā)到綁定到它上面的所有隊(duì)列。
3、Topic Exchange (主題交換機(jī))
這個(gè)交換機(jī)其實(shí)跟直連交換機(jī)流程差不多,但是它的特點(diǎn)就是在它的路由鍵和綁定鍵之間有規(guī)則
(星號(hào) *)用來表示一個(gè)單詞(必須出現(xiàn)一個(gè)單詞)
(井號(hào) #)用來表示任意數(shù)量的單詞
舉例:
隊(duì)列Q1 綁定鍵為 .TT.
隊(duì)列Q2 綁定鍵為 TT.#
如果一條消息攜帶的路由鍵為 A.TT.B,那么隊(duì)列Q1將會(huì)收到
如果一條消息攜帶的路由鍵為TT.AA.BB,那么隊(duì)列Q2將會(huì)收到
當(dāng)一個(gè)隊(duì)列的綁定鍵為 "#" 的時(shí)候,這個(gè)隊(duì)列將會(huì)無視消息的路由鍵,接收所有的消息。
當(dāng)"*" 和 "#" 這兩個(gè)特殊字符都未在綁定鍵中出現(xiàn)的時(shí)候,此時(shí)主題交換機(jī)就擁有直連交換機(jī)的行為。
所以主題交換機(jī)也就實(shí)現(xiàn)了扇型交換機(jī)的功能,和直連交換機(jī)的功能。
除了以上常用的交換機(jī),還有 Header Exchange(頭交換機(jī)),Default Exchange(默認(rèn)交換機(jī))和 (Dead Letter Exchange)(死信交換機(jī))
4、路由模式(直連交換機(jī))
在廣播模式中,生產(chǎn)者發(fā)布消息,所有消費(fèi)者都可以獲取所有消息。
在某些場景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到 Direct 類型的 Exchange 發(fā)送消息時(shí),也必須指定消息的 routing key。

P:生產(chǎn)者,向 Exchange 發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè) routing key。
X:Exchange (交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 routing key 完全匹配的隊(duì)列
C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息。
C2:消費(fèi)者,其所在隊(duì)列指定了需要 routing key 為 info,error,warning 的消息。
在這種情況下,一個(gè)消息在發(fā)布時(shí)指定了路由鍵為 error 將只會(huì)被 c1 消耗。路由鍵為 info,error, warning 的消息都將被 c2 消耗,其他消息都將被丟失。
5、主題模式(Topic )(主題交換機(jī))
Topic 類型的 Exchange 與 Direct 相比,都是可以根據(jù) RoutingKey 把消息路由到不同的隊(duì)列。只不過 Topic 類型 Exchange 可以讓隊(duì)列在綁定 Routing key 的時(shí)候使用通配符。

消息持久化
目的是為了避免消息丟失
消息丟失:
- 消費(fèi)者的 ACK 機(jī)制??梢苑乐瓜M(fèi)者丟失消息。
消費(fèi)者領(lǐng)取消息后,還沒執(zhí)行操作就掛掉了?或者拋出了異常?消息消費(fèi)失敗,但是 RabbitMQ 無從得知,這樣消息就丟失了!
因此,RabbitMQ 有一個(gè) ACK 機(jī)制。當(dāng)消費(fèi)者獲取消息后,會(huì)向 RabbitMQ 發(fā)送回執(zhí) ACK,告知消息已經(jīng)被接收。不過這種回執(zhí) ACK 分兩種情況:
- 自動(dòng) ACK : 消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送 ACK
- 手動(dòng) ACK:消息接收后,不會(huì)發(fā)送 ACK ,需要手動(dòng)調(diào)用。
- 如果在消費(fèi)者消費(fèi)之前,MQ 就宕機(jī)了,消息就沒了。
消息持久化的前提是:隊(duì)列,Exchange 都持久化。
- Exchange 持久化:
// 獲取通道
Channel channel = connection.createChannel();
// 聲明 exchange ,指定類型為 topic
channel.exchangeDeclare(EXCHANGE_NAME, "topic", true)
// 消息內(nèi)容
- 隊(duì)列持久化
// 獲取通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, true, false, false, false)
- 消息持久化
// 發(fā)送消息,并且指定 routing key 為 insert 代表新增
channel.basicPublish(EXCHANGE_NAME, routing_key="item.insert", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println(" [商品服務(wù):] Sent ‘ " + message + " ’ ");
代碼示例
1、引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.5.13</version>
</dependency>
2、修改配置文件
spring:
rabbitmq:
host: 127.0.0.1 #ip
port: 5672 #端口
username: guest #賬號(hào)
password: guest #密碼
virtualHost: #鏈接的虛擬主機(jī)
addresses: 127.0.0.1:5672 #多個(gè)以逗號(hào)分隔,與host功能一樣。
requestedHeartbeat: 60 #指定心跳超時(shí),單位秒,0為不指定;默認(rèn)60s
publisherConfirms: true #發(fā)布確認(rèn)機(jī)制是否啟用
publisherReturns: #發(fā)布返回是否啟用
connectionTimeout: #鏈接超時(shí)。單位ms。0表示無窮大不超時(shí)
### ssl相關(guān)
ssl:
enabled: #是否支持ssl
keyStore: #指定持有SSL certificate的key store的路徑
keyStoreType: #key store類型 默認(rèn)PKCS12
keyStorePassword: #指定訪問key store的密碼
trustStore: #指定持有SSL certificates的Trust store
trustStoreType: #默認(rèn)JKS
trustStorePassword: #訪問密碼
algorithm: #ssl使用的算法,例如,TLSv1.1
verifyHostname: #是否開啟hostname驗(yàn)證
### cache相關(guān)
cache:
channel:
size: #緩存中保持的channel數(shù)量
checkoutTimeout: #當(dāng)緩存數(shù)量被設(shè)置時(shí),從緩存中獲取一個(gè)channel的超時(shí)時(shí)間,單位毫秒;如果為0,則總是創(chuàng)建一個(gè)新channel
connection:
mode: #連接工廠緩存模式:CHANNEL 和 CONNECTION
size: #緩存的連接數(shù),只有是CONNECTION模式時(shí)生效
### listener
listener:
type: #兩種類型,SIMPLE,DIRECT
## simple類型
simple:
concurrency: #最小消費(fèi)者數(shù)量
maxConcurrency: #最大的消費(fèi)者數(shù)量
transactionSize: #指定一個(gè)事務(wù)處理的消息數(shù)量,最好是小于等于prefetch的數(shù)量
missingQueuesFatal: #是否停止容器當(dāng)容器中的隊(duì)列不可用
## 與direct相同配置部分
autoStartup: #是否自動(dòng)啟動(dòng)容器
acknowledgeMode: #表示消息確認(rèn)方式,其有三種配置方式,分別是none、manual和auto;默認(rèn)auto
prefetch: #指定一個(gè)請求能處理多少個(gè)消息,如果有事務(wù)的話,必須大于等于transaction數(shù)量
defaultRequeueRejected: #決定被拒絕的消息是否重新入隊(duì);默認(rèn)是true(與參數(shù)acknowledge-mode有關(guān)系)
idleEventInterval: #container events發(fā)布頻率,單位ms
##重試機(jī)制
retry:
stateless: #有無狀態(tài)
enabled: #是否開啟
maxAttempts: #最大重試次數(shù),默認(rèn)3
initialInterval: #重試間隔
multiplier: #對于上一次重試的乘數(shù)
maxInterval: #最大重試時(shí)間間隔
direct:
consumersPerQueue: #每個(gè)隊(duì)列消費(fèi)者數(shù)量
missingQueuesFatal:
#...其余配置看上方公共配置
## template相關(guān)
template:
mandatory: #是否啟用強(qiáng)制信息;默認(rèn)false
receiveTimeout: #`receive()`接收方法超時(shí)時(shí)間
replyTimeout: #`sendAndReceive()`超時(shí)時(shí)間
exchange: #默認(rèn)的交換機(jī)
routingKey: #默認(rèn)的路由
defaultReceiveQueue: #默認(rèn)的接收隊(duì)列
## retry重試相關(guān)
retry:
enabled: #是否開啟
maxAttempts: #最大重試次數(shù)
initialInterval: #重試間隔
multiplier: #失敗間隔乘數(shù)
maxInterval: #最大間隔
簡單模式
1、配置類
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 簡單模式配置
*/
@Configuration
public class RabbitMqConfig {
public static final String QUEUE_NAME = "my_queue"; //隊(duì)列名稱
/**
* Queue(隊(duì)列)
*/
@Bean
public Queue simpleQueue() {
/**
* name:隊(duì)列名稱。
* durable:是否持久化,默認(rèn)是false,持久化隊(duì)列(內(nèi)部會(huì)有一個(gè)actualName: 隊(duì)列的真實(shí)名稱,默認(rèn)用name參數(shù),如果name為空,則根據(jù)規(guī)則生成一個(gè))
* exclusive:默認(rèn)也是false,只能被當(dāng)前創(chuàng)建的連接使用,而且當(dāng)連接關(guān)閉后隊(duì)列即被刪除。此參考優(yōu)先級高于durable
* autoDelete:是否自動(dòng)刪除,當(dāng)沒有生產(chǎn)者或者消費(fèi)者使用此隊(duì)列,該隊(duì)列會(huì)自動(dòng)刪除。
* arguments:設(shè)置隊(duì)列的屬性參數(shù)
* 1、x-message-ttl:消息的過期時(shí)間,單位:毫秒;
* 2、x-expires:隊(duì)列過期時(shí)間,隊(duì)列在多長時(shí)間未被訪問將被刪除,單位:毫秒;
* 3、x-max-length:隊(duì)列最大長度,超過該最大值,則將從隊(duì)列頭部開始刪除消息;
* 4、x-max-length-bytes:隊(duì)列消息內(nèi)容占用最大空間,受限于內(nèi)存大小,超過該閾值則從隊(duì)列頭部開始刪除消息;
* 5、x-overflow:設(shè)置隊(duì)列溢出行為。這決定了當(dāng)達(dá)到隊(duì)列的最大長度時(shí)消息會(huì)發(fā)生什么。有效值是drop-head、reject-publish或reject-publish-dlx。仲裁隊(duì)列類型僅支持drop-head;
* 6、x-dead-letter-exchange:死信交換器名稱,過期或被刪除(因隊(duì)列長度超長或因空間超出閾值)的消息可指定發(fā)送到該交換器中;
* 7、x-dead-letter-routing-key:死信消息路由鍵,在消息發(fā)送到死信交換器時(shí)會(huì)使用該路由鍵,如果不設(shè)置,則使用消息的原來的路由鍵值
* 8、x-single-active-consumer:表示隊(duì)列是否是單一活動(dòng)消費(fèi)者,true時(shí),注冊的消費(fèi)組內(nèi)只有一個(gè)消費(fèi)者消費(fèi)消息,其他被忽略,false時(shí)消息循環(huán)分發(fā)給所有消費(fèi)者(默認(rèn)false)
* 9、x-max-priority:隊(duì)列要支持的最大優(yōu)先級數(shù);如果未設(shè)置,隊(duì)列將不支持消息優(yōu)先級;
* 10、x-queue-mode(Lazy mode):將隊(duì)列設(shè)置為延遲模式,在磁盤上保留盡可能多的消息,以減少RAM的使用;如果未設(shè)置,隊(duì)列將保留內(nèi)存緩存以盡可能快地傳遞消息;
* 11、x-queue-master-locator:在集群模式下設(shè)置鏡像隊(duì)列的主節(jié)點(diǎn)信息。
*/
return new Queue(QUEUE_NAME, true, false, false, null);
}
}
2、消費(fèi)者
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
* 簡單模式
* 消息消費(fèi)者
*/
@Component
@RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
//@RabbitListener(queuesToDeclare = @Queue("simple_queue")) //如果simple_queue隊(duì)列不存在,則創(chuàng)建simple_queue隊(duì)列。默認(rèn)隊(duì)列是持久化,非獨(dú)占式的
public class SimpleConsumer {
//消費(fèi)者如果監(jiān)聽到消息隊(duì)列有消息傳入,則會(huì)自動(dòng)消費(fèi)
@RabbitHandler
public void receive(Map message) {
System.out.println("簡單模式 -> 消費(fèi)者收到map類型消息 : " + testMessage.toString());
}
@RabbitHandler
public void receive2(String message) {
System.out.println("簡單模式 -> 消費(fèi)者收到string類型消息 : " + testMessage.toString());
}
}
@RabbitListener 注解屬性的作用:
queuesToDeclare:如果 simple_queue 隊(duì)列不存在,則會(huì)自動(dòng)創(chuàng)建 simple_queue 隊(duì)列。默認(rèn)隊(duì)列是持久化,非獨(dú)占式的。
queues:里面的隊(duì)列必須存在,否則就會(huì)報(bào)錯(cuò)。
@RabbitListener(queues = {"simple_queue2"}),如果隊(duì)列 simple_queue2 不存在,那么啟動(dòng)消費(fèi)者就會(huì)報(bào)錯(cuò)
注意:
@RabbitListener 既可以標(biāo)記在類上,也可以標(biāo)記在方法上
標(biāo)記在類上:需配合 @RabbitHandler 注解一起使用。當(dāng)有收到消息的時(shí)候,就交給@RabbitHandler 的方法處理,具體使用哪個(gè)方法處理,根據(jù) MessageConverter 轉(zhuǎn)換后的參數(shù)類型(入?yún)㈩愋瓦M(jìn)行決定)
標(biāo)記在方法上:就有指定的方法進(jìn)行處理
3、生產(chǎn)者
/**
* 簡單模式
* 消息生產(chǎn)者
*/
@Controller
@RequestMapping(value = "simple")
public class SimpleProducer {
@Autowired
RabbitTemplate rabbitTemplate; //使用RabbitTemplate,這提供了接收/發(fā)送等等方法
/**
* 向rabbitMq發(fā)送消息
*/
@RequestMapping(value = "sendMsg")
@ResponseBody
public String send(String messageId,String messageData) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createTime = simpleDateFormat.format(new Date());
Map<String,Object> map=new HashMap<>();
map.put("messageId",messageId);
map.put("messageData",messageData);
map.put("createTime",createTime);
//將消息發(fā)送到隊(duì)列my_queue中
rabbitTemplate.convertAndSend("my_queue", map);
//receive將接收到消息
System.out.println("rabbitMQ 簡單模式消息發(fā)送成功!");
return "true";
}
}
消息手動(dòng)確認(rèn)
1、yml 文件添加配置
listener:
simple:
concurrency: 1
max-concurrency: 1
acknowledge-mode: manual
prefetch: 1
消費(fèi)者代碼修改
/**
* 簡單模式
* 消息消費(fèi)者
*/
@Component
public class SimpleConsumer {
@RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
public void process(Message message, Channel channel) throws IOException {
String str = new String(message.getBody());
JSONObject msgData = (JSONObject) JSON.parse(str);
Object messageId = msgData.get("messageId");
if (null==messageId || messageId.toString().equals("")) {
/**
* 有異常就拒收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn)
* requeue:true將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
* false將消息丟棄
*/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
System.out.println("簡單模式 -> 消費(fèi)者拒收消息 : " + msgData.toString());
}else {
/**
* 沒有異常就確認(rèn)消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn)
*/
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("簡單模式 -> 消費(fèi)者收到消息 : " + msgData.toString());
}
}
}
消息確認(rèn)機(jī)制
1、自動(dòng)確認(rèn)
這也是默認(rèn)的消息確認(rèn)情況。AcknowledgeMode.NONE
RabbitMQ 成功將消息發(fā)出(即將消息成功寫入 TCP Socket)中立即認(rèn)為本次投遞已經(jīng)被正確處理,不管消費(fèi)者是否成功處理本次投遞。
所以這種情況如果消費(fèi)端消費(fèi)邏輯拋出異常,也就是消費(fèi)端沒有處理成功這條消息,那么就相當(dāng)于丟失了消息。一般這種情況我們都是使用 try catch 捕捉異常后,打印日志用于追蹤數(shù)據(jù),這樣找出對應(yīng)數(shù)據(jù)再做后續(xù)處理。
2、手動(dòng)確認(rèn)
這個(gè)比較關(guān)鍵,也是我們配置接收消息確認(rèn)機(jī)制時(shí),多數(shù)選擇的模式。
消費(fèi)者收到消息后,手動(dòng)調(diào)用 basic.ack / basic.nack / basic.reject 后,RabbitMQ 收到這些消息后,才認(rèn)為本次投遞成功。
- basic.ack 用于肯定確認(rèn)
- basic.nack 用于否定確認(rèn)(注意:這是 AMQP 0-9-1 的 RabbitMQ擴(kuò)展)
-basic.reject 用于否定確認(rèn),但與 basic.nack 相比有一個(gè)限制:一次只能拒絕單條消息。
channel.basicReject(deliveryTag, true):拒絕消費(fèi)當(dāng)前消息
第一個(gè)參數(shù)是當(dāng)前消息在隊(duì)列中的索引
第二參數(shù)傳入 true ,就是將數(shù)據(jù)重新丟會(huì)隊(duì)列里,那么下次還會(huì)消費(fèi)這個(gè)消息。
第二參數(shù)傳入 false,就是告訴服務(wù)器,我們已經(jīng)知道這條消息數(shù)據(jù)了,因?yàn)橐恍┰蚓芙^它,而且服務(wù)器也把這個(gè)消息丟掉就行,下次不想再消費(fèi)這條消息了。
使用拒絕后重新入列這個(gè)確認(rèn)模式要謹(jǐn)慎,因?yàn)橐话愣际浅霈F(xiàn)異常的時(shí)候,catch 異常再拒絕入列,選擇是否重新入列。如果使用不當(dāng)會(huì)導(dǎo)致一些每次都被你重入列的消息一直消費(fèi) - 入列 - 消費(fèi) - 入列 這樣循環(huán),會(huì)導(dǎo)致消息積壓。
channel.basicNack(delivertTag, false, true):否定消費(fèi)確認(rèn)
第一個(gè)參數(shù):是當(dāng)前消息在隊(duì)列中的索引
第二個(gè)參數(shù):指是否針對多條消息,如果是 true,也就是說一次性針對當(dāng)前通道的消息 tagID 小于當(dāng)前這條消息的,都拒絕確認(rèn)。
第三個(gè)參數(shù):指是否重新入列,也就是指不確認(rèn)的消息是否重新丟回到隊(duì)列里面去。
同樣使用不確認(rèn)后重新入列這個(gè)確認(rèn)模式要謹(jǐn)慎,因?yàn)檫@里也可能考慮不周出現(xiàn)消息一直被重新丟回去的情況,導(dǎo)致積壓。
4、消息回調(diào)
ConfirmCallback:當(dāng)消息到達(dá)交換機(jī)觸發(fā)回調(diào)
ReturnsCallback:消息(帶路由鍵 routingKey) 到達(dá)交換機(jī),與交換機(jī)的所有綁定鍵進(jìn)行匹配,觸發(fā)回調(diào)。
若要使用消息回調(diào):
- 修改配置
publisher-confirm-type: correlated
publisher-returns: true
- 設(shè)置 mandatory
···
設(shè)置rabbitTemplate的mandatory為true 或者在配置中設(shè)置 rabbitmq.template.mandatory=true
配置文件新增代碼:
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//設(shè)置開啟Mandatory,才能觸發(fā)回調(diào)函數(shù),無論消息推送結(jié)果怎么樣都強(qiáng)制調(diào)用回調(diào)函數(shù)
rabbitTemplate.setMandatory(true);
//確認(rèn)消息發(fā)送到交換機(jī)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("=======================> ConfirmCallback <=======================");
System.out.println("ConfirmCallback ===>"+"相關(guān)數(shù)據(jù):"+correlationData);
System.out.println("ConfirmCallback ===>"+"確認(rèn)情況:"+ack);
System.out.println("ConfirmCallback ===>"+"原因:"+cause);
}
});
//確認(rèn)消息已發(fā)送到隊(duì)列
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
Message message = returnedMessage.getMessage();
int replyCode = returnedMessage.getReplyCode();
String replyText = returnedMessage.getReplyText();
String exchange = returnedMessage.getExchange();
String routingKey = returnedMessage.getRoutingKey();
System.out.println("=======================> ReturnsCallback <=======================");
System.out.println("ReturnCallback ===>"+"消息:"+message.toString());
System.out.println("ReturnCallback ===>"+"回應(yīng)碼:"+replyCode);
System.out.println("ReturnCallback ===>"+"回應(yīng)信息:"+replyText);
System.out.println("ReturnCallback ===>"+"交換機(jī):"+exchange);
System.out.println("ReturnCallback ===>"+"路由鍵:"+routingKey);
}
});
return rabbitTemplate;
}
###### 工作模式
運(yùn)行許多消費(fèi)者,任務(wù)在他們之間共享,但是一個(gè)消息只能被一個(gè)消費(fèi)者獲取。設(shè)置 prefetchCount 值為1。這告訴 RabbitMQ 一次不要向消費(fèi)者發(fā)送多于一條消息。
換句話說,不要向消費(fèi)者發(fā)送新消息,直到它處理并確認(rèn)了前一個(gè)消息。相反,它會(huì)將其分派給不是忙碌 的下一個(gè)消費(fèi)者。
當(dāng)有多個(gè)消費(fèi)者時(shí),我們的消息會(huì)被哪個(gè)消費(fèi)者消費(fèi)呢?我們又該如何均衡消費(fèi)者消費(fèi)消息的多少呢?
主要有兩種模式:
輪詢模式的分發(fā): 一個(gè)消費(fèi)者一條,按均分配(關(guān)閉手動(dòng)應(yīng)答,開啟自動(dòng)應(yīng)答)
公平分發(fā):根據(jù)消費(fèi)者的消費(fèi)能力進(jìn)行公平分發(fā),處理快的處理的多,處理慢的處理的少;按勞分配(關(guān)閉自動(dòng)應(yīng)答,開啟手動(dòng)應(yīng)答)
1、 yml 文件添加 prefetch
listener:
simple:
prefetch: 1
2、創(chuàng)建兩個(gè)消費(fèi)者(生產(chǎn)者,配置類不變)
/**
簡單模式
-
消息消費(fèi)者
*/
@Component
public class SimpleConsumer {@RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
public void process(Message message, Channel channel) throws IOException, InterruptedException {
String str = new String(message.getBody());
JSONObject msgData = (JSONObject) JSON.parse(str);
Object messageId = msgData.get("messageId");
if (null==messageId || messageId.toString().equals("")) {
/**
* 有異常就拒收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn)
* requeue:true將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
* false將消息丟棄
/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
System.out.println("簡單模式 -> 消費(fèi)者拒收消息 : " + msgData.toString());
}else {
/*
* 沒有異常就確認(rèn)消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn)
*/
Thread.sleep(2000);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("簡單模式 process -> 消費(fèi)者收到消息 : " + msgData.toString());
}
}@RabbitListener(queues = "my_queue")//監(jiān)聽的隊(duì)列名稱
public void process2(Message message, Channel channel) throws IOException, InterruptedException {
String str = new String(message.getBody());
JSONObject msgData = (JSONObject) JSON.parse(str);
Object messageId = msgData.get("messageId");
if (null==messageId || messageId.toString().equals("")) {
/**
* 有異常就拒收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn)
* requeue:true將消息重返當(dāng)前消息隊(duì)列,重新發(fā)送給消費(fèi)者;
* false將消息丟棄
/
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
System.out.println("簡單模式 -> 消費(fèi)者拒收消息 : " + msgData.toString());
}else {
/*
* 沒有異常就確認(rèn)消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn)
*/
Thread.sleep(5000);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("簡單模式 process2 -> 消費(fèi)者收到消息 : " + msgData.toString());
}
}
}
#### 訂閱模型-Fanout
###### 配置類
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
訂閱模式-fanout
扇型交換機(jī)
創(chuàng)建三個(gè)隊(duì)列 :fanout_queue1 fanout_queue2 fanout_queue3
將三個(gè)隊(duì)列都綁定在交換機(jī) fanout_exchange 上
-
因?yàn)槭巧刃徒粨Q機(jī), 路由鍵無需配置,配置也不起作用
*/
@Configuration
public class FanoutRabbitMqConfig {
//隊(duì)列名稱常量
public static final String QUEUE_NAME1 = "fanout_queue1";
public static final String QUEUE_NAME2 = "fanout_queue2";
public static final String QUEUE_NAME3 = "fanout_queue3";
//交換機(jī)名稱常量
public static final String EXCHANGE_NAME = "fanout_exchange";/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(QUEUE_NAME1, true, false, false, null);
}
/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue fanoutQueue2() {
return new Queue(QUEUE_NAME2, true, false, false, null);
}
/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue fanoutQueue3() {
return new Queue(QUEUE_NAME3, true, false, false, null);
}
/**
- 聲明交換機(jī)
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(EXCHANGE_NAME,true,false,null);
}
/**
- 交換機(jī)隊(duì)列綁定
*/
@Bean
Binding bindingExchange1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
@Bean
Binding bindingExchange2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}@Bean
Binding bindingExchange3() {
return BindingBuilder.bind(fanoutQueue3()).to(fanoutExchange());
}
} - 聲明隊(duì)列 Queue
###### 生產(chǎn)者
聲明 Exchange,不再聲明 Queue
發(fā)送消息到 Exchange,不再發(fā)送到 Queue
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.SimpleDateFormat;
import java.util.Date;
@Controller
@RequestMapping(value = "fanout")
public class FanoutProducer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 向rabbitMq發(fā)送消息
*/
@RequestMapping(value = "sendMsg")
@ResponseBody
public String send(String messageId,String messageData) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createTime = simpleDateFormat.format(new Date());
JSONObject jsonObject = new JSONObject();
jsonObject.put("messageId",messageId);
jsonObject.put("messageData",messageData);
jsonObject.put("createTime",createTime);
//將消息發(fā)送到隊(duì)列my_queue中
rabbitTemplate.convertAndSend("fanout_exchange",null, jsonObject.toString());
System.out.println("rabbitMQ 簡單模式消息發(fā)送成功!");
return "true";
}
}
###### 消費(fèi)者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class FanoutReceiver {
@RabbitListener(queues = "fanout_queue1")
public void receive1(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("fanout_queue1消費(fèi)者收到消息 : " +msgData);
}
@RabbitListener(queues = "fanout_queue2")
public void receive2(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("fanout_queue2消費(fèi)者收到消息 : " +msgData);
}
@RabbitListener(queues = "fanout_queue3")
public void receive3(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("fanout_queue3消費(fèi)者收到消息 : " +msgData);
}
}
###### 發(fā)送消息后臺(tái)結(jié)果
rabbitMQ 簡單模式消息發(fā)送成功!
fanout_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
fanout_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
fanout_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:02:29","messageId":"55","messageData":"bhhh"}
rabbitMQ 簡單模式消息發(fā)送成功!
fanout_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}
fanout_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}
fanout_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-06 23:03:03","messageId":"66","messageData":"就立刻"}
#### 訂閱模型-Direct
###### 配置類
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
訂閱模式-direct
直連交換機(jī)
創(chuàng)建三個(gè)隊(duì)列 :direct_queue1 direct_queue2 direct_queue3
-
將三個(gè)隊(duì)列都綁定在交換機(jī) direct_exchange 上
*/
@Configuration
public class DirectRabbitMqConfig {
//隊(duì)列名稱常量
public static final String QUEUE_NAME1 = "direct_queue1";
public static final String QUEUE_NAME2 = "direct_queue2";
public static final String QUEUE_NAME3 = "direct_queue3";
//交換機(jī)名稱常量
public static final String EXCHANGE_NAME = "direct_exchange";/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue directQueue1() {
return new Queue(QUEUE_NAME1, true, false, false, null);
}
/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue directQueue2() {
return new Queue(QUEUE_NAME2, true, false, false, null);
}
/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue directQueue3() {
return new Queue(QUEUE_NAME3, true, false, false, null);
}
/**
- 聲明交換機(jī)
*/
@Bean
DirectExchange directExchange() {
return new DirectExchange(EXCHANGE_NAME,true,false,null);
}
/**
- 交換機(jī)隊(duì)列綁定
*/
@Bean
Binding bindingDirectExchange1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with("info");
}
@Bean
Binding bindingDirectExchange2() {
return BindingBuilder.bind(directQueue2()).to(directExchange()).with("error");
}@Bean
Binding bindingDirectExchange3() {
return BindingBuilder.bind(directQueue3()).to(directExchange()).with("warn");
}
} - 聲明隊(duì)列 Queue
###### 生產(chǎn)者
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.SimpleDateFormat;
import java.util.Date;
@Controller
@RequestMapping(value = "direct")
public class DirectProducer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 向rabbitMq發(fā)送消息
*/
@RequestMapping(value = "sendMsg")
@ResponseBody
public String send(String messageId,String messageData,String routingKey) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createTime = simpleDateFormat.format(new Date());
JSONObject jsonObject = new JSONObject();
jsonObject.put("messageId",messageId);
jsonObject.put("messageData",messageData);
jsonObject.put("createTime",createTime);
//將消息發(fā)送到交換機(jī)
rabbitTemplate.convertAndSend("direct_exchange",routingKey, jsonObject.toString());
System.out.println("rabbitMQ 簡單模式消息發(fā)送成功!");
return "true";
}
}
###### 消費(fèi)者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class DirectReceiver {
@RabbitListener(queues = "direct_queue1")
public void receive1(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("direct_queue1消費(fèi)者收到消息 : " +msgData);
}
@RabbitListener(queues = "direct_queue2")
public void receive2(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("direct_queue2消費(fèi)者收到消息 : " +msgData);
}
@RabbitListener(queues = "direct_queue3")
public void receive3(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("direct_queue3消費(fèi)者收到消息 : " +msgData);
}
}
###### 發(fā)送消息后臺(tái)結(jié)果
消息發(fā)送成功!路由鍵:warn
direct_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-07 11:31:45","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:info
direct_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-07 11:31:54","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:error
direct_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-07 11:31:59","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:error1 (消息被丟棄)
#### 訂閱模型-Topic
###### 配置類
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
訂閱模式-topic
主題交換機(jī)
創(chuàng)建三個(gè)隊(duì)列 :topic_queue1 topic_queue2 topic_queue3
-
將三個(gè)隊(duì)列都綁定在交換機(jī) topic_exchange 上
*/
@Configuration
public class TopicRabbitMqConfig {
//隊(duì)列名稱常量
public static final String QUEUE_NAME1 = "topic_queue1";
public static final String QUEUE_NAME2 = "topic_queue2";
public static final String QUEUE_NAME3 = "topic_queue3";
//交換機(jī)名稱常量
public static final String EXCHANGE_NAME = "topic_exchange";/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue topicQueue1() {
return new Queue(QUEUE_NAME1, true, false, false, null);
}
/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue topicQueue2() {
return new Queue(QUEUE_NAME2, true, false, false, null);
}
/**
- 聲明隊(duì)列 Queue
*/
@Bean
public Queue topicQueue3() {
return new Queue(QUEUE_NAME3, true, false, false, null);
}
/**
- 聲明交換機(jī)
*/
@Bean
TopicExchange topicExchange() {
return new TopicExchange(EXCHANGE_NAME,true,false,null);
}
/**
- 交換機(jī)隊(duì)列綁定
- 路由鍵使用通配符
*/
@Bean
Binding bindingTopicExchange1() {
//消息攜帶的路由鍵是以"topic."開頭,就會(huì)分發(fā)到該隊(duì)列
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("topic.#");
}
@Bean
Binding bindingTopicExchange2() {
//消息攜帶的路由鍵是包含.topic.,就會(huì)分發(fā)到該隊(duì)列
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(".topic.");
}@Bean
Binding bindingTopicExchange3() {
//消息攜帶的路由鍵是以".topic"結(jié)尾,就會(huì)分發(fā)到該隊(duì)列
return BindingBuilder.bind(topicQueue3()).to(topicExchange()).with("#.topic");
}
} - 聲明隊(duì)列 Queue
###### 生產(chǎn)者
import com.alibaba.fastjson.JSONObject;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;
import java.text.SimpleDateFormat;
import java.util.Date;
@Controller
@RequestMapping(value = "topic")
public class TopicProducer {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 向rabbitMq發(fā)送消息
*/
@RequestMapping(value = "sendMsg")
@ResponseBody
public String send(String messageId,String messageData,String routingKey) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String createTime = simpleDateFormat.format(new Date());
JSONObject jsonObject = new JSONObject();
jsonObject.put("messageId",messageId);
jsonObject.put("messageData",messageData);
jsonObject.put("createTime",createTime);
//將消息發(fā)送到隊(duì)列my_queue中
rabbitTemplate.convertAndSend("topic_exchange",routingKey, jsonObject.toString());
System.out.println("消息發(fā)送成功!路由鍵:"+routingKey);
return "true";
}
}
###### 消費(fèi)者
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class TopicReceiver {
@RabbitListener(queues = "topic_queue1")
public void receive1(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("topic_queue1消費(fèi)者收到消息 : " +msgData);
}
@RabbitListener(queues = "topic_queue2")
public void receive2(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("topic_queue2消費(fèi)者收到消息 : " +msgData);
}
@RabbitListener(queues = "topic_queue3")
public void receive3(String msgData, Message message, Channel channel) throws IOException {
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
System.out.println("topic_queue3消費(fèi)者收到消息 : " +msgData);
}
}
###### 發(fā)送消息后臺(tái)結(jié)果
消息發(fā)送成功!路由鍵:topic.one
topic_queue1消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:05:17","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:A.topic.B
topic_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:05:46","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:C.topic
topic_queue3消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:06:02","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:SFC.topic.AFBGB
topic_queue2消費(fèi)者收到消息 : {"createTime":"2022-10-07 12:06:52","messageId":"1","messageData":"sacascsac"}
消息發(fā)送成功!路由鍵:S.D.FC.topic.A.S.D.F (消息被丟棄)
#### 一個(gè)簡單的消息推送接收的流程

生產(chǎn)者產(chǎn)生消息,將消息推送到中間方框里面也就是rabbitMq的服務(wù)器,然后經(jīng)過服務(wù)器里面的交換機(jī)、隊(duì)列等各種關(guān)系將數(shù)據(jù)處理入列后,最終由右邊的消費(fèi)者獲取對應(yīng)監(jiān)聽的消息進(jìn)行消耗處理。