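Preface
Seata's TCC mode is an intrusive distributed transaction solution: every branch transaction has to implement its own TCC behavior, which lets "custom" branch transactions be brought under global transaction management.
Compared with AT mode, TCC offers more autonomy and better performance, but you pay for it with the complexity of implementing TCC behavior that is both efficient and correct. Idempotency, consistency and suspension prevention, for example, all have to be handled by the TCC behavior you write yourself.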
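To make the idempotency and suspension points concrete, here is a minimal sketch of a participant that guards its three phases with a per-branch state record. Everything in it (TccGuardSketch, the "xid:branchId" key, the in-memory maps) is a hypothetical illustration and not Seata code; a real service would persist the state together with its business data.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory TCC participant; none of these names come from Seata. */
class TccGuardSketch {
    enum State { TRIED, COMMITTED, ROLLED_BACK }

    // Per-branch state and reserved amount, keyed by "xid:branchId" (an assumed key format).
    private final Map<String, State> states = new HashMap<>();
    private final Map<String, Integer> reserved = new HashMap<>();
    private int balance;
    private int frozen;

    TccGuardSketch(int balance) {
        this.balance = balance;
    }

    synchronized boolean prepare(String key, int amount) {
        State state = states.get(key);
        if (state == State.ROLLED_BACK) {
            return false; // suspension guard: the rollback already ran, so refuse a late try
        }
        if (state != null) {
            return true; // idempotent: a repeated try reserves nothing more
        }
        if (balance < amount) {
            return false;
        }
        balance -= amount;
        frozen += amount;
        reserved.put(key, amount);
        states.put(key, State.TRIED);
        return true;
    }

    synchronized boolean commit(String key) {
        State state = states.get(key);
        if (state == State.COMMITTED) {
            return true; // idempotent: the coordinator may retry the commit
        }
        if (state != State.TRIED) {
            return false;
        }
        frozen -= reserved.get(key);
        states.put(key, State.COMMITTED);
        return true;
    }

    synchronized boolean rollback(String key) {
        State state = states.get(key);
        if (state == State.COMMITTED) {
            return false;
        }
        if (state == State.TRIED) {
            int amount = reserved.get(key);
            frozen -= amount;
            balance += amount;
        }
        // Also covers the empty rollback (no try was seen): record it so a late try is refused.
        states.put(key, State.ROLLED_BACK);
        return true;
    }

    synchronized int balance() {
        return balance;
    }

    synchronized int frozen() {
        return frozen;
    }

    public static void main(String[] args) {
        TccGuardSketch account = new TccGuardSketch(100);
        account.prepare("xid-1:1", 30);
        account.commit("xid-1:1");
        account.commit("xid-1:1"); // a retried commit changes nothing
        System.out.println("balance=" + account.balance() + ", frozen=" + account.frozen());
    }
}
```

The point of the sketch is that every phase first looks at the recorded state, so a retried or out-of-order call is either a no-op or is refused.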
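In terms of code, AT mode and TCC mode are much alike: both coordinate the global transaction through three roles, the TM, the RM and the TC. The difference is that in TCC mode, branch registration, commit and rollback are handled by calling the TCC action class you implemented.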

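TCC-TM Side Analysis
First, note that the TCC flow likewise has to run inside a TM scope marked with the @GlobalTransactional annotation, within which branch registration, commit or rollback take place. This is not covered in detail here.
TCC-RM Side Analysis
The TCC flow also passes through SeataAutoConfiguration -> GlobalTransactionScanner. The difference is that TCC additionally selects the following beans for proxying:
1. Local TCC beans annotated with @LocalTCC and @TwoPhaseBusinessAction.
2. Remote reference TCC beans annotated with @TwoPhaseBusinessAction.
In TCC, the service side registers the TCCResource, and the TC keeps the RM channel (that is, the connection to the service side) for the corresponding actionName. The reference side, through its proxy, initiates the branch register, and the resourceId it registers is the actionName. When the TC later initiates a commit or rollback, it uses the actionName to find the RM channel and calls the service side, which finally takes out the TCCResource for that actionName and processes the request. In short, in TCC the party that initiates branch registration and the party that actually executes phase two are not the same, which is the confusing part.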
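The routing described above can be pictured with a toy coordinator. This is only a sketch: ResourceRoutingSketch and its methods are hypothetical names, and a plain callback stands in for the RM channel that the real TC keeps per resource.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongFunction;

/** Hypothetical toy coordinator; it only illustrates routing by actionName. */
class ResourceRoutingSketch {
    // resourceId (the actionName) -> callback standing in for the channel to the service side
    private final Map<String, LongFunction<String>> serviceChannels = new HashMap<>();
    // branchId -> resourceId recorded when the reference side registered the branch
    private final Map<Long, String> branchResource = new HashMap<>();
    private long nextBranchId = 1;

    // Called by the service side at startup.
    void registerResource(String actionName, LongFunction<String> serviceChannel) {
        serviceChannels.put(actionName, serviceChannel);
    }

    // Called by the reference side when the try method is intercepted.
    long branchRegister(String actionName) {
        long branchId = nextBranchId++;
        branchResource.put(branchId, actionName);
        return branchId;
    }

    // Phase two: route back to whoever registered the resource, not to whoever registered the branch.
    String branchCommit(long branchId) {
        String resourceId = branchResource.get(branchId);
        LongFunction<String> channel = serviceChannels.get(resourceId);
        if (channel == null) {
            throw new IllegalStateException("no service registered for " + resourceId);
        }
        return channel.apply(branchId);
    }

    public static void main(String[] args) {
        ResourceRoutingSketch tc = new ResourceRoutingSketch();
        tc.registerResource("transferAction", id -> "service side committed branch " + id);
        long branchId = tc.branchRegister("transferAction");
        System.out.println(tc.branchCommit(branchId));
    }
}
```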
During selection, the parsed TCCResource and RemotingDesc are saved.
Saving the TCCResource is what makes TCC execution possible. The code that saves it is shown first; TCCResource comes up again later.
public RemotingDesc parserRemotingServiceInfo(Object bean, String beanName, RemotingParser remotingParser) {
    RemotingDesc remotingBeanDesc = remotingParser.getServiceDesc(bean, beanName);
    if (remotingBeanDesc == null) {
        return null;
    }
    remotingServiceMap.put(beanName, remotingBeanDesc);
    Class<?> interfaceClass = remotingBeanDesc.getInterfaceClass();
    Method[] methods = interfaceClass.getMethods();
    if (remotingParser.isService(bean, beanName)) {
        try {
            //service bean, registry resource
            Object targetBean = remotingBeanDesc.getTargetBean();
            for (Method m : methods) {
                TwoPhaseBusinessAction twoPhaseBusinessAction = m.getAnnotation(TwoPhaseBusinessAction.class);
                if (twoPhaseBusinessAction != null) {
                    TCCResource tccResource = new TCCResource();
                    tccResource.setActionName(twoPhaseBusinessAction.name());
                    tccResource.setTargetBean(targetBean);
                    tccResource.setPrepareMethod(m);
                    tccResource.setCommitMethodName(twoPhaseBusinessAction.commitMethod());
                    tccResource.setCommitMethod(ReflectionUtil
                        .getMethod(interfaceClass, twoPhaseBusinessAction.commitMethod(),
                            new Class[] {BusinessActionContext.class}));
                    tccResource.setRollbackMethodName(twoPhaseBusinessAction.rollbackMethod());
                    tccResource.setRollbackMethod(ReflectionUtil
                        .getMethod(interfaceClass, twoPhaseBusinessAction.rollbackMethod(),
                            new Class[] {BusinessActionContext.class}));
                    //registry tcc resource
                    DefaultResourceManager.get().registerResource(tccResource);
                }
            }
        } catch (Throwable t) {
            throw new FrameworkException(t, "parser remoting service error");
        }
    }
    if (remotingParser.isReference(bean, beanName)) {
        //reference bean, TCC proxy
        remotingBeanDesc.setReference(true);
    }
    return remotingBeanDesc;
}
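The scanning step in the method above can be reproduced with plain reflection. The sketch below assumes a made-up @TwoPhase annotation, Context class and Resource holder as stand-ins for @TwoPhaseBusinessAction, BusinessActionContext and TCCResource, so it shows the technique rather than Seata's actual classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

class TccResourceScanSketch {
    /** Hypothetical stand-in for @TwoPhaseBusinessAction. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface TwoPhase {
        String name();
        String commitMethod() default "commit";
        String rollbackMethod() default "rollback";
    }

    /** Hypothetical stand-in for BusinessActionContext. */
    static class Context { }

    /** Example TCC interface; only the try method carries the annotation. */
    interface TransferAction {
        @TwoPhase(name = "transferAction", commitMethod = "confirm", rollbackMethod = "cancel")
        boolean prepare(Context context, int amount);

        boolean confirm(Context context);

        boolean cancel(Context context);
    }

    /** Hypothetical stand-in for TCCResource. */
    static class Resource {
        String actionName;
        Method prepareMethod;
        Method commitMethod;
        Method rollbackMethod;
    }

    // Finds annotated try methods and resolves the phase-two methods by name and (Context) signature.
    static Map<String, Resource> scan(Class<?> interfaceClass) {
        Map<String, Resource> registry = new HashMap<>();
        for (Method m : interfaceClass.getMethods()) {
            TwoPhase action = m.getAnnotation(TwoPhase.class);
            if (action == null) {
                continue;
            }
            try {
                Resource resource = new Resource();
                resource.actionName = action.name();
                resource.prepareMethod = m;
                resource.commitMethod = interfaceClass.getMethod(action.commitMethod(), Context.class);
                resource.rollbackMethod = interfaceClass.getMethod(action.rollbackMethod(), Context.class);
                registry.put(resource.actionName, resource);
            } catch (NoSuchMethodException e) {
                throw new IllegalStateException("phase-two method missing for " + action.name(), e);
            }
        }
        return registry;
    }

    public static void main(String[] args) {
        Resource r = scan(TransferAction.class).get("transferAction");
        System.out.println(r.actionName + " -> " + r.commitMethod.getName() + " / " + r.rollbackMethod.getName());
    }
}
```

Keying the registry by action name is what later allows phase two to find the right methods from nothing more than a resourceId.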
public static boolean isTccAutoProxy(Object bean, String beanName, ApplicationContext applicationContext) {
    boolean isRemotingBean = parserRemotingServiceInfo(bean, beanName);
    //get RemotingBean description
    RemotingDesc remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
    //is remoting bean
    if (isRemotingBean) {
        if (remotingDesc != null && remotingDesc.getProtocol() == Protocols.IN_JVM) {
            //LocalTCC
            return isTccProxyTargetBean(remotingDesc);
        } else {
            // sofa:reference / dubbo:reference, factory bean
            return false;
        }
    } else {
        if (remotingDesc == null) {
            //check FactoryBean
            if (isRemotingFactoryBean(bean, beanName, applicationContext)) {
                remotingDesc = DefaultRemotingParser.get().getRemotingBeanDesc(beanName);
                return isTccProxyTargetBean(remotingDesc);
            } else {
                return false;
            }
        } else {
            return isTccProxyTargetBean(remotingDesc);
        }
    }
}
public static boolean isTccProxyTargetBean(RemotingDesc remotingDesc) {
    if (remotingDesc == null) {
        return false;
    }
    //check if it is TCC bean
    boolean isTccClazz = false;
    Class<?> tccInterfaceClazz = remotingDesc.getInterfaceClass();
    Method[] methods = tccInterfaceClazz.getMethods();
    TwoPhaseBusinessAction twoPhaseBusinessAction;
    for (Method method : methods) {
        twoPhaseBusinessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
        if (twoPhaseBusinessAction != null) {
            isTccClazz = true;
            break;
        }
    }
    if (!isTccClazz) {
        return false;
    }
    short protocols = remotingDesc.getProtocol();
    //LocalTCC
    if (Protocols.IN_JVM == protocols) {
        //in jvm TCC bean , AOP
        return true;
    }
    // sofa:reference / dubbo:reference, AOP
    return remotingDesc.isReference();
}
When a bean needs a TCC proxy, wrapIfNecessary in GlobalTransactionScanner uses TccActionInterceptor to proxy it and take over the original object.
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
    try {
        synchronized (PROXYED_SET) {
            if (PROXYED_SET.contains(beanName)) {
                return bean;
            }
            interceptor = null;
            //check TCC proxy
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                    (ConfigurationChangeListener)interceptor);
            } else {
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                if (!existsAnnotation(new Class[]{serviceInterface})
                    && !existsAnnotation(interfacesIfJdk)) {
                    return bean;
                }
                if (interceptor == null) {
                    if (globalTransactionalInterceptor == null) {
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
            }
            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            if (!AopUtils.isAopProxy(bean)) {
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                for (Advisor avr : advisor) {
                    advised.addAdvisor(0, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
        throw new RuntimeException(exx);
    }
}
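In TccActionInterceptor, every call to the original object goes through the invoke method. If the call is not inside a global transaction, or the called method does not carry the @TwoPhaseBusinessAction annotation, the original method is simply executed via invocation.proceed(). Otherwise the interceptor reads the xid, binds the TCC branch type to the context, gathers the method, its arguments, a callback that executes the original method and the @TwoPhaseBusinessAction annotation, and passes them to the proceed method of ActionInterceptorHandler. Once that returns, the branch type bound to the context has to be cleared.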
@Override
public Object invoke(final MethodInvocation invocation) throws Throwable {
    if (!RootContext.inGlobalTransaction() || disable || RootContext.inSagaBranch()) {
        //not in transaction
        return invocation.proceed();
    }
    Method method = getActionInterfaceMethod(invocation);
    TwoPhaseBusinessAction businessAction = method.getAnnotation(TwoPhaseBusinessAction.class);
    //try method
    if (businessAction != null) {
        //save the xid
        String xid = RootContext.getXID();
        //save the previous branchType
        BranchType previousBranchType = RootContext.getBranchType();
        //if not TCC, bind TCC branchType
        if (BranchType.TCC != previousBranchType) {
            RootContext.bindBranchType(BranchType.TCC);
        }
        try {
            Object[] methodArgs = invocation.getArguments();
            //Handler the TCC Aspect
            Map<String, Object> ret = actionInterceptorHandler.proceed(method, methodArgs, xid, businessAction,
                invocation::proceed);
            //return the final result
            return ret.get(Constants.TCC_METHOD_RESULT);
        }
        finally {
            //if not TCC, unbind branchType
            if (BranchType.TCC != previousBranchType) {
                RootContext.unbindBranchType();
            }
        }
    }
    return invocation.proceed();
}
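The bind-then-clean-up pattern in invoke is a scoped thread-local. A minimal sketch of the idea follows, with a hypothetical BranchTypeScopeSketch class in place of RootContext. One deliberate difference: the sketch restores the previous value in the finally block, whereas the listing above calls unbindBranchType(); that restoration is a choice of this sketch, not something the source shows.

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the branch-type part of RootContext. */
class BranchTypeScopeSketch {
    enum BranchType { AT, TCC }

    private static final ThreadLocal<BranchType> CURRENT = new ThreadLocal<>();

    static BranchType current() {
        return CURRENT.get();
    }

    static void bind(BranchType type) {
        CURRENT.set(type);
    }

    static void unbind() {
        CURRENT.remove();
    }

    // Runs body with TCC bound to the current thread, then puts the context back as it was.
    static <T> T runAsTcc(Supplier<T> body) {
        BranchType previous = CURRENT.get();
        if (previous != BranchType.TCC) {
            CURRENT.set(BranchType.TCC);
        }
        try {
            return body.get();
        } finally {
            // The finally block guarantees cleanup even when the try method throws.
            if (previous != BranchType.TCC) {
                if (previous == null) {
                    CURRENT.remove();
                } else {
                    CURRENT.set(previous);
                }
            }
        }
    }

    public static void main(String[] args) {
        bind(BranchType.AT);
        System.out.println("inside: " + runAsTcc(BranchTypeScopeSketch::current));
        System.out.println("after: " + current());
    }
}
```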
In actionInterceptorHandler.proceed, the core logic is to call doTccActionLogStore to obtain the branchId and set it on the context, place the context into the method arguments, then execute the original method and return.
public Map<String, Object> proceed(Method method, Object[] arguments, String xid, TwoPhaseBusinessAction businessAction,
                                   Callback<Object> targetCallback) throws Throwable {
    Map<String, Object> ret = new HashMap<>(4);
    //TCC name
    String actionName = businessAction.name();
    BusinessActionContext actionContext = new BusinessActionContext();
    actionContext.setXid(xid);
    //set action name
    actionContext.setActionName(actionName);
    //Creating Branch Record
    String branchId = doTccActionLogStore(method, arguments, businessAction, actionContext);
    actionContext.setBranchId(branchId);
    //set the parameter whose type is BusinessActionContext
    Class<?>[] types = method.getParameterTypes();
    int argIndex = 0;
    for (Class<?> cls : types) {
        if (cls.getName().equals(BusinessActionContext.class.getName())) {
            arguments[argIndex] = actionContext;
            break;
        }
        argIndex++;
    }
    //the final parameters of the try method
    ret.put(Constants.TCC_METHOD_ARGUMENTS, arguments);
    //the final result
    ret.put(Constants.TCC_METHOD_RESULT, targetCallback.execute());
    return ret;
}
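The argument substitution in proceed (find the parameter whose type is the context class and overwrite that slot) can be tried in isolation. The sketch below uses a hypothetical ActionContext and TransferAction; unlike the listing above, it copies the argument array instead of mutating the caller's array, which is a simplification made here.

```java
import java.lang.reflect.Method;

class ContextInjectionSketch {
    /** Hypothetical stand-in for BusinessActionContext. */
    static class ActionContext {
        final String xid;
        final String branchId;

        ActionContext(String xid, String branchId) {
            this.xid = xid;
            this.branchId = branchId;
        }
    }

    /** Example try method; callers may pass null for the context slot. */
    static class TransferAction {
        public String prepare(ActionContext context, String account, int amount) {
            return context.xid + "/" + context.branchId + "/" + account + "/" + amount;
        }
    }

    // Returns a copy of the arguments with the first ActionContext-typed slot filled in.
    static Object[] inject(Method method, Object[] arguments, ActionContext context) {
        Object[] copy = arguments.clone();
        Class<?>[] types = method.getParameterTypes();
        for (int i = 0; i < types.length; i++) {
            if (types[i] == ActionContext.class) {
                copy[i] = context;
                break;
            }
        }
        return copy;
    }

    static String demo() {
        try {
            Method prepare = TransferAction.class.getMethod(
                "prepare", ActionContext.class, String.class, int.class);
            Object[] callerArgs = {null, "acct-A", 5};
            Object[] finalArgs = inject(prepare, callerArgs, new ActionContext("xid-1", "7"));
            return (String) prepare.invoke(new TransferAction(), finalArgs);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

This is why business code can pass null for the context parameter of the try method: the slot is filled in before the real method runs.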
doTccActionLogStore performs the following steps:
1. Call fetchActionRequestContext to put the method arguments annotated with @BusinessActionContextParameter into the context.
2. Call initBusinessContext to put the @TwoPhaseBusinessAction parameters (the prepare, commit and rollback method names and the action name) into the context.
3. Call initFrameworkContext to put the local IP into the context.
4. Call branchRegister to register the branch with the TC.
protected String doTccActionLogStore(Method method, Object[] arguments, TwoPhaseBusinessAction businessAction,
                                     BusinessActionContext actionContext) {
    String actionName = actionContext.getActionName();
    String xid = actionContext.getXid();
    //collect the arguments annotated with @BusinessActionContextParameter
    Map<String, Object> context = fetchActionRequestContext(method, arguments);
    context.put(Constants.ACTION_START_TIME, System.currentTimeMillis());
    //init business context
    initBusinessContext(context, method, businessAction);
    //Init running environment context
    initFrameworkContext(context);
    actionContext.setActionContext(context);
    //init applicationData
    Map<String, Object> applicationContext = new HashMap<>(4);
    applicationContext.put(Constants.TCC_ACTION_CONTEXT, context);
    String applicationContextStr = JSON.toJSONString(applicationContext);
    try {
        //registry branch record
        Long branchId = DefaultResourceManager.get().branchRegister(BranchType.TCC, actionName, null, xid,
            applicationContextStr, null);
        return String.valueOf(branchId);
    } catch (Throwable t) {
        String msg = String.format("TCC branch Register error, xid: %s", xid);
        LOGGER.error(msg, t);
        throw new FrameworkException(t, msg);
    }
}
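Step 1 of the list above, collecting annotated arguments into the context map, boils down to reading parameter annotations by reflection. The sketch below assumes a made-up @ContextParam annotation in place of @BusinessActionContextParameter and does not reproduce what fetchActionRequestContext actually supports.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

class ActionContextCollectSketch {
    /** Hypothetical stand-in for @BusinessActionContextParameter. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.PARAMETER)
    @interface ContextParam {
        String value();
    }

    /** Example try method: only the annotated arguments should reach phase two. */
    static class TransferAction {
        public void prepare(Object context,
                            @ContextParam("account") String account,
                            @ContextParam("amount") int amount,
                            String note) {
        }
    }

    // Copies every argument whose parameter carries @ContextParam into a name -> value map.
    static Map<String, Object> collect(Method method, Object[] arguments) {
        Map<String, Object> context = new LinkedHashMap<>();
        Annotation[][] parameterAnnotations = method.getParameterAnnotations();
        for (int i = 0; i < parameterAnnotations.length; i++) {
            for (Annotation annotation : parameterAnnotations[i]) {
                if (annotation instanceof ContextParam) {
                    context.put(((ContextParam) annotation).value(), arguments[i]);
                }
            }
        }
        return context;
    }

    static Map<String, Object> demo() {
        try {
            Method prepare = TransferAction.class.getMethod(
                "prepare", Object.class, String.class, int.class, String.class);
            return collect(prepare, new Object[] {null, "acct-A", 5, "not stored"});
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only what lands in this map travels to the TC as application data, so phase two can see the annotated arguments and nothing else from the original call.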
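When the branch is registered with the TC, the TCC context described above is sent along with the actionName. This interaction also goes through the RM client and is not covered in detail here.
The main difference between TCC and AT mode is that the commit and rollback callbacks of a branch transaction are handled in a completely different way.
Branch commit and rollback callbacks go through TCCResourceManager. Take branch commit first: the branchCommit method looks up the TCCResource for the actionName, gets the target bean and its commit method, rebuilds a BusinessActionContext from the context data carried by the callback together with the xid and branchId, and invokes the commit method by reflection. The call returns a success flag, which decides whether the TC retries the branch commit.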
@Override
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                 String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    Object targetTCCBean = tccResource.getTargetBean();
    Method commitMethod = tccResource.getCommitMethod();
    if (targetTCCBean == null || commitMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        //BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        Object ret = commitMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info("TCC resource commit result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
        boolean result;
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        } else {
            result = true;
        }
        return result ? BranchStatus.PhaseTwo_Committed : BranchStatus.PhaseTwo_CommitFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("commit TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_CommitFailed_Retryable;
    }
}
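How the return value of the commit method turns into a branch status is easy to miss in the listing above, so here is the same decision as a small standalone sketch. PhaseTwoResultSketch, Result and Status are hypothetical stand-ins for TwoPhaseResult and BranchStatus, and a Callable replaces the reflective call.

```java
import java.util.concurrent.Callable;

class PhaseTwoResultSketch {
    /** Hypothetical stand-in for the two commit outcomes of BranchStatus. */
    enum Status { COMMITTED, COMMIT_FAILED_RETRYABLE }

    /** Hypothetical stand-in for TwoPhaseResult. */
    static class Result {
        final boolean success;

        Result(boolean success) {
            this.success = success;
        }
    }

    // null counts as success, a Result is asked for its flag, and anything else must be Boolean true.
    static Status toStatus(Object ret) {
        boolean ok;
        if (ret == null) {
            ok = true;
        } else if (ret instanceof Result) {
            ok = ((Result) ret).success;
        } else {
            ok = Boolean.TRUE.equals(ret);
        }
        return ok ? Status.COMMITTED : Status.COMMIT_FAILED_RETRYABLE;
    }

    // An exception from the commit method is reported as retryable rather than propagated.
    static Status commit(Callable<Object> commitMethod) {
        try {
            return toStatus(commitMethod.call());
        } catch (Exception e) {
            return Status.COMMIT_FAILED_RETRYABLE;
        }
    }

    public static void main(String[] args) {
        System.out.println(commit(() -> null));
        System.out.println(commit(() -> false));
        System.out.println(commit(() -> {
            throw new IllegalStateException("boom");
        }));
    }
}
```

Because both a false result and an exception end up as a retryable status, the commit method may be called more than once, which is one reason the phase-two methods have to be idempotent.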
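Likewise, a branch rollback ends up in the branchRollback method. It also looks up the TCCResource by actionName, gets the target bean and its rollback method, rebuilds a BusinessActionContext from the context data carried by the callback together with the xid and branchId, and invokes the rollback method by reflection. The call returns a success flag, which decides whether the TC retries the branch rollback.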
@Override
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    TCCResource tccResource = (TCCResource)tccResourceCache.get(resourceId);
    if (tccResource == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not exist, resourceId: %s", resourceId));
    }
    Object targetTCCBean = tccResource.getTargetBean();
    Method rollbackMethod = tccResource.getRollbackMethod();
    if (targetTCCBean == null || rollbackMethod == null) {
        throw new ShouldNeverHappenException(String.format("TCC resource is not available, resourceId: %s", resourceId));
    }
    try {
        //BusinessActionContext
        BusinessActionContext businessActionContext = getBusinessActionContext(xid, branchId, resourceId,
            applicationData);
        Object ret = rollbackMethod.invoke(targetTCCBean, businessActionContext);
        LOGGER.info("TCC resource rollback result : {}, xid: {}, branchId: {}, resourceId: {}", ret, xid, branchId, resourceId);
        boolean result;
        if (ret != null) {
            if (ret instanceof TwoPhaseResult) {
                result = ((TwoPhaseResult)ret).isSuccess();
            } else {
                result = (boolean)ret;
            }
        } else {
            result = true;
        }
        return result ? BranchStatus.PhaseTwo_Rollbacked : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    } catch (Throwable t) {
        String msg = String.format("rollback TCC resource error, resourceId: %s, xid: %s.", resourceId, xid);
        LOGGER.error(msg, t);
        return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}
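TCC-TC Side Analysis
Likewise, a branch registration from the RM side creates a BranchSession that is saved in the database, and when the TM commits or rolls back the global transaction the TC performs the callbacks and compensation. TC-side handling is not covered in detail here.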