第2343篇:Java AI与消息队列集成——Kafka驱动的异步AI处理
2026/4/30大约 5 分钟
第2343篇:Java AI与消息队列集成——Kafka驱动的异步AI处理
适读人群:需要构建异步AI处理管道的Java工程师,关注AI应用解耦和高吞吐量设计的架构师 | 阅读时长:约18分钟 | 核心价值:掌握Kafka与Spring AI的集成模式,构建可扩展的异步AI处理系统
同步调用LLM有一个根本性的问题:调用方必须等待。
对于用户交互型应用,等是可以接受的(用户在看流式输出)。但对于后台处理型场景——批量分析用户评论、异步摘要文档、自动打标签——让HTTP请求同步等待5-10秒的LLM响应,既浪费连接资源,又让上游服务变得脆弱。
引入消息队列,AI处理变成异步的:生产者发送任务,消费者从队列取任务,调用LLM,结果回写或通知。生产者和AI处理完全解耦,LLM故障不影响生产者,处理速度可以独立扩展。
架构设计:Kafka + Spring AI
依赖配置
<!-- pom.xml -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-starter-model-openai</artifactId>
</dependency>
<!-- JSON序列化 -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
# application.yml
spring:
kafka:
bootstrap-servers: ${KAFKA_SERVERS:localhost:9092}
consumer:
group-id: ai-processor
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: "com.example.ai.dto"
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# 消费者并发配置
listener:
concurrency: 5 # 5个消费者线程并行处理
# AI处理配置
ai-processor:
max-retries: 3
retry-delay-ms: 2000
消息定义
// Message payload describing one asynchronous AI job. Immutable record, serialized
// as JSON by the Spring Kafka JsonSerializer/JsonDeserializer configured above.
public record AiTask(
String taskId,
String taskType, // SUMMARIZE / CLASSIFY / SENTIMENT / CUSTOM
String content, // content to be processed
Map<String, String> metadata, // extra metadata (e.g. document source, user id)
String callbackTopic, // topic the result is published to (optional)
int priority, // priority (0-10)
Instant createdAt
) {
// Shared factory: fresh random taskId, single metadata entry, default priority 5,
// timestamped at creation. Both public factories delegate here.
private static AiTask newTask(String taskType, String content,
String metaKey, String metaValue, String callbackTopic) {
return new AiTask(
UUID.randomUUID().toString(),
taskType,
content,
Map.of(metaKey, metaValue),
callbackTopic,
5,
Instant.now()
);
}
// Factory: summarization task; results go to the "ai-summaries" topic.
public static AiTask summarize(String content, String sourceId) {
return newTask("SUMMARIZE", content, "sourceId", sourceId, "ai-summaries");
}
// Factory: classification task; results go to the "ai-classifications" topic.
public static AiTask classify(String content, String documentId) {
return newTask("CLASSIFY", content, "documentId", documentId, "ai-classifications");
}
}
// Outcome of one AI task, published to the task's callbackTopic (or "ai-results").
public record AiResult(
String taskId,
String taskType,
boolean success,
String result, // AI output; null when success == false
String errorMessage, // error detail on failure; null on success
long processingTimeMs,
Instant completedAt
) {}任务生产者
/**
 * Producer side of the async AI pipeline: publishes {@code AiTask} messages to the
 * "ai-tasks" topic. Keyed by taskId so all records for the same task hit the same
 * partition (preserving per-task ordering).
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class AiTaskProducer {

    private final KafkaTemplate<String, AiTask> kafkaTemplate;

    private static final String AI_TASKS_TOPIC = "ai-tasks";

    /**
     * Sends one AI task, completing the returned future with the taskId once the
     * broker acknowledges the send.
     *
     * @param task task to enqueue
     * @return future of the submitted taskId; fails with RuntimeException on send error
     */
    public CompletableFuture<String> submitTask(AiTask task) {
        return kafkaTemplate.send(AI_TASKS_TOPIC, task.taskId(), task)
                .thenApply(result -> {
                    log.info("AI任务已提交:taskId={}, partition={}, offset={}",
                            task.taskId(),
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset());
                    return task.taskId();
                })
                .exceptionally(ex -> {
                    log.error("AI任务提交失败:taskId={}", task.taskId(), ex);
                    throw new RuntimeException("任务提交失败:" + ex.getMessage(), ex);
                });
    }

    /**
     * Submits every task, then waits for all broker acknowledgements.
     *
     * Fix: the original chained {@code map(this::submitTask).map(CompletableFuture::join)}
     * in one lazy stream pipeline, which sent and then blocked on each record one at a
     * time, serializing the whole batch. Materializing the futures first lets all sends
     * go out before any join blocks.
     *
     * @param tasks tasks to enqueue
     * @return taskIds in the same order as the input list
     */
    public List<String> submitBatch(List<AiTask> tasks) {
        List<CompletableFuture<String>> futures = tasks.stream()
                .map(this::submitTask)
                .toList();
        return futures.stream()
                .map(CompletableFuture::join)
                .toList();
    }
}
核心:AI任务消费者
// Kafka consumer that executes AI tasks asynchronously. Throughput is bounded two
// ways: a Semaphore caps in-flight LLM calls at 10 across all listener threads, and
// a RateLimiter (presumably Guava's — import not shown, confirm) caps task starts
// at 5 per second to respect provider rate limits.
@Service
@RequiredArgsConstructor
@Slf4j
public class AiTaskConsumer {
private final ChatClient.Builder chatClientBuilder;
private final KafkaTemplate<String, AiResult> resultKafkaTemplate;
// At most 10 tasks in flight simultaneously (shared across listener threads).
private final Semaphore concurrencySemaphore = new Semaphore(10);
// At most 5 task starts per second.
private final RateLimiter rateLimiter = RateLimiter.create(5.0);
@KafkaListener(
topics = "ai-tasks",
groupId = "ai-processor",
containerFactory = "aiTaskListenerContainerFactory"
)
// Entry point per Kafka record. The Acknowledgment parameter implies manual-ack
// mode — NOTE(review): "aiTaskListenerContainerFactory" is not shown in this file;
// confirm it enables MANUAL/MANUAL_IMMEDIATE acknowledgment.
public void handleAiTask(
@Payload AiTask task,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment) {
log.info("收到AI任务:taskId={}, type={}, partition={}, offset={}",
task.taskId(), task.taskType(), partition, offset);
// Rate control: block until a start permit is available.
rateLimiter.acquire();
try {
concurrencySemaphore.acquire();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("任务处理被中断:taskId={}", task.taskId());
// No acknowledge: Kafka will redeliver this record later.
return;
}
long start = System.currentTimeMillis();
try {
String result = processTask(task);
long duration = System.currentTimeMillis() - start;
// Publish the success result to the callback topic.
publishResult(task, true, result, null, duration);
// Acknowledge the record (processed successfully).
acknowledgment.acknowledge();
log.info("AI任务完成:taskId={}, 耗时={}ms", task.taskId(), duration);
} catch (Exception e) {
long duration = System.currentTimeMillis() - start;
log.error("AI任务处理失败:taskId={}", task.taskId(), e);
// Publish the failure result.
publishResult(task, false, null, e.getMessage(), duration);
// Acknowledge anyway (no retry; the failure has been recorded downstream).
// To retry instead, skip acknowledge() here so the record is redelivered.
acknowledgment.acknowledge();
} finally {
concurrencySemaphore.release();
}
}
// Dispatches the task to the handler matching its taskType; unknown types throw
// IllegalArgumentException, which the caller records as a failed result.
private String processTask(AiTask task) {
ChatClient chatClient = chatClientBuilder.build();
return switch (task.taskType()) {
case "SUMMARIZE" -> summarize(chatClient, task.content());
case "CLASSIFY" -> classify(chatClient, task.content());
case "SENTIMENT" -> analyzeSentiment(chatClient, task.content());
case "CUSTOM" -> customProcess(chatClient, task);
default -> throw new IllegalArgumentException("未知任务类型:" + task.taskType());
};
}
// Summarization: input is truncated to 5000 chars before being sent to the model.
private String summarize(ChatClient client, String content) {
// Truncate overly long content.
String truncated = content.length() > 5000
? content.substring(0, 5000) + "..."
: content;
return client.prompt()
.system("你是文档摘要专家,用100-200字总结核心内容,语言简洁")
.user(truncated)
.call()
.content();
}
// Classification into one of five fixed categories; only the first 1000 chars are sent.
private String classify(ChatClient client, String content) {
return client.prompt()
.system("""
将文档分类到以下类别之一,只返回类别名:
技术文档 / 产品需求 / 用户反馈 / 会议记录 / 其他
""")
.user(content.substring(0, Math.min(1000, content.length())))
.call()
.content()
.trim();
}
// Sentiment analysis; the model is instructed to answer in JSON (not validated here).
private String analyzeSentiment(ChatClient client, String content) {
return client.prompt()
.system("分析文本情感,返回JSON:{sentiment: positive/negative/neutral, confidence: 0-1}")
.user(content)
.call()
.content();
}
// Free-form processing driven by an optional "prompt" entry in the task metadata.
private String customProcess(ChatClient client, AiTask task) {
String prompt = task.metadata().getOrDefault("prompt", "处理以下内容:");
return client.prompt()
.user(prompt + "\n\n" + task.content())
.call()
.content();
}
// Publishes an AiResult to the task's callbackTopic, or "ai-results" when unset.
private void publishResult(AiTask task, boolean success, String result,
String error, long duration) {
String targetTopic = task.callbackTopic() != null
? task.callbackTopic()
: "ai-results";
AiResult aiResult = new AiResult(
task.taskId(),
task.taskType(),
success,
result,
error,
duration,
Instant.now()
);
resultKafkaTemplate.send(targetTopic, task.taskId(), aiResult);
}
}死信队列处理:持续失败的任务
// Consumes records from the "ai-tasks-dlq" dead-letter topic and persists them for
// manual review. NOTE(review): nothing in this file publishes to the DLQ topic or
// sets the exception-message header (e.g. a DeadLetterPublishingRecoverer) — confirm
// that wiring exists elsewhere.
@Service
@RequiredArgsConstructor
@Slf4j
public class DeadLetterQueueProcessor {
private final FailedTaskRepository failedTaskRepository;
@KafkaListener(topics = "ai-tasks-dlq", groupId = "ai-dlq-processor")
public void handleDeadLetter(AiTask task,
@Header(KafkaHeaders.EXCEPTION_MESSAGE) String exceptionMessage) {
log.error("任务进入死信队列:taskId={}, error={}", task.taskId(), exceptionMessage);
// Persist to the database for manual investigation.
FailedTask failedTask = FailedTask.builder()
.taskId(task.taskId())
.taskType(task.taskType())
.content(task.content())
.errorMessage(exceptionMessage)
.failedAt(LocalDateTime.now())
.needsManualReview(true)
.build();
failedTaskRepository.save(failedTask);
// Send an alert (left disabled in this example).
// alertService.sendAlert("AI任务进入死信队列:" + task.taskId());
}
}监控:消费者积压监控
/**
 * Periodically publishes the "ai-processor" consumer group's lag as the Micrometer
 * gauge "kafka.consumer.lag", tagged by topic and partition.
 *
 * Fixes vs. the original:
 * 1) {@code @RequiredArgsConstructor} added — the original had private final fields
 *    with no constructor, so it could neither compile nor be injected.
 * 2) Real lag is computed (log-end offset minus committed offset); the original
 *    recorded the committed offset itself as the "lag".
 * 3) Gauges are backed by long-lived AtomicLong holders; re-calling
 *    meterRegistry.gauge(...) with a fresh boxed number (as the original did) neither
 *    updates the metric nor keeps it alive, since Micrometer only holds a weak
 *    reference to the sampled object.
 */
