Spring 4.3 源碼分析之 事務組件概述

1. Spring Tx 概述

首先我們常說的事務其實是數(shù)據(jù)庫(比如: Mysql) 的事務, 而 Spring 做的事情其實是針對不同 orm (Mybatis | Hibernate)進行封裝, 抽成統(tǒng)一的方法 獲取數(shù)據(jù)庫連接, 進行數(shù)據(jù)庫操作, 進行事務的提交|回滾 <- 對其實就是這么簡單.

2. Spring Tx 主要組件 TransactionDefinition

TransactionDefinition 是一個接口, 其主要包含了 Spring 中對事務傳播類別的定義, 隔離級別的定義, 代碼如下:

public interface TransactionDefinition {

    // 若當前線程不存在事務中, 則開啟一個事務, 若當前存在事務, 則加入其中
    int PROPAGATION_REQUIRED = 0;

    // 若當前存在事務, 則加入到事務中, 若不存在, 則以非事務的方式運行
    int PROPAGATION_SUPPORTS = 1;

    // 若有事務, 則用當前的事務, 若沒有, 則直接拋出異常
    int PROPAGATION_MANDATORY = 2;

    // 若當前存在事務, 則掛起事務, 若當前不存在事務, 則開啟一個新事務運行
    int PROPAGATION_REQUIRES_NEW = 3;

    // 不支持以事務的方式運行, 若當前存在事務, 則將當前的事務掛起
    int PROPAGATION_NOT_SUPPORTED = 4;

    // 不支持事務, 若當前線程含有事務, 則直接拋出異常
    int PROPAGATION_NEVER = 5;

    // 這個時在原來的事務中通過 savepoint 的方式 開啟一個局部事務
    int PROPAGATION_NESTED = 6;

    // 默認隔離級別
    int ISOLATION_DEFAULT = -1;

    // read_uncommitted 級別
    int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

    // READ_COMMITTED 級別
    int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

    // REPEATABLE_READ 級別
    int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

    // SERIALIZABLE 級別
    int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;

    // 默認超時時間
    int TIMEOUT_DEFAULT = -1;

    // 獲取傳播行為
    int getPropagationBehavior();

    //  獲取隔離級別
    int getIsolationLevel();

    // 獲取超時時間
    int getTimeout();

    // 事務是否是只讀模式
    boolean isReadOnly();

    // 返回事務的名字
    String getName();
}

其默認的實現(xiàn)是 DefaultTransactionDefinition, 在這個類中主要對一些值增加了默認值的賦值, 并增加一些對屬性賦值的設定, 其中有個賦值工具類 Constants, 可以通過這個類將 字符串轉為數(shù)字.

3. Spring Tx 主要組件 TransactionStatus

TransactionStatus 這個接口中定義了事務執(zhí)行過程中的一些屬性, 是否有savePoint, 是否 rollBackOnly, 事務是否已經(jīng)完成; 而其抽象類中主要是完成 savePoint 以及 rollBackOnly 的具體實現(xiàn); 其默認的實現(xiàn)類是 DefaultTransactionStatus, 在這個類中有:

// 事務連接器, 比如 DataSourceTransactionManager 中的 DataSourceTransactionObject
private final Object transaction;
// 是否是新事務
private final boolean newTransaction;
// 是否開啟 事務同步器 <- 其實就是在 TransactionSynchronousManager 中注冊屬性信息
private final boolean newSynchronization;
// 這個事務是否是 readOnly
private final boolean readOnly;
// debug模式
private final boolean debug;
//  suspend 的上個事務的信息, suspendedResources 可能是 null
private final Object suspendedResources;
4. Spring Tx 主要組件 PlatformTransactionManager

PlatformTransactionManager 這個接口中定義了 Spring 執(zhí)行事務的主方法:

public interface PlatformTransactionManager {
    // 開始事務
    TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;
    // 提交事務
    void commit(TransactionStatus status) throws TransactionException;
    // 回滾事務
    void rollback(TransactionStatus status) throws TransactionException;
}

