seata筆記—處理分布式事務(wù)

1.分布式事務(wù)的問題

在微服務(wù)的架構(gòu)下,隨著業(yè)務(wù)服務(wù)的拆分和數(shù)據(jù)庫的拆分,會存在多個業(yè)務(wù)對應(yīng)多個數(shù)據(jù)庫的情況,如下圖所示,訂單和庫存分別拆分成兩個獨立的數(shù)據(jù)庫,當(dāng)客戶端發(fā)送一個下單操作時,需要在訂單服務(wù)的數(shù)據(jù)庫中創(chuàng)建訂單,同時庫存服務(wù)完成商品庫存的扣減。由于每個數(shù)據(jù)庫的事務(wù)執(zhí)行情況只有自己知道,比如訂單數(shù)據(jù)庫并不知道庫存數(shù)據(jù)庫的執(zhí)行情況,就會導(dǎo)致訂單數(shù)據(jù)庫和庫存數(shù)據(jù)庫數(shù)據(jù)不一致的問題。


image.png

2.seata

Seata一款開源的分布式事務(wù)解決方案,致力于在微服務(wù)架構(gòu)下提高性能和簡單易用的分布式事務(wù)服務(wù)。

state術(shù)語

TC:事務(wù)協(xié)調(diào)者
維護(hù)全局和分支事務(wù)的狀態(tài),驅(qū)動全局事務(wù)提交或回滾。
TM:事務(wù)管理者
定義全局事務(wù)的范圍:開始全局范圍,提交或回滾全局事務(wù)
RM:資源管理器
管理分支事務(wù)處理的資源,與TC交談以注冊分支事務(wù)和報告分支事務(wù)的狀態(tài),并驅(qū)動分支事務(wù)提交或回滾。


image.png

具體執(zhí)行流程:

  • TM向TC注冊全局事務(wù),并生成全局唯一的XID
  • RM向TC注冊分支事務(wù),并將其納入該XID對應(yīng)的全局事務(wù)范圍
  • RM向TC匯報資源的準(zhǔn)備狀態(tài)
  • TC匯總所有事務(wù)參與者的執(zhí)行狀態(tài),決定該分布式事務(wù)是否全部回滾或提交
  • TC通知所有RM提交或回滾事務(wù)。

下面結(jié)合例子來解釋seata的操作過程,具體了解分布式事務(wù)的操作過程

1.seata-server的安裝

官網(wǎng)下載解壓


image.png

2.修改conf下的file.conf文件,修改里面的service和store,并添加數(shù)據(jù)庫seata

service {
  #vgroup->rgroup
  vgroup_mapping.my_test_tx_group = "fsp_tx_group"http://起一個名稱
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}
//----------------------------------------------------//

store {
  ## store mode: file、db
  mode = "db" //改為數(shù)據(jù)庫存儲

  ## file store
  file {
    dir = "sessionStore"

    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    max-branch-session-size = 16384
    # globe session size , if exceeded throws exceptions
    max-global-session-size = 512
    # file buffer size , if exceeded allocate new buffer
    file-write-buffer-cache-size = 16384
    # when recover batch read size
    session.reload.read_size = 100
    # async, sync
    flush-disk-mode = async
  }

  ## database store
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "dbcp"
    ## mysql/oracle/h2/oceanbase etc.
   ##添加你的數(shù)據(jù)庫的相關(guān)配置
    db-type = "mysql"
    driver-class-name = "com.mysql.cj.jdbc.Driver"
    url = "jdbc:mysql://127.0.0.1:3306/seata?characterEncoding=utf8&connectTimeout=1000&socketTimeout=3000&autoReconnect=true&useUnicode=true&useSSL=false&serverTimezone=UTC"
    user = "root"
    password = "123"
    min-conn = 1
    max-conn = 3
    global.table = "global_table"
    branch.table = "branch_table"
    lock-table = "lock_table"
    query-limit = 100
  }
}
image.png

3.修改register.conf下的配置文件

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos" //指明注冊中心是nacos

  nacos {
    serverAddr = "localhost:8848" //修改服務(wù)地址
    namespace = ""
    cluster = "default"
  }

4.啟動nacos和seata

注冊成功


image.png

image.png

5.開啟測試

首先創(chuàng)建3個微服務(wù),一個訂單服務(wù),一個庫存服務(wù),一個賬戶服務(wù)
當(dāng)用戶下單時,訂單服務(wù)中生成一個訂單,然后通過遠(yuǎn)程調(diào)用庫存服務(wù)扣減庫存,再通過遠(yuǎn)程調(diào)用扣減余額,最后在訂單服務(wù)中修改訂單的狀態(tài)為已完成。

1.創(chuàng)建數(shù)據(jù)庫和模塊

image.png

image.png

image.png
image.png

模塊的創(chuàng)建


image.png

