分布式事務(wù)兩階段提交——Eureka+Seata方案

分布式事務(wù)兩階段提交——Nacos+Seata方案

前言

在微服務(wù)的大環(huán)境下,服務(wù)按照業(yè)務(wù)維度拆分之后會(huì)遇到事務(wù)不一致問題,Seata的開源填補(bǔ)了兩階段提交這種模式,并且無業(yè)務(wù)代碼的侵入,這里采用eureka集群整合Seata。

一、Eureka集群搭建

1、修改hosts文件映射

127.0.0.1           eureka-server1.com
127.0.0.1           eureka-server2.com

2、創(chuàng)建eureka-server工程,引入Maven依賴

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.10.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>
    </plugins>
</build>

3、application.properties文件

  • 1)application.properties


spring.profiles.active=eureka-server1
  • 2)application-eureka-server1.properties
# Eureka Server服務(wù)端口
server.port=9090

# 取消服務(wù)器自我注冊(cè),就是Eureka Server也可以被更高層的服務(wù)器來管理
eureka.client.register-with-eureka=false
# 注冊(cè)中心的服務(wù)器,沒有必要再去檢索服務(wù)
eureka.client.fetch-registry=false

# 單機(jī) hostname: localhost #eureka注冊(cè)中心實(shí)例名稱
eureka.instance.hostname=eureka-server1.com

# Eureka Server 服務(wù)URL,用于客戶端注冊(cè)
#設(shè)置與Eureka注冊(cè)中心交互的地址,查詢服務(wù)和注冊(cè)服務(wù)用到
#集群
eureka.client.service-url.defaultZone=http://eureka-server2.com:9091/eureka/

#單機(jī)
#eureka.client.serverUrl.defaultZone=http://localhost:${server.port}/eureka/
  • 3)application-eureka-server2.properties
# Eureka Server服務(wù)端口
server.port=9091


# 取消服務(wù)器自我注冊(cè),就是Eureka Server也可以被更高層的服務(wù)器來管理
eureka.client.register-with-eureka=false
# 注冊(cè)中心的服務(wù)器,沒有必要再去檢索服務(wù)
eureka.client.fetch-registry=false

# 單機(jī) hostname: localhost #eureka注冊(cè)中心實(shí)例名稱
eureka.instance.hostname=eureka-server2.com

# Eureka Server 服務(wù)URL,用于客戶端注冊(cè)
#設(shè)置與Eureka注冊(cè)中心交互的地址,查詢服務(wù)和注冊(cè)服務(wù)用到
#集群
eureka.client.service-url.defaultZone=http://eureka-server1.com:9090/eureka/

#單機(jī)
#eureka.client.serverUrl.defaultZone=http://localhost:${server.port}/eureka/

注意,多臺(tái)eureka-server服務(wù),只需要修改eureka.instance.hostnameeureka.client.service-url.defaultZone

4、新建EurekaServerApplication啟動(dòng)類

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class,args);
    }
}

5、啟動(dòng)eureka-server服務(wù)

二、Seata配置

2.1、Seata服務(wù)端(TC)部署

下載Seata服務(wù)端壓縮包:https://github.com/seata/seata/releases

2.2、Seata配置

1、修改conf目錄中 flie.conf 文件,修改事務(wù)日志存儲(chǔ)模式為 db 及數(shù)據(jù)庫連接信息,且新增service模塊,如下:

transport {
  # tcp udt unix-domain-socket
  type = "TCP"
  #NIO NATIVE
  server = "NIO"
  #enable heartbeat
  heartbeat = true
  # the client batch send request enable
  enableClientBatchSendRequest = false
  #thread factory for netty
  threadFactory {
    bossThreadPrefix = "NettyBoss"
    workerThreadPrefix = "NettyServerNIOWorker"
    serverExecutorThreadPrefix = "NettyServerBizHandler"
    shareBossWorker = false
    clientSelectorThreadPrefix = "NettyClientSelector"
    clientSelectorThreadSize = 1
    clientWorkerThreadPrefix = "NettyClientWorkerThread"
    # netty boss thread size,will not be used for UDT
    bossThreadSize = 1
    #auto default pin or 8
    workerThreadSize = "default"
  }
  shutdown {
    # when destroy server, wait seconds
    wait = 3
  }
  serialization = "seata"
  compressor = "none"
}

#這里手動(dòng)加入service模塊
service {
  #transaction service group mapping
  #修改,可不改,my_test_tx_group 前綴建議為各微服務(wù)名。
  vgroup_mapping.seata_eureka_bank1_group = "seata-server"
  vgroup_mapping.seata_eureka_bank2_group = "seata-server"
  #only support when registry.type=file, please don't set multiple addresses
  # 此服務(wù)的地址
  default.grouplist = "127.0.0.1:8091"
  #disable seata
  disableGlobalTransaction = false
}