對于上面的接口, 在抽象類 AbstractPlatformTransactionManager 中進行了相應的實現(xiàn)(PS: 其實這里主要運用了 策略模式 + 模版模式),
其主要屬性有:

// 永遠激活 事務同步器
public static final int SYNCHRONIZATION_ALWAYS = 0;

// 只有在有事務時才激活事務同步器
public static final int SYNCHRONIZATION_ON_ACTUAL_TRANSACTION = 1;

// Never active transaction synchronization, not even for actual transactions.
// 從不激活事務同步器
public static final int SYNCHRONIZATION_NEVER = 2;

/** Constants instance for AbstractPlatformTransactionManager */
// 可通過 Constants 設置AbstractPlatformTransactionManager中的屬性 --> 類似于 BeanWrapper
private static final Constants constants = new Constants(AbstractPlatformTransactionManager.class);

// 是否 開啟事務同步器支持
private int transactionSynchronization = SYNCHRONIZATION_ALWAYS;

// 事務默認的超時時間
private int defaultTimeout = TransactionDefinition.TIMEOUT_DEFAULT;

// 嵌套式事務是否允許
private boolean nestedTransactionAllowed = false;

// 是否校驗 是否存在事務
private boolean validateExistingTransaction = false;

// 分布式事務中的 rollback 屬性
private boolean globalRollbackOnParticipationFailure = true;

// 分布式事務中的 rollback 屬性
private boolean failEarlyOnGlobalRollbackOnly = false;

// 在 commit 過程中若出現(xiàn) 異常是否會 rollback
private boolean rollbackOnCommitFailure = false;

在這個類中實現(xiàn)了事務操作的主要方法;

5. AbstractPlatformTransactionManager 開啟事務方法

其主邏輯如下:

1. 獲取事務連接器, 比如 DataSourceTransactionManager 中就是 DataSourceTransactionObject 對象(存放的是 connect, savePoint, 是否是新事務)
2. 若不存在 TransactionDefinition, 則創(chuàng)建一個默認的 TransactionDefinition
3. 判斷當前是否存在事務中
    3.1 如果當前已經(jīng)存在事務, 且當前事務的傳播屬性設置為 PROPAGATION_NEVER, 那么拋出異常
    3.2 如果當前事務的配置屬性是 PROPAGATION_NOT_SUPPORTED, 同時當前線程已經(jīng)存在事務了, 那么將事務掛起, 并且封裝 TransactionStatus
    3.3 如果當前事務的配置屬性是 PROPAGATION_REQUIRES_NEW, 創(chuàng)建新事務, 同時將當前線程存在的事務掛起, 與創(chuàng)建全新事務的過程類是, 區(qū)別在于在創(chuàng)建全新事務時不用考慮已有事務的掛起, 但在這里, 需要考慮已有事務的掛起
    3.4  嵌套事務的處理, 創(chuàng)建 TransactionStatus, 創(chuàng)建保存點
    3.5 對于那些沒有匹配的傳播級別, 默認的封裝以下 TransactionStatus
4. 檢查事務屬性中 timeout 的設置是否合理  <-- 這里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有對應的檢查工作
5. 如果當前線程不存在事務, 但是 propagationBehavior 被設置為 PROPAGATION_MANDATORY 拋棄異常
6. PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事務
7.  創(chuàng)建 TransactionStatus, 開始事務, 準備 TransactionSynchronous

與之對應的代碼如下:

public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException {
    // 這個 doGetTransaction() 抽象函數(shù), transaction對象的獲取 由具體的事務處理器實現(xiàn), 比如 DataSourceTransactionManager
    // 這里是 DataSourceTransactionObject 對象(存放的是 connect, savePoint, 是否是新事務)
    Object transaction = doGetTransaction();

    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();

    // 關于這個 DefaultTransactionDefinition, 在前面編程式使用事務處理的時候遇到過, 這個 DefaultTransactionDefinition 的默認事務處理屬性是
    // propagationBehavior = PROPAGATION_REQUIRED; isolationLevel = ISOLATION_DEFAULT; timeout=TIMEOUT_DEFAULT; readlyOnly = false
    if (definition == null) {                       // 如果參數(shù) definition == null -> 則創(chuàng)建一個默認的 TransactionDefinition <- 這其實就是一個事務屬性配置的對象
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }

    // 判斷當前被線程是否存在事務, 判斷依據(jù)為當前線程記錄的連接不為空且連接中(connectionHolder) 中的 tranactionActive 屬性 = true
    // 如果已經(jīng)存在事務, 那么需要要根據(jù)在事務屬性中定義的事務傳播屬性配置來處理事務
    if (isExistingTransaction(transaction)) {
        // 這里對當前線程中已經(jīng)由事務存在的情況進行處理, 所有的處理結果都封裝在 TransactionStatus 中
        // Existing transaction found -> check propagation behavior to find out how to behave.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }

    // 檢查事務屬性中 timeout 的設置是否合理  <-- 這里的 timeout 只在 DataSourceUtils.applyTransactionTimeout 中看到有對應的檢查工作
    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }

    // 如果當前線程不存在事務, 但是 propagationBehavior 被設置為 PROPAGATION_MANDATORY 拋棄異常
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    // PROPAGATION_REQUIRED, PROPAGATION_REQUIRES_NEW, PROPAGATION_NEXTED 都需要新建事務
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {

        // 空掛起 <- 這里掛起 (PS: 有同學可能疑問, 上面明明判斷了是否在事務中, 這里沒有必要掛起, 但有種情況 txObject.getConnectionHolder() != null && txObject.getConnectionHolder().isTransactionActive() == false)
        SuspendedResourcesHolder suspendedResources = suspend(null); // <- 這里掛起其實主要還是 TransactionSynchronizationManager 中配置的資源
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            // 是否開啟一個新的 事務同步器
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 創(chuàng)建事務狀態(tài) DefaultTransactionStatus
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 構造 transaction, 包括設置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當前線程
            doBegin(transaction, definition);
            // 新同步事務的設置, 針對當前線程的設置
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException ex) {
            resume(null, suspendedResources);  // resume 掛起來的 資源
            throw ex;
        }
        catch (Error err) {
            resume(null, suspendedResources);  // resume 掛起來的 資源
            throw err;
        }
    }
    else {
        // 這里其實就是沒有開啟事務, 相比上面的 if 中, 就少了 doBegin 函數(shù)的調用
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);    // 關鍵是這里第二個參數(shù)是 nul
    }
}

