Spring Tx源碼解析(三)

前言

??上篇我們分析了spring-tx中的AOP部分,包括TransactionAttributeSourcePointcut如何定位潛在的事務(wù)方法,以及TransactionInterceptor又如何結(jié)合PlatformTransactionManager為方法應(yīng)用事務(wù)管理,相信看過上篇的同學(xué)也從中g(shù)et到了使用AOP的新姿勢??

??不過到目前為止,除去開篇中的概念介紹,我們對PlatformTransactionManager還知之甚少,本篇我們就深入一點兒。DataSourceTransactionManager作為最具代表性的 PlatformTransactionManager實現(xiàn),我們就以它為例來看看隱藏在抽象下的實現(xiàn)細節(jié)吧。

一個小例子

??來看一個使用Spring聲明式事務(wù)的小例子。

    @Service
    public class ServiceA {
        @Autowired
        private ServiceB serviceB;
        // Propagation.REQUIRED 有就加入,沒有就自己玩
        @Transactional(propagation = Propagation.REQUIRED)
        public void service() {
            // do some CRUD stuff...
            // then call serviceB#service()
            serviceB.service();
        }
    }

    @Service
    public class ServiceB {
        // Propagation.REQUIRES_NEW 自掃門前雪,各用各的事務(wù)
        @Transactional(propagation = Propagation.REQUIRES_NEW)
        public void service() {
            // do some CRUD stuff...
        }
    }
    
    @RestController
    public class DemoController {
        @Autowired
        private ServiceA serviceA;
        
        @PostMapping("/demo")
        public void demo() {
            serviceA.service();
        }
    }

這里我們有兩個事務(wù)方法,分別屬于ServiceAServiceB,處理請求的入口在DemoController。請求到達后,

  1. 進入服務(wù) A 的service()方法
  2. 開啟事務(wù) tx-a
  3. 執(zhí)行CRUD
  4. 進入服務(wù) B 的service()方法
  5. 掛起 tx-a,開啟新事務(wù) tx-b
  6. 執(zhí)行CRUD
  7. 結(jié)束serviceB#service()調(diào)用,視執(zhí)行情況提交或回滾事務(wù)
  8. 結(jié)束serviceA#service()調(diào)用,視執(zhí)行情況提交或回滾事務(wù)

麻雀雖小五臟俱全,這個小例子也是后續(xù)分析源碼時的參照,先給出來混個臉熟吧。

DataSourceTransactionManager

Hierarchy overview
ds-tx-manager.png

??DataSourceTransactionManager繼承自AbstractPlatformTransactionManager,基類中實現(xiàn)了事務(wù)管理的一整套工作流:

  1. 檢查是否已存在事務(wù)
  2. 應(yīng)用適當?shù)膫鞑バ袨?/li>
  3. 必要時暫停和恢復(fù)事務(wù)
  4. 提交事務(wù)時檢查rollback-only標記
  5. 回滾事務(wù)時的處理——如實回滾或僅設(shè)置rollback-only標記
  6. 觸發(fā)已注冊的transaction synchronization回調(diào)

子類只需要根據(jù)事務(wù)的狀態(tài)實現(xiàn)特定的模板方法即可,比如事務(wù)開始、暫停、恢復(fù)、提交和回滾。

TransactionInterceptor

??事務(wù)攔截器想必大家都很熟悉了,簡化后的模型大致如下:

    // 詳細的分析請看上篇
        public Object invoke(MethodInvocation invocation) throws Throwable {
        Object result = null;
        // 1. 解析@Transactional注解
        TransactionAttribute txAttr = xxx;
        // 2. 獲取事物管理器
        PlatformTransactionManager txManager = xxx;
        // 3. 根據(jù)配置的事務(wù)屬性開啟事務(wù)
        TransactionStatus status = txManager.getTransaction(txAttr);
        try {
            // 4. 執(zhí)行目標方法
            result = invocation.proceed();
        } catch (Throwable ex) {
            // 5-1. 方法異常退出,根據(jù)配置決定是提交還是回滾
            if (txAttr.rollbackOn(ex)) {
                txManager.rollback(status);
            } else {
                txManager.commit(status);
            }
            throw ex;
        }
        // 5-2. 方法正常執(zhí)行完成,提交事務(wù)
        txManager.commit(status);
        return result;
    }

