前提
分布式事務(wù)是微服務(wù)實(shí)踐中一個(gè)比較棘手的問題,在筆者所實(shí)施的微服務(wù)實(shí)踐方案中,都采用了折中或者規(guī)避強(qiáng)一致性的方案。 參考Ebay多年前提出的本地消息表方案,基于RabbitMQ和MySQL(JDBC)做了輕量級(jí)的封裝,實(shí)現(xiàn)了低入侵性的事務(wù)消息模塊。 本文的內(nèi)容就是詳細(xì)分析整個(gè)方案的設(shè)計(jì)思路和實(shí)施。 環(huán)境依賴如下:
- JDK1.8+
- spring-boot-start-web:2.x.x、spring-boot-start-jdbc:2.x.x、spring-boot-start-amqp:2.x.x
- HikariCP:3.x.x(spring-boot-start-jdbc自帶)、mysql-connector-java:5.1.48
- redisson:3.12.1
方案設(shè)計(jì)思路
事務(wù)消息原則上只適合弱一致性(或者說「最終一致性」)的場景,常見的弱一致性場景如:
- 用戶服務(wù)完成了注冊(cè)動(dòng)作,向短信服務(wù)推送一條營銷相關(guān)的消息。
- 信貸體系中,訂單服務(wù)保存訂單完畢,向?qū)徟?wù)推送一條待審批的訂單記錄信息。
- ......
「強(qiáng)一致性的場景一般不應(yīng)該選用事務(wù)消息」。

一般情況下,要求強(qiáng)一致性說明要嚴(yán)格同步,也就是所有操作必須同時(shí)成功或者同時(shí)失敗,這樣就會(huì)引入同步帶來的額外消耗。 如果一個(gè)事務(wù)消息模塊設(shè)計(jì)合理,補(bǔ)償、查詢、監(jiān)控等等功能都完畢,由于系統(tǒng)交互是異步的,整體吞吐要比嚴(yán)格同步高。 在筆者負(fù)責(zé)的業(yè)務(wù)系統(tǒng)中基于事務(wù)消息使用還定制了一條基本原則:「消息內(nèi)容正確的前提下,消費(fèi)方出現(xiàn)異常需要自理」。
?
簡單來說就是:上游保證了自身的業(yè)務(wù)正確性,成功推送了正確的消息到RabbitMQ就認(rèn)為上游業(yè)務(wù)已經(jīng)結(jié)束。
?
為了降低代碼的入侵性,事務(wù)消息需要借助Spring的「編程序事務(wù)」或者「聲明式事務(wù)」。 編程序事務(wù)一般依賴于TransactionTemplate,而聲明式事務(wù)依托于AOP模塊,依賴于注解@Transactional。
接著需要自定義一個(gè)事務(wù)消息功能模塊,新增一個(gè)事務(wù)消息記錄表(其實(shí)就是「本地消息表」),用于保存每一條需要發(fā)送的消息記錄。 事務(wù)消息功能模塊的主要功能是:
- 保存消息記錄。
- 推送消息到RabbitMQ服務(wù)端。
- 消息記錄的查詢、補(bǔ)償推送等等。
事務(wù)執(zhí)行的邏輯單元
在事務(wù)執(zhí)行的邏輯單元里面,需要進(jìn)行待推送的事務(wù)消息記錄的保存,也就是:「本地(業(yè)務(wù))邏輯和事務(wù)消息記錄保存操作綁定在同一個(gè)事務(wù)」。

發(fā)送消息到RabbitMQ服務(wù)端這一步需要延后到「事務(wù)提交之后」,這樣才能保證事務(wù)提交成功和消息成功發(fā)送到RabbitMQ服務(wù)端這兩個(gè)操作是一致的。 為了把「保存待發(fā)送的事務(wù)消息」和「發(fā)送消息到RabbitMQ」兩個(gè)動(dòng)作從用戶感知角度合并為一個(gè)動(dòng)作,這里需要用到Spring特有的事務(wù)同步器TransactionSynchronization,這里分析一下事務(wù)同步器的主要方法的回調(diào)位置, 主要參考AbstractPlatformTransactionManager#commit()或者AbstractPlatformTransactionManager#processCommit()方法:

