自從分布式服務(wù)盛行江湖之后,分布式事務(wù)一直都是個熱門話題,網(wǎng)上也有很多其解決方案,如 TCC、基于可靠消息的最終一致性等等。但是大多都是偏于理論,缺乏實戰(zhàn)案例。最重要的是,這些方案都要求在應(yīng)用的業(yè)務(wù)層面把分布式事務(wù)技術(shù)約束考慮到設(shè)計中,通常每一個服務(wù)都需要設(shè)計實現(xiàn)正向和反向的冪等接口。這樣的設(shè)計約束,往往會導(dǎo)致很高的研發(fā)和維護成本。
于是乎,一個高效并且對業(yè)務(wù) 0 侵入的方案,呼之而來——seata (原名 Fescar) 阿里巴巴開源的分布式事務(wù)中間件。對于 seata 的介紹,請參考官方文檔,上面已經(jīng)介紹的很好了,而本文章要討論的就是 seata 的使用及其實現(xiàn)機制。強烈建議先看 seata 官方文檔,再看本篇文章,因為好多原理,文章中不會細聊。
seata-server
與 XA 的模型類似,seata 有 3 個組件來協(xié)調(diào)分布式事務(wù)的處理過程。

- Tansaction Coordinator (TC): 事務(wù)協(xié)調(diào)器,維護全局事務(wù)的運行狀態(tài),負責(zé)協(xié)調(diào)并驅(qū)動全局事務(wù)的提交或回滾。
- Transaction Manager (TM): 控制全局事務(wù)的邊界,負責(zé)開啟一個全局事務(wù),并最終發(fā)起全局提交或全局回滾的決議。
- Resource Manager (RM): 控制分支事務(wù),負責(zé)分支注冊、狀態(tài)匯報,并接收事務(wù)協(xié)調(diào)器的指令,驅(qū)動分支(本地)事務(wù)的提交和回滾。
所以,必須先部署 TC,來協(xié)調(diào)整個事務(wù)。我們可以直接下載 seata-server 源碼:https://github.com/seata/seata,然后打開server模塊,修改其registry.conf文件,將 registry 下的 type 改為zk,默認為file,并且修改 zk 的 serverAddr為你自己的 zk 地址。
type 為注冊中心類型,它支持 file、nacos(也是阿里開源的服務(wù)注冊中心及配置中心)、zk、eureka 等,由于我們 dubbo 是用 zk 作為注冊中心的,所以這里為zk。
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "zk"
nacos {
serverAddr = "localhost"
namespace = "public"
cluster = "default"
}
eureka {
serviceUrl = "http://localhost:1001/eureka"
application = "default"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = "0"
}
zk {
cluster = "default"
serverAddr = "zk.tbj.com:2181"
session.timeout = 6000
connect.timeout = 2000
}
然后啟動 Server 類:
2019-06-12 20:31:19.255 INFO [main]org.I0Itec.zkclient.ZkClient.waitForKeeperState:936 -Waiting for keeper state SyncConnected
2019-06-12 20:31:19.266 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.logStartConnect:1025 -Opening socket connection to server zk.tbj.com/192.168.85.2:2181. Will not attempt to authenticate using SASL (unknown error)
2019-06-12 20:31:19.281 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.primeConnection:879 -Socket connection established to zk.tbj.com/192.168.85.2:2181, initiating session
2019-06-12 20:31:19.296 INFO [main-SendThread(zk.tbj.com:2181)]org.apache.zookeeper.ClientCnxn.onConnected:1299 -Session establishment complete on server zk.tbj.com/192.168.85.2:2181, sessionid = 0x16ad4597ef21702, negotiated timeout = 6000
2019-06-12 20:31:19.299 INFO [main-EventThread]org.I0Itec.zkclient.ZkClient.processStateChanged:713 -zookeeper state changed (SyncConnected)
如果看到以上日志,說明 seata-server 即 TC 啟動完畢。其實 seata-server 就是一個 Netty 服務(wù)器,TM、RM 與 seata-server(TC) 之間使用 Netty 進行通信的。
seata 與 springboot 集成
在講實現(xiàn)機制之前,我們先通過一個 demo 來了解下 seata 是怎么用的。我們還是緊跟時代步伐,與當(dāng)下最流行的 Spring Boot 框架集成,Rpc 框架我們采用 dubbo。其實,seata 官網(wǎng)提供了多種集成方式,如 Spring,Spring Cloud 等等。具體看 seata-samples。
我們直接看springboot模塊,同樣,我需要改其registry.conf文件,將 registry 下的 type 改為zk,默認為file,并且修改 zk 的 serverAddr為你自己的 zk 地址。 另外,需要修改application.yml文件中的數(shù)據(jù)庫連接參數(shù),和 dubbo 的參數(shù),改成你自己的參數(shù),如:
server:
port: 9999
servlet:
context-path: /demo
spring:
application:
name: seata-springboot-app
http:
encoding:
charset: UTF-8
enabled: true
force: true
datasource:
driverClassName: com.mysql.jdbc.Driver
url: jdbc:mysql://localhost:3306/seata?useSSL=false&serverTimezone=UTC
username: root
password: 123456
poolPingConnectionsNotUsedFor: 60000
removeAbandoned: true
connectionProperties: druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
minIdle: 1
validationQuery: SELECT 1 FROM DUAL
initialSize: 5
maxWait: 60000
poolPreparedStatements: false
filters: stat,wall
testOnBorrow: false
testWhileIdle: true
minEvictableIdleTimeMillis: 300000
timeBetweenEvictionRunsMillis: 60000
testOnReturn: false
maxActive: 50
druid:
user: admin
password: admin
jpa:
hibernate:
ddl-auto: none
show-sql: true
dubbo:
server: true
registry: zookeeper://zk.tbj.com:2181
provider:
port: 20999
我們看其數(shù)據(jù)庫配置類DruidConfiguration:
/**
* The type Druid configuration.
*/
@Configuration
public class DruidConfiguration {
@Value("${spring.datasource.druid.user}")
private String druidUser;
@Value("${spring.datasource.druid.password}")
private String druidPassword;
/**
* Druid data source druid data source.
*
* @return the druid data source
*/
@Bean(destroyMethod = "close", initMethod = "init")
@ConfigurationProperties(prefix = "spring.datasource")
public DruidDataSource druidDataSource() {
DruidDataSource druidDataSource = new DruidDataSource();
return druidDataSource;
}
/**
* Data source data source.
*
* @param druidDataSource the druid data source
* @return the data source
*/
@Primary
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
DataSourceProxy dataSourceProxy = new DataSourceProxy(druidDataSource);
return dataSourceProxy;
}
/**
* 注冊一個StatViewServlet
*
* @return servlet registration bean
*/
@Bean
public ServletRegistrationBean<StatViewServlet> druidStatViewServlet() {
ServletRegistrationBean<StatViewServlet> servletRegistrationBean = new ServletRegistrationBean<StatViewServlet>(
new StatViewServlet(), "/druid/*");
servletRegistrationBean.addInitParameter("loginUsername", druidUser);
servletRegistrationBean.addInitParameter("loginPassword", druidPassword);
servletRegistrationBean.addInitParameter("resetEnable", "false");
return servletRegistrationBean;
}
/**
* 注冊一個:filterRegistrationBean
*
* @return filter registration bean
*/
@Bean
public FilterRegistrationBean<WebStatFilter> druidStatFilter() {
FilterRegistrationBean<WebStatFilter> filterRegistrationBean = new FilterRegistrationBean<WebStatFilter>(
new WebStatFilter());
// 添加過濾規(guī)則.
filterRegistrationBean.addUrlPatterns("/*");
// 添加不需要忽略的格式信息.
filterRegistrationBean.addInitParameter("exclusions", "*.js,*.gif,*.jpg,*.png,*.css,*.ico,/druid/*");
return filterRegistrationBean;
}
}
主要的是上面兩個配置,druidDataSource()和dataSource(DruidDataSource druidDataSource,后面兩個是 Druid 管理平臺監(jiān)控配置。dataSource(DruidDataSource druidDataSource是把生成的druidDataSource對象使用DataSourceProxy進行代理。
再看 seata 配置類SeataConfiguration:
/**
* The type Fescar configuration.
*/
@Configuration
public class SeataConfiguration {
@Value("${spring.application.name}")
private String applicationId;
/**
* 注冊一個StatViewServlet
*
* @return global transaction scanner
*/
@Bean
public GlobalTransactionScanner globalTransactionScanner() {
GlobalTransactionScanner globalTransactionScanner = new GlobalTransactionScanner(applicationId,
"my_test_tx_group");
return globalTransactionScanner;
}
}
這里配置了一個GlobalTransactionScanner,用來掃描@GlobalTransaction注解。
現(xiàn)在來簡單介紹下業(yè)務(wù),我們需要進行資產(chǎn)分配,會根據(jù)指定資產(chǎn) Id 修改資產(chǎn)計劃記錄的狀態(tài),然后調(diào)用assetService.increase()的 dubbo 服務(wù),對資產(chǎn)的金額進行 + 1操作。
接下來直接看關(guān)鍵代碼AssignServiceImpl:
/**
* The type Assign service.
*/
@Service
public class AssignServiceImpl implements AssignService {
private static final Logger LOGGER = LoggerFactory.getLogger(AssignServiceImpl.class);
@Autowired
private AssignRepository assignRepository;
@Reference(check = false)
private io.seata.samples.springboot.service.AssetService assetService;
@Override
@Transactional
@GlobalTransactional
public AssetAssign increaseAmount(String id) {
LOGGER.info("Assign Service Begin ... xid: " + RootContext.getXID() + "\n");
// 此處受 @Transactional 管理,故 seata 并未考慮它的事務(wù)
AssetAssign assetAssign = assignRepository.findById(id).get();
assetAssign.setStatus("2");
assignRepository.save(assetAssign);
// remote call asset service
assetService.increase();
return assetAssign;
}
}
這里同時使用了@Transactional和@GlobalTransactional兩個注解,第一個注解是開啟了對increaseAmount方法操作的本地事務(wù),后者開啟了 seata 管理的全局事務(wù)。需要注意的是,由于increaseAmount方法操作被@Transactional管理,故t_asset_assign表的更新操作,seata 不會去管理它,因為它自己會進行事務(wù)的回滾或提交。于是 seata 只需管理assetService.increase()這個遠程調(diào)用的分支事務(wù)。
遠程方法assetService.increase():
/**
* The type Asset service.
*/
@Service(interfaceClass = AssetService.class, timeout = 10000)
@Component
public class AssetServiceImpl implements AssetService {
/**
* The constant LOGGER.
*/
public static final Logger LOGGER = LoggerFactory.getLogger(AssetService.class);
/**
* The constant ASSET_ID.
*/
public static final String ASSET_ID = "DF001";
@Autowired
private AssetRepository assetRepository;
@Override
public int increase() {
LOGGER.info("Asset Service Begin ... xid: " + RootContext.getXID() + "\n");
Asset asset = assetRepository.findById(ASSET_ID).get();
asset.setAmount(asset.getAmount().add(new BigDecimal("1")));
assetRepository.save(asset);
throw new RuntimeException("test exception for seata, your transaction should be rollbacked,asset=" + asset);
}
}
這里Asset更新完之后,直接拋出RuntimeException模擬了事務(wù)回滾。
在執(zhí)行程序之前,我們先看數(shù)據(jù)庫中的數(shù)據(jù)t_asset_assign表:

t_asset表:
直接運行springboot模塊的啟動類SeataSpringbootApp,然后在瀏覽器上訪問:http://localhost:9999/demo/asset/assign,看輸出結(jié)果:
RuntimeException: test exception for seata, your transaction should be rollbacked,
asset=Asset{id='DF001', amount=2, voucherCode='e2d1c4512d554db9ae4a5f30cbc2e4b1'}
異常信息已經(jīng)顯示出來了,說明程序已經(jīng)運行到最后一行了。然后看數(shù)據(jù)庫的數(shù)據(jù)有沒有改變,t_asset_assign表:

t_asset表:
發(fā)現(xiàn)數(shù)據(jù)并沒有變化,說明分布式事務(wù)起作用了。到了這里,小伙伴們是不是很 exciting,只需要簡單的配置,seata 就讓程序像執(zhí)行本地事務(wù)一樣管理分布式事務(wù),最主要的是對業(yè)務(wù)代碼 0 侵入,這對喜歡偷懶的程序猿來說,簡直就是福音呀。
seata 實現(xiàn)機制
接下來我們就要步入本篇文章的主題了,那就是 seata 是如何實現(xiàn)分布式事務(wù)的?
我們發(fā)現(xiàn),我們就僅僅增加了seata 的@GlobalTransactional注解,就實現(xiàn)了分布式事務(wù)。其實 seata 增加了個攔截器來專門處理被@GlobalTransactional注解的方法,即GlobalTransactionalInterceptor,其分布式事務(wù)的執(zhí)行流程都在這里完成的:
/**
* The type Global transactional interceptor.
*/
public class GlobalTransactionalInterceptor implements MethodInterceptor {
private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
private final GlobalLockTemplate<Object> globalLockTemplate = new GlobalLockTemplate<>();
private final FailureHandler failureHandler;
/**
* Instantiates a new Global transactional interceptor.
*
* @param failureHandler the failure handler
*/
public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
if (null == failureHandler) {
failureHandler = DEFAULT_FAIL_HANDLER;
}
this.failureHandler = failureHandler;
}
@Override
public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
Class<?> targetClass = (methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null);
Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(method, GlobalLock.class);
if (globalTransactionalAnnotation != null) {
return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(methodInvocation);
} else {
return methodInvocation.proceed();
}
}
private Object handleGlobalLock(final MethodInvocation methodInvocation) throws Exception {
return globalLockTemplate.execute(new Callable<Object>() {
@Override
public Object call() throws Exception {
try {
return methodInvocation.proceed();
} catch (Throwable e) {
if (e instanceof Exception) {
throw (Exception)e;
} else {
throw new RuntimeException(e);
}
}
}
});
}
private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
final GlobalTransactional globalTrxAnno) throws Throwable {
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public int timeout() {
return globalTrxAnno.timeoutMills();
}
@Override
public String name() {
String name = globalTrxAnno.name();
if (!StringUtils.isNullOrEmpty(name)) {
return name;
}
return formatMethod(methodInvocation.getMethod());
}
});
} catch (TransactionalExecutor.ExecutionException e) {
TransactionalExecutor.Code code = e.getCode();
switch (code) {
case RollbackDone:
throw e.getOriginalException();
case BeginFailure:
failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case CommitFailure:
failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
throw e.getCause();
case RollbackFailure:
failureHandler.onRollbackFailure(e.getTransaction(), e.getCause());
throw e.getCause();
default:
throw new ShouldNeverHappenException("Unknown TransactionalExecutor.Code: " + code);
}
}
}
private <T extends Annotation> T getAnnotation(Method method, Class<T> clazz) {
if (method == null) {
return null;
}
return method.getAnnotation(clazz);
}
private String formatMethod(Method method) {
String paramTypes = Arrays.stream(method.getParameterTypes())
.map(Class::getName)
.reduce((p1, p2) -> String.format("%s, %s", p1, p2))
.orElse("");
return method.getName() + "(" + paramTypes + ")";
}
}
主要邏輯是在TransactionalTemplate#execute方法:
public Object execute(TransactionalExecutor business) throws TransactionalExecutor.ExecutionException {
// 1. get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();
try {
// 2. begin transaction
try {
triggerBeforeBegin();
tx.begin(business.timeout(), business.name());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
Object rs = null;
try {
// Do Your Business
rs = business.execute();
} catch (Throwable ex) {
// 3. any business exception, rollback.
try {
triggerBeforeRollback();
tx.rollback();
triggerAfterRollback();
// 3.1 Successfully rolled back
throw new TransactionalExecutor.ExecutionException(tx, TransactionalExecutor.Code.RollbackDone, ex);
} catch (TransactionException txe) {
// 3.2 Failed to rollback
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.RollbackFailure, ex);
}
}
// 4. everything is fine, commit.
try {
triggerBeforeCommit();
tx.commit();
triggerAfterCommit();
} catch (TransactionException txe) {
// 4.1 Failed to commit
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.CommitFailure);
}
return rs;
} finally {
//5. clear
triggerAfterCompletion();
cleanUp();
}
}
大致的執(zhí)行流程為:
- 獲取全局事務(wù)信息:先從
ThreadLocal中獲取,如果沒有則創(chuàng)建一個DefaultGlobalTransaction。 - 開啟全局事務(wù)
tx.begin(business.timeout(), business.name()):通過DefaultTransactionManager的 begin 方法開啟全局事務(wù)。DefaultTransactionManager負責(zé) TM 與 TC 通訊,發(fā)送begin、commit、rollback指令。TC 接收到 TM 發(fā)過來的 begin 指令后,會返回一個全局唯一的 XID 給 TM。 - 執(zhí)行業(yè)務(wù)代碼
business.execute():在每個本地事務(wù)中,會生成分支事務(wù)標識 BranchId, 然后根據(jù)業(yè)務(wù) SQL 執(zhí)行前后的鏡像,生成 undoLog,并隨著業(yè)務(wù) SQL 一起提交。 - 全局事務(wù)回滾
tx.rollback():當(dāng)業(yè)務(wù)代碼執(zhí)行過程中拋出任何異常,都會進行全局事務(wù)的回滾操作。根據(jù) XID 和 BranchId 查找 undoLog,然后反向生成業(yè)務(wù) SQL,接著執(zhí)行該 SQL,并且刪除 undoLog 記錄。 - 全局事務(wù)提交
tx.commit():當(dāng)業(yè)務(wù)代碼執(zhí)行正常時,則會提交全局事務(wù)。分支事務(wù)此時已經(jīng)完成提交,只需要刪除 undoLog 即可。
文章最后,我們對 TC、TM、RM之間交互流程,在官方圖片的基礎(chǔ)上,做一些總結(jié):

至此,TransactionalTemplate#execute大致執(zhí)行流程已經(jīng)講完,由于篇幅問題,我沒有深入探討,后面我會專門出一篇文章來細講 TM 的工作流程,盡請期待。