本文和大家一起刨析 Spring 事務的相關源碼,篇幅較長,代碼片段較多,建議使用電腦閱讀
本文目標
- 理解Spring事務管理核心接口
- 理解Spring事務管理的核心邏輯
- 理解事務的傳播類型及其實現原理
版本
SpringBoot 2.3.3.RELEASE
什么是事務的傳播?
Spring 除了封裝了事務控制之外,還抽象出了 事務的傳播 這個概念,事務的傳播并不是關系型數據庫所定義的,而是Spring在封裝事務時做的增強擴展,可以通過@Transactional 指定事務的傳播,具體類型如下
| 事務傳播行為類型 | 說明 |
|---|---|
PROPAGATION_REQUIRED |
如果當前沒有事務,就新建一個事務,如果已經存在一個事務中,加入到這個事務中。Spring的默認事務傳播類型 |
PROPAGATION_SUPPORTS |
支持當前事務,如果當前沒有事務,就以非事務方式執(zhí)行。 |
PROPAGATION_MANDATORY |
使用當前的事務,如果當前沒有事務,就拋出異常。 |
PROPAGATION_REQUIRES_NEW |
新建事務,如果當前存在事務,把當前事務掛起(暫停)。 |
PROPAGATION_NOT_SUPPORTED |
以非事務方式執(zhí)行操作,如果當前存在事務,就把當前事務掛起。 |
PROPAGATION_NEVER |
以非事務方式執(zhí)行,如果當前存在事務,則拋出異常。 |
PROPAGATION_NESTED |
如果當前存在事務,則在嵌套事務內執(zhí)行。如果當前沒有事務,則執(zhí)行與PROPAGATION_REQUIRED類似的操作。 |
舉個栗子
以嵌套事務為例
@Service
public class DemoServiceImpl implements DemoService {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private DemoServiceImpl self;
@Transactional
@Override
public void insertDB() {
String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
jdbcTemplate.update(sql, uuid(), "taven");
try {
// 內嵌事務將會回滾,而外部事務不會受到影響
self.nested();
} catch (Exception e) {
e.printStackTrace();
}
}
@Transactional(propagation = Propagation.NESTED)
@Override
public void nested() {
String sql = "INSERT INTO sys_user(`id`, `username`) VALUES (?, ?)";
jdbcTemplate.update(sql, uuid(), "nested");
throw new RuntimeException("rollback nested");
}
private String uuid() {
return UUID.randomUUID().toString();
}
}
上述代碼中,nested()方法標記了事務傳播類型為嵌套,如果nested()中拋出異常僅會回滾nested()方法中的sql,不會影響到insertDB()方法中已經執(zhí)行的sql
注意:service 調用內部方法時,如果直接使用this調用,事務不會生效。因此使用this調用相當于跳過了外部的代理類,所以AOP不會生效,無法使用事務
思考
眾所周知,Spring 事務是通過AOP實現的,如果是我們自己寫一個AOP控制事務,該怎么做呢?
// 偽代碼
public Object invokeWithinTransaction() {
// 開啟事務
connection.beginTransaction();
try {
// 反射執(zhí)行方法
Object result = invoke();
// 提交事務
connection.commit();
return result;
} catch(Exception e) {
// 發(fā)生異常時回滾
connection.rollback();
throw e;
}
}
在這個基礎上,我們來思考一下如果是我們自己做的話,事務的傳播該如何實現
以PROPAGATION_REQUIRED為例,這個似乎很簡單,我們判斷一下當前是否有事務(可以考慮使用ThreadLocal存儲已存在的事務對象),如果有事務,那么就不開啟新的事務。反之,沒有事務,我們就創(chuàng)建新的事務
如果事務是由當前切面開啟的,則提交/回滾事務,反之不做處理
那么事務傳播中描述的掛起(暫停)當前事務,和內嵌事務是如何實現的?
源碼入手
要閱讀事務傳播相關的源碼,我們先來了解下Spring 事務管理的核心接口與類
TransactionDefinition
該接口定義了事務的所有屬性(隔離級別,傳播類型,超時時間等等),我們日常開發(fā)中經常使用的@Transactional其實最終會被轉化為 TransactionDefinitionTransactionStatus
事務的狀態(tài),以最常用的實現 DefaultTransactionStatus 為例,該類存儲了當前的事務對象,savepoint,當前掛起的事務,是否完成,是否僅回滾等等TransactionManager
這是一個空接口,直接繼承他的 interface 有 PlatformTransactionManager(我們平時用的就是這個,默認的實現類DataSourceTransactionManager)以及
ReactiveTransactionManager(響應式事務管理器,由于不是本文重點,我們不多說)
從上述兩個接口來看,TransactionManager 的主要作用
- 通過TransactionDefinition開啟一個事務,返回TransactionStatus
- 通過TransactionStatus 提交、回滾事務(實際開啟事務的Connection通常存儲在TransactionStatus中)
public interface PlatformTransactionManager extends TransactionManager {
TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException;
void commit(TransactionStatus status) throws TransactionException;
void rollback(TransactionStatus status) throws TransactionException;
}
- TransactionInterceptor
事務攔截器,事務AOP的核心類(支持響應式事務,編程式事務,以及我們常用的標準事務),由于篇幅原因,本文只討論標準事務的相關實現
下面我們從事務邏輯的入口 TransactionInterceptor 入手,來看下Spring事務管理的核心邏輯以及事務傳播的實現
TransactionInterceptor
TransactionInterceptor 實現了MethodInvocation(這是實現AOP的一種方式),其核心邏輯在父類TransactionAspectSupport 中,方法位置:TransactionInterceptor::invokeWithinTransaction
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// 當前事務的屬性 TransactionAttribute extends TransactionDefinition
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 事務屬性中可以定義當前使用哪個事務管理器
// 如果沒有定義就去Spring上下文找到一個可用的 TransactionManager
final TransactionManager tm = determineTransactionManager(txAttr);
// 省略了響應式事務的處理 ...
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 如果有下一個攔截器則執(zhí)行,最終會執(zhí)行到目標方法,也就是我們的業(yè)務代碼
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
// 當捕獲到異常時完成當前事務 (提交或者回滾)
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 根據事務的狀態(tài)提交或者回滾
commitTransactionAfterReturning(txInfo);
return retVal;
}
// 省略了編程式事務的處理 ...
}
這里代碼很多,根據注釋的位置,我們可以把核心邏輯梳理出來
- 獲取當前事務屬性,事務管理器(以注解事務為例,這些都可以通過
@Transactional來定義) -
createTransactionIfNecessary,判斷是否有必要創(chuàng)建事務 -
invocation.proceedWithInvocation執(zhí)行攔截器鏈,最終會執(zhí)行到目標方法 -
completeTransactionAfterThrowing當拋出異常后,完成這個事務,提交或者回滾,并拋出這個異常 -
commitTransactionAfterReturning從方法命名來看,這個方法會提交事務。
但是深入源碼中會發(fā)現,該方法中也包含回滾邏輯,具體行為會根據當前TransactionStatus的一些狀態(tài)來決定(也就是說,我們也可以通過設置當前TransactionStatus,來控制事務回滾,并不一定只能通過拋出異常),詳見AbstractPlatformTransactionManager::commit
我們繼續(xù),來看看createTransactionIfNecessary做了什么
TransactionAspectSupport::createTransactionIfNecessary
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 通過事務管理器開啟事務
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
createTransactionIfNecessary中的核心邏輯
- 通過PlatformTransactionManager(事務管理器)開啟事務
-
prepareTransactionInfo準備事務信息,這個具體做了什么我們稍后再講
繼續(xù)來看PlatformTransactionManager::getTransaction,該方法只有一個實現 AbstractPlatformTransactionManager::getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 獲取當前事務,該方法有繼承 AbstractPlatformTransactionManager 的子類自行實現
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 如果目前存在事務
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 傳播類型PROPAGATION_MANDATORY, 要求當前必須有事務
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED 不存在事務時創(chuàng)建事務
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 開啟事務
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// Create "empty" transaction: no actual transaction, but potentially synchronization.
if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
代碼很多,重點關注注釋部分即可
-
doGetTransaction獲取當前事務 - 如果存在事務,則調用
handleExistingTransaction處理,這個我們稍后會講到
接下來,會根據事務的傳播決定是否開啟事務
- 如果事務傳播類型為
PROPAGATION_MANDATORY,且不存在事務,則拋出異常 - 如果傳播類型為
PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NESTED,且當前不存在事務,則調用startTransaction創(chuàng)建事務 - 當不滿足 3、4時,例如
PROPAGATION_NOT_SUPPORTED,此時會執(zhí)行事務同步,但是不會創(chuàng)建真正的事務
Spring 事務同步在之前一篇博客中有講到,傳送門??http://www.itdecent.cn/p/7880d9a98a5f
Spring 如何管理當前的事務
接下來講講上面提到的doGetTransaction、handleExistingTransaction,這兩個方法是由不同的TransactionManager自行實現的
我們以SpringBoot默認的TransactionManager,DataSourceTransactionManager為例
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
@Override
protected boolean isExistingTransaction(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}
結合 AbstractPlatformTransactionManager::getTransaction 一起來看,doGetTransaction 其實獲取的是當前的Connection。
判斷當前是否存在事務,是判斷DataSourceTransactionObject 對象中是否包含connection,以及connection是否開啟了事務。
我們繼續(xù)來看下TransactionSynchronizationManager.getResource(obtainDataSource())獲取當前connection的邏輯
TransactionSynchronizationManager::getResource
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
@Nullable
// TransactionSynchronizationManager::getResource
public static Object getResource(Object key) {
// DataSourceTransactionManager 調用該方法時,以數據源作為key
// TransactionSynchronizationUtils::unwrapResourceIfNecessary 如果key為包裝類,則獲取被包裝的對象
// 我們可以忽略該邏輯
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
if (value != null && logger.isTraceEnabled()) {
logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" +
Thread.currentThread().getName() + "]");
}
return value;
}
/**
* Actually check the value of the resource that is bound for the given key.
*/
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
看到這里,我們能明白DataSourceTransactionManager是如何管理線程之間的Connection,ThreadLocal 中存儲一個Map,key為數據源對象,value為該數據源在當前線程的Connection