核心流程大致分為5步,其中和事務(wù)管理器有關(guān)的是第3和第5步。前情提要到此結(jié)束,那我們就從第3步getTransaction(...)開始唄~

getTransaction
    @Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
            throws TransactionException {
        // 使用默認配置,如果沒有的話
        TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
        // 模板方法,子類實現(xiàn)
        Object transaction = doGetTransaction();
        boolean debugEnabled = logger.isDebugEnabled();
        // 檢查是否已存在事務(wù)
        if (isExistingTransaction(transaction)) {
            // 已存在事務(wù),根據(jù)傳播行為來決定如何處理
            return handleExistingTransaction(def, transaction, debugEnabled);
        }
        // 檢查一下超時配置,默認 -1 ,不能更低了
        if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
        }
        // 當前不存在事務(wù),同樣要根據(jù)傳播行為來決定如何處理
        // PROPAGATION_MANDATORY 要求一定要運行在事務(wù)中,當前事務(wù)不存在,因此這里選擇拋出異常
    if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
    // PROPAGATION_REQUIRED 在沒有事務(wù)存在時會開啟新的獨立事務(wù)
    // PROPAGATION_REQUIRES_NEW 無論是否有事務(wù)存在都會開啟新的獨立事務(wù)
    // PROPAGATION_NESTED 在沒有事務(wù)存在時和PROPAGATION_REQUIRED的行為一致
    // 換言之,走到這里,這三種傳播行為都會開啟新的獨立事務(wù)
        else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      // 掛起當前事務(wù),這里傳null是因為當前沒有事務(wù)
      // 仍然調(diào)用suspend的意義在于觸發(fā) TransactionSynchronization#suspend() 回調(diào)
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
            }
            try {
        // 開啟新的獨立事務(wù)
                return startTransaction(def, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error ex) {
        // 事務(wù)開啟失敗,恢復(fù)已存在的事務(wù)
        // 同樣為了觸發(fā) TransactionSynchronization 回調(diào),這里是 resume()
                resume(null, suspendedResources);
                throw ex;
            }
        }
        // 其它傳播行為在當前沒有事務(wù)存在時會繼續(xù)以非事務(wù)方式運行,比如PROPAGATION_SUPPORTS
    // 不用為它們開啟新的事務(wù),但需要處理可能注冊的transaction synchronization
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (def.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + def);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
        }
    }

??getTransaction(...)首先會調(diào)用模板方法doGetTransaction()來獲取事務(wù)對象(transaction object)。事務(wù)對象雖然特定于具體的PlatformTransactionManeger實現(xiàn),但一般都會攜帶相應(yīng)的事務(wù)狀態(tài),我們來分析一下DataSourceTransactionManager對它的實現(xiàn)。

    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    // 是否允許嵌套事務(wù),傳播行為PROPAGATION_NESTED時有用
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // 以javax.sql.DataSource為鍵,ConnectionHolder為值綁定到線程私有存儲
    // 通過TransactionSynchronizationManager統(tǒng)一管理(重要)
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    // 若能獲取到這樣一個ConnectionHolder,自然是一個已存在的,因此是false
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

??開篇介紹ResourceHolder接口的時候,我們曾討論過如何保證多個方法運行在同一個事務(wù)中的話題。若將事務(wù)窄化為數(shù)據(jù)庫事務(wù)的話,自然是要保證多個方法使用同一個JDBC Connection,ConnectionHolder作為ResouceHolder的一種實現(xiàn)就是用來攜帶JDBC Connection的。當然了,除了作為java.sql.Connection的容器以外,ConnectionHolder還有一些可配置屬性,比如設(shè)置rollback-only標記、超時時長等。DataSourceTransactionObject可以說是java.sql.Connection的一個快照,它保存了數(shù)據(jù)庫連接的一系列狀態(tài)(用于后續(xù)對數(shù)據(jù)庫連接的復(fù)原),比如當前的隔離級別、自動提交(auto commit)的取值以及一個指示java.sql.Connection是通過javax.sql.DataSource新創(chuàng)建的還是從線程私有存儲中獲取的標記。這兩個類都比較簡單,大家可以自行查閱一下,這里就不細說了。

