前面我們講到了Spring在進(jìn)行事務(wù)邏輯織入的時候,無論是事務(wù)開始,提交或者回滾,都會觸發(fā)相應(yīng)的事務(wù)事件。本文首先會使用實例進(jìn)行講解Spring事務(wù)事件是如何使用的,然后會講解這種使用方式的實現(xiàn)原理。
1. 示例
對于事務(wù)事件,Spring提供了一個注解@TransactionEventListener,將這個注解標(biāo)注在某個方法上,那么就將這個方法聲明為了一個事務(wù)事件處理器,而具體的事件類型則是由TransactionalEventListener.phase屬性進(jìn)行定義的。如下是TransactionalEventListener的聲明:
```
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})@Retention(RetentionPolicy.RUNTIME)@Documented@EventListenerpublic @interface TransactionalEventListener {
? ? // 指定當(dāng)前標(biāo)注方法處理事務(wù)的類型 TransactionPhasephase()defaultTransactionPhase.AFTER_COMMIT;
? ? // 用于指定當(dāng)前方法如果沒有事務(wù),是否執(zhí)行相應(yīng)的事務(wù)事件監(jiān)聽器 booleanfallbackExecution()defaultfalse;
? ? // 與classes屬性一樣,指定了當(dāng)前事件傳入的參數(shù)類型,指定了這個參數(shù)之后就可以在監(jiān)聽方法上? ? // 直接什么一個這個參數(shù)了 @AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] value() default {};
? ? // 作用于value屬性一樣,用于指定當(dāng)前監(jiān)聽方法的參數(shù)類型 @AliasFor(annotation = EventListener.class, attribute = "classes")
Class<?>[] classes() default {};
? ? // 這個屬性使用Spring Expression Language對目標(biāo)類和方法進(jìn)行匹配,對于不匹配的方法將會過濾掉 Stringcondition()default"";
}
```
關(guān)于這里的classes屬性需要說明一下,如果指定了classes屬性,那么當(dāng)前監(jiān)聽方法的參數(shù)類型就可以直接使用所發(fā)布的事件的參數(shù)類型,如果沒有指定,那么這里監(jiān)聽的參數(shù)類型可以使用兩種:ApplicationEvent和PayloadApplicationEvent。對于ApplicationEvent類型的參數(shù),可以通過其getSource()方法獲取發(fā)布的事件參數(shù),只不過其返回值是一個Object類型的,如果想獲取具體的類型還需要進(jìn)行強(qiáng)轉(zhuǎn);對于PayloadApplicationEvent類型,其可以指定一個泛型參數(shù),該泛型參數(shù)必須與發(fā)布的事件的參數(shù)類型一致,這樣就可以通過其getPayload()方法獲取事務(wù)事件發(fā)布的數(shù)據(jù)了。關(guān)于上述屬性中的TransactionPhase,其可以取如下幾個類型的值:
```
public enum TransactionPhase {
? ? // 指定目標(biāo)方法在事務(wù)commit之前執(zhí)行 BEFORE_COMMIT,
? ? // 指定目標(biāo)方法在事務(wù)commit之后執(zhí)行 AFTER_COMMIT,
? ? // 指定目標(biāo)方法在事務(wù)rollback之后執(zhí)行 AFTER_ROLLBACK,
? ? // 指定目標(biāo)方法在事務(wù)完成時執(zhí)行,這里的完成是指無論事務(wù)是成功提交還是事務(wù)回滾了 AFTER_COMPLETION
}
```
這里我們假設(shè)數(shù)據(jù)庫有一個user表,對應(yīng)的有一個UserService和User的model,用于往該表中插入數(shù)據(jù),并且插入動作時使用注解標(biāo)注目標(biāo)方法。如下是這幾個類的聲明:
```
public classUser{
? private long id;
? private String name;
? private int age;
? // getter and setter...}
```
```
@Service@Transactionalpublic classUserServiceImplimplementsUserService{
? @Autowired? private JdbcTemplate jdbcTemplate;
? @Autowired? private ApplicationEventPublisher publisher;
? @Override? publicvoidinsert(User user){
? ? jdbcTemplate.update("insert into user (id, name, age) value (?, ?, ?)",
? ? ? ? user.getId(), user.getName(), user.getAge());
? ? publisher.publishEvent(user);
? }
}
```
上述代碼中有一點需要注意的是,對于需要監(jiān)控事務(wù)事件的方法,在目標(biāo)方法執(zhí)行的時候需要使用ApplicationEventPublisher發(fā)布相應(yīng)的事件消息。如下是對上述消息進(jìn)行監(jiān)控的程序:
```
@Componentpublic classUserTransactionEventListener{
? @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
? publicvoidbeforeCommit(PayloadApplicationEvent<User> event){
? ? System.out.println("before commit, id: " + event.getPayload().getId());
? }
? @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
? publicvoidafterCommit(PayloadApplicationEvent<User> event){
? ? System.out.println("after commit, id: " + event.getPayload().getId());
? }
? @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
? publicvoidafterCompletion(PayloadApplicationEvent<User> event){
? ? System.out.println("after completion, id: " + event.getPayload().getId());
? }
? @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
? publicvoidafterRollback(PayloadApplicationEvent<User> event){
? ? System.out.println("after rollback, id: " + event.getPayload().getId());
? }
}
```
這里對于事件的監(jiān)控,只需要在監(jiān)聽方法上添加@TransactionalEventListener注解即可。這里需要注意的一個問題,在實際使用過程中,對于監(jiān)聽的事務(wù)事件,需要使用其他的參數(shù)進(jìn)行事件的過濾,因為這里的監(jiān)聽還是會監(jiān)聽所有事件參數(shù)為User類型的事務(wù),而無論其是哪個位置發(fā)出來的。如果需要對事件進(jìn)行過濾,這里可以封裝一個UserEvent對象,其內(nèi)保存一個類似EventType的屬性和一個User對象,這樣在發(fā)布消息的時候就可以指定EventType屬性,而在監(jiān)聽消息的時候判斷當(dāng)前方法監(jiān)聽的事件對象的EventType是否為目標(biāo)type,如果是,則對其進(jìn)行處理,否則直接略過。下面是上述程序的xml文件配置和驅(qū)動程序:

