Spring AI + Kafka:事件驱动异步AI处理架构实战
Spring AI + Kafka:事件驱动异步AI处理架构实战
适读人群:有1-5年Java开发经验,想向AI工程师方向转型的开发者 阅读时长:约18分钟 文章价值:① 掌握事件驱动AI架构的核心设计思想 ② 学会用Kafka解耦AI调用瓶颈 ③ 获得一套可直接落地的异步AI处理模板
上周四下午,我接到老王的电话,语气有点急。
老王是我认识了六年的老同事,去年跳槽到一家做智能客服的公司,负责后端架构。他们最近上线了一个AI自动回复功能——用户发来问题,系统调用GPT生成回答,直接返回给用户。
"老张,我现在头疼死了。"他说,"用户量一上来,AI调用全部超时,接口直接打满,整个服务崩了。"
我问他现在是什么架构。
"就是最简单的那种,用户请求进来,同步调用LLM,等结果,返回。"
我当时就笑了。"老王,你这是把最慢的操作放在最核心的链路上了。LLM一个请求少则三秒,多则二十秒,你同步等,等于让每个用户都坐在那儿干等。并发一高,线程池直接打满。"
"那怎么搞?"
"Kafka。事件驱动。"
这篇文章,就是我当时给老王讲的那套方案——用Kafka把AI调用从同步链路里彻底解耦出去,构建一套生产可用的事件驱动异步AI处理架构。
为什么同步AI调用会出问题
先把问题说清楚。
LLM调用有几个显著特点,跟传统的数据库查询、Redis操作完全不同:
| 特性 | 数据库查询 | LLM调用 |
|---|---|---|
| 延迟 | 1-50ms | 1s-30s |
| 不确定性 | 极低 | 高(tokens不定) |
| 失败率 | 极低 | 偏高(限流/超时) |
| 资源消耗 | 低 | 高(长连接) |
| 可重试性 | 安全 | 需幂等设计 |
同步调用LLM,意味着你的HTTP线程要被占用几秒到几十秒。Tomcat默认200个线程,200个并发请求同时调用LLM,全卡住了,第201个请求进来,直接拒绝或超时。
解法只有一个:把AI调用从请求链路里移出去。
用户发请求 → 系统接收、生成任务ID、立即返回 → 消息进Kafka → 后台Consumer消费、调用LLM → 结果写回存储 → 用户轮询或WebSocket推送。
这就是事件驱动AI架构的核心思路。
整体架构设计
先看架构全景:
再看一下核心的消息时序:
代码实现
第一步:依赖和配置
<!-- pom.xml 核心依赖 -->
<dependencies>
<!-- Spring AI OpenAI -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- WebSocket,用于结果推送 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
<!-- JPA + PostgreSQL -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
</dependency>
</dependencies># application.yml
spring:
ai:
openai:
api-key: ${OPENAI_API_KEY}
chat:
options:
model: gpt-4o
temperature: 0.7
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all # 强确认,防止消息丢失
retries: 3
consumer:
group-id: ai-task-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
properties:
spring.json.trusted.packages: "com.laozhang.ai.*"
# 自定义topic配置
ai:
kafka:
topic:
task: ai-task-topic
result: ai-result-topic
partitions: 6 # 6个分区,支持6个Consumer并行
replicas: 2第二步:任务实体和事件对象
// 任务实体
@Entity
@Table(name = "ai_tasks")
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AiTask {
@Id
private String taskId;
@Column(nullable = false, length = 5000)
private String prompt;
@Enumerated(EnumType.STRING)
private TaskStatus status;
@Column(columnDefinition = "TEXT")
private String result;
private String errorMessage;
@Column(nullable = false)
private String userId;
// 任务类型:CHAT/SUMMARY/TRANSLATION等
private String taskType;
@CreationTimestamp
private LocalDateTime createdAt;
private LocalDateTime processedAt;
private LocalDateTime completedAt;
public enum TaskStatus {
PENDING, PROCESSING, COMPLETED, FAILED
}
}
// Kafka消息事件对象
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class AiTaskEvent {
private String taskId;
private String userId;
private String prompt;
private String taskType;
private Map<String, Object> metadata; // 额外参数,如systemPrompt/temperature
private LocalDateTime createdAt;
}第三步:Controller + TaskService(接入层)
@RestController
@RequestMapping("/api/ai")
@RequiredArgsConstructor
@Slf4j
public class AiTaskController {
private final AiTaskService aiTaskService;
/**
* 提交AI任务,立即返回taskId
*/
@PostMapping("/task")
public ResponseEntity<TaskSubmitResponse> submitTask(
@RequestBody TaskSubmitRequest request,
@AuthenticationPrincipal UserDetails userDetails) {
String taskId = aiTaskService.submitTask(
request.getPrompt(),
request.getTaskType(),
userDetails.getUsername(),
request.getMetadata()
);
return ResponseEntity.ok(TaskSubmitResponse.builder()
.taskId(taskId)
.status("PENDING")
.message("任务已提交,请通过WebSocket或轮询接口获取结果")
.build());
}
/**
* 查询任务状态(轮询方案)
*/
@GetMapping("/task/{taskId}")
public ResponseEntity<AiTaskDTO> getTaskStatus(@PathVariable String taskId) {
return ResponseEntity.ok(aiTaskService.getTaskStatus(taskId));
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class AiTaskService {
private final AiTaskRepository taskRepository;
private final KafkaTemplate<String, AiTaskEvent> kafkaTemplate;
@Value("${ai.kafka.topic.task}")
private String taskTopic;
@Transactional
public String submitTask(String prompt, String taskType,
String userId, Map<String, Object> metadata) {
// 生成唯一任务ID
String taskId = "task_" + UUID.randomUUID().toString().replace("-", "");
// 持久化任务记录
AiTask task = AiTask.builder()
.taskId(taskId)
.prompt(prompt)
.taskType(taskType)
.userId(userId)
.status(AiTask.TaskStatus.PENDING)
.build();
taskRepository.save(task);
// 发布Kafka事件
// 注意:用taskId作为消息key,保证同一用户的任务进同一分区(有序)
AiTaskEvent event = AiTaskEvent.builder()
.taskId(taskId)
.userId(userId)
.prompt(prompt)
.taskType(taskType)
.metadata(metadata)
.createdAt(LocalDateTime.now())
.build();
kafkaTemplate.send(taskTopic, userId, event)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("任务事件发送失败,taskId={}", taskId, ex);
// 这里可以更新任务状态为FAILED,或走重试逻辑
markTaskFailed(taskId, "消息发送失败: " + ex.getMessage());
} else {
log.info("任务事件发送成功,taskId={}, partition={}",
taskId, result.getRecordMetadata().partition());
}
});
return taskId;
}
@Transactional
public void markTaskFailed(String taskId, String errorMsg) {
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(AiTask.TaskStatus.FAILED);
task.setErrorMessage(errorMsg);
taskRepository.save(task);
});
}
public AiTaskDTO getTaskStatus(String taskId) {
AiTask task = taskRepository.findById(taskId)
.orElseThrow(() -> new TaskNotFoundException("任务不存在: " + taskId));
return AiTaskDTO.fromEntity(task);
}
}第四步:AI Consumer(核心处理层)
这是整套架构最关键的部分。Consumer消费Kafka消息,调用Spring AI,把结果写回数据库并推送给用户。
@Component
@RequiredArgsConstructor
@Slf4j
public class AiTaskConsumer {
private final ChatClient chatClient;
private final AiTaskRepository taskRepository;
private final AiResultPublisher resultPublisher;
private final WebSocketSessionManager sessionManager;
/**
* 核心消费逻辑
* concurrency = "3" 表示启动3个Consumer线程并行处理
*/
@KafkaListener(
topics = "${ai.kafka.topic.task}",
groupId = "ai-task-group",
concurrency = "3",
containerFactory = "aiTaskListenerContainerFactory"
)
public void handleAiTask(
@Payload AiTaskEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment ack) {
String taskId = event.getTaskId();
log.info("开始处理AI任务,taskId={}, partition={}, offset={}", taskId, partition, offset);
try {
// 1. 更新状态为PROCESSING
updateTaskStatus(taskId, AiTask.TaskStatus.PROCESSING);
// 2. 调用Spring AI
String result = processWithSpringAI(event);
// 3. 写入结果
saveTaskResult(taskId, result);
// 4. 推送给用户
notifyUser(event.getUserId(), taskId, result);
// 5. 手动提交offset(确保处理成功才提交)
ack.acknowledge();
log.info("AI任务处理完成,taskId={}", taskId);
} catch (Exception e) {
log.error("AI任务处理失败,taskId={}", taskId, e);
handleTaskError(taskId, e);
// 这里选择acknowledge,避免无限重试导致消息堆积
// 失败记录已写入数据库,可以通过补偿任务重试
ack.acknowledge();
}
}
private String processWithSpringAI(AiTaskEvent event) {
// 根据taskType选择不同的处理策略
return switch (event.getTaskType()) {
case "CHAT" -> processChatTask(event);
case "SUMMARY" -> processSummaryTask(event);
case "TRANSLATION" -> processTranslationTask(event);
default -> processChatTask(event);
};
}
private String processChatTask(AiTaskEvent event) {
// 从metadata中获取可选参数
String systemPrompt = (String) event.getMetadata()
.getOrDefault("systemPrompt", "你是一个专业的AI助手,请用简洁清晰的中文回答用户问题。");
return chatClient.prompt()
.system(systemPrompt)
.user(event.getPrompt())
.call()
.content();
}
private String processSummaryTask(AiTaskEvent event) {
return chatClient.prompt()
.system("你是一个专业的文本摘要助手。请对以下内容生成结构化摘要,包含:核心观点、关键信息、结论建议。")
.user(event.getPrompt())
.call()
.content();
}
@Transactional
private void updateTaskStatus(String taskId, AiTask.TaskStatus status) {
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(status);
if (status == AiTask.TaskStatus.PROCESSING) {
task.setProcessedAt(LocalDateTime.now());
}
taskRepository.save(task);
});
}
@Transactional
private void saveTaskResult(String taskId, String result) {
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(AiTask.TaskStatus.COMPLETED);
task.setResult(result);
task.setCompletedAt(LocalDateTime.now());
taskRepository.save(task);
});
}
private void notifyUser(String userId, String taskId, String result) {
// 如果用户有WebSocket连接,推送结果
if (sessionManager.hasSession(userId)) {
sessionManager.sendMessage(userId, AiResultMessage.builder()
.taskId(taskId)
.status("COMPLETED")
.result(result)
.build());
}
}
@Transactional
private void handleTaskError(String taskId, Exception e) {
taskRepository.findById(taskId).ifPresent(task -> {
task.setStatus(AiTask.TaskStatus.FAILED);
task.setErrorMessage(e.getMessage());
task.setCompletedAt(LocalDateTime.now());
taskRepository.save(task);
});
}
}第五步:Kafka配置(含死信队列)
@Configuration
@EnableKafka
public class KafkaConfig {
@Value("${ai.kafka.topic.task}")
private String taskTopic;
@Value("${ai.kafka.partitions:6}")
private int partitions;
@Value("${ai.kafka.replicas:2}")
private short replicas;
/**
* 创建主Topic
*/
@Bean
public NewTopic aiTaskTopic() {
return TopicBuilder.name(taskTopic)
.partitions(partitions)
.replicas(replicas)
.config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7天保留
.build();
}
/**
* 死信队列Topic:处理失败的消息会进这里
*/
@Bean
public NewTopic aiTaskDltTopic() {
return TopicBuilder.name(taskTopic + ".DLT")
.partitions(1)
.replicas(replicas)
.build();
}
/**
* 自定义ListenerContainerFactory
* 配置手动提交、并发数、错误处理
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, AiTaskEvent>
aiTaskListenerContainerFactory(
ConsumerFactory<String, AiTaskEvent> consumerFactory,
KafkaTemplate<String, AiTaskEvent> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, AiTaskEvent> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 手动提交offset
factory.getContainerProperties().setAckMode(
ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 配置死信队列:重试3次失败后发送到DLT
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(taskTopic + ".DLT", 0)),
new FixedBackOff(2000L, 3L) // 间隔2s,最多重试3次
);
// 不重试的异常类型(业务逻辑错误,重试没意义)
errorHandler.addNotRetryableExceptions(
IllegalArgumentException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}生产环境关键考量
消息幂等性设计
网络抖动可能导致Kafka消息被消费两次。必须做幂等保护:
@Component
public class IdempotencyGuard {
private final RedisTemplate<String, String> redisTemplate;
/**
* 检查任务是否已经处理过
* 利用Redis的SETNX原子性保证只处理一次
*/
public boolean tryAcquire(String taskId) {
String lockKey = "ai:task:processing:" + taskId;
Boolean acquired = redisTemplate.opsForValue()
.setIfAbsent(lockKey, "1", Duration.ofHours(24));
return Boolean.TRUE.equals(acquired);
}
}
// 在Consumer中使用
@KafkaListener(topics = "${ai.kafka.topic.task}", ...)
public void handleAiTask(@Payload AiTaskEvent event, Acknowledgment ack) {
// 幂等检查:如果已经处理过,直接acknowledge跳过
if (!idempotencyGuard.tryAcquire(event.getTaskId())) {
log.warn("任务已处理过,跳过重复消费,taskId={}", event.getTaskId());
ack.acknowledge();
return;
}
// ... 正常处理逻辑
}背压控制
当LLM调用慢、Kafka堆积时,需要控制Consumer的消费速率:
spring:
kafka:
listener:
# 每次poll最多拉取5条消息,避免Consumer积压太多
max-poll-records: 5
# poll间隔,给LLM调用留时间
poll-timeout: 3000效果对比
给老王说完这套方案,他花了三天时间改造上线。一周后发来消息:
"老张,真的牛。接口P99从18秒降到50毫秒,AI处理能力横向扩容,加Consumer实例就行。系统再也没崩过。"
| 指标 | 同步方案 | 事件驱动方案 |
|---|---|---|
| 接口P99延迟 | 18s+ | <100ms |
| 最大并发支持 | ~50 | 数千(横向扩展) |
| LLM调用失败影响 | 直接返回错误 | 重试/降级透明 |
| 系统可扩展性 | 垂直扩展受限 | 水平无限扩展 |
| 代码复杂度 | 简单 | 中等 |
同步方案在低并发场景完全够用,但一旦有规模,事件驱动是唯一出路。AI应用的延迟特性,天然适合这套架构。
