第1925篇:消息驱动的AI架构模式——用Kafka解耦同步AI调用的痛点
第1925篇:消息驱动的AI架构模式——用Kafka解耦同步AI调用的痛点
同步调用AI接口,是很多团队最自然的选择。HTTP请求打出去,等结果回来,然后继续处理。简单直接,好理解。
但我在一个内容生产系统里,因为这个"自然的选择"被折磨了很长时间。
那个系统需要对用户上传的文章做四步处理:质量评分、关键词提取、自动分类、生成摘要。每一步都调用AI接口,串联起来是一个同步流水线。
刚开始流量小的时候,一切正常。用户上传文章,大概15秒能拿到全部处理结果。但随着用量增加,问题开始出现:
第一个问题:任何一步的AI服务超时,整个流水线就卡住了,15秒变成30秒,然后触发HTTP超时,用户以为上传失败,重新上传,又来一次。
第二个问题:四个步骤是串行的,但其实"关键词提取"和"自动分类"并不依赖"质量评分"的结果,完全可以并行。串行执行白白浪费了时间。
第三个问题:流量高峰时,AI服务已经在排队了,但新请求还在不断进来。我们没有地方可以"缓冲",只能靠超时来清理积压。
这三个问题,都指向同一个根因:同步调用没有给系统留出任何缓冲空间。
异步消息驱动的核心价值
把AI调用从同步改成异步,本质上是在系统里引入了一个"缓冲区"——Kafka的Topic。
改成异步之后:
- 用户上传文章,立即返回任务ID,不等AI处理完
- 各AI步骤通过Kafka消息触发,可以并行执行
- Kafka提供了天然的积压缓冲,AI服务有多快就消费多快
- 任何一个AI步骤失败,可以重试,不影响其他步骤
消息设计:AI任务消息的Schema
消息的设计直接影响系统的扩展性,不能随便搞。我们的AI任务消息格式:
@Data
@Builder
@JsonDeserialize(builder = AITaskMessage.AITaskMessageBuilder.class)
public class AITaskMessage {
// 消息元信息
private String messageId; // 消息唯一ID(用于幂等)
private String taskId; // 业务任务ID
private String taskType; // 任务类型:QUALITY_SCORE, KEYWORD_EXTRACT等
private int priority; // 优先级 1-10(高优先级消息先处理)
private long publishedAt; // 发布时间戳
// 重试信息
private int retryCount; // 已重试次数
private int maxRetries; // 最大重试次数
private long nextRetryAt; // 下次重试时间(毫秒时间戳)
// 负载
private String payloadType; // 负载类型,用于反序列化
private String payloadJson; // 实际业务数据(JSON字符串)
// 追踪信息
private String traceId; // 分布式追踪ID
private String correlationId; // 关联ID,把同一个业务流程的消息串起来
private String sourceService; // 来源服务
// 超时控制
private long expireAt; // 消息过期时间(超过这个时间不再处理)
public boolean isExpired() {
return expireAt > 0 && System.currentTimeMillis() > expireAt;
}
public boolean canRetry() {
return retryCount < maxRetries;
}
}核心字段解释:
correlationId:把属于同一篇文章处理的所有消息关联起来,方便追踪expireAt:AI任务有时效性,超时的任务不处理,直接丢弃,避免处理"过期"的任务priority:VIP用户的任务可以设更高优先级
Kafka Topic设计
Topic的设计需要考虑隔离性和扩展性:
@Configuration
public class KafkaTopicConfig {
// 按任务类型设置不同的Topic
// 好处:各类型任务可以独立扩缩容,互不影响
public static final String TOPIC_ARTICLE_UPLOADED = "ai.article.uploaded";
public static final String TOPIC_QUALITY_SCORE = "ai.task.quality-score";
public static final String TOPIC_KEYWORD_EXTRACT = "ai.task.keyword-extract";
public static final String TOPIC_AUTO_CLASSIFY = "ai.task.auto-classify";
public static final String TOPIC_SUMMARY_GEN = "ai.task.summary-gen";
// 结果Topic
public static final String TOPIC_TASK_RESULT = "ai.task.result";
// 死信队列
public static final String TOPIC_DLQ = "ai.task.dlq";
// 重试Topic(延迟重试用)
public static final String TOPIC_RETRY = "ai.task.retry";
@Bean
public NewTopic qualityScoreTopic() {
// 质量评分:耗时短,高并发,多分区
return TopicBuilder.name(TOPIC_QUALITY_SCORE)
.partitions(12)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofHours(24).toMillis()))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "lz4")
.build();
}
@Bean
public NewTopic summaryGenTopic() {
// 摘要生成:耗时长,低并发,少分区
return TopicBuilder.name(TOPIC_SUMMARY_GEN)
.partitions(4)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(Duration.ofHours(24).toMillis()))
.build();
}
@Bean
public NewTopic dlqTopic() {
return TopicBuilder.name(TOPIC_DLQ)
.partitions(4)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG,
String.valueOf(Duration.ofDays(7).toMillis())) // DLQ保留7天
.build();
}
}Topic分区数的设计原则:
- 耗时短的AI任务(分类、评分):分区多,支持高并发消费
- 耗时长的AI任务(生成、摘要):分区少,避免过多并发打爆GPU
- 分区数 ≥ 预期的Consumer实例数
Producer:发布AI任务
@Component
public class AITaskProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Autowired
private ObjectMapper objectMapper;
/**
* 发布AI任务消息
* @param topic 目标Topic
* @param taskId 任务ID
* @param payload 业务负载
*/
public CompletableFuture<String> publishTask(String topic,
String taskId,
Object payload,
int priority) {
AITaskMessage message = AITaskMessage.builder()
.messageId(UUID.randomUUID().toString())
.taskId(taskId)
.correlationId(extractCorrelationId()) // 从MDC取traceId
.priority(priority)
.publishedAt(System.currentTimeMillis())
.retryCount(0)
.maxRetries(3)
.payloadType(payload.getClass().getName())
.payloadJson(serialize(payload))
.traceId(MDC.get("traceId"))
.sourceService("content-processing-service")
.expireAt(System.currentTimeMillis() + Duration.ofHours(2).toMillis())
.build();
String messageJson = serialize(message);
// 用taskId作为消息Key,保证同一任务的消息发到同一分区
// 这样同一任务的消息在Kafka内是有序的
return kafkaTemplate.send(topic, taskId, messageJson)
.thenApply(result -> {
log.info("AI任务消息发布成功: topic={}, taskId={}, partition={}, offset={}",
topic, taskId,
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
return message.getMessageId();
})
.exceptionally(e -> {
log.error("AI任务消息发布失败: topic={}, taskId={}, error={}",
topic, taskId, e.getMessage());
throw new RuntimeException("消息发布失败", e);
});
}
private String serialize(Object obj) {
try {
return objectMapper.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new RuntimeException("序列化失败", e);
}
}
private String extractCorrelationId() {
String correlationId = MDC.get("correlationId");
return correlationId != null ? correlationId : UUID.randomUUID().toString();
}
}Consumer:处理AI任务
Consumer的实现需要特别注意以下几点:幂等性、错误处理、背压控制。
@Component
@Slf4j
public class QualityScoreConsumer {
@Autowired
private QualityScoreService qualityScoreService;
@Autowired
private AITaskProducer producer;
@Autowired
private TaskResultRepository resultRepo;
@Autowired
private IdempotencyChecker idempotencyChecker;
@KafkaListener(
topics = KafkaTopicConfig.TOPIC_QUALITY_SCORE,
groupId = "quality-score-consumer-group",
containerFactory = "aiTaskContainerFactory",
concurrency = "4" // 4个并发Consumer线程
)
public void consume(ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
AITaskMessage message = null;
try {
message = deserialize(record.value());
// 1. 检查消息是否过期
if (message.isExpired()) {
log.warn("消息已过期,跳过处理: messageId={}, taskId={}",
message.getMessageId(), message.getTaskId());
acknowledgment.acknowledge();
return;
}
// 2. 幂等性检查:同一消息不重复处理
if (idempotencyChecker.isProcessed(message.getMessageId())) {
log.info("消息已处理过,跳过: messageId={}", message.getMessageId());
acknowledgment.acknowledge();
return;
}
// 3. 设置MDC追踪信息
MDC.put("taskId", message.getTaskId());
MDC.put("traceId", message.getTraceId());
MDC.put("messageId", message.getMessageId());
// 4. 执行AI任务
QualityScorePayload payload = deserializePayload(message, QualityScorePayload.class);
QualityScoreResult result = qualityScoreService.score(payload.getArticleText());
// 5. 保存结果
resultRepo.savePartialResult(message.getTaskId(), "quality_score", result);
// 6. 标记幂等
idempotencyChecker.markProcessed(message.getMessageId(), Duration.ofHours(24));
// 7. 手动提交offset(at-least-once)
acknowledgment.acknowledge();
log.info("质量评分完成: taskId={}, score={}", message.getTaskId(), result.getScore());
} catch (AIServiceException e) {
// AI服务错误:根据错误类型决定是否重试
handleAIError(message, e, acknowledgment);
} catch (Exception e) {
// 未知错误:发送到死信队列,不阻塞处理
log.error("处理消息时发生未知错误: messageId={}",
message != null ? message.getMessageId() : "unknown", e);
sendToDlq(message, e);
acknowledgment.acknowledge(); // 即使有错,也提交offset,不让消息堵塞
} finally {
MDC.clear();
}
}
private void handleAIError(AITaskMessage message,
AIServiceException e,
Acknowledgment acknowledgment) {
if (message == null) {
acknowledgment.acknowledge();
return;
}
if (e.isRetryable() && message.canRetry()) {
// 可重试错误,发到重试Topic
AITaskMessage retryMessage = message.toBuilder()
.retryCount(message.getRetryCount() + 1)
.nextRetryAt(System.currentTimeMillis() +
calculateBackoff(message.getRetryCount()))
.build();
producer.publishTask(KafkaTopicConfig.TOPIC_RETRY,
message.getTaskId(), retryMessage, message.getPriority());
log.warn("AI任务错误,已加入重试队列: taskId={}, retryCount={}/{}",
message.getTaskId(), retryMessage.getRetryCount(), message.getMaxRetries());
} else {
// 不可重试或已达最大重试次数,发到死信队列
sendToDlq(message, e);
log.error("AI任务彻底失败,已发送到死信队列: taskId={}", message.getTaskId());
}
acknowledgment.acknowledge();
}
private long calculateBackoff(int retryCount) {
// 指数退避 + 随机抖动
long baseMs = (long) Math.pow(2, retryCount) * 1000; // 1s, 2s, 4s, 8s...
long jitter = (long) (Math.random() * 1000); // 0-1秒的随机抖动
return Math.min(baseMs + jitter, 30_000); // 最长30秒
}
private void sendToDlq(AITaskMessage message, Exception e) {
if (message == null) return;
try {
DlqMessage dlqMessage = new DlqMessage(message, e.getMessage(),
e.getClass().getSimpleName(), System.currentTimeMillis());
producer.publishTask(KafkaTopicConfig.TOPIC_DLQ,
message.getTaskId(), dlqMessage, 1);
} catch (Exception dlqError) {
log.error("发送到死信队列失败: {}", dlqError.getMessage());
}
}
}Consumer容器配置:背压控制
默认的Kafka Consumer配置对AI场景不合适,需要调整背压参数:
@Configuration
public class KafkaConsumerConfig {
@Bean("aiTaskContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> aiTaskContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
// 手动提交offset,确保at-least-once语义
factory.getContainerProperties().setAckMode(
ContainerProperties.AckMode.MANUAL_IMMEDIATE
);
// 背压控制:限制每次poll的消息数量
// 对于慢AI任务,每次最多取2条,避免拿了太多消息但处理不过来
factory.getContainerProperties().setConsumerRebalanceListener(
new ConsumerAwareRebalanceListener() {
// 处理重平衡事件
}
);
// 设置Consumer属性
Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
consumerProps.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "120000"); // 2分钟
// AI推理可能很慢,给够时间,避免Consumer被踢出消费者组
consumerProps.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps));
return factory;
}
}这里有个很重要的配置:MAX_POLL_INTERVAL_MS。默认值是300秒,看起来很长。但如果某个AI任务耗时超过了这个时间(比如生成一篇长文),Kafka会认为Consumer挂了,触发重平衡,导致这条消息被其他Consumer重新处理。对于AI任务,要把这个值设大一点。
延迟重试:基于时间轮的调度
重试消息不能立即处理(需要等待退避时间),需要一个延迟调度机制:
@Component
public class RetryConsumer {
@Autowired
private AITaskProducer producer;
@KafkaListener(
topics = KafkaTopicConfig.TOPIC_RETRY,
groupId = "retry-consumer-group"
)
public void handleRetry(ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
AITaskMessage message = deserialize(record.value());
long now = System.currentTimeMillis();
long nextRetryAt = message.getNextRetryAt();
if (now < nextRetryAt) {
// 还没到重试时间,先睡一会
long waitMs = Math.min(nextRetryAt - now, 5000); // 最多等5秒
try {
Thread.sleep(waitMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// 不提交offset,让消息重新被poll到
return;
}
// 到重试时间了,根据任务类型重新发布到对应的处理Topic
String targetTopic = getTargetTopic(message.getTaskType());
producer.publishTask(targetTopic, message.getTaskId(), message, message.getPriority());
acknowledgment.acknowledge();
log.info("重试消息已重新发布: taskId={}, retryCount={}",
message.getTaskId(), message.getRetryCount());
}
private String getTargetTopic(String taskType) {
return switch (taskType) {
case "QUALITY_SCORE" -> KafkaTopicConfig.TOPIC_QUALITY_SCORE;
case "KEYWORD_EXTRACT" -> KafkaTopicConfig.TOPIC_KEYWORD_EXTRACT;
case "AUTO_CLASSIFY" -> KafkaTopicConfig.TOPIC_AUTO_CLASSIFY;
case "SUMMARY_GEN" -> KafkaTopicConfig.TOPIC_SUMMARY_GEN;
default -> throw new IllegalArgumentException("未知任务类型: " + taskType);
};
}
}结果聚合:等待所有步骤完成
文章处理有多个步骤,需要等所有步骤都完成后再通知用户:
@Component
public class TaskResultAggregator {
@Autowired
private TaskResultRepository resultRepo;
@Autowired
private NotificationService notificationService;
@KafkaListener(
topics = KafkaTopicConfig.TOPIC_TASK_RESULT,
groupId = "result-aggregator-group"
)
public void aggregate(ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
TaskPartialResult partial = deserialize(record.value(), TaskPartialResult.class);
// 将部分结果合并到任务结果集
resultRepo.mergePartialResult(partial.getTaskId(), partial);
// 检查是否所有必要步骤都完成了
TaskCompletionStatus status = checkCompletion(partial.getTaskId());
if (status.isComplete()) {
// 所有步骤完成,聚合最终结果
ArticleProcessingResult finalResult = buildFinalResult(partial.getTaskId());
// 通知用户(WebSocket推送)
notificationService.notifyUser(
finalResult.getUserId(),
NotificationType.ARTICLE_PROCESSING_COMPLETE,
finalResult
);
log.info("文章处理全部完成: taskId={}, 用时{}ms",
partial.getTaskId(), status.getTotalDurationMs());
} else if (status.hasFailures() && status.isDeadlineExceeded()) {
// 有失败步骤且已超过最大等待时间,用已有结果构建不完整的结果
ArticleProcessingResult partialResult = buildPartialResult(partial.getTaskId(), status);
notificationService.notifyUser(
partialResult.getUserId(),
NotificationType.ARTICLE_PROCESSING_PARTIAL,
partialResult
);
}
acknowledgment.acknowledge();
}
private TaskCompletionStatus checkCompletion(String taskId) {
Set<String> required = Set.of("quality_score", "keywords", "category", "summary");
Set<String> completed = resultRepo.getCompletedSteps(taskId);
boolean allDone = completed.containsAll(required);
Set<String> failed = resultRepo.getFailedSteps(taskId);
long startTime = resultRepo.getTaskStartTime(taskId);
long maxWaitMs = Duration.ofMinutes(5).toMillis();
return new TaskCompletionStatus(
allDone,
!failed.isEmpty(),
System.currentTimeMillis() - startTime > maxWaitMs,
System.currentTimeMillis() - startTime
);
}
}监控与积压告警
消息积压是异步系统的核心指标,必须监控:
@Component
@Slf4j
public class KafkaLagMonitor {
@Autowired
private KafkaAdmin kafkaAdmin;
@Scheduled(fixedDelay = 60_000) // 每分钟检查一次
public void checkConsumerLag() {
Map<String, Long> lagByTopic = getLagByTopic();
for (Map.Entry<String, Long> entry : lagByTopic.entrySet()) {
String topic = entry.getKey();
long lag = entry.getValue();
// 积压超过1000条:告警
if (lag > 1000) {
log.warn("[Kafka积压告警] topic={}, lag={}", topic, lag);
// 发送钉钉/企业微信告警
}
// 积压超过5000条:紧急告警,可能需要扩容Consumer
if (lag > 5000) {
log.error("[Kafka严重积压] topic={}, lag={}, 请检查Consumer是否正常", topic, lag);
}
}
}
private Map<String, Long> getLagByTopic() {
// 通过KafkaAdmin获取各Topic的积压情况
// 实际实现依赖Kafka Admin API
Map<String, Long> result = new HashMap<>();
// ... 实现省略,可以用 KafkaConsumerLag 相关的API
return result;
}
}踩坑记录
坑一:没有设置消息过期时间,处理了大量"过期"任务。用户已经放弃等待了,但消息还在Kafka里,被Consumer处理后白白消耗了GPU资源。教训:AI任务消息必须设过期时间。
坑二:MAX_POLL_INTERVAL_MS设得太小,AI推理时Consumer被踢出消费者组。表现是消息被反复消费(每次消费都超时,然后触发重平衡,消息被其他Consumer再拿走),导致同一条消息被处理了很多次。教训:MAX_POLL_INTERVAL_MS要比AI任务的最大可能耗时长至少50%。
坑三:结果聚合时没有处理"步骤永远完不成"的情况。某个步骤死信了,但聚合器一直在等,用户永远收不到通知。教训:聚合器必须有超时机制,超时后用已有结果构建不完整结果通知用户。
坑四:开发环境Consumer没有提交offset,测试完Kafka里积压了几千条消息,上生产时全被真实Consumer消费了。教训:开发环境要用独立的Consumer Group,或者测试后手动清理Topic。
消息驱动架构不是银弹,引入Kafka会增加系统复杂度和运维成本。但对于AI场景,同步调用的脆弱性往往比这个复杂度更难接受。
什么时候适合引入Kafka?当你发现AI服务的响应时间已经影响到了用户交互体验,或者AI调用链路中任何一个环节挂掉会拖垮整个链路,是时候考虑异步化了。
