1. 分布式事務(wù)
所謂事務(wù),通俗一點(diǎn)講就是一系列操作要么同時(shí)成功,要么同時(shí)失敗。而分布式事務(wù)就是這一系列的操作在不同的節(jié)點(diǎn)上,那要如何保證事務(wù)的ACID特性呢。
原子性(atomicity)。一個(gè)事務(wù)是一個(gè)不可分割的工作單位,事務(wù)中包括的操作要么都成功,要么都失敗。
一致性(consistency)。事務(wù)必須是使數(shù)據(jù)庫(kù)從一個(gè)一致性狀態(tài)變到另一個(gè)一致性狀態(tài)。一致性與原子性是密切相關(guān)的。
隔離性(isolation)。一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對(duì)并發(fā)的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。
持久性(durability)。持久性也稱永久性(permanence),指一個(gè)事務(wù)一旦提交或回滾,數(shù)據(jù)庫(kù)會(huì)對(duì)數(shù)據(jù)持久化的保存。
2. 最終一致性方案
首先,什么叫一致性?一致性指系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻具有同樣的值,所有節(jié)點(diǎn)訪問(wèn)同一份最新的數(shù)據(jù)副本。那么,什么又叫最終一致性呢。在此之前,先給大家介紹一下BASE理論。
BA(Basically Available):基本可用。在分布式系統(tǒng)出現(xiàn)故障的時(shí)候,允許犧牲部分非核心功能的可用性,常用的手段是訪問(wèn)部分功能時(shí)進(jìn)入降級(jí)頁(yè)面,來(lái)保障核心業(yè)務(wù)的可用性。
S(Soft state):軟狀態(tài)。允許系統(tǒng)中的數(shù)據(jù)存在中間狀態(tài),并且認(rèn)為該狀態(tài)是不影響系統(tǒng)的整體可用性的,即允許系統(tǒng)在不同節(jié)點(diǎn)上的數(shù)據(jù)備份短暫性的不一致。
E(Eventually consistent):最終一致性。所謂最終一致性,就是數(shù)據(jù)不可能永久的處于軟狀態(tài),在一定的時(shí)間期限內(nèi),所有節(jié)點(diǎn)的數(shù)據(jù)備份應(yīng)當(dāng)是一致的,即數(shù)據(jù)延時(shí)一段時(shí)間后達(dá)到一致性。至于這個(gè)時(shí)間期限,取決于各種因素,包括業(yè)務(wù)需求、網(wǎng)絡(luò)延時(shí)、系統(tǒng)負(fù)載、存儲(chǔ)選型,數(shù)據(jù)復(fù)制方案設(shè)計(jì)等因素。
3. 可靠消息
所謂的可靠消息,即發(fā)布端消息不丟失,可靠抵達(dá)隊(duì)列,消費(fèi)端可靠接收。以RabbitMQ為例,消息的投遞消費(fèi)過(guò)程如下:
1.發(fā)布端確認(rèn)
-
如果使用標(biāo)準(zhǔn)的AMQP協(xié)議,保證消息不丟失的唯一方法就是使用事務(wù),使通道具有事務(wù)性,對(duì)每一條消息的發(fā)布和提交都是事務(wù)性的。在這種情況下,事務(wù)是不必要的重量級(jí),并將吞吐量降低了250倍,為了解決這個(gè)問(wèn)題,就引入了確認(rèn)機(jī)制。
-
confirmCallback確認(rèn)模式
開(kāi)啟發(fā)布者確認(rèn)
spring: rabbitmq: publisher-confirm-type: correlated自定義RabbitTemplate
@Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù) * @param b 消息是否成功收到 * @param s 失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("當(dāng)前消息【"+correlationData+"】==》服務(wù)端是否收到:"+b+"==》失敗的原因【"+s+"】"); } }); } -
returnCallback未投遞到隊(duì)列退回模式
開(kāi)啟消息抵達(dá)隊(duì)列確認(rèn)
spring: rabbitmq: publisher-returns: true # 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn) template: #只有抵達(dá)隊(duì)列,以異步發(fā)送優(yōu)先回調(diào)returnCallback mandatory: true設(shè)置消息抵達(dá)隊(duì)列回調(diào)
@PostConstruct public void initRabbitTemplate(){ rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒(méi)有投遞給指定的隊(duì)列,就觸發(fā)這個(gè)失敗回調(diào) * @param message 投遞失敗的消息詳細(xì)信息 * @param i 回復(fù)狀態(tài)碼 * @param s 回復(fù)的文本內(nèi)容 * @param s1 當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)交換機(jī) * @param s2 當(dāng)時(shí)這個(gè)消息用哪個(gè)路由鍵 */ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("失敗信息【"+message+"】==》狀態(tài)碼【"+i+"】==》文本內(nèi)容【"+s+"】==》交換機(jī)【"+s1+"】==》路由鍵【"+s2+"】"); } }); }
-
2. 消費(fèi)端確認(rèn)
ack機(jī)制
默認(rèn)是自動(dòng)確認(rèn)的,只要消息收到,客戶端會(huì)自動(dòng)確認(rèn),服務(wù)端就會(huì)移除這個(gè)問(wèn)題
問(wèn)題:假如收到很多消息,自動(dòng)回復(fù)給服務(wù)器ack,如果一個(gè)消息處理成功,宕機(jī)了。發(fā)生消息丟失
-
消費(fèi)者手動(dòng)確認(rèn)模式:只要沒(méi)有明確告訴MQ,消息被接收,沒(méi)有Ack,消息就一直是unacked狀態(tài),即使服務(wù)器宕機(jī),消息也不會(huì)丟失,會(huì)重新變?yōu)镽eady狀態(tài)。
-
開(kāi)啟手動(dòng)確認(rèn)模式
spring: raabbitmq: listener: simple: acknowledge-mode: manual -
手動(dòng)簽收
long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 簽收 long deliveryTag, boolean mulitiple(是否批量模式) channel.basicAck(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } // 拒簽 long deliveryTag, boolean mulitiple(是否批量模式), boolean requeue(是否重新入隊(duì)) channel.basicNack(deliveryTag,false,false);
-
4. 分布式事務(wù)案例
在電商背景下,以訂單和庫(kù)存系統(tǒng)之間的分布式事務(wù)為例,來(lái)介紹分布式事務(wù)基于消息隊(duì)列的最終一致性方案。下單和扣減庫(kù)存操作要么同時(shí)成功,要么同時(shí)失敗,是事務(wù)的。如果只是本地事務(wù)的話,操作同一數(shù)據(jù)庫(kù),依賴數(shù)據(jù)庫(kù)本身的事務(wù)特性,就可以完成。但是,對(duì)于分布式系統(tǒng)而言,訂單系統(tǒng)和庫(kù)存系統(tǒng)是操作不同的數(shù)據(jù)庫(kù)的,那要如何實(shí)現(xiàn)這樣的分布式事務(wù)。
1. 業(yè)務(wù)流程及問(wèn)題
-
一般的業(yè)務(wù)流程是:下單成功后,遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存。偽代碼如下
public void saveOrder(){ // 創(chuàng)建訂單 Order order = createOrder(); // 保存訂單 orderService.save(order); // 遠(yuǎn)程調(diào)用庫(kù)存服務(wù) wareFeignService.sub(order); } 訂單系統(tǒng)和庫(kù)存系統(tǒng)本地是滿足事務(wù)的。即訂單服務(wù)發(fā)生異常,訂單回滾;庫(kù)存服務(wù)發(fā)生異常,庫(kù)存是會(huì)回滾的。
由于訂單服務(wù)是以內(nèi)嵌的方式遠(yuǎn)程調(diào)用庫(kù)存服務(wù)的,也就是說(shuō),庫(kù)存服務(wù)發(fā)生異常,訂單服務(wù)感知到遠(yuǎn)程調(diào)用異常,從而訂單會(huì)回滾的,對(duì)于業(yè)務(wù)來(lái)說(shuō),這是沒(méi)有問(wèn)題的。
如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之前發(fā)生異常,訂單會(huì)回滾,并且也不會(huì)調(diào)用庫(kù)存服務(wù)來(lái)扣減庫(kù)存,這也是沒(méi)有問(wèn)題的。
如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之后,并且遠(yuǎn)程扣減庫(kù)存操作成功后,發(fā)生異常,則訂單會(huì)回滾,但是遠(yuǎn)程庫(kù)存服務(wù)是無(wú)法回滾的。這就導(dǎo)致了數(shù)據(jù)的不一致性。
2. 基于消息隊(duì)列的解決方案分析
我們使用RabbitMQ來(lái)實(shí)現(xiàn)分布式事務(wù)的最終一致性。
1. 業(yè)務(wù)分析
-
由于用戶下單和支付并不是同時(shí)進(jìn)行,一般都是下單成功后,30min內(nèi)可以支付。那我們來(lái)思考這樣一個(gè)問(wèn)題,如果我們?cè)谙聠纬晒涂蹨p庫(kù)存的話,會(huì)不會(huì)有什么問(wèn)題。
- 惡意刷單。下單后不支付,導(dǎo)致其他人無(wú)法下單。
-
那如果支付成功后再扣減庫(kù)存呢?
- 在支付訂單時(shí),會(huì)出現(xiàn)庫(kù)存不足,支付失敗。
綜合這兩種情況考慮,我們?cè)谙聠纬晒笙孺i定庫(kù)存,支付成功再去扣減庫(kù)存,如果超時(shí)未支付,則解鎖庫(kù)存。
2. 業(yè)務(wù)流程
下單成功,鎖定庫(kù)存
訂單支付超時(shí),則需要自動(dòng)關(guān)閉訂單。
訂單關(guān)閉,庫(kù)存需要解鎖。
3. 定時(shí)任務(wù)
如何確保下單后,30min內(nèi)保留訂單。最先想到的方法應(yīng)該時(shí)定時(shí)任務(wù)。
定時(shí)任務(wù)使用的是系統(tǒng)時(shí)間,我們無(wú)法為每一個(gè)訂單都生成一定時(shí)任務(wù)。
我想大家應(yīng)該都發(fā)現(xiàn)了使用定時(shí)任務(wù)會(huì)帶來(lái)的問(wèn)題,那就是每一個(gè)訂單的保留時(shí)間并不是一致的30min,訂單保留的時(shí)間區(qū)間為(0min,60min)。即在定時(shí)任務(wù)即將到來(lái)前完成下單,和定時(shí)任務(wù)剛結(jié)束完成下單。
所以使用定時(shí)任務(wù)來(lái)完成這個(gè)操作是不可行。
4. RabbitMQ延時(shí)隊(duì)列
利用消息的存活時(shí)間和死信來(lái)完成延時(shí)任務(wù)。
1. 消息的存活時(shí)間(TTL:Time To Live)
RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。
對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。
如果隊(duì)列和消息都設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息的死亡時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。
單個(gè)消息的TTL,才是實(shí)現(xiàn)延遲任務(wù)的關(guān)鍵??梢酝ㄟ^(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。
2. 死信交換機(jī)(DLX:Dead Letter Exchanges)
-
一個(gè)消息如果滿足如下條件,就會(huì)進(jìn)入死信路由(不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)多個(gè)隊(duì)列)
一個(gè)消息被消費(fèi)者拒收,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。
消息的TTL到了,消息過(guò)期了。
隊(duì)列長(zhǎng)度限制滿了,排在前面的消息會(huì)被丟棄或者扔到死信路由上。
在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過(guò)期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去
先控制消息在一段時(shí)間后變成死信,然后控制變成死信的消息被路由到某個(gè)指定的交換機(jī)。二者結(jié)合就可以實(shí)現(xiàn)一個(gè)延時(shí)隊(duì)列。
在下單成功后,發(fā)送一條消息到死信隊(duì)列,經(jīng)過(guò)一段時(shí)間(TTL),死信路由到訂單釋放隊(duì)列,訂單服務(wù)監(jiān)聽(tīng)到消息釋放放訂單。
5. 分布式解決方案
1. 對(duì)以下情況,庫(kù)存需要解鎖。
首先之前提到的,訂單服務(wù)在遠(yuǎn)程調(diào)用成功后,發(fā)生異常,導(dǎo)致訂單回滾,庫(kù)存也需要解鎖。
訂單延時(shí)取消,或者主動(dòng)取消訂單,都需要解鎖庫(kù)存。
訂單服務(wù)下單成功后,訂單服務(wù)宕機(jī),超過(guò)訂單支付時(shí)間,仍然無(wú)法恢復(fù),導(dǎo)致無(wú)法發(fā)送消息通知庫(kù)存服務(wù)解鎖庫(kù)存,故需要自動(dòng)解鎖庫(kù)存。
2. 解決方案
針對(duì)以上情形,庫(kù)存解鎖的方案。
-
第一種情況,可以利用庫(kù)存自動(dòng)解鎖來(lái)解決。庫(kù)存鎖定時(shí)發(fā)送消息到延時(shí)隊(duì)列,經(jīng)過(guò)TTL后,成為死信路由到庫(kù)存解鎖隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)到消息后,解鎖庫(kù)存。
- 不能在訂單未支付時(shí)就解鎖庫(kù)存,所以庫(kù)存自動(dòng)解鎖的延遲時(shí)間應(yīng)該大于訂單延時(shí)取消的時(shí)間。
對(duì)于第二種情況,主動(dòng)取消或者延時(shí)取消,都可以通過(guò)庫(kù)存的自動(dòng)解鎖來(lái)完成庫(kù)存的解鎖。
-
自動(dòng)解鎖時(shí),需要判斷訂單的狀態(tài),只有為取消狀態(tài)的訂單才可以解鎖庫(kù)存。但是這樣仍然會(huì)存在問(wèn)題。
訂單服務(wù)卡頓,導(dǎo)致訂單狀態(tài)消息一直改不了,而庫(kù)存消息先到期,查詢訂單狀態(tài)為新建狀態(tài),不解鎖庫(kù)存,并刪除消息,導(dǎo)致庫(kù)永遠(yuǎn)無(wú)法解鎖。
解決:訂單超時(shí)取消的同時(shí),發(fā)送訂單取消的消息到隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)該消息,則解鎖庫(kù)存。
為了防止重復(fù)解鎖,需要滿足冪等性。
3. 代碼實(shí)現(xiàn)
使用消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的流程圖如下:
1. 訂單服務(wù)
- 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class OrderRabbitMQConfig {
/**
* 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public Exchange orderEventExchange(){
return new TopicExchange("order-event-exchange",true,false);
}
@Bean
public Queue orderReleaseOrderQueue(){
return new Queue("order.release.order.queue",true,false,false);
}
@Bean
public Queue orderDelayQueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange","order-event-exchange");
args.put("x-dead-letter-routing-key","order.release.order");
args.put("x-message-ttl",60000);
return new Queue("order.delay.queue",true,false,false,args);
}
@Bean
public Binding stockReleaseBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
@Bean
public Binding stockLockedBinding(){
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 訂單釋放直接和庫(kù)存釋放進(jìn)行綁定
*/
@Bean
public Binding orderReleaseOtherBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
}
- 訂單創(chuàng)建偽代碼
@Transactional
public Order createOrder(){
// 創(chuàng)建訂單
Order order = createOrder();
// 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),鎖定庫(kù)存
R r = wareFeignService.orderLockStock(wareSkuLockVo);
if (r.getCode() == 0) {
// 鎖定成功,發(fā)送訂單創(chuàng)建消息到延時(shí)隊(duì)列
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);
} else {
// 遠(yuǎn)程調(diào)用失敗,拋出異常
throw new Exception();
}
reture order;
}
- 訂單服務(wù)監(jiān)聽(tīng)訂單釋放信息,關(guān)閉訂單
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {
@Autowired
OmsOrderService orderService;
@RabbitHandler
public void listener(OmsOrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到過(guò)期訂單,準(zhǔn)備關(guān)單:"+orderEntity.getBizOrderId());
try{
orderService.closeOrder(orderEntity.getBizOrderId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
- 關(guān)閉訂單,發(fā)送解鎖庫(kù)存消息。偽代碼如下
public void closeOrder(String bizOrderId) {
// 查詢當(dāng)前訂單是否付款
OmsOrderEntity order = getOne(new QueryWrapper<OmsOrderEntity>().eq("biz_order_id", bizOrderId));
if (order != null) {
// 判斷訂單狀態(tài),為新建狀態(tài)才取消
if (order.getOrderStatus() == OrderStatusConstant.CREATE.getCode()){
//過(guò)期未支付,取消訂單,設(shè)置訂單狀態(tài)為取消狀態(tài)
updateOrder.setOrderStatus(OrderStatusConstant.CANCEL.getCode());
// 發(fā)送MQ
rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);
}
}
}
2. 庫(kù)存服務(wù)
- 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class MyRabbitConfig {
/**
* 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange",true,true);
}
@Bean
public Queue stockReleaseStockQueue(){
return new Queue("stock.release.stock.queue",true,false,false);
}
@Bean
public Queue stockDelayQueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange","stock-event-exchange");
args.put("x-dead-letter-routing-key","stock.release");
args.put("x-message-ttl",120000);
return new Queue("stock.delay.queue",true,false,false,args);
}
@Bean
public Binding stockReleaseBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
@Bean
public Binding stockLockedBinding(){
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
- 庫(kù)存鎖定,偽代碼如下
@Transactional
public void orderLockStock() {
// 保存工作單
taskService.save(task);
// 為每件商品鎖定庫(kù)存
for (OrderItemVo orderItem : orderItems) {
//判斷庫(kù)存是否足夠
Long count = skuWareDao.lockStock(orderItem);
if (count == 1){
// 鎖定成功
// 保存工作單詳情
taskDetailService.save(taskDetail);
// 自動(dòng)解鎖,發(fā)送工作單詳情,防止回滾后找不到數(shù)據(jù)
// 鎖定庫(kù)存,發(fā)送消息到延時(shí)隊(duì)列
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", taskTo);
} else {
// 鎖定失敗,拋出異常
throw new Exception();
}
}
}
- 監(jiān)聽(tīng)?zhēng)齑娼怄i信息
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {
@Autowired
WmsSkuWareService wareSkuService;
/**
* 庫(kù)存自動(dòng)解鎖
*只要解鎖庫(kù)存的消息的失敗,需要告訴MQ解鎖失敗,消息不要?jiǎng)h除,重新放回隊(duì)列
* @param taskTo
* @param message
*
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTaskTo taskTo, Message message, Channel channel) throws IOException {
System.out.println("收到解鎖庫(kù)存的消息");
try{
wareSkuService.unlockStock(taskTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
@RabbitHandler
public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
System.out.println("訂單關(guān)閉,準(zhǔn)備解鎖庫(kù)存");
try{
wareSkuService.unlockStock(orderTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
- 自動(dòng)解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(StockLockedTaskTo taskTo) {
// 判斷是否存在該任務(wù)
Detail detail = detailService.getById(taskTo.detailId)
if (detail != null){
// 查詢訂單狀態(tài),訂單狀態(tài)為【取消】,則解鎖庫(kù)存
// 遠(yuǎn)程調(diào)用訂單服務(wù),獲取訂單狀態(tài)
R r = orderFeignService.getOrderStatus(bizOrderId);
if (r.getCode() == 0) {
// 遠(yuǎn)程查詢成功
OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {
});
if (orderVo == null || orderVo.getOrderStatus() == OrderStatusConstant.CANCEL.getCode()) {
// 訂單不存在,或者訂單已取消,都需要解鎖庫(kù)存
if (detail.getLockStatus() == WareTaskStatusConstant.LOCKED.getCode()){
//鎖定狀態(tài)下才需要解鎖,已解鎖的不用在解鎖
unlockStock(detail.getId(),detail.getSkuId(),detail.getSkuNum());
// 更新taskDetail狀態(tài)為已解鎖
taskDetailService.updateById(taskDetailEntity);
}
}
} else {
throw new RuntimeException("遠(yuǎn)程調(diào)用訂單服務(wù)失敗");
}
}
}
- 訂單取消,解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(OrderTo orderTo) {
// 無(wú)需查詢訂單最新?tīng)顟B(tài),能來(lái)到這,肯定更新了訂單狀態(tài)的
// 判斷任務(wù)是否存在
WmsOrderTaskEntity task = taskService.getOne(new QueryWrapper<WmsOrderTaskEntity>().eq("biz_order_id", bizOrderId));
if (task != null){
// 任務(wù)存在,獲取任務(wù)項(xiàng)狀態(tài)為鎖定狀態(tài)的所有任務(wù)項(xiàng)
List<WmsOrderTaskDetailEntity> taskDetails = taskDetailService.list(new QueryWrapper<WmsOrderTaskDetailEntity>()
.eq("task_id", task.getId())
.eq("lock_status", WareTaskStatusConstant.LOCKED.getCode()));
if (taskDetails != null && taskDetails.size() > 0){
for (WmsOrderTaskDetailEntity taskDetail : taskDetails) {
// 解鎖庫(kù)存
unlockStock(taskDetail.getId(),taskDetail.getSkuId(),taskDetail.getSkuNum());
// 更新taskDetail狀態(tài)為已解鎖
taskDetailService.updateById(taskDetailEntity);
}
}
}
}
使用RabbitMQ實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的大致流程解析完畢。如果對(duì)源碼感興趣的,歡迎到github倉(cāng)庫(kù)clone。
1. 分布式事務(wù)
所謂事務(wù),通俗一點(diǎn)講就是一系列操作要么同時(shí)成功,要么同時(shí)失敗。而分布式事務(wù)就是這一系列的操作在不同的節(jié)點(diǎn)上,那要如何保證事務(wù)的ACID特性呢。
原子性(atomicity)。一個(gè)事務(wù)是一個(gè)不可分割的工作單位,事務(wù)中包括的操作要么都成功,要么都失敗。
一致性(consistency)。事務(wù)必須是使數(shù)據(jù)庫(kù)從一個(gè)一致性狀態(tài)變到另一個(gè)一致性狀態(tài)。一致性與原子性是密切相關(guān)的。
隔離性(isolation)。一個(gè)事務(wù)的執(zhí)行不能被其他事務(wù)干擾。即一個(gè)事務(wù)內(nèi)部的操作及使用的數(shù)據(jù)對(duì)并發(fā)的其他事務(wù)是隔離的,并發(fā)執(zhí)行的各個(gè)事務(wù)之間不能互相干擾。
持久性(durability)。持久性也稱永久性(permanence),指一個(gè)事務(wù)一旦提交或回滾,數(shù)據(jù)庫(kù)會(huì)對(duì)數(shù)據(jù)持久化的保存。
2. 最終一致性方案
首先,什么叫一致性?一致性指系統(tǒng)中的所有數(shù)據(jù)備份,在同一時(shí)刻具有同樣的值,所有節(jié)點(diǎn)訪問(wèn)同一份最新的數(shù)據(jù)副本。那么,什么又叫最終一致性呢。在此之前,先給大家介紹一下BASE理論。
BA(Basically Available):基本可用。在分布式系統(tǒng)出現(xiàn)故障的時(shí)候,允許犧牲部分非核心功能的可用性,常用的手段是訪問(wèn)部分功能時(shí)進(jìn)入降級(jí)頁(yè)面,來(lái)保障核心業(yè)務(wù)的可用性。
S(Soft state):軟狀態(tài)。允許系統(tǒng)中的數(shù)據(jù)存在中間狀態(tài),并且認(rèn)為該狀態(tài)是不影響系統(tǒng)的整體可用性的,即允許系統(tǒng)在不同節(jié)點(diǎn)上的數(shù)據(jù)備份短暫性的不一致。
E(Eventually consistent):最終一致性。所謂最終一致性,就是數(shù)據(jù)不可能永久的處于軟狀態(tài),在一定的時(shí)間期限內(nèi),所有節(jié)點(diǎn)的數(shù)據(jù)備份應(yīng)當(dāng)是一致的,即數(shù)據(jù)延時(shí)一段時(shí)間后達(dá)到一致性。至于這個(gè)時(shí)間期限,取決于各種因素,包括業(yè)務(wù)需求、網(wǎng)絡(luò)延時(shí)、系統(tǒng)負(fù)載、存儲(chǔ)選型,數(shù)據(jù)復(fù)制方案設(shè)計(jì)等因素。
3. 可靠消息
所謂的可靠消息,即發(fā)布端消息不丟失,可靠抵達(dá)隊(duì)列,消費(fèi)端可靠接收。以RabbitMQ為例,消息的投遞消費(fèi)過(guò)程如下:
1.發(fā)布端確認(rèn)
-
如果使用標(biāo)準(zhǔn)的AMQP協(xié)議,保證消息不丟失的唯一方法就是使用事務(wù),使通道具有事務(wù)性,對(duì)每一條消息的發(fā)布和提交都是事務(wù)性的。在這種情況下,事務(wù)是不必要的重量級(jí),并將吞吐量降低了250倍,為了解決這個(gè)問(wèn)題,就引入了確認(rèn)機(jī)制。
-
confirmCallback確認(rèn)模式
開(kāi)啟發(fā)布者確認(rèn)
spring: rabbitmq: publisher-confirm-type: correlated自定義RabbitTemplate
@Autowired RabbitTemplate rabbitTemplate; @PostConstruct public void initRabbitTemplate(){ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { /** * * @param correlationData 當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù) * @param b 消息是否成功收到 * @param s 失敗的原因 */ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { System.out.println("當(dāng)前消息【"+correlationData+"】==》服務(wù)端是否收到:"+b+"==》失敗的原因【"+s+"】"); } }); } -
returnCallback未投遞到隊(duì)列退回模式
開(kāi)啟消息抵達(dá)隊(duì)列確認(rèn)
spring: rabbitmq: publisher-returns: true # 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn) template: #只有抵達(dá)隊(duì)列,以異步發(fā)送優(yōu)先回調(diào)returnCallback mandatory: true設(shè)置消息抵達(dá)隊(duì)列回調(diào)
@PostConstruct public void initRabbitTemplate(){ rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { /** * 只要消息沒(méi)有投遞給指定的隊(duì)列,就觸發(fā)這個(gè)失敗回調(diào) * @param message 投遞失敗的消息詳細(xì)信息 * @param i 回復(fù)狀態(tài)碼 * @param s 回復(fù)的文本內(nèi)容 * @param s1 當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)交換機(jī) * @param s2 當(dāng)時(shí)這個(gè)消息用哪個(gè)路由鍵 */ @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { System.out.println("失敗信息【"+message+"】==》狀態(tài)碼【"+i+"】==》文本內(nèi)容【"+s+"】==》交換機(jī)【"+s1+"】==》路由鍵【"+s2+"】"); } }); }
-
2. 消費(fèi)端確認(rèn)
ack機(jī)制
默認(rèn)是自動(dòng)確認(rèn)的,只要消息收到,客戶端會(huì)自動(dòng)確認(rèn),服務(wù)端就會(huì)移除這個(gè)問(wèn)題
問(wèn)題:假如收到很多消息,自動(dòng)回復(fù)給服務(wù)器ack,如果一個(gè)消息處理成功,宕機(jī)了。發(fā)生消息丟失
-
消費(fèi)者手動(dòng)確認(rèn)模式:只要沒(méi)有明確告訴MQ,消息被接收,沒(méi)有Ack,消息就一直是unacked狀態(tài),即使服務(wù)器宕機(jī),消息也不會(huì)丟失,會(huì)重新變?yōu)镽eady狀態(tài)。
-
開(kāi)啟手動(dòng)確認(rèn)模式
spring: raabbitmq: listener: simple: acknowledge-mode: manual -
手動(dòng)簽收
long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { // 簽收 long deliveryTag, boolean mulitiple(是否批量模式) channel.basicAck(deliveryTag,false); } catch (IOException e) { e.printStackTrace(); } // 拒簽 long deliveryTag, boolean mulitiple(是否批量模式), boolean requeue(是否重新入隊(duì)) channel.basicNack(deliveryTag,false,false);
-
4. 分布式事務(wù)案例
在電商背景下,以訂單和庫(kù)存系統(tǒng)之間的分布式事務(wù)為例,來(lái)介紹分布式事務(wù)基于消息隊(duì)列的最終一致性方案。下單和扣減庫(kù)存操作要么同時(shí)成功,要么同時(shí)失敗,是事務(wù)的。如果只是本地事務(wù)的話,操作同一數(shù)據(jù)庫(kù),依賴數(shù)據(jù)庫(kù)本身的事務(wù)特性,就可以完成。但是,對(duì)于分布式系統(tǒng)而言,訂單系統(tǒng)和庫(kù)存系統(tǒng)是操作不同的數(shù)據(jù)庫(kù)的,那要如何實(shí)現(xiàn)這樣的分布式事務(wù)。
1. 業(yè)務(wù)流程及問(wèn)題
-
一般的業(yè)務(wù)流程是:下單成功后,遠(yuǎn)程調(diào)用庫(kù)存服務(wù),扣減庫(kù)存。偽代碼如下
public void saveOrder(){ // 創(chuàng)建訂單 Order order = createOrder(); // 保存訂單 orderService.save(order); // 遠(yuǎn)程調(diào)用庫(kù)存服務(wù) wareFeignService.sub(order); } 訂單系統(tǒng)和庫(kù)存系統(tǒng)本地是滿足事務(wù)的。即訂單服務(wù)發(fā)生異常,訂單回滾;庫(kù)存服務(wù)發(fā)生異常,庫(kù)存是會(huì)回滾的。
由于訂單服務(wù)是以內(nèi)嵌的方式遠(yuǎn)程調(diào)用庫(kù)存服務(wù)的,也就是說(shuō),庫(kù)存服務(wù)發(fā)生異常,訂單服務(wù)感知到遠(yuǎn)程調(diào)用異常,從而訂單會(huì)回滾的,對(duì)于業(yè)務(wù)來(lái)說(shuō),這是沒(méi)有問(wèn)題的。
如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之前發(fā)生異常,訂單會(huì)回滾,并且也不會(huì)調(diào)用庫(kù)存服務(wù)來(lái)扣減庫(kù)存,這也是沒(méi)有問(wèn)題的。
如果訂單服務(wù)在遠(yuǎn)程調(diào)用庫(kù)存服務(wù)之后,并且遠(yuǎn)程扣減庫(kù)存操作成功后,發(fā)生異常,則訂單會(huì)回滾,但是遠(yuǎn)程庫(kù)存服務(wù)是無(wú)法回滾的。這就導(dǎo)致了數(shù)據(jù)的不一致性。
2. 基于消息隊(duì)列的解決方案分析
我們使用RabbitMQ來(lái)實(shí)現(xiàn)分布式事務(wù)的最終一致性。
1. 業(yè)務(wù)分析
-
由于用戶下單和支付并不是同時(shí)進(jìn)行,一般都是下單成功后,30min內(nèi)可以支付。那我們來(lái)思考這樣一個(gè)問(wèn)題,如果我們?cè)谙聠纬晒涂蹨p庫(kù)存的話,會(huì)不會(huì)有什么問(wèn)題。
- 惡意刷單。下單后不支付,導(dǎo)致其他人無(wú)法下單。
-
那如果支付成功后再扣減庫(kù)存呢?
- 在支付訂單時(shí),會(huì)出現(xiàn)庫(kù)存不足,支付失敗。
綜合這兩種情況考慮,我們?cè)谙聠纬晒笙孺i定庫(kù)存,支付成功再去扣減庫(kù)存,如果超時(shí)未支付,則解鎖庫(kù)存。
2. 業(yè)務(wù)流程
下單成功,鎖定庫(kù)存
訂單支付超時(shí),則需要自動(dòng)關(guān)閉訂單。
訂單關(guān)閉,庫(kù)存需要解鎖。
3. 定時(shí)任務(wù)
如何確保下單后,30min內(nèi)保留訂單。最先想到的方法應(yīng)該時(shí)定時(shí)任務(wù)。
定時(shí)任務(wù)使用的是系統(tǒng)時(shí)間,我們無(wú)法為每一個(gè)訂單都生成一定時(shí)任務(wù)。
我想大家應(yīng)該都發(fā)現(xiàn)了使用定時(shí)任務(wù)會(huì)帶來(lái)的問(wèn)題,那就是每一個(gè)訂單的保留時(shí)間并不是一致的30min,訂單保留的時(shí)間區(qū)間為(0min,60min)。即在定時(shí)任務(wù)即將到來(lái)前完成下單,和定時(shí)任務(wù)剛結(jié)束完成下單。
所以使用定時(shí)任務(wù)來(lái)完成這個(gè)操作是不可行。
4. RabbitMQ延時(shí)隊(duì)列
利用消息的存活時(shí)間和死信來(lái)完成延時(shí)任務(wù)。
1. 消息的存活時(shí)間(TTL:Time To Live)
RabbitMQ可以對(duì)隊(duì)列和消息分別設(shè)置TTL。
對(duì)隊(duì)列設(shè)置就是隊(duì)列沒(méi)有消費(fèi)者連著的保留時(shí)間,也可以對(duì)每一個(gè)單獨(dú)的消息做單獨(dú)的設(shè)置。超過(guò)了這個(gè)時(shí)間,我們認(rèn)為這個(gè)消息就死了,稱之為死信。
如果隊(duì)列和消息都設(shè)置了,那么會(huì)取小的。所以一個(gè)消息如果被路由到不同的隊(duì)列中,這個(gè)消息的死亡時(shí)間有可能不一樣(不同的隊(duì)列設(shè)置)。
單個(gè)消息的TTL,才是實(shí)現(xiàn)延遲任務(wù)的關(guān)鍵??梢酝ㄟ^(guò)設(shè)置消息的expiration字段或者x-message-ttl屬性來(lái)設(shè)置時(shí)間,兩者是一樣的效果。
2. 死信交換機(jī)(DLX:Dead Letter Exchanges)
-
一個(gè)消息如果滿足如下條件,就會(huì)進(jìn)入死信路由(不是隊(duì)列,一個(gè)路由可以對(duì)應(yīng)多個(gè)隊(duì)列)
一個(gè)消息被消費(fèi)者拒收,并且reject方法的參數(shù)里requeue是false。也就是說(shuō)不會(huì)被再次放在隊(duì)列里,被其他消費(fèi)者使用。
消息的TTL到了,消息過(guò)期了。
隊(duì)列長(zhǎng)度限制滿了,排在前面的消息會(huì)被丟棄或者扔到死信路由上。
在某一個(gè)設(shè)置Dead Letter Exchange的隊(duì)列中有消息過(guò)期了,會(huì)自動(dòng)觸發(fā)消息的轉(zhuǎn)發(fā),發(fā)送到Dead Letter Exchange中去
先控制消息在一段時(shí)間后變成死信,然后控制變成死信的消息被路由到某個(gè)指定的交換機(jī)。二者結(jié)合就可以實(shí)現(xiàn)一個(gè)延時(shí)隊(duì)列。
在下單成功后,發(fā)送一條消息到死信隊(duì)列,經(jīng)過(guò)一段時(shí)間(TTL),死信路由到訂單釋放隊(duì)列,訂單服務(wù)監(jiān)聽(tīng)到消息釋放放訂單。
5. 分布式解決方案
1. 對(duì)以下情況,庫(kù)存需要解鎖。
首先之前提到的,訂單服務(wù)在遠(yuǎn)程調(diào)用成功后,發(fā)生異常,導(dǎo)致訂單回滾,庫(kù)存也需要解鎖。
訂單延時(shí)取消,或者主動(dòng)取消訂單,都需要解鎖庫(kù)存。
訂單服務(wù)下單成功后,訂單服務(wù)宕機(jī),超過(guò)訂單支付時(shí)間,仍然無(wú)法恢復(fù),導(dǎo)致無(wú)法發(fā)送消息通知庫(kù)存服務(wù)解鎖庫(kù)存,故需要自動(dòng)解鎖庫(kù)存。
2. 解決方案
針對(duì)以上情形,庫(kù)存解鎖的方案。
-
第一種情況,可以利用庫(kù)存自動(dòng)解鎖來(lái)解決。庫(kù)存鎖定時(shí)發(fā)送消息到延時(shí)隊(duì)列,經(jīng)過(guò)TTL后,成為死信路由到庫(kù)存解鎖隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)到消息后,解鎖庫(kù)存。
- 不能在訂單未支付時(shí)就解鎖庫(kù)存,所以庫(kù)存自動(dòng)解鎖的延遲時(shí)間應(yīng)該大于訂單延時(shí)取消的時(shí)間。
對(duì)于第二種情況,主動(dòng)取消或者延時(shí)取消,都可以通過(guò)庫(kù)存的自動(dòng)解鎖來(lái)完成庫(kù)存的解鎖。
-
自動(dòng)解鎖時(shí),需要判斷訂單的狀態(tài),只有為取消狀態(tài)的訂單才可以解鎖庫(kù)存。但是這樣仍然會(huì)存在問(wèn)題。
訂單服務(wù)卡頓,導(dǎo)致訂單狀態(tài)消息一直改不了,而庫(kù)存消息先到期,查詢訂單狀態(tài)為新建狀態(tài),不解鎖庫(kù)存,并刪除消息,導(dǎo)致庫(kù)永遠(yuǎn)無(wú)法解鎖。
解決:訂單超時(shí)取消的同時(shí),發(fā)送訂單取消的消息到隊(duì)列,庫(kù)存服務(wù)監(jiān)聽(tīng)該消息,則解鎖庫(kù)存。
為了防止重復(fù)解鎖,需要滿足冪等性。
3. 代碼實(shí)現(xiàn)
使用消息隊(duì)列實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的流程圖如下:
1. 訂單服務(wù)
- 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class OrderRabbitMQConfig {
/**
* 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public Exchange orderEventExchange(){
return new TopicExchange("order-event-exchange",true,false);
}
@Bean
public Queue orderReleaseOrderQueue(){
return new Queue("order.release.order.queue",true,false,false);
}
@Bean
public Queue orderDelayQueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange","order-event-exchange");
args.put("x-dead-letter-routing-key","order.release.order");
args.put("x-message-ttl",60000);
return new Queue("order.delay.queue",true,false,false,args);
}
@Bean
public Binding stockReleaseBinding(){
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}
@Bean
public Binding stockLockedBinding(){
return new Binding("order.delay.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.create.order",
null);
}
/**
* 訂單釋放直接和庫(kù)存釋放進(jìn)行綁定
*/
@Bean
public Binding orderReleaseOtherBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
}
- 訂單創(chuàng)建偽代碼
@Transactional
public Order createOrder(){
// 創(chuàng)建訂單
Order order = createOrder();
// 遠(yuǎn)程調(diào)用庫(kù)存服務(wù),鎖定庫(kù)存
R r = wareFeignService.orderLockStock(wareSkuLockVo);
if (r.getCode() == 0) {
// 鎖定成功,發(fā)送訂單創(chuàng)建消息到延時(shí)隊(duì)列
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order);
} else {
// 遠(yuǎn)程調(diào)用失敗,拋出異常
throw new Exception();
}
reture order;
}
- 訂單服務(wù)監(jiān)聽(tīng)訂單釋放信息,關(guān)閉訂單
@RabbitListener(queues = "order.release.order.queue")
@Service
public class OrderCloseListener {
@Autowired
OmsOrderService orderService;
@RabbitHandler
public void listener(OmsOrderEntity orderEntity, Channel channel, Message message) throws IOException {
System.out.println("收到過(guò)期訂單,準(zhǔn)備關(guān)單:"+orderEntity.getBizOrderId());
try{
orderService.closeOrder(orderEntity.getBizOrderId());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
- 關(guān)閉訂單,發(fā)送解鎖庫(kù)存消息。偽代碼如下
public void closeOrder(String bizOrderId) {
// 查詢當(dāng)前訂單是否付款
OmsOrderEntity order = getOne(new QueryWrapper<OmsOrderEntity>().eq("biz_order_id", bizOrderId));
if (order != null) {
// 判斷訂單狀態(tài),為新建狀態(tài)才取消
if (order.getOrderStatus() == OrderStatusConstant.CREATE.getCode()){
//過(guò)期未支付,取消訂單,設(shè)置訂單狀態(tài)為取消狀態(tài)
updateOrder.setOrderStatus(OrderStatusConstant.CANCEL.getCode());
// 發(fā)送MQ
rabbitTemplate.convertAndSend("order-event-exchange","order.release.other",orderTo);
}
}
}
2. 庫(kù)存服務(wù)
- 創(chuàng)建交換機(jī)、隊(duì)列和綁定關(guān)系
@Configuration
public class MyRabbitConfig {
/**
* 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@Bean
public Exchange stockEventExchange(){
return new TopicExchange("stock-event-exchange",true,true);
}
@Bean
public Queue stockReleaseStockQueue(){
return new Queue("stock.release.stock.queue",true,false,false);
}
@Bean
public Queue stockDelayQueue(){
HashMap<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange","stock-event-exchange");
args.put("x-dead-letter-routing-key","stock.release");
args.put("x-message-ttl",120000);
return new Queue("stock.delay.queue",true,false,false,args);
}
@Bean
public Binding stockReleaseBinding(){
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
@Bean
public Binding stockLockedBinding(){
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}
}
- 庫(kù)存鎖定,偽代碼如下
@Transactional
public void orderLockStock() {
// 保存工作單
taskService.save(task);
// 為每件商品鎖定庫(kù)存
for (OrderItemVo orderItem : orderItems) {
//判斷庫(kù)存是否足夠
Long count = skuWareDao.lockStock(orderItem);
if (count == 1){
// 鎖定成功
// 保存工作單詳情
taskDetailService.save(taskDetail);
// 自動(dòng)解鎖,發(fā)送工作單詳情,防止回滾后找不到數(shù)據(jù)
// 鎖定庫(kù)存,發(fā)送消息到延時(shí)隊(duì)列
rabbitTemplate.convertAndSend("stock-event-exchange", "stock.locked", taskTo);
} else {
// 鎖定失敗,拋出異常
throw new Exception();
}
}
}
- 監(jiān)聽(tīng)?zhēng)齑娼怄i信息
@Service
@RabbitListener(queues = "stock.release.stock.queue")
public class StockReleaseListener {
@Autowired
WmsSkuWareService wareSkuService;
/**
* 庫(kù)存自動(dòng)解鎖
*只要解鎖庫(kù)存的消息的失敗,需要告訴MQ解鎖失敗,消息不要?jiǎng)h除,重新放回隊(duì)列
* @param taskTo
* @param message
*
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTaskTo taskTo, Message message, Channel channel) throws IOException {
System.out.println("收到解鎖庫(kù)存的消息");
try{
wareSkuService.unlockStock(taskTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
@RabbitHandler
public void handleOrderCloseRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
System.out.println("訂單關(guān)閉,準(zhǔn)備解鎖庫(kù)存");
try{
wareSkuService.unlockStock(orderTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}catch (Exception e){
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
- 自動(dòng)解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(StockLockedTaskTo taskTo) {
// 判斷是否存在該任務(wù)
Detail detail = detailService.getById(taskTo.detailId)
if (detail != null){
// 查詢訂單狀態(tài),訂單狀態(tài)為【取消】,則解鎖庫(kù)存
// 遠(yuǎn)程調(diào)用訂單服務(wù),獲取訂單狀態(tài)
R r = orderFeignService.getOrderStatus(bizOrderId);
if (r.getCode() == 0) {
// 遠(yuǎn)程查詢成功
OrderVo orderVo = r.getData(new TypeReference<OrderVo>() {
});
if (orderVo == null || orderVo.getOrderStatus() == OrderStatusConstant.CANCEL.getCode()) {
// 訂單不存在,或者訂單已取消,都需要解鎖庫(kù)存
if (detail.getLockStatus() == WareTaskStatusConstant.LOCKED.getCode()){
//鎖定狀態(tài)下才需要解鎖,已解鎖的不用在解鎖
unlockStock(detail.getId(),detail.getSkuId(),detail.getSkuNum());
// 更新taskDetail狀態(tài)為已解鎖
taskDetailService.updateById(taskDetailEntity);
}
}
} else {
throw new RuntimeException("遠(yuǎn)程調(diào)用訂單服務(wù)失敗");
}
}
}
- 訂單取消,解鎖庫(kù)存,偽代碼如下
@Transactional
public void unlockStock(OrderTo orderTo) {
// 無(wú)需查詢訂單最新?tīng)顟B(tài),能來(lái)到這,肯定更新了訂單狀態(tài)的
// 判斷任務(wù)是否存在
WmsOrderTaskEntity task = taskService.getOne(new QueryWrapper<WmsOrderTaskEntity>().eq("biz_order_id", bizOrderId));
if (task != null){
// 任務(wù)存在,獲取任務(wù)項(xiàng)狀態(tài)為鎖定狀態(tài)的所有任務(wù)項(xiàng)
List<WmsOrderTaskDetailEntity> taskDetails = taskDetailService.list(new QueryWrapper<WmsOrderTaskDetailEntity>()
.eq("task_id", task.getId())
.eq("lock_status", WareTaskStatusConstant.LOCKED.getCode()));
if (taskDetails != null && taskDetails.size() > 0){
for (WmsOrderTaskDetailEntity taskDetail : taskDetails) {
// 解鎖庫(kù)存
unlockStock(taskDetail.getId(),taskDetail.getSkuId(),taskDetail.getSkuNum());
// 更新taskDetail狀態(tài)為已解鎖
taskDetailService.updateById(taskDetailEntity);
}
}
}
}
使用RabbitMQ實(shí)現(xiàn)分布式事務(wù)的最終一致性方案的大致流程解析完畢。如果對(duì)源碼感興趣的,歡迎到github倉(cāng)庫(kù)clone。