背景
項目采用Spring Cloud (Spring Boot 2.0.1)開發(fā),Spring Cloud是一個微服務架構實施的綜合性解決框架。
1.知識點概述
1.“微服務”
微服務架構的主旨是將原本獨立的系統拆分成多個小型服務,這些小型服務在各自的進程中獨立運行,服務之間基于HTTP的RESful API進行通信。被拆分的每一個小型服務都圍繞著系統中某一項或一些耦合度較高的業(yè)務功能進行構建,
并且每個服務都維護這自身的數據存儲、業(yè)務并發(fā)、自動化測試案例以及獨立部署機制。
由于有了輕量級的通信協作基礎,所有這些微服務可以使用不同的語言來編寫
優(yōu)點:獨立部署、擴展性強(可根據不同模塊的業(yè)務量分別部署相關服務)
缺點:運維的新挑戰(zhàn);接口的一致性;分布式的復雜性
2.事務的傳播行為
@Transactional(propagation = Propagation.REQUIRED, rollbackFor = Exception.class)
(1)REQUIRED 如果當前沒有事務,就新建一個事務,如果已經存在一個事務中,加入到這個事務中。這是最常見的選擇。
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)
(2)SUPPORTS 支持當前事務,如果當前沒有事務,就以非事務方式執(zhí)行。
@Transactional(propagation = Propagation.MANDATORY, rollbackFor = Exception.class)
(3)MANDATORY 使用當前的事務,如果當前沒有事務,就拋出異常。
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = Exception.class)
(4)REQUIRES_NEW 新建事務,如果當前存在事務,把當前事務掛起。
@Transactional(propagation = Propagation.NOT_SUPPORTED, rollbackFor = Exception.class)
(5)NOT_SUPPORTED 以非事務方式執(zhí)行操作,如果當前存在事務,就把當前事務掛起。
@Transactional(propagation = Propagation.NEVER, rollbackFor = Exception.class)
(6)NEVER 以非事務方式執(zhí)行,如果當前存在事務,則拋出異常。
@Transactional(propagation = Propagation.NESTED, rollbackFor = Exception.class)
(7)NESTED 如果當前存在事務,則在嵌套事務內執(zhí)行。如果當前沒有事務,則執(zhí)行與REQUIRED類似的操作。
3.LCN分布式事務原理
由于采用微服務架構,各個模塊相互獨立,導致原先Spring中的@Transactional注解無法滿足跨服務的事務處理。故而引入LCN分布式事務處理。
網上已經有很多關于LCN分布式事務的介紹,主要引用網上的兩張圖:


注:針對異常場景,若項目采用熔斷機制,參與方B拋出異常回滾,若參與方A 未能獲取到參與方B的異常時,參與方A不會回滾。
2.項目實例
https://github.com/codingapi/tx-lcn/wiki
TxManager源碼
https://github.com/codingapi/tx-lcn/tree/master/tx-manager
TxClient使用說明
https://github.com/codingapi/tx-lcn/wiki/TxClient%E4%BD%BF%E7%94%A8%E8%AF%B4%E6%98%8E
demo
https://github.com/codingapi/springcloud-lcn-demo
主要原理
核心步驟
(1)創(chuàng)建事務組
在事務發(fā)起方開始執(zhí)行業(yè)務代碼之前先調用TxManager創(chuàng)建事務組對象,然后拿到事務標示GroupId的過程。
(2)添加事務組
參與方在執(zhí)行完業(yè)務方法以后,將該模塊的事務信息添加通知給TxManager的操作。
(3)關閉事務組
在發(fā)起方執(zhí)行完業(yè)務代碼以后,將發(fā)起方執(zhí)行結果狀態(tài)通知給TxManager的動作。當執(zhí)行完關閉事務組的方法以后,TxManager將根據事務組信息來通知相應的參與模塊提交或回滾事務。