## transaction log store, only used in server side
store {
  ## store mode: file、db
  mode = "db"
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    url = "jdbc:mysql://localhost:3306/seata"
    user = "root"
    password = "yibo"
    minConn = 5
    maxConn = 30
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
  }
}
## server configuration, only used in server side
server {
  recovery {
    #schedule committing retry period in milliseconds
    committingRetryPeriod = 1000
    #schedule asyn committing retry period in milliseconds
    asynCommittingRetryPeriod = 1000
    #schedule rollbacking retry period in milliseconds
    rollbackingRetryPeriod = 1000
    #schedule timeout retry period in milliseconds
    timeoutRetryPeriod = 1000
  }
  undo {
    logSaveDays = 7
    #schedule delete expired undo_log in milliseconds
    logDeletePeriod = 86400000
  }
  #check auth
  enableCheckAuth = true
  #unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanent
  maxCommitRetryTimeout = "-1"
  maxRollbackRetryTimeout = "-1"
  rollbackRetryTimeoutUnlockEnable = false
}

## metrics configuration, only used in server side
metrics {
  enabled = false
  registryType = "compact"
  # multi exporters use comma divided
  exporterList = "prometheus"
  exporterPrometheusPort = 9898
}

由于我們使用了db模式存儲(chǔ)事務(wù)日志,所以我們需要?jiǎng)?chuàng)建一個(gè)seata數(shù)據(jù)庫,Seata數(shù)據(jù)庫表初始化腳本:https://github.com/seata/seata/tree/1.1.0/script/server/db

