基于Spring的RabbitTemplate實(shí)現(xiàn)消息事務(wù)
分布式系統(tǒng)中常見(jiàn)一種情況,就是數(shù)據(jù)庫(kù)操作成功之后發(fā)送MQ消息。
分布式消息常見(jiàn)問(wèn)題
數(shù)據(jù)庫(kù)操作之后發(fā)送MQ消息通常會(huì)遇到一些問(wèn)題,理論上消息和事務(wù)要同時(shí)成功才算一個(gè)完整的事務(wù),那到底該把發(fā)送MQ放到數(shù)據(jù)庫(kù)事務(wù)之外還是數(shù)據(jù)庫(kù)事務(wù)之內(nèi)?
下面分析下可能存在的問(wèn)題:
- 消息放到數(shù)據(jù)庫(kù)事務(wù)之內(nèi)
- 事務(wù)處理異常,回滾事務(wù)——ok
- 消息發(fā)送異常,回滾事務(wù)——ok
- 消息發(fā)送成功提交事務(wù)——ok
- 消息發(fā)送成功提交失敗——不ok,不好處理,一般MQ也不能撤銷(xiāo)消息,而且消費(fèi)端可能已經(jīng)在處理了
- 消息放到數(shù)據(jù)庫(kù)事務(wù)之外
- 事務(wù)處理異常,回滾事務(wù)——ok,不用發(fā)消息
- 事務(wù)處理成功,發(fā)送消息成功——ok
- 事務(wù)處理成功,消息發(fā)送失敗——不ok,消息丟失
- 在2的基礎(chǔ)上增加本地消息表,放到同一個(gè)數(shù)據(jù)庫(kù),業(yè)務(wù)操作完成之后把需要發(fā)送的MQ消息插入本地消息表中
- 事務(wù)處理異常,回滾事務(wù)——ok,不用發(fā)消息,消息表也回滾
- 事務(wù)處理成功,發(fā)送消息成功
- 更新消息表狀態(tài)成功——ok
- 更新消息表狀態(tài)失敗——ok(定時(shí)任務(wù)補(bǔ)償)
- 事務(wù)處理成功,消息發(fā)送失敗——ok(定時(shí)任務(wù)補(bǔ)償)
- 通過(guò)定時(shí)掃描失敗消息重新發(fā)送MQ
- 重發(fā)消息需保證冪等性——ok
分布式消息事務(wù)處理
常見(jiàn)的處理邏輯是本地消息表+消息重試補(bǔ)償

RabbitTemplate配置和使用
我們使用RabbitMQ作為消息隊(duì)列,因此我們可以使用spring-rabbit幫助實(shí)現(xiàn)mq發(fā)送(前提是已經(jīng)安裝了RabbitMQ了)。
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${spring-amqp.version}</version>
</dependency>
application.yml配置:
spring:
rabbitmq:
virtual-host: /
username: appws
password: xxxxxx
addresses: 10.181.57.239:5672
publisherConfirms: true
然后就可以注入RabbitTemplate了,代碼片段如下:
// 消息隊(duì)列配置
public static final String TEST_EXCHANGE = "test.exchange";
public static final String TEST_QUEUE = "test.queue";
public static final String TEST_ROUTEKEY = "test.routekey";
@Bean
public Exchange testExchange() {
return new TopicExchange(TEST_EXCHANGE);
}
@Bean
public Queue testQueue() {
return new Queue(TEST_QUEUE, true);
}
@Bean
public Binding testBinding(@Qualifier("testQueue") Queue queue, @Qualifier("testExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with(TEST_ROUTEKEY).noargs();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void test(){
// 一些DB操作
rabbitTemplate.convertAndSend(TEST_EXCHANGE, TEST_ROUTEKEY, param);
// 其他DB操作等
}
攔截器處理RabbitTemplate事務(wù)
? 從上面的代碼片段可以看出,業(yè)務(wù)方法使用了@Transactional注解使用了事務(wù)之后,rabbitTemplate.convertAndSend方法并沒(méi)有專(zhuān)門(mén)放到事務(wù)之外,這個(gè)時(shí)候一旦有異常,可能造成消息發(fā)送成功,但是事務(wù)異常回滾的問(wèn)題。要解決這個(gè)問(wèn)題,需要把rabbitTemplate.convertAndSend移動(dòng)到事務(wù)之外,但是通常都配置的聲明式事務(wù),不能簡(jiǎn)單的把代碼移動(dòng)到外面,這個(gè)需要利用Spring事務(wù)的一個(gè)特性TransactionSynchronization,注冊(cè)一個(gè)同步鉤子,自動(dòng)把相關(guān)代碼放到事務(wù)完成之后執(zhí)行,我們使用攔截器攔截rabbitTemplate.convertAndSend方法,實(shí)現(xiàn)不用修改現(xiàn)有代碼自動(dòng)把發(fā)送MQ消息邏輯移到事務(wù)之外:
RabbitTemplateTransactionInterceptor.java代碼詳情:
@Aspect
@Order(50)
@Component
public class RabbitTemplateTransactionInterceptor {
/**
* 日志
*/
private static final Logger logger = LoggerFactory.getLogger(RabbitTemplateTransactionInterceptor.class);
/**
* 代理convertAndSend方法夠用
*/
@Pointcut("execution(* org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(String, String, Object))")
public void convertAndSend() {
// noop
}
@Around("convertAndSend()")
public void aroundMethod(ProceedingJoinPoint joinPoint) throws Throwable {
Object[] args = joinPoint.getArgs();
if (TransactionSynchronizationManager.isSynchronizationActive()
&& TransactionSynchronizationManager.isActualTransactionActive() // 事務(wù)開(kāi)啟判斷
&& args.length == 3) {
logger.info("攔截RabbitTemplate發(fā)送:{}", args);
// 注冊(cè)同步器
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() { // 事務(wù)提交之后執(zhí)行
try {
joinPoint.proceed();
} catch (Throwable throwable) {
throw new RuntimeException(throwable);
}
}
});
} else { // 沒(méi)有開(kāi)啟事務(wù)或者參數(shù)不正確就直接執(zhí)行,不處理
joinPoint.proceed();
}
}
}
注:定時(shí)掃描和消息重試在另外的邏輯中。