問題:
git上的源碼是1.5.4版本的Spring Boot,若使用2.0版本,項目啟動會報錯,主要原因是2.0版本移除了context.embedded包,導致EmbeddedServletContainerInitializedEvent.java對象找不到(該對象主要和Listener配合使用,“通過監(jiān)聽,在刷新上下文后將要發(fā)布的事件,用于獲取A的本地端口運行服務器?!盩xClient主要從該Event中獲取服務器端口號。)
解決:
(1)改寫springcloud項目下com.codingapi.tx.springcloud.listener包中的ServerListener.java
@Component
public class ServerListener implements ApplicationListener<ApplicationEvent> {
private Logger logger = LoggerFactory.getLogger(ServerListener.class);
private int serverPort;
@Value("${server.port}")
private String port;
@Autowired
private InitService initService;
@Override
public void onApplicationEvent(ApplicationEvent event) {
// logger.info("onApplicationEvent -> onApplicationEvent. "+event.getEmbeddedServletContainer());
// this.serverPort = event.getEmbeddedServletContainer().getPort();
//TODO Spring boot 2.0.0沒有EmbeddedServletContainerInitializedEvent 此處寫死;modify by young
this.serverPort = Integer.parseInt(port);
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
// 若連接不上txmanager start()方法將阻塞
initService.start();
}
});
thread.setName("TxInit-thread");
thread.start();
}
public int getPort() {
return this.serverPort;
}
public void setServerPort(int serverPort) {
this.serverPort = serverPort;
}
}
@Component注解會自動掃描配置文件中的server.port值;
(2)改寫tx-manager項目下com.codingapi.tm.listener包中的ApplicationStartListener.java
@Component
public class ApplicationStartListener implements ApplicationListener<ApplicationEvent> {
@Override
public void onApplicationEvent(ApplicationEvent event) {
//TODO Spring boot 2.0.0沒有EmbeddedServletContainerInitializedEvent 此處寫死;modify by young
// int serverPort = event.getEmbeddedServletContainer().getPort();
String ip = getIp();
Constants.address = ip+":48888";//寫死端口號,反正TxManager端口也是配置文件配好的(●′ω`●)
}
private String getIp(){
String host = null;
try {
host = InetAddress.getLocalHost().getHostAddress();
} catch (UnknownHostException e) {
e.printStackTrace();
}
return host;
}
}
(3)改寫tx-manager項目下com.codingapi.tm.manager.service.impl包中MicroServiceImpl.java類的getState()方法
@Override
public TxState getState() {
TxState state = new TxState();
String ipAddress = "";
//TODO Spring boot 2.0.0沒有discoveryClient.getLocalServiceInstance() 用InetAddress獲取host;modify by young
//String ipAddress = discoveryClient.getLocalServiceInstance().getHost();
try {
ipAddress = InetAddress.getLocalHost().getHostAddress();
} catch (Exception e) {
e.printStackTrace();
}
if (!isIp(ipAddress)) {
ipAddress = "127.0.0.1";
}
state.setIp(ipAddress);
state.setPort(Constants.socketPort);
state.setMaxConnection(SocketManager.getInstance().getMaxConnection());
state.setNowConnection(SocketManager.getInstance().getNowConnection());
state.setRedisSaveMaxTime(configReader.getRedisSaveMaxTime());
state.setTransactionNettyDelayTime(configReader.getTransactionNettyDelayTime());
state.setTransactionNettyHeartTime(configReader.getTransactionNettyHeartTime());
state.setNotifyUrl(configReader.getCompensateNotifyUrl());
state.setCompensate(configReader.isCompensateAuto());
state.setCompensateTryTime(configReader.getCompensateTryTime());
state.setCompensateMaxWaitTime(configReader.getCompensateMaxWaitTime());
state.setSlbList(getServices());
return state;
}
注:有些服務器這個方法獲取不到ip,注冊到tx.manager的ip為127.0.0.1
@Value("${spring.cloud.client.ip-address}")
private String ipAddress;
public TxState getState() {
TxState state = new TxState();
// String ipAddress = "";
//TODO Spring boot 2.0.0沒有discoveryClient.getLocalServiceInstance() 用InetAddress獲取host;modify by young
//String ipAddress = discoveryClient.getLocalServiceInstance().getHost();
// try {
// ipAddress = InetAddress.getLocalHost().getHostAddress();
// } catch (Exception e) {
// e.printStackTrace();
// }
if (!isIp(ipAddress)) {
ipAddress = "127.0.0.1";
}
...
}
(4)改寫tx-client項目下TransactionServerFactoryServiceImpl.java第60行;
//分布式事務已經開啟,業(yè)務進行中 **/
if (info.getTxTransactionLocal() != null || StringUtils.isNotEmpty(info.getTxGroupId())) {
//檢查socket通訊是否正常 (第一次執(zhí)行時啟動txRunningTransactionServer的業(yè)務處理控制,然后嵌套調用其他事務的業(yè)務方法時都并到txInServiceTransactionServer業(yè)務處理下)
if (SocketManager.getInstance().isNetState()) {
if (info.getTxTransactionLocal() != null) {
return txDefaultTransactionServer;
} else {
// if(transactionControl.isNoTransactionOperation() // 表示整個應用沒有獲取過DB連接
// || info.getTransaction().readOnly()) { //無事務業(yè)務的操作
// return txRunningNoTransactionServer;
// }else {
// return txRunningTransactionServer;
// }
if(!transactionControl.isNoTransactionOperation()) { //TODO 有事務業(yè)務的操作 modify by young
return txRunningTransactionServer;
}else {
return txRunningNoTransactionServer;
}
}
} else {
logger.warn("tx-manager not connected.");
return txDefaultTransactionServer;
}
}
//分布式事務處理邏輯*結束***********/
重新maven deploy打出jar包,上傳自己的maven服務器;
就此Spring Boot2.0引入LCN分布式事務正常啟動。
優(yōu)化
git項目上引用tx.manager.url(txmanager服務器路徑)需要在各自的微服務中添加TxManagerHttpRequestService.java和TxManagerTxUrlService.java的實現類;
可將兩個方法繼承到springcloud源碼包中
(1)添加TxManagerConfiguration.java
@Configuration
@ConditionalOnProperty(value = "tx.manager.url")
public class TxManagerConfiguration {
@Bean
@RefreshScope
@ConfigurationProperties(prefix = "tx.manager")
public TxManagerProperity txManagerProperity(){
return new TxManagerProperity();
};
@Bean
public TxManagerTxUrlService txManagerTxUrlService(){
return new TxManagerTxUrlServiceImpl(txManagerProperity());
}
@Bean
public TxManagerHttpRequestService txManagerHttpRequestService(){
return new TxManagerHttpRequestServiceImpl();
}
}
(2) 添加TxManagerProperity .java
public class TxManagerProperity {
private String url;
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
}
(3)修改TxManagerTxUrlServiceImpl.java
public class TxManagerTxUrlServiceImpl implements TxManagerTxUrlService {
private TxManagerProperity property;
public TxManagerTxUrlServiceImpl(TxManagerProperity property) {
this.property = property;
}
@Override
public String getTxUrl() {
return property.getUrl();
}
}
(4)自動配置掃描包添加新增的配置文件
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.codingapi.tx.TransactionConfiguration,\
com.codingapi.longfor.TxManagerConfiguration
重新deploy jar包并被項目引用后,各個子服務無需再添加TxManagerHttpRequestService.java和TxManagerTxUrlService.java的實現類
測試
(1)參與方大于2個,參與方A分別調用參與方B和參與方C,只要參與方A捕獲到Exception,3方均會回滾;
(2)參與方使用不同的事務傳播行為,支持REQUIRED,REQUIRES_NEW,但是使用SUPPORTS時,A拋異常,B和C可以回滾,A不會回滾。
(3)A既繼承了ITxTransaction,又添加了@TxTransaction和@Transactional注解,異常情況均會回滾
@TxTransaction(isStart = true)
@Transactional(propagation = Propagation.SUPPORTS, rollbackFor = Exception.class)
(4)若項目采用熔斷機制,參與方B拋出異常回滾,若參與方A 未能獲取到參與方B的異常時,參與方A不會回滾
注:當前還未引入補償機制,后續(xù)補充