Spring事務(wù)管理

Spring事務(wù)概述

JAVA事務(wù)局限

  • 局部事務(wù)的管理綁定到了具體的數(shù)據(jù)訪問方式

    使用特定的數(shù)據(jù)訪問方式,就必須使用該方式特有的事務(wù)管理。其必然結(jié)果就是導(dǎo)致業(yè)務(wù)代碼與事務(wù)管理代碼相互耦合,因為并沒有統(tǒng)一所有事務(wù)管理API的上層抽象。

  • 事務(wù)的異常處理

    與數(shù)據(jù)訪問相同,特定API的事務(wù)管理的異常體系并不統(tǒng)一。當(dāng)使用特定API時,就必須在代碼中實現(xiàn)一套異常處理邏輯(而且,同時在事務(wù)管理中,往往也要處理數(shù)據(jù)訪問拋出的異常)。相應(yīng)地,這對未來的維護(hù)以及技術(shù)的替換帶來了前所未有的困難。

  • CMT聲明式事務(wù)的局限

    聲明式事務(wù)必然依賴某種容器。而當(dāng)時企業(yè)級應(yīng)用的容器EJB,才能保證聲明式事務(wù)的使用。

Spring事務(wù)

Spring抽離了事務(wù)管理的關(guān)注點,并對這些關(guān)注點進(jìn)行了抽象。這樣子使得開發(fā)人員,在spring容器下也可以使用統(tǒng)一的事務(wù)管理編程模型,而不需要關(guān)注使用的數(shù)據(jù)訪問方式和數(shù)據(jù)資源類型。

Spring的事務(wù)管理可以良好結(jié)合Spring的數(shù)據(jù)訪問。

Spring事務(wù)框架設(shè)計原則

image.png

Spring事務(wù)框架

核心接口

public interface PlatformTransactionManager {

   TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException;

   void commit(TransactionStatus status) throws TransactionException;

   void rollback(TransactionStatus status) throws TransactionException;

}

作為Spring事務(wù)管理框架的核心接口,它只是提供了事務(wù)管理的統(tǒng)一抽象,具體的管理方式,則根據(jù)數(shù)據(jù)訪問方式以及數(shù)據(jù)資源類型,來派生具體實現(xiàn)。默認(rèn)實現(xiàn)由:org.springframework.transaction.jta.JtaTransactionManager(分布式事務(wù))和org.springframework.jdbc.datasource.DataSourceTransactionManager(局部事務(wù))。

問題

思考:如果是JDBC的方式,那么進(jìn)行事務(wù)管理,必須基于Connection(或者h(yuǎn)ibernate的session)。那么在TransactionManager中是通過什么方式獲取的呢。實際上,是與Spring的數(shù)據(jù)訪問相結(jié)合,在數(shù)據(jù)訪問框架中,通過DataSourceUtil管理connection。因此本身也是借助數(shù)據(jù)訪問框架來完成資源管理的。

對應(yīng)hibernate的是SessionFactoryUtils,對應(yīng)JDO的PermissionManagerFactoryUtils

image.png

核心接口體系

image.png

PlatformTransactionManager負(fù)責(zé)界定事務(wù)邊界;TransactionDefinition負(fù)責(zé)定義事務(wù)相關(guān)屬性,比如事務(wù)傳播,事務(wù)隔離級別,超時時間,是否只讀等;TransactionStatus負(fù)責(zé)事務(wù)開啟之后到結(jié)束的事務(wù)狀態(tài),可以通過該類對事務(wù)進(jìn)行有限的控制。

TransactionDefinition

事務(wù)屬性

  • 事務(wù)隔離級別(默認(rèn)采用數(shù)據(jù)庫的默認(rèn)隔離級別)

    int ISOLATION_DEFAULT = -1;
    // 1:讀未提交 問題:臟讀,幻讀,不可重復(fù)讀
    int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;
    // 2:讀已提交 問題:幻讀,不可重復(fù)讀
    int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;
    // 4:可重復(fù)讀 問題:幻讀
    int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;
    // 8:串行化
    int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;
    

臟讀(dirty reads)

一個事務(wù)讀取了另一個未提交的并行事務(wù)寫的數(shù)據(jù)。

不可重復(fù)讀(non-repeatable reads)