??了解了ConnectionHolderDataSourceTransactionObject以后,再看isExistingTransaction(...)就很簡單了。

    @Override
    protected boolean isExistingTransaction(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    // 1. 必須持有 ConnectionHolder 實例
    // 2. 事務(wù)已開啟,新開的事務(wù)會設(shè)置這個標志
    // 滿足這兩個條件就可以認為已經(jīng)有一個已存在的事務(wù)
        return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
    }

還記得前面給出的小例子嗎?對于服務(wù) A 的service()方法來說,很明顯我們是無法從線程私有存儲中獲取到ConnectionHolder實例的,因此isExistingTransaction(...)必然返回 false。這里我們也先跳過handleExistingTransaction(...),晚點再來分析它。接下來,在沒有任何已知事務(wù)的前提下,我們一起來看看spring-tx是如何根據(jù)傳播行為的配置情況做不同處理的。

??PROPAGATION_MANDATORY要求一定要運行在事務(wù)中,而當前不存在任何事務(wù),因此會拋出異常。PROPAGATION_REQUIREDPROPAGATION_REQUIRES_NEWPROPAGATION_NESTED這三種傳播行為在沒有事務(wù)存在時的表現(xiàn)是一致的,它們都會開啟一個新的獨立事務(wù)。新事務(wù)的開啟也意味著對舊事務(wù)(或者說外層事務(wù))的掛起,所以我們看到這第一步便是調(diào)用suspend(...),而這里傳null也是因為當前不存在任何已知事務(wù)。

    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) 
    throws TransactionException {
    // 首先查看 transaction synchronization 是否已激活
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
      // 執(zhí)行 TransactionSynchronization#suspend() 回調(diào),解除 transaction synchronization 激活狀態(tài)
            List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
            try {
                Object suspendedResources = null;
        // 若已有事務(wù),調(diào)用模板方法執(zhí)行掛起邏輯
                if (transaction != null) {
                    suspendedResources = doSuspend(transaction);
                }
        // 將當前事務(wù)的名稱、readOnly標記、隔離級別以及是否事務(wù)已激活進行重置
        // 并把這些屬于舊事務(wù)的信息記錄到 SuspendedResourcesHolder , 用于后續(xù)的復(fù)原
                String name = TransactionSynchronizationManager.getCurrentTransactionName();
                TransactionSynchronizationManager.setCurrentTransactionName(null);
                boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                TransactionSynchronizationManager.setActualTransactionActive(false);
                return new SuspendedResourcesHolder(
                        suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
            }
            catch (RuntimeException | Error ex) {
        // 事務(wù)掛起失敗不代表失效,需要進行恢復(fù) -> 回調(diào) TransactionSynchronization#resume()
                doResumeSynchronization(suspendedSynchronizations);
                throw ex;
            }
        }
    // 事務(wù)已激活,但 transaction synchronization 未激活
        else if (transaction != null) {
            // 這種情況下只需要調(diào)用模板方法掛起事務(wù)即可
      Object suspendedResources = doSuspend(transaction);
            return new SuspendedResourcesHolder(suspendedResources);
        }
    // 事務(wù)未激活、transaction synchronization 也未激活,沒有什么需要掛起的
        else {
            return null;
        }
    }

??suspend(...)中牽涉到spring-tx事務(wù)同步機制(transaction synchronization mechanism),這部分內(nèi)容我們在開篇中介紹過。如若事務(wù)同步機制已激活,就需要先執(zhí)行對應(yīng)的回調(diào),這也是doSuspendSynchronization()做的工作。接下來才是調(diào)用模板方法doSuspend(...),DataSourceTransactionManager實現(xiàn)了這個方法。

    @Override
    protected Object doSuspend(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    // 清除持有的 ConnectionHolder
        txObject.setConnectionHolder(null);
    // 并將其從ThreadLocal中解綁
        return TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }

主要就是將ConnectionHolder從事務(wù)對象和線程私有存儲中移除,然后轉(zhuǎn)移到SuspendedResourcesHolder中用于后續(xù)的恢復(fù)?;氐?code>getTransaction(...),掛起舊事務(wù)后自然是要開啟新事物。

    private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
            boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
        // 是否需要激活 transaction synchronization
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 創(chuàng)建一個代表事務(wù)狀態(tài)的對象
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    // 模板方法,通知新事務(wù)已啟動
        doBegin(transaction, definition);
    // 按需初始化 transaction synchronization
        prepareSynchronization(status, definition);
        return status;
    }

簡單明了的三步:

  1. 創(chuàng)建TransactionStatus實例,它包含了一系列的事務(wù)狀態(tài)
  2. 調(diào)用模板方法doBegin(...),通知新事務(wù)已啟動
  3. 按需開啟事務(wù)同步機制

DeaultTransactionStatus就是一個上下文對象或者說信息的集合體,代碼好像沒什么可說的,大家自行查閱一下吧,我們重點來看一下doBegin(...)模板方法。

  @Override
    protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;
        try {
      // 事務(wù)對象未持有 ConnectionHolder
      // 或事務(wù)對象持有 ConnectionHolder,并且與事務(wù)同步
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
        // 獲取一個新的數(shù)據(jù)庫連接
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
        // 綁定到事務(wù)對象,注意這里 newConnectionHolder 為 true
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            // 標記 ConnectionHolder 與事務(wù)同步,對應(yīng)上面的檢查,避免過多的獲取數(shù)據(jù)庫連接
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            // 應(yīng)用read-only標記,并返回初始的隔離級別
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            // 應(yīng)用上配置的隔離級別,初始的隔離級別已經(jīng)記錄下來了,后續(xù)可恢復(fù)
      txObject.setPreviousIsolationLevel(previousIsolationLevel);
      // 打 read-only 標記
            txObject.setReadOnly(definition.isReadOnly());
            // 事務(wù)的開啟需要設(shè)置自動提交為false
            if (con.getAutoCommit()) {
        // 記錄下需要在后續(xù)恢復(fù)自動提交
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
        // 自動提交設(shè)為false,標識著事務(wù)的開啟
                con.setAutoCommit(false);
            }
            // read-only 標記相關(guān),執(zhí)行 SET TRANSACTION READ ONLY 語句
            prepareTransactionalConnection(con, definition);
      // 重要:標記事務(wù)已激活,此時新的數(shù)據(jù)庫連接已獲取,并且關(guān)閉了自動提交
            txObject.getConnectionHolder().setTransactionActive(true);
            // 應(yīng)用超時時長
            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
            // 綁定到線程私有存儲,后續(xù)可通過 TransactionSynchronizationManager#getResource(...) 獲取
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }
        // 出錯的話,釋放數(shù)據(jù)庫連接并拋出異常
        catch (Throwable ex) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, obtainDataSource());
                txObject.setConnectionHolder(null, false);
            }
            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
        }
    }

doBegin(...)設(shè)置transaction active標記,isExistingTransaction(...)會檢查這個標記,這樣就構(gòu)成了一個閉環(huán)。最后一步是調(diào)用prepareSynchronization(...)。

    protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    // 檢查是否是全新開啟的事務(wù)同步機制
    // 換言之 TransactionSynchronizationManager.isSynchronizationActive() 需為false
    if (status.isNewSynchronization()) {
      // 記錄事務(wù)是否已激活,也就是是否有事務(wù)對象
            TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
      // 記錄當前的隔離級別
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
                    definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
                            definition.getIsolationLevel() : null);
      // 記錄是否是read-only型事務(wù)
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
      // 記錄事務(wù)名稱
            TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
      // 激活事務(wù)同步機制
      // initSynchronization()調(diào)用之后isSynchronizationActive()就返回true,我們已經(jīng)在suspend(...)中見過這個檢查
            TransactionSynchronizationManager.initSynchronization();
        }
    }