上圖僅僅演示了事務(wù)正確提交的場景(不包含異常的場景)。 這里可以明確知道,事務(wù)同步器TransactionSynchronization的afterCommit()和afterCompletion(int status)方法都在真正的事務(wù)提交點(diǎn)AbstractPlatformTransactionManager#doCommit()之后回調(diào), 因此可以選用這兩個(gè)方法其中之一用于執(zhí)行推送消息到RabbitMQ服務(wù)端,整體的僞代碼如下:
@Transactionalpublic?Dto?businessMethod(){
????business?transaction?code?block?...
????//?保存事務(wù)消息
????[saveTransactionMessageRecord()]
????//?注冊(cè)事務(wù)同步器?-?在afterCommit()方法中推送消息到RabbitMQ
????[register?TransactionSynchronization,send?message?in?method?afterCommit()]
????business?transaction?code?block?...
}上面?zhèn)未a中,「保存事務(wù)消息」和「注冊(cè)事務(wù)同步器」兩個(gè)步驟可以安插在事務(wù)方法中的任意位置,也就是說與執(zhí)行順序無關(guān)。
事務(wù)消息的補(bǔ)償
雖然建議下游服務(wù)自理自身服務(wù)消費(fèi)異常的場景,但是有些時(shí)候迫于無奈還是需要上有把對(duì)應(yīng)的消息重新推送,這算是一個(gè)特殊的場景。另外還有一個(gè)場景需要考慮:事務(wù)提交之后出發(fā)事務(wù)同步器TransactionSynchronization的afterCommit()方法失敗。這是一個(gè)低概率的場景,但是在生產(chǎn)中一定會(huì)出現(xiàn),一個(gè)比較典型的原因就是:
「事務(wù)提交完成后尚未來得及觸發(fā)TransactionSynchronization#afterCommit()方法進(jìn)行推送服務(wù)實(shí)例就被重啟」
如下圖所示:

為了統(tǒng)一處理補(bǔ)償推送的問題,使用了有限狀態(tài)判斷消息是否已經(jīng)推送成功:
- 在事務(wù)方法內(nèi),保存事務(wù)消息的時(shí)候,標(biāo)記消息記錄推送狀態(tài)為「處理中」。
- 事務(wù)同步器接口TransactionSynchronization的afterCommit()方法的實(shí)現(xiàn)中,推送對(duì)應(yīng)的消息到RabbitMQ,然后更變事務(wù)消息記錄的狀態(tài)為「推送成功」。
還有一種極為特殊的情況是RabbitMQ服務(wù)端本身出現(xiàn)故障導(dǎo)致消息推送異常,這種情況下需要進(jìn)行重試(補(bǔ)償推送),「經(jīng)驗(yàn)證明短時(shí)間內(nèi)的反復(fù)重試是沒有意義的」,故障的服務(wù)常規(guī)不會(huì)瞬時(shí)恢復(fù),所以可以考慮使用「指數(shù)退避算法」進(jìn)行重試,同時(shí)需要限制最大重試次數(shù)。