一個事務(wù)重新讀取前面讀取過的數(shù)據(jù), 發(fā)現(xiàn)該數(shù)據(jù)已經(jīng)被另一個已提交的事務(wù)修改過。 (針對一條記錄)

幻讀(phantom read)

一個事務(wù)重新執(zhí)行一個查詢,返回一套符合查詢條件的行, 發(fā)現(xiàn)這些行因為其他最近提交的事務(wù)而發(fā)生了改變。(針對其他多條記錄)

  • 事務(wù)傳播行為

    int PROPAGATION_REQUIRED = 0;
    
    int PROPAGATION_SUPPORTS = 1;
    
    int PROPAGATION_MANDATORY = 2;
    
    int PROPAGATION_REQUIRES_NEW = 3;
    
    int PROPAGATION_NOT_SUPPORTED = 4;
    
    int PROPAGATION_NEVER = 5;
    
    int PROPAGATION_NESTED = 6;
    
    事務(wù)傳播行為類型 說明
    PROPAGATION_REQUIRED 如果當(dāng)前沒有事務(wù),就新建一個事務(wù),如果已經(jīng)存在一個事務(wù)中,加入到這個事務(wù)中。這是最常見的選擇。
    PROPAGATION_SUPPORTS 支持當(dāng)前事務(wù),如果當(dāng)前沒有事務(wù),就以非事務(wù)方式執(zhí)行。
    PROPAGATION_MANDATORY 使用當(dāng)前的事務(wù),如果當(dāng)前沒有事務(wù),就拋出異常。
    PROPAGATION_REQUIRES_NEW 新建事務(wù),如果當(dāng)前存在事務(wù),把當(dāng)前事務(wù)掛起。
    PROPAGATION_NOT_SUPPORTED 以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起。
    PROPAGATION_NEVER 以非事務(wù)方式執(zhí)行,如果當(dāng)前存在事務(wù),則拋出異常。
    PROPAGATION_NESTED 如果當(dāng)前存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒有事務(wù),則執(zhí)行與PROPAGATION_REQUIRED類似的操作

    Spring事務(wù)傳播行為詳解:代碼舉例清晰

  • 事務(wù)超時時間(默認(rèn)是事務(wù)系統(tǒng)的默認(rèn)超時時間)

    int TIMEOUT_DEFAULT = -1;

  • 是否為只讀事務(wù)

    由具體實現(xiàn)類控制。

實現(xiàn)類

可以將TransactionDefinition的實現(xiàn)類根據(jù)編程式事務(wù)場景和聲明式事務(wù)場景(基于AOP)分為兩類。這種分類只是出于每個類在相應(yīng)場景的出現(xiàn)頻率考慮的,并不代表只能在規(guī)定場景使用。

image.png
編程式事務(wù)

DefaultTransactionDefinitionTransactionDefinition的默認(rèn)實現(xiàn)類,提供了各種事務(wù)屬性的默認(rèn)值,并提供了setter方法以進(jìn)行更改。默認(rèn)屬性:

private int propagationBehavior = PROPAGATION_REQUIRED;

private int isolationLevel = ISOLATION_DEFAULT;

private int timeout = TIMEOUT_DEFAULT;

private boolean readOnly = false;

TransactionTemplate是Spring提供的簡化編程式事務(wù)界限和事務(wù)異??刂频哪0宸椒?,直接繼承自DefaultTransactionDefinition。因此允許通過TransactionTemplate提供事務(wù)屬性控制。其核心方法execute(實現(xiàn)自TransactionCallback)支持事務(wù)代碼。

聲明式事務(wù)

TransactionAttribute繼承自TransactionDefinition,主要面向使用Spring AOP進(jìn)行聲明式事務(wù)的場景。所以在父接口的基礎(chǔ)上添加了一個boolean rollbackOn(Throwable ex);的方法定義,以便通過聲明的方式指定業(yè)務(wù)方法在拋出什么異常時進(jìn)行回滾。

DelegatingTransactionDefinition是一個抽象類,它將所有事務(wù)屬性getter方法調(diào)用委派給另一個TransactionDefinition的具體實現(xiàn)類。

DelegatingTransactionAttribute依舊是一個抽象類,在DelegatingTransactionDefinition的基礎(chǔ)上,將rollbackOn等動作也委派給其他的具體實現(xiàn)類。