再次回到getTransaction(...),可以看到startTransaction(...)是包裹在 try 塊中的,以便在出現(xiàn)異常時恢復(fù)已有事務(wù),那我們就看看具體是怎么恢復(fù)的唄。

    protected final void resume(@Nullable Object transaction,
                             @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException {
        // 如果有掛起的事務(wù)
        if (resourcesHolder != null) {
      // 掛起的資源,也就是doSuspend(...)中解綁的ConnectionHolder
            Object suspendedResources = resourcesHolder.suspendedResources;
      // 調(diào)用模板方法,重新綁定ConnectionHolder到ThreadLocal
            if (suspendedResources != null) {
                doResume(transaction, suspendedResources);
            }
      // suspend(...)的逆操作
            List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
            if (suspendedSynchronizations != null) {
                TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
                TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
        // 回調(diào)TransactionSynchronization#resume(),重新激活 transaction synchronization
                doResumeSynchronization(suspendedSynchronizations);
            }
        }
    }

  @Override
    protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    // 重新綁定
        TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
    }

基本上是suspend(...)的逆操作,至此,創(chuàng)建新事務(wù)的流程就分析完了。等等,你以為這就完了?還記得我們的小例子嗎?ServiceB說這才哪到哪,好戲才剛剛開始呢。沒錯,對ServiceB而言,ServiceA#service()已經(jīng)開啟事務(wù)了,等到它調(diào)用service()的時候,isExistingTransaction(...)就返回 true 而不是 false 了,那spring-tx是如何處理當前已存在事務(wù)的情況呢?

    private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {
        // PROPAGATION_NEVER不允許運行在事務(wù)中,因此拋出異常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }
        // PROPAGATION_NOT_SUPPORTED不支持在事務(wù)中運行
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
      // 掛起當前事務(wù),以非事務(wù)方式運行
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      // 創(chuàng)建不支持事務(wù)的 TransactionStatus,因此 newTransaction 為 false
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        // PROPAGATION_REQUIRES_NEW需要開啟新事務(wù)
    // 流程和之前分析的一致,只不過這里不再傳 null 了而已
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
      // 掛起已有事務(wù)
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
        // 開啟新事務(wù)
                return startTransaction(definition, transaction, debugEnabled, suspendedResources);
            }
            catch (RuntimeException | Error beginEx) {
        // 出現(xiàn)異常后恢復(fù)已有事務(wù)
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }
        // PROPAGATION_NESTED在已有事務(wù)的情況下是創(chuàng)建嵌套事務(wù),也就是Savepoint
    // 限于篇幅,Savepoint相關(guān)的我們就不講了,用得也不是那么多
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }
            if (debugEnabled) {
                logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
            }
            if (useSavepointForNestedTransaction()) {
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                return startTransaction(definition, transaction, debugEnabled, null);
            }
        }

    // 剩下來還能發(fā)揮作用的就只有PROPAGATION_SUPPORTS 和 PROPAGATION_REQUIRED 了
        // 這兩在已有事務(wù)的情況下行為是一致的,都是加入已有事務(wù)    
        if (debugEnabled) {
            logger.debug("Participating in existing transaction");
        }
    // 檢驗事務(wù)配置
        if (isValidateExistingTransaction()) {
      // 隔離級別需保持一致
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
      // read-only 標記要一致
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    // 創(chuàng)建加入已有事務(wù)的 TransactionStatus,因此 newTransaction 為false 
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }

    protected final DefaultTransactionStatus prepareTransactionStatus(
            TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
            boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
        // 創(chuàng)建 TransactionStatus
        DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
    // 按需初始化事務(wù)同步機制
        prepareSynchronization(status, definition);
        return status;
    }

handleExistingTransaction(...)也對不同的傳播行為做了處理,至于其中涉及到的方法調(diào)用,前面都已經(jīng)分析過了。各位同學(xué)在自己看源碼的時候,建議動手調(diào)試調(diào)試,徹底梳理清楚其間的狀態(tài)標志及其轉(zhuǎn)換為好。

rollback