下面是處理已經(jīng)存在事務那部分:

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {

    // 如果當前已經(jīng)存在事務, 且當前事務的傳播屬性設置為 PROPAGATION_NEVER, 那么拋出異常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
                "Existing transaction found for transaction marked with propagation 'never'");
    }

    // 如果當前事務的配置屬性是 PROPAGATION_NOT_SUPPORTED, 同時當前線程已經(jīng)存在事務了, 那么將事務掛起
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        // 將事務的掛起 <- 其實就是 TransactionSynchronizationManager 中的屬性
        Object suspendedResources = suspend(transaction);
        // 是否開啟一個新的事務同步器
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);

        // 意味著事務方法不需要放在事務環(huán)境中執(zhí)行, 同時掛起事務的信息保存在 TransactionStatus 中, 這里包括了, 進程 ThreadLocal 對事務信息的記錄
        return prepareTransactionStatus( // 注意這里第二個參數(shù)是 null
                definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // 如果當前事務的配置屬性是 PROPAGATION_REQUIRES_NEW, 創(chuàng)建新事務, 同時將當前線程存在的事務掛起, 與創(chuàng)建全新事務的過程類是, 區(qū)別在于在創(chuàng)建全新事務時不用考慮已有事務的掛起, 但在這里, 需要考慮已有事務的掛起
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                    definition.getName() + "]");
        }
        // 將事務的掛起 <- 其實就是 TransactionSynchronizationManager 中的屬性
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            // 是否開啟一個新的事務同步器
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 掛起事務的信息記錄保存在 TransactionStatus 中,  這里包括ThreadLocal 對事務信息的記錄
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // 構造 transaction, 包括設置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當前線程
            doBegin(transaction, definition);
            // 新同步事務的設置, 針對當前線程的設置
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException beginEx) { // 拋出異常的話, 直接恢復 剛才掛起的事務
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
        catch (Error beginErr) {
            resumeAfterBeginException(transaction, suspendedResources, beginErr);
            throw beginErr;
        }
    }

    // 嵌套事務的處理, 創(chuàng)建 TransactionStatus, 創(chuàng)建保存點
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {                        // 檢查是否允許嵌套事務
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // 如果沒有可以使用保存點的方式控制事務回滾, 那么在嵌套式事務的建立初始建立保存點
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            // 在 Spring 管理的事務中, 創(chuàng)建事務保存點
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);   //  <-- 這里的兩個 false 分別表示 是否是新事務, 新的事務同步器
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            // 有些情況是不能使用保存點操作, 比如 JTA, 那么就建立新事務
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            // 掛起事務的信息記錄保存在 TransactionStatus 中,  這里包括ThreadLocal 對事務信息的記錄
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            // 構造 transaction, 包括設置 ConnectionHolder, 隔離級別, timeout, 如果是新連接, 綁定到當前線程
            doBegin(transaction, definition);
            // 新同步事務的設置, 針對當前線程的設置
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
    if (debugEnabled) {
        logger.debug("Participating in existing transaction");
    }
    // 對已經(jīng)存在的事務的屬性進行校驗
    if (isValidateExistingTransaction()) {
        // 隔離級別的校驗 <- 是否一致性 TransactionDefinition 與 TransactionSynchronizationManager 中的值
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
            Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                Constants isoConstants = DefaultTransactionDefinition.constants;
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] specifies isolation level which is incompatible with existing transaction: " +
                        (currentIsolationLevel != null ?
                                isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                "(unknown)"));
            }
        }
        // readOnly 的校驗 <- 是否一致性 TransactionDefinition 與 TransactionSynchronizationManager 中的值
        if (!definition.isReadOnly()) {
            if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                throw new IllegalTransactionStateException("Participating transaction with definition [" +
                        definition + "] is not marked as read-only but existing transaction is");
            }
        }
    }
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    //  返回 TransactionStatus 注意第三個參數(shù) false 表示 當前事務沒有使用新事務
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
6. AbstractPlatformTransactionManager 提交事務方法

在提交事務時, 主要是放在 TransactionSynchronousManager 中回調函數(shù)的調用, 事務的提交, savePoint 的處理

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            // 事務提交的準備工作由具體的事務處理器完成
            prepareForCommit(status);                   // 預留方法, 留給子類區(qū)擴充
            triggerBeforeCommit(status);                // 添加的 TransactionSynchronization 中的對應方法 beforeCommit 的調用
            triggerBeforeCompletion(status);            // 添加的 TransactionSynchronization 中的對應方法 beforeCompletion 的調用
            beforeCompletionInvoked = true;
            boolean globalRollbackOnly = false;         // 手動設置回回滾 <- 這里是 針對分布式事務
            if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
                globalRollbackOnly = status.isGlobalRollbackOnly();
            }
            if (status.hasSavepoint()) {                // 若是嵌套事務, 則直接釋放保存點
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                status.releaseHeldSavepoint();          // 如果存在保存點則清除保存點信息
            }
            /**
             * 下面對當前線程中保存的事務狀態(tài)進行處理, 如果當前的事務是一個新事務, 調用具體的事務處理器完成提交
             * 如果當前所持有的事務不是新事務, 則不提交, 由已有的事務來完成提交
             */
            else if (status.isNewTransaction()) {       // 若是新事務, 則直接提交
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                doCommit(status);                       // 進行事務的提交 <- 這是個抽象方法, 交由子類實現(xiàn)
            }
            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (globalRollbackOnly) {                   // 分布式事務里面手動設置了 rollbackOnly 則直接拋出異常 <-- 這個現(xiàn)在很少使用了
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {         // 在 commit 提交異常時, 是否需要進行回滾操作
                doRollbackOnCommitException(status, ex);
            }
            else {                                     // 添加的 TransactionSynchronization 中的對應方法 afterCompletion 的調用
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException ex) {                  // 提交的過程中, 若出現(xiàn) RuntimeException 則直接回滾
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        catch (Error err) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);      // 添加的 TransactionSynchronization 中的對應方法 beforeCompletion 的調用
            }
            doRollbackOnCommitException(status, err); // 提交過程中出現(xiàn)異常則回滾
            throw err;
        }
        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        // 觸發(fā) AfterCommit 回滾
        try {
            triggerAfterCommit(status);               // 添加的 TransactionSynchronization 中的對應方法 afterCommit 的調用
        }
        finally {                                     // 添加的 TransactionSynchronization 中的對應方法 afterCompletion 的調用
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {                                          // commit 完成, 這里做一些清理數(shù)據(jù)的工作
        cleanupAfterCompletion(status);
    }
}

