1.分布式事務(wù)的問題
在微服務(wù)的架構(gòu)下,隨著業(yè)務(wù)服務(wù)的拆分和數(shù)據(jù)庫的拆分,會存在多個業(yè)務(wù)對應(yīng)多個數(shù)據(jù)庫的情況,如下圖所示,訂單和庫存分別拆分成兩個獨立的數(shù)據(jù)庫,當(dāng)客戶端發(fā)送一個下單操作時,需要在訂單服務(wù)的數(shù)據(jù)庫中創(chuàng)建訂單,同時庫存服務(wù)完成商品庫存的扣減。由于每個數(shù)據(jù)庫的事務(wù)執(zhí)行情況只有自己知道,比如訂單數(shù)據(jù)庫并不知道庫存數(shù)據(jù)庫的執(zhí)行情況,就會導(dǎo)致訂單數(shù)據(jù)庫和庫存數(shù)據(jù)庫數(shù)據(jù)不一致的問題。

2.seata
Seata一款開源的分布式事務(wù)解決方案,致力于在微服務(wù)架構(gòu)下提高性能和簡單易用的分布式事務(wù)服務(wù)。
state術(shù)語
TC:事務(wù)協(xié)調(diào)者
維護(hù)全局和分支事務(wù)的狀態(tài),驅(qū)動全局事務(wù)提交或回滾。
TM:事務(wù)管理者
定義全局事務(wù)的范圍:開始全局范圍,提交或回滾全局事務(wù)
RM:資源管理器
管理分支事務(wù)處理的資源,與TC交談以注冊分支事務(wù)和報告分支事務(wù)的狀態(tài),并驅(qū)動分支事務(wù)提交或回滾。

具體執(zhí)行流程:
- TM向TC注冊全局事務(wù),并生成全局唯一的XID
- RM向TC注冊分支事務(wù),并將其納入該XID對應(yīng)的全局事務(wù)范圍
- RM向TC匯報資源的準(zhǔn)備狀態(tài)
- TC匯總所有事務(wù)參與者的執(zhí)行狀態(tài),決定該分布式事務(wù)是否全部回滾或提交
- TC通知所有RM提交或回滾事務(wù)。
下面結(jié)合例子來解釋seata的操作過程,具體了解分布式事務(wù)的操作過程
1.seata-server的安裝
官網(wǎng)下載解壓

2.修改conf下的file.conf文件,修改里面的service和store,并添加數(shù)據(jù)庫seata
service {
#vgroup->rgroup
vgroup_mapping.my_test_tx_group = "fsp_tx_group"http://起一個名稱
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
//----------------------------------------------------//
store {
## store mode: file、db
mode = "db" //改為數(shù)據(jù)庫存儲
## file store
file {
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
max-branch-session-size = 16384
# globe session size , if exceeded throws exceptions
max-global-session-size = 512
# file buffer size , if exceeded allocate new buffer
file-write-buffer-cache-size = 16384
# when recover batch read size
session.reload.read_size = 100
# async, sync
flush-disk-mode = async
}
## database store
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
datasource = "dbcp"
## mysql/oracle/h2/oceanbase etc.
##添加你的數(shù)據(jù)庫的相關(guān)配置
db-type = "mysql"
driver-class-name = "com.mysql.cj.jdbc.Driver"
url = "jdbc:mysql://127.0.0.1:3306/seata?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC"
user = "root"
password = "123"
min-conn = 1
max-conn = 3
global.table = "global_table"
branch.table = "branch_table"
lock-table = "lock_table"
query-limit = 100
}
}

3.修改register.conf下的配置文件
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos" //指明注冊中心是nacos
nacos {
serverAddr = "localhost:8848" //修改服務(wù)地址
namespace = ""
cluster = "default"
}
4.啟動nacos和seata
注冊成功


5.開啟測試
首先創(chuàng)建3個微服務(wù),一個訂單服務(wù),一個庫存服務(wù),一個賬戶服務(wù)
當(dāng)用戶下單時,訂單服務(wù)中生成一個訂單,然后通過遠(yuǎn)程調(diào)用庫存服務(wù)扣減庫存,再通過遠(yuǎn)程調(diào)用扣減余額,最后在訂單服務(wù)中修改訂單的狀態(tài)為已完成。
1.創(chuàng)建數(shù)據(jù)庫和模塊




模塊的創(chuàng)建

