前言
??上篇我們分析了spring-tx中的AOP部分,包括TransactionAttributeSourcePointcut如何定位潛在的事務(wù)方法,以及TransactionInterceptor又如何結(jié)合PlatformTransactionManager為方法應(yīng)用事務(wù)管理,相信看過上篇的同學(xué)也從中g(shù)et到了使用AOP的新姿勢??
??不過到目前為止,除去開篇中的概念介紹,我們對PlatformTransactionManager還知之甚少,本篇我們就深入一點兒。DataSourceTransactionManager作為最具代表性的 PlatformTransactionManager實現(xiàn),我們就以它為例來看看隱藏在抽象下的實現(xiàn)細節(jié)吧。
一個小例子
??來看一個使用Spring聲明式事務(wù)的小例子。
@Service
public class ServiceA {
@Autowired
private ServiceB serviceB;
// Propagation.REQUIRED 有就加入,沒有就自己玩
@Transactional(propagation = Propagation.REQUIRED)
public void service() {
// do some CRUD stuff...
// then call serviceB#service()
serviceB.service();
}
}
@Service
public class ServiceB {
// Propagation.REQUIRES_NEW 自掃門前雪,各用各的事務(wù)
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void service() {
// do some CRUD stuff...
}
}
@RestController
public class DemoController {
@Autowired
private ServiceA serviceA;
@PostMapping("/demo")
public void demo() {
serviceA.service();
}
}
這里我們有兩個事務(wù)方法,分別屬于ServiceA和ServiceB,處理請求的入口在DemoController。請求到達后,
- 進入服務(wù) A 的
service()方法 - 開啟事務(wù) tx-a
- 執(zhí)行CRUD
- 進入服務(wù) B 的
service()方法 - 掛起 tx-a,開啟新事務(wù) tx-b
- 執(zhí)行CRUD
- 結(jié)束
serviceB#service()調(diào)用,視執(zhí)行情況提交或回滾事務(wù) - 結(jié)束
serviceA#service()調(diào)用,視執(zhí)行情況提交或回滾事務(wù)
麻雀雖小五臟俱全,這個小例子也是后續(xù)分析源碼時的參照,先給出來混個臉熟吧。
DataSourceTransactionManager
Hierarchy overview

