Event-Driven Architecture for AI Applications: Decoupling the AI Processing Pipeline with Kafka
The 45-Minute Avalanche
Liu Gang still remembers that painful afternoon in June 2025.
He is a Java architect at a fintech company. His team had built an AI compliance-review system: a user submits a compliance document, the system calls AI analysis synchronously, and a review report comes back within five minutes. The architecture was simple: HTTP request → AI analysis service → response.
During the 618 shopping festival, at 3 p.m., submissions suddenly spiked to eight times the normal volume.
The first request timed out. The client retried. Timed out again. Retried again.
The avalanche had begun:
- Every thread in the AI analysis service was occupied, so new requests could not get in
- Once the wait queue filled up, Tomcat started refusing connections
- The upstream API gateway saw a flood of refused connections and started alerting
- Ops stepped in manually and found the AI service completely unavailable
The outage lasted 45 minutes, and availability dropped below 70%.
In the postmortem, Liu Gang wrote the root cause on the whiteboard: under a synchronous architecture, the long-tail latency of AI analysis (3~5 seconds per call) becomes the bottleneck of the whole system. As soon as concurrency rises, every request is waiting on an AI result, with no buffer anywhere.
So the team decided to introduce Kafka and rebuild AI processing as an asynchronous pipeline.
When the next 618 peak arrived after the rollout, system availability had gone from 95% to 99.9%, and AI processing throughput was up 15x.
This article is their complete migration plan.
1. When Is Event-Driven the Right Fit for AI?
1.1 When to Go Synchronous, When to Go Asynchronous
Not every AI scenario needs event-driven processing. Start by classifying the scenario:
Scenarios where event-driven (Kafka) is recommended:
| Scenario | AI processing time | Rationale |
|---|---|---|
| Bulk document import (embedding) | 1~10 s/document | Needs decoupling and retry support |
| Compliance/moderation analysis | 3~30 s | Must not block user actions |
| Report generation | 10~120 s | Asynchronous background work |
| User-profile updates | 2~5 s | Periodic background refresh |
| Multi-model A/B testing | 1~10 s | Parallel processing, mutually isolated |
Scenarios that do not need event-driven:
- Real-time chat (the user is waiting for the AI reply)
- Code completion (millisecond-level latency requirement)
- Simple classification/tagging (< 500 ms)
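The decision rule above can be condensed into a small helper. This is a sketch only: the class name, method, and the 500 ms threshold (`ProcessingModeAdvisor`) are illustrative, not part of the team's codebase.

```java
// ProcessingModeAdvisor.java -- illustrative only; encodes the rule of thumb above.
public class ProcessingModeAdvisor {

    public enum Mode { SYNC, EVENT_DRIVEN }

    /**
     * @param expectedLatencyMs typical AI processing time for the task
     * @param userIsWaiting     true when a user blocks on the result (chat, completion)
     */
    public static Mode recommend(long expectedLatencyMs, boolean userIsWaiting) {
        // A user actively waiting rules out a queue regardless of latency:
        // serve synchronously (streaming the response if it is slow).
        if (userIsWaiting) {
            return Mode.SYNC;
        }
        // Background work in the seconds-to-minutes range is the Kafka sweet spot;
        // sub-500 ms background calls are usually not worth the indirection.
        return expectedLatencyMs >= 500 ? Mode.EVENT_DRIVEN : Mode.SYNC;
    }
}
```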
2. System Architecture: Decoupling the AI Pipeline with Kafka
2.1 Overall Architecture
At a high level, the pipeline flows as:
API service → documents-uploaded → chunking workers → chunks-created → embedding workers → embeddings-generated → vector-storage workers
AI analysis tasks run through ai-analysis-requests / ai-analysis-completed, with analysis-dlq as the dead-letter backstop.
2.2 Kafka Topic Design
| Topic | Partitions | Retention | Description |
|---|---|---|---|
| documents-uploaded | 12 | 7 days | Fired when a document is uploaded |
| chunks-created | 24 | 3 days | Chunking complete, ready for embedding |
| embeddings-generated | 24 | 3 days | Vector generation complete |
| ai-analysis-requests | 12 | 7 days | AI analysis tasks |
| ai-analysis-completed | 12 | 30 days | AI analysis results |
| analysis-dlq | 3 | 30 days | Dead-letter queue, manual handling |
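The partition counts in the table combine two constraints: enough partitions to cover target throughput, plus headroom over the planned consumer count (the FAQ's "partitions = max consumers * 2" rule of thumb). A hypothetical sizing helper; the name `PartitionSizer` and the numbers are illustrative:

```java
public class PartitionSizer {

    /**
     * Partition-count heuristic: enough partitions to cover target throughput,
     * then doubled headroom over the planned consumer count (a partition can be
     * consumed by at most one consumer per group).
     */
    public static int partitionsFor(double targetMsgPerSec,
                                    double perConsumerMsgPerSec,
                                    int maxConsumers) {
        int byThroughput = (int) Math.ceil(targetMsgPerSec / perConsumerMsgPerSec);
        int byConsumers = maxConsumers * 2; // headroom, see the FAQ on concurrency
        return Math.max(byThroughput, byConsumers);
    }
}
```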
3. Full Implementation with Kafka + Spring AI
3.1 Maven Dependencies

```xml
<!-- pom.xml -->
<dependencies>
    <!-- Spring Boot Web -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <!-- Spring AI OpenAI -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
        <version>1.0.0</version>
    </dependency>
    <!-- Kafka Avro serialization (Schema Registry) -->
    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>7.7.0</version>
    </dependency>
    <!-- Spring Data Redis (task-status store) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Micrometer Prometheus metrics -->
    <dependency>
        <groupId>io.micrometer</groupId>
        <artifactId>micrometer-registry-prometheus</artifactId>
    </dependency>
</dependencies>
```

3.2 Kafka Producer Configuration
```java
// KafkaProducerConfig.java
package com.laozhang.kafka.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                JsonSerializer.class);

        // Reliability settings
        // acks=all: every in-sync replica must confirm the write (highest durability)
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // Idempotent producer: prevents duplicate messages caused by network retries
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

        // Retry settings
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000);

        // Batching (higher throughput)
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 16 KB batches
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);      // wait up to 5 ms to fill a batch

        // Compression (less network traffic)
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // Request timeouts
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        KafkaTemplate<String, Object> template =
                new KafkaTemplate<>(producerFactory());
        // Global default topic (can be overridden per send)
        template.setDefaultTopic("ai-events");
        return template;
    }
}
```

3.3 Event Definitions
```java
// DocumentUploadedEvent.java
package com.laozhang.kafka.event;

import lombok.Builder;

/**
 * Fired when a document upload completes.
 * Producer: the API service, after accepting a document
 * Consumers: chunking workers and AI analysis workers (in parallel)
 */
@Builder
public record DocumentUploadedEvent(
        String eventId,         // unique event ID (UUID)
        String taskId,          // business task ID
        String documentId,      // document ID
        String knowledgeBaseId, // target knowledge base ID
        String storageKey,      // object-storage key (the document itself lives in OSS)
        String fileName,        // original file name
        long fileSizeBytes,     // file size in bytes
        String mimeType,        // file type
        String userId,          // submitting user ID
        String priority,        // priority: normal/high/critical
        long eventTimestamp,    // event time (Unix millis)
        int retryCount,         // retry count (starts at 0)
        String traceId          // distributed tracing ID
) {}

// ChunkCreatedEvent.java
@Builder
public record ChunkCreatedEvent(
        String eventId,
        String parentDocumentId, // document this chunk came from
        String chunkId,
        String content,          // chunk text
        int chunkIndex,
        int totalChunks,
        String knowledgeBaseId,
        long eventTimestamp,
        String traceId
) {}

// EmbeddingGeneratedEvent.java
@Builder
public record EmbeddingGeneratedEvent(
        String eventId,
        String chunkId,
        String documentId,
        float[] embedding,       // vector data
        String embeddingModel,
        int dimensions,
        String knowledgeBaseId,
        long eventTimestamp,
        String traceId
) {}
```

3.4 Pipeline Entry Point: the API Service Publishes Events
```java
// DocumentSubmitService.java
package com.laozhang.kafka.service;

import com.laozhang.kafka.event.DocumentUploadedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;

@Service
@Slf4j
@RequiredArgsConstructor
public class DocumentSubmitService {

    private final KafkaTemplate<String, Object> kafkaTemplate;
    private final TaskStatusRepository taskStatusRepository;
    private final ObjectStorageService storageService;

    private static final String TOPIC_DOCUMENTS_UPLOADED = "documents-uploaded";

    /**
     * Accepts a document submission and hands it off asynchronously.
     *
     * @return taskId (the client polls this for status)
     */
    public String submitDocument(DocumentSubmitRequest request) {
        String taskId = "task_" + UUID.randomUUID().toString().replace("-", "");
        String documentId = "doc_" + UUID.randomUUID().toString().replace("-", "");

        // 1. Upload the file to object storage
        String storageKey = storageService.upload(
                request.fileContent(), request.fileName());

        // 2. Initialize the task status
        taskStatusRepository.create(TaskStatus.builder()
                .taskId(taskId)
                .documentId(documentId)
                .status("queued")
                .progress(0)
                .submittedAt(System.currentTimeMillis())
                .build());

        // 3. Publish the event to Kafka. Key detail: use documentId as the
        //    message key so all events for one document stay ordered.
        DocumentUploadedEvent event = DocumentUploadedEvent.builder()
                .eventId(UUID.randomUUID().toString())
                .taskId(taskId)
                .documentId(documentId)
                .knowledgeBaseId(request.knowledgeBaseId())
                .storageKey(storageKey)
                .fileName(request.fileName())
                .fileSizeBytes(request.fileContent().length)
                .mimeType(request.mimeType())
                .userId(request.userId())
                .priority(request.priority() != null ? request.priority() : "normal")
                .eventTimestamp(System.currentTimeMillis())
                .retryCount(0)
                .traceId(getCurrentTraceId())
                .build();

        CompletableFuture<SendResult<String, Object>> future =
                kafkaTemplate.send(TOPIC_DOCUMENTS_UPLOADED, documentId, event);

        // Handle the send result asynchronously (never block the response)
        future.whenComplete((result, ex) -> {
            if (ex == null) {
                log.info("Document-uploaded event published: taskId={}, documentId={}, " +
                         "partition={}, offset={}",
                        taskId, documentId,
                        result.getRecordMetadata().partition(),
                        result.getRecordMetadata().offset());
            } else {
                log.error("Failed to publish document-uploaded event: taskId={}", taskId, ex);
                // Fallback: mark the task as failed right away
                taskStatusRepository.updateStatus(taskId, "failed",
                        "Event publishing failed, please retry");
            }
        });

        log.info("Document accepted, processing asynchronously: taskId={}", taskId);
        return taskId;
    }

    private String getCurrentTraceId() {
        // Read the current trace ID from MDC or OpenTelemetry
        return org.slf4j.MDC.get("traceId");
    }
}
```

4. The AI Processing Pipeline: Chunking → Embedding → Storage
4.1 Chunking Worker (consumes documents-uploaded)

```java
// DocumentChunkingConsumer.java
package com.laozhang.kafka.consumer;

import com.laozhang.kafka.event.*;
import com.laozhang.kafka.service.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.List;

@Component
@Slf4j
@RequiredArgsConstructor
public class DocumentChunkingConsumer {

    private final ObjectStorageService storageService;
    private final RecursiveTextSplitter textSplitter;
    private final ChunkEventPublisher chunkEventPublisher;
    private final TaskStatusRepository taskStatusRepository;

    /**
     * Consumes document-uploaded events and splits the text into chunks.
     *
     * concurrency = "3": three consumer instances process in parallel
     * MANUAL ack mode: the offset is committed only after chunking succeeds
     */
    @KafkaListener(
            topics = "documents-uploaded",
            groupId = "chunking-workers",
            concurrency = "3",
            containerFactory = "manualAckKafkaListenerContainerFactory"
    )
    public void processDocumentChunking(
            @Payload DocumentUploadedEvent event,
            @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
            @Header(KafkaHeaders.OFFSET) long offset,
            Acknowledgment acknowledgment) {

        log.info("Chunking document: documentId={}, partition={}, offset={}",
                event.documentId(), partition, offset);
        try {
            // Update the task status
            taskStatusRepository.updateStatus(
                    event.taskId(), "processing", 10, "Chunking document...");

            // Download the document from object storage
            String documentContent =
                    storageService.downloadAsText(event.storageKey());

            // Run recursive splitting
            List<RecursiveTextSplitter.TextChunk> chunks =
                    textSplitter.split(documentContent,
                            RecursiveTextSplitter.SplitConfig.defaultConfig(),
                            event.documentId());

            log.info("Chunking done: documentId={}, {} chunks",
                    event.documentId(), chunks.size());

            // Publish one chunk-created event per chunk
            chunkEventPublisher.publishChunkEvents(
                    event.documentId(),
                    event.knowledgeBaseId(),
                    event.traceId(),
                    chunks
            );

            // Update progress
            taskStatusRepository.updateStatus(
                    event.taskId(), "processing", 30, "Chunks created, generating embeddings...");

            // Manual ack: tell Kafka this message was processed successfully
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Chunking failed: documentId={}", event.documentId(), e);
            // Not acking means Kafka redelivers the message.
            // Note: persistent failures eventually land in the dead-letter topic (DLT).
            acknowledgment.nack(Duration.ofSeconds(5)); // retry after 5 seconds
        }
    }
}
```

4.2 Embedding Worker (consumes chunks-created)
```java
// EmbeddingGenerationConsumer.java
package com.laozhang.kafka.consumer;

import com.laozhang.kafka.event.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.embedding.EmbeddingModel;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

@Component
@Slf4j
@RequiredArgsConstructor
public class EmbeddingGenerationConsumer {

    private final EmbeddingModel embeddingModel;
    private final EmbeddingEventPublisher embeddingEventPublisher;

    /**
     * Generates embeddings in batches.
     * Batch listening cuts down the number of embedding API calls;
     * the container's poll timeout bounds how long we wait to fill a batch.
     */
    @KafkaListener(
            topics = "chunks-created",
            groupId = "embedding-workers",
            concurrency = "2",
            containerFactory = "batchKafkaListenerContainerFactory"
    )
    public void processEmbeddingGeneration(
            @Payload List<ChunkCreatedEvent> events,
            Acknowledgment acknowledgment) {

        if (events.isEmpty()) {
            acknowledgment.acknowledge();
            return;
        }
        log.info("Processing embedding batch: {} messages", events.size());
        try {
            // Collect the chunk texts
            List<String> texts = events.stream()
                    .map(ChunkCreatedEvent::content)
                    .collect(Collectors.toList());

            // Generate embeddings in one call (fewer LLM API round-trips);
            // Spring AI's EmbeddingModel accepts batched input
            List<float[]> embeddings = embeddingModel.embed(texts);

            // Publish one embedding-generated event per chunk
            for (int i = 0; i < events.size(); i++) {
                ChunkCreatedEvent chunkEvent = events.get(i);
                float[] embedding = embeddings.get(i);
                embeddingEventPublisher.publish(
                        EmbeddingGeneratedEvent.builder()
                                .eventId(java.util.UUID.randomUUID().toString())
                                .chunkId(chunkEvent.chunkId())
                                .documentId(chunkEvent.parentDocumentId())
                                .embedding(embedding)
                                .embeddingModel("text-embedding-3-small")
                                .dimensions(embedding.length)
                                .knowledgeBaseId(chunkEvent.knowledgeBaseId())
                                .eventTimestamp(System.currentTimeMillis())
                                .traceId(chunkEvent.traceId())
                                .build()
                );
            }
            log.info("Embedding batch complete: {} vectors", events.size());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Embedding batch failed", e);
            acknowledgment.nack(Duration.ofSeconds(10)); // retry after 10 seconds
        }
    }
}
```

4.3 Vector Storage Worker
```java
// VectorStorageConsumer.java
package com.laozhang.kafka.consumer;

import com.laozhang.kafka.event.EmbeddingGeneratedEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.document.Document;
import org.springframework.ai.vectorstore.VectorStore;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

@Component
@Slf4j
@RequiredArgsConstructor
public class VectorStorageConsumer {

    private final VectorStore vectorStore;
    private final TaskProgressTracker progressTracker;

    @KafkaListener(
            topics = "embeddings-generated",
            groupId = "vector-storage-workers",
            concurrency = "2",
            containerFactory = "batchKafkaListenerContainerFactory"
    )
    public void storeEmbeddings(
            @Payload List<EmbeddingGeneratedEvent> events,
            Acknowledgment acknowledgment) {
        try {
            // Convert to Spring AI's Document format
            List<Document> documents = events.stream()
                    .map(event -> {
                        Document doc = new Document(event.chunkId(), Map.of(
                                "chunk_id", event.chunkId(),
                                "document_id", event.documentId(),
                                "kb_id", event.knowledgeBaseId(),
                                "embedding_model", event.embeddingModel(),
                                "created_at", String.valueOf(event.eventTimestamp())
                        ));
                        // Note: the Spring AI VectorStore handles vector persistence;
                        // the embedding itself travels in the EmbeddingGeneratedEvent.
                        // In a full pipeline the chunk text would also be carried in
                        // the event (or re-fetched) so it can be stored as the
                        // Document content.
                        return doc;
                    })
                    .collect(Collectors.toList());

            // Batch-write into the vector database
            vectorStore.add(documents);

            // Update per-document progress
            progressTracker.updateBatchProgress(events);

            log.info("Vector batch stored: {} entries", events.size());
            acknowledgment.acknowledge();
        } catch (Exception e) {
            log.error("Vector storage failed", e);
            acknowledgment.nack(Duration.ofSeconds(5));
        }
    }
}
```

5. Idempotency: Protecting AI Tasks from Duplicate Processing
5.1 Why Idempotency Matters
Kafka's delivery guarantee is at-least-once: under network hiccups or consumer restarts, the same message can be consumed more than once. For AI tasks, duplicate processing means:
- Repeated LLM API calls, wasting token spend
- Duplicate entries in the vector store, degrading retrieval quality
- Task-completion callbacks firing multiple times, confusing users
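The essence of the defense is an atomic claim-before-work step. Before looking at the Redis version, here is the same idea with a `ConcurrentHashMap` standing in for Redis SETNX; in-memory only, so it survives neither restarts nor multiple worker instances, and the class name is illustrative:

```java
import java.util.concurrent.ConcurrentHashMap;

public class InMemoryDedupe {

    // putIfAbsent plays the role of Redis SET NX: only the first caller
    // for a given eventId wins the right to run the task.
    private final ConcurrentHashMap<String, Boolean> seen = new ConcurrentHashMap<>();

    /** @return true if the task ran, false if this eventId was a duplicate */
    public boolean executeOnce(String eventId, Runnable task) {
        if (seen.putIfAbsent(eventId, Boolean.TRUE) != null) {
            return false; // duplicate delivery: skip
        }
        task.run();
        return true;
    }
}
```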
5.2 Idempotency Implementation (Redis-based)
```java
// IdempotencyGuard.java
package com.laozhang.kafka.idempotency;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.function.Supplier;

@Component
@Slf4j
@RequiredArgsConstructor
public class IdempotencyGuard {

    private final StringRedisTemplate redisTemplate;

    // TTL for idempotency keys (matches the Kafka retention window,
    // so a message can never outlive its dedupe record)
    private static final Duration IDEMPOTENCY_TTL = Duration.ofDays(7);

    /**
     * Idempotent execution: runs the task at most once per eventId.
     *
     * @param eventId unique event ID
     * @param stage   processing stage (chunking/embedding/storage)
     * @param task    the actual work
     * @return true = the task ran, false = duplicate message, skipped
     */
    public boolean executeOnce(String eventId,
                               String stage,
                               Runnable task) {
        String idempotencyKey =
                String.format("idempotency:%s:%s", stage, eventId);

        // SETNX (set if absent) with a TTL: a single atomic operation
        Boolean isNew = redisTemplate.opsForValue()
                .setIfAbsent(idempotencyKey, "processing", IDEMPOTENCY_TTL);

        if (Boolean.FALSE.equals(isNew)) {
            // Already processed (or being processed right now)
            String status = redisTemplate.opsForValue()
                    .get(idempotencyKey);
            log.info("Duplicate message, skipping: eventId={}, stage={}, status={}",
                    eventId, stage, status);
            return false;
        }
        try {
            // Run the actual task
            task.run();
            // Mark as completed
            redisTemplate.opsForValue()
                    .set(idempotencyKey, "completed", IDEMPOTENCY_TTL);
            return true;
        } catch (Exception e) {
            // On failure, delete the key so a retry can run
            redisTemplate.delete(idempotencyKey);
            throw e;
        }
    }

    /**
     * Idempotent execution with a return value.
     */
    public <T> java.util.Optional<T> executeOnceWithResult(
            String eventId,
            String stage,
            Supplier<T> task) {
        String idempotencyKey =
                String.format("idempotency:%s:%s", stage, eventId);
        String resultKey =
                String.format("idempotency:result:%s:%s", stage, eventId);

        Boolean isNew = redisTemplate.opsForValue()
                .setIfAbsent(idempotencyKey, "processing", IDEMPOTENCY_TTL);

        if (Boolean.FALSE.equals(isNew)) {
            // Return the previously cached result, if any
            String cachedResult = redisTemplate.opsForValue().get(resultKey);
            log.info("Duplicate message, returning cached result: eventId={}, stage={}",
                    eventId, stage);
            return java.util.Optional.ofNullable(
                    deserialize(cachedResult));
        }
        try {
            T result = task.get();
            // Cache the result so later duplicates can return it directly
            redisTemplate.opsForValue()
                    .set(resultKey, serialize(result), IDEMPOTENCY_TTL);
            redisTemplate.opsForValue()
                    .set(idempotencyKey, "completed", IDEMPOTENCY_TTL);
            return java.util.Optional.ofNullable(result);
        } catch (Exception e) {
            redisTemplate.delete(idempotencyKey);
            throw e;
        }
    }

    // Placeholder (de)serialization: a real implementation would use JSON
    // (e.g. Jackson) rather than toString/identity.
    private String serialize(Object obj) {
        return obj != null ? obj.toString() : null;
    }

    @SuppressWarnings("unchecked")
    private <T> T deserialize(String str) {
        return (T) str;
    }
}
```

5.3 Using the Idempotency Guard in a Consumer
```java
// IdempotentEmbeddingConsumer.java
@Component
@RequiredArgsConstructor
@Slf4j
public class IdempotentEmbeddingConsumer {

    private final IdempotencyGuard idempotencyGuard;
    private final EmbeddingModel embeddingModel;
    private final EmbeddingEventPublisher publisher;

    @KafkaListener(topics = "chunks-created", groupId = "embedding-workers")
    public void processWithIdempotency(
            @Payload ChunkCreatedEvent event,
            Acknowledgment acknowledgment) {

        boolean executed = idempotencyGuard.executeOnce(
                event.eventId(),
                "embedding",
                () -> {
                    // Generate the vector
                    float[] embedding = embeddingModel.embed(event.content());
                    // Publish the embedding-generated event
                    publisher.publish(EmbeddingGeneratedEvent.builder()
                            .eventId(java.util.UUID.randomUUID().toString())
                            .chunkId(event.chunkId())
                            .embedding(embedding)
                            .build());
                    log.info("Embedding generated and published: chunkId={}", event.chunkId());
                }
        );
        if (executed) {
            log.debug("Executed once: eventId={}", event.eventId());
        } else {
            log.debug("Duplicate skipped: eventId={}", event.eventId());
        }
        // Ack either way (otherwise the duplicate would be redelivered forever)
        acknowledgment.acknowledge();
    }
}
```

6. Dead-Letter Queue: the Backstop for Failed AI Processing
6.1 DLT Configuration
```java
// KafkaConsumerConfig.java
package com.laozhang.kafka.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.DefaultErrorHandler;
import org.springframework.kafka.support.ExponentialBackOffWithMaxRetries;
import org.springframework.kafka.support.serializer.JsonDeserializer;

import java.util.HashMap;
import java.util.Map;

@Configuration
@Slf4j
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                JsonDeserializer.class);
        props.put(JsonDeserializer.TRUSTED_PACKAGES,
                "com.laozhang.kafka.event");
        // Commit offsets manually
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        // Start from the earliest message when a new consumer group comes up
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Batch-fetch settings
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1024);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Dead-letter topic (DLT) handling.
     * Strategy: exponential-backoff retry up to 3 times, then publish to the DLT.
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object>
            kafkaListenerContainerFactory(
                    ConsumerFactory<String, Object> consumerFactory,
                    KafkaTemplate<String, Object> kafkaTemplate) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties()
                .setAckMode(
                        org.springframework.kafka.listener.ContainerProperties
                                .AckMode.MANUAL_IMMEDIATE);

        // Dead-letter publisher: failed messages go to {topic}.DLT
        DeadLetterPublishingRecoverer recoverer =
                new DeadLetterPublishingRecoverer(kafkaTemplate);

        // Exponential backoff: 1 s → 2 s → 4 s, at most 3 retries
        ExponentialBackOffWithMaxRetries backOff =
                new ExponentialBackOffWithMaxRetries(3);
        backOff.setInitialInterval(1000L);
        backOff.setMultiplier(2.0);

        DefaultErrorHandler errorHandler =
                new DefaultErrorHandler(recoverer, backOff);
        // Some exceptions should skip the retries and go straight to the DLT
        errorHandler.addNotRetryableExceptions(
                IllegalArgumentException.class, // bad arguments: retrying will not help
                org.springframework.kafka.support.serializer
                        .DeserializationException.class // deserialization failure: retrying will not help
        );
        factory.setCommonErrorHandler(errorHandler);
        return factory;
    }
}
```

6.2 Dead-Letter Consumer (manual-intervention hook)
```java
// DeadLetterQueueConsumer.java
package com.laozhang.kafka.consumer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

@Component
@Slf4j
@RequiredArgsConstructor
public class DeadLetterQueueConsumer {

    private final DlqAlertService alertService;
    private final DlqRepository dlqRepository;

    /**
     * Consumes every DLT topic via a wildcard pattern.
     */
    @KafkaListener(
            topicPattern = ".*\\.DLT",
            groupId = "dlq-monitors"
    )
    public void processDlqMessage(
            @Payload String payload,
            @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
            @Header(KafkaHeaders.DLT_ORIGINAL_OFFSET) long originalOffset,
            @Header(KafkaHeaders.DLT_EXCEPTION_MESSAGE) String exceptionMessage,
            Acknowledgment acknowledgment) {

        log.error("Dead-letter message received: topic={}, originalOffset={}, error={}",
                topic, originalOffset, exceptionMessage);

        // 1. Persist the record so ops can review it
        DlqRecord record = DlqRecord.builder()
                .topic(topic)
                .originalOffset(originalOffset)
                .payload(payload)
                .exceptionMessage(exceptionMessage)
                .receivedAt(System.currentTimeMillis())
                .status("pending_review")
                .build();
        dlqRepository.save(record);

        // 2. Send an alert (Feishu/DingTalk)
        alertService.sendDlqAlert(topic, exceptionMessage,
                dlqRepository.countPendingByTopic(topic));

        acknowledgment.acknowledge();
    }
}
```

6.3 DLQ Management API (ops endpoints)
```java
// DlqManagementController.java
package com.laozhang.kafka.controller;

import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api/admin/dlq")
@RequiredArgsConstructor
public class DlqManagementController {

    private final DlqRepository dlqRepository;
    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Lists the failed messages sitting in the DLQ.
     */
    @GetMapping("/messages")
    public ResponseEntity<?> listDlqMessages(
            @RequestParam(defaultValue = "0") int page,
            @RequestParam(defaultValue = "20") int size,
            @RequestParam(required = false) String topic) {
        return ResponseEntity.ok(
                dlqRepository.findByTopic(topic, page, size));
    }

    /**
     * Manually retries a single message.
     */
    @PostMapping("/messages/{recordId}/retry")
    public ResponseEntity<?> retryMessage(@PathVariable Long recordId) {
        DlqRecord record = dlqRepository.findById(recordId)
                .orElseThrow(() -> new RuntimeException("Record not found"));

        // Republish the message to its original topic
        String originalTopic = record.getTopic().replace(".DLT", "");
        kafkaTemplate.send(originalTopic, record.getPayload());

        record.setStatus("retried");
        dlqRepository.save(record);
        return ResponseEntity.ok("Message redelivered to: " + originalTopic);
    }

    /**
     * Bulk-dismisses messages (marks them as ignored).
     */
    @PostMapping("/messages/dismiss")
    public ResponseEntity<?> dismissMessages(
            @RequestBody DismissRequest request) {
        dlqRepository.updateStatusBatch(
                request.recordIds(), "dismissed");
        return ResponseEntity.ok("Dismissed " + request.recordIds().size() + " messages");
    }

    record DismissRequest(java.util.List<Long> recordIds) {}
}
```

7. Backpressure: Protecting the AI Service When the Queue Backs Up
7.1 Backpressure Detection and Adaptive Throttling
```java
// BackpressureAwareConsumer.java
package com.laozhang.kafka.consumer;

import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import jakarta.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

@Component
@Slf4j
@RequiredArgsConstructor
public class BackpressureAwareConsumer {

    private final AdminClient kafkaAdminClient;
    private final MeterRegistry meterRegistry;

    // Backpressure switch: automatically slow down when the queue backs up
    private final AtomicBoolean backpressureActive = new AtomicBoolean(false);
    private final AtomicLong lagThreshold = new AtomicLong(10000); // 10,000 messages
    // Last measured lag, sampled by the registered gauge
    private final AtomicLong observedLag = new AtomicLong(0);

    @PostConstruct
    void registerLagGauge() {
        // Register the gauge once; Micrometer samples the AtomicLong on each scrape
        Gauge.builder("kafka.consumer.lag", observedLag, AtomicLong::get)
                .tag("group", "ai-analysis-workers")
                .tag("topic", "ai-analysis-requests")
                .register(meterRegistry);
    }

    @KafkaListener(
            topics = "ai-analysis-requests",
            groupId = "ai-analysis-workers",
            concurrency = "5"
    )
    public void processAiAnalysis(
            @Payload DocumentAnalysisEvent event,
            Acknowledgment acknowledgment) {

        // When backpressure is active, delay processing to give the
        // downstream AI service room to breathe
        if (backpressureActive.get()) {
            log.warn("Backpressure active, delaying processing: eventId={}", event.eventId());
            try {
                Thread.sleep(2000); // 2-second delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        try {
            // Call the AI analysis service
            analyzeDocument(event);
            acknowledgment.acknowledge();
        } catch (AiServiceOverloadException e) {
            // The AI service is overloaded: trigger backpressure
            activateBackpressure();
            acknowledgment.nack(Duration.ofSeconds(5));
        }
    }

    /**
     * Checks consumer lag every 30 seconds and toggles backpressure.
     */
    @Scheduled(fixedDelay = 30000)
    public void checkConsumerLag() {
        try {
            long currentLag = measureConsumerLag(
                    "ai-analysis-workers", "ai-analysis-requests");
            observedLag.set(currentLag);

            if (currentLag > lagThreshold.get()) {
                if (!backpressureActive.get()) {
                    log.warn("Consumer lag above threshold ({}), activating backpressure: lag={}",
                            lagThreshold.get(), currentLag);
                    activateBackpressure();
                }
            } else if (currentLag < lagThreshold.get() / 2) {
                if (backpressureActive.get()) {
                    log.info("Consumer lag back to normal, deactivating backpressure: lag={}",
                            currentLag);
                    deactivateBackpressure();
                }
            }
        } catch (Exception e) {
            log.error("Lag check failed", e);
        }
    }

    private void activateBackpressure() {
        backpressureActive.set(true);
        log.warn("Backpressure activated");
    }

    private void deactivateBackpressure() {
        backpressureActive.set(false);
        log.info("Backpressure deactivated");
    }

    private long measureConsumerLag(String groupId, String topic) throws Exception {
        // Lag = (latest end offset) - (group's committed offset), summed over partitions
        Map<TopicPartition, OffsetAndMetadata> committed = kafkaAdminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata().get();

        Map<TopicPartition, OffsetSpec> latest = new HashMap<>();
        committed.keySet().stream()
                .filter(tp -> tp.topic().equals(topic))
                .forEach(tp -> latest.put(tp, OffsetSpec.latest()));

        var endOffsets = kafkaAdminClient.listOffsets(latest).all().get();
        long lag = 0;
        for (var entry : endOffsets.entrySet()) {
            lag += entry.getValue().offset()
                    - committed.get(entry.getKey()).offset();
        }
        return lag;
    }

    private void analyzeDocument(DocumentAnalysisEvent event) {
        // AI analysis logic...
    }
}
```

8. Message Ordering: Partitioning Strategy for Ordered AI Tasks
8.1 When Does Order Matter?
| Scenario | Needs ordering? | Why |
|---|---|---|
| Chunking → embedding → storage for one document | Yes | Must proceed in sequence |
| Embeddings across different documents | No | Independent tasks, faster in parallel |
| Writing a user's conversation history | Yes | Conversation order shapes the context |
| Bulk report generation | No | Each report is independent |
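What makes per-key ordering work is that Kafka maps a given key deterministically to one partition (the default partitioner hashes the serialized key with murmur2). The invariant can be illustrated with a simplified stand-in that uses `hashCode` instead of murmur2, so the partition numbers will not match real Kafka assignments:

```java
public class KeyPartitioner {

    /**
     * Same key => same partition is what gives per-document ordering.
     * Kafka's default partitioner actually applies murmur2 to the serialized
     * key; hashCode here is purely to illustrate the invariant.
     */
    public static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```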
8.2 Partitioning Strategy Implementation
```java
// OrderedMessageProducer.java
package com.laozhang.kafka.producer;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@Slf4j
@RequiredArgsConstructor
public class OrderedMessageProducer {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    /**
     * Sends a message that must preserve ordering.
     * The key detail: use documentId as the message key.
     * Kafka puts messages with the same key on the same partition,
     * and a single partition is strictly ordered.
     */
    public void sendOrdered(String topic, String orderKey, Object message) {
        kafkaTemplate.send(topic, orderKey, message)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        log.debug("Ordered message sent: topic={}, key={}, partition={}",
                                topic, orderKey,
                                result.getRecordMetadata().partition());
                    } else {
                        log.error("Ordered message failed: topic={}, key={}",
                                topic, orderKey, ex);
                    }
                });
    }

    /**
     * Conversation history: route by userId so one user's messages stay ordered.
     */
    public void sendConversationEvent(String userId, Object event) {
        // userId as the key: one user's conversation lands on one partition
        sendOrdered("conversation-events", userId, event);
    }

    /**
     * Document pipeline: route by documentId so one document's events stay ordered.
     */
    public void sendDocumentPipelineEvent(String documentId,
                                          String topic,
                                          Object event) {
        sendOrdered(topic, documentId, event);
    }
}
```

9. Schema Registry: Versioning AI Message Formats
9.1 Why a Schema Registry?
Message formats change as the business iterates. If producers and consumers disagree on the format, deserialization fails and messages pile up. A Schema Registry provides:
- Centralized management of message schemas
- Backward-compatibility checks
- Schema version control
9.2 Avro Schema Definition
document-uploaded.avsc:

```json
{
  "namespace": "com.laozhang.kafka.event",
  "type": "record",
  "name": "DocumentUploadedEvent",
  "doc": "Document-uploaded event - v2.1",
  "fields": [
    {"name": "eventId", "type": "string", "doc": "Unique event ID"},
    {"name": "taskId", "type": "string", "doc": "Business task ID"},
    {"name": "documentId", "type": "string", "doc": "Document ID"},
    {"name": "knowledgeBaseId", "type": "string", "doc": "Target knowledge base ID"},
    {"name": "storageKey", "type": "string", "doc": "Object-storage key"},
    {"name": "fileName", "type": "string", "doc": "Original file name"},
    {"name": "fileSizeBytes", "type": "long", "doc": "File size in bytes"},
    {"name": "mimeType", "type": "string", "doc": "File MIME type"},
    {"name": "userId", "type": "string", "doc": "Submitting user ID"},
    {
      "name": "priority",
      "type": {"type": "enum", "name": "Priority",
               "symbols": ["normal", "high", "critical"]},
      "default": "normal",
      "doc": "Processing priority"
    },
    {"name": "eventTimestamp", "type": "long", "doc": "Event time (Unix millis)"},
    {"name": "retryCount", "type": "int", "default": 0, "doc": "Retry count"},
    {
      "name": "traceId",
      "type": ["null", "string"],
      "default": null,
      "doc": "Distributed tracing ID (optional)"
    },
    {
      "name": "aiAnalysisRequired",
      "type": "boolean",
      "default": true,
      "doc": "Whether AI analysis is needed (new in v2.1)"
    }
  ]
}
```

9.3 Integrating Schema Registry with Spring Boot
```yaml
# application.yml (Schema Registry settings)
spring:
  kafka:
    producer:
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    consumer:
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
    properties:
      # Schema Registry endpoint
      schema.registry.url: http://schema-registry:8081
      # Deserialize into the generated SpecificRecord classes
      specific.avro.reader: true
      # Auto-register schemas (enable in dev, disable in production)
      auto.register.schemas: true
```

10. Monitoring: Kafka Lag and AI Processing-Latency Alerts
10.1 Kafka Metrics Configuration
```yaml
# Micrometer configuration for Kafka metrics
management:
  endpoints:
    web:
      exposure:
        include: prometheus, health, info
  metrics:
    tags:
      application: ai-pipeline
      environment: production
```

10.2 Prometheus Alert Rules
```yaml
# kafka-alerts.yml
groups:
  - name: kafka-ai-pipeline
    rules:
      # Consumer lag alert: more than 5,000 messages backed up
      - alert: KafkaConsumerLagHigh
        expr: |
          kafka_consumergroup_lag_sum{
            consumergroup=~".*-workers",
            topic=~"documents-.*|embeddings-.*|ai-analysis-.*"
          } > 5000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Kafka consumer lag is high"
          description: "Group {{ $labels.consumergroup }} on topic {{ $labels.topic }} has a lag of {{ $value }}"
      # AI processing latency alert: P99 above 30 seconds
      - alert: AiProcessingLatencyHigh
        expr: |
          histogram_quantile(0.99,
            rate(ai_document_processing_seconds_bucket[5m])
          ) > 30
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "AI document-processing P99 latency above 30 seconds"
      # Dead-letter accumulation alert
      - alert: DlqMessageAccumulation
        expr: |
          increase(kafka_topic_partitions_messages_in_total{
            topic=~".*\\.DLT"
          }[10m]) > 10
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "More than 10 dead-letter messages accumulated within 10 minutes"
          description: "Dead-letter topic {{ $labels.topic }} may indicate a systemic failure"
```

10.3 Before/After Performance Comparison
The complete before/after numbers from Liu Gang's team:
| Metric | Before (synchronous) | After (Kafka, asynchronous) |
|---|---|---|
| System availability | 95% | 99.9% |
| AI processing throughput | 240 docs/hour | 3,600 docs/hour (+15x) |
| Peak-hour response time | 30 s (timeouts) | < 1 s (submission acknowledged) |
| Failure recovery time | 45 minutes | < 5 minutes (restart consumers) |
| User impact of AI-service errors | Immediate failures | Background retries, invisible to users |
| Token cost from duplicate calls | None (no retries) | None (idempotency guard) |
11. FAQ
Q1: Kafka or RabbitMQ?
For AI processing pipelines, Kafka is the recommendation: large message volume (millions per day), the need to replay history (reprocessing old data), and multiple consumer groups reading the same topic in parallel. RabbitMQ fits when volume is small, latency must be in the millisecond range, or you need complex routing rules.
Q2: Idempotency relies on Redis; what if Redis goes down?
Run Redis highly available (Sentinel/Cluster). In the extreme case where Redis is down, a small number of duplicates may slip through, which most AI tasks can tolerate (writing the same vector twice). If even that is unacceptable, enforce idempotency with a database unique index: slower, but absolutely reliable.
Q3: JSON or Avro for Kafka message serialization?
Use JSON during development (easy to debug); in production, prefer Avro + Schema Registry (more compact, with schema versioning). Avro payloads are roughly 40% smaller than JSON, which saves meaningful network and storage overhead at high throughput.
Q4: What should concurrency be set to?
Never higher than the topic's partition count (extra consumers sit idle). Suggested rule: partitions = max consumer instances * 2 (headroom). For CPU-bound AI work, set concurrency to about the number of CPU cores; for I/O-bound work (LLM API calls), it can go higher, e.g. 4x the core count.
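The I/O-bound sizing in the answer follows the classic "threads ≈ cores × (1 + wait time / compute time)" rule, capped by the partition count. A sketch with hypothetical numbers; `ConcurrencySizer` is illustrative, not part of the original codebase:

```java
public class ConcurrencySizer {

    /**
     * threads ≈ cores * (1 + waitMs / computeMs), clamped to [1, partitions],
     * since consumers beyond the partition count would sit idle.
     */
    public static int suggested(int cores, double waitMs, double computeMs, int partitions) {
        int ideal = (int) Math.round(cores * (1 + waitMs / computeMs));
        return Math.min(Math.max(ideal, 1), partitions);
    }
}
```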
Q5: How do I handle consumer timeouts (max.poll.interval.ms)?
AI tasks can run for tens of seconds; exceed max.poll.interval.ms (default 5 minutes) and Kafka assumes the consumer is dead and triggers a rebalance. Options: 1) hand the AI work to a thread pool so the poll thread returns immediately, then commit the offset manually; 2) raise max.poll.interval.ms; 3) lower max.poll.records so each poll stays within a predictable processing time.
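Option 1 can be sketched without Kafka at all: the poll thread hands the slow call to a worker pool and returns immediately, and the "ack" callback runs only after the work succeeds. All names here are hypothetical; with real Spring Kafka, note that committing offsets from a non-listener thread needs care (container pause/resume, or out-of-order commit support).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadedProcessing {

    private final ExecutorService aiPool = Executors.newFixedThreadPool(4);

    /**
     * The poll thread calls this and returns at once; the ack callback
     * (standing in for Acknowledgment.acknowledge()) fires only on success,
     * so a failed task is simply not acknowledged.
     */
    public CompletableFuture<Void> handle(String payload, Runnable ack) {
        return CompletableFuture
                .runAsync(() -> simulateAiCall(payload), aiPool)
                .thenRun(ack);
    }

    private void simulateAiCall(String payload) {
        // placeholder for the multi-second LLM call
    }

    public void shutdown() {
        aiPool.shutdown();
    }
}
```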
Q6: When should a DLT message be retried, and when abandoned?
Retry: transient network errors, temporary AI-service overload, database connection timeouts. Abandon: malformed messages (retrying won't help), data-validation failures (the content itself is wrong), AI content-safety refusals (safety checks must not be bypassed). It pays to add error-classification logic to the DLT consumer so this decision is automatic.
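That classification step can be a plain function over exception types. The mapping below is an illustrative sketch; which exception actually signals, say, a content-safety refusal depends on your AI client library:

```java
import java.net.SocketTimeoutException;

public class DlqTriage {

    public enum Action { RETRY, DISCARD }

    /** Transient infrastructure errors are worth retrying;
     *  permanent data or policy errors go straight to manual review. */
    public static Action classify(Throwable error) {
        if (error instanceof SocketTimeoutException) {
            return Action.RETRY;   // transient network error
        }
        if (error instanceof IllegalArgumentException) {
            return Action.DISCARD; // malformed payload: retrying will not help
        }
        if (error instanceof SecurityException) {
            return Action.DISCARD; // content-safety refusal: must not be bypassed
        }
        return Action.RETRY;       // default to transient and let backoff decide
    }
}
```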
Conclusion
Liu Gang's team's migration shows the core value of event-driven architecture for AI applications:
The fundamental problem with the synchronous design: the long-tail latency of AI processing (3~30 s) is exposed directly to users, and under high concurrency it spirals into "backlog → timeouts → avalanche".
The core wins from going asynchronous with Kafka:
- Decoupling: a slow AI service no longer blocks user submissions
- Peak shaving: during spikes, messages pile up in Kafka while the AI service processes at its own pace
- Resilience: if the AI service goes down, messages wait in Kafka and processing resumes on restart, invisibly to users
- Observability: lag metrics clearly reflect processing pressure and give early warning
Key implementation points:
- Idempotency protection (Redis SETNX) prevents duplicate processing
- The dead-letter topic (DLT) guarantees no message is ever lost
- Backpressure control keeps the downstream AI service from being crushed
- Manual acks ensure messages are processed reliably
From 95% availability to 99.9%: that is the production-grade stability Kafka brings to AI applications.
