在傳統(tǒng)業(yè)務(wù)中我們的數(shù)據(jù)庫都是單機(jī)的,數(shù)據(jù)庫本身就提供了ACID。但隨著業(yè)務(wù)的增長我們需要分庫分表將數(shù)據(jù)放到多個(gè)數(shù)據(jù)庫中,這個(gè)時(shí)候單個(gè)庫的事務(wù)就無法滿足需求了,就需要理解和掌握分布式事務(wù)了。
什么是分布式事務(wù)
分布式事務(wù)就是指事務(wù)的參與者、支持事務(wù)的服務(wù)器、資源服務(wù)器以及事務(wù)管理器分別位于不同的分布式系統(tǒng)的不同節(jié)點(diǎn)之上。簡單的說,就是一次大的操作由不同的小操作組成,這些小的操作分布在不同的服務(wù)器上,且屬于不同的應(yīng)用,分布式事務(wù)需要保證這些小操作要么全部成功,要么全部失敗。本質(zhì)上來說,分布式事務(wù)就是為了保證不同數(shù)據(jù)庫的數(shù)據(jù)一致性。

打個(gè)比方,一個(gè)電商系統(tǒng)的付款服務(wù)和訂單的服務(wù)是兩個(gè)應(yīng)用,并且付款表和訂單表不在一個(gè)數(shù)據(jù)庫。那么如何保證用戶付完款之后就能給用戶創(chuàng)建好訂單,并且創(chuàng)建訂單失敗就會(huì)給用戶退款。而且在一個(gè)大型電商系統(tǒng)里每一個(gè)數(shù)據(jù)庫都不止一個(gè)節(jié)點(diǎn),又如何保證每個(gè)節(jié)點(diǎn)的數(shù)據(jù)都是一致的呢。
這個(gè)時(shí)候加州大學(xué)的計(jì)算機(jī)科學(xué)家 Eric Brewer 提出了CAP定理來奠定了分布式系統(tǒng)設(shè)計(jì)的基礎(chǔ)。
CAP理論
- C-Consistent,操作成功后,所有節(jié)點(diǎn)看到的數(shù)據(jù)都是一樣的。對(duì)于數(shù)據(jù)分布在不同節(jié)點(diǎn)上的數(shù)據(jù)上來說,如果在某個(gè)節(jié)點(diǎn)更新了數(shù)據(jù),那么在其他節(jié)點(diǎn)如果都能讀取到這個(gè)最新的數(shù)據(jù),那么就稱為強(qiáng)一致,如果有某個(gè)節(jié)點(diǎn)沒有讀取到,那就是分布式不一致。
- A-Availability,可用性,服務(wù)全部一致可用,在規(guī)定時(shí)間內(nèi)完成合理的響應(yīng)??捎眯缘膬蓚€(gè)關(guān)鍵一個(gè)是合理的時(shí)間,一個(gè)是合理的響應(yīng)。合理的時(shí)間指的是請求不能無限被阻塞,應(yīng)該在合理的時(shí)間給出返回。合理的響應(yīng)指的是系統(tǒng)應(yīng)該明確返回結(jié)果并且結(jié)果是正確的。
- P-Partition tolerance,分區(qū)容錯(cuò)性。指分布式系統(tǒng)在遇到某個(gè)節(jié)點(diǎn)故障后,仍能夠?qū)ν馓峁┓?wù)。也就是說在分布式集群中某一個(gè)節(jié)點(diǎn)出現(xiàn)故障,但是整個(gè)集群都正常的。
CAP理論有一個(gè)很關(guān)鍵的定理:一個(gè)分布式系統(tǒng)最多只能同時(shí)滿足一致性(Consistency)、可用性(Availability)和分區(qū)容錯(cuò)性(Partition tolerance)這三項(xiàng)中的兩項(xiàng)。也就是說一個(gè)分布式系統(tǒng)是不可能同時(shí)滿足強(qiáng)一致、高可用、分區(qū)容錯(cuò)性的,通常系統(tǒng)設(shè)計(jì)的時(shí)候會(huì)犧牲某一個(gè)指標(biāo)來實(shí)現(xiàn)另外兩個(gè)指標(biāo)。