DefaultTransactionAttributeTransactionAttribute的通用實現(xiàn),同時繼承了DefaultTransactionDefinition,在此基礎(chǔ)上指定了當(dāng)異常類型為unchecked exception的時候進(jìn)行事務(wù)回滾(ex instanceof RuntimeException || ex instanceof Error)。

RuleBasedTransactionAttribute允許指定多個回滾規(guī)則,這些規(guī)則以RollbackRuleAttributeNoRollbackRuleAttribute的list形式存在。rollbackOn將傳入的異常與這些規(guī)則匹配,以決定是否進(jìn)行回滾。

TransactionStatus

image.png

TransactionStatus代表著事務(wù)狀態(tài),程序可以通過該類獲取狀態(tài)信息以及編程式地進(jìn)行回滾(而不是通過拋出異常)。繼承SavepointManager接口提供了SavePoint管理能力。不過該能力只在基礎(chǔ)事務(wù)管理下可用。

![image.png](https://upload-images.jianshu.io/upload_images/19423360-6a4c2ab5af8e870f.jpg?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

PlatformTransactionManager

PlatformTransactionManager的抽象體系基于strategy模式,由PlatformTransactionManager對事務(wù)界定進(jìn)行統(tǒng)一抽象,而具體的界定策略由具體實現(xiàn)類定義。

由于PlatformTransactionManager的子類在實現(xiàn)時,基本上遵循統(tǒng)一的結(jié)構(gòu)和理念,所以代碼以DataSourceTransactionManager為例進(jìn)行描述。

image.png

相關(guān)概念

image.png

AbstractPlatformTransactionManager

AbstractPlatformTransactionManager作為PlatformTransactionManager的抽象實現(xiàn),以模板方法的封裝了固定的事務(wù)處理邏輯,而只將與事務(wù)資源相關(guān)的操作以protected或者abstract方法的形式留給具體的實現(xiàn)類。

而作為抽象模板類,AbstractPlatformTransactionManager替子類實現(xiàn)了以下邏輯:

  • 判斷當(dāng)前是否存在事務(wù),根據(jù)判斷結(jié)果執(zhí)行不同處理;
  • 根據(jù)是否存在當(dāng)前事務(wù)的情況,應(yīng)用適當(dāng)?shù)氖聞?wù)傳播行為;
  • 掛起以及恢復(fù)事務(wù);
  • 提交時檢查rollbackonly標(biāo)志,如果是,則執(zhí)行事務(wù)回滾,而不是提交。
  • 事務(wù)回滾的情況下,清理并恢復(fù)事務(wù)狀態(tài);
  • 如果 Transaction Synchronization 為active,則在事務(wù)處理的規(guī)定時點觸發(fā)注冊的Synchronization回調(diào)接口。

主要體現(xiàn)在以下幾個模板方法:

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {}

public final void rollback(TransactionStatus status) throws TransactionException {}

public final void commit(TransactionStatus status) throws TransactionException {}

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {}

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder) throws TransactionException {}
getTransaction
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    // 1.doGetTransaction是由具體實現(xiàn)類提供的,會返回特定實現(xiàn)的Transaction Object。比如DataSourceTransactionObject。
   // 在doGetTransaction中,會從TransactionSynchronizationManager獲取DataSource并綁定到Transaction Object中返回。
   Object transaction = doGetTransaction();

   // Cache debug flag to avoid repeated checks.
   boolean debugEnabled = logger.isDebugEnabled();

   if (definition == null) {
      // 2.Use defaults if no transaction definition given.
      // 如果沒有提供 Transaction Definition,則生成一個默認(rèn)的事務(wù)屬性信息。
      definition = new DefaultTransactionDefinition();
   }
        // 3.檢測是否存在事務(wù),再根據(jù)事務(wù)傳播行為進(jìn)行對應(yīng)的處理。
    // isExistingTransaction 默認(rèn)返回false,具體實現(xiàn)類根據(jù)對應(yīng)的Transaction Object進(jìn)行判斷(內(nèi)部封裝的DataSource)。
   if (isExistingTransaction(transaction)) {
      // 4.Existing transaction found -> check propagation behavior to find out how to behave.
      return handleExistingTransaction(definition, transaction, debugEnabled);
   }

   // Check definition settings for new transaction.
   if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
      throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
   }
   // 根據(jù)事務(wù)傳播行為,創(chuàng)建新的事務(wù)
   // No existing transaction found -> check propagation behavior to find out how to proceed.
   if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
      throw new IllegalTransactionStateException(
            "No existing transaction found for transaction marked with propagation 'mandatory'");
   }
   else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
         definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
         definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
      SuspendedResourcesHolder suspendedResources = suspend(null);
      if (debugEnabled) {
         logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
      }
      try {
         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
         DefaultTransactionStatus status = newTransactionStatus(
               definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
         doBegin(transaction, definition);
         prepareSynchronization(status, definition);
         return status;
      }
      catch (RuntimeException | Error ex) {
         resume(null, suspendedResources);
         throw ex;
      }
   }
   else {
      // Create "empty" transaction: no actual transaction, but potentially synchronization.
      if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
         logger.warn("Custom isolation level specified but no actual transaction initiated; " +
               "isolation level will effectively be ignored: " + definition);
      }
      boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
      return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
   }
}
3.isExistingTransaction & handleExistingTransaction

