6. 基于Spring Data的領(lǐng)域事件發(fā)布

領(lǐng)域事件發(fā)布是一個(gè)領(lǐng)域?qū)ο鬄榱俗屍渌鼘?duì)象知道自己已經(jīng)處理完成某個(gè)操作時(shí)發(fā)出的一個(gè)通知,事件發(fā)布力求從代碼層面讓自身對(duì)象與外部對(duì)象解耦,并減少技術(shù)代碼入侵。

一、 手動(dòng)發(fā)布事件

// 實(shí)體定義
@Entity
public class Department implements Serializable {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer departmentId;

    @Enumerated(EnumType.STRING)
    private State state;
}

// 事件定義
public class DepartmentEvent {
    private Department department;
    private State state;
    public DepartmentEvent(Department department) {
        this.department = department;
        state = department.getState();
    }
}

// 領(lǐng)域服務(wù)
@Service
public class ApplicationService {

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Autowired
    private DepartmentRepository departmentRepository;

    @Transactional(rollbackFor = Exception.class)
    public void departmentAdd(Department department) {
        departmentRepository.save(department);
        // 事件發(fā)布
        applicationEventPublisher.publishEvent(new DepartmentEvent(department));
    }
}

使用applicationEventPublisher.publishEvent在領(lǐng)域服務(wù)處理完成后發(fā)布領(lǐng)域事件,此方法需要在業(yè)務(wù)代碼中顯式發(fā)布事件,并在領(lǐng)域服務(wù)里引入ApplicationEventPublisher類(lèi),但對(duì)領(lǐng)域服務(wù)本身有一定的入侵性,但靈活性較高。

二、 自動(dòng)發(fā)布事件

// 實(shí)體定義
@Entity
public class SaleOrder implements Serializable {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Integer orderId;
   
    @Enumerated(EnumType.STRING)
    private State state;

    // 返回類(lèi)型定義
    @DomainEvents
    public List<Object> domainEvents(){
        return Stream.of(new SaleOrderEvent(this)).collect(Collectors.toList());
    }

    // 事件發(fā)布后callback
    @AfterDomainEventPublication
    void callback() {
        System.err.println("ok");
    }
}

// 事件定義
public class SaleOrderEvent {
    private SaleOrder saleOrder;
    private State state;
    public SaleOrderEvent(SaleOrder saleOrder) {
        this.saleOrder = saleOrder;
        state = saleOrder.getState();
    }
}

// 領(lǐng)域服務(wù)
@Service
public class ApplicationService {
    @Autowired
    private OrderRepository orderRepository;
    
    @Transactional(rollbackFor = Exception.class)
    public void saleOrderAdd(SaleOrder saleOrder) {
        orderRepository.save(saleOrder);
    }
}

使用@DomainEvents定義事件返回的類(lèi)型,必須是一個(gè)集合,使用@AfterDomainEventPublication定義事件發(fā)布后的回調(diào)。
此方法實(shí)事件類(lèi)型定義在實(shí)體中,與領(lǐng)域服務(wù)完全解耦,沒(méi)有入侵。系統(tǒng)會(huì)在orderRepository.save(saleOrder)后自動(dòng)調(diào)用事件發(fā)布,另delete方法不會(huì)調(diào)用事件發(fā)布。

三、 事件監(jiān)聽(tīng)

@Component
public class ApplicationEventProcessor {

    @EventListener(condition = "#departmentEvent.getState().toString() == 'SUCCEED'")
    public void departmentCreated(DepartmentEvent departmentEvent) {
        System.err.println("dept-event1:" + departmentEvent);
    }

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT, condition = "#saleOrderEvent.getState().toString() == 'SUCCEED'")
    public void saleOrderCreated(SaleOrderEvent saleOrderEvent) {
        System.err.println("sale-event succeed1:" + saleOrderEvent);
    }

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT, condition = "#saleOrderEvent.getState().toString() == 'SUCCEED'")
    public void saleOrderCreatedBefore(SaleOrderEvent saleOrderEvent) {
        System.err.println("sale-event succeed2:" + saleOrderEvent);
    }

    @Async
    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    public void saleOrderCreatedFailed(SaleOrderEvent saleOrderEvent) {
        System.out.println("sale-event failed:" + saleOrderEvent);
    }
}

1. 使用@EventListener監(jiān)聽(tīng)事件

@EventListener沒(méi)有事務(wù)支持,只要事件發(fā)出就可監(jiān)控到

@Transactional(rollbackFor = Exception.class)
public void departmentAdd(Department department) {
    departmentRepository.save(department);
    applicationEventPublisher.publishEvent(new DepartmentEvent(department));
    throw new RuntimeException("failed");
}

上述情況會(huì)造成事務(wù)失敗回滾,但事件監(jiān)控端已經(jīng)執(zhí)行,可能導(dǎo)致數(shù)據(jù)不一致的情況發(fā)生

2. 使用@TransactionalEventListener監(jiān)聽(tīng)事件

  • TransactionPhase.BEFORE_COMMIT 事務(wù)提交前
  • TransactionPhase.AFTER_COMMIT 事務(wù)提交后
  • TransactionPhase.AFTER_ROLLBACK 事務(wù)回滾后
  • TransactionPhase.AFTER_COMPLETION 事務(wù)完成后

使用TransactionPhase.AFTER_COMMIT可在事務(wù)完成后,再執(zhí)行事件監(jiān)聽(tīng)方法,從而保證數(shù)據(jù)的一致性

3. TransactionPhase.AFTER_ROLLBACK回滾事務(wù)問(wèn)題

@Async
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK, condition = "#departmentEvent.getState().toString() == 'SUCCEED'")
public void departmentCreatedFailed(DepartmentEvent departmentEvent) {
    System.err.println("dept-event3:" + departmentEvent);
}

由于@DomainEvents作用在實(shí)體上的,只有剛orderRepository.save(saleOrder)執(zhí)行成功后才會(huì)發(fā)送事件,故AFTER_ROLLBACK方法只會(huì)在同一事務(wù)中其它語(yǔ)句執(zhí)行失敗或顯式rollback時(shí)才會(huì)執(zhí)行,如果save方法執(zhí)行失敗,將不會(huì)監(jiān)聽(tīng)到回滾事件。

4. @Async異步事件監(jiān)聽(tīng)

  • 沒(méi)有此注解事件監(jiān)聽(tīng)方法與主方法為一個(gè)事務(wù)。
  • 使用此注解將脫離原有事務(wù),BEFORE_COMMIT也無(wú)法攔截事務(wù)提交前時(shí)刻
  • 此注解需要配合@EnableAsync一起使用

四、 總結(jié)

通過(guò)對(duì) @DomainEvents@TransactionalEventListener的使用,在有效的解決領(lǐng)域事件發(fā)布的情況下,減少了對(duì)業(yè)務(wù)代碼的入侵,同時(shí)盡一步解決了數(shù)據(jù)一致性問(wèn)題。
在分布式結(jié)構(gòu)下,通過(guò)MQ發(fā)送事件通知給其它服務(wù),為解決一致性問(wèn)題,防止對(duì)方服務(wù)處理失敗可先將事件保久化到數(shù)據(jù)庫(kù)后,再重試。

五、 源碼

https://gitee.com/hypier/barry-jpa/tree/master/jpa-section-5

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容