2.3、修改注冊(cè)中心和配置中心,使用eureka作為注冊(cè)中心、直接使用file.conf配置文件存儲(chǔ)seata規(guī)則,即修改 conf目錄中 registry.conf 文件,如下:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    serviceUrl = "http://eureka-server1.com:9090/eureka/,http://eureka-server2.com:9091/eureka/"
    application = "seata-server"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  consul {
    serverAddr = "127.0.0.1:8500"
  }
  apollo {
    appId = "seata-server"
    apolloMeta = "http://192.168.1.204:8801"
    namespace = "application"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

2.4、啟動(dòng)seata-server,如下:

三、各微服務(wù)配置

3.1、引入maven依賴

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.10.RELEASE</version>
    <relativePath/>
</parent>

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>

    <dependency>
        <groupId>com.alibaba.cloud</groupId>
        <artifactId>spring-cloud-alibaba-seata</artifactId>
        <version>2.2.0.RELEASE</version>
        <exclusions>
            <exclusion>
                <groupId>io.seata</groupId>
                <artifactId>seata-all</artifactId>
            </exclusion>
            <exclusion>
                <groupId>io.seata</groupId>
                <artifactId>seata-spring-boot-starter</artifactId>
            </exclusion>
        </exclusions>
    </dependency>

    <dependency>
        <groupId>io.seata</groupId>
        <artifactId>seata-spring-boot-starter</artifactId>
        <version>1.3.0</version>
    </dependency>

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-openfeign</artifactId>
    </dependency>

    <dependency>
        <groupId>tk.mybatis</groupId>
        <artifactId>mapper-spring-boot-starter</artifactId>
        <version>2.1.5</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.18</version>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.12</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Hoxton.SR8</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>

<build>
    <plugins>
        <plugin>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-maven-plugin</artifactId>
        </plugin>

        <plugin>
            <groupId>org.mybatis.generator</groupId>
            <artifactId>mybatis-generator-maven-plugin</artifactId>
            <version>1.3.6</version>
            <configuration>
                <configurationFile>
                    ${basedir}/src/main/resources/generator/generatorConfig.xml
                </configurationFile>
                <overwrite>true</overwrite>
                <verbose>true</verbose>
            </configuration>
            <dependencies>
                <dependency>
                    <groupId>mysql</groupId>
                    <artifactId>mysql-connector-java</artifactId>
                    <version>8.0.18</version>
                </dependency>
                <dependency>
                    <groupId>tk.mybatis</groupId>
                    <artifactId>mapper</artifactId>
                    <version>4.1.5</version>
                </dependency>
            </dependencies>
        </plugin>
    </plugins>
</build>    

3.2、分別在各業(yè)務(wù)數(shù)據(jù)庫中創(chuàng)建undo_log表,此表為seata框架使用,sql地址:https://github.com/seata/seata/tree/develop/script/client/at/db

3.3、配置application.properties文件

# 應(yīng)用名
spring.application.name=eureka-seata-bank1

server.port=8080

#表示是否將自己注冊(cè)進(jìn)EurekaServer默認(rèn)為true
eureka.client.register-with-eureka=true
#是否從EurekaServer抓取已有的注冊(cè)信息,默認(rèn)為true,單節(jié)點(diǎn)無所謂,集群必須設(shè)置為true才能配合ribbon使用負(fù)載均衡
eureka.client.fetch-registry=true
#集群版
eureka.client.service-url.defaultZone=http://eureka-server1.com:9090/eureka/,http://eureka-server2.com:9091/eureka/

# Eureka 客戶端應(yīng)用實(shí)例的ID
eureka.instance.instance-id=${spring.application.name}:${server.port}
#點(diǎn)進(jìn)去左下角會(huì)顯示ip
eureka.instance.prefer-ip-address=true

# 調(diào)整注冊(cè)信息的獲取周期
eureka.client.registry-fetch-interval-seconds=5

# 調(diào)整客戶端應(yīng)用狀態(tài)信息上報(bào)的周期
eureka.client.instance-info-replication-interval-seconds=5

# seata config.type=file相關(guān)配置
seata.enabled=true
seata.application-id=${spring.application.name}

# 不同的微服務(wù)vgroup_mapping.seata_eureka_bank1_group配置不同
#這里的名字與file.conf中vgroup_mapping.seata_eureka_bank1_group = "seata-server"相同
seata.tx-service-group=seata_eureka_bank1_group
#這里的名字與file.conf中vgroup_mapping.seata_eureka_bank1_group = "seata-server"相同
seata.service.vgroup-mapping.seata_eureka_bank1_group=seata-server
#這里的名字與file.conf中default.grouplist = "127.0.0.1:8091"相同
seata.service.grouplist.default=127.0.0.1:8091
# 開啟數(shù)據(jù)源自動(dòng)代理
seata.enable-auto-data-source-proxy=true
# 配置中心為本地file文件
seata.config.type=file

# 配置中心為本地file文件的文件名稱
seata.config.file.name=file.conf

seata.registry.type=eureka
seata.registry.eureka.application=seata-server
seata.registry.eureka.serviceUrl=http://eureka-server1.com:9090/eureka/,http://eureka-server2.com:9091/eureka/
seata.registry.eureka.weight=1

mybatis.type-aliases-package=com.yibo.eureka.seata.entity
mybatis.mapper-locations=classpath:mapper/*.xml
mapper.identity=MYSQL
mapper.not-empty=false

spring.datasource.url=jdbc:mysql://localhost:3306/trade_bank1?useUnicode=true&useAffectedRows=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=yibo
spring.datasource.driver-class-name=com.mysql.jdbc.Driver

# 設(shè)置連接超時(shí)時(shí)間 default 2000
ribbon.ConnectTimeout=6000
# 設(shè)置讀取超時(shí)時(shí)間  default 5000
ribbon.ReadTimeout=6000
# 對(duì)所有操作請(qǐng)求都進(jìn)行重試  default false
ribbon.OkToRetryOnAllOperations=true
# 切換實(shí)例的重試次數(shù)  default 1
ribbon.MaxAutoRetriesNextServer=2
# 對(duì)當(dāng)前實(shí)例的重試次數(shù) default 0
ribbon.MaxAutoRetries=1

3.4、啟動(dòng)類配置

@MapperScan("com.yibo.eureka.seata.mapper")//掃描mybatis的指定包下的接口
@SpringBootApplication
@EnableDiscoveryClient  //服務(wù)發(fā)現(xiàn),對(duì)外暴露服務(wù)
@EnableEurekaClient     //本服務(wù)啟動(dòng)后會(huì)自動(dòng)注冊(cè)進(jìn)Eureka服務(wù)中
@EnableFeignClients
public class EurekaSeataBank1Application {

    public static void main(String[] args) {
        SpringApplication.run(EurekaSeataBank1Application.class,args);
    }
}

四、業(yè)務(wù)邏輯實(shí)現(xiàn)

4.1、Controller實(shí)現(xiàn)

@RestController
@RequestMapping("/bank1")
public class Bank1Controller {

    @Autowired
    private AccountService accountService;
    
    @GetMapping("/transfer/{amount}")
    public String transfer(@PathVariable("amount") Long amount){
        accountService.updateAccountBalance("1",amount);
        return "bank1"+amount;
    }
}

4.2、Service實(shí)現(xiàn),@GlobalTransactional注解用以開啟全局事務(wù),@Transactional注解用于分支事務(wù)

@Service
@Slf4j
public class AccountServiceImpl implements AccountService {

    @Autowired
    private AccountInfoMapper accountInfoMapper;

    @Autowired
    private Bank2Client bank2Client;

    @Transactional
    @GlobalTransactional//開啟全局事務(wù)
    public void updateAccountBalance(String accountNo, Long amount) {
        log.info("bank1 service begin,XID:{}", RootContext.getXID());
        //扣減張三的金額
        accountInfoMapper.updateAccountBalance(accountNo,amount *-1);
        //調(diào)用李四微服務(wù),轉(zhuǎn)賬
        String transfer = bank2Client.transfer(amount);
        if("fallback".equals(transfer)){
            //調(diào)用李四微服務(wù)異常
            throw new RuntimeException("調(diào)用李四微服務(wù)異常");
        }
        if(amount == 2){
            //人為制造異常
            throw new RuntimeException("bank1 make exception..");
        }
    }
}

4.3、Bank2Client接口的FeignClient

@FeignClient(value="eureka-seata-bank2")
public interface Bank2Client {

    //遠(yuǎn)程調(diào)用微服務(wù)
    @GetMapping("/bank2/transfer/{amount}")
    public String transfer(@PathVariable("amount") Long amount);
}

其他微服務(wù)按此配置即可。

github源碼地址:https://github.com/jjhyb/distributed-transaction

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容