本篇文章是對Mybatis知識點的一個擴(kuò)展,主要一起來研究下Spring是如何來管理事務(wù)的。順便再多聊一點其他的知識點,在學(xué)習(xí)的過程中主要帶著以下問題來進(jìn)行有目的的學(xué)習(xí)
然后最終來回答下面這些問題。
- Mybatis是如何整合進(jìn)Spring中的
- Spring如何知道哪些接口是Mapper接口的?
- Mapper接口是如何變成Spring Bean的?
- Spring在哪里聲明的SqlSession的實現(xiàn)邏輯?
- Spring中聲明式事務(wù)的實現(xiàn)方式是怎樣的?
- Spring中如何處理嵌套事務(wù)的?
- Spring中事務(wù)的傳播方式是如何實現(xiàn)的?
https://cloud.tencent.com/developer/article/1497631
一、如何整合進(jìn)Spring中的
默認(rèn)大家對Spring都比較了解了,這里只說結(jié)果。都知道接口是不能被實例化的,那么接口是如何成為Bean的呢?
1.1 如何知道哪些是Mybatis的接口呢?
-
@MapperScanSpring中在配置類上加上這個注解。根據(jù)源碼能看到還導(dǎo)入了MapperScannerRegistrar
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
@Import(MapperScannerRegistrar.class)
@Repeatable(MapperScans.class)
public @interface MapperScan {}
public class MapperScannerRegistrar implements ImportBeanDefinitionRegistrar, ResourceLoaderAware {}
MapperScannerRegistrar 會在配置類解析時候拿到MapperScan注解信息,并解析里面的參數(shù)。生成一個 MapperScannerConfigurer 信息。
從源碼中能看到Mybatis的很多配置信息,都會被注入到MapperScannerConfigurer中。

public class MapperScannerConfigurer
implements BeanDefinitionRegistryPostProcessor, InitializingBean, ApplicationContextAware, BeanNameAware {}
實現(xiàn)自BeanDefinitionRegistryPostProcessor會前置,拿到MapperScan中的basePackage,最終通過ClassPathMapperScanner掃描并添加到
BeanDefinitionRegistry中。

到這里這種方式就能知道哪些是Mybatis中的Mapper接口了。
還有第二種方式當(dāng)發(fā)現(xiàn)Spring容器中沒有MapperScannerConfigurer。會自動注入一個

會直接指定哪些類被Mapper修飾,就將他生成Bean。

好了,到這里就知道如何來確定那些接口是要生成Mybatis接口的了。下面看下個問題。
1.2 Mapper接口是如何變成Spring Bean的?
接口是不能被實例化的,但是在Spring中如何想讓接口實例化就可以使用 FactoryBean + 動態(tài)代理的方式,實現(xiàn)接口類的實例化。
- 首先利用 ClassPathBeanDefinitionScanner 找到符合規(guī)則的類生成 BeanDefinition。
- 給 BeanDefinition 指定BeanClass,執(zhí)行 FactoryBean 是
MapperFactoryBean

