第2286篇:事件驱动的AI架构——用Kafka构建实时AI处理流水线
第2286篇:事件驱动的AI架构——用Kafka构建实时AI处理流水线
适读人群:有Kafka使用经验、想构建实时AI处理系统的后端工程师 | 阅读时长:约17分钟 | 核心价值:掌握事件驱动架构与AI处理的结合模式,解决异步、背压、幂等等工程问题
我们系统的第一版AI处理架构是同步的:用户提交一份合同,HTTP请求等着,AI分析完了再返回结果。一开始还好,后来合同提交量上来了,AI分析需要30-60秒,HTTP超时,前端报错,用户投诉。
加了异步任务队列,用的是数据库轮询方式——创建一个task记录,后台起定时任务轮询。问题是数据库轮询的延迟和对DB的压力,在高并发下都很突出。
最终我们迁移到了Kafka驱动的AI处理流水线。这篇文章把设计思路和关键代码分享出来。
为什么AI处理天然适合事件驱动
AI推理有几个特点,让它和事件驱动架构非常搭:
处理时间不均匀。简单问题1秒,复杂问题60秒,峰值时段突发。传统同步架构在处理时间波动大的场景下会大量占用线程资源。
处理过程可以流水线化。一个完整的AI处理任务往往有多个步骤:预处理 → AI分析 → 后处理 → 结果存储 → 通知用户。这些步骤天然适合流水线,每个步骤都是一个独立的消费者。
需要背压保护。AI API有速率限制,流量突发时如果没有缓冲,直接打到AI提供商API会触发限流。Kafka天然提供了这个缓冲层。
处理结果需要审计。合同分析、财务分析等业务场景,每次AI处理的输入输出都需要留存。Kafka的消息持久化天然满足这个要求。
整体架构如下:
消费者组设计
不同AI任务的优先级和处理速度不同,需要分不同的topic和消费者组:
@Configuration
public class KafkaTopicConfig {
// 高优先级任务(实时分析请求)
public static final String TOPIC_HIGH_PRIORITY = "ai-tasks-high";
// 普通任务(批量处理)
public static final String TOPIC_NORMAL = "ai-tasks-normal";
// AI处理结果
public static final String TOPIC_RESULTS = "ai-results";
// 死信队列(处理失败的消息)
public static final String TOPIC_DLQ = "ai-tasks-dlq";
@Bean
public NewTopic highPriorityTopic() {
return TopicBuilder.name(TOPIC_HIGH_PRIORITY)
.partitions(12) // 高优先级:多分区,高并行度
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "86400000") // 24小时
.build();
}
@Bean
public NewTopic normalTopic() {
return TopicBuilder.name(TOPIC_NORMAL)
.partitions(6)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7天
.build();
}
}@Component
public class AiTaskProducer {
private final KafkaTemplate<String, AiTaskEvent> kafkaTemplate;
public void submitTask(AiTask task) {
// 根据优先级选择topic
String topic = task.getPriority() == Priority.HIGH ?
TOPIC_HIGH_PRIORITY : TOPIC_NORMAL;
AiTaskEvent event = AiTaskEvent.builder()
.taskId(task.getId())
.taskType(task.getType())
.payload(task.getPayload())
.submitTime(Instant.now())
.traceId(MDC.get("traceId")) // 分布式追踪
.build();
// 用taskId作为消息key,保证同一任务的消息到同一分区(顺序保证)
CompletableFuture<SendResult<String, AiTaskEvent>> future =
kafkaTemplate.send(topic, task.getId(), event);
future.whenComplete((result, ex) -> {
if (ex != null) {
log.error("发送AI任务消息失败: taskId={}", task.getId(), ex);
// 写入备用队列或告警
alertService.sendAlert("Kafka发送失败", task.getId());
} else {
log.debug("AI任务消息发送成功: taskId={}, partition={}, offset={}",
task.getId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}核心消费者:AI处理与限流
这是整个架构最关键的部分——如何在消费Kafka消息的同时,不超过AI API的速率限制:
@Service
public class AiAnalysisConsumer {
private final LlmClient llmClient;
private final RateLimiter rateLimiter; // 令牌桶限流器
private final KafkaTemplate<String, AiResultEvent> resultProducer;
// 并发控制:最多同时处理N个AI请求
private final Semaphore concurrencySemaphore;
public AiAnalysisConsumer(
LlmClient llmClient,
@Value("${ai.ratelimit.requests-per-minute:60}") int rpm,
@Value("${ai.max-concurrent-requests:10}") int maxConcurrent) {
this.llmClient = llmClient;
// Google Guava RateLimiter实现令牌桶
this.rateLimiter = RateLimiter.create(rpm / 60.0); // 转换为每秒
this.concurrencySemaphore = new Semaphore(maxConcurrent);
}
@KafkaListener(
topics = {TOPIC_HIGH_PRIORITY, TOPIC_NORMAL},
groupId = "ai-analyzer",
containerFactory = "batchKafkaListenerContainerFactory",
concurrency = "3" // 3个消费者线程
)
public void consumeBatch(List<ConsumerRecord<String, AiTaskEvent>> records,
Acknowledgment ack) {
// 批量处理,但每条记录独立处理(避免一条失败影响整批)
List<CompletableFuture<Void>> futures = records.stream()
.map(record -> CompletableFuture.runAsync(
() -> processRecord(record),
taskExecutor
))
.collect(Collectors.toList());
// 等待所有处理完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 批量确认(只在所有处理完成后提交offset)
ack.acknowledge();
}
private void processRecord(ConsumerRecord<String, AiTaskEvent> record) {
AiTaskEvent event = record.value();
String taskId = event.getTaskId();
// 设置分布式追踪上下文
MDC.put("traceId", event.getTraceId());
MDC.put("taskId", taskId);
try {
// 1. 限流等待(避免触发AI API速率限制)
rateLimiter.acquire();
// 2. 并发控制
concurrencySemaphore.acquire();
try {
// 3. 执行AI分析
AiAnalysisResult result = performAiAnalysis(event);
// 4. 发布结果事件
publishResult(taskId, result);
} finally {
concurrencySemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("处理被中断: taskId={}", taskId, e);
sendToDlq(record, "处理被中断");
} catch (AiApiException e) {
// AI API错误,判断是否可重试
if (e.isRetryable() && getRetryCount(record) < 3) {
log.warn("AI API可重试错误,重新入队: taskId={}", taskId);
requeue(event);
} else {
log.error("AI处理失败,进入死信队列: taskId={}", taskId, e);
sendToDlq(record, e.getMessage());
}
} catch (Exception e) {
log.error("未预期的错误: taskId={}", taskId, e);
sendToDlq(record, "未预期错误: " + e.getMessage());
} finally {
MDC.clear();
}
}
private AiAnalysisResult performAiAnalysis(AiTaskEvent event) {
// 根据任务类型构建不同的prompt
String prompt = promptBuilder.buildPrompt(event.getTaskType(), event.getPayload());
long startTime = System.currentTimeMillis();
String response = llmClient.complete(prompt);
long duration = System.currentTimeMillis() - startTime;
// 记录性能指标
metricsRecorder.recordAiCallDuration(event.getTaskType(), duration);
return AiAnalysisResult.builder()
.taskId(event.getTaskId())
.response(response)
.modelUsed(llmClient.getModel())
.processingMs(duration)
.build();
}
}幂等性设计:消息可能被重复处理
Kafka的at-least-once语义意味着消息可能被重复投递。AI处理的幂等性设计:
@Service
public class IdempotentAiProcessor {
private final RedisTemplate<String, String> redis;
private final AiAnalysisConsumer aiConsumer;
// Redis存储已处理的taskId,TTL 24小时
private static final String PROCESSED_KEY_PREFIX = "ai:processed:";
private static final Duration IDEMPOTENCY_TTL = Duration.ofHours(24);
public void processWithIdempotency(AiTaskEvent event) {
String idempotencyKey = PROCESSED_KEY_PREFIX + event.getTaskId();
// 使用SETNX实现分布式锁 + 去重
Boolean isNew = redis.opsForValue()
.setIfAbsent(idempotencyKey, "processing", IDEMPOTENCY_TTL);
if (Boolean.FALSE.equals(isNew)) {
// 已经处理过,跳过
String status = redis.opsForValue().get(idempotencyKey);
log.info("任务已处理,跳过: taskId={}, status={}", event.getTaskId(), status);
return;
}
try {
AiAnalysisResult result = aiConsumer.performAiAnalysis(event);
// 处理成功,更新状态
redis.opsForValue().set(idempotencyKey, "completed", IDEMPOTENCY_TTL);
publishResult(event.getTaskId(), result);
} catch (Exception e) {
// 处理失败,删除key允许重试
redis.delete(idempotencyKey);
throw e;
}
}
}死信队列处理与监控
生产系统里死信队列的消息需要专门处理,不能忽视:
@Component
public class DlqProcessor {
@KafkaListener(topics = TOPIC_DLQ, groupId = "dlq-processor")
public void processDlq(ConsumerRecord<String, DlqMessage> record) {
DlqMessage dlqMsg = record.value();
log.error("DLQ消息: taskId={}, failReason={}, retryCount={}",
dlqMsg.getTaskId(), dlqMsg.getFailReason(), dlqMsg.getRetryCount());
// 发送告警
alertService.sendCriticalAlert(
String.format("AI任务最终失败: %s\n原因: %s",
dlqMsg.getTaskId(), dlqMsg.getFailReason())
);
// 更新任务状态为failed
taskRepository.updateStatus(dlqMsg.getTaskId(), TaskStatus.FAILED,
dlqMsg.getFailReason());
// 某些类型的失败可以人工修复后重新处理
if (dlqMsg.isManualRetryEligible()) {
manualRetryQueue.add(dlqMsg);
}
}
// 监控Kafka消费者lag
@Scheduled(fixedRate = 60000)
public void checkConsumerLag() {
Map<TopicPartition, Long> lag = consumerLagMonitor.getLag("ai-analyzer");
lag.forEach((tp, lagValue) -> {
if (lagValue > 1000) {
log.warn("消费者lag过高: topic={}, partition={}, lag={}",
tp.topic(), tp.partition(), lagValue);
alertService.sendAlert("Kafka消费者lag告警",
"Topic: " + tp.topic() + ", Lag: " + lagValue);
}
});
}
}这套架构上线后,我们的AI处理系统吞吐量提升了5倍,AI API触发速率限制的情况降到了零,同时提交任务的响应时间从原来的30-60秒降到了毫秒级(因为改成了异步模式)。整体架构的稳定性也大幅提升,任何一个处理节点挂掉,消息都会被重新消费,不会丢失。