指數(shù)值、間隔值和最大重試次數(shù)上限需要根據(jù)實(shí)際情況設(shè)定,否則容易出現(xiàn)消息延時(shí)過大或者重試過于頻繁等問題。
方案實(shí)施
引入核心依賴:
<properties>
<spring.boot.version>2.2.4.RELEASE</spring.boot.version>
<redisson.version>3.12.1</redisson.version>
<mysql.connector.version>5.1.48</mysql.connector.version></properties><dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring.boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies></dependencyManagement><dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.connector.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>${redisson.version}</version>
</dependency></dependencies>
spring-boot-starter-jdbc、mysql-connector-java和spring-boot-starter-aop是MySQL事務(wù)相關(guān),而spring-boot-starter-amqp是RabbitMQ客戶端的封裝,redisson主要使用其分布式鎖, 用于補(bǔ)償定時(shí)任務(wù)的加鎖執(zhí)行(以防止服務(wù)多個(gè)節(jié)點(diǎn)并發(fā)執(zhí)行補(bǔ)償推送)。
表設(shè)計(jì)
事務(wù)消息模塊主要涉及兩張表,以MySQL為例,見表DDL如下:
CREATE?TABLE?`t_transactional_message`
(
????id??????????????????BIGINT?UNSIGNED?AUTO_INCREMENT?PRIMARY?KEY,
????create_time?????????DATETIME????????NOT?NULL?DEFAULT?CURRENT_TIMESTAMP,
????edit_time???????????DATETIME????????NOT?NULL?DEFAULT?CURRENT_TIMESTAMP?ON?UPDATE?CURRENT_TIMESTAMP,
????creator?????????????VARCHAR(20)?????NOT?NULL?DEFAULT?'admin',
????editor??????????????VARCHAR(20)?????NOT?NULL?DEFAULT?'admin',
????deleted?????????????TINYINT?????????NOT?NULL?DEFAULT?0,
????current_retry_times?TINYINT?????????NOT?NULL?DEFAULT?0?COMMENT?'當(dāng)前重試次數(shù)',
????max_retry_times?????TINYINT?????????NOT?NULL?DEFAULT?5?COMMENT?'最大重試次數(shù)',
????queue_name??????????VARCHAR(255)????NOT?NULL?COMMENT?'隊(duì)列名',
????exchange_name???????VARCHAR(255)????NOT?NULL?COMMENT?'交換器名',
????exchange_type???????VARCHAR(8)??????NOT?NULL?COMMENT?'交換類型',
????routing_key?????????VARCHAR(255)?COMMENT?'路由鍵',
????business_module?????VARCHAR(32)?????NOT?NULL?COMMENT?'業(yè)務(wù)模塊',
????business_key????????VARCHAR(255)????NOT?NULL?COMMENT?'業(yè)務(wù)鍵',
????next_schedule_time??DATETIME????????NOT?NULL?COMMENT?'下一次調(diào)度時(shí)間',
????message_status??????TINYINT?????????NOT?NULL?DEFAULT?0?COMMENT?'消息狀態(tài)',
????init_backoff????????BIGINT?UNSIGNED?NOT?NULL?DEFAULT?10?COMMENT?'退避初始化值,單位為秒',
????backoff_factor??????TINYINT?????????NOT?NULL?DEFAULT?2?COMMENT?'退避因子(也就是指數(shù))',
????INDEX?idx_queue_name?(queue_name),
????INDEX?idx_create_time?(create_time),
????INDEX?idx_next_schedule_time?(next_schedule_time),
????INDEX?idx_business_key?(business_key)
)?COMMENT?'事務(wù)消息表';
CREATE?TABLE?`t_transactional_message_content`
(
????id?????????BIGINT?UNSIGNED?AUTO_INCREMENT?PRIMARY?KEY,
????message_id?BIGINT?UNSIGNED?NOT?NULL?COMMENT?'事務(wù)消息記錄ID',
????content????TEXT?COMMENT?'消息內(nèi)容'
)?COMMENT?'事務(wù)消息內(nèi)容表';因?yàn)榇四K有可能擴(kuò)展出一個(gè)后臺(tái)管理模塊,所以要把消息的管理和狀態(tài)相關(guān)字段和大體積的消息內(nèi)容分別存放在兩個(gè)表,從而避免大批量查詢消息記錄的時(shí)候MySQL服務(wù)IO使用率過高的問題(這是和上一個(gè)公司的DBA團(tuán)隊(duì)商討后得到的一個(gè)比較合理的方案)。 預(yù)留了兩個(gè)業(yè)務(wù)字段business_module和business_key用于標(biāo)識(shí)業(yè)務(wù)模塊和業(yè)務(wù)鍵(一般是唯一識(shí)別號(hào),例如訂單號(hào))。