DataSourceTransactionManager 在開啟事務后,會調用TransactionSynchronizationManager::bindResource將指定數據源的Connection綁定到當前線程
AbstractPlatformTransactionManager::handleExistingTransaction
我們繼續(xù)回頭看,如果存在事務的情況,如何處理
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// 如果事務的傳播要求以非事務方式執(zhí)行 拋出異常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// PROPAGATION_NOT_SUPPORTED 如果存在事務,則掛起當前事務,以非事務方式執(zhí)行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
// 掛起當前事務
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 構建一個無事務的TransactionStatus
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// PROPAGATION_REQUIRES_NEW 如果存在事務,則掛起當前事務,新建一個事務
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// PROPAGATION_NESTED 內嵌事務,就是我們開頭舉得例子
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
// 非JTA事務管理器都是通過savePoint實現的內嵌事務
// savePoint:關系型數據庫中事務可以創(chuàng)建還原點,并且可以回滾到還原點
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 創(chuàng)建還原點
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// 如果執(zhí)行到這一步傳播類型一定是,PROPAGATION_SUPPORTS 或者 PROPAGATION_REQUIRED
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
// 校驗目前方法中的事務定義和已存在的事務定義是否一致
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 構建一個TransactionStatus,但不開啟事務
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
這里代碼很多,邏輯看上述注釋即可。這里終于看到了期待已久的掛起事務和內嵌事務了,我們還是看一下DataSourceTransactionManager的實現
- 掛起事務:通過
TransactionSynchronizationManager::unbindResource根據數據源獲取當前的Connection,并在resource中移除該Connection。之后會將該Connection存儲到TransactionStatus對象中
// DataSourceTransactionManager::doSuspend
@Override
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
在事務提交或者回滾后,調用 AbstractPlatformTransactionManager::cleanupAfterCompletion會將TransactionStatus 中緩存的Connection重新綁定到resource中
- 內嵌事務:通過關系型數據庫的savePoint實現,提交或回滾的時候會判斷如果當前事務為savePoint則釋放savePoint或者回滾到savePoint,具體邏輯參考
AbstractPlatformTransactionManager::processRollback和AbstractPlatformTransactionManager::processCommit
至此,事務的傳播源碼分析結束
prepareTransactionInfo
上文留下了一個問題,prepareTransactionInfo 方法做了什么,我們先來看下TransactionInfo的結構
protected static final class TransactionInfo {
@Nullable
private final PlatformTransactionManager transactionManager;
@Nullable
private final TransactionAttribute transactionAttribute;
private final String joinpointIdentification;
@Nullable
private TransactionStatus transactionStatus;
@Nullable
private TransactionInfo oldTransactionInfo;
// ...
}
該類在Spring中的作用,是為了內部傳遞對象。ThreadLocal中存儲了最新的TransactionInfo,通過當前TransactionInfo可以找到他的oldTransactionInfo。每次創(chuàng)建事務時會新建一個TransactionInfo(無論有沒有真正的事務被創(chuàng)建)存儲到ThreadLocal中,在每次事務結束后,會將當前ThreadLocal中的TransactionInfo重置為oldTransactionInfo,這樣的結構形成了一個鏈表,使得Spring事務在邏輯上可以無限嵌套下去
如果覺得有收獲,可以關注我的公眾號,你的點贊和關注就是對我最大的支持