Spring StateMachine狀態(tài)機引擎在項目中的應用(六)-外部調(diào)用&事務

說明

前面基本上涵蓋了一個項目中配置簡單狀態(tài)機的相關(guān)實現(xiàn),不過還有一個關(guān)鍵點,就是外部代碼如何調(diào)用狀態(tài)機,以及如何讓狀態(tài)機的持久化與業(yè)務邏輯代碼在同一個事務中,避免狀態(tài)機中狀態(tài)與實際訂單中不一致,造成臟數(shù)據(jù)。

調(diào)用方式

外部調(diào)用狀態(tài)機引擎,需要以下三步:

  1. 通過創(chuàng)建/讀取的方式獲取當前訂單對應的狀態(tài)機引擎實例
  2. 構(gòu)造message。
  3. 發(fā)送message。

需要注意以下幾點:

  1. 在狀態(tài)機發(fā)送完message之后,spring statemachine會通過ActionListener來監(jiān)聽,同時判斷需要走到哪個Action中
  2. 只有在sendMessage完成之后,狀態(tài)機的當前狀態(tài)才會更新為target狀態(tài)

所以對于調(diào)用狀態(tài)機,做了以下代碼封裝:

接口:
/**
 * 存在狀態(tài)機做串聯(lián)時,統(tǒng)一的事務處理,將狀態(tài)機實例持久化也囊括在統(tǒng)一的事務中
 */
public interface StateMachineSendEventManager {

    /**
     * 發(fā)送狀態(tài)機event,調(diào)用bizManagerImpl中具體實現(xiàn),同時處理狀態(tài)機持久化
     * <p>
     * 用于訂單的狀態(tài)變更
     *
     * @param request
     * @param operationTypeEnum
     * @param eventEnum
     * @return
     * @throws BusinessException
     */
    OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
                                            BizOrderOperationTypeEnum operationTypeEnum,
                                            BizOrderStatusChangeEventEnum eventEnum) throws Exception;


    /**
     * 同上,不過是用于訂單創(chuàng)建場景
     *
     * @param request
     * @param operationTypeEnum
     * @param eventEnum
     * @return
     * @throws Exception
     */
    BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest request,
                                                BizOrderOperationTypeEnum operationTypeEnum,
                                                BizOrderStatusChangeEventEnum eventEnum) throws Exception;

}
對應實現(xiàn)
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronizationAdapter;
import org.springframework.transaction.support.TransactionSynchronizationManager;

@Slf4j
@Component("stateMachineSendEventManager")
public class StateMachineSendEventManagerImpl implements StateMachineSendEventManager {

    @Autowired
    private BizOrderRepository bizOrderRepository;

    @Autowired
    private BizOrderStateMachineBuildFactory bizOrderStateMachineBuildFactory;

    @Autowired
    @Qualifier("bizOrderRedisStateMachinePersister")
    private StateMachinePersister<BizOrderStatusEnum,BizOrderStatusChangeEventEnum,String> bizOrderRedisStateMachinePersister;

