第1865篇:Spring AI的异步处理框架——基于Spring Events的解耦架构
第1865篇:Spring AI的异步处理框架——基于Spring Events的解耦架构
大概半年前,有个做内容平台的朋友找我看代码,他们的 AI 文章摘要功能一直有个问题:用户上传文章后,调 AI 生成摘要需要等 8-10 秒,页面一直转圈,用户体验很差。
他的第一反应是"要不换个更快的模型"。我看了下代码,问题不在模型速度,在于架构——用户上传文章和 AI 处理是同步的,用户的 HTTP 请求要等 AI 处理完才能返回。
这是 AI 项目里非常典型的架构问题。AI 调用天生是慢操作(几秒到几十秒不等),如果还用同步请求/响应模式,用户体验就会很差,而且服务器线程也会被大量占用。
解决方案是把 AI 处理变成异步任务,用事件驱动的方式解耦。Spring 的 ApplicationEvent 机制加上异步执行,刚好适合这种场景。今天就把这套架构讲清楚。
一、同步 vs 异步:选择的依据
不是所有 AI 调用都要改成异步,先想清楚场景:
适合异步处理的场景:
- 用户提交后可以去做别的事,完成后收到通知
- 处理时间超过 3 秒的 AI 任务
- 批量处理场景
- 非关键路径的 AI 增强功能(比如自动打标签)
适合同步/流式的场景:
- 对话机器人(用户在等待回答)
- 实时摘要(用户看着生成过程)
二、Spring Events 基础:为什么选它
Spring 的 ApplicationEvent 机制是一个进程内的发布订阅系统,有这几个优势让它适合 AI 异步处理:
- 零外部依赖:不需要引入 Kafka、RabbitMQ
- 事务感知:支持在事务提交后才发布事件(
@TransactionalEventListener) - 异步支持:加一个注解就变成异步处理
- 类型安全:Java 泛型保证事件类型安全
当然局限也明显:进程内,重启事件丢失,不能跨服务。如果需要跨服务或者事件持久化,要换成消息队列。
三、设计事件体系
先把事件类设计好,这是整个架构的基础:
// 基础 AI 任务事件
@Getter
public abstract class AiTaskEvent extends ApplicationEvent {
private final String taskId;
private final String userId;
private final Instant createdAt;
protected AiTaskEvent(Object source, String taskId, String userId) {
super(source);
this.taskId = taskId;
this.userId = userId;
this.createdAt = Instant.now();
}
}
// 文档摘要请求事件
@Getter
public class DocumentSummaryRequestEvent extends AiTaskEvent {
private final String documentId;
private final String content;
private final SummaryStyle style; // BRIEF/DETAILED/BULLET_POINTS
public DocumentSummaryRequestEvent(Object source, String taskId,
String userId, String documentId,
String content, SummaryStyle style) {
super(source, taskId, userId);
this.documentId = documentId;
this.content = content;
this.style = style;
}
}
// AI 任务完成事件
@Getter
public class AiTaskCompletedEvent extends AiTaskEvent {
private final String result;
private final int tokensUsed;
public AiTaskCompletedEvent(Object source, String taskId, String userId,
String result, int tokensUsed) {
super(source, taskId, userId);
this.result = result;
this.tokensUsed = tokensUsed;
}
}
// AI 任务失败事件
@Getter
public class AiTaskFailedEvent extends AiTaskEvent {
private final String errorMessage;
private final boolean retriable;
public AiTaskFailedEvent(Object source, String taskId, String userId,
String errorMessage, boolean retriable) {
super(source, taskId, userId);
this.errorMessage = errorMessage;
this.retriable = retriable;
}
}四、任务状态管理
异步任务需要一个状态追踪机制,让用户能查询任务进度:
@Entity
@Table(name = "ai_tasks")
@Data
@Builder
public class AiTaskEntity {
@Id
private String taskId;
private String userId;
@Enumerated(EnumType.STRING)
private TaskStatus status; // PENDING/PROCESSING/COMPLETED/FAILED
@Lob
private String result;
private String errorMessage;
private Integer tokensUsed;
private Instant createdAt;
private Instant updatedAt;
private Instant completedAt;
public enum TaskStatus {
PENDING, PROCESSING, COMPLETED, FAILED, RETRYING
}
}
@Repository
public interface AiTaskRepository extends JpaRepository<AiTaskEntity, String> {
List<AiTaskEntity> findByUserIdOrderByCreatedAtDesc(String userId);
List<AiTaskEntity> findByStatusAndCreatedAtBefore(
AiTaskEntity.TaskStatus status, Instant threshold);
}五、核心服务:事件发布与任务提交
@Service
@Slf4j
public class AiTaskSubmitService {
private final ApplicationEventPublisher eventPublisher;
private final AiTaskRepository taskRepository;
public AiTaskSubmitService(ApplicationEventPublisher eventPublisher,
AiTaskRepository taskRepository) {
this.eventPublisher = eventPublisher;
this.taskRepository = taskRepository;
}
/**
* 提交文档摘要任务,立即返回 taskId
*/
public String submitDocumentSummary(String userId, String documentId,
String content, SummaryStyle style) {
// 生成任务 ID
String taskId = UUID.randomUUID().toString();
// 持久化任务状态
AiTaskEntity task = AiTaskEntity.builder()
.taskId(taskId)
.userId(userId)
.status(AiTaskEntity.TaskStatus.PENDING)
.createdAt(Instant.now())
.updatedAt(Instant.now())
.build();
taskRepository.save(task);
// 发布事件(异步处理由监听器完成)
eventPublisher.publishEvent(
new DocumentSummaryRequestEvent(this, taskId, userId,
documentId, content, style)
);
log.info("文档摘要任务已提交 taskId={} userId={}", taskId, userId);
return taskId;
}
/**
* 查询任务状态
*/
public AiTaskStatusDTO getTaskStatus(String taskId, String userId) {
AiTaskEntity task = taskRepository.findById(taskId)
.orElseThrow(() -> new TaskNotFoundException("任务不存在: " + taskId));
// 安全检查:只能查自己的任务
if (!task.getUserId().equals(userId)) {
throw new AccessDeniedException("无权查询此任务");
}
return AiTaskStatusDTO.builder()
.taskId(task.getTaskId())
.status(task.getStatus().name())
.result(task.getStatus() == AiTaskEntity.TaskStatus.COMPLETED
? task.getResult() : null)
.errorMessage(task.getStatus() == AiTaskEntity.TaskStatus.FAILED
? task.getErrorMessage() : null)
.createdAt(task.getCreatedAt())
.completedAt(task.getCompletedAt())
.build();
}
}六、异步事件监听器:真正执行 AI 调用的地方
@Component
@Slf4j
public class DocumentSummaryEventListener {
private final ChatClient chatClient;
private final AiTaskRepository taskRepository;
private final ApplicationEventPublisher eventPublisher;
public DocumentSummaryEventListener(ChatClient chatClient,
AiTaskRepository taskRepository,
ApplicationEventPublisher eventPublisher) {
this.chatClient = chatClient;
this.taskRepository = taskRepository;
this.eventPublisher = eventPublisher;
}
/**
* @Async 使监听器在独立线程池中执行
* 不会阻塞事件发布者的线程
*/
@Async("aiTaskExecutor")
@EventListener
public void handleDocumentSummaryRequest(DocumentSummaryRequestEvent event) {
String taskId = event.getTaskId();
log.info("开始处理文档摘要任务 taskId={}", taskId);
// 更新状态为处理中
updateTaskStatus(taskId, AiTaskEntity.TaskStatus.PROCESSING, null, null);
try {
String systemPrompt = buildSummarySystemPrompt(event.getStyle());
String userPrompt = "请对以下内容生成摘要:\n\n" + event.getContent();
// 调用 AI
org.springframework.ai.chat.model.ChatResponse response =
chatClient.prompt()
.system(systemPrompt)
.user(userPrompt)
.call()
.chatResponse();
String summary = response.getResult().getOutput().getContent();
int tokensUsed = response.getMetadata().getUsage() != null
? response.getMetadata().getUsage().getTotalTokens() : 0;
// 更新状态为完成
updateTaskStatus(taskId, AiTaskEntity.TaskStatus.COMPLETED, summary, null);
// 发布完成事件(其他监听器可以订阅,比如发通知)
eventPublisher.publishEvent(
new AiTaskCompletedEvent(this, taskId, event.getUserId(),
summary, tokensUsed)
);
log.info("文档摘要任务完成 taskId={} tokens={}", taskId, tokensUsed);
} catch (Exception e) {
log.error("文档摘要任务失败 taskId={}", taskId, e);
boolean retriable = isRetriable(e);
updateTaskStatus(taskId, AiTaskEntity.TaskStatus.FAILED,
null, e.getMessage());
eventPublisher.publishEvent(
new AiTaskFailedEvent(this, taskId, event.getUserId(),
e.getMessage(), retriable)
);
}
}
private String buildSummarySystemPrompt(SummaryStyle style) {
return switch (style) {
case BRIEF -> "你是一个专业的内容摘要助手。请用2-3句话概括文章的核心内容。";
case DETAILED -> "你是一个专业的内容摘要助手。请详细摘要文章的主要观点,保留重要细节。";
case BULLET_POINTS -> "你是一个专业的内容摘要助手。请用5-8个要点列表的形式总结文章内容。";
};
}
private void updateTaskStatus(String taskId, AiTaskEntity.TaskStatus status,
String result, String errorMessage) {
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(status);
task.setResult(result);
task.setErrorMessage(errorMessage);
task.setUpdatedAt(Instant.now());
if (status == AiTaskEntity.TaskStatus.COMPLETED
|| status == AiTaskEntity.TaskStatus.FAILED) {
task.setCompletedAt(Instant.now());
}
taskRepository.save(task);
});
}
private boolean isRetriable(Exception e) {
// 超时和服务不可用可以重试,认证错误不重试
return !(e instanceof UnauthorizedException);
}
}七、配置独立的线程池
AI 任务的线程池要和 Web 请求线程池分开,避免 AI 任务慢导致影响 HTTP 接口响应:
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
/**
* AI 任务专用线程池
*/
@Bean("aiTaskExecutor")
public Executor aiTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5); // 核心线程数
executor.setMaxPoolSize(20); // 最大线程数
executor.setQueueCapacity(100); // 队列容量
executor.setThreadNamePrefix("ai-task-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60); // 优雅关闭等待时间
executor.initialize();
return executor;
}
/**
* 通知类任务线程池(轻量级)
*/
@Bean("notificationExecutor")
public Executor notificationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("notification-");
executor.initialize();
return executor;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex, method, params) ->
log.error("异步任务执行异常 method={}", method.getName(), ex);
}
}八、任务完成通知:监听完成事件
任务完成后可以通过多种方式通知用户,这里用事件监听的方式实现,保持解耦:
@Component
@Slf4j
public class TaskNotificationListener {
private final SseEmitterManager sseEmitterManager;
private final WebSocketNotifier webSocketNotifier;
public TaskNotificationListener(SseEmitterManager sseEmitterManager,
WebSocketNotifier webSocketNotifier) {
this.sseEmitterManager = sseEmitterManager;
this.webSocketNotifier = webSocketNotifier;
}
@Async("notificationExecutor")
@EventListener
public void onTaskCompleted(AiTaskCompletedEvent event) {
log.info("任务完成通知 taskId={} userId={}",
event.getTaskId(), event.getUserId());
TaskNotification notification = TaskNotification.builder()
.taskId(event.getTaskId())
.type("COMPLETED")
.message("您的AI任务已完成")
.build();
// 尝试通过 SSE 推送(如果用户还在页面上)
sseEmitterManager.send(event.getUserId(), notification);
// 通过 WebSocket 推送(如果有长连接)
webSocketNotifier.sendToUser(event.getUserId(), notification);
}
@Async("notificationExecutor")
@EventListener
public void onTaskFailed(AiTaskFailedEvent event) {
log.info("任务失败通知 taskId={} retriable={}",
event.getTaskId(), event.isRetriable());
TaskNotification notification = TaskNotification.builder()
.taskId(event.getTaskId())
.type("FAILED")
.message(event.isRetriable()
? "AI任务处理失败,系统将自动重试"
: "AI任务处理失败,请重新提交")
.build();
sseEmitterManager.send(event.getUserId(), notification);
}
}九、SSE 长连接管理:实时推送任务状态
@Component
@Slf4j
public class SseEmitterManager {
// userId -> SseEmitter
private final Map<String, SseEmitter> emitters = new ConcurrentHashMap<>();
/**
* 用户建立 SSE 连接
*/
public SseEmitter createEmitter(String userId) {
// 移除旧的连接
SseEmitter oldEmitter = emitters.remove(userId);
if (oldEmitter != null) {
oldEmitter.complete();
}
SseEmitter emitter = new SseEmitter(300_000L); // 5分钟超时
emitter.onCompletion(() -> {
emitters.remove(userId);
log.debug("SSE 连接关闭 userId={}", userId);
});
emitter.onTimeout(() -> {
emitters.remove(userId);
log.debug("SSE 连接超时 userId={}", userId);
});
emitters.put(userId, emitter);
return emitter;
}
/**
* 向指定用户推送消息
*/
public void send(String userId, Object data) {
SseEmitter emitter = emitters.get(userId);
if (emitter == null) return;
try {
emitter.send(SseEmitter.event()
.name("task-update")
.data(data, MediaType.APPLICATION_JSON));
} catch (IOException e) {
emitters.remove(userId);
log.debug("SSE 发送失败,移除连接 userId={}", userId);
}
}
}
// Controller 提供 SSE 连接端点
@RestController
@RequestMapping("/api/v1/sse")
public class SseController {
private final SseEmitterManager sseEmitterManager;
@GetMapping("/subscribe")
public SseEmitter subscribe(@AuthenticationPrincipal Jwt jwt) {
String userId = jwt.getSubject();
SseEmitter emitter = sseEmitterManager.createEmitter(userId);
// 发送初始连接成功消息
try {
emitter.send(SseEmitter.event()
.name("connected")
.data("连接成功"));
} catch (IOException e) {
log.warn("初始消息发送失败", e);
}
return emitter;
}
}十、失败重试:基于事件的重试机制
@Component
@Slf4j
public class TaskRetryListener {
private final ApplicationEventPublisher eventPublisher;
private final AiTaskRepository taskRepository;
// 任务重试次数追踪
private final Map<String, Integer> retryCountMap = new ConcurrentHashMap<>();
private static final int MAX_RETRY = 3;
public TaskRetryListener(ApplicationEventPublisher eventPublisher,
AiTaskRepository taskRepository) {
this.eventPublisher = eventPublisher;
this.taskRepository = taskRepository;
}
@Async("aiTaskExecutor")
@EventListener
public void onTaskFailed(AiTaskFailedEvent event) {
if (!event.isRetriable()) return;
String taskId = event.getTaskId();
int retryCount = retryCountMap.getOrDefault(taskId, 0);
if (retryCount >= MAX_RETRY) {
log.error("任务重试次数已达上限 taskId={} maxRetry={}", taskId, MAX_RETRY);
retryCountMap.remove(taskId);
return;
}
retryCountMap.put(taskId, retryCount + 1);
// 指数退避:1s, 2s, 4s
long delayMs = (long) Math.pow(2, retryCount) * 1000;
log.info("任务将在 {}ms 后重试 taskId={} retry={}", delayMs, taskId, retryCount + 1);
// 更新状态
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(AiTaskEntity.TaskStatus.RETRYING);
task.setUpdatedAt(Instant.now());
taskRepository.save(task);
});
// 延迟重新发布事件(简单实现,生产可用 ScheduledExecutorService)
try {
Thread.sleep(delayMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// 这里需要重新构建原始事件,实际项目中可以把原始参数存在任务记录里
// 简化起见,直接修改状态触发重处理
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(AiTaskEntity.TaskStatus.PENDING);
task.setUpdatedAt(Instant.now());
taskRepository.save(task);
// 重新发布任务事件
// eventPublisher.publishEvent(reconstructEvent(task));
});
}
}十一、整体架构流程
十二、踩坑记录
坑1:@TransactionalEventListener 的使用时机
如果任务提交和事件发布都在同一个事务里,默认情况下事件在事务提交前就发布了,可能出现监听器查询任务状态还是 NULL 的情况。
解决:把事件发布改成 @TransactionalEventListener(phase = AFTER_COMMIT):
@Async("aiTaskExecutor")
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void handleDocumentSummaryRequest(DocumentSummaryRequestEvent event) {
// 此时事务已提交,数据库里一定能查到任务记录
}坑2:线程池满了导致任务丢失
CallerRunsPolicy 在队列满时会让发布者线程来执行任务,会阻塞 HTTP 线程。生产环境建议换成自定义策略,把溢出的任务记录到数据库,后台扫描重新提交。
坑3:应用重启导致处理中的任务永久卡住
需要在应用启动时扫描并恢复处于 PROCESSING/PENDING 状态但长时间未完成的任务:
@Component
public class TaskRecoveryService implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) {
Instant threshold = Instant.now().minus(10, ChronoUnit.MINUTES);
List<AiTaskEntity> stuckTasks = taskRepository
.findByStatusAndCreatedAtBefore(TaskStatus.PROCESSING, threshold);
stuckTasks.forEach(task -> {
task.setStatus(TaskStatus.PENDING);
taskRepository.save(task);
// 重新发布事件
});
}
}事件驱动架构的代码量比同步方案多,但它带来的是更好的用户体验、更强的系统弹性,以及更清晰的职责边界。对于 AI 这种慢操作,值得投入。
