事件驱动架构EDA:事件溯源、CQRS与Spring的完整实现方案
事件驱动架构EDA:事件溯源、CQRS与Spring的完整实现方案
适读人群:Java架构师、高级后端工程师 | 阅读时长:约20分钟 | 技术栈:Spring Boot 3.x、Kafka、EventStore、Axon Framework
开篇故事
几年前我参与了一个电商平台的架构改造。那时候系统已经非常复杂,订单、库存、支付、物流四个核心域,互相之间调用关系错综复杂。一个订单的状态变更,要同步触发库存扣减、支付确认、物流创建,任何一步失败都可能导致数据不一致。
我们用了分布式锁、Saga、补偿事务,但代码越来越难维护,线上事故时不时出现,每次排查都要花几个小时看日志。
后来学习了事件驱动架构(EDA)和CQRS,重新理解了这类问题的解法。今天不是说"换了EDA就万事大吉"——EDA有它自己的复杂度,但它解决的问题角度不一样,值得深入理解。
一、核心问题:传统CRUD架构的局限
1.1 状态驱动 vs 事件驱动
传统CRUD:数据库里存的是"当前状态"——订单状态=已付款。
事件溯源(Event Sourcing):数据库里存的是"事件序列"——OrderCreated、PaymentReceived、OrderShipped。当前状态是重放所有历史事件得到的结果。
1.2 CQRS:分离读写模型
写模型专注领域逻辑,读模型专注查询性能。写数据库可以是事件日志,读数据库可以是关系型、文档型、搜索引擎,各自选最适合的。
二、原理深度解析
2.1 Event Sourcing的工作原理
2.2 Aggregate:事件溯源的核心概念
Aggregate是边界内业务一致性的保证者,只有Aggregate内部的状态变更才能保证事务一致性。跨Aggregate的操作应该通过事件异步协调。
三、完整代码实现
3.1 领域事件定义
// 领域事件基类
public abstract class DomainEvent {
private final String eventId;
private final String aggregateId;
private final String eventType;
private final Instant occurredOn;
private final int version;
protected DomainEvent(String aggregateId, int version) {
this.eventId = UUID.randomUUID().toString();
this.aggregateId = aggregateId;
this.eventType = getClass().getSimpleName();
this.occurredOn = Instant.now();
this.version = version;
}
}
// 订单相关事件
@Value
public class OrderCreatedEvent extends DomainEvent {
Long userId;
List<OrderItem> items;
BigDecimal totalAmount;
String shippingAddress;
public OrderCreatedEvent(String orderId, Long userId, List<OrderItem> items,
BigDecimal totalAmount, String shippingAddress, int version) {
super(orderId, version);
this.userId = userId;
this.items = items;
this.totalAmount = totalAmount;
this.shippingAddress = shippingAddress;
}
}@Value
public class PaymentReceivedEvent extends DomainEvent {
String paymentId;
BigDecimal amount;
String paymentMethod;
Instant paidAt;
public PaymentReceivedEvent(String orderId, String paymentId, BigDecimal amount,
String paymentMethod, Instant paidAt, int version) {
super(orderId, version);
this.paymentId = paymentId;
this.amount = amount;
this.paymentMethod = paymentMethod;
this.paidAt = paidAt;
}
}@Value
public class OrderCancelledEvent extends DomainEvent {
String reason;
boolean refundRequired;
}3.2 Order Aggregate实现
/**
* Order聚合根:包含订单的所有业务逻辑
* 注意:Aggregate不持有事件,而是通过apply方法改变状态
*/
public class OrderAggregate {
// 聚合根ID
private String orderId;
// 内部状态(由事件驱动)
private Long userId;
private OrderStatus status;
private BigDecimal totalAmount;
private List<OrderItem> items;
private String paymentId;
private int version;
// 待发布的事件
private final List<DomainEvent> pendingEvents = new ArrayList<>();
/**
* 从历史事件重建状态(核心机制)
*/
public static OrderAggregate reconstruct(List<DomainEvent> events) {
OrderAggregate order = new OrderAggregate();
events.forEach(order::apply);
return order;
}
/**
* 处理创建订单命令
*/
public void createOrder(String orderId, Long userId, List<OrderItem> items,
String shippingAddress) {
// 业务校验
if (items == null || items.isEmpty()) {
throw new OrderException("订单必须包含至少一个商品");
}
BigDecimal total = items.stream()
.map(item -> item.getPrice().multiply(BigDecimal.valueOf(item.getQuantity())))
.reduce(BigDecimal.ZERO, BigDecimal::add);
// 产生事件(不直接修改状态)
OrderCreatedEvent event = new OrderCreatedEvent(orderId, userId, items,
total, shippingAddress, version + 1);
apply(event); // 应用事件修改状态
pendingEvents.add(event); // 记录待发布的事件
}
/**
* 处理支付确认命令
*/
public void confirmPayment(String paymentId, BigDecimal amount, String paymentMethod) {
if (status != OrderStatus.PENDING) {
throw new OrderException("只有待支付的订单才能确认支付,当前状态: " + status);
}
if (amount.compareTo(totalAmount) != 0) {
throw new OrderException("支付金额不匹配");
}
PaymentReceivedEvent event = new PaymentReceivedEvent(orderId, paymentId, amount,
paymentMethod, Instant.now(), version + 1);
apply(event);
pendingEvents.add(event);
}
/**
* 应用事件:修改内部状态
* 这里的逻辑只做状态变更,不做业务校验
*/
private void apply(DomainEvent event) {
if (event instanceof OrderCreatedEvent e) {
this.orderId = e.getAggregateId();
this.userId = e.getUserId();
this.items = e.getItems();
this.totalAmount = e.getTotalAmount();
this.status = OrderStatus.PENDING;
} else if (event instanceof PaymentReceivedEvent e) {
this.paymentId = e.getPaymentId();
this.status = OrderStatus.PAID;
} else if (event instanceof OrderCancelledEvent e) {
this.status = OrderStatus.CANCELLED;
}
// 其他事件类型...
this.version = event.getVersion();
}
public List<DomainEvent> getPendingEvents() {
return Collections.unmodifiableList(pendingEvents);
}
public void clearPendingEvents() {
pendingEvents.clear();
}
}3.3 EventStore实现
/**
* 事件存储:持久化和加载事件
*/
@Repository
public class EventStore {
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private ObjectMapper objectMapper;
/**
* 加载聚合根的所有事件
*/
public List<DomainEvent> loadEvents(String aggregateId) {
String sql = "SELECT event_type, event_data, version " +
"FROM event_store WHERE aggregate_id = ? ORDER BY version";
return jdbcTemplate.query(sql, (rs, rowNum) -> {
String eventType = rs.getString("event_type");
String eventData = rs.getString("event_data");
return deserialize(eventType, eventData);
}, aggregateId);
}
/**
* 保存新事件(乐观锁保证并发安全)
*/
@Transactional
public void saveEvents(String aggregateId, List<DomainEvent> events, int expectedVersion) {
// 检查版本冲突
Integer currentVersion = jdbcTemplate.queryForObject(
"SELECT MAX(version) FROM event_store WHERE aggregate_id = ?",
Integer.class, aggregateId);
if (currentVersion != null && currentVersion != expectedVersion) {
throw new ConcurrencyException("并发冲突:期望版本" + expectedVersion +
",实际版本" + currentVersion);
}
// 保存事件
String sql = "INSERT INTO event_store (event_id, aggregate_id, aggregate_type, " +
"event_type, event_data, version, occurred_on) VALUES (?, ?, ?, ?, ?, ?, ?)";
for (DomainEvent event : events) {
jdbcTemplate.update(sql,
event.getEventId(),
event.getAggregateId(),
"Order",
event.getEventType(),
serialize(event),
event.getVersion(),
event.getOccurredOn());
}
}
private String serialize(DomainEvent event) {
try {
return objectMapper.writeValueAsString(event);
} catch (JsonProcessingException e) {
throw new RuntimeException("事件序列化失败", e);
}
}
private DomainEvent deserialize(String eventType, String eventData) {
try {
Class<?> eventClass = Class.forName("com.example.events." + eventType);
return (DomainEvent) objectMapper.readValue(eventData, eventClass);
} catch (Exception e) {
throw new RuntimeException("事件反序列化失败", e);
}
}
}3.4 Command Handler和Event Projection
@Service
public class OrderCommandHandler {
@Autowired
private EventStore eventStore;
@Autowired
private ApplicationEventPublisher eventPublisher;
/**
* 处理创建订单命令
*/
public String handle(CreateOrderCommand command) {
String orderId = UUID.randomUUID().toString();
OrderAggregate order = new OrderAggregate();
order.createOrder(orderId, command.getUserId(), command.getItems(),
command.getShippingAddress());
// 保存事件(事件存储是唯一的写操作)
eventStore.saveEvents(orderId, order.getPendingEvents(), 0);
// 发布事件(通知读模型更新)
order.getPendingEvents().forEach(event -> eventPublisher.publishEvent(event));
order.clearPendingEvents();
return orderId;
}
/**
* 处理支付确认命令
*/
public void handle(ConfirmPaymentCommand command) {
// 从事件存储重建聚合根
List<DomainEvent> events = eventStore.loadEvents(command.getOrderId());
OrderAggregate order = OrderAggregate.reconstruct(events);
order.confirmPayment(command.getPaymentId(), command.getAmount(),
command.getPaymentMethod());
int expectedVersion = events.size(); // 期望版本 = 历史事件数
eventStore.saveEvents(command.getOrderId(), order.getPendingEvents(), expectedVersion);
order.getPendingEvents().forEach(eventPublisher::publishEvent);
order.clearPendingEvents();
}
}
/**
* 事件投影:将事件转化为读模型
*/
@Component
public class OrderProjection {
@Autowired
private OrderReadRepository readRepository;
@EventListener
public void on(OrderCreatedEvent event) {
OrderReadModel readModel = new OrderReadModel();
readModel.setOrderId(event.getAggregateId());
readModel.setUserId(event.getUserId());
readModel.setStatus("PENDING");
readModel.setTotalAmount(event.getTotalAmount());
readModel.setCreatedAt(event.getOccurredOn());
readRepository.save(readModel);
}
@EventListener
public void on(PaymentReceivedEvent event) {
OrderReadModel readModel = readRepository.findByOrderId(event.getAggregateId());
readModel.setStatus("PAID");
readModel.setPaymentId(event.getPaymentId());
readModel.setPaidAt(event.getPaidAt());
readRepository.save(readModel);
}
}四、工程实践
4.1 事件溯源的优势和代价
| 方面 | 传统CRUD | 事件溯源 |
|---|---|---|
| 当前状态查询 | 简单直接 | 需要重放或读模型 |
| 历史审计 | 需要额外实现 | 天然支持 |
| 调试能力 | 难以还原问题现场 | 可以重放到任意时间点 |
| 存储空间 | 只存当前状态 | 存所有历史事件,增长快 |
| 学习曲线 | 低 | 高 |
| 读性能 | 高 | 需要维护读模型 |
4.2 什么时候用事件溯源
- 审计要求严格的业务(金融、医疗)
- 需要支持时间旅行查询(回到某个时间点的状态)
- 复杂状态机,状态转换逻辑复杂
- 需要基于历史事件做机器学习/分析
不适合:简单CRUD应用、团队没有经验、快速迭代的早期产品
五、踩坑实录
坑一:事件膨胀导致重放越来越慢
随着时间推移,一个聚合根可能有几千条历史事件,每次重放都要处理几千条事件,性能下降明显。
解决方案:Snapshot(快照)机制——每隔N个事件,保存一次聚合根的当前状态快照。重放时从最近的快照开始,只重放快照之后的事件。
public OrderAggregate reconstruct(String aggregateId) {
Optional<Snapshot> snapshot = snapshotStore.findLatest(aggregateId);
int startVersion = 0;
OrderAggregate order;
if (snapshot.isPresent()) {
order = snapshot.get().getAggregate(); // 从快照恢复
startVersion = snapshot.get().getVersion();
} else {
order = new OrderAggregate();
}
// 只加载快照之后的事件
List<DomainEvent> events = eventStore.loadEvents(aggregateId, startVersion);
events.forEach(order::apply);
// 每100个事件保存一次快照
if (events.size() >= 100) {
snapshotStore.save(new Snapshot(aggregateId, order, order.getVersion()));
}
return order;
}坑二:事件版本升级
事件定义发生变化时,历史事件还是旧格式,需要处理版本迁移。这是事件溯源最复杂的运维问题之一。
解决方案:事件上加版本号,反序列化时做版本转换(Upcasting)。
坑三:读模型重建的一致性窗口
读模型的更新是异步的,写入成功后立即查询,可能看不到最新状态。这需要和产品对齐,接受"最终一致性",或者对特定查询用同步更新。
六、总结与个人判断
事件驱动架构是我认为目前最被低估的架构模式之一。它不只是"用消息队列解耦",背后的事件溯源和CQRS代表了一种完全不同的数据建模思路。
但我必须说,这套架构的复杂度很高,代价不小。如果你的业务不需要完整的审计历史,不需要时间旅行查询,读写比例正常,那传统CRUD配合好的领域设计,完全够用。
用EDA前,先问自己:我的业务真正需要这套复杂性吗?很多时候,简单的发布-订阅事件通知,配合传统CRUD,就能解决松耦合的问题,而不必引入完整的事件溯源体系。
量力而行,解决真实问题。
