前言
CQRS全稱為Command Query Responsibility Segregation,是領(lǐng)域驅(qū)動編程思想中的一個概念,當然也可以脫離DDD,當作讀寫分離去使用。
傳統(tǒng)Rest模式中,DTO -> PO基本上是一樣的,是一種面向數(shù)據(jù)庫模型編程,且讀和寫操作的模型耦合,也不太方便將領(lǐng)域數(shù)據(jù)映射到頁面顯示。
CQRS將讀和寫分為Query與Command。
其中Command屬于寫操作,應(yīng)該聲明為void 或者返回id。
其中Query屬于讀操作,不應(yīng)該存在修改狀態(tài)行為,返回具體數(shù)據(jù)類型。
簡單應(yīng)用
首先抽象出Command和CommandHandler的概念,前者代表命令,后者代表命令處理者,Query同理。
public interface Command<R> {
}
public interface CommandHandler<R, C extends Command<R>> {
/**
* command handle
*
* @param command
* @return
*/
R handle(C command);
}
public interface Query<R> {
}
public interface QueryHandler<R, C extends Query<R>> {
/**
* query handle
*
* @param query
* @return
*/
R handle(C query);
}
基于Spring實現(xiàn)的話,可以使用IOC容器現(xiàn)成的applicationContext工廠實現(xiàn)Command Handler打表。
public class CommandProvider<H extends CommandHandler<?, ?>> {
private final ApplicationContext applicationContext;
private final Class<H> type;
CommandProvider(ApplicationContext applicationContext, Class<H> type) {
this.applicationContext = applicationContext;
this.type = type;
}
public H get() {
return applicationContext.getBean(type);
}
}
public class QueryProvider<H extends QueryHandler<?, ?>> {
private final ApplicationContext applicationContext;
private final Class<H> type;
QueryProvider(ApplicationContext applicationContext, Class<H> type) {
this.applicationContext = applicationContext;
this.type = type;
}
public H get() {
return applicationContext.getBean(type);
}
}
public class CommandHandlerRegistrar {
private Map<Class<? extends Command>, CommandProvider> commandProviderMap = new HashMap<>();
private Map<Class<? extends Query>, QueryProvider> queryProviderMap = new HashMap<>();
public CommandHandlerRegistrar(ApplicationContext applicationContext) {
String[] names = applicationContext.getBeanNamesForType(CommandHandler.class);
for (String name : names) {
registerCommand(applicationContext, name);
}
names = applicationContext.getBeanNamesForType(QueryHandler.class);
for (String name : names) {
registerQuery(applicationContext, name);
}
}
private void registerCommand(ApplicationContext applicationContext, String name) {
Class<CommandHandler<?, ?>> handlerClass = (Class<CommandHandler<?, ?>>)applicationContext.getType(name);
Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, CommandHandler.class);
Class<? extends Command> commandType = (Class<? extends Command>)generics[1];
commandProviderMap.put(commandType, new CommandProvider(applicationContext, handlerClass));
}
private void registerQuery(ApplicationContext applicationContext, String name) {
Class<QueryHandler<?, ?>> handlerClass = (Class<QueryHandler<?, ?>>)applicationContext.getType(name);
Class<?>[] generics = GenericTypeResolver.resolveTypeArguments(handlerClass, QueryHandler.class);
Class<? extends Query> queryType = (Class<? extends Query>)generics[1];
queryProviderMap.put(queryType, new QueryProvider(applicationContext, handlerClass));
}
@SuppressWarnings("unchecked")
<R, C extends Command<R>> CommandHandler<R, C> getCmd(Class<C> commandClass) {
return commandProviderMap.get(commandClass).get();
}
@SuppressWarnings("unchecked")
<R, C extends Query<R>> QueryHandler<R, C> getQuery(Class<C> commandClass) {
return queryProviderMap.get(commandClass).get();
}
}
再抽象出EventBus
public interface EventBus {
/**
* command
*
* @param command
* @param <R>
* @param <C>
* @return
*/
<R, C extends Command<R>> R executeCommand(C command);
/**
* query
*
* @param query
* @param <R>
* @param <Q>
* @return
*/
<R, Q extends Query<R>> R executeQuery(Q query);
}
public class SpringEventBus implements EventBus {
private final CommandHandlerRegistrar registry;
public SpringEventBus(CommandHandlerRegistrar registry) {
this.registry = registry;
}
@Override
public <R, C extends Command<R>> R executeCommand(C command) {
CommandHandler<R, C> commandHandler = (CommandHandler<R, C>)registry.getCmd(command.getClass());
return commandHandler.handle(command);
}
@Override
public <R, Q extends Query<R>> R executeQuery(Q query) {
QueryHandler<R, Q> queryHandler = (QueryHandler<R, Q>)registry.getQuery(query.getClass());
return queryHandler.handle(query);
}
}
@Configuration即完成了Command Handler注冊發(fā)現(xiàn)。
@Bean
public CommandHandlerRegistrar registry(ApplicationContext applicationContext) {
return new CommandHandlerRegistrar(applicationContext);
}
@Bean
public EventBus commandBus(CommandHandlerRegistrar registry) {
return new SpringEventBus(registry);
}
然后在Controller層就可以直接依賴EventBus做讀寫處理,替換以前的service操作。
@RestController
@RequiredArgsConstructor
public class PoliciesController {
private final EventBus bus;
@PostMapping
public ResponseEntity<CreatePolicyResult> createPolicy(@RequestBody CreatePolicyCommand command) {
return ok(bus.executeCommand(command));
}
@PostMapping("/confirmTermination")
public ResponseEntity<ConfirmTerminationResult> terminatePolicy(@RequestBody ConfirmTerminationCommand command) {
return ok(bus.executeCommand(command));
}
@PostMapping("/confirmBuyAdditionalCover")
public ResponseEntity<ConfirmBuyAdditionalCoverResult> buyAdditionalCover(@RequestBody ConfirmBuyAdditionalCoverCommand command) {
return ok(bus.executeCommand(command));
}
@PostMapping("/find")
public Collection<PolicyInfoDto> find(@RequestBody FindPoliciesQuery query) {
return bus.executeQuery(query);
}
@GetMapping("/details/{policyNumber}/versions")
public ResponseEntity<PolicyVersionsListDto> getPolicyVersions(@PathVariable String policyNumber) {
return ok(bus.executeQuery(new GetPolicyVersionsListQuery(policyNumber)));
}
@GetMapping("/details/{policyNumber}/versions/{versionNumber}")
public ResponseEntity<PolicyVersionDto> getPolicyVersionDetails(@PathVariable String policyNumber, @PathVariable int versionNumber) {
return ok(bus.executeQuery(new GetPolicyVersionDetailsQuery(policyNumber, versionNumber)));
}
}
這里是一個Command和Query操作分發(fā)的實現(xiàn)雛形,有幾點細節(jié)。
(1) EventBus實現(xiàn)有多種方式,Controller依賴抽象即可替換,本質(zhì)是Scan到所有CommandHandler子類以后打一張map表,key是Command Class,value是CommandProvider工廠。這里自研注解在ImportBeanDefinitionRegistrar流程操作BeanDefinition也可以,自己用scanner跳過spring打表也可以。
(2)Bus就不區(qū)分Command和Query了,他屬于dispatcher。
(3)讀和寫的模型分開了,寫入?yún)ommand實現(xiàn)類,讀入?yún)uery實現(xiàn)類。
往下看一下Handler邏輯
@Component
@Transactional(rollbackFor = Throwable.class)
@RequiredArgsConstructor
public class CreatePolicyHandler implements CommandHandler<CreatePolicyResult, CreatePolicyCommand> {
private final OfferRepository offerRepository;
private final PolicyRepository policyRepository;
private final EventPublisher eventPublisher;
@Override
public CreatePolicyResult handle(CreatePolicyCommand command) {
Offer offer = offerRepository.withNumber(command.getOfferNumber());
Policy policy = Policy.convertOffer(offer, UUID.randomUUID().toString(), command.getPurchaseDate(), command.getPolicyStartDate());
policyRepository.add(policy);
eventPublisher.publish(new PolicyEvents.PolicyCreated(this, policy));
return new CreatePolicyResult(policy.getNumber());
}
}
Repository和EventPublisher都屬于抽象,可替換實現(xiàn)。
看一下領(lǐng)域?qū)ο蠛虴vent。
@Entity
@Getter
@NoArgsConstructor
@AllArgsConstructor
public class Policy {
@Id
@GeneratedValue
private UUID id;
private String number;
@ManyToOne(optional = false)
private Product product;
@OneToMany(cascade = CascadeType.ALL)
private List<PolicyVersion> versions = new ArrayList<>();
private LocalDate purchaseDate;
public Policy(UUID uuid, String policyNumber, Product product, LocalDate purchaseDate) {
this.id = uuid;
this.number = policyNumber;
this.product = product;
this.purchaseDate = purchaseDate;
}
public static Policy convertOffer(
Offer offer,
String policyNumber,
LocalDate purchaseDate,
LocalDate policyStartDate) {
if (offer.isConverted()) { throw new BusinessException("Offer already converted"); }
if (offer.isRejected()) { throw new BusinessException("Offer already rejected"); }
if (offer.isExpired(purchaseDate)) { throw new BusinessException("Offer expired"); }
if (offer.isExpired(policyStartDate)) { throw new BusinessException("Offer not valid at policy start date"); }
Policy
newPolicy = new Policy(
UUID.randomUUID(),
policyNumber,
offer.getProduct(),
purchaseDate
);
newPolicy.addFirstVersion(offer, purchaseDate, policyStartDate);
newPolicy.confirmChanges(1);
return newPolicy;
}
public void extendCoverage(LocalDate effectiveDateOfChange, CoverPrice newCover) {
//preconditions
if (isTerminated()) { throw new BusinessException("Cannot annex terminated policy"); }
Optional<PolicyVersion> versionAtEffectiveDate = getPolicyVersions().effectiveAtDate(effectiveDateOfChange);
if (!versionAtEffectiveDate.isPresent()) { throw new BusinessException("No active version at given date"); }
PolicyVersion annexVer = addNewVersionBasedOn(versionAtEffectiveDate.get(), effectiveDateOfChange);
annexVer.addCover(newCover, effectiveDateOfChange, annexVer.getCoverPeriod().getTo());
}
private boolean isTerminated() {
return versions.stream().anyMatch(v -> v.isActive() && PolicyStatus.Terminated.equals(v.getPolicyStatus()));
}
public void terminatePolicy(LocalDate effectiveDateOfChange) {
if (isTerminated()) { throw new BusinessException("Policy already terminated"); }
Optional<PolicyVersion> versionAtEffectiveDateOpt = getPolicyVersions().effectiveAtDate(effectiveDateOfChange);
if (!versionAtEffectiveDateOpt.isPresent()) { throw new BusinessException("No active version at given date"); }
PolicyVersion versionAtEffectiveDate = versionAtEffectiveDateOpt.get();
if (!versionAtEffectiveDate.getCoverPeriod().contains(effectiveDateOfChange)) {
throw new BusinessException("Cannot terminate policy at given date as it is not withing cover period");
}
PolicyVersion termVer = addNewVersionBasedOn(versionAtEffectiveDate, effectiveDateOfChange);
termVer.endPolicyOn(effectiveDateOfChange.minusDays(1));
}
public void cancelLastAnnex() {
PolicyVersion lastActiveVer = getPolicyVersions().latestActive();
if (lastActiveVer == null) { throw new BusinessException("There are no annexed left to cancel"); }
lastActiveVer.cancel();
}
public void confirmChanges(int versionToConfirmNumber) {
Optional<PolicyVersion> versionToConfirm = getPolicyVersions().withNumber(versionToConfirmNumber);
if (!versionToConfirm.isPresent()) { throw new BusinessException("Version not found"); }
versionToConfirm.get().confirm();
}
private void addFirstVersion(Offer offer, LocalDate purchaseDate, LocalDate policyStartDate) {
PolicyVersion
ver = new PolicyVersion(
UUID.randomUUID(),
1,
PolicyStatus.Active,
DateRange.between(policyStartDate, policyStartDate.plus(offer.getCoverPeriod())),
DateRange.between(policyStartDate, policyStartDate.plus(offer.getCoverPeriod())),
offer.getCustomer().copy(),
offer.getDriver().copy(),
offer.getCar().copy(),
offer.getTotalCost(),
offer.getCovers()
);
versions.add(ver);
}
private PolicyVersion addNewVersionBasedOn(
PolicyVersion versionAtEffectiveDate, LocalDate effectiveDateOfChange) {
PolicyVersion
newVersion = new PolicyVersion(
versionAtEffectiveDate,
getPolicyVersions().maxVersionNumber() + 1,
effectiveDateOfChange);
versions.add(newVersion);
return newVersion;
}
public PolicyVersions getPolicyVersions() {
return new PolicyVersions(versions);
}
public enum PolicyStatus {
Active,
Terminated
}
}
public class PolicyEvents {
@Getter
public static class PolicyCreated extends Event {
private Policy newPolicy;
public PolicyCreated(Object source, Policy newPolicy) {
super(source);
this.newPolicy = newPolicy;
}
}
@Getter
public static class PolicyAnnexed extends Event {
private Policy annexedPolicy;
private PolicyVersion annexVersion;
public PolicyAnnexed(
Object source, Policy annexedPolicy, PolicyVersion annexVersion) {
super(source);
this.annexedPolicy = annexedPolicy;
this.annexVersion = annexVersion;
}
}
@Getter
public static class PolicyTerminated extends Event {
private Policy terminatedPolicy;
private PolicyVersion terminatedVersion;
public PolicyTerminated(
Object source, Policy terminatedPolicy, PolicyVersion terminatedVersion) {
super(source);
this.terminatedPolicy = terminatedPolicy;
this.terminatedVersion = terminatedVersion;
}
}
@Getter
public static class PolicyAnnexCancelled extends Event {
private Policy policy;
private PolicyVersion cancelledAnnexVersion;
private PolicyVersion currentVersionAfterAnnexCancellation;
public PolicyAnnexCancelled(Object source,
Policy policy,
PolicyVersion cancelledAnnexVersion,
PolicyVersion currentVersionAfterAnnexCancellation) {
super(source);
this.policy = policy;
this.cancelledAnnexVersion = cancelledAnnexVersion;
this.currentVersionAfterAnnexCancellation = currentVersionAfterAnnexCancellation;
}
}
}
相應(yīng)的EventHandler:
@Component
@RequiredArgsConstructor
class PolicyEventsProjectionsHandler {
private final PolicyInfoDtoProjection policyInfoDtoProjection;
private final PolicyVersionDtoProjection policyVersionDtoProjection;
@EventListener
public void handlePolicyCreated(PolicyEvents.PolicyCreated event) {
policyInfoDtoProjection.createPolicyInfoDto(event.getNewPolicy());
policyVersionDtoProjection.createPolicyVersionDto(event.getNewPolicy(),
event.getNewPolicy().getPolicyVersions().withNumber(1).get());
}
@EventListener
public void handlePolicyTerminated(PolicyEvents.PolicyTerminated event) {
policyInfoDtoProjection.updatePolicyInfoDto(event.getTerminatedPolicy(), event.getTerminatedVersion());
policyVersionDtoProjection.createPolicyVersionDto(event.getTerminatedPolicy(), event.getTerminatedVersion());
}
@EventListener
public void handlePolicyAnnexed(PolicyEvents.PolicyAnnexed event) {
policyInfoDtoProjection.updatePolicyInfoDto(event.getAnnexedPolicy(), event.getAnnexVersion());
policyVersionDtoProjection.createPolicyVersionDto(event.getAnnexedPolicy(), event.getAnnexVersion());
}
@EventListener
public void handlePolicyAnnexCancelled(PolicyEvents.PolicyAnnexCancelled event) {
policyInfoDtoProjection.updatePolicyInfoDto(event.getPolicy(), event.getCurrentVersionAfterAnnexCancellation());
policyVersionDtoProjection.updatePolicyVersionDto(event.getCancelledAnnexVersion());
}
}
@Component
@Transactional(rollbackFor = Throwable.class)
@RequiredArgsConstructor
public class PolicyInfoDtoProjection {
private final PolicyInfoDtoRepository policyInfoDtoRepository;
public void createPolicyInfoDto(Policy policy) {
PolicyVersion policyVersion = policy.getPolicyVersions().withNumber(1).get();
PolicyInfoDto policyInfo = buildPolicyInfoDto(policy, policyVersion);
policyInfoDtoRepository.save(policyInfo);
}
public void updatePolicyInfoDto(Policy policy, PolicyVersion currentVersion) {
PolicyInfoDto policyInfo = buildPolicyInfoDto(policy, currentVersion);
policyInfoDtoRepository.update(policyInfo);
}
private PolicyInfoDto buildPolicyInfoDto(Policy policy, PolicyVersion policyVersion) {
return new PolicyInfoDto(
policy.getId(),
policy.getNumber(),
policyVersion.getCoverPeriod().getFrom(),
policyVersion.getCoverPeriod().getTo(),
policyVersion.getCar().getPlaceNumberWithMake(),
policyVersion.getPolicyHolder().getFullName(),
policyVersion.getTotalPremium().getAmount()
);
}
}
public interface PolicyInfoDtoRepository extends CrudRepository<PolicyInfoDto, Long> {
/**
* update
*
* @param policy
*/
@Modifying
@Query("UPDATE policy_info_dto " +
"SET " +
"cover_from = :policy.coverFrom, " +
"cover_to = :policy.coverTo, " +
"vehicle = :policy.vehicle, " +
"policy_holder = :policy.policyHolder, " +
"total_premium = :policy.totalPremium " +
"WHERE " +
"policy_id = :policy.policyId")
void update(@Param("policy") PolicyInfoDto policy);
/**
* find one
*
* @param policyId
* @return
*/
@Query("SELECT * FROM policy_info_dto p WHERE p.policy_id = :policyId")
Optional<PolicyInfoDto> findByPolicyId(@Param("policyId") UUID policyId);
}
再看一下Query:
@Component
@RequiredArgsConstructor
public class GetPolicyVersionDetailsHandler implements QueryHandler<PolicyVersionDto, GetPolicyVersionDetailsQuery> {
private final PolicyVersionDtoFinder policyVersionDtoFinder;
@Override
public PolicyVersionDto handle(GetPolicyVersionDetailsQuery query) {
return policyVersionDtoFinder.findByPolicyNumberAndVersionNumber(query.getPolicyNumber(), query.getVersionNumber());
}
}
@Component
@RequiredArgsConstructor
public class PolicyVersionDtoFinder {
private final PolicyVersionDtoRepository repository;
public PolicyVersionsListDto findVersionsByPolicyNumber(String policyNumber) {
return new PolicyVersionsListDto(policyNumber, repository.findVersionsByPolicyNumber(policyNumber));
}
public PolicyVersionDto findByPolicyNumberAndVersionNumber(String policyNumber, int versionNumber) {
PolicyVersionDto dto = repository.findByPolicyNumberAndVersionNumber(policyNumber, versionNumber);
List<PolicyVersionCoverDto> coversInVersion = repository.getCoversInVersion(dto.getId());
dto.setCovers(coversInVersion);
return dto;
}
}
public interface PolicyVersionDtoRepository extends CrudRepository<PolicyVersionDto, Long> {
/**
* update
*
* @param versionStatus
* @param policyVersionId
*/
@Modifying
@Query("UPDATE policy_version_dto " +
"SET " +
"version_status = :versionStatus " +
"WHERE " +
"policy_version_id = :policyVersionId")
void update(@Param("versionStatus") String versionStatus, @Param("policyVersionId") String policyVersionId);
/**
* find one
*
* @param policyNumber
* @param versionNumber
* @return
*/
@Query(value = "SELECT " +
"id, policy_version_id, policy_id, " +
"policy_number, version_number, " +
"product_code, " +
"version_status, policy_status, " +
"policy_holder, insured, car, " +
"cover_from, cover_to, version_from, version_to, " +
"total_premium_amount " +
"FROM policy_version_dto " +
"WHERE " +
"policy_number = :policyNumber " +
"AND version_number = :versionNumber",
rowMapperClass = PolicyVersionDto.PolicyVersionDtoRowMapper.class)
PolicyVersionDto findByPolicyNumberAndVersionNumber(
@Param("policyNumber") String policyNumber,
@Param("versionNumber") int versionNumber);
/**
* find one
*
* @param policyVersionDtoId
* @return
*/
@Query("SELECT * " +
"FROM policy_version_cover_dto " +
"WHERE " +
"policy_version_dto = :policyVersionDtoId")
List<PolicyVersionCoverDto> getCoversInVersion(@Param("policyVersionDtoId") Long policyVersionDtoId);
/**
* find one
*
* @param policyNumber
* @return
*/
@Query(value = "SELECT " +
"version_number, " +
"version_from, " +
"version_to, " +
"version_status " +
"FROM policy_version_dto " +
"WHERE " +
"policy_number = :policyNumber",
rowMapperClass = PolicyVersionsListDto.PolicyVersionInfoDtoRowMapper.class)
List<PolicyVersionsListDto.PolicyVersionInfoDto> findVersionsByPolicyNumber(
@Param("policyNumber") String policyNumber);
}
@Getter
@AllArgsConstructor
public class PolicyVersionsListDto {
private String policyNumber;
private List<PolicyVersionInfoDto> versionsInfo;
@Getter
@AllArgsConstructor
public static class PolicyVersionInfoDto {
private int number;
private LocalDate versionFrom;
private LocalDate versionTo;
private String versionStatus;
}
static class PolicyVersionInfoDtoRowMapper implements RowMapper<PolicyVersionInfoDto> {
@Override
public PolicyVersionInfoDto mapRow(ResultSet rs, int i) throws SQLException {
return new PolicyVersionInfoDto(
rs.getInt("version_number"),
rs.getDate("version_from").toLocalDate(),
rs.getDate("version_to").toLocalDate(),
rs.getString("version_status")
);
}
}
}
代碼分層
最終大體結(jié)構(gòu)如下