上面是事務提交的主流程, 流程總體清晰, 與之而來的出現(xiàn)了一個新的角色 TransactionSynchronizationManager

7. Spring Tx 組件 TransactionSynchronizationManager

在事務執(zhí)行的過程中, 需要保存很多變量值, 包括一些回調函數(shù) TransactionSynchronization

private static final ThreadLocal<Map<Object, Object>> resources =                    // key 是 dataSource, value 是 ConnectionHolder, 這里的 Map 是為了解決, 同一個線程操作多個 DataSource 而準備de
        new NamedThreadLocal<Map<Object, Object>>("Transactional resources");

private static final ThreadLocal<Set<TransactionSynchronization>> synchronizations = // 存儲 TransactionSynchronization <- 這里面存儲的都是回調函數(shù)
        new NamedThreadLocal<Set<TransactionSynchronization>>("Transaction synchronizations");

private static final ThreadLocal<String> currentTransactionName =                    // 當前事務的名稱
        new NamedThreadLocal<String>("Current transaction name");

private static final ThreadLocal<Boolean> currentTransactionReadOnly =               // 當前事務是否是 readOnly
        new NamedThreadLocal<Boolean>("Current transaction read-only status");

private static final ThreadLocal<Integer> currentTransactionIsolationLevel =         // 當前事務的隔離級別
        new NamedThreadLocal<Integer>("Current transaction isolation level");

private static final ThreadLocal<Boolean> actualTransactionActive =                  // 當前線程是否已經(jīng)在事務中
        new NamedThreadLocal<Boolean>("Actual transaction active");

每次在事務 suspend 或 resume 時, 其實操作的就是通過TransactionSynchronizationManager 將屬性放在 ThreadLocal 中

8. Spring Tx 組件 TransactionAttributeSource

TransactionAttributeSource 它是事務屬性獲取器(PS: 這里出現(xiàn)了 TransactionAttribute這個對象, 其實就是 TransactionDefinition 加上 一些其他屬性), 主要的 TransactionAttributeSource 有如下:

1. NameMatchTransactionAttributeSource:   通過將 Properties 里面的屬性轉化成 methodName <--> TransactionAttribute 的TransactionAttributeSource
2. MethodMapTransactionAttributeSource:   通過配置文件配置 className.methodName <--> TransactionAttribute 形式注入的 MethodMapTransactionAttributeSource
3. MatchAlwaysTransactionAttributeSource: 只要是用戶定義的方法就返回 true 的 TransactionAttributeSource
4. CompositeTransactionAttributeSource:   組合多個 TransactionAttributeSource, 只要其中有一個獲取 TransactionAttribute, 就 OK
5. AnnotationTransactionAttributeSource:  通過獲取方法上的注解信息來獲知 事務的屬性, 解析主要由 SpringTransactionAnnotationParser 來進行

在上面幾個類中, AnnotationTransactionAttributeSource 是我們最長使用的 TransactionDefinition 的解析器, 它內部其實蠻簡單的, 主要還是通過 SpringTransactionAnnotationParser 解析方法上注解 @Transactional 中的信息來獲得事務屬性

9. Spring Tx 組件 TransactionAspectSupport

這是事務支持的一個工具類, 其也是 TransactionInterceptor 的父類, 在這個類中定義了執(zhí)行事務的主邏輯 -> 方法 invokeWithinTransaction (PS: 其實就是aop 中的 aroundAdvice)

protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
        throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    // 這里讀取事務的屬性和設置, 通過 TransactionAttributeSource 對象取得
    final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
    // 獲取 beanFactory 中的 transactionManager
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    // 構造方法唯一標識(類, 方法, 如 service.UserServiceImpl.save)
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    /**
     * 這里區(qū)分不同類型的 PlatformTransactionManager 因為它們的調用方式不同
     * 對 CallbackPreferringPlatformTransactionManager 來說, 需要回調函數(shù)來
     * 實現(xiàn)事務的創(chuàng)建和提交
     * 對于非 CallbackPreferringPlatformTransactionManager 來說, 不需要通過
     * 回調函數(shù)來實現(xiàn)事務的創(chuàng)建和提交
     * 像 DataSourceTransactionManager 就不是 CallbackPreferringPlatformTransactionManager
     * 不需要通過回調的方式來使用
     */
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // 這里創(chuàng)建事務, 同時把創(chuàng)建事務過程中得到的信息放到 TransactionInfo 中去 (創(chuàng)建事務的起點)
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            // 這里的調用使用處理沿著攔截器鏈進行, 使最后目標對象的方法得到調用
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // 如果在事務處理方法調用中出現(xiàn)異常, 事務處理如何進行需要根據(jù)具體的情況考慮回滾或者提交
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            // 這里把與線程綁定的 TransactionInfo 設置為 oldTransactionInfo
            cleanupTransactionInfo(txInfo);
        }
        // 這里通過事務處理器來對事務進行提交
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
}

其實上面的主邏輯還是: 創(chuàng)建事務, 執(zhí)行SQL, 提交/回滾事務

10. Spring Tx 組件 TransactionTemplate

TransactionTemplate 這個類是編程式事務常用的類, 當使用 JdbcTemplate 時其實也是在使用這個 template 類 (PS: TransactionTemplate 繼承 DefaultTransactionDefinition, 其本身就是個TransactionDefinition), 其執(zhí)行邏輯也是獲取 事務, 開始事務, 執(zhí)行 SQL, 提交/回滾事務 <-- 其內部還是使用 PlatformTransactionManager 來完成的

11. 總結

Spring 事務其實只是做了一些上層的封裝, 其有以下幾個設計特點:

1. 通過 ThreadLocal 將事務執(zhí)行過程中的很多屬性(包括回調函數(shù))存儲其中
2. 針對 整個事務的執(zhí)行流程, Spring都封裝到 AbstractPlatformTransactionManager 中, 在其里面定義了事務獲取, 執(zhí)行, 提交的整個邏輯, 并且遺留下一些模版方法給子類實現(xiàn) <- 模版方法, 而其下的各個子類, 又是策略模式
3. 通過 Spring AOP 來實現(xiàn)通過在方法上增加注解實現(xiàn)事務的封裝
4. Spring 定義了程序中事務的傳播行為
12. 參考:

Spring Aop核心源碼分析
Spring技術內幕
Spring 揭秘
Spring 源碼深度分析
開濤 Spring 雜談
傷神 Spring 源碼分析
Spring源碼情操陶冶

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • Spring Boot 參考指南 介紹 轉載自:https://www.gitbook.com/book/qbgb...
    毛宇鵬閱讀 47,275評論 6 342
  • Spring 事務屬性分析 事務管理對于企業(yè)應用而言至關重要。它保證了用戶的每一次操作都是可靠的,即便出現(xiàn)了異常的...
    壹點零閱讀 1,382評論 0 2
  • Spring Cloud為開發(fā)人員提供了快速構建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,612評論 19 139
  • 很多人喜歡這篇文章,特此同步過來 由淺入深談論spring事務 前言 這篇其實也要歸納到《常識》系列中,但這重點又...
    碼農戲碼閱讀 4,929評論 2 59
  • 暫且喊我小妮吧,我有個毛病,說大不大,你也可以說是怪癖。我每次寫文章,除了工作外的文稿,我都喜歡先放出一張圖。這是...
    劉小妮同學閱讀 356評論 1 1

友情鏈接更多精彩內容