基于Spring的RabbitTemplate實(shí)現(xiàn)消息事務(wù)

基于Spring的RabbitTemplate實(shí)現(xiàn)消息事務(wù)

分布式系統(tǒng)中常見(jiàn)一種情況,就是數(shù)據(jù)庫(kù)操作成功之后發(fā)送MQ消息。

分布式消息常見(jiàn)問(wèn)題

數(shù)據(jù)庫(kù)操作之后發(fā)送MQ消息通常會(huì)遇到一些問(wèn)題,理論上消息和事務(wù)要同時(shí)成功才算一個(gè)完整的事務(wù),那到底該把發(fā)送MQ放到數(shù)據(jù)庫(kù)事務(wù)之外還是數(shù)據(jù)庫(kù)事務(wù)之內(nèi)?

下面分析下可能存在的問(wèn)題:

  1. 消息放到數(shù)據(jù)庫(kù)事務(wù)之內(nèi)
    1. 事務(wù)處理異常,回滾事務(wù)——ok
    2. 消息發(fā)送異常,回滾事務(wù)——ok
    3. 消息發(fā)送成功提交事務(wù)——ok
    4. 消息發(fā)送成功提交失敗——不ok,不好處理,一般MQ也不能撤銷(xiāo)消息,而且消費(fèi)端可能已經(jīng)在處理了
  2. 消息放到數(shù)據(jù)庫(kù)事務(wù)之外
    1. 事務(wù)處理異常,回滾事務(wù)——ok,不用發(fā)消息
    2. 事務(wù)處理成功,發(fā)送消息成功——ok
    3. 事務(wù)處理成功,消息發(fā)送失敗——不ok,消息丟失
  3. 在2的基礎(chǔ)上增加本地消息表,放到同一個(gè)數(shù)據(jù)庫(kù),業(yè)務(wù)操作完成之后把需要發(fā)送的MQ消息插入本地消息表中
    1. 事務(wù)處理異常,回滾事務(wù)——ok,不用發(fā)消息,消息表也回滾
    2. 事務(wù)處理成功,發(fā)送消息成功
      1. 更新消息表狀態(tài)成功——ok
      2. 更新消息表狀態(tài)失敗——ok(定時(shí)任務(wù)補(bǔ)償)
    3. 事務(wù)處理成功,消息發(fā)送失敗——ok(定時(shí)任務(wù)補(bǔ)償)
    4. 通過(guò)定時(shí)掃描失敗消息重新發(fā)送MQ
      1. 重發(fā)消息需保證冪等性——ok

分布式消息事務(wù)處理

常見(jiàn)的處理邏輯是本地消息表+消息重試補(bǔ)償

image

RabbitTemplate配置和使用

我們使用RabbitMQ作為消息隊(duì)列,因此我們可以使用spring-rabbit幫助實(shí)現(xiàn)mq發(fā)送(前提是已經(jīng)安裝了RabbitMQ了)。

      <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>${spring-amqp.version}</version>
      </dependency>

application.yml配置:

spring:
  rabbitmq:
    virtual-host: /
    username: appws
    password: xxxxxx
    addresses: 10.181.57.239:5672
    publisherConfirms: true

然后就可以注入RabbitTemplate了,代碼片段如下:

// 消息隊(duì)列配置
public static final String TEST_EXCHANGE = "test.exchange";
public static final String TEST_QUEUE = "test.queue";
public static final String TEST_ROUTEKEY = "test.routekey";
@Bean
public Exchange testExchange() {
    return new TopicExchange(TEST_EXCHANGE);
}
@Bean
public Queue testQueue() {
    return new Queue(TEST_QUEUE, true);
}
@Bean
public Binding testBinding(@Qualifier("testQueue") Queue queue, @Qualifier("testExchange") Exchange exchange) {
    return BindingBuilder.bind(queue).to(exchange).with(TEST_ROUTEKEY).noargs();
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Transactional
public void test(){
    // 一些DB操作
    rabbitTemplate.convertAndSend(TEST_EXCHANGE, TEST_ROUTEKEY, param);
    // 其他DB操作等
}

攔截器處理RabbitTemplate事務(wù)

? 從上面的代碼片段可以看出,業(yè)務(wù)方法使用了@Transactional注解使用了事務(wù)之后,rabbitTemplate.convertAndSend方法并沒(méi)有專(zhuān)門(mén)放到事務(wù)之外,這個(gè)時(shí)候一旦有異常,可能造成消息發(fā)送成功,但是事務(wù)異常回滾的問(wèn)題。要解決這個(gè)問(wèn)題,需要把rabbitTemplate.convertAndSend移動(dòng)到事務(wù)之外,但是通常都配置的聲明式事務(wù),不能簡(jiǎn)單的把代碼移動(dòng)到外面,這個(gè)需要利用Spring事務(wù)的一個(gè)特性TransactionSynchronization,注冊(cè)一個(gè)同步鉤子,自動(dòng)把相關(guān)代碼放到事務(wù)完成之后執(zhí)行,我們使用攔截器攔截rabbitTemplate.convertAndSend方法,實(shí)現(xiàn)不用修改現(xiàn)有代碼自動(dòng)把發(fā)送MQ消息邏輯移到事務(wù)之外:

RabbitTemplateTransactionInterceptor.java代碼詳情:

@Aspect
@Order(50)
@Component
public class RabbitTemplateTransactionInterceptor {
    /**
     * 日志
     */
    private static final Logger logger = LoggerFactory.getLogger(RabbitTemplateTransactionInterceptor.class);
    /**
     * 代理convertAndSend方法夠用
     */
    @Pointcut("execution(* org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(String, String, Object))")
    public void convertAndSend() {
        // noop
    }
    @Around("convertAndSend()")
    public void aroundMethod(ProceedingJoinPoint joinPoint) throws Throwable {
        Object[] args = joinPoint.getArgs();
        if (TransactionSynchronizationManager.isSynchronizationActive()
                && TransactionSynchronizationManager.isActualTransactionActive() // 事務(wù)開(kāi)啟判斷
                && args.length == 3) {
            logger.info("攔截RabbitTemplate發(fā)送:{}", args);
            // 注冊(cè)同步器
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() { // 事務(wù)提交之后執(zhí)行
                    try {
                        joinPoint.proceed();
                    } catch (Throwable throwable) {
                        throw new RuntimeException(throwable);
                    }
                }
            });
        } else { // 沒(méi)有開(kāi)啟事務(wù)或者參數(shù)不正確就直接執(zhí)行,不處理
            joinPoint.proceed();
        }
    }
}

注:定時(shí)掃描和消息重試在另外的邏輯中。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容