二、Spring在哪里聲明的SqlSession的實現(xiàn)邏輯?
通過Mybatis的學(xué)習(xí)知道SqlSession一共有2個包裝類。SqlSessionManager和SqlSessionTemplate。那么SqlSession是在哪里指定用哪個的呢?
答案就在 MapperFactoryBean
public class MapperFactoryBean<T> extends SqlSessionDaoSupport implements FactoryBean<T> {
private SqlSessionTemplate sqlSessionTemplate;
public void setSqlSessionFactory(SqlSessionFactory sqlSessionFactory) {
if (this.sqlSessionTemplate == null || sqlSessionFactory != this.sqlSessionTemplate.getSqlSessionFactory()) {
this.sqlSessionTemplate = createSqlSessionTemplate(sqlSessionFactory);
}
}
@SuppressWarnings("WeakerAccess")
protected SqlSessionTemplate createSqlSessionTemplate(SqlSessionFactory sqlSessionFactory) {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
三、Spring中聲明式事務(wù)的實現(xiàn)方式是怎樣的
看了Mybatis中事務(wù)這一章節(jié),知道如果使用了SqlSessionTemplate,那么事務(wù)的權(quán)限就外包給了Spring。那么Spring中事務(wù)怎么處理的呢?
終于進(jìn)入正題了。Spring中提供兩種事務(wù)的能力。
- 聲明式事務(wù)
- 編程式事務(wù)
3.1 聲明式事務(wù)
使用 Transactional 修飾方法,其主要實現(xiàn)是使用切面實現(xiàn)。
-
TransactionAspectSupport#invokeWithinTransaction。攔截方法。獲取事務(wù)管理器。
這里我們先來思考下,通過前面的學(xué)習(xí)知道事務(wù)的最底層實現(xiàn)是jdbc驅(qū)動來實現(xiàn)的。

那么切面中要想實現(xiàn),就必須保證切面中的線程執(zhí)行的數(shù)據(jù)庫操作,一定是同一個SqlSession這樣才能在方法正常執(zhí)行時候做commit,異常時候做rollback操作。

那我們看下他是如何保證切面中的數(shù)據(jù)庫操作一定是同一個SqlSession的吧。這部分邏輯就在 SqlSessionTemplate 中。

- 獲取當(dāng)前線程是否已經(jīng)有SqlSession了,如果有就直接使用,這樣就保證在切面中的事務(wù)用的是同一個事務(wù)了。
3.2 編程式事務(wù)
TransactionTemplate#execute
編程是事務(wù)需要實現(xiàn)者自己來管理事務(wù)的,Spring提供的擴(kuò)展接口類是 CallbackPreferringPlatformTransactionManager。如果發(fā)現(xiàn)容器中默認(rèn)的事務(wù)管理類是這個
就直接調(diào)動全局的這個事務(wù)管理方法。如果不是就自己來處理。這種設(shè)計的好處是,事務(wù)管理器既可以做關(guān)系型數(shù)據(jù)庫的事務(wù)管理,也可以滿足一些特定場景的事務(wù)控制(eg: 給Kafka的邏輯做一個事務(wù)管理)。

四、Spring中如何處理嵌套事務(wù)的?
什么是嵌套事務(wù),舉一個偽代碼的例子。下面 saveUser 代碼中有2個Mapper。但是有幾個SqlSession呢?
UserMapper userMapper;
RegistroyMapper registoryMapper;
@Transactional(rollbackFor = {Throwable.class, RuntimeException.class, ExecutionException.class})
public void save(User user){
userMapper.save(user);
}
@Transactional(rollbackFor = {Throwable.class, RuntimeException.class, ExecutionException.class})
public void saveUser(String userName,Strign password){
User user = registoryMapper.regis(userName,password);
save(user);
}
通過上面的學(xué)習(xí)我們了解到如果是Spring來管理的事務(wù)是一個線程對應(yīng)一個SqlSession。所以說上面?zhèn)未a中的兩個Mapper
其實是用的同一個SqlSession,這樣才能保證是在同一個事務(wù)中。核心代碼邏輯就在這里 SqlSessionUtils#getSqlSession。
從Spring中的事務(wù)管理器中獲取 SqlSession。是否使用同一個事務(wù),外包給Spring容器去托管。這就給Spring提供了很多可以發(fā)揮的空間。
比如說傳播機(jī)制等。
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator) {
notNull(sessionFactory, NO_SQL_SESSION_FACTORY_SPECIFIED);
notNull(executorType, NO_EXECUTOR_TYPE_SPECIFIED);
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
LOGGER.debug(() -> "Creating a new SqlSession");
session = sessionFactory.openSession(executorType);
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}
五、Spring中事務(wù)的傳播方式是如何實現(xiàn)的?
| 傳播方式 | 說明 | 常用 |
|---|---|---|
| TransactionDefinition.PROPAGATION_REQUIRED | 如果存在一個事務(wù),則支持當(dāng)前事務(wù)。如果沒有事務(wù)則開啟 | ? |
| TransactionDefinition.PROPAGATION_SUPPORTS | 如果存在一個事務(wù),支持當(dāng)前事務(wù)。如果沒有事務(wù),則非事務(wù)的執(zhí)行 | |
| TransactionDefinition.PROPAGATION_MANDATORY | 如果已經(jīng)存在一個事務(wù),支持當(dāng)前事務(wù)。如果沒有一個活動的事務(wù),則拋出異常 | |
| TransactionDefinition.PROPAGATION_NEVER | 總是非事務(wù)地執(zhí)行,如果存在一個活動事務(wù),則拋出異常 | |
| TransactionDefinition.PROPAGATION_NOT_SUPPORTED | 總是非事務(wù)地執(zhí)行,并掛起任何存在的事務(wù) | |
| TransactionDefinition.PROPAGATION_REQUIRES_NEW | 總是開啟一個新的事務(wù)。如果一個事務(wù)已經(jīng)存在,則將這個存在的事務(wù)掛起。 | |
| TransactionDefinition.PROPAGATION_NESTED | 如果一個活動的事務(wù)存在,則運行在一個嵌套的事務(wù)中. 如果沒有活動事務(wù)則按TransactionDefinition.PROPAGATION_REQUIRED 屬性執(zhí)行 |
思考傳播機(jī)制如何實現(xiàn)
首先我們先思考下傳播機(jī)制是如何實現(xiàn)的,因為我們知道 要保證是同一個事務(wù),那么一定是同一個SqlSession,這樣才能保證是同一個事務(wù)。
而如果要新開事務(wù),就要先將當(dāng)前線程綁定的SqlSession等事務(wù)信息,給掛起,那么是如何進(jìn)行掛起的呢? SqlSession又是如何跟線程綁定的呢?
5.1 SqlSession是如何跟線程綁定的呢?
通過TransactionSynchronizationManager中的ThreadLocal跟線程綁定(new NamedThreadLocal<>("Transactional resources"))。注意: 如果主線程下創(chuàng)建子線程是不能綁定上的。
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
TransactionSynchronizationManager
.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
}
5.2 事務(wù)是如何嵌套的?
答案就在 TransactionAspectSupport#TransactionInfo 中。一個事務(wù)注解對應(yīng)一個TransactionInfo,如果出現(xiàn)嵌套
就會生成一個事務(wù)鏈。如下圖一樣。