CA without P:如果不要求P,則C(強(qiáng)一致性)和A(可用性)是可以保證的。但放棄P的同時(shí)也就意味著放棄了系統(tǒng)的擴(kuò)展性,也就是分布式節(jié)點(diǎn)受限,沒辦法部署子節(jié)點(diǎn),這是違背分布式系統(tǒng)設(shè)計(jì)的初衷的。傳統(tǒng)的關(guān)系型數(shù)據(jù)庫RDBMS:Oracle、MySQL就是CA。
CP without A:如果不要求A,相當(dāng)于每個(gè)請求都需要在服務(wù)器之間保持強(qiáng)一致,而P(分區(qū))會(huì)導(dǎo)致同步時(shí)間無限延長(也就是等待數(shù)據(jù)同步完才能正常訪問服務(wù)),一旦發(fā)生網(wǎng)絡(luò)故障或者消息丟失等情況,就要犧牲用戶的體驗(yàn),等待所有數(shù)據(jù)全部一致了之后再讓用戶訪問系統(tǒng)。設(shè)計(jì)成CP的系統(tǒng)其實(shí)不少,最典型的就是分布式數(shù)據(jù)庫,如Zookeeper、HBase等。
AP wihtout C:要高可用并允許分區(qū),則需放棄一致性。一旦分區(qū)發(fā)生,節(jié)點(diǎn)之間可能會(huì)失去聯(lián)系,為了高可用,每個(gè)節(jié)點(diǎn)只能用本地?cái)?shù)據(jù)提供服務(wù),而這樣會(huì)導(dǎo)致全局?jǐn)?shù)據(jù)的不一致性。但系統(tǒng)依然能正常提供服務(wù)。
需要說一下在分布式系統(tǒng)中網(wǎng)絡(luò)無法100%可靠,分區(qū)其實(shí)是一個(gè)必然現(xiàn)象,如果我們選擇了CA而放棄了P,那么當(dāng)發(fā)生分區(qū)現(xiàn)象時(shí),為了保證一致性,這個(gè)時(shí)候必須拒絕請求,但是A又不允許,所以分布式系統(tǒng)理論上不可能選擇CA架構(gòu),只能選擇CP或者AP架構(gòu)。
BASE理論
eBay的架構(gòu)師Dan Pritchett源于對(duì)大規(guī)模分布式系統(tǒng)的實(shí)踐總結(jié),在ACM上發(fā)表文章提出BASE理論。
BASE理論是對(duì)CAP理論的延伸,核心思想是即使無法做到強(qiáng)一致性(Strong Consistency,CAP的一致性就是強(qiáng)一致性),但應(yīng)用可以采用適合的方式達(dá)到最終一致性(Eventual Consitency)。BASE是Basically Available(基本可用)、Soft state(軟狀態(tài))和Eventually consistent(最終一致性)三個(gè)短語的縮寫。
- Basically Available(基本可用): 指分布式系統(tǒng)在出現(xiàn)不可預(yù)知故障的時(shí)候,允許損失部分可用性。比如電商大促,服務(wù)降級(jí)的體現(xiàn)。
- Soft-state(軟狀態(tài)):指允許系統(tǒng)中的數(shù)據(jù)存在中間狀態(tài),并認(rèn)為該中間狀態(tài)的存在不會(huì)影響系統(tǒng)的整體可用性。
- Eventually consistent(最終一致):強(qiáng)調(diào)的是所有的數(shù)據(jù)更新操作,在經(jīng)過一段時(shí)間的同步之后,最終都能夠達(dá)到一個(gè)一致的狀態(tài)。
在BASE中用軟狀態(tài)和最終一致,保證了延遲后的一致性。BASE和 ACID 是相反的,它完全不同于ACID的強(qiáng)一致性模型,而是通過犧牲強(qiáng)一致性來獲得可用性,并允許數(shù)據(jù)在一段時(shí)間內(nèi)是不一致的,但最終達(dá)到一致狀態(tài)。
分布式事務(wù)解決方案
有了上面分布式理論基礎(chǔ)之后就有了常用的幾種解決方案,在說到解決方案之前不得不提醒一下:在業(yè)務(wù)規(guī)模不大的情況下設(shè)計(jì)系統(tǒng)盡量去規(guī)避分布式事務(wù)問題。比如電商系統(tǒng)的早期階段,單機(jī)架構(gòu)就能滿足業(yè)務(wù)就不要用微服務(wù)的架構(gòu),或者說微服務(wù)架構(gòu)早期可以把服務(wù)拆分的更粗一些來避免跨庫業(yè)務(wù)。因?yàn)闊o論是那種解決方案都要花費(fèi)不少的時(shí)間成本。
通過XA協(xié)議實(shí)現(xiàn)兩階段提交
XA是由X/Open組織提出的分布式事務(wù)規(guī)范,整體是由一個(gè)事務(wù)管理器(TM)和多個(gè)資源管理器(RM)組成,RM一般就是指我們的數(shù)據(jù)庫而TM相當(dāng)于程序中的數(shù)據(jù)源。整個(gè)事務(wù)過程分為兩個(gè)階段提交,prepare和commit。

