?---實(shí)踐是檢驗(yàn)真理的唯一標(biāo)準(zhǔn)---
yml參數(shù)配置
這次我使用的是RabbitTemplate
rabbitmq:
host: 192.168.225.136
port: 5672
username: thinking
password: 123
virtual-host: host1
publisher-returns: true
# 事務(wù)模式下這行需要?jiǎng)h除
publisher-confirm-type: correlated
template:
# 找不到路由規(guī)則的消息 是否保留
mandatory: true
為什么Template不需要定義configuration文件來(lái)接收yml文件的參數(shù)?
這是個(gè)常識(shí)問(wèn)題,我這里做個(gè)記錄。。。
我都能忘記昨天吃了東西的,好在我喜歡做筆記。。。
springboot中何時(shí)加載Template,可以仔細(xì)看看自動(dòng)裝配注解:EnableAutoConfiguration
這類Template模板的初始化有個(gè)Properties文件,不如:
RabbitProperties
RedisProperties
方法中注解:ConfigurationProperties 指定了默認(rèn)取得yml格式內(nèi)容
至于具體的屬性可以找set方法
我們的demo都是基于RabbitTemplate來(lái)寫(xiě)。。。
初始化數(shù)據(jù)
通過(guò)枚舉ExchangeEnum、QueueEnum、BindingEnum動(dòng)態(tài)維護(hù)和創(chuàng)建
1.初始化交換機(jī)
@Bean("createExchange")
public Object createExchange(RabbitAdmin rabbitAdmin) {
// 遍歷交換機(jī)枚舉
ExchangeEnum.toList().forEach(exchangeEnum -> {
// 根據(jù)交換機(jī)模式 生成不同的交換機(jī)
switch (exchangeEnum.getType()) {
case fanout:
rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
case topic:
rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
case direct:
rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),
exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
break;
}
});
return null;
}
2.初始化隊(duì)列
@Bean("createQueue")
public Object createQueue(RabbitAdmin rabbitAdmin) {
// 遍歷隊(duì)列枚舉 將隊(duì)列注冊(cè)到spring bean工廠 讓spring實(shí)現(xiàn)隊(duì)列的管理
QueueEnum.toList().forEach(queueEnum -> {
rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),
queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));
});
return null;
}
3.交換機(jī)和隊(duì)列綁定
@Bean("createBinding")
public Object createBinding(RabbitAdmin rabbitAdmin) {
// 遍歷隊(duì)列枚舉 將隊(duì)列綁定到指定交換機(jī)
BindingEnum.toList().forEach(bindingEnum -> {
// 交換機(jī)
ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();
// queue
QueueEnum queueEnum = bindingEnum.getQueueEnum();
// 綁定
rabbitAdmin.declareBinding(new Binding(
// queue名稱
queueEnum.getName(),
Binding.DestinationType.QUEUE,
// exchange名稱
exchangeEnum.getExchangeName(),
// queue的routingKey
queueEnum.getRoutingKey(),
// 綁定的參數(shù)
bindingEnum.getArguments()));
});
return null;
}
延遲隊(duì)列
1.定義隊(duì)列
/**
* 超時(shí)隊(duì)列---不需要定義RabbitListener方法
*/
deal_queue("deal_queue", "deal.queue", true, false, false, dealParams()),
/**
* 超時(shí)接收隊(duì)列
*/
reply_queue("reply_queue", "reply.queue", true, false, false, null),
public static Map<String, Object> dealParams(){
// reply_to 隊(duì)列
Map<String,Object> map = new HashMap<>();
//設(shè)置消息的過(guò)期時(shí)間 單位毫秒
map.put("x-message-ttl",10000);
//設(shè)置附帶的死信交換機(jī)
map.put("x-dead-letter-exchange","reply_exchange");
//指定重定向的路由建 消息作廢之后可以決定需不需要更改他的路由建 如果需要 就在這里指定
map.put("x-dead-letter-routing-key","reply.queue");
return map;
}
2.定義交換機(jī)
/**
* 超時(shí)交換機(jī)
*/
deal_exchange("deal_exchange", ExchangeTypeEnum.topic, true, false),
/**
* 超時(shí)接收交換機(jī)
*/
reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),
3.交換機(jī)和隊(duì)列綁定
deal_binding(ExchangeEnum.deal_exchange, QueueEnum.deal_queue, null),
reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)
4.不定義超時(shí)隊(duì)列的@RabbitListener,只定義超時(shí)接收隊(duì)列的@RabbitListener
@RabbitListener(queues = {"reply_queue"})
@RabbitHandler
public void reply_queue(Message message, Channel channel) throws Exception {
System.err.println("消費(fèi)端-reply: " + new String(message.getBody(), "UTF-8"));
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
channel.basicAck(deliveryTag, false);
}
測(cè)試:
/**
* 延遲隊(duì)列測(cè)試
*/
public void deal_queue_test() {
ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();
QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();
// 消息
String message = "11111111111111111111111111111111111111";
MessageProperties messageProperties = getMessageProperties();
// 發(fā)送
rabbitTemplate.convertSendAndReceive(
exchangeEnum.getExchangeName(),
queueEnum.getRoutingKey(),
new Message(message.getBytes(), messageProperties));
}
異步隊(duì)列
1.AsyncRabbitTemplate定義
/**
* 異步隊(duì)列
* @param rabbitTemplate
* @return
*/
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
asyncRabbitTemplate.setReceiveTimeout(50000);
return asyncRabbitTemplate;
}
2.測(cè)試
public void async() {
System.err.println("---------------async--------------start---------");
AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");
// 配置下面代碼時(shí) 如果 隊(duì)列監(jiān)聽(tīng)中沒(méi)有返回值時(shí)會(huì)報(bào)錯(cuò)
future.addCallback(new ListenableFutureCallback<Object>() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(Object result) {
System.out.println("回調(diào)收到結(jié)果=> " + result);
}
});
System.err.println("---------------async--------------end---------");
}
3.監(jiān)聽(tīng)方法
@RabbitListener(queues = {"async_queue"})
@RabbitHandler
public Object async_queue(Message message, Channel channel) throws Exception {
System.err.println("消費(fèi)端-async: " + new String(message.getBody(), "UTF-8"));
return "ok";
}
Java api
1.消息回退:
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
long deliveryTag:消息唯一標(biāo)識(shí),這是RabbitMQ自動(dòng)生成的,不需要人為管理,只需要從message.getMessageProperties().getDeliveryTag() 就可以獲得。
boolean multiple:是否批量退回,不開(kāi)啟就使用false,開(kāi)啟批量退回需要增加自己的業(yè)務(wù)判斷邏輯(比如:攢夠幾條再批量回退,或者設(shè)置等待間隔等等)
boolean requeue:是否退回到消息隊(duì)列,退回就使用true,就是交給其他消費(fèi)者處理。
2.拒絕消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
deliveryTag:發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag,它在channel范圍內(nèi)是唯一的
requeue:表示如何處理這條消息,為true表示重新放入RabbitMQ的發(fā)送隊(duì)列中,為false表示通知RabbitMQ銷毀該消息
3.確認(rèn)ack
void basicAck(long deliveryTag, boolean multiple) throws IOException;
deliveryTag:該消息的index
multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。
4.創(chuàng)建一個(gè)隊(duì)列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
?
durable:true、false true:在服務(wù)器重啟時(shí),能夠存活
exclusive :是否為當(dāng)前連接的專用隊(duì)列,在連接斷開(kāi)后,會(huì)自動(dòng)刪除該隊(duì)列,生產(chǎn)環(huán)境中應(yīng)該很少用到吧。
autodelete:當(dāng)沒(méi)有任何消費(fèi)者使用時(shí),自動(dòng)刪除該隊(duì)列
5.啟動(dòng)一個(gè)消費(fèi)者,并返回服務(wù)端生成的消費(fèi)者標(biāo)識(shí)
/**
* queue:隊(duì)列名
* autoAck:true 接收到傳遞過(guò)來(lái)的消息后acknowledged(應(yīng)答服務(wù)器),false 接收到消息后不應(yīng)答服務(wù)器
* consumerTag:客戶端生成的一個(gè)消費(fèi)者標(biāo)識(shí)
* nolocal:如果服務(wù)器不應(yīng)將在此通道連接上發(fā)布的消息傳遞給此使用者,則為true;請(qǐng)注意RabbitMQ服務(wù)器上不支持此標(biāo)記
* exclusive: 如果是單個(gè)消費(fèi)者,則為true
* arguments:消費(fèi)的一組參數(shù)
* deliverCallback: 當(dāng)一個(gè)消息發(fā)送過(guò)來(lái)后的回調(diào)接口
* cancelCallback:當(dāng)一個(gè)消費(fèi)者取消訂閱時(shí)的回調(diào)接口;取消消費(fèi)者訂閱隊(duì)列時(shí)除了使用{@link Channel#basicCancel}之外的所有方式都會(huì)調(diào)用該回調(diào)方法
* shutdownSignalCallback: 當(dāng)channel/connection 關(guān)閉后回調(diào)
*/
channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});
6.取消消費(fèi)者訂閱
/**
* 取消消費(fèi)者對(duì)隊(duì)列的訂閱關(guān)系
* consumerTag:服務(wù)器端生成的消費(fèi)者標(biāo)識(shí)
**/
void basicCancel(String consumerTag)
7.主動(dòng)拉取隊(duì)列中的一條消息
/**
* 從消息隊(duì)列中取出第一條消息;整個(gè)方法的執(zhí)行過(guò)程是首先消費(fèi)隊(duì)列,然后檢索第一條消息,然后再取消訂閱
*/
GetResponse response = channel.basicGet(QUEUE_NAME, true);
System.out.println("消費(fèi)者接收到的消息是:"+new String(response.getBody(), "UTF-8"));
參數(shù)介紹
1.隊(duì)列參數(shù)
x-dead-letter-exchange 死信交換機(jī)
x-dead-letter-routing-key 死信消息重定向路由鍵
x-expires 隊(duì)列在指定毫秒數(shù)后被刪除
x-ha-policy 創(chuàng)建HA隊(duì)列
x-ha-nodes HA隊(duì)列的分布節(jié)點(diǎn)
x-max-length 隊(duì)列的最大消息數(shù)
x-message-ttl 毫秒為單位的消息過(guò)期時(shí)間,隊(duì)列級(jí)別
x-max-priority 最大優(yōu)先值為255的隊(duì)列優(yōu)先排序功能
2.消息參數(shù)
content-type 消息體的MIME類型,如application/json
content-encoding 消息的編碼類型
message-id 消息的唯一性標(biāo)識(shí),由應(yīng)用進(jìn)行設(shè)置
correlation-id 一般用做關(guān)聯(lián)消息的message-id,常用于消息的響應(yīng)
timestamp 消息的創(chuàng)建時(shí)刻,整形,精確到秒
完整項(xiàng)目地址在微信公眾中,謝謝大家支持
