观察者模式:Spring ApplicationEvent的发布订阅与异步事件处理
观察者模式:Spring ApplicationEvent的发布订阅与异步事件处理
适读人群:中高级Java开发者 | 阅读时长:约22分钟 | 模式类型:行为型
开篇故事
2020年,我们的订单系统经历了一次严重的"功能耦合"危机。
OrderService.createOrder() 方法最开始只有几十行,随着业务发展,变成了这样:
public Order createOrder(CreateOrderRequest request) {
// 1. 基础下单逻辑
Order order = doCreateOrder(request);
// 2. 更新库存
inventoryService.deductStock(order);
// 3. 发送短信通知
smsService.sendOrderCreatedSms(order.getUserId(), order);
// 4. 更新用户行为数据
userBehaviorService.recordOrderBehavior(order.getUserId(), order);
// 5. 积分奖励
pointsService.awardOrderPoints(order);
// 6. 同步到数仓
dataWarehouseService.syncOrder(order);
// 7. 推送给物流系统
logisticsService.notifyNewOrder(order);
// 8. 更新推荐系统
recommendationService.updateUserProfile(order.getUserId(), order);
return order;
}这个方法已经耦合了7个不同的下游系统。每次有新的"下单后"逻辑,就要改这个方法。更糟糕的是,如果某个下游服务挂了(比如短信服务超时),整个下单流程都会失败。
用 Spring 事件机制重构后,createOrder() 只需要发布一个 OrderCreatedEvent,其他所有下游处理都通过事件监听器独立处理,互不影响。这就是观察者模式带来的解耦价值。
一、模式动机:解耦事件发布者与订阅者
观察者模式(Observer Pattern)定义了一种一对多的依赖关系:当一个对象的状态发生改变时,所有依赖于它的对象都将得到通知并自动更新,而发布者不需要知道有哪些订阅者,也不需要知道订阅者会做什么。
核心价值:发布者只关心"什么事情发生了",不关心"谁会处理"以及"怎么处理"。
二、模式结构
三、Spring ApplicationEvent 机制源码分析
3.1 事件发布流程
// ApplicationEventPublisher(Subject角色)
public interface ApplicationEventPublisher {
default void publishEvent(ApplicationEvent event) {
publishEvent((Object) event);
}
void publishEvent(Object event);
}
// AbstractApplicationContext.publishEvent() 核心实现
public void publishEvent(Object event, @Nullable ResolvableType typeHint) {
// ...
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent ae) {
applicationEvent = ae;
} else {
applicationEvent = new PayloadApplicationEvent<>(this, event, typeHint);
// ...
}
// 通过SimpleApplicationEventMulticaster广播事件
getApplicationEventMulticaster().multicastEvent(applicationEvent, eventType);
// 也传播给父容器
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext aac) {
aac.publishEvent(event, eventType);
} else {
this.parent.publishEvent(event);
}
}
}
// SimpleApplicationEventMulticaster.multicastEvent()
public void multicastEvent(ApplicationEvent event, @Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType : resolveDefaultEventType(event));
Executor executor = getTaskExecutor(); // 如果配置了异步执行器,这里就是异步
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null && listener.supportsAsyncExecution()) {
executor.execute(() -> invokeListener(listener, event)); // 异步执行
} else {
invokeListener(listener, event); // 同步执行
}
}
}3.2 事件监听注册机制
Spring 通过 @EventListener 注解自动注册事件监听器,背后是 EventListenerMethodProcessor:
// EventListenerMethodProcessor 扫描所有@EventListener注解的方法
// 并将其包装为ApplicationListener注册到容器中
public class EventListenerMethodProcessor implements SmartInitializingSingleton,
ApplicationContextAware, BeanFactoryPostProcessor {
@Override
public void afterSingletonsInstantiated() {
// 遍历所有Bean,查找@EventListener注解的方法
String[] beanNames = this.beanFactory.getBeanDefinitionNames();
for (String beanName : beanNames) {
processBean(beanName, beanType);
}
}
}四、生产级代码实现
4.1 完整的订单事件系统
/**
* 订单创建事件(Event对象)
*/
@Getter
public class OrderCreatedEvent extends ApplicationEvent {
private final String orderId;
private final Long userId;
private final BigDecimal amount;
private final String channelCode;
private final List<OrderItem> items;
private final LocalDateTime createdAt;
public OrderCreatedEvent(Object source, Order order) {
super(source);
this.orderId = order.getId();
this.userId = order.getUserId();
this.amount = order.getFinalAmount();
this.channelCode = order.getChannelCode();
this.items = Collections.unmodifiableList(order.getItems());
this.createdAt = order.getCreatedAt();
}
}
/**
* 订单支付成功事件
*/
@Getter
public class OrderPaidEvent extends ApplicationEvent {
private final String orderId;
private final Long userId;
private final BigDecimal paidAmount;
private final String paymentMethod;
private final LocalDateTime paidAt;
public OrderPaidEvent(Object source, String orderId, Long userId,
BigDecimal paidAmount, String paymentMethod) {
super(source);
this.orderId = orderId;
this.userId = userId;
this.paidAmount = paidAmount;
this.paymentMethod = paymentMethod;
this.paidAt = LocalDateTime.now();
}
}
/**
* 订单服务:只负责核心业务逻辑,通过事件解耦后续处理
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class OrderService {
private final OrderRepository orderRepository;
private final InventoryService inventoryService;
private final ApplicationEventPublisher eventPublisher;
@Transactional
public Order createOrder(CreateOrderRequest request) {
// 1. 核心下单逻辑
Order order = buildOrder(request);
// 2. 扣减库存(核心,在同一事务中)
inventoryService.deductStock(order);
// 3. 保存订单
orderRepository.save(order);
// 4. 发布事件(其他所有下游处理都通过事件触发)
// 注意:事件在事务提交后才触发(通过@TransactionalEventListener实现)
eventPublisher.publishEvent(new OrderCreatedEvent(this, order));
log.info("Order created: {}", order.getId());
return order;
}
@Transactional
public void processPaymentSuccess(String orderId, String paymentOrderId, BigDecimal amount) {
Order order = orderRepository.findById(orderId)
.orElseThrow(() -> new OrderNotFoundException(orderId));
order.setStatus(OrderStatus.PAID);
order.setPaidAt(LocalDateTime.now());
order.setPaymentOrderId(paymentOrderId);
orderRepository.save(order);
// 发布支付成功事件
eventPublisher.publishEvent(new OrderPaidEvent(this, orderId,
order.getUserId(), amount, order.getPaymentMethod()));
log.info("Order paid: {}", orderId);
}
}
/**
* 短信通知监听器(同步发送:失败不影响主流程,但需要确保最终一定发送)
*/
@Component
@Slf4j
public class OrderSmsNotificationListener {
@Autowired
private SmsService smsService;
/**
* 订单创建成功后发送短信
* AFTER_COMMIT:事务提交成功后才执行,避免事务回滚后发出错误通知
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async("smsExecutor") // 异步执行,不阻塞业务线程
public void onOrderCreated(OrderCreatedEvent event) {
try {
smsService.sendOrderCreatedSms(event.getUserId(), event.getOrderId(), event.getAmount());
log.info("Order created SMS sent for order: {}", event.getOrderId());
} catch (Exception e) {
// 短信发送失败不应影响订单创建,记录日志后告警
log.error("Failed to send order created SMS for order {}: {}",
event.getOrderId(), e.getMessage());
// 可以投递到重试队列
}
}
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async("smsExecutor")
public void onOrderPaid(OrderPaidEvent event) {
try {
smsService.sendPaymentSuccessSms(event.getUserId(), event.getOrderId(), event.getPaidAmount());
} catch (Exception e) {
log.error("Failed to send payment success SMS: {}", e.getMessage());
}
}
}
/**
* 积分奖励监听器
*/
@Component
@Slf4j
public class OrderPointsRewardListener {
@Autowired
private PointsService pointsService;
// 支付成功后奖励积分(支付成功才有积分)
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async("pointsExecutor")
public void onOrderPaid(OrderPaidEvent event) {
try {
int earnedPoints = pointsService.calculateEarnedPoints(event.getPaidAmount());
pointsService.addPoints(event.getUserId(), earnedPoints,
"ORDER_PAYMENT", event.getOrderId());
log.info("Points {} awarded to user {} for order {}",
earnedPoints, event.getUserId(), event.getOrderId());
} catch (Exception e) {
log.error("Failed to award points for order {}: {}", event.getOrderId(), e.getMessage());
}
}
}
/**
* 数据仓库同步监听器
*/
@Component
@Slf4j
public class OrderDataWarehouseSyncListener {
@Autowired
private DataWarehouseClient dwClient;
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async("dwSyncExecutor")
public void onOrderCreated(OrderCreatedEvent event) {
try {
dwClient.syncOrder(convertToWarehouseRecord(event));
} catch (Exception e) {
log.error("Failed to sync order to data warehouse: {}", e.getMessage());
// 数据仓库同步失败,写入重试队列(MQ)
retryQueue.offer(new RetryTask("DW_SYNC_ORDER", event));
}
}
private WarehouseOrderRecord convertToWarehouseRecord(OrderCreatedEvent event) {
return WarehouseOrderRecord.builder()
.orderId(event.getOrderId())
.userId(event.getUserId())
.amount(event.getAmount())
.channelCode(event.getChannelCode())
.createdAt(event.getCreatedAt())
.build();
}
}
/**
* 异步执行器配置
*/
@Configuration
@EnableAsync
public class AsyncEventExecutorConfig {
@Bean("smsExecutor")
public Executor smsExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(1000);
executor.setThreadNamePrefix("sms-event-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("pointsExecutor")
public Executor pointsExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(5000);
executor.setThreadNamePrefix("points-event-");
executor.initialize();
return executor;
}
@Bean("dwSyncExecutor")
public Executor dwSyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(2000);
executor.setThreadNamePrefix("dw-sync-event-");
executor.initialize();
return executor;
}
}五、踩坑实录
坑一:同步事件监听器抛出异常导致事务回滚
默认的 @EventListener(非 @TransactionalEventListener)是同步调用的,如果监听器内部抛出异常,会导致整个事务回滚!
一次订单创建后,短信发送监听器因为短信服务器不可用抛出了异常,结果订单没有创建成功。
解决方案:对于非核心逻辑(短信、通知、积分等),必须:
- 使用
@Async异步执行 - 监听器内部做异常捕获,不让异常往外传播
- 使用
@TransactionalEventListener(phase = AFTER_COMMIT)确保事务提交后才执行
坑二:@TransactionalEventListener 在无事务环境下不触发
@TransactionalEventListener 需要在事务上下文中发布事件才会触发。如果事件是在没有事务的方法中发布的,监听器默认不会触发(除非设置 fallbackExecution = true)。
坑三:事件对象包含持久化对象导致懒加载问题
如果事件对象中包含了 Hibernate 的懒加载代理对象,在异步监听器中(已经不在原来的 Session 中)访问这些懒加载属性会抛出 LazyInitializationException。
解决方案:事件对象只包含基本属性(ID、简单字段),不包含关联的实体对象。如果需要更多数据,在监听器中重新从数据库查询。
坑四:事件在单元测试中的行为
Spring 的事件在单元测试中会被真实触发(如果用 @SpringBootTest),可能导致测试时发送真实短信、真实积分操作等副作用。
解决方案:在测试配置中 Mock 相关 Service,或者使用 @TestEventListener + 测试专用的 Profile。
六、总结
Spring 事件机制是观察者模式的工程级应用,核心价值是业务解耦——下单逻辑不再需要直接依赖短信服务、积分服务、数仓同步等,这些都通过事件松耦合地连接。
实践中的关键决策:
@EventListenervs@TransactionalEventListener:前者立即执行(可能在事务提交前),后者等事务提交后再执行(更安全)。- 同步 vs 异步:非核心逻辑用
@Async,避免监听器阻塞主流程。 - 事件粒度:事件应该代表业务状态变更(
OrderCreated、OrderPaid),而不是技术操作(InsertOrderRecord)。 - 失败处理:监听器内部必须做异常捕获,失败要记录日志并考虑重试机制,不能让监听器的失败影响发布者。