- 第一階段TM要求所有的RM進(jìn)行數(shù)據(jù)庫預(yù)提交操作,所有RM都OK了才會(huì)進(jìn)入第二階段,只要一個(gè)RM返回失敗就會(huì)全部回滾并終止。
- 第二階段TM要求所有的RM提交數(shù)據(jù),要注意的是后面commit如果出錯(cuò)的話并不會(huì)回滾已經(jīng)提交的commit。
MySQL從5.0.3開始支持XA分布式事務(wù),且只有InnoDB存儲(chǔ)引擎支持。在我們J2EE項(xiàng)目中可用使用Atomikos(充當(dāng)TM)來做XA分布式事務(wù)。這里用springboot舉例:
- 引入atomikos包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
- 配置數(shù)據(jù)源和事務(wù)管理器,需要注意的是數(shù)據(jù)源要使用MysqlXADataSource,事務(wù)管理器需要使用JtaTransactionManager
import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.mysql.cj.jdbc.MysqlXADataSource;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.transaction.jta.JtaTransactionManager;
import javax.sql.DataSource;
import javax.transaction.UserTransaction;
@Configuration
public class DBConfig {
@Bean("db20")
public DataSource db20(){
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setUser("root");
mysqlXADataSource.setPassword("123456");
mysqlXADataSource.setUrl("jdbc:mysql://192.168.3.20:3306/shard_order?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8");
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
return atomikosDataSourceBean;
}
@Bean("db21")
public DataSource db21(){
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setUser("root");
mysqlXADataSource.setPassword("123456");
mysqlXADataSource.setUrl("jdbc:mysql://192.168.3.21:3306/shard_order?serverTimezone=UTC&useSSL=false&useUnicode=true&characterEncoding=UTF-8");
AtomikosDataSourceBean atomikosDataSourceBean = new AtomikosDataSourceBean();
atomikosDataSourceBean.setXaDataSource(mysqlXADataSource);
return atomikosDataSourceBean;
}
@Bean("xaTransaction")
public JtaTransactionManager jtaTransactionManager(){
UserTransaction userTransaction = new UserTransactionImp();
UserTransactionManager userTransactionManager = new UserTransactionManager();
return new JtaTransactionManager(userTransaction,userTransactionManager);
}
}
- 在service層調(diào)用
@Service
public class OrderService {
@Transactional(transactionManager = "xaTransaction")
public void insertTest(@Qualifier("db20") DataSource dataSource20,
@Qualifier("db21")DataSource dataSource21){
JdbcTemplate jdbc195 = new JdbcTemplate(dataSource20);
String sql1 = "INSERT INTO `order_info_1`(`id`, `order_amount`, `order_status`, `user_id`) VALUES (5, 5.00, 1, 4)";
int i = jdbc195.update(sql1);
System.out.println("**************影響的行數(shù):"+i);
JdbcTemplate jdbc197 = new JdbcTemplate(dataSource21);
String sql2 = "INSERT INTO `order_info_1`(`id`, `order_amount`, `order_status`, `user_id`) VALUES (6, 6.00, 2, 4);";
int i1 = jdbc197.update(sql2);
System.out.println("**************影響的行數(shù):" + i1);
}
}
這樣就寫好了一個(gè)分布式事務(wù),上面兩個(gè)sql只要其中一個(gè)失敗就會(huì)回滾。
可以看到使用XA協(xié)議的方案做分布式事務(wù)非常簡單,對(duì)代碼完全沒有侵入性。而且主流的數(shù)據(jù)庫和數(shù)據(jù)庫中間件Sharding-JDBC、MyCat等都默認(rèn)支持XA協(xié)議,但XA協(xié)議有一個(gè)缺點(diǎn)就是性能比較低,通常會(huì)比本地事務(wù)性能差十倍。
通過XA協(xié)議實(shí)現(xiàn)兩階段提交其實(shí)是CAP里的CA,它的特點(diǎn)是強(qiáng)一致性,但是可以通過Mysql等數(shù)據(jù)庫的主從復(fù)制來滿足P。XA的適用場景是一些并發(fā)量不是很高的業(yè)務(wù),在業(yè)務(wù)從小規(guī)模到中等規(guī)模過度的時(shí)候可以選擇XA來做分布式事務(wù)。
TCC事務(wù)補(bǔ)償機(jī)制
TCC分別對(duì)應(yīng)Try、Confirm和Cancel三種操作實(shí)現(xiàn)的。最早是由Pat Helland于2007年發(fā)表的一篇名為《Life beyond Distributed Transactions:an Apostate’s Opinion》的論文提出。
- Try階段:嘗試執(zhí)行,完成所有業(yè)務(wù)檢查(一致性),預(yù)留必須業(yè)務(wù)資源(準(zhǔn)隔離性)
- Confirm階段:確認(rèn)執(zhí)行真正執(zhí)行業(yè)務(wù),不作任何業(yè)務(wù)檢查,只使用Try階段預(yù)留的業(yè)務(wù)資源,Confirm操作滿足冪等性。要求具備冪等設(shè)計(jì),Confirm失敗后需要進(jìn)行重試。
- Cancel階段:取消執(zhí)行,釋放Try階段預(yù)留的業(yè)務(wù)資源。
使用方法其實(shí)就是針對(duì)每一個(gè)操作都需要提前注冊一個(gè)與其對(duì)應(yīng)的補(bǔ)償操作,在執(zhí)行失敗后按照失敗節(jié)點(diǎn)向前補(bǔ)償,撤銷之前的操作。
舉個(gè)例子:A和B兩家銀行,一個(gè)用戶從A到B進(jìn)行轉(zhuǎn)賬,A減1000成功,B加1000成功整個(gè)事務(wù)結(jié)束,如果B沒有加成功則通知A執(zhí)行加1000進(jìn)行補(bǔ)償操作。
代碼實(shí)現(xiàn)如下:
@Service
public class TransferAccountService {
@Autowired
@Qualifier("db20JdbcTemplate")
private JdbcTemplate db20JdbcTemplate;
@Autowired
@Qualifier("db21JdbcTemplate")
private JdbcTemplate db21JdbcTemplate;
@Transactional(transactionManager = "db20TransactionManager")
public void transfer() {
//銀行A開始轉(zhuǎn)賬
int ares = db20JdbcTemplate.update("update user_account set account=account-1000 where user_id=2");
int res = 0;
try {
//第一步try 驗(yàn)證銀行A是否轉(zhuǎn)賬成功
if (ares == 0) {
return;
}
// int i = 1/0; // 運(yùn)行點(diǎn)A發(fā)生異常
//第二步Confirm 銀行B開始轉(zhuǎn)賬
res = db21JdbcTemplate.update("update user_account set account=account+1000 where user_id=2");
//int i = 1/0; // 運(yùn)行點(diǎn)B發(fā)生異常
} catch (Exception e) {
e.printStackTrace();
//第三步Cancel 一旦轉(zhuǎn)賬失敗進(jìn)行補(bǔ)償
//這個(gè)補(bǔ)償要判斷好銀行B的轉(zhuǎn)賬是否操作成功,如果在運(yùn)行點(diǎn)A發(fā)的異常就說明轉(zhuǎn)賬沒有成功這個(gè)時(shí)候才需要補(bǔ)償,如果是在運(yùn)行點(diǎn)B發(fā)生的異常那么再補(bǔ)償就是過度補(bǔ)償
if (res == 0) {
//銀行B沒有轉(zhuǎn)賬成功補(bǔ)償銀行A
db20JdbcTemplate.update("update user_account set account=account+1000 where user_id=2");
}
}
}
TCC其實(shí)很好理解就是在程序里每一個(gè)分布式邏輯都按照T,C,C三個(gè)步驟去做對(duì)應(yīng)的處理即可,不過TCC有一個(gè)很大的問題就是代碼入侵性很強(qiáng)復(fù)雜度很高。要做本身業(yè)務(wù)的基礎(chǔ)上做一些額外的事,并且對(duì)程序員和測試的要求會(huì)比較高。try的粒度很有講究一定要判斷是否需要Cancel,不然就會(huì)發(fā)生過度補(bǔ)償?shù)膯栴}。
TCC相比XA協(xié)議不需要TM事務(wù)管理器來統(tǒng)一管理事務(wù),性能會(huì)比XA協(xié)議要好很多。
使用本地消息表實(shí)現(xiàn)最終一致性
本地消息表這個(gè)方案最初是ebay提出,是基于BASE理論設(shè)計(jì)的,是最終一致性模型。
此方案也很好理解,核心是將需要分布式處理的任務(wù)通過消息日志的方式來異步執(zhí)行。消息日志可以存儲(chǔ)到數(shù)據(jù)庫或消息隊(duì)列,再通過業(yè)務(wù)規(guī)則自動(dòng)或人工發(fā)起重試。人工重試更多的是應(yīng)用于支付場景,通過對(duì)賬系統(tǒng)對(duì)事后問題的處理。