導(dǎo)入依賴

      <!-- seata-->
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
            <!-- 因為兼容版本問題,所以需要剔除它自己的seata的包 -->
            <exclusions>
                <exclusion>
                    <groupId>io.seata</groupId>
                    <artifactId>seata-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--        使用自己引入的seata版本-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-all</artifactId>
            <version>1.1.0</version>
        </dependency>

這里主要使用order模塊對storage和account模塊進(jìn)行操作

storage層業(yè)務(wù)代碼

@RestController
public class StorageController {
    @Autowired
    private StorageService storageService;

    @RequestMapping("/storage/decrease")
    public CommonResult decrease(@RequestParam("productId")Long productId, @RequestParam("count")Integer count){
        storageService.decrease(productId,count);
        return new CommonResult(200,"扣減庫存成功!");
    }
}

service層

@Service
@Slf4j
public class StorageServiceImpl implements StorageService {
    @Resource
    private StorageDao storageDao;
    @Override
    public void decrease(Long productId, Integer count) {
        log.info("------->扣減庫存開始");
        storageDao.decrease(productId,count);
        log.info("------->扣減庫存完成");
    }
}

account層業(yè)務(wù)代碼類似,并且需要配置file.conf和register.conf,因為seata默認(rèn)不支持yml配置方式,可以使用文件的方式進(jìn)行配置。

file.conf文件下的修改

service {
  #vgroup->rgroup
  vgroup_mapping.fsp_tx_group = "default" //*******這里注意修改
  #only support single node
  default.grouplist = "127.0.0.1:8091"
  #degrade current not support
  enableDegrade = false
  #disable
  disable = false
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  max.commit.retry.timeout = "-1"
  max.rollback.retry.timeout = "-1"
}

register.conf文件不變

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    serverAddr = "localhost:8848"
    namespace = ""
    cluster = "default"
  }

我們知道,對于事務(wù)的處理,最重要的是要拿到數(shù)據(jù)源,因為通過數(shù)據(jù)源我們可以控制事務(wù)什么時候回滾或提交,所以數(shù)據(jù)源我們需要讓seata來代理,在我們的啟動注解上排除自動加載的數(shù)據(jù)源@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})

@Configuration
public class DataSourceProxyConfig {
    @Value("${mybatis.mapper-locations}")
    private String mapperLocations;

    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean
    public DataSourceProxy dataSourceProxy(DataSource druidDataSource) {
        return new DataSourceProxy(druidDataSource);
    }

    @Bean
    public SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {
        SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
        bean.setDataSource(dataSourceProxy);
        ResourcePatternResolver resolver = new PathMatchingResourcePatternResolver();
        bean.setMapperLocations(resolver.getResources(mapperLocations));
        bean.setTransactionFactory(new SpringManagedTransactionFactory());
        return bean.getObject();
    }

}

下面主要看order層的業(yè)務(wù)代碼。

service層的業(yè)務(wù)結(jié)構(gòu)



AccountService和StorageService的業(yè)務(wù)代碼

@FeignClient(value = "seata-account-service")
public interface AccountService {
    @PostMapping("/account/decrease")
    CommonResult decrease(@RequestParam("userId")Long userId, @RequestParam("money")BigDecimal money);
}
@FeignClient(value = "seata-storage-service")
public interface StorageService {
    @PostMapping("/storage/decrease")
    CommonResult decrease(@RequestParam("productId")Long productId,@RequestParam("count")Integer count);
}

orderService的實現(xiàn)類

@Service
@Slf4j
public class OrderServiceImpl implements OrderService {
    @Resource
    private OrderDao orderDao;
    @Resource
    private StorageService storageService;
    @Resource
    private AccountService accountService;

    @Override
    @GlobalTransactional(name="fsp-create-order",rollbackFor = Exception.class) //這里名字不唯一,處理異常回滾
    public void create(Order order) {
        log.info("------>開始新建訂單");
        orderDao.create(order);
        log.info("------>訂單微服務(wù)開始調(diào)用庫存");
        storageService.decrease(order.getProductId(),order.getCount());
        log.info("------->訂單微服務(wù)開始調(diào)用庫存,做扣減end");
        accountService.decrease(order.getUserId(),order.getMoney());
        //修改訂單的狀態(tài)
        log.info("------->修改訂單");
        orderDao.update(order.getUserId(),0);
        log.info("------->訂單完成");
    }
}

最后執(zhí)行操作,完成。

總結(jié)

TC:事務(wù)協(xié)調(diào)者
維護(hù)全局和分支事務(wù)的狀態(tài),驅(qū)動全局事務(wù)提交或回滾。
TM:事務(wù)管理者
定義全局事務(wù)的范圍:開始全局范圍,提交或回滾全局事務(wù)
RM:資源管理器
管理分支事務(wù)處理的資源,與TC交談以注冊分支事務(wù)和報告分支事務(wù)的狀態(tài),并驅(qū)動分支事務(wù)提交或回滾。

這里TC就等于seata服務(wù)器,TM就是添加@GlobalTransactional注解的事務(wù)發(fā)起方,RM就是每一個數(shù)據(jù)庫。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容