??嘟嘟嘟,CRUD出異常了,這會該回滾事務(wù)了。

    @Override
    public final void rollback(TransactionStatus status) throws TransactionException {
    // 同一個事務(wù)不能多次回滾
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // 處理回滾,unexpected用于標記這是不是一個預(yù)期之外的回滾
    // 方法執(zhí)行出現(xiàn)異常,且符合rollbackOn配置,那么回滾是正常的,否則就是不正常的
        processRollback(defStatus, false);
    }

    private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
      // 預(yù)期之外的異常?
            boolean unexpectedRollback = unexpected;
            try {
        // 回調(diào) TransactionSynchronization#beforeCompletion()
                triggerBeforeCompletion(status);
                // 有Savepoint,回滾到保存點
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }
        // 獨立事務(wù)
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
          // 調(diào)用模板方法進行回滾
                    doRollback(status);
                }
                else {
                    // 非獨立事務(wù),加入的一個已存在的事務(wù)
                    if (status.hasTransaction()) {
            // 已標記為rollback-only,出現(xiàn)于多個事務(wù)方法的嵌套調(diào)用,比如 A -> B -> C,且傳播行為的配置不會導(dǎo)致開啟多個事務(wù)
            // 或者配置了外層事務(wù)會因為內(nèi)層事務(wù)的異常而回滾,默認true
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
              // 調(diào)用模板方法設(shè)置rollback-only標記
                            doSetRollbackOnly(status);
                        }
                        else {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    }
                    else {
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
          // 不需要fail-fast,清楚標志,后續(xù)不拋異常
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            catch (RuntimeException | Error ex) {
        // 回調(diào) TransactionSynchronization#afterCompletion(...),狀態(tài)未知
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }
      // 回調(diào) TransactionSynchronization#afterCompletion(...),已回滾
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            // 拋出異常
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        }
        finally {
      // 清理
            cleanupAfterCompletion(status);
        }
    }

回滾分為三種情況:

  1. 嵌套事務(wù)(創(chuàng)建有Savepoint)回滾
  2. 獨立事務(wù)回滾
  3. 非獨立事務(wù)回滾

篇幅有限,Savepoint就不說了,重點看2、3兩種情況。對于獨立事務(wù),調(diào)用doRollback(...)進行回滾,基于java.sql.Connection的回滾機制。

    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
      // 回滾
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }

對于非獨立事務(wù),調(diào)用doSetRollbackOnly(...),僅僅是打上rollback-only標記用于后續(xù)的檢測,processRollback(...)本身就對這個標記進行了檢測。

@Override
    protected void doSetRollbackOnly(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        if (status.isDebug()) {
            logger.debug("Setting JDBC transaction [" + txObject.getConnectionHolder().getConnection() +
                    "] rollback-only");
        }
    // 代理給ConnectionHolder,打上rollback-only標記
        txObject.setRollbackOnly();
    }

能做的都做了,最后該清理現(xiàn)場了。

    private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    // 標記事務(wù)已完成
        status.setCompleted();
    // 若開啟了事務(wù)同步機制,清除相關(guān)的線程私有變量
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.clear();
        }
    // 若是獨立事務(wù),調(diào)用模板方法進行清理
        if (status.isNewTransaction()) {
            doCleanupAfterCompletion(status.getTransaction());
        }
    // 若還存在外層事務(wù)
        if (status.getSuspendedResources() != null) {
            if (status.isDebug()) {
                logger.debug("Resuming suspended transaction after completion of inner transaction");
            }
      // 恢復(fù)外層事務(wù)
            Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
            resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }

    /**
   * 基本上是 doBegin(...)的逆操作
   */
    @Override
    protected void doCleanupAfterCompletion(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        // 解綁
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.unbindResource(obtainDataSource());
        }
        Connection con = txObject.getConnectionHolder().getConnection();
        try {
      // 如有必要,恢復(fù)自動提交
            if (txObject.isMustRestoreAutoCommit()) {
                con.setAutoCommit(true);
            }
      // 恢復(fù)原有隔離級別和readOnly標記
            DataSourceUtils.resetConnectionAfterTransaction(
                    con, txObject.getPreviousIsolationLevel(), txObject.isReadOnly());
        }
        catch (Throwable ex) {
            logger.debug("Could not reset JDBC Connection after transaction", ex);
        }
        // 若是獨立事務(wù),釋放連接
        if (txObject.isNewConnectionHolder()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Releasing JDBC Connection [" + con + "] after transaction");
            }
            DataSourceUtils.releaseConnection(con, this.dataSource);
        }
        // 清除ConnectionHolder的狀態(tài)
        txObject.getConnectionHolder().clear();
    }