??DataSourceTransactionManager繼承自AbstractPlatformTransactionManager,基類中實現(xiàn)了事務(wù)管理的一整套工作流:
- 檢查是否已存在事務(wù)
- 應(yīng)用適當?shù)膫鞑バ袨?/li>
- 必要時暫停和恢復(fù)事務(wù)
- 提交事務(wù)時檢查
rollback-only標記 - 回滾事務(wù)時的處理——如實回滾或僅設(shè)置
rollback-only標記 - 觸發(fā)已注冊的
transaction synchronization回調(diào)
子類只需要根據(jù)事務(wù)的狀態(tài)實現(xiàn)特定的模板方法即可,比如事務(wù)開始、暫停、恢復(fù)、提交和回滾。
TransactionInterceptor
??事務(wù)攔截器想必大家都很熟悉了,簡化后的模型大致如下:
// 詳細的分析請看上篇
public Object invoke(MethodInvocation invocation) throws Throwable {
Object result = null;
// 1. 解析@Transactional注解
TransactionAttribute txAttr = xxx;
// 2. 獲取事物管理器
PlatformTransactionManager txManager = xxx;
// 3. 根據(jù)配置的事務(wù)屬性開啟事務(wù)
TransactionStatus status = txManager.getTransaction(txAttr);
try {
// 4. 執(zhí)行目標方法
result = invocation.proceed();
} catch (Throwable ex) {
// 5-1. 方法異常退出,根據(jù)配置決定是提交還是回滾
if (txAttr.rollbackOn(ex)) {
txManager.rollback(status);
} else {
txManager.commit(status);
}
throw ex;
}
// 5-2. 方法正常執(zhí)行完成,提交事務(wù)
txManager.commit(status);
return result;
}
核心流程大致分為5步,其中和事務(wù)管理器有關(guān)的是第3和第5步。前情提要到此結(jié)束,那我們就從第3步getTransaction(...)開始唄~
getTransaction
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// 使用默認配置,如果沒有的話
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 模板方法,子類實現(xiàn)
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 檢查是否已存在事務(wù)
if (isExistingTransaction(transaction)) {
// 已存在事務(wù),根據(jù)傳播行為來決定如何處理
return handleExistingTransaction(def, transaction, debugEnabled);
}
// 檢查一下超時配置,默認 -1 ,不能更低了
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 當前不存在事務(wù),同樣要根據(jù)傳播行為來決定如何處理
// PROPAGATION_MANDATORY 要求一定要運行在事務(wù)中,當前事務(wù)不存在,因此這里選擇拋出異常
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED 在沒有事務(wù)存在時會開啟新的獨立事務(wù)
// PROPAGATION_REQUIRES_NEW 無論是否有事務(wù)存在都會開啟新的獨立事務(wù)
// PROPAGATION_NESTED 在沒有事務(wù)存在時和PROPAGATION_REQUIRED的行為一致
// 換言之,走到這里,這三種傳播行為都會開啟新的獨立事務(wù)
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 掛起當前事務(wù),這里傳null是因為當前沒有事務(wù)
// 仍然調(diào)用suspend的意義在于觸發(fā) TransactionSynchronization#suspend() 回調(diào)
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 開啟新的獨立事務(wù)
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
// 事務(wù)開啟失敗,恢復(fù)已存在的事務(wù)
// 同樣為了觸發(fā) TransactionSynchronization 回調(diào),這里是 resume()
resume(null, suspendedResources);
throw ex;
}
}
// 其它傳播行為在當前沒有事務(wù)存在時會繼續(xù)以非事務(wù)方式運行,比如PROPAGATION_SUPPORTS
// 不用為它們開啟新的事務(wù),但需要處理可能注冊的transaction synchronization
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
??getTransaction(...)首先會調(diào)用模板方法doGetTransaction()來獲取事務(wù)對象(transaction object)。事務(wù)對象雖然特定于具體的PlatformTransactionManeger實現(xiàn),但一般都會攜帶相應(yīng)的事務(wù)狀態(tài),我們來分析一下DataSourceTransactionManager對它的實現(xiàn)。
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// 是否允許嵌套事務(wù),傳播行為PROPAGATION_NESTED時有用
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 以javax.sql.DataSource為鍵,ConnectionHolder為值綁定到線程私有存儲
// 通過TransactionSynchronizationManager統(tǒng)一管理(重要)
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
// 若能獲取到這樣一個ConnectionHolder,自然是一個已存在的,因此是false
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
??開篇介紹ResourceHolder接口的時候,我們曾討論過如何保證多個方法運行在同一個事務(wù)中的話題。若將事務(wù)窄化為數(shù)據(jù)庫事務(wù)的話,自然是要保證多個方法使用同一個JDBC Connection,ConnectionHolder作為ResouceHolder的一種實現(xiàn)就是用來攜帶JDBC Connection的。當然了,除了作為java.sql.Connection的容器以外,ConnectionHolder還有一些可配置屬性,比如設(shè)置rollback-only標記、超時時長等。DataSourceTransactionObject可以說是java.sql.Connection的一個快照,它保存了數(shù)據(jù)庫連接的一系列狀態(tài)(用于后續(xù)對數(shù)據(jù)庫連接的復(fù)原),比如當前的隔離級別、自動提交(auto commit)的取值以及一個指示java.sql.Connection是通過javax.sql.DataSource新創(chuàng)建的還是從線程私有存儲中獲取的標記。這兩個類都比較簡單,大家可以自行查閱一下,這里就不細說了。
??了解了ConnectionHolder和DataSourceTransactionObject以后,再看isExistingTransaction(...)就很簡單了。
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 1. 必須持有 ConnectionHolder 實例
// 2. 事務(wù)已開啟,新開的事務(wù)會設(shè)置這個標志
// 滿足這兩個條件就可以認為已經(jīng)有一個已存在的事務(wù)
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
還記得前面給出的小例子嗎?對于服務(wù) A 的service()方法來說,很明顯我們是無法從線程私有存儲中獲取到ConnectionHolder實例的,因此isExistingTransaction(...)必然返回 false。這里我們也先跳過handleExistingTransaction(...),晚點再來分析它。接下來,在沒有任何已知事務(wù)的前提下,我們一起來看看spring-tx是如何根據(jù)傳播行為的配置情況做不同處理的。
??PROPAGATION_MANDATORY要求一定要運行在事務(wù)中,而當前不存在任何事務(wù),因此會拋出異常。PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW和PROPAGATION_NESTED這三種傳播行為在沒有事務(wù)存在時的表現(xiàn)是一致的,它們都會開啟一個新的獨立事務(wù)。新事務(wù)的開啟也意味著對舊事務(wù)(或者說外層事務(wù))的掛起,所以我們看到這第一步便是調(diào)用suspend(...),而這里傳null也是因為當前不存在任何已知事務(wù)。
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction)
throws TransactionException {
// 首先查看 transaction synchronization 是否已激活
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 執(zhí)行 TransactionSynchronization#suspend() 回調(diào),解除 transaction synchronization 激活狀態(tài)
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
// 若已有事務(wù),調(diào)用模板方法執(zhí)行掛起邏輯
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
// 將當前事務(wù)的名稱、readOnly標記、隔離級別以及是否事務(wù)已激活進行重置
// 并把這些屬于舊事務(wù)的信息記錄到 SuspendedResourcesHolder , 用于后續(xù)的復(fù)原
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
return new SuspendedResourcesHolder(
suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
catch (RuntimeException | Error ex) {
// 事務(wù)掛起失敗不代表失效,需要進行恢復(fù) -> 回調(diào) TransactionSynchronization#resume()
doResumeSynchronization(suspendedSynchronizations);
throw ex;
}
}
// 事務(wù)已激活,但 transaction synchronization 未激活
else if (transaction != null) {
// 這種情況下只需要調(diào)用模板方法掛起事務(wù)即可
Object suspendedResources = doSuspend(transaction);
return new SuspendedResourcesHolder(suspendedResources);
}
// 事務(wù)未激活、transaction synchronization 也未激活,沒有什么需要掛起的
else {
return null;
}
}
??suspend(...)中牽涉到spring-tx事務(wù)同步機制(transaction synchronization mechanism),這部分內(nèi)容我們在開篇中介紹過。如若事務(wù)同步機制已激活,就需要先執(zhí)行對應(yīng)的回調(diào),這也是doSuspendSynchronization()做的工作。接下來才是調(diào)用模板方法doSuspend(...),DataSourceTransactionManager實現(xiàn)了這個方法。
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 清除持有的 ConnectionHolder
txObject.setConnectionHolder(null);
// 并將其從ThreadLocal中解綁
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
主要就是將ConnectionHolder從事務(wù)對象和線程私有存儲中移除,然后轉(zhuǎn)移到SuspendedResourcesHolder中用于后續(xù)的恢復(fù)?;氐?code>getTransaction(...),掛起舊事務(wù)后自然是要開啟新事物。
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
// 是否需要激活 transaction synchronization
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 創(chuàng)建一個代表事務(wù)狀態(tài)的對象
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 模板方法,通知新事務(wù)已啟動
doBegin(transaction, definition);
// 按需初始化 transaction synchronization
prepareSynchronization(status, definition);
return status;
}
簡單明了的三步:
- 創(chuàng)建
TransactionStatus實例,它包含了一系列的事務(wù)狀態(tài) - 調(diào)用模板方法
doBegin(...),通知新事務(wù)已啟動 - 按需開啟事務(wù)同步機制
DeaultTransactionStatus就是一個上下文對象或者說信息的集合體,代碼好像沒什么可說的,大家自行查閱一下吧,我們重點來看一下doBegin(...)模板方法。
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 事務(wù)對象未持有 ConnectionHolder
// 或事務(wù)對象持有 ConnectionHolder,并且與事務(wù)同步
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 獲取一個新的數(shù)據(jù)庫連接
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 綁定到事務(wù)對象,注意這里 newConnectionHolder 為 true
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
// 標記 ConnectionHolder 與事務(wù)同步,對應(yīng)上面的檢查,避免過多的獲取數(shù)據(jù)庫連接
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 應(yīng)用read-only標記,并返回初始的隔離級別
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
// 應(yīng)用上配置的隔離級別,初始的隔離級別已經(jīng)記錄下來了,后續(xù)可恢復(fù)
txObject.setPreviousIsolationLevel(previousIsolationLevel);
// 打 read-only 標記
txObject.setReadOnly(definition.isReadOnly());
// 事務(wù)的開啟需要設(shè)置自動提交為false
if (con.getAutoCommit()) {
// 記錄下需要在后續(xù)恢復(fù)自動提交
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 自動提交設(shè)為false,標識著事務(wù)的開啟
con.setAutoCommit(false);
}
// read-only 標記相關(guān),執(zhí)行 SET TRANSACTION READ ONLY 語句
prepareTransactionalConnection(con, definition);
// 重要:標記事務(wù)已激活,此時新的數(shù)據(jù)庫連接已獲取,并且關(guān)閉了自動提交
txObject.getConnectionHolder().setTransactionActive(true);
// 應(yīng)用超時時長
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// 綁定到線程私有存儲,后續(xù)可通過 TransactionSynchronizationManager#getResource(...) 獲取
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
// 出錯的話,釋放數(shù)據(jù)庫連接并拋出異常
catch (Throwable ex) {
if (txObject.isNewConnectionHolder()) {
DataSourceUtils.releaseConnection(con, obtainDataSource());
txObject.setConnectionHolder(null, false);
}
throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
}
}
doBegin(...)設(shè)置transaction active標記,isExistingTransaction(...)會檢查這個標記,這樣就構(gòu)成了一個閉環(huán)。最后一步是調(diào)用prepareSynchronization(...)。
protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
// 檢查是否是全新開啟的事務(wù)同步機制
// 換言之 TransactionSynchronizationManager.isSynchronizationActive() 需為false
if (status.isNewSynchronization()) {
// 記錄事務(wù)是否已激活,也就是是否有事務(wù)對象
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 記錄當前的隔離級別
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
definition.getIsolationLevel() : null);
// 記錄是否是read-only型事務(wù)
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 記錄事務(wù)名稱
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
// 激活事務(wù)同步機制
// initSynchronization()調(diào)用之后isSynchronizationActive()就返回true,我們已經(jīng)在suspend(...)中見過這個檢查
TransactionSynchronizationManager.initSynchronization();
}
}
再次回到getTransaction(...),可以看到startTransaction(...)是包裹在 try 塊中的,以便在出現(xiàn)異常時恢復(fù)已有事務(wù),那我們就看看具體是怎么恢復(fù)的唄。
protected final void resume(@Nullable Object transaction,
@Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException {
// 如果有掛起的事務(wù)
if (resourcesHolder != null) {
// 掛起的資源,也就是doSuspend(...)中解綁的ConnectionHolder
Object suspendedResources = resourcesHolder.suspendedResources;
// 調(diào)用模板方法,重新綁定ConnectionHolder到ThreadLocal
if (suspendedResources != null) {
doResume(transaction, suspendedResources);
}
// suspend(...)的逆操作
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
if (suspendedSynchronizations != null) {
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
// 回調(diào)TransactionSynchronization#resume(),重新激活 transaction synchronization
doResumeSynchronization(suspendedSynchronizations);
}
}
}
@Override
protected void doResume(@Nullable Object transaction, Object suspendedResources) {
// 重新綁定
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
基本上是suspend(...)的逆操作,至此,創(chuàng)建新事務(wù)的流程就分析完了。等等,你以為這就完了?還記得我們的小例子嗎?ServiceB說這才哪到哪,好戲才剛剛開始呢。沒錯,對ServiceB而言,ServiceA#service()已經(jīng)開啟事務(wù)了,等到它調(diào)用service()的時候,isExistingTransaction(...)就返回 true 而不是 false 了,那spring-tx是如何處理當前已存在事務(wù)的情況呢?
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// PROPAGATION_NEVER不允許運行在事務(wù)中,因此拋出異常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED不支持在事務(wù)中運行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 掛起當前事務(wù),以非事務(wù)方式運行
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 創(chuàng)建不支持事務(wù)的 TransactionStatus,因此 newTransaction 為 false
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW需要開啟新事務(wù)
// 流程和之前分析的一致,只不過這里不再傳 null 了而已
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 掛起已有事務(wù)
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 開啟新事務(wù)
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
// 出現(xiàn)異常后恢復(fù)已有事務(wù)
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED在已有事務(wù)的情況下是創(chuàng)建嵌套事務(wù),也就是Savepoint
// 限于篇幅,Savepoint相關(guān)的我們就不講了,用得也不是那么多
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// 剩下來還能發(fā)揮作用的就只有PROPAGATION_SUPPORTS 和 PROPAGATION_REQUIRED 了
// 這兩在已有事務(wù)的情況下行為是一致的,都是加入已有事務(wù)
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 檢驗事務(wù)配置
if (isValidateExistingTransaction()) {
// 隔離級別需保持一致
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
// read-only 標記要一致
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 創(chuàng)建加入已有事務(wù)的 TransactionStatus,因此 newTransaction 為false
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
protected final DefaultTransactionStatus prepareTransactionStatus(
TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
// 創(chuàng)建 TransactionStatus
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
// 按需初始化事務(wù)同步機制
prepareSynchronization(status, definition);
return status;
}
handleExistingTransaction(...)也對不同的傳播行為做了處理,至于其中涉及到的方法調(diào)用,前面都已經(jīng)分析過了。各位同學(xué)在自己看源碼的時候,建議動手調(diào)試調(diào)試,徹底梳理清楚其間的狀態(tài)標志及其轉(zhuǎn)換為好。
rollback
??嘟嘟嘟,CRUD出異常了,這會該回滾事務(wù)了。
@Override
public final void rollback(TransactionStatus status) throws TransactionException {
// 同一個事務(wù)不能多次回滾
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 處理回滾,unexpected用于標記這是不是一個預(yù)期之外的回滾
// 方法執(zhí)行出現(xiàn)異常,且符合rollbackOn配置,那么回滾是正常的,否則就是不正常的
processRollback(defStatus, false);
}
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
try {
// 預(yù)期之外的異常?
boolean unexpectedRollback = unexpected;
try {
// 回調(diào) TransactionSynchronization#beforeCompletion()
triggerBeforeCompletion(status);
// 有Savepoint,回滾到保存點
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Rolling back transaction to savepoint");
}
status.rollbackToHeldSavepoint();
}
// 獨立事務(wù)
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction rollback");
}
// 調(diào)用模板方法進行回滾
doRollback(status);
}
else {
// 非獨立事務(wù),加入的一個已存在的事務(wù)
if (status.hasTransaction()) {
// 已標記為rollback-only,出現(xiàn)于多個事務(wù)方法的嵌套調(diào)用,比如 A -> B -> C,且傳播行為的配置不會導(dǎo)致開啟多個事務(wù)
// 或者配置了外層事務(wù)會因為內(nèi)層事務(wù)的異常而回滾,默認true
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
if (status.isDebug()) {
logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
}
// 調(diào)用模板方法設(shè)置rollback-only標記
doSetRollbackOnly(status);
}
else {
if (status.isDebug()) {
logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
}
}
}
else {
logger.debug("Should roll back transaction but cannot - no transaction available");
}
// 不需要fail-fast,清楚標志,后續(xù)不拋異常
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
}
catch (RuntimeException | Error ex) {
// 回調(diào) TransactionSynchronization#afterCompletion(...),狀態(tài)未知
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 回調(diào) TransactionSynchronization#afterCompletion(...),已回滾
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// 拋出異常
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction rolled back because it has been marked as rollback-only");
}
}
finally {
// 清理
cleanupAfterCompletion(status);
}
}
回滾分為三種情況:
- 嵌套事務(wù)(創(chuàng)建有
Savepoint)回滾 - 獨立事務(wù)回滾
- 非獨立事務(wù)回滾
篇幅有限,Savepoint就不說了,重點看2、3兩種情況。對于獨立事務(wù),調(diào)用doRollback(...)進行回滾,基于java.sql.Connection的回滾機制。
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
// 回滾
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
對于非獨立事務(wù),調(diào)用doSetRollbackOnly(...),僅僅是打上rollback-only標記用于后續(xù)的檢測,processRollback(...)本身就對這個標記進行了檢測。
@Override
protected void doSetRollbackOnly(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
if (status.isDebug()) {
logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
"] rollback-only");
}
// 代理給ConnectionHolder,打上rollback-only標記
txObject.setRollbackOnly();
}
能做的都做了,最后該清理現(xiàn)場了。
private void cleanupAfterCompletion(DefaultTransactionStatus status) {
// 標記事務(wù)已完成
status.setCompleted();
// 若開啟了事務(wù)同步機制,清除相關(guān)的線程私有變量
if (status.isNewSynchronization()) {
TransactionSynchronizationManager.clear();
}
// 若是獨立事務(wù),調(diào)用模板方法進行清理
if (status.isNewTransaction()) {
doCleanupAfterCompletion(status.getTransaction());
}
// 若還存在外層事務(wù)
if (status.getSuspendedResources() != null) {
if (status.isDebug()) {
logger.debug("Resuming suspended transaction after completion of inner transaction");
}
// 恢復(fù)外層事務(wù)
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
/**
* 基本上是 doBegin(...)的逆操作
*/
@Override
protected void doCleanupAfterCompletion(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
// 解綁
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
Connection con = txObject.getConnectionHolder().getConnection();
try {
// 如有必要,恢復(fù)自動提交
if (txObject.isMustRestoreAutoCommit()) {
con.setAutoCommit(true);
}
// 恢復(fù)原有隔離級別和readOnly標記
DataSourceUtils.resetConnectionAfterTransaction(
con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
}
catch (Throwable ex) {
logger.debug("Could not reset JDBC Connection after transaction", ex);
}
// 若是獨立事務(wù),釋放連接
if (txObject.isNewConnectionHolder()) {
if (logger.isDebugEnabled()) {
logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
}
DataSourceUtils.releaseConnection(con, this.dataSource);
}
// 清除ConnectionHolder的狀態(tài)
txObject.getConnectionHolder().clear();
}
commit
??要是沒出什么幺蛾子,就可以提交事務(wù)了。
@Override
public final void commit(TransactionStatus status) throws TransactionException {
// 同一個事務(wù)不能重復(fù)提交
if (status.isCompleted()) {
throw new IllegalTransactionStateException(
"Transaction is already completed - do not call commit or rollback more than once per transaction");
}
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 當前事務(wù)被標記為rollback-only
// 何時打上rollback-only標記可參考對 rollback 的分析
if (defStatus.isLocalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Transactional code has requested rollback");
}
// 執(zhí)行回滾而不是提交
processRollback(defStatus, false);
return;
}
// 全局回滾,XA事務(wù)可能會有吧
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
if (defStatus.isDebug()) {
logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
}
// 執(zhí)行回滾,unexpected為true
// 比如一個XA事務(wù),事務(wù)A執(zhí)行正常,事務(wù)B執(zhí)行異常
// 此時這個大事務(wù)是不能提交的,事務(wù)協(xié)調(diào)者會標記全局rollback-only
// 這樣就可以讓原本正常的事務(wù)A也回滾而不是提交
processRollback(defStatus, true);
return;
}
// 終于可以提交了,開心??
processCommit(defStatus);
}
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 準備提交,擴展點
prepareForCommit(status);
// 回調(diào) TransactionSynchronization#beforeCommit()
triggerBeforeCommit(status);
// 回調(diào) TransactionSynchronization#beforeCompletion()
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
// Savepoint,忽略不說了
if (status.hasSavepoint()) {
if (status.isDebug()) {
logger.debug("Releasing transaction savepoint");
}
unexpectedRollback = status.isGlobalRollbackOnly();
status.releaseHeldSavepoint();
}
// 獨立事務(wù)
else if (status.isNewTransaction()) {
if (status.isDebug()) {
logger.debug("Initiating transaction commit");
}
// 讀取全局rollback-only標記
unexpectedRollback = status.isGlobalRollbackOnly();
// 調(diào)用模板方法進行提交
doCommit(status);
}
// fail-fast,讀取全局rollback-only標記
else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// 拋出異常,如果全局標記有 rollback-only
if (unexpectedRollback) {
throw new UnexpectedRollbackException(
"Transaction silently rolled back because it has been marked as rollback-only");
}
}
catch (UnexpectedRollbackException ex) {
// 回調(diào) TransactionSynchronization#afterCompletion(...)
// 對應(yīng)上面拋出的異常,此時事務(wù)是回滾的而不是已提交
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
}
catch (TransactionException ex) {
// 如果需要在提交失敗時進行回滾的話,默認false
if (isRollbackOnCommitFailure()) {
doRollbackOnCommitException(status, ex);
}
else {
// 回調(diào) TransactionSynchronization#afterCompletion(...),狀態(tài)未知
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
}
catch (RuntimeException | Error ex) {
// 確保 TransactionSynchronization#beforeCompletion()得到執(zhí)行
if (!beforeCompletionInvoked) {
triggerBeforeCompletion(status);
}
// 回滾事務(wù),如果提交失敗的話,注意與前面遇到TransactionException時處理的異同
doRollbackOnCommitException(status, ex);
throw ex;
}
// 回調(diào) TransactionSynchronization#afterCommit()
try {
triggerAfterCommit(status);
}
finally {
// 回調(diào) TransactionSynchronization#afterCompletion(...),狀態(tài)已提交
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
// 清理
cleanupAfterCompletion(status);
}
}
??commit相對來講是比較復(fù)雜的,它牽涉到對rollback-only標記的處理。這里解釋一下global rollback only,區(qū)別于local rollback only,這個標記多用于多事務(wù)協(xié)作的情況,比如兩階段提交協(xié)議下的分布式事務(wù)。
??區(qū)別于rollback,commit不需要處理非獨立事務(wù)的情況,內(nèi)層事務(wù)不需要提交,它只需要等待外層事務(wù)提交即可。對于獨立事務(wù),模板方法doCommit(...)也只是簡單的代理給java.sql.Connection的提交機制。
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
// processCommit(...)對doCommit(...)拋出的TransactionException有特殊處理
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
后記
??最后回顧一下我們的小例子,現(xiàn)在它還有秘密嗎?本篇我們一起探索了PlatformTransactionManeger抽象下的事務(wù)管理機制,篇幅較長,相信耐心看完的你一定有所收獲??