```
public classTransactionApp{
? @Test? publicvoidtestTransaction(){
? ? ApplicationContext ac = new ClassPathXmlApplicationContext("applicationContext.xml");
? ? UserService userService = context.getBean(UserService.class);
? ? User user = getUser();
? ? userService.insert(user);
? }
? privateUsergetUser(){
? ? int id = new Random()
? ? ? .nextInt(1000000);
? ? User user = new User();
? ? user.setId(id);
? ? user.setName("Mary");
? ? user.setAge(27);
? ? return user;
? }
}
```
運行上述程序,其執(zhí)行結(jié)果如下:
```
before commit, id: 935052
after commit, id: 935052
after completion, id: 935052
```
?可以看到,這里確實成功監(jiān)聽了目標(biāo)程序的相關(guān)事務(wù)行為。
2. 實現(xiàn)原理
關(guān)于事務(wù)的實現(xiàn)原理,這里其實是比較簡單的,在前面的文章中,我們講解到,Spring對事務(wù)監(jiān)控的處理邏輯在TransactionSynchronization中,如下是該接口的聲明:
```
public interfaceTransactionSynchronizationextendsFlushable{
? ? // 在當(dāng)前事務(wù)掛起時執(zhí)行 defaultvoidsuspend(){
}
? ? // 在當(dāng)前事務(wù)重新加載時執(zhí)行 defaultvoidresume(){
}
? ? // 在當(dāng)前數(shù)據(jù)刷新到數(shù)據(jù)庫時執(zhí)行 defaultvoidflush(){
}
? ? // 在當(dāng)前事務(wù)commit之前執(zhí)行 defaultvoidbeforeCommit(booleanreadOnly){
}
? ? // 在當(dāng)前事務(wù)completion之前執(zhí)行 defaultvoidbeforeCompletion(){
}
? ? // 在當(dāng)前事務(wù)commit之后實質(zhì)性 defaultvoidafterCommit(){
}
? ? // 在當(dāng)前事務(wù)completion之后執(zhí)行 defaultvoidafterCompletion(intstatus){
}
}
```
很明顯,這里的TransactionSynchronization接口只是抽象了一些行為,用于事務(wù)事件發(fā)生時觸發(fā),這些行為在Spring事務(wù)中提供了內(nèi)在支持,即在相應(yīng)的事務(wù)事件時,其會獲取當(dāng)前所有注冊的TransactionSynchronization對象,然后調(diào)用其相應(yīng)的方法。那么這里TransactionSynchronization對象的注冊點對于我們了解事務(wù)事件觸發(fā)有至關(guān)重要的作用了。這里我們首先回到事務(wù)標(biāo)簽的解析處,在前面講解事務(wù)標(biāo)簽解析時,我們講到Spring會注冊一個TransactionalEventListenerFactory類型的bean到Spring容器中,這里關(guān)于標(biāo)簽的解析讀者可以閱讀本人前面的文章Spring事務(wù)用法示例與實現(xiàn)原理。這里注冊的TransactionalEventListenerFactory實現(xiàn)了EventListenerFactory接口,這個接口的主要作用是先判斷目標(biāo)方法是否是某個監(jiān)聽器的類型,然后為目標(biāo)方法生成一個監(jiān)聽器,其會在某個bean初始化之后由Spring調(diào)用其方法用于生成監(jiān)聽器。如下是該類的實現(xiàn):
```
public classTransactionalEventListenerFactoryimplementsEventListenerFactory,Ordered{
? ? // 指定當(dāng)前監(jiān)聽器的順序? ? private int order = 50;
? ? publicvoidsetOrder(intorder){
? ? ? ? this.order = order;
? ? }
? ? @Override? ? publicintgetOrder(){
? ? ? ? return this.order;
? ? }
? ? // 指定目標(biāo)方法是否是所支持的監(jiān)聽器的類型,這里的判斷邏輯就是如果目標(biāo)方法上包含有? ? // TransactionalEventListener注解,則說明其是一個事務(wù)事件監(jiān)聽器? ? @Override? ? publicbooleansupportsMethod(Method method){
? ? ? ? return (AnnotationUtils.findAnnotation(method, TransactionalEventListener.class) != null);
? ? }
? ? // 為目標(biāo)方法生成一個事務(wù)事件監(jiān)聽器,這里ApplicationListenerMethodTransactionalAdapter實現(xiàn)了? ? // ApplicationEvent接口? ? @Override? ? public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
? ? ? ? return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
? ? }
}
```
這里關(guān)于事務(wù)事件監(jiān)聽的邏輯其實已經(jīng)比較清楚了。ApplicationListenerMethodTransactionalAdapter本質(zhì)上是實現(xiàn)了ApplicationListener接口的,也就是說,其是Spring的一個事件監(jiān)聽器,這也就是為什么進(jìn)行事務(wù)處理時需要使用ApplicationEventPublisher.publish()方法發(fā)布一下當(dāng)前事務(wù)的事件。
ApplicationListenerMethodTransactionalAdapter在監(jiān)聽到發(fā)布的事件之后會生成一個TransactionSynchronization對象,并且將該對象注冊到當(dāng)前事務(wù)邏輯中,如下是監(jiān)聽事務(wù)事件的處理邏輯:
```
@OverridepublicvoidonApplicationEvent(ApplicationEvent event){
? ? // 如果當(dāng)前TransactionManager已經(jīng)配置開啟事務(wù)事件監(jiān)聽,? ? // 此時才會注冊TransactionSynchronization對象? ? if (TransactionSynchronizationManager.isSynchronizationActive()) {
? ? ? ? // 通過當(dāng)前事務(wù)事件發(fā)布的參數(shù),創(chuàng)建一個TransactionSynchronization對象? ? ? ? TransactionSynchronization transactionSynchronization =
? ? ? ? ? ? createTransactionSynchronization(event);
? ? ? ? // 注冊TransactionSynchronization對象到TransactionManager中? ? ? ? TransactionSynchronizationManager
? ? ? ? ? ? .registerSynchronization(transactionSynchronization);
? ? } else if (this.annotation.fallbackExecution()) {
? ? ? ? // 如果當(dāng)前TransactionManager沒有開啟事務(wù)事件處理,但是當(dāng)前事務(wù)監(jiān)聽方法中配置了? ? ? ? // fallbackExecution屬性為true,說明其需要對當(dāng)前事務(wù)事件進(jìn)行監(jiān)聽,無論其是否有事務(wù)? ? ? ? if (this.annotation.phase() == TransactionPhase.AFTER_ROLLBACK
? ? ? ? ? ? && logger.isWarnEnabled()) {
? ? ? ? ? ? logger.warn("Processing "
? ? ? ? ? ? ? ? ? ? ? ? + event + " as a fallback execution on AFTER_ROLLBACK phase");
? ? ? ? }
? ? ? ? processEvent(event);
? ? } else {
? ? ? ? // 走到這里說明當(dāng)前是不需要事務(wù)事件處理的,因而直接略過? ? ? ? if (logger.isDebugEnabled()) {
? ? ? ? ? ? logger.debug("No transaction is active - skipping " + event);
? ? ? ? }
? ? }
}
```
這里需要說明的是,上述annotation屬性就是在事務(wù)監(jiān)聽方法上解析的TransactionalEventListener注解中配置的屬性。可以看到,對于事務(wù)事件的處理,這里創(chuàng)建了一個TransactionSynchronization對象,其實主要的處理邏輯就是在返回的這個對象中,而createTransactionSynchronization()方法內(nèi)部只是創(chuàng)建了一個TransactionSynchronizationEventAdapter對象就返回了。這里我們直接看該對象的源碼:
```
private static classTransactionSynchronizationEventAdapterextendsTransactionSynchronizationAdapter{
? ? private final ApplicationListenerMethodAdapter listener;
? ? private final ApplicationEvent event;
? ? private final TransactionPhase phase;
? ? publicTransactionSynchronizationEventAdapter(ApplicationListenerMethodAdapter
? ? ? ? listener, ApplicationEvent event, TransactionPhase phase){
? ? ? ? this.listener = listener;
? ? ? ? this.event = event;
? ? ? ? this.phase = phase;
? ? }
? ? @Override? ? publicintgetOrder(){
? ? ? ? return this.listener.getOrder();
? ? }
? ? // 在目標(biāo)方法配置的phase屬性為BEFORE_COMMIT時,處理before commit事件? ? publicvoidbeforeCommit(booleanreadOnly){
? ? ? ? if (this.phase == TransactionPhase.BEFORE_COMMIT) {
? ? ? ? ? ? processEvent();
? ? ? ? }
? ? }
? ? // 這里對于after completion事件的處理,雖然分為了三個if分支,但是實際上都是執(zhí)行的processEvent()? ? // 方法,因為after completion事件是事務(wù)事件中一定會執(zhí)行的,因而這里對于commit,? ? // rollback和completion事件都在當(dāng)前方法中處理也是沒問題的? ? publicvoidafterCompletion(intstatus){
? ? ? ? if (this.phase == TransactionPhase.AFTER_COMMIT && status == STATUS_COMMITTED) {
? ? ? ? ? ? processEvent();
? ? ? ? } else if (this.phase == TransactionPhase.AFTER_ROLLBACK
? ? ? ? ? ? ? ? ? && status == STATUS_ROLLED_BACK) {
? ? ? ? ? ? processEvent();
? ? ? ? } else if (this.phase == TransactionPhase.AFTER_COMPLETION) {
? ? ? ? ? ? processEvent();
? ? ? ? }
? ? }
? ? // 執(zhí)行事務(wù)事件? ? protectedvoidprocessEvent(){
? ? ? ? this.listener.processEvent(this.event);
? ? }
}
```
可以看到,對于事務(wù)事件的處理,最終都是委托給了ApplicationListenerMethodAdapter.processEvent()方法進(jìn)行的。如下是該方法的源碼:
```
publicvoidprocessEvent(ApplicationEvent event){
? ? // 處理事務(wù)事件的相關(guān)參數(shù),這里主要是判斷TransactionalEventListener注解中是否配置了value? ? // 或classes屬性,如果配置了,則將方法參數(shù)轉(zhuǎn)換為該指定類型傳給監(jiān)聽的方法;如果沒有配置,則判斷? ? // 目標(biāo)方法是ApplicationEvent類型還是PayloadApplicationEvent類型,是則轉(zhuǎn)換為該類型傳入? ? Object[] args = resolveArguments(event);
? ? // 這里主要是獲取TransactionalEventListener注解中的condition屬性,然后通過? ? // Spring expression language將其與目標(biāo)類和方法進(jìn)行匹配? ? if (shouldHandle(event, args)) {
? ? ? ? // 通過處理得到的參數(shù)借助于反射調(diào)用事務(wù)監(jiān)聽方法? ? ? ? Object result = doInvoke(args);
? ? ? ? if (result != null) {
? ? ? ? ? ? // 對方法的返回值進(jìn)行處理? ? ? ? ? ? handleResult(result);
? ? ? ? } else {
? ? ? ? ? ? logger.trace("No result object given - no result to handle");
? ? ? ? }
? ? }
}// 處理事務(wù)監(jiān)聽方法的參數(shù)protected Object[] resolveArguments(ApplicationEvent event) {
? ? // 獲取發(fā)布事務(wù)事件時傳入的參數(shù)類型? ? ResolvableType declaredEventType = getResolvableType(event);
? ? if (declaredEventType == null) {
? ? ? ? return null;
? ? }
? ? // 如果事務(wù)監(jiān)聽方法的參數(shù)個數(shù)為0,則直接返回? ? if (this.method.getParameterCount() == 0) {
? ? ? ? return new Object[0];
? ? }
? ? // 如果事務(wù)監(jiān)聽方法的參數(shù)不為ApplicationEvent或PayloadApplicationEvent,則直接將發(fā)布事務(wù)? ? // 事件時傳入的參數(shù)當(dāng)做事務(wù)監(jiān)聽方法的參數(shù)傳入。從這里可以看出,如果事務(wù)監(jiān)聽方法的參數(shù)不是? ? // ApplicationEvent或PayloadApplicationEvent類型,那么其參數(shù)必須只能有一個,并且這個? ? // 參數(shù)必須與發(fā)布事務(wù)事件時傳入的參數(shù)一致? ? Class<?> eventClass = declaredEventType.getRawClass();
? ? if ((eventClass == null || !ApplicationEvent.class.isAssignableFrom(eventClass)) &&
? ? ? ? event instanceof PayloadApplicationEvent) {
? ? ? ? return new Object[] {((PayloadApplicationEvent) event).getPayload()};
? ? } else {
? ? ? ? // 如果參數(shù)類型為ApplicationEvent或PayloadApplicationEvent,則直接將其傳入事務(wù)事件方法? ? ? ? return new Object[] {event};
? ? }
}// 判斷事務(wù)事件方法方法是否需要進(jìn)行事務(wù)事件處理privatebooleanshouldHandle(ApplicationEvent event, @Nullable Object[] args){
? ? if (args == null) {
? ? ? ? return false;
? ? }
? ? String condition = getCondition();
? ? if (StringUtils.hasText(condition)) {
? ? ? ? Assert.notNull(this.evaluator, "EventExpressionEvaluator must no be null");
? ? ? ? EvaluationContext evaluationContext = this.evaluator.createEvaluationContext(
? ? ? ? ? ? event, this.targetClass, this.method, args, this.applicationContext);
? ? ? ? return this.evaluator.condition(condition, this.methodKey, evaluationContext);
? ? }
? ? return true;
}// 對事務(wù)事件方法的返回值進(jìn)行處理,這里的處理方式主要是將其作為一個事件繼續(xù)發(fā)布出去,這樣就可以在// 一個統(tǒng)一的位置對事務(wù)事件的返回值進(jìn)行處理protectedvoidhandleResult(Object result){
? ? // 如果返回值是數(shù)組類型,則對數(shù)組元素一個一個進(jìn)行發(fā)布? ? if (result.getClass().isArray()) {
? ? ? ? Object[] events = ObjectUtils.toObjectArray(result);
? ? ? ? for (Object event : events) {
? ? ? ? ? ? publishEvent(event);
? ? ? ? }
? ? } else if (result instanceof Collection<?>) {
? ? ? ? // 如果返回值是集合類型,則對集合進(jìn)行遍歷,并且發(fā)布集合中的每個元素? ? ? ? Collection<?> events = (Collection<?>) result;
? ? ? ? for (Object event : events) {
? ? ? ? ? ? publishEvent(event);
? ? ? ? }
? ? } else {
? ? ? ? // 如果返回值是一個對象,則直接將其進(jìn)行發(fā)布? ? ? ? publishEvent(result);
? ? }
}
```
對于事務(wù)事件的處理,總結(jié)而言,就是為每個事務(wù)事件監(jiān)聽方法創(chuàng)建了一個TransactionSynchronizationEventAdapter對象,通過該對象在發(fā)布事務(wù)事件的時候,會在當(dāng)前線程中注冊該對象,這樣就可以保證每個線程每個監(jiān)聽器中只會對應(yīng)一個TransactionSynchronizationEventAdapter對象。在Spring進(jìn)行事務(wù)事件的時候會調(diào)用該對象對應(yīng)的監(jiān)聽方法,從而達(dá)到對事務(wù)事件進(jìn)行監(jiān)聽的目的。
在此我向大家推薦一個架構(gòu)學(xué)習(xí)交流群。交流學(xué)習(xí)群號:478030634 里面會分享一些資深架構(gòu)師錄制的視頻錄像:有Spring,MyBatis,Netty源碼分析,高并發(fā)、高性能、分布式、微服務(wù)架構(gòu)的原理,JVM性能優(yōu)化、分布式架構(gòu)等這些成為架構(gòu)師必備的知識體系。還能領(lǐng)取免費的學(xué)習(xí)資源,目前受益良多
3. 小結(jié)
本文首先對事務(wù)事件監(jiān)聽程序的使用方式進(jìn)行了講解,然后在源碼層面講解了Spring事務(wù)監(jiān)聽器是如何實現(xiàn)的。在Spring事務(wù)監(jiān)聽器使用過程中,需要注意的是要對當(dāng)前接收到的事件類型進(jìn)行判斷,因為不同的事務(wù)可能會發(fā)布同樣的消息對象過來。
原文連接:https://blog.csdn.net/yunzhaji3762/article/details/83241843