說明
前面基本上涵蓋了一個項目中配置簡單狀態(tài)機的相關(guān)實現(xiàn),不過還有一個關(guān)鍵點,就是外部代碼如何調(diào)用狀態(tài)機,以及如何讓狀態(tài)機的持久化與業(yè)務邏輯代碼在同一個事務中,避免狀態(tài)機中狀態(tài)與實際訂單中不一致,造成臟數(shù)據(jù)。
調(diào)用方式
外部調(diào)用狀態(tài)機引擎,需要以下三步:
- 通過創(chuàng)建/讀取的方式獲取當前訂單對應的狀態(tài)機引擎實例
- 構(gòu)造message。
- 發(fā)送message。
需要注意以下幾點:
- 在狀態(tài)機發(fā)送完message之后,spring statemachine會通過ActionListener來監(jiān)聽,同時判斷需要走到哪個Action中
- 只有在sendMessage完成之后,狀態(tài)機的當前狀態(tài)才會更新為target狀態(tài)
所以對于調(diào)用狀態(tài)機,做了以下代碼封裝:
接口:
/**
* 存在狀態(tài)機做串聯(lián)時,統(tǒng)一的事務處理,將狀態(tài)機實例持久化也囊括在統(tǒng)一的事務中
*/
public interface StateMachineSendEventManager {
/**
* 發(fā)送狀態(tài)機event,調(diào)用bizManagerImpl中具體實現(xiàn),同時處理狀態(tài)機持久化
* <p>
* 用于訂單的狀態(tài)變更
*
* @param request
* @param operationTypeEnum
* @param eventEnum
* @return
* @throws BusinessException
*/
OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
BizOrderOperationTypeEnum operationTypeEnum,
BizOrderStatusChangeEventEnum eventEnum) throws Exception;
/**
* 同上,不過是用于訂單創(chuàng)建場景
*
* @param request
* @param operationTypeEnum
* @param eventEnum
* @return
* @throws Exception
*/
BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest request,
BizOrderOperationTypeEnum operationTypeEnum,
BizOrderStatusChangeEventEnum eventEnum) throws Exception;
}
對應實現(xiàn)
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;
@Slf4j
@Component("stateMachineSendEventManager")
public class StateMachineSendEventManagerImpl implements StateMachineSendEventManager {
@Autowired
private BizOrderRepository bizOrderRepository;
@Autowired
private BizOrderStateMachineBuildFactory bizOrderStateMachineBuildFactory;
@Autowired
@Qualifier("bizOrderRedisStateMachinePersister")
private StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister;
/**
* 發(fā)送狀態(tài)機event,調(diào)用bizManagerImpl中具體實現(xiàn),同時處理狀態(tài)機持久化
* <p>
* 這里會send stateMachine event,從而跳轉(zhuǎn)到對應的action --> bizManagerImpl,出現(xiàn)事務嵌套的情況
* <p>
* 不過事務傳播默認是TransactionDefinition.PROPAGATION_REQUIRED,所以還是同一個事務中,
* 只是事務范圍擴大至stateMachine的持久化場景了,不要修改默認的傳播機制
*
* @param request
* @return
* @throws BusinessException
*/
@Override
@Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
public OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
BizOrderOperationTypeEnum operationTypeEnum,
BizOrderStatusChangeEventEnum eventEnum) throws Exception {
// 獲取狀態(tài)機信息
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
getStateMachineFromStatusReq(request, operationTypeEnum);
boolean result = statusChangeCommonOps(stateMachine, request, eventEnum);
OrderBaseResponse resp = new OrderBaseResponse();
if (!result) {
resp.setResultCode(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR.name());
resp.setMsg("訂單狀態(tài)操作異常");
}
log.info("order status change resp is {}", resp);
// 更新redis中數(shù)據(jù)
// 發(fā)送event寫log的動作還是放在業(yè)務里面,這里無法囊括所有業(yè)務數(shù)據(jù)
if (result) {
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 將數(shù)據(jù)持久化到redis中,以bizOrderId作為對應Key
try {
bizOrderRedisStateMachinePersister.persist(stateMachine, request.getBizCode());
} catch (Exception e) {
log.error("Persist bizOrderStateMachine error", e);
}
}
});
}
return resp;
}
/**
* 同上,不過是用于訂單創(chuàng)建場景,請求為BizOrderCreateRequest
*
* @param bizOrderCreateRequest
* @param operationTypeEnum
* @param eventEnum
* @return
* @throws Exception
*/
@Override
@Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
public BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest bizOrderCreateRequest,
BizOrderOperationTypeEnum operationTypeEnum,
BizOrderStatusChangeEventEnum eventEnum) throws Exception {
// 獲取對應的stateMachine
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
getStateMachineFromCreateReq(bizOrderCreateRequest, operationTypeEnum);
Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.withPayload(eventEnum)
// key 與 status change 時不同,對應的model也不同
.setHeader(BizOrderConstants.BIZORDER_CONTEXT_CREATE_KEY, bizOrderCreateRequest)
// 根據(jù)傳遞過來的訂單狀態(tài)決定后續(xù)choice跳轉(zhuǎn)
.setHeader(BizOrderConstants.FINAL_STATUS_KEY, bizOrderCreateRequest.getBizOrderCreateModel().getOrderStatus())
.build();
BizOrderCreateResponse createResponse = new BizOrderCreateResponse();
boolean sendResult = false;
if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
sendResult = stateMachine.sendEvent(eventMsg);
log.info("order statemachine send event={},result={}", eventMsg, sendResult);
} else {
createResponse.setResultCode(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR.name());
createResponse.setMsg("當前訂單無法執(zhí)行請求動作");
}
if (sendResult) {
createResponse.setBizOrderId(bizOrderCreateRequest.getBizOrderCreateModel().getBizOrderId());
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
@Override
public void afterCommit() {
// 將數(shù)據(jù)持久化到redis中,以bizOrderId作為對應Key
try {
bizOrderRedisStateMachinePersister.persist(stateMachine,
createResponse.getBizOrderId());
} catch (Exception e) {
throw new BusinessException(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR, "狀態(tài)機持久化失敗");
}
}
});
}
return createResponse;
}
/**
* 狀態(tài)處理的通用操作抽取
*
* @param stateMachine 狀態(tài)機
* @param statusRequest 狀態(tài)變更請求
* @return 執(zhí)行結(jié)果
* @throws Exception 異常
*/
private boolean statusChangeCommonOps(
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
BizOrderStatusRequest statusRequest,
BizOrderStatusChangeEventEnum eventEnum) {
log.info("order statemachine send event={}", eventEnum);
// 執(zhí)行引擎,sendEvent,result為執(zhí)行結(jié)果,通過actionListener跳轉(zhuǎn)到對應的Action
Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.
withPayload(eventEnum)
.setHeader(BizOrderConstants.BIZORDER_CONTEXT_KEY, statusRequest)
// 只有在需要判斷(choice)的場景才用得到,guard實現(xiàn)中使用
.setHeader(BizOrderConstants.FINAL_STATUS_KEY, statusRequest.getBizOrderStatusModel().getTargetOrderStatus())
.build();
// 取到對應的狀態(tài)機,判斷是否可以執(zhí)行
boolean result = false;
// 狀態(tài)機的當前狀態(tài),只有在執(zhí)行結(jié)束后才會變化,也就是節(jié)點對應的action執(zhí)行完才會變更
// 所以在result=true的情況下,更新狀態(tài)機的持久化狀態(tài)才有效
if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
result = stateMachine.sendEvent(eventMsg);
log.info("order statemachine send event={},result={}", eventMsg, result);
} else {
throw new BusinessException(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR, "當前訂單無法執(zhí)行請求動作");
}
return result;
}
/**
* 從statusRequest中獲取statemachine實例
*
* @param statusRequest 狀態(tài)請求
* @param operationTypeEnum 操作類型
* @return 狀態(tài)機實例
* @throws Exception 異常
*/
private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>
getStateMachineFromStatusReq(BizOrderStatusRequest statusRequest,
BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
log.info("Order status change request={},operationType={}", statusRequest, operationTypeEnum);
if (!StringUtils.equals(statusRequest.getBizCode(), statusRequest.getBizOrderStatusModel().getBizOrderId())) {
throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)異常");
}
// 查詢訂單,判斷請求數(shù)據(jù)是否合法
BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(statusRequest.getBizCode());
if (null == bizOrder
|| !StringUtils.equals(bizOrder.getBizType(), statusRequest.getBizOrderStatusModel().getBizType())
|| !StringUtils.equals(bizOrder.getOrderStatus(), statusRequest.getBizOrderStatusModel().getCurrentOrderStatus())
) {
throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)與訂單實際數(shù)據(jù)不符");
}
// 構(gòu)造狀態(tài)機模板
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
bizOrderStateMachineBuildFactory.createStateMachine(statusRequest.getBizOrderStatusModel().getBizType(),
statusRequest.getBizOrderStatusModel().getSubBizType());
// 從redis中獲取對應的statemachine,并判斷當前節(jié)點是否可以滿足,如果無法從redis中獲取對應的的statemachine,則取自DB
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
= bizOrderRedisStateMachinePersister.restore(srcStateMachine, statusRequest.getBizCode());
// 由于DB中已持久化,基本上不太可能出現(xiàn)null的情況,目前唯一能想到會出現(xiàn)的情況就是緩存擊穿,先拋錯
if (null == stateMachine) {
throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在訂單對應的狀態(tài)機實例");
}
log.info("order stateMachine info is {}", srcStateMachine);
return stateMachine;
}
/**
* 獲取statemachine實例
*
* @param createRequest 狀態(tài)請求
* @param operationTypeEnum 操作類型
* @return 狀態(tài)機實例
* @throws Exception 異常
*/
private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> getStateMachineFromCreateReq(BizOrderCreateRequest createRequest,
BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
log.info("Order create request={},operationType={}", createRequest, operationTypeEnum);
// 構(gòu)造狀態(tài)機模板
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
bizOrderStateMachineBuildFactory.createStateMachine(createRequest.getBizOrderCreateModel().getBizType(),
createRequest.getBizOrderCreateModel().getSubBizType());
if (null == srcStateMachine) {
throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在訂單對應的狀態(tài)機實例");
}
// 如果是sign,表示訂單已存在,需要額外判斷并restore狀態(tài)機;如果是直接create,則不需要處理這些判斷
if (StringUtils.equalsIgnoreCase(BizOrderOperationTypeEnum.SIGN.getOperationType(),
createRequest.getOperationType())) {
if (!StringUtils.equals(createRequest.getBizCode(), createRequest.getBizOrderCreateModel().getBizOrderId())) {
throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)異常");
}
// 查詢訂單,判斷請求數(shù)據(jù)是否合法
BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(createRequest.getBizCode());
if (null == bizOrder
|| !StringUtils.equals(bizOrder.getBizType(), createRequest.getBizOrderCreateModel().getBizType())
) {
throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)與訂單實際數(shù)據(jù)不符");
}
// 從redis中獲取對應的statemachine,并判斷當前節(jié)點是否可以滿足,如果無法從redis中獲取對應的的statemachine,則取自DB
StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
= bizOrderRedisStateMachinePersister.restore(srcStateMachine, createRequest.getBizOrderCreateModel().getBizOrderId());
// 由于DB中已持久化,基本上不太可能出現(xiàn)null的情況,目前唯一能想到會出現(xiàn)的情況就是緩存擊穿,先拋錯
if (null == stateMachine) {
throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在訂單對應的狀態(tài)機實例");
}
return stateMachine;
}
log.info("order stateMachine info is {}", srcStateMachine);
return srcStateMachine;
}
}
這里需要關(guān)注BizOrderStateMachineUtils.acceptEvent,相當于在執(zhí)行之前判斷是否可以執(zhí)行,其實sendEvent中存在一樣判斷是否可執(zhí)行的代碼,不過這里抽取出來做了一個事前判斷,實現(xiàn)如下:
import org.springframework.messaging.Message;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.support.StateMachineUtils;
import org.springframework.statemachine.transition.Transition;
import org.springframework.statemachine.trigger.DefaultTriggerContext;
import org.springframework.statemachine.trigger.Trigger;
public class BizOrderStateMachineUtils {
/**
* 判斷是否可以執(zhí)行對應的event
*
* @param stateMachine
* @param eventMsg
* @return
*/
public static boolean acceptEvent(StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
Message<BizOrderStatusChangeEventEnum> eventMsg) {
State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> cs = stateMachine.getState();
for (Transition<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> transition : stateMachine.getTransitions()) {
State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> source = transition.getSource();
Trigger<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> trigger = transition.getTrigger();
if (cs != null && StateMachineUtils.containsAtleastOne(source.getIds(), cs.getIds())) {
if (trigger != null && trigger.evaluate(new DefaultTriggerContext<>(eventMsg.getPayload()))) {
return true;
}
}
}
return false;
}
}
其他說明:
- 關(guān)注getStateMachineFromXXX,這里其實就是調(diào)用了builderFactory創(chuàng)建了對應的實例。
- 還有個遺留問題,即每次從redis/db中restore時,都需要生成一個最初狀態(tài)的狀態(tài)機實例,然后傳入進去,轉(zhuǎn)化成當前狀態(tài),這種方式可能會造成比較大的資源消耗。
- 在sendXXXEvent方法上,都加上了@Transactional注解,這里代碼中的注釋也說的很清楚,會跟AbstractBizManager中的@Tranactional形成嵌套,所以事務傳播方式只能是默認的TransactionDefinition.PROPAGATION_REQUIRED,不能修改,否則會問題。
- 至于再外部調(diào)用,直接調(diào)用StateMachineSendEventManager即可,與狀態(tài)機無關(guān)了。
- 至此,完結(jié)。