事件驱动的 AI 工作流——用 Kafka 实现实时 AI 个性化
事件驱动的 AI 工作流——用 Kafka 实现实时 AI 个性化
前不久做了一个用户行为分析和个性化推荐的改造项目。原来的架构是同步的:用户下单 → 同步调用 AI → 生成个性化推荐 → 写入数据库。看起来没问题,但上线一段时间后发现,每次用户下单,整个请求链路增加了 1.5 到 2 秒的 AI 处理时间。
用户体验不能这样搞,下单是核心流程,不能因为 AI 推荐慢了就让用户等。
改造的方向很明确:把 AI 处理从下单主流程里剥离出来,用 Kafka 做事件驱动,异步处理。
这篇文章讲的就是这个改造的完整技术方案,以及事件驱动 AI 架构的通用设计思路。
一、同步 AI 调用的问题
在讲方案之前,先把「同步 AI 调用」的本质问题说清楚。
1.1 性能问题
同步 AI 调用的典型链路:
用户请求 → 业务处理(50ms)→ AI API 调用(800-2000ms)→ 结果写库(20ms)→ 返回用户
总耗时:870ms - 2070msAI API 的延迟是不可控的,而且有尾部延迟(P99 经常比 P50 高出 3-5 倍)。这种不可控的延迟污染了整个请求链路。
1.2 可用性问题
如果 AI 服务出现问题(网络超时、服务降级、熔断触发),同步调用会直接影响主业务。「推荐服务不可用 → 下单失败」是完全不可接受的。
1.3 扩展性问题
AI 处理通常比较重:需要拉取用户历史、计算 embedding、调用 LLM。如果同步嵌在主流程里,QPS 高了主流程也会被拖慢。
事件驱动的核心价值:把 AI 处理从主流程解耦,让主流程不受 AI 延迟影响。
二、事件驱动 AI 架构
关键特性:
- 主流程(绿色):下单成功立即返回用户,不等 AI 处理
- AI 处理(蓝色):异步消费订单事件,不影响主流程
- 事件链:AI 处理结果本身也是事件,触发下游的通知等操作
三、Kafka 主题设计
事件驱动架构里,Topic 的设计非常重要,直接影响系统的可扩展性和可维护性。
/**
* Topic 常量定义
*/
public final class AiEventTopics {
// 触发 AI 处理的上游事件
public static final String ORDER_COMPLETED = "order.completed";
public static final String USER_BROWSED = "user.browsed";
public static final String USER_SEARCHED = "user.searched";
public static final String REVIEW_SUBMITTED = "review.submitted";
// AI 处理结果事件
public static final String RECOMMENDATION_READY = "ai.recommendation.ready";
public static final String USER_PROFILE_UPDATED = "ai.user-profile.updated";
public static final String CONTENT_MODERATION_DONE = "ai.content-moderation.done";
// AI 处理失败事件(进死信队列)
public static final String AI_PROCESSING_FAILED = "ai.processing.failed";
public static final String AI_PROCESSING_DLQ = "ai.processing.dlq";
private AiEventTopics() {}
}Kafka Topic 配置:
@Configuration
public class KafkaTopicConfig {
@Bean
public NewTopic orderCompletedTopic() {
return TopicBuilder.name(AiEventTopics.ORDER_COMPLETED)
.partitions(12) // 12 个分区,支持 12 个并行消费者
.replicas(3) // 3 副本,高可用
.config(TopicConfig.RETENTION_MS_CONFIG, "604800000") // 7天保留
.build();
}
@Bean
public NewTopic recommendationReadyTopic() {
return TopicBuilder.name(AiEventTopics.RECOMMENDATION_READY)
.partitions(6)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "86400000") // 1天保留
.build();
}
@Bean
public NewTopic aiProcessingDlqTopic() {
return TopicBuilder.name(AiEventTopics.AI_PROCESSING_DLQ)
.partitions(3)
.replicas(3)
.config(TopicConfig.RETENTION_MS_CONFIG, "2592000000") // 30天保留(便于排查)
.build();
}
}四、事件定义
/**
* 订单完成事件(上游触发)
*/
@Data
@Builder
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class OrderCompletedEvent {
private String eventId;
private String orderId;
private String userId;
private List<OrderItemInfo> items;
private Double totalAmount;
private Long occurredAt; // Unix 时间戳(毫秒)
@Data
@Builder
public static class OrderItemInfo {
private String productId;
private String categoryId;
private Integer quantity;
private Double price;
}
}
/**
* AI 推荐就绪事件(AI 处理结果)
*/
@Data
@Builder
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class RecommendationReadyEvent {
private String eventId;
private String userId;
private String triggerEventId; // 触发这次 AI 处理的上游事件 ID
private String triggerType; // ORDER_COMPLETED / USER_BROWSED 等
private List<RecommendedItem> recommendations;
private Long processedAt;
@Data
@Builder
public static class RecommendedItem {
private String productId;
private Double score;
private String reason; // AI 给出的推荐理由
}
}五、事件发布(生产者)
订单服务在主流程完成后发布事件:
@Service
@Slf4j
public class OrderEventPublisher {
private final KafkaTemplate<String, Object> kafkaTemplate;
private final ObjectMapper objectMapper;
/**
* 发布订单完成事件
* 在订单主流程的事务提交后调用(TransactionalEventListener)
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void publishOrderCompleted(OrderCompletedDomainEvent domainEvent) {
Order order = domainEvent.order();
OrderCompletedEvent kafkaEvent = OrderCompletedEvent.builder()
.eventId(UUID.randomUUID().toString())
.orderId(order.getId())
.userId(order.getUserId())
.items(order.getItems().stream()
.map(item -> OrderCompletedEvent.OrderItemInfo.builder()
.productId(item.getProductId())
.categoryId(item.getCategoryId())
.quantity(item.getQuantity())
.price(item.getUnitPrice())
.build())
.toList())
.totalAmount(order.getTotalAmount())
.occurredAt(System.currentTimeMillis())
.build();
// 以 userId 为 key,保证同一用户的事件有序处理
kafkaTemplate.send(
AiEventTopics.ORDER_COMPLETED,
order.getUserId(), // key
kafkaEvent
).whenComplete((result, ex) -> {
if (ex != null) {
log.error("发布订单事件失败,orderId: {}, error: {}",
order.getId(), ex.getMessage());
// 不要抛出异常,主流程已完成
} else {
log.info("订单事件发布成功,orderId: {}, partition: {}, offset: {}",
order.getId(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
}
});
}
}注意:使用 @TransactionalEventListener(phase = AFTER_COMMIT) 确保只有在数据库事务成功提交后才发布事件。如果用 @EventListener,可能出现事务回滚但事件已发出的情况。
六、AI 处理服务(消费者)
这是整个方案的核心部分:
@Service
@Slf4j
public class AiPersonalizationConsumer {
private final ChatClient chatClient;
private final UserBehaviorService userBehaviorService;
private final RecommendationRepository recommendationRepo;
private final KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(
topics = AiEventTopics.ORDER_COMPLETED,
groupId = "ai-personalization",
concurrency = "4" // 4 个并发消费者线程
)
public void handleOrderCompleted(
@Payload OrderCompletedEvent event,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment) {
log.info("收到订单完成事件,userId: {}, orderId: {}, partition: {}, offset: {}",
event.getUserId(), event.getOrderId(), partition, offset);
try {
// 执行 AI 个性化处理
List<RecommendationReadyEvent.RecommendedItem> recommendations =
generateRecommendations(event);
// 保存推荐结果
recommendationRepo.save(event.getUserId(), recommendations);
// 发布推荐就绪事件
publishRecommendationReady(event, recommendations);
// 手动提交 offset(确保处理成功后才提交)
acknowledgment.acknowledge();
log.info("AI 个性化处理完成,userId: {}, 推荐数量: {}",
event.getUserId(), recommendations.size());
} catch (Exception e) {
log.error("AI 个性化处理失败,userId: {}, orderId: {}, error: {}",
event.getUserId(), event.getOrderId(), e.getMessage());
// 失败时发送到死信队列,不 acknowledge,让 Kafka 重试
sendToDlq(event, e);
// 这里不 acknowledge,Kafka 会重试(根据重试策略)
// 如果是不可重试的错误(如业务逻辑错误),则 acknowledge 并记录
if (isNonRetryableError(e)) {
acknowledgment.acknowledge();
}
}
}
/**
* 核心:调用 AI 生成个性化推荐
*/
private List<RecommendationReadyEvent.RecommendedItem> generateRecommendations(
OrderCompletedEvent event) {
// 1. 拉取用户历史行为(最近30天的购买和浏览记录)
UserBehaviorSummary behaviorSummary =
userBehaviorService.getSummary(event.getUserId(), 30);
// 2. 构建推荐 Prompt
String prompt = buildRecommendationPrompt(event, behaviorSummary);
// 3. 调用 AI 生成推荐
String aiResponse = chatClient.prompt()
.system("""
你是一个电商个性化推荐引擎。
根据用户的购买历史和当前订单,推荐最相关的商品。
推荐要考虑:用户的价格敏感度、品类偏好、最近的行为趋势。
""")
.user(prompt)
.options(OpenAiChatOptions.builder()
.temperature(0.3)
.responseFormat(new ResponseFormat(ResponseFormat.Type.JSON_OBJECT))
.build())
.call()
.content();
// 4. 解析 AI 返回的推荐结果
return parseRecommendations(aiResponse);
}
private String buildRecommendationPrompt(
OrderCompletedEvent event,
UserBehaviorSummary behavior) {
return String.format("""
## 用户当前订单
购买商品:%s
订单金额:%.2f 元
## 用户历史行为(近30天)
购买品类:%s
浏览较多的品类:%s
平均客单价:%.2f 元
最近购买趋势:%s
## 推荐任务
请推荐 5 个最可能感兴趣的商品类别(用商品类别 ID 表示),
并给出每个推荐的置信度(0-1)和简短理由。
返回 JSON 格式:
{
"recommendations": [
{"categoryId": "CAT001", "score": 0.95, "reason": "推荐理由"},
...
]
}
""",
formatOrderItems(event.getItems()),
event.getTotalAmount(),
behavior.purchasedCategories(),
behavior.browsedCategories(),
behavior.avgOrderAmount(),
behavior.recentTrend()
);
}
private void sendToDlq(OrderCompletedEvent event, Exception error) {
Map<String, Object> dlqPayload = Map.of(
"originalEvent", event,
"errorMessage", error.getMessage(),
"errorType", error.getClass().getSimpleName(),
"failedAt", System.currentTimeMillis()
);
kafkaTemplate.send(AiEventTopics.AI_PROCESSING_DLQ,
event.getUserId(), dlqPayload);
}
private boolean isNonRetryableError(Exception e) {
// 业务逻辑异常不重试,技术异常(网络超时等)重试
return e instanceof IllegalArgumentException
|| e instanceof IllegalStateException;
}
}七、错误处理和重试策略
AI 服务调用可能失败,重试策略要配置合理:
@Configuration
public class KafkaConsumerConfig {
/**
* 配置消费者的重试策略和死信队列处理
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object>
kafkaListenerContainerFactory(
ConsumerFactory<String, Object> consumerFactory,
KafkaTemplate<String, Object> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
// 配置重试策略
DefaultErrorHandler errorHandler = new DefaultErrorHandler(
// 死信队列处理器
new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, exception) -> new TopicPartition(
AiEventTopics.AI_PROCESSING_DLQ,
record.partition()
)
),
// 重试策略:指数退避,最多重试 3 次
new ExponentialBackOffWithMaxRetries(3)
);
// 不重试的异常类型
errorHandler.addNotRetryableExceptions(
IllegalArgumentException.class,
NullPointerException.class
);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
}7.1 死信队列处理
死信队列里的消息需要有人工或自动处理机制:
@Service
@Slf4j
public class AiDlqProcessor {
@KafkaListener(
topics = AiEventTopics.AI_PROCESSING_DLQ,
groupId = "ai-dlq-processor"
)
public void processDlq(
@Payload Map<String, Object> dlqPayload,
Acknowledgment acknowledgment) {
String errorType = (String) dlqPayload.get("errorType");
Long failedAt = (Long) dlqPayload.get("failedAt");
log.warn("处理死信消息,错误类型: {}, 失败时间: {}",
errorType, Instant.ofEpochMilli(failedAt));
// 记录告警(接入 PagerDuty 或企业微信告警)
alertService.sendAlert(
AlertLevel.WARNING,
"AI 处理失败:" + errorType,
dlqPayload.toString()
);
// 对于某些错误类型,可以尝试降级处理(如使用规则引擎兜底)
if ("OpenAiException".equals(errorType)) {
Object originalEvent = dlqPayload.get("originalEvent");
fallbackRecommendationService.generateRuleBased(originalEvent);
}
acknowledgment.acknowledge();
}
}八、结果回写和通知
AI 处理完后,结果需要写入数据库,并通知前端:
@Service
@Slf4j
public class RecommendationReadyConsumer {
private final NotificationService notificationService;
private final RecommendationCacheService cacheService;
@KafkaListener(
topics = AiEventTopics.RECOMMENDATION_READY,
groupId = "recommendation-notifier"
)
public void handleRecommendationReady(
@Payload RecommendationReadyEvent event,
Acknowledgment acknowledgment) {
try {
// 更新缓存(Redis)
cacheService.updateUserRecommendations(
event.getUserId(),
event.getRecommendations()
);
// 通过 WebSocket 推送给在线用户
notificationService.pushToUser(
event.getUserId(),
NotificationEvent.builder()
.type("RECOMMENDATIONS_UPDATED")
.data(Map.of(
"count", event.getRecommendations().size(),
"preview", event.getRecommendations().stream()
.limit(3)
.toList()
))
.build()
);
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("推荐通知处理失败: {}", e.getMessage());
acknowledgment.acknowledge(); // 通知失败不重试,只记录日志
}
}
}九、消费者性能调优
AI 消费者的性能瓶颈通常在 AI API 调用本身(每次 800-2000ms)。优化思路:
9.1 批量处理
/**
* 批量处理:一次拉取多条事件,但每条单独调用 AI(并行)
* 适合高吞吐量场景
*/
@KafkaListener(
topics = AiEventTopics.USER_BROWSED,
groupId = "ai-behavior-analyzer",
containerFactory = "batchListenerContainerFactory"
)
public void handleUserBrowsedBatch(
@Payload List<UserBrowsedEvent> events,
Acknowledgment acknowledgment) {
// 按用户去重(同一批次内同一用户的行为合并处理)
Map<String, List<UserBrowsedEvent>> byUser = events.stream()
.collect(Collectors.groupingBy(UserBrowsedEvent::getUserId));
// 并行处理每个用户的行为
List<CompletableFuture<Void>> futures = byUser.entrySet().stream()
.map(entry -> CompletableFuture.runAsync(() ->
processUserBehavior(entry.getKey(), entry.getValue())
))
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
acknowledgment.acknowledge();
}9.2 背压控制
# application.yml
spring:
kafka:
consumer:
group-id: ai-personalization
auto-offset-reset: earliest
enable-auto-commit: false
max-poll-records: 10 # 每次拉取最多10条,防止 AI 处理积压
max-poll-interval-ms: 300000 # AI 处理最多5分钟(比默认的5分钟还需要留余量)
fetch-max-wait-ms: 500
listener:
concurrency: 4 # 4 个消费者线程
idle-event-interval: 60000 # 空闲时每60秒打一次日志十、监控和可观测性
事件驱动架构的监控比同步调用更复杂,需要追踪整条事件链:
@Aspect
@Component
@Slf4j
public class AiEventTracingAspect {
@Around("@annotation(kafkaListener)")
public Object traceConsumer(ProceedingJoinPoint pjp, KafkaListener kafkaListener)
throws Throwable {
long startTime = System.currentTimeMillis();
String topic = kafkaListener.topics()[0];
// 提取事件 ID(用于链路追踪)
Object[] args = pjp.getArgs();
String eventId = extractEventId(args);
MDC.put("eventId", eventId);
MDC.put("topic", topic);
try {
Object result = pjp.proceed();
long elapsed = System.currentTimeMillis() - startTime;
log.info("事件处理完成,topic: {}, eventId: {}, 耗时: {}ms",
topic, eventId, elapsed);
// 上报 Prometheus 指标
aiEventProcessingTimer
.tag("topic", topic)
.tag("status", "success")
.record(elapsed, TimeUnit.MILLISECONDS);
return result;
} catch (Exception e) {
long elapsed = System.currentTimeMillis() - startTime;
log.error("事件处理失败,topic: {}, eventId: {}, 耗时: {}ms, error: {}",
topic, eventId, elapsed, e.getMessage());
aiEventProcessingTimer
.tag("topic", topic)
.tag("status", "failure")
.record(elapsed, TimeUnit.MILLISECONDS);
throw e;
} finally {
MDC.clear();
}
}
}十一、改造效果
改造前后的关键指标对比:
| 指标 | 改造前(同步 AI) | 改造后(事件驱动) |
|---|---|---|
| 下单接口 P50 延迟 | 1,850ms | 85ms |
| 下单接口 P99 延迟 | 4,200ms | 230ms |
| AI 服务不可用时下单是否受影响 | 是(超时报错) | 否(只影响推荐) |
| 推荐生成延迟(下单后到推荐出现) | 同步(即时) | 异步(平均3.2秒) |
| 每日 AI 处理量 | 受下单 QPS 限制 | 独立扩展,不受限制 |
下单延迟从 P50 1850ms 降到 85ms,这个改进非常显著。代价是推荐从「即时」变成了「3 秒内」,但这个延迟对推荐场景完全可以接受。
十二、小结
事件驱动的 AI 工作流,本质上是一个「解耦」的设计决策:把不需要同步等待结果的 AI 处理,从主流程里剥离出来,用事件异步驱动。
适合事件驱动的 AI 场景:
- 个性化推荐(不需要即时,下单后 3 秒内够用)
- 内容审核(不阻塞内容发布,后处理)
- 用户行为分析(周期性更新用户 profile)
- 通知文案个性化(通知发出前异步生成个性化文案)
不适合事件驱动的 AI 场景:
- 实时对话(必须同步等待 AI 回答)
- AI 生成内容是主流程的必要输入(比如 AI 生成的订单号或验证码)
- 强一致性要求的写操作
搞清楚哪些 AI 调用需要同步、哪些可以异步,是这类架构决策的核心判断。