@Component
@RequiredArgsConstructor
@Slf4j
public class KafkaLagMonitor {

    private final KafkaAdmin kafkaAdmin;
    private final MeterRegistry meterRegistry;

    // One holder per topic-partition, registered with the registry exactly once and
    // mutated on every poll.
    private final Map<String, AtomicLong> lagHolders = new ConcurrentHashMap<>();

    @Scheduled(fixedDelay = 30000)
    public void monitorLag() {
        try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
            // Committed offsets of the consumer group, per partition.
            Map<TopicPartition, OffsetAndMetadata> committed = adminClient
                    .listConsumerGroupOffsets("ai-processor")
                    .partitionsToOffsetAndMetadata()
                    .get();
            // Current log-end offset of each partition the group has committed to.
            Map<TopicPartition, OffsetSpec> latestSpec = committed.keySet().stream()
                    .collect(Collectors.toMap(tp -> tp, tp -> OffsetSpec.latest()));
            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> endOffsets =
                    adminClient.listOffsets(latestSpec).all().get();
            committed.forEach((tp, om) -> {
                ListOffsetsResult.ListOffsetsResultInfo end = endOffsets.get(tp);
                if (end == null) {
                    return; // partition vanished between the two admin calls
                }
                long lag = Math.max(0L, end.offset() - om.offset());
                lagHolders.computeIfAbsent(tp.topic() + "-" + tp.partition(), key -> {
                    AtomicLong holder = new AtomicLong();
                    meterRegistry.gauge("kafka.consumer.lag",
                            Tags.of("topic", tp.topic(), "partition", String.valueOf(tp.partition())),
                            holder);
                    return holder;
                }).set(lag);
            });
        } catch (InterruptedException e) {
            // get() was interrupted: restore the flag so the scheduler thread can react.
            Thread.currentThread().interrupt();
            log.warn("监控Kafka积压失败", e);
        } catch (Exception e) {
            log.warn("监控Kafka积压失败", e);
        }
    }
}
异步AI处理架构的核心价值在于:让AI处理从请求链路中解耦。上游服务不需要等待AI处理完成,系统整体的可用性和响应性都得到了提升。