當(dāng)里層的事務(wù)處理完成后會執(zhí)行清理動作,同時在將第一個的事務(wù)在進(jìn)行恢復(fù)跟線程綁定。
private void restoreThreadLocalStatus() {
// Use stack to restore old transaction TransactionInfo.
// Will be null if none was set.
transactionInfoHolder.set(this.oldTransactionInfo);
}
5.3 事務(wù)是如何掛起的?
前面知道每一個 @Transaction 注解會對應(yīng)一個 TransactionAspectSupport#TransactionInfo。而事務(wù)掛起后,會先跟線程進(jìn)行解綁。
然后掛起的事務(wù) SuspendedResourcesHolder 會被添加在 TransactionStatus 中。
掛起的數(shù)據(jù)保存在哪里
protected final class TransactionInfo {
// 事務(wù)管理器
@Nullable
private final PlatformTransactionManager transactionManager;
// 事務(wù)信息
@Nullable
private final TransactionAttribute transactionAttribute;
// 切面點
private final String joinpointIdentification;
// DefaultTransactionStatus
@Nullable
private TransactionStatus transactionStatus;
@Nullable
private TransactionInfo oldTransactionInfo;
}
public class DefaultTransactionStatus extends AbstractTransactionStatus {
@Nullable
private final Object transaction;
private final boolean newTransaction;
private final boolean newSynchronization;
private final boolean readOnly;
private final boolean debug;
@Nullable
private final Object suspendedResources;
}
如何進(jìn)行掛起的
TransactionSynchronization 事務(wù)同步器,為了解決事務(wù)的傳播方式
- suspend 暫定事務(wù),將事務(wù)從當(dāng)前線程上解綁
- resume 恢復(fù)事務(wù),將事務(wù)從新恢復(fù)到當(dāng)前線程上
- beforeCommit 觸發(fā)提交事務(wù),執(zhí)行commit
- beforeCompletion 事務(wù)提交后
- afterCommit 提交后
- afterCompletion 完成后調(diào)用
SqlSessionSynchronization 也是跟當(dāng)前線程綁定的
- 位置
TransactionSynchronizationManager#ThreadLocal<Set<TransactionSynchronization>> synchronizations
// 掛起時候,將SqlSessionHolder與當(dāng)前線程進(jìn)行解綁
@Override
public void suspend() {
if (this.holderActive) {
LOGGER.debug(() -> "Transaction synchronization suspending SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.unbindResource(this.sessionFactory);
}
}
/**
* 恢復(fù)時候重新跟當(dāng)前線程綁定
*/
@Override
public void resume() {
if (this.holderActive) {
LOGGER.debug(() -> "Transaction synchronization resuming SqlSession [" + this.holder.getSqlSession() + "]");
TransactionSynchronizationManager.bindResource(this.sessionFactory, this.holder);
}
}
5.4 傳播方式具體實現(xiàn)
下面這段代碼就是事務(wù)注解的切面處理類,Spring事務(wù)的所有邏輯和擴(kuò)展支持都在這里。
TransactionAspectSupport#invokeWithinTransaction
首先我們先看整體的邏輯
- 獲取當(dāng)切面上的
@Transaction注解信息 - 根據(jù)注解信息找到指定的事務(wù)管理器,如果沒有執(zhí)行就使用默認(rèn)的
- 生成事務(wù)信息
TransactionInfo傳播機(jī)制,事務(wù)掛起都在這個類上 - 失敗執(zhí)行回滾&成功提交&如果是嵌套事務(wù),從
TransactionInfo中將掛起的事務(wù)重新跟線程進(jìn)行綁定
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
// 獲取被事務(wù)注解標(biāo)記的事務(wù)信息
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 根據(jù)事務(wù)注解上指定的事務(wù)管理器名稱,去系統(tǒng)中獲取,如果沒有就拿系統(tǒng)中默認(rèn)的事務(wù)管理器
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
// 切面攔截點: com.alibaba.purchase.domain.replenish.impl.ReplenishDomainWriteServiceImpl.mockSave
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
// 這里只看關(guān)系型數(shù)據(jù)的的事務(wù)邏輯。CallbackPreferringPlatformTransactionManager是具有回調(diào)性質(zhì)的事務(wù)管理器,多用于處理自定的事務(wù)
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 獲取事務(wù)的信息,包含傳播方式
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
return retVal;
}
}
這里只看傳播機(jī)制吧。AbstractPlatformTransactionManager#handleExistingTransaction
- TransactionDefinition.PROPAGATION_NEVER 如果存在事務(wù)就報錯
- TransactionDefinition.PROPAGATION_NOT_SUPPORTED 如果有事務(wù),就掛起(當(dāng)前事務(wù)跟線程解綁)。不使用事務(wù)進(jìn)行執(zhí)行。
- TransactionDefinition.PROPAGATION_REQUIRES_NEW 當(dāng)前事務(wù)掛起,新開個事務(wù)。
/**
* Create a TransactionStatus for an existing transaction.
*/
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
// TransactionDefinition.PROPAGATION_NEVER(總是非事務(wù)地執(zhí)行,如果存在一個活動事務(wù),則拋出異常)就直接阻斷報錯
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
// TransactionDefinition.PROPAGATION_NOT_SUPPORTED 總是非事務(wù)地執(zhí)行,并掛起任何存在的事務(wù)
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 數(shù)據(jù)暫存在TransactionSynchronizationManager#synchronizations同步器中
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
// 總是開啟一個新的事務(wù)。如果一個事務(wù)已經(jīng)存在,則將這個存在的事務(wù)掛起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// 如果有事務(wù)存在,則運行在一個嵌套的事務(wù)中. 如果沒有活動事務(wù)則按TransactionDefinition.PROPAGATION_REQUIRED 屬性執(zhí)行
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 使用當(dāng)前事務(wù),并增加當(dāng)前事務(wù)的一次引用。
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
// 沒有新建一個事務(wù)
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, null);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
//
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
5.5 嵌套事務(wù)如何知道是否要提交
當(dāng)兩個Mapper中使用的是同一個SqlSession,那么會不會第二個事務(wù)在執(zhí)行后,就直接commit了呢,此時第一個事務(wù)有一次commit。導(dǎo)致異常呢?
解決方案在這里 DefaultTransactionStatus
第二個事務(wù)狀態(tài)中
- newTransaction = false
-
newSynchronization = false
而下面代碼中會做校驗,只需要同步時候才會提交事務(wù)。
protected final void triggerBeforeCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
if (status.isDebug()) {
logger.trace("Triggering beforeCommit synchronization");
}
TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly());
}
}
第一個事務(wù)狀態(tài)中
- newTransaction = true
-
newSynchronization = true
才會真正的去執(zhí)行。
5.6 這樣設(shè)計是否線程安全
線程安全只有在多線程環(huán)境下才會出現(xiàn)。那么這里一定會有多線程問題。而事務(wù)是跟線程進(jìn)行綁定的,所以這里雖然有多線程但是不會有線程安全問題。
但是這里我們看源碼線程綁定時候使用的ThreadLocal,所以你在線程中創(chuàng)建子線程或者是線程中使用線程池,這里的事務(wù)都不會共享的。
本文由mdnice多平臺發(fā)布