    /**
     * 發(fā)送狀態(tài)機event,調(diào)用bizManagerImpl中具體實現(xiàn),同時處理狀態(tài)機持久化
     * <p>
     * 這里會send stateMachine event,從而跳轉(zhuǎn)到對應的action --> bizManagerImpl,出現(xiàn)事務嵌套的情況
     * <p>
     * 不過事務傳播默認是TransactionDefinition.PROPAGATION_REQUIRED,所以還是同一個事務中,
     * 只是事務范圍擴大至stateMachine的持久化場景了,不要修改默認的傳播機制
     *
     * @param request
     * @return
     * @throws BusinessException
     */
    @Override
    @Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
    public OrderBaseResponse sendStatusChangeEvent(BizOrderStatusRequest request,
                                                   BizOrderOperationTypeEnum operationTypeEnum,
                                                   BizOrderStatusChangeEventEnum eventEnum) throws Exception {

        // 獲取狀態(tài)機信息
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
                getStateMachineFromStatusReq(request, operationTypeEnum);


        boolean result = statusChangeCommonOps(stateMachine, request, eventEnum);

        OrderBaseResponse resp = new OrderBaseResponse();
        if (!result) {
            resp.setResultCode(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR.name());
            resp.setMsg("訂單狀態(tài)操作異常");
        }

        log.info("order status change resp is {}", resp);

        // 更新redis中數(shù)據(jù)
        // 發(fā)送event寫log的動作還是放在業(yè)務里面,這里無法囊括所有業(yè)務數(shù)據(jù)
        if (result) {
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    // 將數(shù)據(jù)持久化到redis中,以bizOrderId作為對應Key
                    try {
                        bizOrderRedisStateMachinePersister.persist(stateMachine, request.getBizCode());
                    } catch (Exception e) {
                        log.error("Persist bizOrderStateMachine error", e);
                    }
                }
            });
        }

        return resp;
    }

    /**
     * 同上,不過是用于訂單創(chuàng)建場景,請求為BizOrderCreateRequest
     *
     * @param bizOrderCreateRequest
     * @param operationTypeEnum
     * @param eventEnum
     * @return
     * @throws Exception
     */
    @Override
    @Transactional(value = "finOrderocTransactionManager", rollbackFor = {BusinessException.class, Exception.class})
    public BizOrderCreateResponse sendOrderCreateEvent(BizOrderCreateRequest bizOrderCreateRequest,
                                                       BizOrderOperationTypeEnum operationTypeEnum,
                                                       BizOrderStatusChangeEventEnum eventEnum) throws Exception {


        // 獲取對應的stateMachine
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine =
                getStateMachineFromCreateReq(bizOrderCreateRequest, operationTypeEnum);

        Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.withPayload(eventEnum)
                // key 與 status change 時不同,對應的model也不同
                .setHeader(BizOrderConstants.BIZORDER_CONTEXT_CREATE_KEY, bizOrderCreateRequest)
                // 根據(jù)傳遞過來的訂單狀態(tài)決定后續(xù)choice跳轉(zhuǎn)
                .setHeader(BizOrderConstants.FINAL_STATUS_KEY, bizOrderCreateRequest.getBizOrderCreateModel().getOrderStatus())
                .build();

        BizOrderCreateResponse createResponse = new BizOrderCreateResponse();

        boolean sendResult = false;

        if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
            sendResult = stateMachine.sendEvent(eventMsg);
            log.info("order statemachine send event={},result={}", eventMsg, sendResult);
        } else {
            createResponse.setResultCode(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR.name());
            createResponse.setMsg("當前訂單無法執(zhí)行請求動作");
        }

        if (sendResult) {
            createResponse.setBizOrderId(bizOrderCreateRequest.getBizOrderCreateModel().getBizOrderId());
            TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
                @Override
                public void afterCommit() {
                    // 將數(shù)據(jù)持久化到redis中,以bizOrderId作為對應Key
                    try {
                        bizOrderRedisStateMachinePersister.persist(stateMachine,
                                createResponse.getBizOrderId());
                    } catch (Exception e) {
                        throw new BusinessException(BizOrderErrorCode.ORDER_STATE_MACHINE_EXECUTE_ERR, "狀態(tài)機持久化失敗");
                    }
                }
            });
        }


        return createResponse;
    }

    /**
     * 狀態(tài)處理的通用操作抽取
     *
     * @param stateMachine  狀態(tài)機
     * @param statusRequest 狀態(tài)變更請求
     * @return 執(zhí)行結(jié)果
     * @throws Exception 異常
     */
    private boolean statusChangeCommonOps(
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
            BizOrderStatusRequest statusRequest,
            BizOrderStatusChangeEventEnum eventEnum) {

        log.info("order statemachine send event={}", eventEnum);


        // 執(zhí)行引擎,sendEvent,result為執(zhí)行結(jié)果,通過actionListener跳轉(zhuǎn)到對應的Action
        Message<BizOrderStatusChangeEventEnum> eventMsg = MessageBuilder.
                withPayload(eventEnum)
                .setHeader(BizOrderConstants.BIZORDER_CONTEXT_KEY, statusRequest)
                // 只有在需要判斷(choice)的場景才用得到,guard實現(xiàn)中使用
                .setHeader(BizOrderConstants.FINAL_STATUS_KEY, statusRequest.getBizOrderStatusModel().getTargetOrderStatus())
                .build();

        // 取到對應的狀態(tài)機,判斷是否可以執(zhí)行
        boolean result = false;

        // 狀態(tài)機的當前狀態(tài),只有在執(zhí)行結(jié)束后才會變化,也就是節(jié)點對應的action執(zhí)行完才會變更
        // 所以在result=true的情況下,更新狀態(tài)機的持久化狀態(tài)才有效
        if (BizOrderStateMachineUtils.acceptEvent(stateMachine, eventMsg)) {
            result = stateMachine.sendEvent(eventMsg);
            log.info("order statemachine send event={},result={}", eventMsg, result);
        } else {
            throw new BusinessException(BizOrderErrorCode.NO_ORDER_STATE_MACHINE_TRANSTION_ERR, "當前訂單無法執(zhí)行請求動作");
        }
        return result;

    }

    /**
     * 從statusRequest中獲取statemachine實例
     *
     * @param statusRequest     狀態(tài)請求
     * @param operationTypeEnum 操作類型
     * @return 狀態(tài)機實例
     * @throws Exception 異常
     */
    private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum>
    getStateMachineFromStatusReq(BizOrderStatusRequest statusRequest,
                                 BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
        log.info("Order status change request={},operationType={}", statusRequest, operationTypeEnum);

        if (!StringUtils.equals(statusRequest.getBizCode(), statusRequest.getBizOrderStatusModel().getBizOrderId())) {
            throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)異常");
        }

        // 查詢訂單,判斷請求數(shù)據(jù)是否合法
        BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(statusRequest.getBizCode());
        if (null == bizOrder
                || !StringUtils.equals(bizOrder.getBizType(), statusRequest.getBizOrderStatusModel().getBizType())
                || !StringUtils.equals(bizOrder.getOrderStatus(), statusRequest.getBizOrderStatusModel().getCurrentOrderStatus())
                ) {
            throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)與訂單實際數(shù)據(jù)不符");
        }

        // 構(gòu)造狀態(tài)機模板
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
                bizOrderStateMachineBuildFactory.createStateMachine(statusRequest.getBizOrderStatusModel().getBizType(),
                        statusRequest.getBizOrderStatusModel().getSubBizType());

        // 從redis中獲取對應的statemachine,并判斷當前節(jié)點是否可以滿足,如果無法從redis中獲取對應的的statemachine,則取自DB
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
                = bizOrderRedisStateMachinePersister.restore(srcStateMachine, statusRequest.getBizCode());

        // 由于DB中已持久化,基本上不太可能出現(xiàn)null的情況,目前唯一能想到會出現(xiàn)的情況就是緩存擊穿,先拋錯
        if (null == stateMachine) {
            throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在訂單對應的狀態(tài)機實例");
        }
        log.info("order stateMachine info is {}", srcStateMachine);

        return stateMachine;
    }

    /**
     * 獲取statemachine實例
     *
     * @param createRequest     狀態(tài)請求
     * @param operationTypeEnum 操作類型
     * @return 狀態(tài)機實例
     * @throws Exception 異常
     */
    private StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> getStateMachineFromCreateReq(BizOrderCreateRequest createRequest,
                                                                                                         BizOrderOperationTypeEnum operationTypeEnum) throws Exception {
        log.info("Order create request={},operationType={}", createRequest, operationTypeEnum);

        // 構(gòu)造狀態(tài)機模板
        StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> srcStateMachine =
                bizOrderStateMachineBuildFactory.createStateMachine(createRequest.getBizOrderCreateModel().getBizType(),
                        createRequest.getBizOrderCreateModel().getSubBizType());

        if (null == srcStateMachine) {
            throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在訂單對應的狀態(tài)機實例");
        }

        // 如果是sign,表示訂單已存在,需要額外判斷并restore狀態(tài)機;如果是直接create,則不需要處理這些判斷
        if (StringUtils.equalsIgnoreCase(BizOrderOperationTypeEnum.SIGN.getOperationType(),
                createRequest.getOperationType())) {
            if (!StringUtils.equals(createRequest.getBizCode(), createRequest.getBizOrderCreateModel().getBizOrderId())) {
                throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)異常");
            }

            // 查詢訂單,判斷請求數(shù)據(jù)是否合法
            BizOrder bizOrder = bizOrderRepository.selectByBizPrimaryKey(createRequest.getBizCode());
            if (null == bizOrder
                    || !StringUtils.equals(bizOrder.getBizType(), createRequest.getBizOrderCreateModel().getBizType())
                    ) {
                throw new BusinessException(BizOrderErrorCode.ORDER_COMMON_ILLEGAL_ARGUMENT, "請求數(shù)據(jù)與訂單實際數(shù)據(jù)不符");
            }

            // 從redis中獲取對應的statemachine,并判斷當前節(jié)點是否可以滿足,如果無法從redis中獲取對應的的statemachine,則取自DB
            StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine
                    = bizOrderRedisStateMachinePersister.restore(srcStateMachine, createRequest.getBizOrderCreateModel().getBizOrderId());

            // 由于DB中已持久化,基本上不太可能出現(xiàn)null的情況,目前唯一能想到會出現(xiàn)的情況就是緩存擊穿,先拋錯
            if (null == stateMachine) {
                throw new BusinessException(BizOrderErrorCode.NO_CORRESPONDING_STATEMACHINE_ERR, "不存在訂單對應的狀態(tài)機實例");
            }

            return stateMachine;
        }

        log.info("order stateMachine info is {}", srcStateMachine);

        return srcStateMachine;
    }
}

