SpringBoot整合RabbitMQ
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
開(kāi)啟注解
@EnableRabbit
配置文件rabbitMQ
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
配置config
@Configuration
public class MyRabbitConfig {
// 配置使用json的方式序列化對(duì)象
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}
創(chuàng)建交換機(jī)
// 注入amqpAdmin
@Autowired
AmqpAdmin amqpAdmin;
// 創(chuàng)建交換機(jī)
// 交換機(jī)名稱 是否持久化 是否自動(dòng)刪除 相關(guān)參數(shù)
// String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false, null);
amqpAdmin.declareExchange(directExchange);
創(chuàng)建隊(duì)列
// 注入amqpAdmin
@Autowired
AmqpAdmin amqpAdmin;
// 創(chuàng)建隊(duì)列
// 隊(duì)列名稱 是否持久化 是否只能連接一個(gè)交換機(jī) 是否自動(dòng)刪除 相關(guān)參數(shù)
// String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
Queue queue = new Queue("hello-java-queue", true, false, false, null);
amqpAdmin.declareQueue(queue);
綁定交換機(jī)和隊(duì)列
// 注入amqpAdmin
@Autowired
AmqpAdmin amqpAdmin;
// 綁定交換機(jī)和隊(duì)列
// 將exchange指定的交換機(jī)和destination目的地進(jìn)行綁定(綁定隊(duì)列也可以綁定交換機(jī))使用routingKey作為指定的路由鍵
// String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
amqpAdmin.declareBinding(binding);
發(fā)送消息
// 發(fā)送消息
// 交換機(jī) 路由鍵 消息
// String exchange, String routingKey, Object object
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", "hello world");
接收/監(jiān)聽(tīng)消息
- 使用@RabbitListener,要想使用必須開(kāi)啟@EnableRabbit
// queues:要監(jiān)聽(tīng)的隊(duì)列數(shù)組
// 監(jiān)聽(tīng)的內(nèi)容會(huì)自動(dòng)封裝到方法的參數(shù)上
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Object message) {
System.out.println("message: " + message.toString());
System.out.println("類型:" + message.getClass());
System.out.println("接收到消息...內(nèi)容...");
}
// 接收監(jiān)聽(tīng)到的消息
message: (Body:'{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1618122170115}' MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-0TUO4qdSSCcnsHUQ9XZyzg, consumerQueue=hello-java-queue])
類型:class org.springframework.amqp.core.Message
接收到消息...內(nèi)容...
由監(jiān)聽(tīng)的消息類型可以看出為class org.springframework.amqp.core.Message類型,因此可以改造參數(shù)類型
- Message message:原生消息詳細(xì)信息,頭+體
- T<發(fā)送的消息的類型>:OrderReturnReasonEntity content
- Channel channel:當(dāng)前傳輸數(shù)據(jù)的通道
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage(Message message, ContentEntity content, Channel channel) {
// 獲取消息體,即json的數(shù)據(jù) {"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1618122170115}
byte[] body = message.getBody();
// 獲取消息頭信息 即
// [headers={__TypeId__=com.atguigu.gulimall.order.entity.ContentEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-0TUO4qdSSCcnsHUQ9XZyzg, consumerQueue=hello-java-queue])
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("message: " + message.toString());
System.out.println("類型:" + message.getClass());
System.out.println("轉(zhuǎn)化后消息...內(nèi)容...");
System.out.println("content: " + content.toString());
}
Queue隊(duì)列:可以很多人都來(lái)監(jiān)聽(tīng)。只要收到消息,隊(duì)列刪除消息,并且只能同時(shí)用且只有一個(gè)人接收到此消息
- 同一個(gè)消息,只能被一個(gè)客戶端收到
- 只有當(dāng)前消息隊(duì)列處理完成,才可以接收下一個(gè)消息隊(duì)列
使用場(chǎng)景
@RabbitHandler: 標(biāo)記方法
-
@RabbitListener:類+方法
使用@RabbitListener標(biāo)記在類上(重載區(qū)分不同的消息),說(shuō)明這個(gè)類就是用來(lái)接受消息隊(duì)列的方法類,在該類下的所以方法上標(biāo)記@RabbitHandler,每個(gè)方法指定不同的接收參數(shù),這樣就可以接收不同類型的消息
@RabbitListener
class receive {
@RabbitHandler(queues = {"hello-java-queue"})
public void receiveMessage1(Entity1 content) {
System.out.println("content1: " + content.toString());
}
@RabbitHandler(queues = {"hello-java-queue"})
public void receiveMessage2(Entity2 content) {
System.out.println("content2: " + content.toString());
}
}
RabbitMQ消息確認(rèn)機(jī)制-可靠抵達(dá)
- 保證消息不丟失,可靠抵達(dá),可以使用事務(wù)消息,性能下降250倍,為此引入確認(rèn)機(jī)制
- publisher confirmCallback 確認(rèn)模式(觸發(fā)時(shí)機(jī):服務(wù)端將消息發(fā)送給RabbitMQ所在的服務(wù)器)
- publisher returnCallback 未投遞到queue退出模式(觸發(fā)時(shí)機(jī):RabbitmQ所在的服務(wù)器調(diào)用交換機(jī)投遞給對(duì)應(yīng)隊(duì)列)
- consumer ack機(jī)制(觸發(fā)機(jī)制:消費(fèi)端成功獲取到消息隊(duì)列的消息)
可靠抵達(dá)-服務(wù)端確認(rèn)(confirmCallback 、returnCallback )
- 開(kāi)啟發(fā)送端確認(rèn)
# 開(kāi)啟發(fā)送端確認(rèn)
spring.rabbitmq.publisher-confirms=true
- 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn)
# 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn)
spring.rabbitmq.publisher-returns=true
# 只要發(fā)送端消息抵達(dá)隊(duì)列,以異步方式優(yōu)先回調(diào)這個(gè)returnConfirm(綁定一起使用)
spring.rabbitmq.template.mandatory=true
- 定制RabbitTemplate自定義confirmCallback 、returnCallback 觸發(fā)方法
/**
* 定制RabbitTemplate
* 1. MQ服務(wù)器收到消息就回調(diào)
* 1. spring.rabbitmq.publisher-confirms=true
* 2. 設(shè)置回調(diào)確認(rèn)confirmCallback
* 2. 消息正確抵達(dá)隊(duì)列進(jìn)行回調(diào)
* 1. spring.rabbitmq.publisher-returns=true
* 2. spring.rabbitmq.template.mandatory=true
* 3. 設(shè)置回調(diào)確認(rèn)returnCallback
*/
// PostConstruct: 當(dāng)MyRabbitConfig對(duì)象創(chuàng)建完再執(zhí)行該方法
@PostConstruct
public void initRabbitTemplate() {
// 設(shè)置MQ服務(wù)器收到消息回調(diào)
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
* 只要消息抵達(dá)MQ服務(wù)器ack就為true
* @param correlationData:當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)(這個(gè)是消息的唯一id)即發(fā)送時(shí)傳的CorrelationData參數(shù)
* @param b:ack,消息是否成功還是失敗
* @param s:失敗的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("correlationData: " + correlationData);
System.out.println("ack: " + b);
System.out.println("s: " + s);
}
});
// 設(shè)置消息抵達(dá)隊(duì)列回調(diào)
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* 只要消息沒(méi)有投遞給指定的隊(duì)列,就觸發(fā)這個(gè)失敗回調(diào)
* @param message:投遞失敗的消息詳細(xì)信息
* @param i:回復(fù)的狀態(tài)碼
* @param s:回復(fù)的文本內(nèi)容
* @param s1:當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)交換機(jī)
* @param s2:當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)路由鍵
*/
@Override
public void returnedMessage(Message message, int i, String s, String s1, String s2) {
System.out.println("fail message: " + message);
System.out.println("i: " + i);
System.out.println("s: " + s);
System.out.println("s1: " + s1);
System.out.println("s2: " + s2);
}
});
}
可靠抵達(dá)-消費(fèi)端確認(rèn)(ack)
保證每個(gè)消息被正確消費(fèi),此時(shí)才可以MQ刪除這個(gè)消息
- basic.ack 用于肯定確認(rèn);MQ服務(wù)器會(huì)移除此消息
- basic.nack用于否定確認(rèn);可以指定MQ服務(wù)器是否丟棄此消息,可以批量
- basic.reject用于否定確認(rèn);跟nack使用一樣,但是不能批量
- 默認(rèn)是自動(dòng)ack,只要消息接收到,客戶端會(huì)自動(dòng)確認(rèn),服務(wù)端就會(huì)移除這個(gè)消息,如果客戶端在處理消息時(shí)候宕機(jī)則會(huì)丟失消息,因此要手動(dòng)確認(rèn),保證消息不丟失。當(dāng)客戶端宕機(jī)后,消息會(huì)從unacked狀態(tài)變成ready狀態(tài),當(dāng)下一次新的客戶端連接進(jìn)來(lái)再將消息重新發(fā)送給客戶端
# 設(shè)置客戶端手動(dòng)確認(rèn)接受到消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
@RabbitListener(queues = {"hello-java-queue"})
public void receiveMessage1(Message message, Content content, Channel channel) {
System.out.println("content1: " + content.toString());
// 通道內(nèi)按順序自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 確認(rèn)消息接收成功,非批量簽收模式
// long deliveryTag, boolean multipe (當(dāng)前消息的標(biāo)簽,是否批量簽收)
channel.basicAck(deliveryTag, false);
// 消息接收成功,但是拒絕簽收消息
// long deliveryTag, boolean multipe, boolean requeue (當(dāng)前消息的標(biāo)簽,是否批量簽收,是否重新入隊(duì)(false丟掉消息,true將消息重新入隊(duì)))
channel.basicNack(deliveryTag,false,false);
} catch (IOException e) {
// 網(wǎng)絡(luò)中斷
}
}