Post #1811: Combining Kafka Streams and AI to Build a Real-Time Text Classification Pipeline
I am a little embarrassed to admit it: the first time I wired Kafka Streams to an LLM, I took down the entire cluster.
That was last year. We were building a content moderation system, and the product requirement was "real time": a user posts a comment, and we return a moderation verdict within 500 milliseconds. My initial design was straightforward: Kafka ingests the message, Streams does the stream processing, every message triggers one LLM call for text classification, and the result is written back. Looks perfect, right?
On day one in production, the average LLM call latency was 320ms, with occasional P99 spikes above 4 seconds. Every Kafka Streams processing thread blocked on LLM calls, consumer lag shot up, backpressure propagated upstream, upstream consumers started erroring, and the whole pipeline cascaded into failure.
This article collects those pitfalls, together with the architecture that actually works. It is not a textbook introduction; it is experience paid for with real production incidents.
Why Kafka Streams Instead of Flink
Before the solution, a word on why we used Kafka Streams rather than the more mainstream Flink.
The reason was practical: the team had no Flink operations experience, a Kafka cluster was already running, and Kafka Streams is a library rather than a separate cluster, so deployment cost was close to zero: just one more dependency in the Spring Boot application.
That is Kafka Streams' biggest advantage: no external dependencies. No JobManager, no TaskManager, no ZooKeeper (newer Kafka versions have dropped it too); it runs as part of your Java application.
It also has clear limitations: state lives in local RocksDB, so state migration during scale-out is awkward; there is no true unified batch/stream processing; and complex topologies are clumsier to express than in Flink.
For small-to-medium real-time AI workloads, Kafka Streams is a pragmatic choice. Ignore the "Flink is the one true stream processor" takes you see online; pick tools based on team background and workload scale.
Core Architecture Design
First, the overall architecture. A complete real-time text classification pipeline has these layers: an ingestion layer (the raw input topic), a stream-processing layer (the Kafka Streams topology that preprocesses and routes), an asynchronous LLM classification service consuming from a pending topic, and a result layer (the classification-result topic plus a dead-letter queue).
A few key design decisions:
Dual routing: not every message needs the LLM. Simple rule-based classification (for example, rejecting outright on banned keywords) takes a local path with <10ms latency; only messages that need semantic understanding go to the LLM. This cut our LLM call volume by 60-70%.
Asynchronous call pool: this is the core fix for my earlier incident. LLM calls must never block Streams processing threads.
Batch inference: aggregating multiple messages into one batch before calling the LLM improved throughput 3-5x.
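The batching idea can be sketched in plain Java, independent of Kafka: collect records until either a size cap is reached or a wait window expires, whichever comes first. This is an illustrative sketch under those assumptions, not the production code; the class name `MicroBatcher` and its parameters are invented for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustrative micro-batcher: hands out batches of up to maxSize items,
// waiting at most maxWaitMs after the first item arrives.
public class MicroBatcher<T> {
    private final int maxSize;
    private final long maxWaitMs;
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();

    public MicroBatcher(int maxSize, long maxWaitMs) {
        this.maxSize = maxSize;
        this.maxWaitMs = maxWaitMs;
    }

    public void submit(T item) {
        queue.add(item);
    }

    // Blocks until a batch is ready: either full, or the wait window
    // expired with at least one element collected.
    public List<T> nextBatch() {
        List<T> batch = new ArrayList<>(maxSize);
        try {
            batch.add(queue.take()); // wait indefinitely for the first element
            long deadline = System.currentTimeMillis() + maxWaitMs;
            while (batch.size() < maxSize) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) break;                          // window closed
                T next = queue.poll(remaining, TimeUnit.MILLISECONDS);
                if (next == null) break;                            // window expired while waiting
                batch.add(next);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // return whatever we collected
        }
        return batch;
    }

    public static void main(String[] args) {
        MicroBatcher<String> batcher = new MicroBatcher<>(3, 100);
        batcher.submit("a");
        batcher.submit("b");
        batcher.submit("c");
        batcher.submit("d");
        System.out.println(batcher.nextBatch()); // [a, b, c]
        System.out.println(batcher.nextBatch()); // [d]
    }
}
```

The same size-or-time trade-off shows up later in the consumer configuration (`fetch.min.bytes` vs `fetch.max.wait.ms`): a larger batch amortizes LLM overhead, but the wait window bounds the latency you pay for it.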
Project Dependency Configuration
<dependencies>
<!-- Kafka Streams -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.7.0</version>
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.2.3</version>
</dependency>
<!-- LangChain4j for LLM integration -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j</artifactId>
<version>0.32.0</version>
</dependency>
<!-- OpenAI compatible client -->
<dependency>
<groupId>dev.langchain4j</groupId>
<artifactId>langchain4j-open-ai</artifactId>
<version>0.32.0</version>
</dependency>
<!-- Resilience4j for circuit breaker -->
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-circuitbreaker</artifactId>
<version>2.2.0</version>
</dependency>
<!-- Micrometer for metrics -->
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
</dependencies>
Message Model Design
First, define the core data models. Many people skip this step to save time and regret it later:
// Input message
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class TextMessage {
private String messageId;
private String content;
private String userId;
private String source; // APP/WEB/API
private long timestamp;
private Map<String, String> metadata;
}
// Classification result
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class ClassificationResult {
private String messageId;
private String originalContent;
private ClassificationCategory category;
private double confidence;
private String reason; // rationale provided by the LLM
private ClassificationMethod method; // RULE/LLM
private long processingTimeMs;
private long timestamp;
public enum ClassificationCategory {
NORMAL, // normal content
SPAM, // spam / ads
TOXIC, // harmful content
SENSITIVE, // sensitive topics
UNKNOWN // cannot be determined
}
public enum ClassificationMethod {
RULE_BASED,
LLM_BASED,
HYBRID
}
}
// Routing decision
@Data
@Builder
@NoArgsConstructor // required for JSON deserialization
@AllArgsConstructor // @Builder needs this once a no-args constructor is declared
public class RoutingDecision {
private TextMessage message;
private boolean needsLLM;
private String routingReason;
private List<String> triggeredRules;
}
Building the Kafka Streams Topology
This is the core code; I will explain it piece by piece:
@Configuration
@Slf4j
@RequiredArgsConstructor // injects the final Serde fields below
public class TextClassificationTopology {
private static final String INPUT_TOPIC = "raw-text-input";
private static final String RESULT_TOPIC = "classification-result";
private static final String LLM_PENDING_TOPIC = "llm-pending";
private static final String DEAD_LETTER_TOPIC = "classification-dlq";
// Serdes for serialization/deserialization
private final Serde<TextMessage> textMessageSerde;
private final Serde<ClassificationResult> resultSerde;
private final Serde<RoutingDecision> routingDecisionSerde;
@Bean
public Topology buildTopology() {
StreamsBuilder builder = new StreamsBuilder();
// Read the raw input stream
KStream<String, TextMessage> inputStream = builder.stream(
INPUT_TOPIC,
Consumed.with(Serdes.String(), textMessageSerde)
);
// Step 1: preprocessing - drop empty messages, clean content
KStream<String, TextMessage> cleanedStream = inputStream
.filter((key, msg) -> msg != null && msg.getContent() != null
&& !msg.getContent().isBlank())
.mapValues(this::preprocessMessage)
.peek((key, msg) -> log.debug("Preprocessed message: {}", key));
// Step 2: routing decision - rule-handled vs. needs-LLM
// (KStream#branch is deprecated; split()/Branched is the current API)
Map<String, KStream<String, RoutingDecision>> branches = cleanedStream
.mapValues(this::makeRoutingDecision)
.split(Named.as("route-"))
.branch((key, decision) -> !decision.isNeedsLLM(), Branched.as("rule")) // rule path
.defaultBranch(Branched.as("llm")); // LLM path
KStream<String, RoutingDecision> ruleBasedStream = branches.get("route-rule");
KStream<String, RoutingDecision> llmStream = branches.get("route-llm");
// Step 3a: rule path - classify directly, extremely low latency
KStream<String, ClassificationResult> ruleResults = ruleBasedStream
.mapValues(this::classifyByRule);
// Step 3b: LLM path - forward to a pending topic for async processing
// Note: we do NOT call the LLM here; we forward to another topic
llmStream
.mapValues(decision -> decision.getMessage())
.to(LLM_PENDING_TOPIC, Produced.with(Serdes.String(), textMessageSerde));
// Step 4: write rule-path results to the final topic
ruleResults.to(RESULT_TOPIC, Produced.with(Serdes.String(), resultSerde));
// Error handling - failed messages go to the DLQ,
// implemented via a custom exception handler (see below)
return builder.build();
}
private TextMessage preprocessMessage(TextMessage msg) {
// Strip HTML tags, normalize whitespace, truncate overly long text
String cleaned = msg.getContent()
.replaceAll("<[^>]+>", "")
.replaceAll("\\s+", " ")
.trim();
// Truncate long text - the LLM has a token limit
if (cleaned.length() > 2000) {
cleaned = cleaned.substring(0, 2000) + "...";
}
return TextMessage.builder()
.messageId(msg.getMessageId())
.content(cleaned)
.userId(msg.getUserId())
.source(msg.getSource())
.timestamp(msg.getTimestamp())
.metadata(msg.getMetadata())
.build();
}
private RoutingDecision makeRoutingDecision(TextMessage msg) {
List<String> triggeredRules = new ArrayList<>();
// Rule 1: digits/punctuation only, no LLM needed
if (msg.getContent().matches("[\\d\\s\\p{Punct}]+")) {
triggeredRules.add("NUMERIC_ONLY");
return RoutingDecision.builder()
.message(msg).needsLLM(false)
.routingReason("digits/punctuation only").triggeredRules(triggeredRules)
.build();
}
// Rule 2: explicit banned keyword, handled directly by rules
if (containsBannedKeywords(msg.getContent())) {
triggeredRules.add("BANNED_KEYWORD");
return RoutingDecision.builder()
.message(msg).needsLLM(false)
.routingReason("banned keyword hit").triggeredRules(triggeredRules)
.build();
}
// Rule 3: very short text, rules are sufficient
if (msg.getContent().length() < 10) {
triggeredRules.add("TOO_SHORT");
return RoutingDecision.builder()
.message(msg).needsLLM(false)
.routingReason("text too short").triggeredRules(triggeredRules)
.build();
}
// Everything else needs LLM semantic understanding
return RoutingDecision.builder()
.message(msg).needsLLM(true)
.routingReason("needs semantic analysis").triggeredRules(triggeredRules)
.build();
}
private ClassificationResult classifyByRule(RoutingDecision decision) {
long startTime = System.currentTimeMillis();
TextMessage msg = decision.getMessage();
ClassificationResult.ClassificationCategory category;
double confidence = 0.95;
String reason;
if (decision.getTriggeredRules().contains("BANNED_KEYWORD")) {
category = ClassificationResult.ClassificationCategory.TOXIC;
reason = "banned keyword hit: " + String.join(", ", decision.getTriggeredRules());
} else {
category = ClassificationResult.ClassificationCategory.NORMAL;
reason = "classified as normal by rules";
}
return ClassificationResult.builder()
.messageId(msg.getMessageId())
.originalContent(msg.getContent())
.category(category)
.confidence(confidence)
.reason(reason)
.method(ClassificationResult.ClassificationMethod.RULE_BASED)
.processingTimeMs(System.currentTimeMillis() - startTime)
.timestamp(System.currentTimeMillis())
.build();
}
private boolean containsBannedKeywords(String content) {
// In a real project this would be an efficient Aho-Corasick automaton
List<String> bannedWords = Arrays.asList("banned-word-1", "banned-word-2");
return bannedWords.stream().anyMatch(content::contains);
}
}
}
Asynchronous LLM Processing Service
This is the key fix for the original meltdown. LLM calls live in a separate service that consumes from the pending topic, processes in batches, and writes results back to the result topic:
@Service
@Slf4j
public class LLMClassificationService {
private final ChatLanguageModel chatModel;
private final KafkaTemplate<String, ClassificationResult> kafkaTemplate;
private final MeterRegistry meterRegistry;
private final CircuitBreaker circuitBreaker;
// Core batching parameters (enforced via the listener's fetch settings below)
private static final int BATCH_SIZE = 20;
private static final int MAX_WAIT_MS = 100; // wait at most 100ms to fill a batch
// Java 21 virtual threads - ideal for highly concurrent LLM calls
private final ExecutorService virtualThreadExecutor =
Executors.newVirtualThreadPerTaskExecutor();
@Autowired
public LLMClassificationService(
@Value("${llm.api.url}") String apiUrl,
@Value("${llm.api.key}") String apiKey,
@Value("${llm.model.name}") String modelName,
KafkaTemplate<String, ClassificationResult> kafkaTemplate,
MeterRegistry meterRegistry) {
this.chatModel = OpenAiChatModel.builder()
.baseUrl(apiUrl)
.apiKey(apiKey)
.modelName(modelName)
.timeout(Duration.ofSeconds(10))
.maxRetries(2)
.build();
this.kafkaTemplate = kafkaTemplate;
this.meterRegistry = meterRegistry;
// Configure the circuit breaker - prevents a cascade when the LLM service flaps
CircuitBreakerConfig cbConfig = CircuitBreakerConfig.custom()
.failureRateThreshold(50)
.slowCallRateThreshold(80)
.slowCallDurationThreshold(Duration.ofSeconds(5))
.waitDurationInOpenState(Duration.ofSeconds(30))
.slidingWindowSize(20)
.build();
this.circuitBreaker = CircuitBreaker.of("llm-service", cbConfig);
}
@KafkaListener(
topics = "llm-pending",
groupId = "llm-classifier",
concurrency = "3", // 3 concurrent consumers
batch = "true", // required to receive List<ConsumerRecord> batches
properties = {
"fetch.min.bytes=1024", // wait for 1KB before fetching, to encourage batching
"fetch.max.wait.ms=100" // but wait at most 100ms
}
)
public void processLLMBatch(List<ConsumerRecord<String, TextMessage>> records) {
if (records.isEmpty()) return;
log.info("Processing LLM batch, size: {}", records.size());
Timer.Sample timer = Timer.start(meterRegistry);
// Submit the batch to the LLM concurrently
List<CompletableFuture<ClassificationResult>> futures = records.stream()
.map(record -> CompletableFuture.supplyAsync(
() -> classifyWithLLM(record.value()),
virtualThreadExecutor
))
.collect(Collectors.toList());
// Wait for all results (with a timeout guard)
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(15, TimeUnit.SECONDS);
// Write results to Kafka in bulk
futures.stream()
.map(f -> {
try { return f.get(); }
catch (Exception e) { return null; }
})
.filter(Objects::nonNull)
.forEach(result -> {
kafkaTemplate.send("classification-result", result.getMessageId(), result);
meterRegistry.counter("llm.classification.success").increment();
});
} catch (TimeoutException e) {
log.error("LLM batch processing timeout for {} records", records.size());
meterRegistry.counter("llm.classification.timeout").increment();
// Timed-out messages go to the DLQ for manual review
records.forEach(r -> sendToDLQ(r.value(), "TIMEOUT"));
} catch (Exception e) {
log.error("LLM batch processing failed", e);
records.forEach(r -> sendToDLQ(r.value(), e.getMessage()));
}
timer.stop(Timer.builder("llm.batch.processing.time")
.tag("batch_size", String.valueOf(records.size()))
.register(meterRegistry));
}
private ClassificationResult classifyWithLLM(TextMessage msg) {
// Circuit breaker protection
// (plain try/catch instead of Vavr's Try - Resilience4j 2.x no longer ships Vavr)
Supplier<ClassificationResult> classifySupplier = CircuitBreaker
.decorateSupplier(circuitBreaker, () -> doLLMClassification(msg));
try {
return classifySupplier.get();
} catch (CallNotPermittedException e) {
// circuit is open - degrade instead of hammering the LLM
return fallbackClassification(msg, "CIRCUIT_OPEN");
} catch (Exception e) {
log.error("LLM classification failed for message: {}", msg.getMessageId(), e);
return fallbackClassification(msg, e.getMessage());
}
}
private ClassificationResult doLLMClassification(TextMessage msg) {
long startTime = System.currentTimeMillis();
// Carefully designed prompt that demands structured output
String prompt = buildClassificationPrompt(msg.getContent());
String response = chatModel.generate(prompt);
// Parse the JSON result returned by the LLM
return parseClassificationResponse(msg, response, startTime);
}
private String buildClassificationPrompt(String content) {
return String.format("""
You are a content moderation expert. Classify the following text.
Text:
%s
Return the result strictly in the following JSON format, with no extra content:
{
"category": "NORMAL|SPAM|TOXIC|SENSITIVE|UNKNOWN",
"confidence": a number between 0.0 and 1.0,
"reason": "a short rationale, at most 50 characters"
}
Category definitions:
- NORMAL: normal content
- SPAM: spam, ads, marketing
- TOXIC: illegal or harmful content
- SENSITIVE: politically sensitive or controversial topics
- UNKNOWN: cannot be determined
""", content);
}
private ClassificationResult parseClassificationResponse(TextMessage msg, String response, long startTime) {
try {
// Extract the JSON object from the LLM response
// (path() instead of get() so a missing field falls back to the default instead of NPE)
ObjectMapper mapper = new ObjectMapper();
String jsonStr = extractJson(response);
JsonNode node = mapper.readTree(jsonStr);
String categoryStr = node.path("category").asText("UNKNOWN");
double confidence = node.path("confidence").asDouble(0.5);
String reason = node.path("reason").asText("LLM classification");
return ClassificationResult.builder()
.messageId(msg.getMessageId())
.originalContent(msg.getContent())
.category(ClassificationResult.ClassificationCategory.valueOf(categoryStr))
.confidence(confidence)
.reason(reason)
.method(ClassificationResult.ClassificationMethod.LLM_BASED)
.processingTimeMs(System.currentTimeMillis() - startTime)
.timestamp(System.currentTimeMillis())
.build();
} catch (Exception e) {
log.warn("Failed to parse LLM response: {}, raw: {}", e.getMessage(), response);
return fallbackClassification(msg, "PARSE_ERROR");
}
}
private String extractJson(String text) {
// Extract the first complete JSON object (tolerates prefixes like "Sure, here it is:")
int start = text.indexOf('{');
int end = text.lastIndexOf('}');
if (start >= 0 && end > start) {
return text.substring(start, end + 1);
}
throw new IllegalArgumentException("No JSON found in response: " + text);
}
private ClassificationResult fallbackClassification(TextMessage msg, String reason) {
return ClassificationResult.builder()
.messageId(msg.getMessageId())
.originalContent(msg.getContent())
.category(ClassificationResult.ClassificationCategory.UNKNOWN)
.confidence(0.0)
.reason("degraded handling: " + reason)
.method(ClassificationResult.ClassificationMethod.RULE_BASED)
.processingTimeMs(0)
.timestamp(System.currentTimeMillis())
.build();
}
private void sendToDLQ(TextMessage msg, String errorReason) {
// Send to the dead-letter topic with the original message and error details
log.warn("Sending to DLQ: {}, reason: {}", msg.getMessageId(), errorReason);
// actual implementation omitted
}
}
Kafka Streams Configuration Tuning
Code alone is not enough; configuration is where the performance lives:
@Configuration
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashMap<>();
// Basic configuration
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "text-classification-v1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
// Performance tuning - these parameters took a long time to get right
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG,
Runtime.getRuntime().availableProcessors()); // use all CPU cores
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); // commit offsets every second
props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 10 * 1024 * 1024L); // 10MB cache (replaces deprecated cache.max.bytes.buffering)
// State store - RocksDB tuning
props.put(StreamsConfig.STATE_DIR_CONFIG, "/data/kafka-streams-state");
// Fault tolerance
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.AT_LEAST_ONCE); // at-least-once for now; the business layer is idempotent
// Serialization
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
return new KafkaStreamsConfiguration(props);
}
// Custom serdes
@Bean
public Serde<TextMessage> textMessageSerde() {
return new JsonSerde<>(TextMessage.class);
}
@Bean
public Serde<ClassificationResult> classificationResultSerde() {
return new JsonSerde<>(ClassificationResult.class);
}
}
Monitoring Metrics Design
After this system went live, the first thing I did every morning was check the dashboards. Well-designed metrics are how you catch problems early:
@Component
@Slf4j
public class ClassificationMetrics {
private final MeterRegistry registry;
// Core business metrics
private final Counter totalProcessed;
private final Counter llmProcessed;
private final Counter ruleProcessed;
private final Counter dlqMessages;
// Distribution of classification outcomes
private final Map<String, Counter> categoryCounters = new ConcurrentHashMap<>();
// Performance metrics
private final Timer ruleClassificationTimer;
private final Timer llmClassificationTimer;
private final DistributionSummary batchSizeSummary;
public ClassificationMetrics(MeterRegistry registry) {
this.registry = registry;
totalProcessed = Counter.builder("classification.total")
.description("total messages processed").register(registry);
llmProcessed = Counter.builder("classification.llm")
.description("messages classified by the LLM").register(registry);
ruleProcessed = Counter.builder("classification.rule")
.description("messages classified by rules").register(registry);
dlqMessages = Counter.builder("classification.dlq")
.description("messages sent to the DLQ").register(registry);
ruleClassificationTimer = Timer.builder("classification.time")
.tag("method", "rule").register(registry);
llmClassificationTimer = Timer.builder("classification.time")
.tag("method", "llm").register(registry);
batchSizeSummary = DistributionSummary.builder("llm.batch.size")
.description("distribution of LLM batch sizes").register(registry);
}
public void recordClassification(ClassificationResult result) {
totalProcessed.increment();
if (result.getMethod() == ClassificationResult.ClassificationMethod.LLM_BASED) {
llmProcessed.increment();
} else {
ruleProcessed.increment();
}
// Tally by classification outcome
String categoryName = result.getCategory().name();
categoryCounters.computeIfAbsent(categoryName,
name -> Counter.builder("classification.category")
.tag("category", name).register(registry))
.increment();
}
}
Real Pitfalls from Production
Pitfall 1: serialization version compatibility. We once upgraded the TextMessage schema by adding a new field. Messages serialized by the old version blew up with NPEs when deserialized by the new version, crashing processing threads. Now every model class carries @JsonIgnoreProperties(ignoreUnknown = true) and every new field gets a default value.
Pitfall 2: unstable LLM output format. Even with "return strictly JSON" in the prompt, the model occasionally prefixes the JSON with things like "Sure, here is the result:". The fix is a lenient JSON extractor that takes everything from the first { to the last }, which improved tolerance dramatically.
Pitfall 3: missing backpressure. When LLM processing slows down, lag on the pending topic keeps growing; without a backpressure mechanism, memory eventually gets crushed. The fix was to configure max.poll.records on the @KafkaListener, capping each poll at 50 records, matched to what the consumer threads can actually process.
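As an illustration of that cap (the values are what worked for us, not universal defaults), the standard Spring Boot consumer keys look roughly like this; max.poll.interval.ms is raised alongside it so slow LLM batches are not mistaken for a dead consumer:

```yaml
spring:
  kafka:
    consumer:
      max-poll-records: 50        # hard cap per poll; match it to the LLM pool's real throughput
      properties:
        # give slow LLM batches room before the broker evicts the consumer from the group
        max.poll.interval.ms: 300000
```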
Pitfall 4: virtual threads and ThreadLocal. Java 21 virtual threads interact badly with traditional ThreadLocal usage; in our case MDC (the logging trace context) got lost when work hopped between virtual threads. We now pass the traceId explicitly as a parameter instead of relying on MDC.
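A minimal sketch of that explicit-passing style (the names TraceContext and classify are invented for the example; the same pattern works unchanged with a virtual-thread executor):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Sketch: carry the trace id as an explicit value instead of MDC/ThreadLocal,
// so it survives regardless of which thread (platform or virtual) runs the task.
public class TraceExample {
    record TraceContext(String traceId) {}

    static String classify(TraceContext ctx, String text) {
        // every log line embeds the trace id explicitly
        return "[" + ctx.traceId() + "] classified: " + text;
    }

    public static void main(String[] args) throws Exception {
        // swap in Executors.newVirtualThreadPerTaskExecutor() on Java 21
        ExecutorService pool = Executors.newFixedThreadPool(2);
        TraceContext ctx = new TraceContext("trace-42");
        Future<String> result = pool.submit(() -> classify(ctx, "hello"));
        System.out.println(result.get()); // [trace-42] classified: hello
        pool.shutdown();
    }
}
```

The lambda captures ctx by value, so the trace id travels with the task itself rather than with whichever thread happens to run it.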
This architecture has been running in production for half a year, peaking at roughly 80,000 messages per minute. Rule-path P99 latency stays under 15ms, and LLM-path P99 is around 800ms (including batch-wait time). Those numbers are acceptable, and the product team signed off.
If you are building a similar system, my biggest piece of advice: separate the synchronous and asynchronous paths cleanly, and never let an LLM call block a stream-processing thread. That is the single most important principle.
