一、SpringBoot中使用RabbitMQ
1. 導(dǎo)入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2. yml配置
spring:
rabbitmq:
host: 192.168.42.4
port: 5672
username: aruba
password: aruba
virtual-host: /
listener:
direct:
acknowledge-mode: manual # 手動(dòng)ack
simple:
prefetch: 1 # 流控
concurrency: 10 # 多線程監(jiān)控
3. 配置交換機(jī)和隊(duì)列
@Configuration
public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "MY-MQ-EX";
public static final String QUEUE_NAME = "MY-MQ-QUEUE";
public static final String ROUTING_KEY = "key.#";
/**
* 注入交換機(jī)
*
* @return
*/
@Bean
public Exchange exchangeProvider() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).build();
}
/**
* 注入隊(duì)列
*
* @return
*/
@Bean
public Queue queueProvider() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
/**
* 注入交換機(jī)隊(duì)列綁定關(guān)系
*
* @return
*/
@Bean
public Binding bootBinding(Exchange exchangeProvider, Queue queueProvider) {
return BindingBuilder.bind(queueProvider).to(exchangeProvider).with(ROUTING_KEY).noargs();
}
}
4. 發(fā)送消息
SpringBoot中使用RabbitTemplate自動(dòng)注入,即可發(fā)送消息,并對(duì)方法都進(jìn)行了封裝
@SpringBootTest
class SpringbootRabbitmqApplicationTests {
@Autowired
public RabbitTemplate rabbitTemplate;
@Test
void send() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "key.send", "發(fā)送消息");
}
/**
* 攜帶信息的消息
*/
@Test
void sendWithProps() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
"key.send", "發(fā)送消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
}
});
}
}
5. 訂閱消息
在方法上使用@RabbitListener注解,即可指定訂閱隊(duì)列。
入?yún)⑻砑?code>Channel,就可以和之前一樣發(fā)送ack。
將消息封裝成了Message,可以獲取其攜帶信息。
@Component
public class MQListener {
@RabbitListener(queues = RabbitMQConfig.QUEUE_NAME)
public void consume(String msg, Channel channel, Message message) throws IOException {
System.out.println("隊(duì)列的消息為:" + msg);
String correlationId = message.getMessageProperties().getCorrelationId();
System.out.println("唯一標(biāo)識(shí)為:" + correlationId);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
運(yùn)行結(jié)果:

二、消息可靠性
由于RabbitMQ在發(fā)送消息和訂閱消息時(shí),都是通過(guò)網(wǎng)絡(luò)傳輸,其間必然會(huì)出現(xiàn)由網(wǎng)絡(luò)問(wèn)題產(chǎn)生的消息丟失情況,要保證消息的可靠性從下面四點(diǎn)出發(fā):
- 保證消息發(fā)送到交換機(jī)
- 保證消息路由到隊(duì)列
- 保證隊(duì)列中消息的持久化
- 保證消費(fèi)者正常消費(fèi)消息
1. Client-API方式
1.1 保證消息發(fā)送到交換機(jī)
Publisher Confirms就是為了保證消息發(fā)送到交換機(jī)的機(jī)制,一般使用異步的方式:
//4. 開(kāi)啟confirm
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功發(fā)送到交換機(jī)");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("沒(méi)有送達(dá)交換機(jī)");
}
});
1.2 保證消息路由到隊(duì)列
addReturnListener方法可以確認(rèn)消息是否路由到了隊(duì)列,如果回調(diào)了說(shuō)明沒(méi)有路由到隊(duì)列
發(fā)送消息時(shí),指定mandatory參數(shù)為true
//5. 設(shè)置return回調(diào),確認(rèn)消息是否路由到了隊(duì)列
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("交換機(jī)沒(méi)有路由到隊(duì)列");
}
});
//參數(shù): 交換機(jī) routing-Key mandatory 消息其他參數(shù) 消息
channel.basicPublish("", QUEUE_NAME, true, null, message.getBytes());
1.3 保證隊(duì)列中消息的持久化
首先保證隊(duì)列的持久化,再保證消息的持久化
//3. 構(gòu)建隊(duì)列 參數(shù):隊(duì)列名 是否持久化 是否排外(只允許一個(gè)消費(fèi)者) 長(zhǎng)時(shí)間未使用是否自動(dòng)刪除 其他參數(shù)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//6. 發(fā)送消息
String message = "hello confirm";
AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
.deliveryMode(2) //2:消息持久化 1: 不持久化
.build();
//參數(shù): 交換機(jī) routing-Key mandatory 消息其他參數(shù) 消息
channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
1.4 保證消費(fèi)者正常消費(fèi)消息
保證消費(fèi)者正常消費(fèi)消息只需要手動(dòng)ack即可,生產(chǎn)者完整代碼:
public class Publisher {
private static final String QUEUE_NAME = "confirm";
@Test
public void publisher() throws Exception {
//1. 獲取連接對(duì)象
Connection connection = RBConnectionUtil.getConnection();
//2. 創(chuàng)建信道
Channel channel = connection.createChannel();
//3. 構(gòu)建隊(duì)列 參數(shù):隊(duì)列名 是否持久化 是否排外(只允許一個(gè)消費(fèi)者) 長(zhǎng)時(shí)間未使用是否自動(dòng)刪除 其他參數(shù)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//4. 開(kāi)啟confirm
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息成功發(fā)送到交換機(jī)");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("沒(méi)有送達(dá)交換機(jī)");
}
});
//5. 設(shè)置return回調(diào),確認(rèn)消息是否路由到了隊(duì)列
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("交換機(jī)沒(méi)有路由到隊(duì)列");
}
});
//6. 發(fā)送消息
String message = "hello confirm";
AMQP.BasicProperties porps = new AMQP.BasicProperties().builder()
.deliveryMode(2) //2:消息持久化 1: 不持久化
.build();
//參數(shù): 交換機(jī) routing-Key mandatory 消息其他參數(shù) 消息
channel.basicPublish("", QUEUE_NAME, true, porps, message.getBytes());
}
}
2. SpringBoot方式
2.1 配置Confirm
yml中開(kāi)啟confirm:
spring:
rabbitmq:
publisher-confirm-type: correlated
RabbitTemplate設(shè)置回調(diào):
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息成功送達(dá)到交換機(jī)");
} else {
System.out.println("消息沒(méi)有送達(dá)到交換機(jī)");
}
}
});
2.2 配置Return
yml中開(kāi)啟return:
spring:
rabbitmq:
publisher-returns: true
RabbitTemplate設(shè)置回調(diào):
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(String.format("交換機(jī):%s 路由消息失敗", returned.getExchange()));
}
});
2.3 消息持久化
設(shè)置Message的攜帶信息:
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
"key.send", "發(fā)送消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
完整代碼:
/**
* 攜帶信息的消息
*/
@Test
void sendWithProps() {
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("消息成功送達(dá)到交換機(jī)");
} else {
System.out.println("消息沒(méi)有送達(dá)到交換機(jī)");
}
}
});
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(String.format("交換機(jī):%s 路由消息失敗", returned.getExchange()));
}
});
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,
"key.send", "發(fā)送消息", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setCorrelationId(UUID.randomUUID().toString());
return message;
}
});
}
三、死信隊(duì)列
死信隊(duì)列是存放本來(lái)應(yīng)該死亡的消息的隊(duì)列,用于對(duì)這些消息的特殊處理(如:重新入隊(duì)、持久化到數(shù)據(jù)庫(kù)),具體有以下幾種消息會(huì)被存放進(jìn)死信隊(duì)列:
- 消費(fèi)者拒絕的消息,并requeue設(shè)置為false(不重新入隊(duì)列)
- 消息的生存時(shí)間到了,還在隊(duì)列中的信息
- 隊(duì)列設(shè)置了整體的消息生存時(shí)間,到了生存時(shí)間的消息
- 到達(dá)隊(duì)列中消息最大數(shù),再路由過(guò)來(lái)的消息
1. 構(gòu)建交換機(jī)
死信隊(duì)列需要一個(gè)死信交換機(jī),并把正常消息的隊(duì)列綁定死信交換機(jī):
@Configuration
public class DeadLetterConfig {
public static final String NORMAL_EXCHANGE_NAME = "normal-ex";
public static final String NORMAL_QUEUE_NAME = "normal-queue";
public static final String NORMAL_ROUTING_KEY = "normal.#";
public static final String DEAD_EXCHANGE_NAME = "dead-ex";
public static final String DEAD_QUEUE_NAME = "dead-queue";
public static final String DEAD_ROUTING_KEY = "dead.#";
@Bean
public Exchange normalExchange() {
return ExchangeBuilder.topicExchange(NORMAL_EXCHANGE_NAME).build();
}
@Bean
public Queue normalQueue() {
// 綁定死信交換機(jī)
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") //準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
.build();
}
@Bean
public Binding normalBinding(Exchange normalExchange, Queue normalQueue) {
return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY).noargs();
}
@Bean
public Exchange deadExchange() {
return ExchangeBuilder.topicExchange(DEAD_EXCHANGE_NAME).build();
}
@Bean
public Queue deadQueue() {
return QueueBuilder.durable(DEAD_QUEUE_NAME).build();
}
@Bean
public Binding deadBinding(Exchange deadExchange, Queue deadQueue) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();
}
}
2. 死信隊(duì)列的實(shí)現(xiàn)方式
2.1 拒絕消息入死信隊(duì)列
對(duì)正常隊(duì)列消息進(jìn)行監(jiān)聽(tīng),來(lái)做相應(yīng)的處理,首先是拒絕消息,并且要把requeue設(shè)為false:
@Component
public class DeadListener {
@RabbitListener(queues = DeadLetterConfig.NORMAL_QUEUE_NAME)
public void normalListener(Message msg, Channel channel) throws IOException {
System.out.println("接收到正常隊(duì)列消息:" + new String(msg.getBody()));
channel.basicReject(msg.getMessageProperties().getDeliveryTag(), false);
// channel.basicNack(msg.getMessageProperties().getDeliveryTag(), false, false);
}
}
嘗試發(fā)送一個(gè)消息:
@Test
public void sendNormal() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME, "normal.msg", "哈嘍");
}
運(yùn)行結(jié)果:

2.2 消息生存時(shí)間
發(fā)送消息時(shí),通過(guò)消息的額外參數(shù)MessageProperties的setExpiration方法設(shè)置過(guò)期時(shí)間:
@Test
public void sendExpire() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
"normal.msg", "哈嘍",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 該消息10s后過(guò)期
message.getMessageProperties().setExpiration("10000");
return message;
}
});
}
記得把上面消息的監(jiān)聽(tīng)注釋掉,否則會(huì)消費(fèi)消息
運(yùn)行結(jié)果:

2.3 隊(duì)列消息的整體生存時(shí)間
管理頁(yè)面把之前的正常隊(duì)列刪除,在重新創(chuàng)建時(shí),為正常隊(duì)列設(shè)置ttl:

設(shè)置ttl:
@Bean
public Queue normalQueue() {
// 綁定死信交換機(jī)
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
.ttl(5000) // 整體消息過(guò)期時(shí)間為5s
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") // 準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
.build();
}
發(fā)送正常消息,運(yùn)行結(jié)果:

2.4 達(dá)到隊(duì)列最大數(shù)
同樣先刪除正常隊(duì)列,后調(diào)用maxLength為隊(duì)列設(shè)置最大消息數(shù):
@Bean
public Queue normalQueue() {
// 綁定死信交換機(jī)
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
// .ttl(5000) // 整體消息過(guò)期時(shí)間為5s
.maxLength(1) // 設(shè)置消息最大數(shù)
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") // 準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
.build();
}
發(fā)送兩次正常消息,運(yùn)行結(jié)果:

