一. 前言
消息隊(duì)列一般有三部分:生產(chǎn)者,隊(duì)列本身和消費(fèi)者。消息出現(xiàn)問題,一般也就圍繞這三部分。
二. 消息丟失處理
2.1 發(fā)送端確認(rèn)機(jī)制
發(fā)送端確認(rèn)(publisher confirm)機(jī)制。生產(chǎn)者將信道設(shè)置成confirm(確認(rèn))模式,一旦信道進(jìn)入confirm 模式,所有在該信道上面發(fā)布的消息都會(huì)被指派一個(gè)唯一的ID(從1 開始),一旦消息被投遞到所有匹配的隊(duì)列之后(如果消息和隊(duì)列是持久化的,那么確認(rèn)消息會(huì)在消息持久化后發(fā)出),RabbitMQ 就會(huì)發(fā)送一個(gè)確認(rèn)(Basic.Ack)給生產(chǎn)者(包含消息的唯一ID),這樣生產(chǎn)者就知道消息已經(jīng)正確送達(dá)了。
如果rabbitMQ沒能處理該條消息,比如因?yàn)樽陨韮?nèi)部錯(cuò)誤導(dǎo)致消息丟失等異常情況發(fā)生,就會(huì)發(fā)送一個(gè)Basic.Nack給生產(chǎn)者,此時(shí)程序可以進(jìn)行重試等操作。
2.1.1 程序案例
- application.properties配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
- controller代碼
private RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
this.rabbitTemplate.setConfirmCallback((correlationData,
flag, cause) -> {
if (flag) {
try {
System.out.println("消息確認(rèn):" +
correlationData.getId() + " "
+ new String(correlationData.getReturnedMessage().getBody(), "utf-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
} else {
System.out.println(cause);
}
});
}
核心業(yè)務(wù)代碼
CorrelationData correlationData = new CorrelationData();//correlationData消息唯一id
String cId = UUID.randomUUID().toString();
correlationData.setId(cId);
correlationData.setReturnedMessage(new Message("這是msg1的響應(yīng)".getBytes("utf-8"), null));
log.info("消息UUID為:{}", cId);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUNTING_KEY, "your message",
correlationData);
2.2.2 事務(wù)
一但事務(wù)提交后都沒有異常,確實(shí)就說明消息是投遞成功了
Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
try {
channel.txSelect();
channel.basicPublish(EXCHANGE_NAME, ROUNTING_KEY, null, "your message".getBytes());
channel.txCommit();
} catch (IOException e) {
e.printStackTrace();
try {
channel.txRollback();
} catch (IOException ex) {
ex.printStackTrace();
}
}
注意開啟事務(wù)吞吐量會(huì)下降。
在下一篇文章中,我們會(huì)討論下隊(duì)列消息持久化機(jī)制。