commit

??要是沒出什么幺蛾子,就可以提交事務(wù)了。

    @Override
    public final void commit(TransactionStatus status) throws TransactionException {
    // 同一個事務(wù)不能重復(fù)提交
        if (status.isCompleted()) {
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }
        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    // 當前事務(wù)被標記為rollback-only
    // 何時打上rollback-only標記可參考對 rollback 的分析
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
      // 執(zhí)行回滾而不是提交
            processRollback(defStatus, false);
            return;
        }
        // 全局回滾,XA事務(wù)可能會有吧
        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
      // 執(zhí)行回滾,unexpected為true
      // 比如一個XA事務(wù),事務(wù)A執(zhí)行正常,事務(wù)B執(zhí)行異常
      // 此時這個大事務(wù)是不能提交的,事務(wù)協(xié)調(diào)者會標記全局rollback-only
      // 這樣就可以讓原本正常的事務(wù)A也回滾而不是提交
            processRollback(defStatus, true);
            return;
        }
        // 終于可以提交了,開心??
        processCommit(defStatus);
    }

    private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                boolean unexpectedRollback = false;
        // 準備提交,擴展點
                prepareForCommit(status);
        // 回調(diào) TransactionSynchronization#beforeCommit()
                triggerBeforeCommit(status);
        // 回調(diào) TransactionSynchronization#beforeCompletion()
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                // Savepoint,忽略不說了
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                }
        // 獨立事務(wù)
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
          // 讀取全局rollback-only標記
                    unexpectedRollback = status.isGlobalRollbackOnly();
          // 調(diào)用模板方法進行提交
                    doCommit(status);
                }
        // fail-fast,讀取全局rollback-only標記
                else if (isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }
                // 拋出異常,如果全局標記有 rollback-only
                if (unexpectedRollback) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
        // 回調(diào) TransactionSynchronization#afterCompletion(...)
                // 對應(yīng)上面拋出的異常,此時事務(wù)是回滾的而不是已提交
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // 如果需要在提交失敗時進行回滾的話,默認false
        if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
          // 回調(diào) TransactionSynchronization#afterCompletion(...),狀態(tài)未知
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException | Error ex) {
        // 確保 TransactionSynchronization#beforeCompletion()得到執(zhí)行
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
        // 回滾事務(wù),如果提交失敗的話,注意與前面遇到TransactionException時處理的異同
                doRollbackOnCommitException(status, ex);
                throw ex;
            }
      // 回調(diào) TransactionSynchronization#afterCommit()
            try {
                triggerAfterCommit(status);
            }
            finally {
        // 回調(diào) TransactionSynchronization#afterCompletion(...),狀態(tài)已提交
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }
        }
        finally {
      // 清理
            cleanupAfterCompletion(status);
        }
    }

??commit相對來講是比較復(fù)雜的,它牽涉到對rollback-only標記的處理。這里解釋一下global rollback only,區(qū)別于local rollback only,這個標記多用于多事務(wù)協(xié)作的情況,比如兩階段提交協(xié)議下的分布式事務(wù)。

??區(qū)別于rollback,commit不需要處理非獨立事務(wù)的情況,內(nèi)層事務(wù)不需要提交,它只需要等待外層事務(wù)提交即可。對于獨立事務(wù),模板方法doCommit(...)也只是簡單的代理給java.sql.Connection的提交機制。

    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.commit();
        }
        catch (SQLException ex) {
      // processCommit(...)對doCommit(...)拋出的TransactionException有特殊處理
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }

后記

??最后回顧一下我們的小例子,現(xiàn)在它還有秘密嗎?本篇我們一起探索了PlatformTransactionManeger抽象下的事務(wù)管理機制,篇幅較長,相信耐心看完的你一定有所收獲??

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容