一般情況下,如果服務(wù)通過配置自行提前聲明隊(duì)列和交換器的綁定關(guān)系,那么發(fā)送RabbitMQ消息的時(shí)候其實(shí)只依賴于exchangeName和routingKey兩個(gè)字段(header類型的交換器是特殊的,也比較少用,這里暫時(shí)不用考慮),考慮到服務(wù)可能會(huì)遺漏聲明操作, 發(fā)送消息的時(shí)候會(huì)基于隊(duì)列進(jìn)行首次綁定聲明并且緩存相關(guān)的信息(RabbitMQ中的隊(duì)列-交換器綁定聲明只要每次聲明綁定關(guān)系的參數(shù)一致,則不會(huì)拋出異常)。
方案代碼設(shè)計(jì)
下面的方案設(shè)計(jì)描述中,暫時(shí)忽略了消息事務(wù)管理后臺(tái)的API設(shè)計(jì),這些可以在后期補(bǔ)充。
定義貧血模型實(shí)體類TransactionalMessage和TransactionalMessageContent:
@Datapublic?class?TransactionalMessage?{
????private?Long?id;
????private?LocalDateTime?createTime;
????private?LocalDateTime?editTime;
????private?String?creator;
????private?String?editor;
????private?Integer?deleted;
????private?Integer?currentRetryTimes;
????private?Integer?maxRetryTimes;
????private?String?queueName;
????private?String?exchangeName;
????private?String?exchangeType;
????private?String?routingKey;
????private?String?businessModule;
????private?String?businessKey;
????private?LocalDateTime?nextScheduleTime;
????private?Integer?messageStatus;
????private?Long?initBackoff;
????private?Integer?backoffFactor;
}
@Datapublic?class?TransactionalMessageContent?{
????private?Long?id;
????private?Long?messageId;
????private?String?content;
}然后定義dao接口(這里暫時(shí)不展開實(shí)現(xiàn)的細(xì)節(jié)代碼,存儲(chǔ)使用MySQL,如果要替換為其他類型的數(shù)據(jù)庫,只需要使用不同的實(shí)現(xiàn)即可):
public interface TransactionalMessageDao {
void insertSelective(TransactionalMessage record);
void updateStatusSelective(TransactionalMessage record);
List<TransactionalMessage> queryPendingCompensationRecords(LocalDateTime minScheduleTime,
LocalDateTime maxScheduleTime,
int limit);
}
public interface TransactionalMessageContentDao {
void insert(TransactionalMessageContent record);
List<TransactionalMessageContent> queryByMessageIds(String messageIds);
}
接著定義事務(wù)消息服務(wù)接口TransactionalMessageService:
//?對(duì)外提供的服務(wù)類接口
public?interface?TransactionalMessageService?{
????void?sendTransactionalMessage(Destination?destination,?TxMessage?message);
}
@Getter@RequiredArgsConstructorpublic?enum?ExchangeType?{
????FANOUT("fanout"),
????DIRECT("direct"),
????TOPIC("topic"),
????DEFAULT(""),
????;
????private?final?String?type;
}
//?發(fā)送消息的目的地public?interface?Destination?{
????ExchangeType?exchangeType();
????String?queueName();
????String?exchangeName();
????String?routingKey();
}
@Builderpublic?class?DefaultDestination?implements?Destination?{
????private?ExchangeType?exchangeType;
????private?String?queueName;
????private?String?exchangeName;
????private?String?routingKey;
????@Override
????public?ExchangeType?exchangeType()?{
????????return?exchangeType;
????}
????@Override
????public?String?queueName()?{
????????return?queueName;
????}
????@Override
????public?String?exchangeName()?{
????????return?exchangeName;
????}
????@Override
????public?String?routingKey()?{
????????return?routingKey;
????}
}
//?事務(wù)消息public?interface?TxMessage?{
????String?businessModule();
????String?businessKey();
????String?content();
}
@Builderpublic?class?DefaultTxMessage?implements?TxMessage?{
????private?String?businessModule;
????private?String?businessKey;
????private?String?content;
????@Override
????public?String?businessModule()?{
????????return?businessModule;
????}
????@Override
????public?String?businessKey()?{
????????return?businessKey;
????}
????@Override
????public?String?content()?{
????????return?content;
????}
}
//?消息狀態(tài)@RequiredArgsConstructor
public?enum?TxMessageStatus?{
????/**
?????*?成功
?????*/
????SUCCESS(1),
????/**
?????*?待處理
?????*/
????PENDING(0),
????/**
?????*?處理失敗
?????*/
????FAIL(-1),
????;
????private?final?Integer?status;
}TransactionalMessageService的實(shí)現(xiàn)類是事務(wù)消息的核心功能實(shí)現(xiàn),代碼如下:
@Slf4j@Service@RequiredArgsConstructorpublic?class?RabbitTransactionalMessageService?implements?TransactionalMessageService?{
????private?final?AmqpAdmin?amqpAdmin;
????private?final?TransactionalMessageManagementService?managementService;
????private?static?final?ConcurrentMap<String,?Boolean>?QUEUE_ALREADY_DECLARE?=?new?ConcurrentHashMap<>();
????@Override
????public?void?sendTransactionalMessage(Destination?destination,?TxMessage?message)?{
????????String?queueName?=?destination.queueName();
????????String?exchangeName?=?destination.exchangeName();
????????String?routingKey?=?destination.routingKey();
????????ExchangeType?exchangeType?=?destination.exchangeType();
????????//?原子性的預(yù)聲明
????????QUEUE_ALREADY_DECLARE.computeIfAbsent(queueName,?k?->?{
????????????Queue?queue?=?new?Queue(queueName);
????????????amqpAdmin.declareQueue(queue);
????????????Exchange?exchange?=?new?CustomExchange(exchangeName,?exchangeType.getType());
????????????amqpAdmin.declareExchange(exchange);
????????????Binding?binding?=?BindingBuilder.bind(queue).to(exchange).with(routingKey).noargs();
????????????amqpAdmin.declareBinding(binding);
????????????return?true;
????????});
????????TransactionalMessage?record?=?new?TransactionalMessage();
????????record.setQueueName(queueName);
????????record.setExchangeName(exchangeName);
????????record.setExchangeType(exchangeType.getType());
????????record.setRoutingKey(routingKey);
????????record.setBusinessModule(message.businessModule());
????????record.setBusinessKey(message.businessKey());
????????String?content?=?message.content();
????????//?保存事務(wù)消息記錄
????????managementService.saveTransactionalMessageRecord(record,?content);
????????//?注冊(cè)事務(wù)同步器
????????TransactionSynchronizationManager.registerSynchronization(new?TransactionSynchronizationAdapter()?{
????????????@Override
????????????public?void?afterCommit()?{
????????????????managementService.sendMessageSync(record,?content);
????????????}
????????});
????}
}消息記錄狀態(tài)和內(nèi)容持久化的管理統(tǒng)一放在TransactionalMessageManagementService中:
@Slf4j@RequiredArgsConstructor@Servicepublic?class?TransactionalMessageManagementService?{
????private?final?TransactionalMessageDao?messageDao;
????private?final?TransactionalMessageContentDao?contentDao;
????private?final?RabbitTemplate?rabbitTemplate;
????private?static?final?LocalDateTime?END?=?LocalDateTime.of(2999,?1,?1,?0,?0,?0);
????private?static?final?long?DEFAULT_INIT_BACKOFF?=?10L;
????private?static?final?int?DEFAULT_BACKOFF_FACTOR?=?2;
????private?static?final?int?DEFAULT_MAX_RETRY_TIMES?=?5;
????private?static?final?int?LIMIT?=?100;
????public?void?saveTransactionalMessageRecord(TransactionalMessage?record,?String?content)?{
????????record.setMessageStatus(TxMessageStatus.PENDING.getStatus());
????????record.setNextScheduleTime(calculateNextScheduleTime(LocalDateTime.now(),?DEFAULT_INIT_BACKOFF,
????????????????DEFAULT_BACKOFF_FACTOR,?0));
????????record.setCurrentRetryTimes(0);
????????record.setInitBackoff(DEFAULT_INIT_BACKOFF);
????????record.setBackoffFactor(DEFAULT_BACKOFF_FACTOR);
????????record.setMaxRetryTimes(DEFAULT_MAX_RETRY_TIMES);
????????messageDao.insertSelective(record);
????????TransactionalMessageContent?messageContent?=?new?TransactionalMessageContent();
????????messageContent.setContent(content);
????????messageContent.setMessageId(record.getId());
????????contentDao.insert(messageContent);
????}
????public?void?sendMessageSync(TransactionalMessage?record,?String?content)?{
????????try?{
????????????rabbitTemplate.convertAndSend(record.getExchangeName(),?record.getRoutingKey(),?content);
????????????if?(log.isDebugEnabled())?{
????????????????log.debug("發(fā)送消息成功,目標(biāo)隊(duì)列:{},消息內(nèi)容:{}",?record.getQueueName(),?content);
????????????}
????????????//?標(biāo)記成功
????????????markSuccess(record);
????????}?catch?(Exception?e)?{
????????????//?標(biāo)記失敗
????????????markFail(record,?e);
????????}
????}
????private?void?markSuccess(TransactionalMessage?record)?{
????????//?標(biāo)記下一次執(zhí)行時(shí)間為最大值
????????record.setNextScheduleTime(END);
????????record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes())?>=?0??
????????????????record.getMaxRetryTimes()?:?record.getCurrentRetryTimes()?+?1);
????????record.setMessageStatus(TxMessageStatus.SUCCESS.getStatus());
????????record.setEditTime(LocalDateTime.now());
????????messageDao.updateStatusSelective(record);
????}
????private?void?markFail(TransactionalMessage?record,?Exception?e)?{
????????log.error("發(fā)送消息失敗,目標(biāo)隊(duì)列:{}",?record.getQueueName(),?e);
????????record.setCurrentRetryTimes(record.getCurrentRetryTimes().compareTo(record.getMaxRetryTimes())?>=?0??
????????????????record.getMaxRetryTimes()?:?record.getCurrentRetryTimes()?+?1);
????????//?計(jì)算下一次的執(zhí)行時(shí)間
????????LocalDateTime?nextScheduleTime?=?calculateNextScheduleTime(
????????????????record.getNextScheduleTime(),
????????????????record.getInitBackoff(),
????????????????record.getBackoffFactor(),
????????????????record.getCurrentRetryTimes()
????????);
????????record.setNextScheduleTime(nextScheduleTime);
????????record.setMessageStatus(TxMessageStatus.FAIL.getStatus());
????????record.setEditTime(LocalDateTime.now());
????????messageDao.updateStatusSelective(record);
????}
????/**
?????*?計(jì)算下一次執(zhí)行時(shí)間
?????*
?????*?@param?base??????????基礎(chǔ)時(shí)間
?????*?@param?initBackoff ??退避基準(zhǔn)值
?????*?@param?backoffFactor?退避指數(shù)
?????*?@param?round?????????輪數(shù)
?????*?@return?LocalDateTime
?????*/
????private?LocalDateTime?calculateNextScheduleTime(LocalDateTime?base,
????????????????????????????????????????????????????long?initBackoff,
????????????????????????????????????????????????????long?backoffFactor,
????????????????????????????????????????????????????long?round)?{
????????double?delta?=?initBackoff?*?Math.pow(backoffFactor,?round);
????????return?base.plusSeconds((long)?delta);
????}
????/**
?????*?推送補(bǔ)償?-?里面的參數(shù)應(yīng)該根據(jù)實(shí)際場景定制
?????*/
????public?void?processPendingCompensationRecords()?{
????????//?時(shí)間的右值為當(dāng)前時(shí)間減去退避初始值,這里預(yù)防把剛保存的消息也推送了
????????LocalDateTime?max?=?LocalDateTime.now().plusSeconds(-DEFAULT_INIT_BACKOFF);
????????//?時(shí)間的左值為右值減去1小時(shí)
????????LocalDateTime?min?=?max.plusHours(-1);
????????Map<Long,?TransactionalMessage>?collect?=?messageDao.queryPendingCompensationRecords(min,?max,?LIMIT)
????????????????.stream()
????????????????.collect(Collectors.toMap(TransactionalMessage::getId,?x?->?x));
????????if?(!collect.isEmpty())?{
????????????StringJoiner?joiner?=?new?StringJoiner(",",?"(",?")");
????????????collect.keySet().forEach(x?->?joiner.add(x.toString()));
????????????contentDao.queryByMessageIds(joiner.toString())
????????????????????.forEach(item?->?{
????????????????????????TransactionalMessage?message?=?collect.get(item.getMessageId());
????????????????????????sendMessageSync(message,?item.getContent());
????????????????????});
????????}
????}
}這里有一點(diǎn)尚待優(yōu)化:更新事務(wù)消息記錄狀態(tài)的方法可以優(yōu)化為批量更新,在limit比較大的時(shí)候,批量更新的效率會(huì)更高。
最后是定時(shí)任務(wù)的配置類:
@Slf4j@RequiredArgsConstructor@Configuration@EnableSchedulingpublic?class?ScheduleJobAutoConfiguration?{
????private?final?TransactionalMessageManagementService?managementService;
????/**
?????*?這里用的是本地的Redis,實(shí)際上要做成配置
?????*/
????private?final?RedissonClient?redisson?=?Redisson.create();
????@Scheduled(fixedDelay?=?10000)
????public?void?transactionalMessageCompensationTask()?throws?Exception?{
????????RLock?lock?=?redisson.getLock("transactionalMessageCompensationTask");
????????//?等待時(shí)間5秒,預(yù)期300秒執(zhí)行完畢,這兩個(gè)值需要按照實(shí)際場景定製
????????boolean?tryLock?=?lock.tryLock(5,?300,?TimeUnit.SECONDS);
????????if?(tryLock)?{
????????????try?{
????????????????long?start?=?System.currentTimeMillis();
????????????????log.info("開始執(zhí)行事務(wù)消息推送補(bǔ)償定時(shí)任務(wù)...");
????????????????managementService.processPendingCompensationRecords();
????????????????long?end?=?System.currentTimeMillis();
????????????????long?delta?=?end?-?start;
????????????????//?以防鎖過早釋放
????????????????if?(delta?<?5000)?{
????????????????????Thread.sleep(5000?-?delta);
????????????????}
????????????????log.info("執(zhí)行事務(wù)消息推送補(bǔ)償定時(shí)任務(wù)完畢,耗時(shí):{}?ms...",?end?-?start);
????????????}?finally?{
????????????????lock.unlock();
????????????}
????????}
????}
}基本代碼編寫完,整個(gè)項(xiàng)目的結(jié)構(gòu)如下:

最后添加兩個(gè)測試類:
@RequiredArgsConstructor@Componentpublic?class?MockBusinessRunner?implements?CommandLineRunner?{
????private?final?MockBusinessService?mockBusinessService;
????@Override
????public?void?run(String...?args)?throws?Exception?{
????????mockBusinessService.saveOrder();
????}
}
@Slf4j@RequiredArgsConstructor@Servicepublic?class?MockBusinessService?{
????private?final?JdbcTemplate?jdbcTemplate;
????private?final?TransactionalMessageService?transactionalMessageService;
????private?final?ObjectMapper?objectMapper;
????@Transactional(rollbackFor?=?Exception.class)
????public?void?saveOrder()?throws?Exception?{
????????String?orderId?=?UUID.randomUUID().toString();
????????BigDecimal?amount?=?BigDecimal.valueOf(100L);
????????Map<String,?Object>?message?=?new?HashMap<>();
????????message.put("orderId",?orderId);
????????message.put("amount",?amount);
????????jdbcTemplate.update("INSERT?INTO?t_order(order_id,amount)?VALUES?(?,?)",?p?->?{
????????????p.setString(1,?orderId);
????????????p.setBigDecimal(2,?amount);
????????});
????????String?content?=?objectMapper.writeValueAsString(message);
????????transactionalMessageService.sendTransactionalMessage(
????????????????DefaultDestination.builder()
????????????????????????.exchangeName("tm.test.exchange")
????????????????????????.queueName("tm.test.queue")
????????????????????????.routingKey("tm.test.key")
????????????????????????.exchangeType(ExchangeType.DIRECT)
????????????????????????.build(),
????????????????DefaultTxMessage.builder()
????????????????????????.businessKey(orderId)
????????????????????????.businessModule("SAVE_ORDER")
????????????????????????.content(content)
????????????????????????.build()
????????);
????????log.info("保存訂單:{}成功...",?orderId);
????}
}某次測試結(jié)果如下:
2020-02-05 21:10:13.287 INFO 49556 --- [ main] club.throwable.cm.MockBusinessService : 保存訂單:07a75323-460b-42cb-aa63-1a0a45ce19bf成功...

