第2480篇:实时AI流处理系统——Flink与LLM的实时数据处理
2026/4/30大约 5 分钟
第2480篇:实时AI流处理系统——Flink与LLM的实时数据处理
适读人群:数据工程师、后端工程师、AI工程师 | 阅读时长:约17分钟 | 核心价值:将Flink的流处理能力与LLM的语义理解结合,构建实时AI数据管道
有一个电商客服场景:用户在聊天窗口里发消息,客服系统要实时分析这些消息,判断用户意图(咨询/投诉/退货/建议),如果是投诉要立即触发SLA计时,如果是严重投诉要实时通知客服主管介入。
用批处理做?延迟太高。用传统规则引擎?覆盖不了自然语言的复杂性。
用LLM做实时分析?可以,但需要解决几个工程问题:LLM的延迟比传统流处理高很多、LLM的并发请求量受API限制、如何保证LLM处理不阻塞流式数据管道。
架构设计
关键设计决策:LLM不应该直接在Flink算子的热路径上被调用。LLM调用延迟高(通常100-2000ms),如果放在流处理的关键路径上,会让整个pipeline的吞吐量受到LLM速率的限制。
正确的做法是异步解耦:Flink做数据路由和状态管理,LLM调用异步执行,结果写回后再触发下游处理。
核心实现
1. Flink流处理作业
public class RealTimeIntentAnalysisJob {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 配置检查点(保证exactly-once语义)
env.enableCheckpointing(30000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// 消息输入流
DataStream<ChatMessage> messageStream = env
.addSource(buildKafkaSource("chat-messages"))
.name("chat-messages-source");
// 第一阶段:规则预处理(低延迟,同步处理)
SingleOutputStreamOperator<MessageWithPreliminaryClass> preprocessed = messageStream
.process(new RuleBasedPreprocessor())
.name("rule-based-preprocessor");
// 分叉:
// - 规则能处理的(明显的系统消息、短命令等)直接走快速通道
// - 需要LLM分析的走异步LLM通道
OutputTag<MessageWithPreliminaryClass> fastLaneTag =
new OutputTag<>("fast-lane"){};
// 发往LLM队列的(需要深度分析)
DataStream<LLMRequest> llmRequests = preprocessed
.getSideOutput(fastLaneTag) // 排除快速通道的
.map(msg -> LLMRequest.from(msg));
// 快速通道直接输出
DataStream<AnalyzedMessage> fastResults = preprocessed
.getSideOutput(fastLaneTag)
.map(msg -> AnalyzedMessage.fromRule(msg));
// 写入LLM请求队列
llmRequests.addSink(buildKafkaSink("llm-requests"));
// LLM结果流(从LLM分析微服务读取结果)
DataStream<LLMResult> llmResults = env
.addSource(buildKafkaSource("llm-results"))
.name("llm-results-source");
// 把LLM结果和原始消息关联(基于messageId)
DataStream<AnalyzedMessage> llmAnalyzed = messageStream
.keyBy(ChatMessage::getMessageId)
.connect(llmResults.keyBy(LLMResult::getMessageId))
.process(new MessageLLMResultJoiner())
.name("message-llm-join");
// 合并两个流
DataStream<AnalyzedMessage> allAnalyzed = fastResults.union(llmAnalyzed);
// 业务处理:触发SLA、通知等
allAnalyzed
.keyBy(msg -> msg.getSessionId())
.process(new BusinessActionTrigger())
.addSink(buildActionSink());
env.execute("Real-time Intent Analysis");
}
}2. 规则预处理算子
public class RuleBasedPreprocessor
extends ProcessFunction<ChatMessage, MessageWithPreliminaryClass> {
private static final OutputTag<MessageWithPreliminaryClass> FAST_LANE_TAG =
new OutputTag<>("fast-lane"){};
@Override
public void processElement(
ChatMessage message,
Context ctx,
Collector<MessageWithPreliminaryClass> out) {
MessageWithPreliminaryClass classified = classify(message);
// 高置信度的规则分类走快速通道,不需要LLM
if (classified.getConfidence() > 0.9) {
ctx.output(FAST_LANE_TAG, classified);
} else {
// 低置信度的发给LLM分析
out.collect(classified);
}
}
private MessageWithPreliminaryClass classify(ChatMessage message) {
String text = message.getContent().toLowerCase();
// 简单规则:快速识别明显类型
if (text.matches(".*(退款|退货|换货).*")) {
return MessageWithPreliminaryClass.of(message, IntentType.REFUND_REQUEST, 0.85);
}
if (text.matches(".*(投诉|举报|不满意|太差了|骗人).*")) {
return MessageWithPreliminaryClass.of(message, IntentType.COMPLAINT, 0.80);
}
if (text.matches(".*(在吗|你好|请问|咨询).*") && text.length() < 50) {
return MessageWithPreliminaryClass.of(message, IntentType.GENERAL_INQUIRY, 0.75);
}
// 无法规则判断,需要LLM
return MessageWithPreliminaryClass.of(message, IntentType.UNKNOWN, 0.0);
}
}3. LLM分析微服务(异步批处理)
LLM分析作为独立的微服务,通过Kafka与Flink解耦:
@Service
public class LLMAnalysisWorker {
private final ChatClient chatClient;
private final KafkaTemplate<String, LLMResult> kafkaTemplate;
private static final int BATCH_SIZE = 20;
private static final Duration BATCH_TIMEOUT = Duration.ofMillis(500);
/**
* 批量处理LLM请求
* 把多个请求合并成一个LLM调用,提高效率降低成本
*/
@KafkaListener(topics = "llm-requests", groupId = "llm-workers")
public void processRequests(
@Payload List<LLMRequest> requests,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// 按批次处理
Lists.partition(requests, BATCH_SIZE).forEach(batch -> {
List<LLMResult> results = analyzeBatch(batch);
results.forEach(result -> {
kafkaTemplate.send("llm-results", result.getMessageId(), result);
});
});
}
private List<LLMResult> analyzeBatch(List<LLMRequest> batch) {
// 构建批量分析prompt
String batchPrompt = buildBatchPrompt(batch);
ChatResponse response = chatClient.call(new Prompt(
List.of(
new SystemMessage("""
分析每条用户消息的意图,返回JSON数组。
意图类型: GENERAL_INQUIRY/COMPLAINT/REFUND_REQUEST/ORDER_STATUS/PRODUCT_INFO/OTHER
严重程度(仅投诉类): LOW/MEDIUM/HIGH/CRITICAL
"""),
new UserMessage(batchPrompt)
),
OpenAiChatOptions.builder()
.withModel("gpt-4o-mini") // 用便宜的模型处理批量分析
.withTemperature(0.1f)
.withResponseFormat(new ResponseFormat(ResponseFormat.Type.JSON_OBJECT))
.build()
));
return parseBatchResults(response.getResult().getOutput().getContent(), batch);
}
private String buildBatchPrompt(List<LLMRequest> batch) {
StringBuilder sb = new StringBuilder("分析以下每条消息:\n\n");
for (int i = 0; i < batch.size(); i++) {
sb.append(String.format("%d. [ID:%s] %s\n",
i + 1, batch.get(i).getMessageId(), batch.get(i).getContent()));
}
return sb.toString();
}
}4. 业务动作触发器
public class BusinessActionTrigger
extends KeyedProcessFunction<String, AnalyzedMessage, BusinessAction> {
// Flink状态:跟踪每个会话的SLA
private ValueState<SessionSLAState> slaState;
@Override
public void open(Configuration parameters) {
ValueStateDescriptor<SessionSLAState> descriptor =
new ValueStateDescriptor<>("sla-state", SessionSLAState.class);
slaState = getRuntimeContext().getState(descriptor);
}
@Override
public void processElement(
AnalyzedMessage message,
Context ctx,
Collector<BusinessAction> out) throws Exception {
SessionSLAState state = slaState.value();
if (state == null) {
state = new SessionSLAState();
}
// 处理投诉
if (message.getIntent() == IntentType.COMPLAINT) {
if (!state.hasSLATimer()) {
// 启动SLA计时器(15分钟内必须响应)
long slaDeadline = ctx.timestamp() + Duration.ofMinutes(15).toMillis();
ctx.timerService().registerEventTimeTimer(slaDeadline);
state.setSlaDeadline(slaDeadline);
// 发送SLA启动事件
out.collect(BusinessAction.startSLA(message.getSessionId(), slaDeadline));
}
// 严重投诉,实时通知主管
if (message.getComplaintSeverity() == ComplaintSeverity.CRITICAL) {
out.collect(BusinessAction.notifyManager(
message.getSessionId(),
message.getContent(),
"严重投诉,需要立即介入"
));
}
}
// 退款请求,触发退款流程
if (message.getIntent() == IntentType.REFUND_REQUEST) {
out.collect(BusinessAction.triggerRefundProcess(
message.getSessionId(), message.getOrderId()
));
}
slaState.update(state);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<BusinessAction> out) {
// SLA超时触发
out.collect(BusinessAction.slaBreached(ctx.getCurrentKey()));
}
}性能调优经验
批量调用是降低LLM成本的关键。我们把20个请求合并成一个LLM调用,与20次单独调用相比,每次LLM调用的固定开销(包括网络RTT、Token开销)被分摊了,总成本降低了约60%。
模型分级。不是所有消息都需要GPT-4o分析,我们用GPT-4o-mini处理批量分类(便宜、速度快),GPT-4o只用在需要深度理解(比如情绪分析、复杂意图)的场景。
背压控制。当LLM分析服务处理慢时,Kafka队列会积压。Flink端要实现背压控制,避免内存溢出。我们设置了队列深度告警(积压超过10000条就告警),并在LLM微服务端配置了自动扩容。
