Spring AI WebFlux:响应式AI服务架构设计与实战
2026/4/30大约 6 分钟
Spring AI WebFlux:响应式AI服务架构设计与实战
适读人群:有1-5年Java开发经验,想向AI工程师方向转型的开发者 阅读时长:约18分钟 文章价值:① 理解响应式编程在AI场景下的核心价值 ② 掌握Spring AI + WebFlux的流式推送完整实现 ③ 学会响应式背压控制和错误处理的实战技巧
小林是我们群里一个做AI产品的开发者,上个月分享了一段视频——他们的AI对话界面上,AI的回复是一个字一个字"打"出来的,跟ChatGPT的效果一样。
群里立刻有人问:"这个怎么实现的?是前端假装打字效果吗?"
小林回了一句:"不是假装,是真实的流式响应。后端用的是Spring AI + WebFlux,LLM生成一个token就推一个,用户看到的是真实的生成过程。"
这篇文章,我就来讲清楚这套技术的完整实现。
为什么AI场景特别适合响应式
先想一个问题:LLM生成一个400字的回答,要多久?
可能需要10秒。
如果是传统同步方式,用户盯着加载圈等10秒,然后一下子看到完整答案。
如果是流式响应,第0.1秒就看到第一个字,然后一直看到AI在"思考",整体体验完全不同——即使总时间是一样的,用户感受截然不同。
这就是流式响应的核心价值:把等待变成过程。
响应式编程(Reactor)在这里的价值是:
- 背压控制:客户端处理慢了,可以告诉服务端"先等一下",不会被撑爆
- 非阻塞全链路:从LLM到前端,全程不阻塞线程
- 声明式错误处理:错误处理和重试逻辑融入数据流,更优雅
代码实战
第一步:依赖配置
<dependencies>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>1.0.0</version>
</dependency>
<!-- WebFlux:响应式Web框架,替代spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<!-- R2DBC:响应式数据库访问(如果需要数据库)-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>r2dbc-postgresql</artifactId>
</dependency>
</dependencies>spring:
ai:
openai:
api-key: ${OPENAI_API_KEY}
chat:
options:
model: gpt-4o
# R2DBC数据库配置
r2dbc:
url: r2dbc:postgresql://localhost:5432/aidb
username: postgres
password: ${DB_PASSWORD}第二步:流式AI服务
@Service
@RequiredArgsConstructor
@Slf4j
public class ReactiveAiService {
private final ChatClient chatClient;
/**
* 基础流式聊天:返回Flux<String>,每个元素是一个token
*/
public Flux<String> streamChat(String prompt) {
return chatClient.prompt()
.user(prompt)
.stream()
.content()
// 过滤空token(LLM偶尔输出空字符串)
.filter(token -> token != null && !token.isEmpty())
// 错误处理:出错时降级到错误提示
.onErrorResume(e -> {
log.error("流式聊天出错", e);
return Flux.just("抱歉,AI服务暂时出现问题,请稍后重试。");
})
// 打印日志(生产环境去掉)
.doOnComplete(() -> log.debug("流式响应完成,prompt={}", prompt));
}
/**
* 带会话历史的流式聊天
*/
public Flux<String> streamChatWithHistory(List<ChatMessage> history, String newMessage) {
// 构建包含历史记录的消息列表
List<Message> messages = buildMessageList(history, newMessage);
return chatClient.prompt()
.messages(messages)
.stream()
.content()
.filter(token -> !token.isEmpty())
.onErrorResume(e -> Flux.just("[ERROR]" + e.getMessage()));
}
/**
* 带超时控制的流式聊天
* AI流式响应可能很慢,必须设置超时防止连接永远挂着
*/
public Flux<String> streamChatWithTimeout(String prompt, Duration timeout) {
return chatClient.prompt()
.user(prompt)
.stream()
.content()
.timeout(timeout)
.onErrorResume(TimeoutException.class, e -> {
log.warn("流式响应超时,prompt前50字={}", prompt.substring(0, Math.min(50, prompt.length())));
return Flux.just("\n\n[响应超时,以上为已生成内容]");
})
.onErrorResume(e -> Flux.just("\n\n[服务异常,请重试]"));
}
/**
* 多并发流式查询:同时向多个专家AI查询,流式合并输出
*/
public Flux<String> streamMultiExpertChat(String question) {
Flux<String> legalExpert = chatClient.prompt()
.system("你是法律专家,从法律角度分析问题。")
.user(question)
.stream().content()
.map(t -> "[法律] " + t);
Flux<String> techExpert = chatClient.prompt()
.system("你是技术专家,从技术角度分析问题。")
.user(question)
.stream().content()
.map(t -> "[技术] " + t);
// 合并两个流,交替输出
return Flux.merge(legalExpert, techExpert);
}
private List<Message> buildMessageList(List<ChatMessage> history, String newMessage) {
List<Message> messages = new ArrayList<>();
for (ChatMessage msg : history) {
if ("user".equals(msg.getRole())) {
messages.add(new UserMessage(msg.getContent()));
} else {
messages.add(new AssistantMessage(msg.getContent()));
}
}
messages.add(new UserMessage(newMessage));
return messages;
}
}第三步:WebFlux Controller(SSE + WebSocket)
@RestController
@RequestMapping("/api/reactive/ai")
@RequiredArgsConstructor
@Slf4j
public class ReactiveAiController {
private final ReactiveAiService aiService;
/**
* SSE流式接口:最简单的流式推送方式
* 前端用 EventSource 接收
*/
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(@RequestParam String prompt) {
return aiService.streamChat(prompt)
.map(token -> ServerSentEvent.<String>builder()
.data(token)
.build())
// 流结束时发送done事件
.concatWith(Mono.just(
ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build()
));
}
/**
* 带会话ID的流式接口
*/
@PostMapping(value = "/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> chatStream(@RequestBody ChatStreamRequest request) {
return aiService.streamChatWithTimeout(
request.getPrompt(),
Duration.ofSeconds(60)
)
.map(token -> ServerSentEvent.<String>builder()
.id(request.getSessionId())
.data(token)
.build())
.doOnError(e -> log.error("流式接口异常,sessionId={}", request.getSessionId(), e));
}
/**
* 非流式接口:等全部生成完再返回
* 使用Mono,让整个链路保持响应式
*/
@PostMapping("/chat")
public Mono<ChatResponse> chat(@RequestBody ChatRequest request) {
return Mono.fromCallable(() ->
chatClient.prompt()
.user(request.getPrompt())
.call()
.content()
)
.map(content -> ChatResponse.builder()
.content(content)
.sessionId(request.getSessionId())
.build())
.onErrorResume(e -> Mono.just(ChatResponse.error("服务异常: " + e.getMessage())));
}
}第四步:WebSocket实现双向流式对话
SSE只能服务端推,WebSocket支持双向通信,更适合对话场景:
@Component
@RequiredArgsConstructor
@Slf4j
public class AiWebSocketHandler implements WebSocketHandler {
private final ReactiveAiService aiService;
private final ObjectMapper objectMapper;
@Override
public Mono<Void> handle(WebSocketSession session) {
String sessionId = session.getId();
log.info("WebSocket连接建立,sessionId={}", sessionId);
// 接收用户消息,触发AI流式回复,把结果推回去
Flux<WebSocketMessage> outputMessages = session.receive()
// 解析用户消息
.map(msg -> {
try {
return objectMapper.readValue(msg.getPayloadAsText(),
WebSocketChatRequest.class);
} catch (JsonProcessingException e) {
throw new RuntimeException("消息解析失败", e);
}
})
// 对每条用户消息,发起AI流式调用
.flatMap(request ->
aiService.streamChatWithTimeout(request.getPrompt(), Duration.ofSeconds(60))
.map(token -> {
// 包装成带sessionId的消息
WebSocketChatResponse response = WebSocketChatResponse.builder()
.sessionId(sessionId)
.token(token)
.done(false)
.build();
try {
return session.textMessage(
objectMapper.writeValueAsString(response));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
})
// 流结束:发送done消息
.concatWith(Mono.fromCallable(() -> {
WebSocketChatResponse done = WebSocketChatResponse.builder()
.sessionId(sessionId)
.done(true)
.build();
return session.textMessage(
objectMapper.writeValueAsString(done));
}))
);
return session.send(outputMessages)
.doOnError(e -> log.error("WebSocket推送失败,sessionId={}", sessionId, e))
.doFinally(signal -> log.info("WebSocket连接关闭,sessionId={}", sessionId));
}
}
@Configuration
public class WebSocketConfig {
@Bean
public HandlerMapping webSocketHandlerMapping(AiWebSocketHandler handler) {
Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/ws/ai/chat", handler);
SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setUrlMap(map);
mapping.setOrder(-1);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}第五步:前端对接(JavaScript示例)
// SSE方式接收流式响应
function streamChat(prompt) {
const eventSource = new EventSource(`/api/reactive/ai/stream?prompt=${encodeURIComponent(prompt)}`);
const outputDiv = document.getElementById('output');
outputDiv.textContent = '';
eventSource.onmessage = function(event) {
if (event.data === '[DONE]') {
eventSource.close();
return;
}
outputDiv.textContent += event.data;
// 自动滚动到底部
outputDiv.scrollTop = outputDiv.scrollHeight;
};
eventSource.onerror = function() {
eventSource.close();
outputDiv.textContent += '\n[连接断开]';
};
}
// WebSocket方式双向对话
const ws = new WebSocket('ws://localhost:8080/ws/ai/chat');
ws.onmessage = function(event) {
const response = JSON.parse(event.data);
if (!response.done) {
document.getElementById('output').textContent += response.token;
} else {
console.log('AI回答完毕');
}
};
function sendMessage(prompt) {
ws.send(JSON.stringify({ prompt, sessionId: 'user123' }));
}背压测试:验证响应式真的有用
@Test
public void testBackpressure() {
StepVerifier.create(
aiService.streamChat("请写一首长诗")
// 模拟慢消费者:每个token处理需要100ms
.delayElements(Duration.ofMillis(100))
.take(20) // 只取前20个token
)
.expectNextCount(20)
.verifyComplete();
}响应式的背压确保了:即使消费者(前端/用户)处理慢,也不会导致内存溢出,数据流会自动调速。这是普通回调/Future无法优雅实现的。