commands存放CommandHandlers
queries存放QueryHandlers
CommandHandlers觸發(fā)的Event由eventhandlers包下消費。
domain存放領(lǐng)域?qū)ο蟆?/p>
按照DDD分層的話,任何外部端口屬于六邊形洋蔥架構(gòu),統(tǒng)一放在infrastructure層適配即可,本例介紹最簡單的CQRS實踐,就不討論application、domain、infrastructure、interfaces那種DI分層了。
Event-Sourcing拓展
完整的Event-Sourcing的話,還需要很多細節(jié),回溯需要Event持久化,類似于redis沒有重寫過的aof文件,可以將Event鏈路復現(xiàn),方便分析數(shù)據(jù)過程,管理版本。
還有數(shù)據(jù)一致性的問題,需要引入最終一致性和柔性事務(wù),常見的有業(yè)務(wù)上使用MQ補償,或者Saga,像Axon Framework等現(xiàn)成的CQRS框架。
如果說接入Event持久化的話,并不復雜,還是Handler那個地方,Transaction注解已經(jīng)包住了publish前中期的代碼,publish event之前落庫即可,復雜的是event可視化治理投入。
Saga現(xiàn)在也有現(xiàn)成的框架可以接。
性能方面拓展可以在Command落庫以后,binlog同步es、redis、mongodb等,查詢端走es,走es這個finder實現(xiàn)也可以隨時替換成mongodb等。甚至在封裝一層分布式內(nèi)存緩存,擊穿則讀es reset。
適合Event Sourcing的場景
系統(tǒng)沒有大量的CRUD,復雜業(yè)務(wù)的團隊。
有DDD經(jīng)驗或者具備DDD素養(yǎng)的團隊。
關(guān)注業(yè)務(wù)數(shù)據(jù)產(chǎn)生過程,關(guān)注業(yè)務(wù)流程運維,關(guān)注報表等情況的團隊。
版本管理、版本回退等需求。