Seata Source Code Analysis Series: TCC Mode

Preface

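Seata's TCC mode is an intrusive distributed transaction solution: every branch transaction has to implement its own TCC (Try, Confirm, Cancel) behavior. In exchange, it lets "custom" branch transactions be managed as part of a global transaction.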

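Compared with AT mode, TCC gives you more control and better performance, but you have to deal with the high complexity of implementing TCC behavior that is both efficient and correct. Guaranteeing idempotency, guaranteeing consistency, preventing hanging, and similar logic must all be handled by the TCC behavior you write yourself.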
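To make those obligations concrete, here is a minimal in-memory sketch of a TCC participant that guards against repeated calls, an empty rollback (Cancel without a prior Try), and hanging (a Try arriving after its Cancel). The class `GuardedTccAccount`, its `State` enum, and the method names are hypothetical and are not part of Seata; a real implementation would keep this state in a durable store inside the same local transaction as the business update.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory TCC participant; none of these names are Seata APIs. */
class GuardedTccAccount {
    enum State { TRIED, COMMITTED, ROLLED_BACK }

    private final Map<String, State> states = new HashMap<>();
    private final Map<String, Integer> reserved = new HashMap<>();
    private int available;

    GuardedTccAccount(int initialBalance) {
        this.available = initialBalance;
    }

    /** Try: reserve the amount for this transaction. */
    synchronized boolean tryDeduct(String txId, int amount) {
        State state = states.get(txId);
        if (state != null) {
            // A repeated Try is a no-op; a Try arriving after Cancel is rejected (anti-hanging).
            return state == State.TRIED;
        }
        if (available < amount) {
            return false;
        }
        available -= amount;
        reserved.put(txId, amount);
        states.put(txId, State.TRIED);
        return true;
    }

    /** Confirm: consume the reservation; safe to call more than once. */
    synchronized boolean commit(String txId) {
        State state = states.get(txId);
        if (state == State.COMMITTED) {
            return true;
        }
        if (state != State.TRIED) {
            return false;
        }
        reserved.remove(txId);
        states.put(txId, State.COMMITTED);
        return true;
    }

    /** Cancel: release the reservation; tolerates a missing Try (empty rollback). */
    synchronized boolean rollback(String txId) {
        State state = states.get(txId);
        if (state == State.COMMITTED) {
            return false;
        }
        if (state == State.TRIED) {
            available += reserved.remove(txId);
        }
        // Recording ROLLED_BACK even without a Try blocks a late Try for this txId.
        states.put(txId, State.ROLLED_BACK);
        return true;
    }

    synchronized int available() {
        return available;
    }
}
```

The per-transaction state record is the design choice that matters here: each of the three guards is a lookup against it before touching the balance.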

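In the code, AT mode and TCC mode are largely alike: both handle a global transaction through the cooperation of three roles, TM, RM, and TC. The difference is that in TCC mode, branch registration, commit, and rollback are handled by calling the TCC behavior class you implemented.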

[Figure: how the three roles (TM, RM, TC) cooperate]

TCC: TM Side

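First, note that the TCC flow, like AT, performs branch registration, commit, and rollback within the TM scope marked by the @GlobalTransactional annotation, so it is not covered in detail here.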

TCC: RM Side

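The TCC flow also passes through SeataAutoConfiguration -> GlobalTransactionScanner. The difference is that TCC additionally selects the following beans for proxying:
1. Local TCC beans annotated with @LocalTCC and @TwoPhaseBusinessAction.
2. Remote reference TCC beans annotated with @TwoPhaseBusinessAction.
In TCC, the service side registers the TCCResource, and the TC maintains the RM channel for the corresponding actionName, that is, the connection to the service side. The reference side, through its proxy, initiates the branch registration with the actionName as the resourceId. When the TC later initiates a commit or rollback, it looks up the RM channel by actionName, calls the service side, and the service side fetches the TCCResource for that actionName to do the work. In short, in TCC the side that registers the branch and the side that actually executes it are not the same, which is the confusing part.
During selection, the parsed TCCResource and RemotingDesc are saved.
Saving the TCCResource is what makes TCC execution possible. The code that saves it comes first; TCCResource is discussed again later.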

  public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
        RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
        if (remotingBeanDesc == null) {
            return null;
        }
        remotingServiceMap.put(beanName, remotingBeanDesc);

        Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
        Method[] methods = interfaceClass.getMethods();
        if (remotingParser.isService(bean, beanName)) {
            try {
                //service bean, registry resource
                Object targetBean = remotingBeanDesc.getTargetBean();
                for (Method m : methods) {
                    TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                    if (twoPhaseBusinessAction != null) {
                        TCCResource tccResource = new TCCResource();
                        tccResource.setActionName(twoPhaseBusinessAction.name());
                        tccResource.setTargetBean(targetBean);
                        tccResource.setPrepareMethod(m);
                        tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                        tccResource.setCommitMethod(ReflectionUtil
                            .getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
                                new Class[] {BusinessActionContext.class}));
                        tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                        tccResource.setRollbackMethod(ReflectionUtil
                            .getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
                                new Class[] {BusinessActionContext.class}));
                        //registry tcc resource
                        DefaultResourceManager.get().registerResource(tccResource);
                    }
                }
            } catch (Throwable t) {
                throw new FrameworkException(t, "parser remoting service error");
            }
        }
        if (remotingParser.isReference(bean, beanName)) {
            //reference bean, TCC proxy
            remotingBeanDesc.setReference(true);
        }
        return remotingBeanDesc;
    }
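
    // Decides whether the bean needs the TCC proxy. The two-argument parserRemotingServiceInfo
    // called here returns a boolean, so it is a separate helper from the method above.
    public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {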
        boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
        //get RemotingBean description
        RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
        //is remoting bean
        if (isRemotingBean) {
            if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
                //LocalTCC
                return isTccProxyTargetBean(remotingDesc);
            } else {
                // sofa:reference / dubbo:reference, factory bean
                return false;
            }
        } else {
            if (remotingDesc == null) {
                //check FactoryBean
                if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                    remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                    return isTccProxyTargetBean(remotingDesc);
                } else {
                    return false;
                }
            } else {
                return isTccProxyTargetBean(remotingDesc);
            }
        }
    }

  public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
        if (remotingDesc == null) {
            return false;
        }
        //check if it is TCC bean
        boolean isTccClazz = false;
        Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
        Method[] methods = tccInterfaceClazz.getMethods();
        TwoPhaseBusinessAction twoPhaseBusinessAction;
        for (Method method : methods) {
            twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
            if (twoPhaseBusinessAction != null) {
                isTccClazz = true;
                break;
            }
        }
        if (!isTccClazz) {
            return false;
        }
        short protocols = remotingDesc.getProtocol();
        //LocalTCC
        if (Protocols.IN_JVM == protocols) {
            //in jvm TCC bean , AOP
            return true;
        }
        // sofa:reference /  dubbo:reference, AOP
        return remotingDesc.isReference();
    }
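
The registration logic above boils down to: scan the interface for the annotated prepare method, resolve the commit and rollback methods by name, and store the result under the action name so that a later callback can find it. The following is a minimal sketch of that idea using plain reflection. `TwoPhase`, `Resource`, `StockAction`, and `RecordingStock` are hypothetical stand-ins (for @TwoPhaseBusinessAction, TCCResource, and a TCC service), and the second-phase methods take a `String` instead of a BusinessActionContext to keep the example self-contained.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ActionRegistrySketch {
    /** Hypothetical stand-in for @TwoPhaseBusinessAction. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface TwoPhase {
        String name();
        String commitMethod() default "commit";
        String rollbackMethod() default "rollback";
    }

    /** Hypothetical stand-in for TCCResource: target bean plus resolved second-phase methods. */
    static final class Resource {
        final Object target;
        final Method commit;
        final Method rollback;

        Resource(Object target, Method commit, Method rollback) {
            this.target = target;
            this.commit = commit;
            this.rollback = rollback;
        }
    }

    interface StockAction {
        @TwoPhase(name = "stockAction")
        boolean prepare(String context, int count);
        boolean commit(String context);
        boolean rollback(String context);
    }

    /** Sample service that records which phase was invoked. */
    static final class RecordingStock implements StockAction {
        final List<String> calls = new ArrayList<>();
        public boolean prepare(String context, int count) { calls.add("prepare:" + context); return true; }
        public boolean commit(String context) { calls.add("commit:" + context); return true; }
        public boolean rollback(String context) { calls.add("rollback:" + context); return true; }
    }

    /** Builds an actionName -> Resource map from the annotated methods of the interface. */
    static Map<String, Resource> scan(Class<?> iface, Object target) {
        Map<String, Resource> registry = new HashMap<>();
        try {
            for (Method m : iface.getMethods()) {
                TwoPhase action = m.getAnnotation(TwoPhase.class);
                if (action == null) {
                    continue;
                }
                registry.put(action.name(), new Resource(target,
                    iface.getMethod(action.commitMethod(), String.class),
                    iface.getMethod(action.rollbackMethod(), String.class)));
            }
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException("second-phase method not found", e);
        }
        return registry;
    }

    /** Looks the resource up by action name and invokes its commit method reflectively. */
    static Object commit(Map<String, Resource> registry, String actionName, String context) {
        Resource resource = registry.get(actionName);
        if (resource == null) {
            throw new IllegalStateException("no resource for " + actionName);
        }
        try {
            return resource.commit.invoke(resource.target, context);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Calling `scan(StockAction.class, new RecordingStock())` yields a single entry keyed by `"stockAction"`; `commit(registry, "stockAction", "xid-1")` then reaches the service's `commit` method without the caller knowing the concrete class.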

When a bean needs the TCC proxy, wrapIfNecessary in GlobalTransactionScanner uses TccActionInterceptor to proxy it, taking over the original object.

 @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener)interceptor);
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                    if (!existsAnnotation(new Class[]{serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }

                    if (interceptor == null) {
                        if (globalTransactionalInterceptor == null) {
                            globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                            ConfigurationCache.addConfigListener(
                                ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                                (ConfigurationChangeListener)globalTransactionalInterceptor);
                        }
                        interceptor = globalTransactionalInterceptor;
                    }
                }

                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }

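In TccActionInterceptor, every call to the original object goes through the invoke method. If the call is not inside a global transaction, or the invoked method does not carry the @TwoPhaseBusinessAction annotation, invoke simply runs the original method via invocation.proceed(). Otherwise it reads the xid, binds the TCC branch type to the context, gathers the method, its arguments, the callback that executes the original method, and the @TwoPhaseBusinessAction annotation, and passes them to ActionInterceptorHandler's proceed method. When processing finishes, the branch type is cleared from the context.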

   @Override
    public Object invoke(final MethodInvocation invocation) throws Throwable {
        if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
            //not in transaction
            return invocation.proceed();
        }
        Method method = getActionInterfaceMethod(invocation);
        TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        //try method
        if (businessAction != null) {
            //save the xid
            String xid = RootContext.getXID();
            //save the previous branchType
            BranchType previousBranchType = RootContext.getBranchType();
            //if not TCC, bind TCC branchType
            if (BranchType.TCC != previousBranchType) {
                RootContext.bindBranchType(BranchType.TCC);
            }
            try {
                Object[] methodArgs = invocation.getArguments();
                //Handler the TCC Aspect
                Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, xid, businessAction,
                        invocation::proceed);
                //return the final result
                return ret.get(Constants.TCC_METHOD_RESULT);
            }
            finally {
                //if not TCC, unbind branchType
                if (BranchType.TCC != previousBranchType) {
                    RootContext.unbindBranchType();
                }
            }
        }
        return invocation.proceed();
    }
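
The bind/try/finally shape in invoke is a common pattern for thread-bound context. Below is a minimal sketch of it with a plain ThreadLocal. `BranchTypeScope` and its methods are hypothetical and only imitate the role RootContext plays above. One deliberate difference, which is an assumption about intent rather than a description of Seata: the sketch restores the previous value on exit, whereas the quoted code only unbinds.

```java
import java.util.function.Supplier;

/** Hypothetical thread-bound branch type holder; not Seata's RootContext. */
class BranchTypeScope {
    enum BranchType { AT, TCC }

    private static final ThreadLocal<BranchType> CURRENT = new ThreadLocal<>();

    static BranchType current() {
        return CURRENT.get();
    }

    static void bind(BranchType type) {
        CURRENT.set(type);
    }

    static void unbind() {
        CURRENT.remove();
    }

    /** Runs the body with TCC bound, then puts back whatever was bound before. */
    static <T> T runAsTcc(Supplier<T> body) {
        BranchType previous = CURRENT.get();
        boolean switched = previous != BranchType.TCC;
        if (switched) {
            CURRENT.set(BranchType.TCC);
        }
        try {
            return body.get();
        } finally {
            // Runs on both normal return and exception, so the thread never keeps a stale type.
            if (switched) {
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        }
    }
}
```

Because the cleanup sits in `finally`, a Try method that throws still leaves the thread in the state it started in, which matters when threads are pooled.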

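Inside actionInterceptorHandler.proceed, the core logic is short: call doTccActionLogStore to obtain the branchId and set it on the context, place the context into the method arguments, then execute the original method and return.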

  public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                       Callback<Object> targetCallback) throws Throwable {
        Map<String, Object> ret = new HashMap<>(4);

        //TCC name
        String actionName = businessAction.name();
        BusinessActionContext actionContext = new BusinessActionContext();
        actionContext.setXid(xid);
        //set action name
        actionContext.setActionName(actionName);

        //Creating Branch Record
        String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
        actionContext.setBranchId(branchId);

        //set the parameter whose type is BusinessActionContext
        Class<?>[] types = method.getParameterTypes();
        int argIndex = 0;
        for (Class<?> cls : types) {
            if (cls.getName().equals(BusinessActionContext.class.getName())) {
                arguments[argIndex] = actionContext;
                break;
            }
            argIndex++;
        }
        //the final parameters of the try method
        ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
        //the final result
        ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
        return ret;
    }
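
The argument substitution in proceed can be sketched on its own: find the first parameter whose declared type is the context class and put the freshly built context in that slot. In the sketch below, `ActionContext`, `inject`, `prepare`, and `callPrepare` are hypothetical names. It also differs from the quoted code in two small ways: it works on a copy of the argument array instead of mutating the caller's array, and it compares `Class` objects directly instead of comparing class names.

```java
import java.lang.reflect.Method;

class ContextInjectionSketch {
    /** Hypothetical stand-in for BusinessActionContext. */
    static final class ActionContext {
        final String xid;
        final String branchId;

        ActionContext(String xid, String branchId) {
            this.xid = xid;
            this.branchId = branchId;
        }
    }

    /** Returns a copy of args with the first ActionContext-typed slot filled in. */
    static Object[] inject(Method method, Object[] args, ActionContext context) {
        Object[] result = args.clone();
        Class<?>[] types = method.getParameterTypes();
        for (int i = 0; i < types.length; i++) {
            if (types[i] == ActionContext.class) {
                result[i] = context;
                break;
            }
        }
        return result;
    }

    /** Sample Try method: callers pass null for the context and the framework fills it in. */
    static String prepare(ActionContext context, String account, int amount) {
        return context.xid + "/" + context.branchId + ":" + account + ":" + amount;
    }

    /** Resolves prepare reflectively and invokes it with the injected arguments. */
    static String callPrepare(Object[] callerArgs, ActionContext context) {
        try {
            Method m = ContextInjectionSketch.class.getDeclaredMethod(
                "prepare", ActionContext.class, String.class, int.class);
            return (String) m.invoke(null, inject(m, callerArgs, context));
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

This is why a TCC Try method can be called with `null` in the context position: by the time the business code runs, the slot holds a context carrying the xid and branchId.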

doTccActionLogStore performs the following steps:
1. Calls fetchActionRequestContext to put the method arguments annotated with @BusinessActionContextParameter into the context.
2. Calls initBusinessContext to put the @TwoPhaseBusinessAction parameters (the prepare, commit, and rollback method names and the action name) into the context.
3. Calls initFrameworkContext to put the local IP address into the context.
4. Calls branchRegister to register the branch with the TC.

  protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                         BusinessActionContext actionContext) {
        String actionName = actionContext.getActionName();
        String xid = actionContext.getXid();
        //
        Map<String, Object> context = fetchActionRequestContext(method, arguments);
        context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());

        //init business context
        initBusinessContext(context, method, businessAction);
        //Init running environment context
        initFrameworkContext(context);
        actionContext.setActionContext(context);

        //init applicationData
        Map<String, Object> applicationContext = new HashMap<>(4);
        applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
        String applicationContextStr = JSON.toJSONString(applicationContext);
        try {
            //registry branch record
            Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
                applicationContextStr, null);
            return String.valueOf(branchId);
        } catch (Throwable t) {
            String msg = String.format("TCC branch Register error, xid: %s", xid);
            LOGGER.error(msg, t);
            throw new FrameworkException(t, msg);
        }
    }
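
The first two steps, collecting annotated arguments and then adding the action's own metadata, can be sketched with parameter annotations alone. `ContextParam` is a hypothetical stand-in for @BusinessActionContextParameter, and the keys `"actionName"` and `"prepareMethod"` are illustrative, not the constants Seata uses. Serialization and the call to the TC are left out.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

class ContextParamSketch {
    /** Hypothetical stand-in for @BusinessActionContextParameter. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface ContextParam {
        String value();
    }

    /** Copies every annotated argument into the context, then adds action metadata. */
    static Map<String, Object> buildContext(Method method, Object[] args, String actionName) {
        Map<String, Object> context = new LinkedHashMap<>();
        Annotation[][] annotations = method.getParameterAnnotations();
        for (int i = 0; i < annotations.length; i++) {
            for (Annotation annotation : annotations[i]) {
                if (annotation instanceof ContextParam) {
                    context.put(((ContextParam) annotation).value(), args[i]);
                }
            }
        }
        context.put("actionName", actionName);          // illustrative key
        context.put("prepareMethod", method.getName()); // illustrative key
        return context;
    }

    /** Sample Try method: only the two annotated parameters end up in the context. */
    static void prepare(Object ignored,
                        @ContextParam("account") String account,
                        @ContextParam("amount") int amount) {
    }

    /** Builds the context for one sample call of prepare. */
    static Map<String, Object> demo(String account, int amount) {
        try {
            Method m = ContextParamSketch.class.getDeclaredMethod(
                "prepare", Object.class, String.class, int.class);
            return buildContext(m, new Object[] {null, account, amount}, "demoAction");
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Whatever lands in this map is what the second phase sees later, so any argument the commit or rollback method needs has to be annotated on the Try method.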

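When the branch is registered with the TC, the TCC context described above is sent along with the actionName. The exchange goes through the RM client, as in AT mode, so it is not covered in detail here.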

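The main difference between TCC and AT mode is that the branch commit and rollback callbacks are handled in a completely different way.
Both callbacks go through TCCResourceManager. Start with branch commit, which calls branchCommit: it fetches the TCCResource for the actionName, obtains the target bean and its commit method, rebuilds a BusinessActionContext from the context data carried by the callback together with the xid and branchId, and invokes the commit method by reflection. The returned success flag determines whether the TC retries the branch commit.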

    @Override
    public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        Object targetTCCBean = tccResource.getTargetBean();
        Method commitMethod = tccResource.getCommitMethod();
        if (targetTCCBean == null || commitMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
            LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
            boolean result;
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
            return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            return BranchStatus.PhaseTwo_CommitFailed_Retryable;
        }
    }

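Likewise, the branch rollback callback reaches branchRollback, which also fetches the TCCResource by actionName, obtains the target bean and its rollback method, rebuilds a BusinessActionContext from the context data carried by the callback together with the xid and branchId, and invokes the rollback method by reflection. The returned success flag determines whether the TC retries the branch rollback.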

  @Override
    public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                       String applicationData) throws TransactionException {
        TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
        if (tccResource == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
        }
        Object targetTCCBean = tccResource.getTargetBean();
        Method rollbackMethod = tccResource.getRollbackMethod();
        if (targetTCCBean == null || rollbackMethod == null) {
            throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
        }
        try {
            //BusinessActionContext
            BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
                applicationData);
            Object ret = rollbackMethod.invoke(targetTCCBean, businessActionContext);
            LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
            boolean result;
            if (ret != null) {
                if (ret instanceof TwoPhaseResult) {
                    result = ((TwoPhaseResult)ret).isSuccess();
                } else {
                    result = (boolean)ret;
                }
            } else {
                result = true;
            }
            return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        } catch (Throwable t) {
            String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
            LOGGER.error(msg, t);
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
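
Both branchCommit and branchRollback share the same rule for turning the second-phase return value into a status: null counts as success, a boolean or result object is taken at its word, and any exception means "retry later". A minimal sketch of that rule follows. `Status` and `Outcome` are hypothetical stand-ins for BranchStatus and TwoPhaseResult. An unexpected return type is treated as retryable here, which matches the effect of the cast failing inside the try block in the quoted code.

```java
import java.util.function.Supplier;

class PhaseTwoResultSketch {
    /** Hypothetical two-valued stand-in for the BranchStatus values used above. */
    enum Status { DONE, RETRYABLE }

    /** Hypothetical stand-in for TwoPhaseResult. */
    static final class Outcome {
        final boolean success;

        Outcome(boolean success) {
            this.success = success;
        }
    }

    /** Runs the second-phase call and maps its return value or failure to a status. */
    static Status toStatus(Supplier<Object> phaseTwoCall) {
        try {
            Object ret = phaseTwoCall.get();
            boolean ok;
            if (ret == null) {
                ok = true; // void-style methods count as success
            } else if (ret instanceof Outcome) {
                ok = ((Outcome) ret).success;
            } else if (ret instanceof Boolean) {
                ok = (Boolean) ret;
            } else {
                ok = false; // unexpected return type: let the coordinator retry
            }
            return ok ? Status.DONE : Status.RETRYABLE;
        } catch (RuntimeException e) {
            // Any failure in business code is reported as retryable, never rethrown.
            return Status.RETRYABLE;
        }
    }
}
```

Since every failure path ends in a retryable status, the commit and rollback methods are likely to be called more than once, which is why they need to be idempotent.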

TCC: TC Side

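As in AT mode, a branch registration from the RM creates a BranchSession that is stored in the database, and when the TM commits or rolls back the global transaction, the TC performs the callbacks and compensation. The TC-side handling is not covered in detail here.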