四、延遲交換機(jī)
死信隊(duì)列的問(wèn)題:由于死信隊(duì)列只會(huì)監(jiān)聽(tīng)隊(duì)列頭的過(guò)期時(shí)間,一旦隊(duì)列頭的消息過(guò)期時(shí)間比后面排隊(duì)的消息過(guò)期時(shí)間長(zhǎng),那么后面消息的過(guò)期時(shí)間并不會(huì)生效,而是等待隊(duì)列頭的過(guò)期時(shí)間到了后,才一并進(jìn)入死信隊(duì)列
刪除正常隊(duì)列,恢復(fù)配置:
@Bean
public Queue normalQueue() {
// 綁定死信交換機(jī)
return QueueBuilder.durable(NORMAL_QUEUE_NAME)
// .ttl(5000) // 整體消息過(guò)期時(shí)間為5s
// .maxLength(1) // 設(shè)置消息最大數(shù)
.deadLetterExchange(DEAD_EXCHANGE_NAME)
.deadLetterRoutingKey("dead.msg") // 準(zhǔn)備入死信隊(duì)列的消息重新設(shè)置routin-key
.build();
}
發(fā)送兩次消息,第一次過(guò)期時(shí)間為30s,第二次為2s:
@Test
public void sendExpire30() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
"normal.msg", "哈嘍",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("30000");
return message;
}
});
}
@Test
public void sendExpire2() {
rabbitTemplate.convertAndSend(DeadLetterConfig.NORMAL_EXCHANGE_NAME,
"normal.msg", "哈嘍",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("2000");
return message;
}
});
}
結(jié)果,過(guò)了幾秒后,隊(duì)列中還是兩個(gè)消息:

解決方法:根據(jù)時(shí)間創(chuàng)建多個(gè)隊(duì)列或者使用延遲交換機(jī)
延遲交換機(jī)是一個(gè)插件,默認(rèn)并不帶,原理就是將消息暫時(shí)放在交換機(jī)中,由交換機(jī)根據(jù)消息過(guò)期時(shí)間的先后來(lái)路由到隊(duì)列,缺點(diǎn):由于消息在交換機(jī)中,重啟會(huì)導(dǎo)致消息的丟失
1. 插件下載和使用
根據(jù)自己的RabbitMQ版本進(jìn)行下載:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.8.17/rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez
mv rabbitmq_delayed_message_exchange-3.8.17.8f537ac.ez /usr/local/rabbitmq/rabbitmq_server-3.8.35/plugins
啟動(dòng)插件:
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin
./rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重啟服務(wù)或系統(tǒng)后,多了一個(gè)x-delayed-message的交換機(jī)類型:

2. 配置延遲交換機(jī)
使用CustomExchange構(gòu)造x-delayed-message類型交換機(jī),并使用其他參數(shù)x-delayed-type指定使用哪種原型交換機(jī)類型,這邊使用的是topic:
@Configuration
public class DelayExchangeConfig {
public static final String EXCHANGE_NAME = "delay-exchange";
public static final String DELAY_QUEUE = "delay_queue";
public static final String DELAY_ROUTIN_KEY = "delay.#";
@Bean
public Exchange delayExchange() {
Map<String, Object> args = new HashMap<>();
// 使用哪種原型交換機(jī)類型
args.put("x-delayed-type", "topic");
Exchange exchange = new CustomExchange(EXCHANGE_NAME, "x-delayed-message", true, false, args);
return exchange;
}
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE).build();
}
@Bean
public Binding delayBinding(Queue delayQueue, Exchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTIN_KEY).noargs();
}
}
3. 發(fā)送消息
MessageProperties使用setDelay方法為消息設(shè)置延遲:
@Test
public void sendDelay30() {
rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
"delay.msg", "哈嘍",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(30000);
return message;
}
});
}
@Test
public void sendDelay5() {
rabbitTemplate.convertAndSend(DelayExchangeConfig.EXCHANGE_NAME,
"delay.msg", "哈嘍",
new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
消息在交換機(jī)進(jìn)行等待后,首先入隊(duì)列的為5秒延遲的,后面入隊(duì)列的為30秒延遲的:

五、集群
1. 配置主機(jī)名
RabbitMQ集群的搭建要配置主機(jī)名:HOSTNAME,先修改network配置文件
vi /etc/sysconfig/network
追加HOSTNAME:
HOSTNAME=rabbit1

再修改hosts文件:
vi /etc/hosts
追加內(nèi)容:
192.168.42.4 rabbit1

重啟系統(tǒng)后,RabbitMQ先前配置的管理賬號(hào)會(huì)丟失,需要重新配置
2. 克隆虛擬機(jī)
2.1 從機(jī)主機(jī)名配置

克隆后,對(duì)從機(jī)進(jìn)行主機(jī)名的配置,network配置文件:

hosts文件,中需要添加集群主節(jié)點(diǎn)的ip和hostname:

2.2 建立集群關(guān)聯(lián)
啟動(dòng)RabbitMQ服務(wù)后,管理界面的節(jié)點(diǎn)會(huì)帶上主機(jī)名:

接下來(lái),配置從機(jī)加入到主節(jié)點(diǎn)集群中,執(zhí)行以下命令即可:
cd /usr/local/rabbitmq/rabbitmq_server-3.8.35/sbin/
./rabbitmqctl stop_app
./rabbitmqctl reset
./rabbitmqctl join_cluster rabbit@rabbit1
./rabbitmqctl start_app
加入成功后,管理界面中就會(huì)出現(xiàn)多個(gè)節(jié)點(diǎn):

3. 配置鏡像模式
目前集群是普通模式,隊(duì)列中的消息只會(huì)存在于一個(gè)節(jié)點(diǎn)上,而不會(huì)同步到其他隊(duì)列,一旦該節(jié)點(diǎn)宕機(jī),其他節(jié)點(diǎn)將無(wú)法訪問(wèn)消息。
鏡像模式是指,集群中所有節(jié)點(diǎn)都有一份單獨(dú)的拷貝,即使單一節(jié)點(diǎn)宕機(jī),其他節(jié)點(diǎn)中依然存在消息的拷貝,這樣才能實(shí)現(xiàn)高可用
在管理界面進(jìn)行配置鏡像策略:

新建一個(gè)隊(duì)列,并查看詳情:
