Event-Driven AI——事件驱动的 AI 工作流设计
Event-Driven AI——事件驱动的 AI 工作流设计
适读人群:Java 后端工程师、AI 工程师 | 阅读时长:约 15 分钟 | 核心价值:用事件驱动架构设计可靠的 AI 处理流程
有段时间我的 AI 服务老是超时,用户投诉体验差。
那是一个文档分析服务,用户上传合同 PDF,AI 帮他们提取关键条款、识别风险点。逻辑不复杂,但有个问题:大一点的合同,要调用好几次 LLM API,整个链路下来七八秒很正常,有时候十几秒。
当时我的设计是同步的:用户上传 → 后端直接调 AI → 返回结果。超时、报错都扔给用户看。这个体验就是屎。
后来我把整个流程改成了事件驱动的,这篇文章就聊这次改造——为什么改、怎么改,以及事件驱动的 AI 工作流在设计上有哪些关键问题。
为什么同步调用 AI API 是个反模式
先说清楚问题在哪里。
LLM API 调用有几个特点,和普通的 REST 接口调用完全不同:
延迟高且不稳定。 GPT-4 的平均响应时间在 5-20 秒之间,而且波动很大。你无法给用户一个确定性的等待时长。
成本高,需要排队控制。 Token 贵,并发调用太多会触发限流,要做速率控制。
失败率比普通接口高。 网络波动、模型过载、content policy 触发,失败场景比数据库查询多得多。
处理链路长。 复杂任务往往要多步 AI 处理:提取 → 分析 → 总结,每一步都是一次 LLM 调用。
把这些特点叠加在同步请求-响应模型里,结果就是:用户点击按钮,然后盯着转圈,然后超时,然后投诉。
事件驱动的方案是:用户点击按钮,后端立即返回"已收到,处理中",然后异步完成 AI 处理,完成后通知用户。
整体架构
我用的是 Kafka + Spring Events 的组合方案。下面是架构示意:
用户请求
│
▼
[API Gateway]
│ 立即返回任务ID
▼
[任务服务] ──publish──> [Kafka Topic: ai.tasks.pending]
│ │
│ ▼
│ [AI Worker 1]
│ [AI Worker 2] ──publish──> [Kafka Topic: ai.tasks.completed]
│ [AI Worker N]
│ │
▼ ▼
[状态查询接口] [结果存储服务]
│ │
▼ ▼
用户轮询/WebSocket推送 [通知服务] ──> 用户这个架构的核心好处:API 层和 AI 处理层完全解耦,可以分别扩展。AI Worker 挂了,任务在 Kafka 里等着,不会丢失。
Spring Boot 实现
依赖配置
<dependencies>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>任务模型定义
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AiTask {
private String taskId;
private String taskType; // "contract_analysis", "document_summary", etc.
private String userId;
private String inputData; // JSON 序列化的输入
private TaskStatus status;
private Integer retryCount;
private Instant createdAt;
private Instant updatedAt;
public enum TaskStatus {
PENDING, PROCESSING, COMPLETED, FAILED, DEAD_LETTER
}
}任务提交:立即返回
@Service
@RequiredArgsConstructor
public class AiTaskSubmitService {
private final KafkaTemplate<String, AiTask> kafkaTemplate;
private final RedisTemplate<String, AiTask> redisTemplate;
public String submitTask(String taskType, String userId, String inputData) {
String taskId = UUID.randomUUID().toString();
AiTask task = AiTask.builder()
.taskId(taskId)
.taskType(taskType)
.userId(userId)
.inputData(inputData)
.status(AiTask.TaskStatus.PENDING)
.retryCount(0)
.createdAt(Instant.now())
.updatedAt(Instant.now())
.build();
// 先写状态到 Redis(用于查询)
redisTemplate.opsForValue().set(
"task:" + taskId,
task,
Duration.ofHours(24)
);
// 发布到 Kafka
kafkaTemplate.send("ai.tasks.pending", taskId, task)
.addCallback(
result -> log.info("Task {} published to Kafka", taskId),
ex -> {
// Kafka 发布失败,更新状态
task.setStatus(AiTask.TaskStatus.FAILED);
redisTemplate.opsForValue().set("task:" + taskId, task, Duration.ofHours(24));
log.error("Failed to publish task {}", taskId, ex);
}
);
return taskId; // 立即返回,不等 AI 处理
}
}AI Worker:消费任务
@Service
@RequiredArgsConstructor
@Slf4j
public class AiWorkerService {
private final LlmClient llmClient;
private final RedisTemplate<String, AiTask> redisTemplate;
private final KafkaTemplate<String, AiTask> kafkaTemplate;
private static final int MAX_RETRY = 3;
@KafkaListener(
topics = "ai.tasks.pending",
groupId = "ai-workers",
concurrency = "3" // 3个并发消费者
)
public void processTask(AiTask task) {
log.info("Processing task: {}", task.getTaskId());
// 幂等性检查:同一个任务不重复处理
if (isAlreadyProcessed(task.getTaskId())) {
log.warn("Task {} already processed, skipping", task.getTaskId());
return;
}
// 更新状态为 PROCESSING
updateTaskStatus(task, AiTask.TaskStatus.PROCESSING);
try {
String result = doAiProcessing(task);
// 处理成功,更新状态和结果
updateTaskResult(task, result, AiTask.TaskStatus.COMPLETED);
// 发布完成事件
kafkaTemplate.send("ai.tasks.completed", task.getTaskId(), task);
} catch (Exception e) {
handleFailure(task, e);
}
}
private String doAiProcessing(AiTask task) {
// 根据 taskType 路由到不同的处理逻辑
return switch (task.getTaskType()) {
case "contract_analysis" -> llmClient.analyzeContract(task.getInputData());
case "document_summary" -> llmClient.summarizeDocument(task.getInputData());
default -> throw new UnsupportedOperationException("Unknown task type: " + task.getTaskType());
};
}
private void handleFailure(AiTask task, Exception e) {
log.error("Task {} failed, retry count: {}", task.getTaskId(), task.getRetryCount(), e);
if (task.getRetryCount() < MAX_RETRY) {
// 还有重试次数,重新入队(加上延迟)
task.setRetryCount(task.getRetryCount() + 1);
task.setStatus(AiTask.TaskStatus.PENDING);
task.setUpdatedAt(Instant.now());
// 延迟重试:用延迟队列或者直接重新发
scheduleRetry(task);
} else {
// 超过重试次数,发到死信队列
task.setStatus(AiTask.TaskStatus.DEAD_LETTER);
updateTaskStatus(task, AiTask.TaskStatus.DEAD_LETTER);
kafkaTemplate.send("ai.tasks.dead-letter", task.getTaskId(), task);
log.error("Task {} sent to dead letter queue after {} retries",
task.getTaskId(), MAX_RETRY);
}
}
private boolean isAlreadyProcessed(String taskId) {
AiTask task = redisTemplate.opsForValue().get("task:" + taskId);
return task != null &&
(task.getStatus() == AiTask.TaskStatus.COMPLETED ||
task.getStatus() == AiTask.TaskStatus.PROCESSING);
}
}幂等性:这是最容易翻车的地方
Kafka 消费者在某些情况下会重复消费同一条消息。比如消费者处理到一半崩了,重启后 Kafka 会重新投递。
对普通业务逻辑,幂等性处理有很多成熟方案。但 AI 处理有个特殊性:LLM API 调用很贵,不能因为幂等处理不当就调用两次。
我用的方案是在 Redis 里维护一个"处理锁":
@Service
@RequiredArgsConstructor
public class TaskIdempotencyService {
private final RedisTemplate<String, String> redisTemplate;
private static final String LOCK_PREFIX = "task:lock:";
private static final Duration LOCK_TTL = Duration.ofMinutes(30);
/**
* 尝试获取处理锁,返回 true 表示获取成功(可以处理)
* 使用 Redis SETNX 实现原子性
*/
public boolean tryAcquireProcessingLock(String taskId) {
String lockKey = LOCK_PREFIX + taskId;
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "locked", LOCK_TTL);
return Boolean.TRUE.equals(success);
}
public void releaseProcessingLock(String taskId) {
redisTemplate.delete(LOCK_PREFIX + taskId);
}
}在 Worker 里使用:
@KafkaListener(topics = "ai.tasks.pending", groupId = "ai-workers")
public void processTask(AiTask task) {
// 尝试获取锁,获取失败说明有其他 worker 正在处理
if (!idempotencyService.tryAcquireProcessingLock(task.getTaskId())) {
log.info("Task {} is being processed by another worker", task.getTaskId());
return;
}
try {
// 处理逻辑
doProcess(task);
} finally {
// 注意:成功完成后不释放锁,锁自然过期
// 这样如果再来重复消息,SETNX 会失败,不会重复处理
// 只有在处理失败需要重试的情况下,才手动释放锁
}
}失败重试的策略
AI API 失败的原因各种各样,不同原因需要不同的重试策略:
public enum FailureType {
RATE_LIMIT, // 429,需要等更长时间
SERVER_ERROR, // 500/503,可以立即重试
CONTENT_POLICY, // 内容违规,重试也没用,直接死信
TIMEOUT, // 超时,可以重试
INVALID_INPUT // 输入有问题,重试没用,直接死信
}
private void handleFailure(AiTask task, Exception e) {
FailureType failureType = classifyFailure(e);
switch (failureType) {
case CONTENT_POLICY, INVALID_INPUT -> {
// 不可重试,直接死信
sendToDeadLetter(task, e.getMessage());
}
case RATE_LIMIT -> {
// 限流,延迟 60 秒后重试
scheduleRetryWithDelay(task, Duration.ofSeconds(60));
}
case SERVER_ERROR, TIMEOUT -> {
// 指数退避重试
long delaySeconds = (long) Math.pow(2, task.getRetryCount()) * 5;
scheduleRetryWithDelay(task, Duration.ofSeconds(delaySeconds));
}
}
}
private FailureType classifyFailure(Exception e) {
if (e instanceof RateLimitException) return FailureType.RATE_LIMIT;
if (e instanceof ContentPolicyException) return FailureType.CONTENT_POLICY;
if (e instanceof TimeoutException) return FailureType.TIMEOUT;
// ... 其他分类
return FailureType.SERVER_ERROR;
}用 Spring 内部事件处理短流程
如果是同一个服务内部的多步 AI 处理,不需要跨服务通信,可以用 Spring 的 ApplicationEvent 来做,比 Kafka 轻量:
// 定义事件
public class AiStepCompletedEvent extends ApplicationEvent {
private final String taskId;
private final String stepName;
private final String stepResult;
public AiStepCompletedEvent(Object source, String taskId, String stepName, String stepResult) {
super(source);
this.taskId = taskId;
this.stepName = stepName;
this.stepResult = stepResult;
}
// getters...
}
// 发布事件
@Service
public class ContractAnalysisService {
@Autowired
private ApplicationEventPublisher eventPublisher;
public void extractKeyTerms(String taskId, String contractText) {
String result = llmClient.extract(contractText);
// 发布步骤完成事件,触发下一步
eventPublisher.publishEvent(
new AiStepCompletedEvent(this, taskId, "extract", result)
);
}
}
// 监听事件,触发下一步
@Component
public class ContractAnalysisPipeline {
@EventListener
@Async // 异步执行,不阻塞发布者
public void onExtractCompleted(AiStepCompletedEvent event) {
if ("extract".equals(event.getStepName())) {
// 提取完成,进行风险分析
riskAnalysisService.analyze(event.getTaskId(), event.getStepResult());
}
}
}Spring Events 适合单体应用内的事件流,Kafka 适合跨服务、需要持久化和回放的场景。别一上来就 Kafka,杀鸡别用牛刀。
用户侧的状态轮询
前端怎么知道任务完成了?两种方案:
方案一:轮询接口
@GetMapping("/tasks/{taskId}/status")
public ResponseEntity<TaskStatusResponse> getTaskStatus(@PathVariable String taskId) {
AiTask task = redisTemplate.opsForValue().get("task:" + taskId);
if (task == null) {
return ResponseEntity.notFound().build();
}
return ResponseEntity.ok(TaskStatusResponse.builder()
.taskId(task.getTaskId())
.status(task.getStatus().name())
.result(task.getStatus() == COMPLETED ? task.getResult() : null)
.message(task.getStatus() == FAILED ? "处理失败,请重试" : null)
.build());
}方案二:WebSocket 推送(体验更好)
@Component
@RequiredArgsConstructor
public class TaskCompletionNotifier {
private final SimpMessagingTemplate messagingTemplate;
@KafkaListener(topics = "ai.tasks.completed", groupId = "notifiers")
public void notifyUser(AiTask task) {
// 推送给指定用户
messagingTemplate.convertAndSendToUser(
task.getUserId(),
"/queue/task-updates",
Map.of("taskId", task.getTaskId(), "status", "COMPLETED")
);
}
}总结一下改造效果
改成事件驱动之后:
- 用户提交后立即得到响应,不再超时
- AI Worker 挂了,任务不丢,重启后继续处理
- 可以根据任务量动态扩缩 Worker 数量
- 失败重试有了统一的策略,不是每个地方各搞各的
代价是系统复杂度上升了,需要维护 Kafka、Redis,需要处理幂等性和消息顺序问题。
但对于有一定规模的 AI 服务,这个复杂度是值得的。同步调用 AI API,你是在用最简单的方式做一件不稳定的事情。