舉例一下DataSourceTransactionManager的實現(xiàn)。

基于JDBC的實現(xiàn),查看JDBC Connection是否存在并活躍。

protected boolean isExistingTransaction(Object transaction) {
   DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
   return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

默認(rèn)實現(xiàn),私有方法。handleExistingTransaction根據(jù)事務(wù)傳播行為,處理已存在的事務(wù)。

suspend&resume

這兩個方法只是對TransactionSynchronizationManager中當(dāng)前事務(wù)資源進(jìn)行處理。

suspend是將TransactionSynchronizationManager中的資源全部取出,封裝SuspendedResourcesHolder中。然后將TransactionSynchronizationManager清空。返回SuspendedResourcesHolder,TransactionStatus會保存SuspendedResourcesHolder,以便事后釋放該掛起的事務(wù)。

resume則是suspend的相反操作。將從TransactionStatus中獲取的SuspendedResourcesHolder,重新放回到TransactionSynchronizationManager中。

suspend

@Nullable
    protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
        if (TransactionSynchronizationManager.isSynchronizationActive()) {
            List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
            try {
                Object suspendedResources = null;
                if (transaction != null) {
          // 子類實現(xiàn),DataSourceTransactionManager則釋放Connection&DataSource
                    suspendedResources = doSuspend(transaction);
                }
                String name = TransactionSynchronizationManager.getCurrentTransactionName();
                TransactionSynchronizationManager.setCurrentTransactionName(null);
                boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
                TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
                Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
                boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
                TransactionSynchronizationManager.setActualTransactionActive(false);
                return new SuspendedResourcesHolder(
                        suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
            }
        }
        else if (transaction != null) {
            // Transaction active but no synchronization active.
            Object suspendedResources = doSuspend(transaction);
            return new SuspendedResourcesHolder(suspendedResources);
        }
        else {
            // Neither transaction nor synchronization active.
            return null;
        }
    }
private List<TransactionSynchronization> doSuspendSynchronization() {
        List<TransactionSynchronization> suspendedSynchronizations =
                TransactionSynchronizationManager.getSynchronizations();
        for (TransactionSynchronization synchronization : suspendedSynchronizations) {
            synchronization.suspend();
        }
        TransactionSynchronizationManager.clearSynchronization();
        return suspendedSynchronizations;
    }

resume

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
  throws TransactionException {

  if (resourcesHolder != null) {
    Object suspendedResources = resourcesHolder.suspendedResources;
    if (suspendedResources != null) {
      doResume(transaction, suspendedResources);
    }
    List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
    if (suspendedSynchronizations != null) {
      TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
      TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
      TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
      TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
      doResumeSynchronization(suspendedSynchronizations);
    }
  }
}

private void doResumeSynchronization(List<TransactionSynchronization> suspendedSynchronizations) {
        TransactionSynchronizationManager.initSynchronization();
        for (TransactionSynchronization synchronization : suspendedSynchronizations) {
            synchronization.resume();
            TransactionSynchronizationManager.registerSynchronization(synchronization);
        }
    }
commit&rollback

這兩個方法都會去檢查isRollbackOnly標(biāo)志的值,而采取相應(yīng)的操作(回滾或提交)。

主要針對savepoint的處理(嵌套事務(wù))和當(dāng)前事務(wù)的回滾或提交、TransactionSynchronization的調(diào)用、資源清理。

引用
《spring揭秘》

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容