這里需要關(guān)注BizOrderStateMachineUtils.acceptEvent,相當于在執(zhí)行之前判斷是否可以執(zhí)行,其實sendEvent中存在一樣判斷是否可執(zhí)行的代碼,不過這里抽取出來做了一個事前判斷,實現(xiàn)如下:

import org.springframework.messaging.Message;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.support.StateMachineUtils;
import org.springframework.statemachine.transition.Transition;
import org.springframework.statemachine.trigger.DefaultTriggerContext;
import org.springframework.statemachine.trigger.Trigger;

public class BizOrderStateMachineUtils {

    /**
     * 判斷是否可以執(zhí)行對應的event
     *
     * @param stateMachine
     * @param eventMsg
     * @return
     */
    public static boolean acceptEvent(StateMachine<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> stateMachine,
                                      Message<BizOrderStatusChangeEventEnum> eventMsg) {
        State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> cs = stateMachine.getState();

        for (Transition<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> transition : stateMachine.getTransitions()) {
            State<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> source = transition.getSource();
            Trigger<BizOrderStatusEnum, BizOrderStatusChangeEventEnum> trigger = transition.getTrigger();

            if (cs != null && StateMachineUtils.containsAtleastOne(source.getIds(), cs.getIds())) {
                if (trigger != null && trigger.evaluate(new DefaultTriggerContext<>(eventMsg.getPayload()))) {
                    return true;
                }
            }
        }
        return false;
    }

}

其他說明:

  1. 關(guān)注getStateMachineFromXXX,這里其實就是調(diào)用了builderFactory創(chuàng)建了對應的實例。
  2. 還有個遺留問題,即每次從redis/db中restore時,都需要生成一個最初狀態(tài)的狀態(tài)機實例,然后傳入進去,轉(zhuǎn)化成當前狀態(tài),這種方式可能會造成比較大的資源消耗。
  3. 在sendXXXEvent方法上,都加上了@Transactional注解,這里代碼中的注釋也說的很清楚,會跟AbstractBizManager中的@Tranactional形成嵌套,所以事務傳播方式只能是默認的TransactionDefinition.PROPAGATION_REQUIRED,不能修改,否則會問題。
  4. 至于再外部調(diào)用,直接調(diào)用StateMachineSendEventManager即可,與狀態(tài)機無關(guān)了。
  5. 至此,完結(jié)。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容