導(dǎo)入依賴
<!-- seata-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<!-- 因為兼容版本問題,所以需要剔除它自己的seata的包 -->
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- 使用自己引入的seata版本-->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>1.1.0</version>
</dependency>
這里主要使用order模塊對storage和account模塊進(jìn)行操作
storage層業(yè)務(wù)代碼
@RestController
public class StorageController {
@Autowired
private StorageService storageService;
@RequestMapping("/storage/decrease")
public CommonResult decrease(@RequestParam("productId")Long productId, @RequestParam("count")Integer count){
storageService.decrease(productId,count);
return new CommonResult(200,"扣減庫存成功!");
}
}
service層
@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
@Resource
private StorageDao storageDao;
@Override
public void decrease(Long productId, Integer count) {
log.info("------->扣減庫存開始");
storageDao.decrease(productId,count);
log.info("------->扣減庫存完成");
}
}
account層業(yè)務(wù)代碼類似,并且需要配置file.conf和register.conf,因為seata默認(rèn)不支持yml配置方式,可以使用文件的方式進(jìn)行配置。
file.conf文件下的修改
service {
#vgroup->rgroup
vgroup_mapping.fsp_tx_group = "default" //*******這里注意修改
#only support single node
default.grouplist = "127.0.0.1:8091"
#degrade current not support
enableDegrade = false
#disable
disable = false
#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
max.commit.retry.timeout = "-1"
max.rollback.retry.timeout = "-1"
}
register.conf文件不變
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
serverAddr = "localhost:8848"
namespace = ""
cluster = "default"
}
我們知道,對于事務(wù)的處理,最重要的是要拿到數(shù)據(jù)源,因為通過數(shù)據(jù)源我們可以控制事務(wù)什么時候回滾或提交,所以數(shù)據(jù)源我們需要讓seata來代理,在我們的啟動注解上排除自動加載的數(shù)據(jù)源@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@Configuration
public class DataSourceProxyConfig {
@Value("${mybatis.mapper-locations}")
private String mapperLocations;
@Bean
@ConfigurationProperties(prefix = "spring.datasource")
public DataSource druidDataSource() {
return new DruidDataSource();
}
@Bean
public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
return new DataSourceProxy(druidDataSource);
}
@Bean
public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSourceProxy);
ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
bean.setMapperLocations(resolver.getResources(mapperLocations));
bean.setTransactionFactory(new SpringManagedTransactionFactory());
return bean.getObject();
}
}
下面主要看order層的業(yè)務(wù)代碼。
service層的業(yè)務(wù)結(jié)構(gòu)

AccountService和StorageService的業(yè)務(wù)代碼
@FeignClient(value = "seata-account-service")
public interface AccountService {
@PostMapping("/account/decrease")
CommonResult decrease(@RequestParam("userId")Long userId, @RequestParam("money")BigDecimal money);
}
@FeignClient(value = "seata-storage-service")
public interface StorageService {
@PostMapping("/storage/decrease")
CommonResult decrease(@RequestParam("productId")Long productId,@RequestParam("count")Integer count);
}
orderService的實現(xiàn)類
@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
@Resource
private OrderDao orderDao;
@Resource
private StorageService storageService;
@Resource
private AccountService accountService;
@Override
@GlobalTransactional(name="fsp-create-order",rollbackFor = Exception.class) //這里名字不唯一,處理異常回滾
public void create(Order order) {
log.info("------>開始新建訂單");
orderDao.create(order);
log.info("------>訂單微服務(wù)開始調(diào)用庫存");
storageService.decrease(order.getProductId(),order.getCount());
log.info("------->訂單微服務(wù)開始調(diào)用庫存,做扣減end");
accountService.decrease(order.getUserId(),order.getMoney());
//修改訂單的狀態(tài)
log.info("------->修改訂單");
orderDao.update(order.getUserId(),0);
log.info("------->訂單完成");
}
}
最后執(zhí)行操作,完成。
總結(jié)
TC:事務(wù)協(xié)調(diào)者
維護(hù)全局和分支事務(wù)的狀態(tài),驅(qū)動全局事務(wù)提交或回滾。
TM:事務(wù)管理者
定義全局事務(wù)的范圍:開始全局范圍,提交或回滾全局事務(wù)
RM:資源管理器
管理分支事務(wù)處理的資源,與TC交談以注冊分支事務(wù)和報告分支事務(wù)的狀態(tài),并驅(qū)動分支事務(wù)提交或回滾。
這里TC就等于seata服務(wù)器,TM就是添加@GlobalTransactional注解的事務(wù)發(fā)起方,RM就是每一個數(shù)據(jù)庫。