微服务事件溯源:EventStore设计与CQRS模式的Spring实现
微服务事件溯源:EventStore设计与CQRS模式的Spring实现
适读人群:有一定DDD和微服务基础的后端工程师 | 阅读时长:约28分钟 | Spring Boot 3.2
开篇故事
我们有个电商平台的订单系统,用的是传统的增量更新模式:用户下单,写一条订单记录;用户支付,更新订单状态为"已支付";用户取消,更新状态为"已取消"。
有一天运营来了个需求:想查某个用户的某笔订单,从下单到完成,中间每个状态变化的具体时间点。
我去查数据库,发现找不到。传统的更新模式只保存了当前状态,历史状态已经被覆盖了。订单记录里只有最新的状态,中间状态的时间戳根本没保存。
那次之后我开始研究事件溯源(Event Sourcing),这种模式的核心思路是:不保存对象的当前状态,而是保存导致状态变化的所有事件。要知道当前状态,把历史事件重放一遍就能得出来。
事件溯源往往和CQRS(命令查询职责分离)一起使用:写操作(Command)产生事件存入EventStore,读操作(Query)从投影(Projection)视图中读取,投影视图是事件重放后生成的读优化数据。今天把这套在Spring中的完整实现写出来。
一、核心问题分析
事件溯源有几个核心概念必须搞清楚:
Event(事件):对系统中发生的事情的不可变记录。比如OrderCreated、OrderPaid、OrderCancelled。事件一旦写入就不能修改,只能追加。
Aggregate(聚合根):业务实体,维护自身状态。当命令(Command)作用于聚合根时,聚合根产生事件,事件被存入EventStore,同时聚合根根据事件更新自身状态。
EventStore:所有事件的持久化存储。关键特性:只追加、按时间顺序存储。
Projection(投影):把事件流聚合成特定视图,用于查询。比如把所有OrderCreated和OrderPaid事件聚合成一张"订单摘要表"。
Snapshot(快照):当事件数量很多时,每次重放所有事件来计算当前状态会很慢,快照就是定期把聚合根当前状态保存下来,重放时从最近的快照开始,只重放快照之后的事件。
二、原理深度解析
2.1 事件溯源与CQRS整体架构
2.2 聚合根状态重建
2.3 快照优化
三、完整代码实现
3.1 事件基类和具体事件
package com.laozhang.eventsourcing.event;
import lombok.Data;
import java.time.Instant;
import java.util.UUID;
/**
* 事件基类
*/
@Data
public abstract class DomainEvent {
private final String eventId;
private final String aggregateId;
private final long version; // 事件版本号(乐观锁)
private final Instant occurredAt;
private final String eventType;
protected DomainEvent(String aggregateId, long version, String eventType) {
this.eventId = UUID.randomUUID().toString();
this.aggregateId = aggregateId;
this.version = version;
this.occurredAt = Instant.now();
this.eventType = eventType;
}
}package com.laozhang.eventsourcing.event;
import lombok.Getter;
import java.math.BigDecimal;
/**
* 订单创建事件
*/
@Getter
public class OrderCreatedEvent extends DomainEvent {
private final String userId;
private final String productId;
private final int quantity;
private final BigDecimal amount;
public OrderCreatedEvent(
String aggregateId, long version,
String userId, String productId, int quantity, BigDecimal amount
) {
super(aggregateId, version, "OrderCreated");
this.userId = userId;
this.productId = productId;
this.quantity = quantity;
this.amount = amount;
}
}
/**
* 订单支付事件
*/
@Getter
class OrderPaidEvent extends DomainEvent {
private final String paymentId;
private final String paymentMethod;
public OrderPaidEvent(String aggregateId, long version, String paymentId, String paymentMethod) {
super(aggregateId, version, "OrderPaid");
this.paymentId = paymentId;
this.paymentMethod = paymentMethod;
}
}
/**
* 订单取消事件
*/
@Getter
class OrderCancelledEvent extends DomainEvent {
private final String reason;
public OrderCancelledEvent(String aggregateId, long version, String reason) {
super(aggregateId, version, "OrderCancelled");
this.reason = reason;
}
}3.2 订单聚合根
package com.laozhang.eventsourcing.aggregate;
import com.laozhang.eventsourcing.event.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Getter
public class OrderAggregate {
private String orderId;
private String userId;
private String productId;
private int quantity;
private BigDecimal amount;
private OrderStatus status;
private String paymentId;
private String cancelReason;
private long version = 0;
// 未持久化的新事件
private final List<DomainEvent> uncommittedEvents = new ArrayList<>();
// 无参构造,用于从EventStore重建
public OrderAggregate() {}
/**
* Command处理:创建订单
*/
public static OrderAggregate create(
String orderId, String userId, String productId, int quantity, BigDecimal amount
) {
OrderAggregate aggregate = new OrderAggregate();
// 产生事件(不直接修改状态,通过apply方法修改)
OrderCreatedEvent event = new OrderCreatedEvent(
orderId, aggregate.version + 1, userId, productId, quantity, amount
);
aggregate.applyAndRecord(event);
return aggregate;
}
/**
* Command处理:支付订单
*/
public void pay(String paymentId, String paymentMethod) {
if (status != OrderStatus.CREATED) {
throw new IllegalStateException("只有CREATED状态的订单才能支付,当前状态:" + status);
}
applyAndRecord(new OrderPaidEvent(orderId, version + 1, paymentId, paymentMethod));
}
/**
* Command处理:取消订单
*/
public void cancel(String reason) {
if (status == OrderStatus.SHIPPED || status == OrderStatus.COMPLETED) {
throw new IllegalStateException("已发货或已完成的订单不能取消");
}
applyAndRecord(new OrderCancelledEvent(orderId, version + 1, reason));
}
/**
* 应用事件并记录到未提交列表(用于新命令产生的事件)
*/
private void applyAndRecord(DomainEvent event) {
apply(event);
uncommittedEvents.add(event);
}
/**
* 应用事件(用于从EventStore重建状态)
* 这里只修改状态,不产生新事件
*/
public void apply(DomainEvent event) {
if (event instanceof OrderCreatedEvent e) {
this.orderId = e.getAggregateId();
this.userId = e.getUserId();
this.productId = e.getProductId();
this.quantity = e.getQuantity();
this.amount = e.getAmount();
this.status = OrderStatus.CREATED;
} else if (event instanceof OrderPaidEvent e) {
this.paymentId = e.getPaymentId();
this.status = OrderStatus.PAID;
} else if (event instanceof OrderCancelledEvent e) {
this.cancelReason = e.getReason();
this.status = OrderStatus.CANCELLED;
} else {
log.warn("未知事件类型:{}", event.getClass().getName());
}
this.version = event.getVersion();
}
public void clearUncommittedEvents() {
uncommittedEvents.clear();
}
}3.3 EventStore实现(基于PostgreSQL)
package com.laozhang.eventsourcing.store;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.laozhang.eventsourcing.event.DomainEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.sql.Timestamp;
import java.util.List;
/**
* 基于PostgreSQL的EventStore实现
* 核心特性:只追加,不更新不删除;version字段实现乐观锁防止并发冲突
*/
@Slf4j
@Repository
public class PostgresEventStore {
private final JdbcTemplate jdbcTemplate;
private final ObjectMapper objectMapper;
/*
* 建表SQL:
* CREATE TABLE domain_events (
* id BIGSERIAL PRIMARY KEY,
* event_id VARCHAR(36) NOT NULL UNIQUE,
* aggregate_id VARCHAR(36) NOT NULL,
* aggregate_type VARCHAR(100) NOT NULL,
* event_type VARCHAR(100) NOT NULL,
* payload JSONB NOT NULL,
* version BIGINT NOT NULL,
* occurred_at TIMESTAMP NOT NULL,
* UNIQUE (aggregate_id, version) -- 乐观锁:同一聚合根的version不能重复
* );
* CREATE INDEX idx_aggregate_id ON domain_events(aggregate_id);
*/
public PostgresEventStore(JdbcTemplate jdbcTemplate, ObjectMapper objectMapper) {
this.jdbcTemplate = jdbcTemplate;
this.objectMapper = objectMapper;
}
@Transactional
public void appendEvents(String aggregateType, List<DomainEvent> events) {
for (DomainEvent event : events) {
try {
String payload = objectMapper.writeValueAsString(event);
jdbcTemplate.update(
"INSERT INTO domain_events " +
"(event_id, aggregate_id, aggregate_type, event_type, payload, version, occurred_at) " +
"VALUES (?, ?, ?, ?, ?::jsonb, ?, ?)",
event.getEventId(),
event.getAggregateId(),
aggregateType,
event.getEventType(),
payload,
event.getVersion(),
Timestamp.from(event.getOccurredAt())
);
} catch (DuplicateKeyException e) {
throw new ConcurrencyConflictException(
"并发冲突,aggregateId=" + event.getAggregateId() + ",version=" + event.getVersion()
);
} catch (Exception e) {
throw new RuntimeException("事件存储失败", e);
}
}
}
public List<EventRecord> loadEvents(String aggregateId, long fromVersion) {
return jdbcTemplate.query(
"SELECT event_type, payload, version, occurred_at " +
"FROM domain_events " +
"WHERE aggregate_id = ? AND version > ? " +
"ORDER BY version ASC",
(rs, rowNum) -> new EventRecord(
rs.getString("event_type"),
rs.getString("payload"),
rs.getLong("version"),
rs.getTimestamp("occurred_at").toInstant()
),
aggregateId,
fromVersion
);
}
public long getLatestVersion(String aggregateId) {
Long version = jdbcTemplate.queryForObject(
"SELECT COALESCE(MAX(version), 0) FROM domain_events WHERE aggregate_id = ?",
Long.class,
aggregateId
);
return version != null ? version : 0;
}
}3.4 聚合根仓库(整合EventStore和快照)
package com.laozhang.eventsourcing.repository;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.laozhang.eventsourcing.aggregate.OrderAggregate;
import com.laozhang.eventsourcing.event.DomainEvent;
import com.laozhang.eventsourcing.store.EventRecord;
import com.laozhang.eventsourcing.store.PostgresEventStore;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Repository;
import org.springframework.transaction.annotation.Transactional;
import java.util.List;
import java.util.Optional;
@Slf4j
@Repository
public class OrderRepository {
private static final int SNAPSHOT_THRESHOLD = 50; // 每50个事件创建一个快照
private final PostgresEventStore eventStore;
private final SnapshotStore snapshotStore;
private final EventDeserializer eventDeserializer;
private final ApplicationEventPublisher eventPublisher;
public OrderRepository(
PostgresEventStore eventStore,
SnapshotStore snapshotStore,
EventDeserializer eventDeserializer,
ApplicationEventPublisher eventPublisher
) {
this.eventStore = eventStore;
this.snapshotStore = snapshotStore;
this.eventDeserializer = eventDeserializer;
this.eventPublisher = eventPublisher;
}
@Transactional
public void save(OrderAggregate aggregate) {
List<DomainEvent> uncommittedEvents = aggregate.getUncommittedEvents();
if (uncommittedEvents.isEmpty()) {
return;
}
// 持久化事件
eventStore.appendEvents("Order", uncommittedEvents);
// 发布事件到Spring ApplicationEvent(用于Projection更新)
uncommittedEvents.forEach(event -> {
eventPublisher.publishEvent(event);
log.debug("事件已发布:type={},aggregateId={}", event.getEventType(), event.getAggregateId());
});
// 检查是否需要创建快照
if (aggregate.getVersion() % SNAPSHOT_THRESHOLD == 0) {
snapshotStore.save("Order", aggregate.getOrderId(), aggregate, aggregate.getVersion());
log.info("创建快照,orderId={},version={}", aggregate.getOrderId(), aggregate.getVersion());
}
aggregate.clearUncommittedEvents();
}
public Optional<OrderAggregate> findById(String orderId) {
// 先尝试从快照加载
Optional<SnapshotRecord> snapshot = snapshotStore.findLatest("Order", orderId);
OrderAggregate aggregate;
long fromVersion;
if (snapshot.isPresent()) {
// 从快照恢复
aggregate = snapshot.get().deserialize(OrderAggregate.class);
fromVersion = snapshot.get().getVersion();
} else {
// 从头开始
aggregate = new OrderAggregate();
fromVersion = 0;
}
// 加载快照之后的事件并重放
List<EventRecord> events = eventStore.loadEvents(orderId, fromVersion);
if (events.isEmpty() && fromVersion == 0) {
return Optional.empty();
}
for (EventRecord record : events) {
DomainEvent event = eventDeserializer.deserialize(record);
aggregate.apply(event);
}
return Optional.of(aggregate);
}
}3.5 Projection(投影):用于查询
package com.laozhang.eventsourcing.projection;
import com.laozhang.eventsourcing.event.OrderCancelledEvent;
import com.laozhang.eventsourcing.event.OrderCreatedEvent;
import com.laozhang.eventsourcing.event.OrderPaidEvent;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
* 订单摘要投影
* 监听领域事件,维护读优化的订单摘要表
* 这就是CQRS的"Q"部分:维护专门用于查询的视图
*/
@Slf4j
@Component
public class OrderSummaryProjection {
private final JdbcTemplate jdbcTemplate;
public OrderSummaryProjection(JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
}
@EventListener
@Async // 异步更新投影,不影响命令处理的性能
public void on(OrderCreatedEvent event) {
jdbcTemplate.update(
"INSERT INTO order_summary " +
"(order_id, user_id, product_id, quantity, amount, status, created_at) " +
"VALUES (?, ?, ?, ?, ?, 'CREATED', ?)",
event.getAggregateId(),
event.getUserId(),
event.getProductId(),
event.getQuantity(),
event.getAmount(),
event.getOccurredAt()
);
log.debug("订单摘要投影更新:OrderCreated,orderId={}", event.getAggregateId());
}
@EventListener
@Async
public void on(OrderPaidEvent event) {
jdbcTemplate.update(
"UPDATE order_summary SET status='PAID', paid_at=? WHERE order_id=?",
event.getOccurredAt(), event.getAggregateId()
);
}
@EventListener
@Async
public void on(OrderCancelledEvent event) {
jdbcTemplate.update(
"UPDATE order_summary SET status='CANCELLED', cancel_reason=?, cancelled_at=? WHERE order_id=?",
event.getReason(), event.getOccurredAt(), event.getAggregateId()
);
}
}四、生产配置与调优
4.1 EventStore查询优化
-- 按聚合根ID查询事件,这是最频繁的查询
CREATE INDEX idx_domain_events_aggregate_id ON domain_events(aggregate_id, version);
-- 按事件类型查询(用于投影重放)
CREATE INDEX idx_domain_events_event_type ON domain_events(event_type, occurred_at);4.2 投影重放(数据修复)
当投影数据损坏时,可以从EventStore重放所有事件重建投影:
@Component
public class ProjectionReplay {
public void replayAll() {
// 清空投影表
jdbcTemplate.update("TRUNCATE order_summary");
// 从EventStore按时间顺序读取所有事件并重放
long offset = 0;
int batchSize = 1000;
// 分批处理,避免OOM
List<EventRecord> batch;
do {
batch = eventStore.loadAllEvents(offset, batchSize);
batch.forEach(record -> {
DomainEvent event = eventDeserializer.deserialize(record);
applicationEventPublisher.publishEvent(event);
});
offset += batchSize;
} while (batch.size() == batchSize);
}
}五、踩坑实录
坑一:并发命令导致版本冲突,需要乐观锁处理。
两个请求同时操作同一个订单,都读到了version=5,都尝试写version=6。第一个写成功,第二个因为UNIQUE(aggregate_id, version)约束失败。这不是bug,是正确的乐观锁行为。上层需要捕获ConcurrencyConflictException并重试。
坑二:Projection投影落后于EventStore,查询到的是旧状态。
投影是异步更新的,写入成功后立刻查询可能读到旧数据(最终一致性)。这是CQRS架构的特点,不是bug。需要在API设计时明确这一点,对于需要强一致性的查询,可以直接从EventStore重建聚合根状态,而不走投影视图。
坑三:事件类型反序列化失败,EventStore里的历史数据无法重建聚合根。
修改了事件类的包名,导致历史事件无法反序列化。EventStore里存的是事件的JSON,包含了类的全限定名,修改包名后历史数据就无法加载。
解决方案:事件类一旦有数据,就不要修改包名。如果必须修改,需要写数据迁移脚本,或者在反序列化时做版本映射。
六、总结
事件溯源解决了状态追溯的问题,CQRS解决了读写模型分离的问题,两者结合是处理复杂业务场景的利器。但这套模式也有代价:实现复杂度显著高于传统CRUD,学习曲线陡,不适合简单的增删改查场景。建议只在业务审计、回放、历史追溯需求强烈的核心域使用,不要过度设计。