模擬訂單數(shù)據(jù)成功保存,而且RabbitMQ消息在事務(wù)成功提交后正常發(fā)送到RabbitMQ服務(wù)端中,如RabbitMQ控制臺(tái)數(shù)據(jù)所示。
小結(jié)
事務(wù)消息模塊的設(shè)計(jì)僅僅是使異步消息推送這個(gè)功能實(shí)現(xiàn)趨向于完備,其實(shí)一個(gè)合理的異步消息交互系統(tǒng),一定會(huì)提供同步查詢接口,這一點(diǎn)是基于異步消息沒有回調(diào)或者沒有響應(yīng)的特性導(dǎo)致的。 一般而言,一個(gè)系統(tǒng)的吞吐量和系統(tǒng)的異步化處理占比成正相關(guān)(這一點(diǎn)可以參考Amdahl's Law),所以在系統(tǒng)架構(gòu)設(shè)計(jì)實(shí)際中應(yīng)該盡可能使用異步交互,提高系統(tǒng)吞吐量同時(shí)減少同步阻塞帶來的無謂等待。 事務(wù)消息模塊可以擴(kuò)展出一個(gè)后臺(tái)管理,甚至可以配合Micrometer、Prometheus和Grafana體系做實(shí)時(shí)數(shù)據(jù)監(jiān)控。
本文使用 文章同步助手 同步