ApplicationEvent事件驱动:Spring观察者模式的发布订阅实现
ApplicationEvent事件驱动:Spring观察者模式的发布订阅实现
适读人群:有Spring使用经验,希望用事件解耦业务逻辑的Java后端开发者 | 阅读时长:约16分钟
开篇故事
两年前我重构了一个用户注册流程。原来的代码是这样的:
public void register(UserRegistrationDTO dto) {
// 1. 保存用户
User user = userRepository.save(dto.toEntity());
// 2. 发送欢迎邮件
emailService.sendWelcomeEmail(user);
// 3. 发送短信验证码
smsService.sendVerifyCode(user.getPhone());
// 4. 初始化积分账户
pointService.initAccount(user.getId());
// 5. 给推荐人加积分
if (dto.getReferralCode() != null) {
pointService.addReferralBonus(dto.getReferralCode());
}
// 6. 记录注册日志
auditService.logRegistration(user);
}这个方法耦合了六个不同的业务。每次新增一个注册后的动作,就要改这个方法,越来越臃肿。更糟的是,某次邮件服务挂了,直接导致注册失败,用户数据也回滚了。
重构之后,我只保留了核心的保存用户逻辑,其他全部改成发布事件:
public User register(UserRegistrationDTO dto) {
User user = userRepository.save(dto.toEntity());
applicationEventPublisher.publishEvent(new UserRegisteredEvent(this, user, dto));
return user;
}各个监听器各司其职,互不影响,主流程的健壮性也大幅提升。
今天我们就来拆解Spring事件机制的底层实现,以及怎么把它用好。
一、Spring事件机制的核心组件
Spring的事件机制基于标准的观察者模式,三个核心角色:
- ApplicationEvent:事件本身,携带事件数据
- ApplicationEventPublisher:发布者,
ApplicationContext实现了这个接口 - ApplicationListener:监听者,处理特定类型的事件
Spring 4.2之后还支持@EventListener注解,更加简洁。
二、源码核心路径解析
2.1 事件发布流程
2.2 AbstractApplicationContext.publishEvent源码
// AbstractApplicationContext.java 第441行
@Override
public void publishEvent(ApplicationEvent event) {
publishEvent(event, null);
}
protected void publishEvent(Object event, @Nullable ResolvableType typeHint) {
Assert.notNull(event, "Event must not be null");
// 1. 包装成ApplicationEvent
ApplicationEvent applicationEvent;
if (event instanceof ApplicationEvent ae) {
applicationEvent = ae;
} else {
applicationEvent = new PayloadApplicationEvent<>(this, event, typeHint);
// PayloadApplicationEvent允许发布任意对象作为事件
}
// 2. 用SimpleApplicationEventMulticaster广播
getApplicationEventMulticaster().multicastEvent(applicationEvent,
ResolvableType.forInstance(applicationEvent));
// 3. 如果有父ApplicationContext,也发布给父
if (this.parent != null) {
if (this.parent instanceof AbstractApplicationContext aac) {
aac.publishEvent(event, typeHint);
} else {
this.parent.publishEvent(event);
}
}
}2.3 SimpleApplicationEventMulticaster.multicastEvent
// SimpleApplicationEventMulticaster.java 第146行
@Override
public void multicastEvent(final ApplicationEvent event,
@Nullable ResolvableType eventType) {
ResolvableType type = (eventType != null ? eventType :
ResolvableType.forInstance(event));
Executor executor = getTaskExecutor();
// 获取所有能处理这个事件类型的Listener
for (ApplicationListener<?> listener : getApplicationListeners(event, type)) {
if (executor != null && listener.supportsAsyncExecution()) {
// 异步执行
try {
executor.execute(() -> invokeListener(listener, event));
} catch (RejectedExecutionException ex) {
// 线程池满了,降级为同步执行
invokeListener(listener, event);
}
} else {
// 同步执行
invokeListener(listener, event);
}
}
}2.4 Listener匹配逻辑
// AbstractApplicationEventMulticaster.java 第244行(简化)
protected Collection<ApplicationListener<?>> getApplicationListeners(
ApplicationEvent event, ResolvableType eventType) {
// 缓存:事件类型 -> 匹配的Listener列表
// 避免每次都做类型判断
ListenerCacheKey cacheKey = new ListenerCacheKey(eventType,
event.getSource().getClass());
CachedListenerRetriever cachedRetriever = this.retrieverCache.get(cacheKey);
if (cachedRetriever != null) {
Collection<ApplicationListener<?>> listeners =
cachedRetriever.getApplicationListeners();
if (listeners != null) {
return listeners;
}
}
// 缓存未命中,重新计算
CachedListenerRetriever newRetriever = new CachedListenerRetriever();
cachedRetriever = this.retrieverCache.putIfAbsent(cacheKey, newRetriever);
// ...
return retrieveApplicationListeners(eventType, sourceType,
cachedRetriever != null ? cachedRetriever : newRetriever);
}2.5 @EventListener的处理原理
ApplicationListenerMethodAdapter是@EventListener的适配器:
// ApplicationListenerMethodAdapter.java 第117行(简化)
@Override
public void onApplicationEvent(ApplicationEvent event) {
processEvent(event);
}
public void processEvent(ApplicationEvent event) {
// 1. 解析方法参数
Object[] args = resolveArguments(event);
// 2. 检查@EventListener的condition条件
if (shouldHandle(event, args)) {
// 3. 反射调用方法
Object result = doInvoke(args);
// 4. 如果方法有返回值,把返回值也发布为事件(事件链)
if (result != null) {
handleResult(result);
}
}
}三、完整代码示例
3.1 用户注册事件完整实现
// 1. 定义事件
public class UserRegisteredEvent extends ApplicationEvent {
private final User user;
private final String referralCode;
public UserRegisteredEvent(Object source, User user, String referralCode) {
super(source);
this.user = user;
this.referralCode = referralCode;
}
public User getUser() { return user; }
public String getReferralCode() { return referralCode; }
}
// 2. 发布事件
@Service
@Transactional
public class UserService {
@Autowired
private UserRepository userRepository;
@Autowired
private ApplicationEventPublisher eventPublisher;
public User register(UserRegistrationDTO dto) {
// 核心逻辑:保存用户(在事务内)
User user = userRepository.save(User.fromDTO(dto));
// 发布事件(事件发布也在事务内,但Listener可以选择是否参与事务)
eventPublisher.publishEvent(
new UserRegisteredEvent(this, user, dto.getReferralCode())
);
return user;
}
}
// 3. 邮件监听器(同步,参与事务)
@Component
public class WelcomeEmailListener implements ApplicationListener<UserRegisteredEvent> {
@Autowired
private EmailService emailService;
@Override
@Transactional(propagation = Propagation.REQUIRES_NEW)
// 用REQUIRES_NEW,邮件发送失败不影响注册事务
public void onApplicationEvent(UserRegisteredEvent event) {
try {
emailService.sendWelcomeEmail(event.getUser());
} catch (Exception e) {
// 邮件发送失败只记日志,不抛异常,不影响注册流程
log.error("Failed to send welcome email to {}",
event.getUser().getEmail(), e);
}
}
}
// 4. 积分监听器(用@EventListener注解写法,更简洁)
@Component
public class PointsEventListener {
@Autowired
private PointService pointService;
@EventListener
@Async // 异步执行,不阻塞注册流程
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
// 注意:这里使用AFTER_COMMIT,确保用户已提交才加积分
public void onUserRegistered(UserRegisteredEvent event) {
User user = event.getUser();
pointService.initAccount(user.getId());
if (event.getReferralCode() != null) {
pointService.addReferralBonus(event.getReferralCode());
}
}
// 事件链:如果有返回值,返回值会被自动发布为新事件
@EventListener
public AuditLogEvent onUserRegisteredForAudit(UserRegisteredEvent event) {
// 返回AuditLogEvent,它会被自动发布
return new AuditLogEvent(this, "USER_REGISTER", event.getUser().getId());
}
}
// 5. 审计监听器
@Component
public class AuditListener {
@Autowired
private AuditLogRepository auditLogRepository;
@EventListener
public void onAuditLog(AuditLogEvent event) {
auditLogRepository.save(new AuditLog(event.getAction(), event.getUserId(),
LocalDateTime.now()));
}
}3.2 @TransactionalEventListener的正确使用
@Component
public class OrderEventListener {
@Autowired
private NotificationService notificationService;
// AFTER_COMMIT:事务提交后执行,此时订单数据已持久化
// 适合:发通知、发消息队列、更新缓存等不需要回滚的操作
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
@Async
public void onOrderCreated(OrderCreatedEvent event) {
notificationService.sendOrderConfirmation(event.getOrder());
}
// AFTER_ROLLBACK:事务回滚后执行
// 适合:补偿操作、告警
@TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
public void onOrderFailed(OrderCreatedEvent event) {
log.error("Order creation failed, order: {}", event.getOrder().getId());
alertService.sendAlert("Order creation failed: " + event.getOrder().getId());
}
// BEFORE_COMMIT:事务提交前执行(罕用)
@TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
public void beforeOrderCommit(OrderCreatedEvent event) {
// 在同一个事务内做最后的验证
validateOrderIntegrity(event.getOrder());
}
}四、踩坑实录
坑1:@TransactionalEventListener在没有事务的方法中不执行
现象:发布了事件,但@TransactionalEventListener的处理方法没有被调用。
根因:@TransactionalEventListener默认phase = AFTER_COMMIT,如果发布事件的方法没有在事务中运行,没有事务可以在提交后触发,监听器就不会执行。
// 错误:没有@Transactional,监听器不执行
public void doSomething() {
eventPublisher.publishEvent(new MyEvent(this)); // 没有事务,AFTER_COMMIT永不触发
}
// 正确写法1:发布者加上事务
@Transactional
public void doSomething() {
eventPublisher.publishEvent(new MyEvent(this));
}
// 正确写法2:允许无事务时也执行
@TransactionalEventListener(
phase = TransactionPhase.AFTER_COMMIT,
fallbackExecution = true // 无事务时直接执行
)
public void onEvent(MyEvent event) { ... }坑2:同步事件监听器抛异常导致整个发布失败
现象:某个ApplicationListener抛出异常,后续的Listener都没执行,发布方还收到了异常。
根因:默认情况下,SimpleApplicationEventMulticaster的invokeListener会把异常向上抛,中断整个广播流程。
解决:配置ErrorHandler:
@Bean
public ApplicationEventMulticaster applicationEventMulticaster() {
SimpleApplicationEventMulticaster multicaster = new SimpleApplicationEventMulticaster();
multicaster.setErrorHandler(throwable -> {
// 记录日志但不中断广播
log.error("Event listener error: {}", throwable.getMessage(), throwable);
});
return multicaster;
}坑3:@Async事件监听器中访问懒加载实体报LazyInitializationException
现象:@Async的事件监听器里访问Entity的懒加载关联时报LazyInitializationException。
根因:@Async在新线程中执行,原来的Hibernate Session已经关闭,懒加载关联无法在新Session中加载。
解决:在事件中只传递ID,在监听器中通过ID重新查询:
// 不要把Entity放到事件中传递(懒加载问题)
// 错误:
public class UserRegisteredEvent extends ApplicationEvent {
private final User user; // 危险!包含懒加载关联
}
// 正确:只传必要的ID和值对象
public class UserRegisteredEvent extends ApplicationEvent {
private final Long userId;
private final String email;
private final String username;
// 只传值,不传实体
}坑4:应用启动时发布的事件没有被Listener接收
现象:在@PostConstruct里发布事件,但Listener没有处理。
根因:@PostConstruct执行时,ApplicationEventMulticaster可能还没完全初始化(取决于Bean的加载顺序),或者某些Listener的Bean还没实例化。
解决:使用ApplicationReadyEvent或SmartInitializingSingleton确保所有Bean都初始化完成后再发布:
@Component
public class StartupEventPublisher implements ApplicationListener<ApplicationReadyEvent> {
@Autowired
private ApplicationEventPublisher eventPublisher;
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
// 所有Bean已初始化,安全发布
eventPublisher.publishEvent(new AppStartupEvent(this));
}
}五、总结与延伸
Spring事件机制是解耦业务逻辑的利器,核心原则:
| 使用场景 | 推荐方案 |
|---|---|
| 主流程后的无关操作 | @EventListener + @Async |
| 事务提交后的操作 | @TransactionalEventListener(AFTER_COMMIT) |
| 需要事务的操作 | @EventListener + @Transactional |
| 关键操作 | 直接调用,别用事件(调试困难) |
什么时候不该用事件:如果A必须等B的结果,或者A和B必须在同一个事务里,就直接调用,别包成事件——过度解耦反而增加理解成本。
下一篇聊BeanPostProcessor和BeanFactoryPostProcessor的执行时机,这是Spring扩展中最容易混淆的两个概念。