舉一個(gè)支付訂單的場景:
首先需要在用戶扣除賬戶金額的同時(shí)將所支付的訂單消息存入到消息表中。
@Transactional(transactionManager = "db20TransactionManager")
public int payment(int uid, int order_id, int amount) {
//查詢用戶賬戶信息
List<Map<String,Object>> userAccountList = db20JdbcTemplate.queryForList("select id,user_id,account from user_account where user_id = ?",uid);
if (userAccountList.size() == 0) {
return 1;
}
UserAccount userAccount = new UserAccount();
try {
BeanUtils.populate(userAccount,userAccountList.get(0));
}catch (Exception e){
e.printStackTrace();
return 1;
}
int account = userAccount.getAccount();
if (account < amount) {
return 2;
}
//更新用戶賬戶金額
userAccount.setAccount(account - amount);
db20JdbcTemplate.update("UPDATE `user_account` SET `account` = ? WHERE `user_id` = ?;", userAccount.getAccount(), userAccount.getUser_id());
PayMsg payMsg = new PayMsg();
payMsg.setId(1001);
payMsg.setOrder_id(order_id);
payMsg.setStatus(0);//0-未發(fā)送,1-發(fā)送成功,2-超次數(shù)
payMsg.setFail_count(0);
//寫入本地消息表
db20JdbcTemplate.update("INSERT INTO `pay_msg`(`id`, `order_id`, `status`, `fail_count`) VALUES (?, ?, ?, ?)", payMsg.getId(), payMsg.getOrder_id(), payMsg.getStatus(), payMsg.getFail_count());
return 0;
}
然后不斷輪詢消息表的數(shù)據(jù)去更改訂單狀態(tài),這里是通過http的方式去調(diào)用更改訂單狀態(tài)的接口,也可以通過RPC的方式。
@Scheduled(cron = "0/5 * * * * ?")
public void orderNotify() throws Exception{
List<PayMsg> payMsgList =new ArrayList<>();
//查詢未處理成功的消息
List<Map<String,Object>> mapList = db20JdbcTemplate.queryForList("select id,order_id,status,fail_count from pay_msg where status = 0");
if(mapList.isEmpty()){
return;
}
for(Map<String,Object> map : mapList){
PayMsg payMsg = new PayMsg();
BeanUtils.populate(payMsg,map);
payMsgList.add(payMsg);
}
for (PayMsg payMsg: payMsgList) {
int order_id = payMsg.getOrder_id();
//調(diào)用訂單接口來更改消息狀態(tài),這里會(huì)有重試,最多重試五次
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
HttpGet httpGet = new HttpGet("http://localhost:8080/handleorder?id="+order_id);
CloseableHttpResponse httpResponse = httpClient.execute(httpGet);
String response = EntityUtils.toString(httpResponse.getEntity());
System.out.println("************調(diào)用結(jié)果:"+response);
if("success".equals(response)){
payMsg.setStatus(1);
}else{
int count = payMsg.getFail_count();
payMsg.setFail_count(count+1);
if(count+1>5){
payMsg.setStatus(2);
}
}
db20JdbcTemplate.update("update pay_msg set status=?,fail_count=? where id=?",payMsg.getStatus(),payMsg.getFail_count(),payMsg.getId());
}
}
這樣一個(gè)簡單的分布式事務(wù)方案就OK了,要注意的是要保證業(yè)務(wù)的冪等性。相比XA和TCC,本地消息表的方案代碼入侵性比TCC更少一些,也因?yàn)橹粫?huì)用到本地事務(wù)會(huì)比XA的性能更好一點(diǎn),適用于對(duì)一致性要求不高(不需要很及時(shí))的場景。
以上所有的示例代碼可以看這里https://github.com/burgleaf/distributed-transaction