分布式事務(wù)設(shè)計
場景
在業(yè)務(wù)中有一處需要用戶為訂單付款,該業(yè)務(wù)會修改用戶庫的balance(用戶余額表),扣減用戶的余額,然后會修改訂單庫的order(訂單表)和enterprise(企業(yè)余額表),將訂單狀態(tài)設(shè)置為已被支付,并增加企業(yè)的余額。這里就同時修改多個數(shù)據(jù)庫,涉及到了分布式事務(wù)的問題。我最終是使用了RocketMQ的事務(wù)消息,并從外圍解決了消息回查的問題。
他人思路
在設(shè)計我的解決方案前嘗試搜索了一下別人的實現(xiàn) 傳送門。他的解決方案是在producer和consumer方設(shè)置了兩個scheduler,感覺是有些復(fù)雜的。我是在其基礎(chǔ)上進(jìn)行了簡化,并解決了一些其他問題,使得整個解決方案比較完整和邏輯自洽。
我的設(shè)計
A和B是兩個Service,A執(zhí)行本地事務(wù),B執(zhí)行遠(yuǎn)程事務(wù)。A會調(diào)用B的遠(yuǎn)程服務(wù),完成整個業(yè)務(wù)。就本項目而言,A就是用戶模塊的AccountService,B就是訂單模塊的OrderService。A和B都有一張表,存儲著消息數(shù)據(jù)。從MQ的視角看來,A是消息的Producer,B是消息的Consumer。
A(本地事務(wù)執(zhí)行方,MQProducer)
db
producer_msg(msgId,body,message_status,create_time,update_time,send_times,topic) msgId這里為orderIdmq
作為producer時,注冊Topic account:當(dāng)執(zhí)行本地事務(wù)時同時插入producer_msg,默認(rèn)status都是未被消費。如果本地事務(wù)執(zhí)行失敗,那么直接回滾,不插入。當(dāng)消息發(fā)送失敗時,我們已經(jīng)在producer_msg插入了記錄,可以進(jìn)行回查。scheduler
A需要同步B的數(shù)據(jù)庫,使得兩個數(shù)據(jù)庫數(shù)據(jù)一致,不同的即為確認(rèn)信息發(fā)送失敗的。
消息狀態(tài)有未被消費、已被消費、消費失敗、超過消費失敗的重試次數(shù)、超過確認(rèn)消息發(fā)送失敗的重試次數(shù)和已被回滾。
A和B數(shù)據(jù)庫同步維護(hù)所有消息,只是A數(shù)據(jù)庫保存內(nèi)容更多,比如會保存消息的body。
如果消息已經(jīng)是超過重試次數(shù)或已被消費,那么A不會再去考慮它。
A的Scheduler會遍歷A數(shù)據(jù)庫,找出未被消費和消費失敗的id且創(chuàng)建時間距離當(dāng)前時間超過1min,發(fā)送給B。
B會遍歷這些id
for(id in ids){
如果 id 不存在,說明確認(rèn)消息發(fā)送失敗,
如果 id 存在,則將該id對應(yīng)的status一并返回,map.put(id,status)
}
A 接收到map后,keySet取得所有id,拿發(fā)送過去的id減去這些id(差集),就是確認(rèn)消息發(fā)送失敗的消息,進(jìn)行重新發(fā)送;遍歷map,將本地數(shù)據(jù)庫同步為B數(shù)據(jù)庫。
這個方法可能會出現(xiàn)消息重復(fù),因為A剛發(fā)送消息,B該沒有處理,A的Scheduler就去查詢了,當(dāng)然消息都沒有被消費,因為A會重發(fā)剛才的消息,但是B有做消息去重,所以不會影響。
B(遠(yuǎn)程事務(wù)執(zhí)行方,MQConsumer)
db
consumer_msg(msgId,create_time. message_status,topic) msgId這里是orderIdmq
作為consumer,注冊Topic account:
當(dāng)接收到消息后,查詢是否被執(zhí)行過,如果沒有被消費過(id未找到)或者消費失敗了(這里解決了消息重復(fù)消費的問題),則執(zhí)行遠(yuǎn)程事務(wù)后插入/更新consumer_ msg(status為已被消費),已被消費則跳過。
遠(yuǎn)程事務(wù)執(zhí)行失敗時,插入/更新consumer_ msg(status為消費失?。?br> 超過重試消費次數(shù)的消息也更新consumer_ msg,status為超過消費的重試次數(shù)。
B這里就維護(hù)它所接收的消息的狀態(tài)。
消息表
在producer這一方設(shè)計了producer_transaction_message表。
- msgId是消息唯一id,可以采用業(yè)務(wù)上的id來實現(xiàn),比如訂單id。
- body是消息體,比如訂單對象的序列化結(jié)果。
- message_status是消息狀態(tài)
- update_time是最后更新記錄時間
- create_time是消息創(chuàng)建時間
- send_times是確認(rèn)消息重復(fù)發(fā)送次數(shù)
- topic是消息主題,這里均為account
在consumer這一方設(shè)計了consumer_transaction_message表。
看得出來是producer的表的部分列,其含義也是相同的。
分布式事務(wù)實現(xiàn)代碼
Producer方
MQProducerConfig(配置MQProducer)
@Configuration
@Slf4j
@Getter
public class MQProducerConfig {
@Value("${spring.rocketmq.group-name}")
private String groupName;
@Value("${spring.rocketmq.namesrv-addr}")
private String namesrvAddr;
@Value("${spring.rocketmq.topic}")
private String topic;
@Value("${spring.rocketmq.confirm-message-faiure-retry-times}")
private Integer retryTimes;
public static final Integer CHECK_GAP = 1;
@Bean
public MQProducer mqProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setTransactionCheckListener(new TransactionCheckListener() {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
// doNothing
return LocalTransactionState.COMMIT_MESSAGE;
}
});
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
producer.start();
log.info("producer started!");
return producer;
}
}
AccountLocalTransactionExecutor(執(zhí)行本地事務(wù))
@Component
@Slf4j
public class AccountLocalTransactionExecutor implements LocalTransactionExecuter {
@Autowired
private PayService payService;
@Autowired
private ProducerTransactionMessageService messageService;
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
try {
String paymentPassword = (String) arg;
OrderDO order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
if (order.getOrderStatus() != OrderStatus.UNPAID) {
log.info("{} 訂單狀態(tài)不為unpaid", order.getId());
throw new OrderStateIllegalException(order.getOrderStatus().toString());
}
// 本地事務(wù),減少用戶賬戶余額
// 拋出異常時會進(jìn)行回滾,下面構(gòu)造消息存儲到數(shù)據(jù)庫也不會被執(zhí)行
payService.decreaseAccount(order.getUser().getId(), order.getTotalPrice(), paymentPassword);
// 保存消息至數(shù)據(jù)庫
ProducerTransactionMessageDO messageDO = ProducerTransactionMessageDO.builder()
.id(order.getId())
.body(msg.getBody())
.createTime(LocalDateTime.now())
.updateTime(LocalDateTime.now())
.messageStatus(MessageStatus.UNCONSUMED)
.topic(msg.getTopic())
.sendTimes(0)
.build();
messageService.save(messageDO);
// 成功通知MQ消息變更 該消息變?yōu)椋?lt;確認(rèn)發(fā)送>
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
e.printStackTrace();
log.info("本地事務(wù)執(zhí)行失敗,直接回滾!");
// 失敗則不通知MQ 該消息一直處于:<暫緩發(fā)送>
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
AccountServiceImpl(Producer支付業(yè)務(wù)入口)
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
private MQProducerConfig config;
@Autowired
private MQProducer producer;
@Autowired
private AccountLocalTransactionExecutor executor;
@Autowired
private ProducerTransactionMessageService messageService;
@Autowired
private PayService payService;
@Override
public void commit(OrderDO order, String paymentPassword) {
Message message = new Message();
message.setTopic(config.getTopic());
message.setBody(ProtoStuffUtil.serialize(order));
TransactionSendResult result = null;
try {
result = this.producer.sendMessageInTransaction(message, executor, paymentPassword);
log.info("事務(wù)消息發(fā)送結(jié)果:{}", result);
log.info("TransactionState:{} ", result.getLocalTransactionState());
// 因為無法獲得executor中拋出的異常,只能模糊地返回訂單支付失敗信息。
// TODO 想辦法從executor中找到原生異常
} catch (Exception e) {
log.info("AccountService拋出異常...");
e.printStackTrace();
}
if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
throw new OrderPaymentException(order.getId());
}
}
@Transactional
@Override
public void rollback(ProducerTransactionMessageDO message) {
OrderDO order = ProtoStuffUtil.deserialize(message.getBody(), OrderDO.class);
message.setMessageStatus(MessageStatus.ROLLBACK);
message.setUpdateTime(LocalDateTime.now());
messageService.update(message);
payService.increaseAccount(order.getUser().getId(), order.getTotalPrice());
}
}
TransactionCheckScheduler(消息回查)
@Component
public class TransactionCheckScheduler {
@Autowired
private ProducerTransactionMessageService messageService;
/**
* 每分鐘執(zhí)行一次事務(wù)回查
*/
@Scheduled(fixedRate = 60 * 1000)
public void checkTransactionMessage(){
messageService.check();
}
}
ProducerTransactionMessageServiceImpl(Producer消息服務(wù)提供者)
@Slf4j
public class ProducerTransactionMessageServiceImpl implements ProducerTransactionMessageService {
@Autowired
private MQProducer producer;
@Autowired
private MQProducerConfig config;
@Autowired
private ProductTransactionMessageDOMapper mapper;
@Autowired
private ConsumerTransactionMessageService consumerTransactionMessageService;
@Transactional
@Override
public void save(ProducerTransactionMessageDO message) {
mapper.insert(message);
}
@Transactional
@Override
public void check() {
List<Long> all = mapper.findMessageIdsByStatusCreatedAfter(Arrays.asList(MessageStatus.UNCONSUMED, MessageStatus.CONSUME_FAILED), MQProducerConfig.CHECK_GAP);
Map<Long, MessageStatus> statusMap = consumerTransactionMessageService.findConsumerMessageStatuses(all);
for (Map.Entry<Long, MessageStatus> entry : statusMap.entrySet()) {
mapper.updateByPrimaryKeySelective(ProducerTransactionMessageDO.builder().id(entry.getKey()).messageStatus(entry.getValue()).updateTime(LocalDateTime.now()).build());
}
all.removeAll(statusMap.keySet());
// 此時all為確認(rèn)消息發(fā)送失敗的
this.reSend(mapper.selectBatchByPrimaryKeys(all));
}
@Transactional
@Override
public void reSend(List<ProducerTransactionMessageDO> messages) {
for (ProducerTransactionMessageDO messageDO : messages) {
if (messageDO.getSendTimes() == config.getRetryTimes()) {
messageDO.setUpdateTime(LocalDateTime.now());
messageDO.setMessageStatus(MessageStatus.OVER_CONFIRM_RETRY_TIME);
mapper.updateByPrimaryKeySelective(messageDO);
continue;
}
Message message = new Message();
message.setTopic(config.getTopic());
message.setBody(messageDO.getBody());
try {
SendResult result = producer.send(message);
messageDO.setSendTimes(messageDO.getSendTimes() + 1);
messageDO.setUpdateTime(LocalDateTime.now());
mapper.updateByPrimaryKeySelective(messageDO);
log.info("發(fā)送重試消息完畢,Message:{},result:{}", message, result);
} catch (Exception e) {
e.printStackTrace();
log.info("發(fā)送重試消息時失敗! Message:{}", message);
}
}
}
@Transactional
@Override
public void delete(Long id) {
mapper.deleteByPrimaryKey(id);
}
@Transactional(readOnly = true)
@Override
public List<ProducerTransactionMessageDO> findByIds(List<Long> ids) {
return mapper.selectBatchByPrimaryKeys(ids);
}
@Transactional(readOnly = true)
@Override
public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(MessageQueryConditionDTO dto) {
return mapper.findByCondition(dto, dto.getPageNum(), dto.getPageSize()).toPageInfo();
}
@Override
public void update(ProducerTransactionMessageDO message) {
mapper.updateByPrimaryKeySelective(message);
}
}
Consumer
MQConsumerConfig(配置MQConsumer)
@Configuration
@Slf4j
@Getter
public class MQConsumerConfig {
private DefaultMQPushConsumer consumer;
@Value("${spring.rocketmq.group-name}")
private String groupName;
@Value("${spring.rocketmq.namesrv-addr}")
private String namesrvAddr;
@Value("${spring.rocketmq.topic}")
private String topic;
@Autowired
private AccountMessageListener accountMessageListener;
@Value("${spring.rocketmq.consume-failure-retry-times}")
private Integer retryTimes;
@PostConstruct
public void init() throws MQClientException {
this.consumer = new DefaultMQPushConsumer(groupName);
this.consumer.setNamesrvAddr(namesrvAddr);
// 啟動后從隊列頭部開始消費
this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
this.consumer.subscribe(topic, "*");
this.consumer.registerMessageListener(accountMessageListener);
this.consumer.start();
log.info("consumer started!");
}
}
AccountMessageListener(消息接收方)
@Component
@Slf4j
public class AccountMessageListener implements MessageListenerConcurrently {
@Autowired
private OrderService orderService;
@Autowired
@Qualifier("consumerTransactionMessageService")
private ConsumerTransactionMessageService messageService;
@Autowired
private MQConsumerConfig config;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
log.info("接收到消息數(shù)量為:{}", msgs.size());
for (MessageExt msg : msgs) {
ConsumerTransactionMessageDO messageDO = null;
OrderDO order = null;
try {
String topic = msg.getTopic();
String keys = msg.getKeys();
order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
log.info("消費者接收到消息:topic: {}, keys:{} , order: {}", topic, keys, order);
// 如果已經(jīng)被消費過并且消費成功,那么不再重復(fù)消費(未被消費->id不存在或消費失敗或超過重試次數(shù)的都會繼續(xù)消費)
if(messageService.isMessageConsumedSuccessfully(order.getId())){
continue;
}
messageDO = ConsumerTransactionMessageDO.builder()
.id(order.getId())
.createTime(LocalDateTime.now())
.topic(msg.getTopic())
.build();
// 業(yè)務(wù)邏輯處理
orderService.finishOrder(order);
// 如果業(yè)務(wù)邏輯拋出異常,那么會跳過插入CONSUMED
messageDO.setMessageStatus(MessageStatus.CONSUMED);
// 如果是未被消費,第一次就消費成功了,則插入
// 如果是超過重試次數(shù),又人工設(shè)置重試,則更新狀態(tài)為已被消費
messageService.insertOrUpdate(messageDO);
} catch (Exception e) {
e.printStackTrace();
// 重試次數(shù)達(dá)到最大重試次數(shù)
if (msg.getReconsumeTimes() == config.getRetryTimes()) {
log.info("客戶端重試三次,需要人工處理");
messageService.update(
ConsumerTransactionMessageDO.builder()
.id(order.getId())
.messageStatus(MessageStatus.OVER_CONSUME_RETRY_TIME).build()
);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
log.info("消費失敗,進(jìn)行重試,當(dāng)前重試次數(shù)為: {}", msg.getReconsumeTimes());
messageDO.setMessageStatus(MessageStatus.CONSUME_FAILED);
// 如果第一次消費失敗,那么插入
// 如果之前消費失敗,繼續(xù)重試,那么doNothing
// 如果之前是超過重試次數(shù),人工設(shè)置重試,那么將狀態(tài)改為消費失敗
messageService.insertOrUpdate(messageDO);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
ConsumerTransactionMessageServiceImpl(Consumer消息服務(wù)提供者)
public class ConsumerTransactionMessageServiceImpl implements ConsumerTransactionMessageService {
@Autowired
private ConsumerTransactionMessageDOMapper mapper;
@Transactional(readOnly = true)
@Override
public Map<Long, MessageStatus> findConsumerMessageStatuses(List<Long> ids) {
Map<Long, MessageStatus> result = new HashMap<>();
for (Long id : ids) {
MessageStatus status = mapper.findStatusById(id);
if (status != null) {
result.put(id, status);
}
}
return result;
}
@Transactional(readOnly = true)
@Override
public ConsumerTransactionMessageDO selectByPrimaryKey(Long id) {
return mapper.selectByPrimaryKey(id);
}
@Transactional
@Override
public void insert(ConsumerTransactionMessageDO record) {
mapper.insert(record);
}
@Override
public void insertOrUpdate(ConsumerTransactionMessageDO record) {
ConsumerTransactionMessageDO recordInDB = mapper.selectByPrimaryKey(record.getId());
if (recordInDB == null) {
mapper.insert(record);
} else {
recordInDB.setMessageStatus(record.getMessageStatus());
mapper.updateByPrimaryKeySelective(recordInDB);
}
}
@Transactional
@Override
public void insertIfNotExists(ConsumerTransactionMessageDO record) {
if (mapper.selectByPrimaryKey(record.getId()) == null) {
mapper.insert(record);
}
}
@Transactional
@Override
public void update(ConsumerTransactionMessageDO record) {
mapper.updateByPrimaryKeySelective(record);
}
@Transactional(readOnly = true)
@Override
public boolean isMessageConsumedSuccessfully(Long id) {
MessageStatus status = mapper.findStatusById(id);
return status == MessageStatus.CONSUMED;
}
}
消息管理
尚需提供一個消息的監(jiān)控平臺,可以搜索和查看消息的狀態(tài),尤其是需要人工處理的死信,可以回滾本地事務(wù)或重新發(fā)送。
界面類似于下圖:
當(dāng)前僅開發(fā)了消息管理系統(tǒng)的數(shù)據(jù)接口,尚未開發(fā)其客戶端。
@RestController
@RequestMapping("/message_console")
public class MessageConsoleController {
@Autowired
private ProducerTransactionMessageService messageService;
@Autowired
private AccountService accountService;
@RequestMapping(value = "/query", method = RequestMethod.POST)
public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(@RequestBody MessageQueryConditionDTO queryDTO) {
if (queryDTO.getPageNum() == null || queryDTO.getPageNum() <= 0) {
queryDTO.setPageNum(Integer.valueOf(PageProperties.DEFAULT_PAGE_NUM));
}
if (queryDTO.getPageSize() == null || queryDTO.getPageSize() <= 0) {
queryDTO.setPageSize(Integer.valueOf(PageProperties.DEFAULT_PAGE_SIZE));
}
return messageService.findByQueryDTO(queryDTO);
}
@RequestMapping(value = "/reSend", method = RequestMethod.POST)
public void reSend(@RequestBody MessageIdDTO dto) {
List<ProducerTransactionMessageDO> messages = messageService.findByIds(dto.getIds());
for (ProducerTransactionMessageDO messageDO : messages) {
messageDO.setMessageStatus(MessageStatus.UNCONSUMED);
messageDO.setSendTimes(0);
}
messageService.reSend(messages);
}
@RequestMapping(value = "/rollback", method = RequestMethod.POST)
public void rollback(@RequestBody MessageIdDTO dto) {
for (ProducerTransactionMessageDO message : messageService.findByIds(dto.getIds())) {
accountService.rollback(message);
}
}
}
總結(jié)
自上次開發(fā)完SpringBootSOASkeleton之后,就一直希望能完成一個數(shù)據(jù)庫按業(yè)務(wù)分庫和分布式事務(wù)的項目。大概花了兩周,大概嘗試了TCC和可靠消息最終一致兩種方法,最終解決了分布式事務(wù)的問題。
TCC是我首先采用的技術(shù),使用了Github開源的ByteTCC,但花了很多時間沒有跑通,另外用起來非常復(fù)雜,對業(yè)務(wù)邏輯侵入非常大,最后是放棄了,但也留下來基于ByTeTCC的完成度比較高的代碼,最后以Git的一個tag結(jié)束了它的生命周期。
然后我考慮使用MQ,尤其是原本對事務(wù)消息有所支持的RocketMQ來實現(xiàn)分布式事務(wù)。因為消息回查的功能被閹割,又去閱讀了其源碼和他人考慮的解決方案去實現(xiàn)它。就目前這個解決方案而言,自我感覺是比較完善的,既不是非常復(fù)雜, 又解決了RocketMQ原來存在的很多問題。但因為還是一個學(xué)生,對分布式比較缺乏經(jīng)驗,如果大家能發(fā)現(xiàn)其中存在的問題,也希望在博客下評論或Github提issue。
全部代碼已經(jīng)放到Github上,按照《Linux集群搭建》配置的環(huán)境下,代碼是可以跑通的,只是確認(rèn)消息發(fā)送失敗這種場景很難模擬出來,這也是有待觀察的。
參考資料
大規(guī)模SOA系統(tǒng)中的分布事務(wù)處事-程立
支付寶架構(gòu)與技術(shù)
RocketMQ用戶指南v3.2.4
高并發(fā)下的冪等策略分析
RocketMQ源碼解析
分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐
https://blog.csdn.net/songxinjianqwe/article/details/78923482