AI 应用的 WebSocket 实时通信——多人协作 AI 工具的工程实现
AI 应用的 WebSocket 实时通信——多人协作 AI 工具的工程实现
公司内部有个工具:多个运营同学同时在一个白板上协作,每个人可以向 AI 提问,AI 的回答实时出现在白板上,所有人都能看到。
听起来不复杂,但实现的时候踩了不少坑。主要问题有两个:
问题一:状态隔离。 用户 A 的问题和用户 B 的问题,各自有各自的对话历史,不能串到一起。但他们又在同一个「房间」里,部分内容(比如 AI 的公开回答)需要广播给所有人。
问题二:流式响应和 WebSocket 的结合。 流式 AI 响应(SSE 模式)是单向的服务端推送,WebSocket 是双向全双工。两种协议各有优势,在多人协作场景里需要组合使用。
这篇文章把这两个问题的解法都讲清楚,附上完整的代码实现。
一、需求分析和技术选型
1.1 多人协作 AI 工具的核心需求
- 每个用户有独立的对话历史(私有会话)
- 部分内容(公开问答)对房间内所有用户可见
- AI 的回答是流式的,要实时推送给用户
- 用户可以「@全体」发起一个对所有人可见的 AI 问答
- 在线用户列表实时更新
1.2 WebSocket vs SSE 的选择
SSE(Server-Sent Events):
- 服务端 → 客户端单向推送
- HTTP 协议,穿越代理和防火墙更方便
- 浏览器原生支持,简单
- 缺点:客户端无法通过 SSE 通道发送数据
WebSocket:
- 全双工,双向通信
- 建立连接后开销小
- 适合实时交互场景
- 缺点:部分代理/防火墙配置问题
我们的选择:WebSocket 主通道 + SSE 流式输出
具体来说:
- 用 WebSocket 处理「事件」:用户加入/离开、发送消息、状态更新
- 当 AI 开始流式输出时,把 token 流通过 WebSocket 的消息帧推送
- 这样既有双向通信能力,又有流式推送能力
二、后端实现:Spring WebSocket + STOMP
2.1 WebSocket 配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 使用内存消息代理(生产环境可换 RabbitMQ)
config.enableSimpleBroker(
"/topic", // 广播(房间级别)
"/queue" // 单播(用户级别)
);
config.setApplicationDestinationPrefixes("/app"); // 客户端发送消息的前缀
config.setUserDestinationPrefix("/user"); // 用户专属目的地
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws/collab")
.setAllowedOriginPatterns("*")
.withSockJS(); // 降级支持(不支持 WebSocket 时用 HTTP 长轮询)
}
@Override
public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
registration.setMessageSizeLimit(128 * 1024); // 128KB 消息大小限制
registration.setSendBufferSizeLimit(512 * 1024);
registration.setSendTimeLimit(20_000); // 20秒发送超时
}
}2.2 会话状态管理
这是多人协作最核心的部分:每个用户有自己的对话历史,同一个房间的用户可以看到公共事件。
@Component
@Slf4j
public class CollabSessionManager {
// 房间 → 房间内用户列表
private final Map<String, Set<String>> roomUsers = new ConcurrentHashMap<>();
// userId → 私有对话历史(不跨用户共享)
private final Map<String, List<Message>> userConversationHistory = new ConcurrentHashMap<>();
// 房间 → 公共对话历史(房间内所有人可见)
private final Map<String, List<PublicMessage>> roomPublicHistory = new ConcurrentHashMap<>();
// userId → WebSocket session 信息
private final Map<String, UserSession> userSessions = new ConcurrentHashMap<>();
/**
* 用户加入房间
*/
public void userJoin(String userId, String roomId, String username) {
roomUsers.computeIfAbsent(roomId, k -> ConcurrentHashMap.newKeySet()).add(userId);
userSessions.put(userId, new UserSession(userId, username, roomId, Instant.now()));
userConversationHistory.putIfAbsent(userId, new ArrayList<>());
log.info("用户 {} ({}) 加入房间 {}", username, userId, roomId);
}
/**
* 用户离开房间
*/
public void userLeave(String userId) {
UserSession session = userSessions.remove(userId);
if (session != null) {
Set<String> users = roomUsers.get(session.roomId());
if (users != null) {
users.remove(userId);
}
}
// 注意:不清除对话历史,重连后可以恢复
}
/**
* 获取用户的私有对话历史
*/
public List<Message> getUserHistory(String userId) {
return userConversationHistory.getOrDefault(userId, new ArrayList<>());
}
/**
* 添加消息到用户私有历史
*/
public void addToUserHistory(String userId, Message message) {
userConversationHistory.computeIfAbsent(userId, k -> new ArrayList<>())
.add(message);
// 限制历史长度(最近 20 轮对话)
List<Message> history = userConversationHistory.get(userId);
if (history.size() > 40) {
// 保留 System Message + 最近 38 条
List<Message> trimmed = new ArrayList<>();
history.stream().filter(m -> m instanceof SystemMessage).limit(1).forEach(trimmed::add);
trimmed.addAll(history.subList(history.size() - 38, history.size()));
userConversationHistory.put(userId, trimmed);
}
}
/**
* 获取房间内的在线用户列表
*/
public List<UserInfo> getRoomUsers(String roomId) {
Set<String> userIds = roomUsers.getOrDefault(roomId, Set.of());
return userIds.stream()
.map(userId -> userSessions.get(userId))
.filter(Objects::nonNull)
.map(s -> new UserInfo(s.userId(), s.username(), s.joinTime()))
.toList();
}
/**
* 获取用户所在的房间 ID
*/
public Optional<String> getUserRoom(String userId) {
return Optional.ofNullable(userSessions.get(userId))
.map(UserSession::roomId);
}
record UserSession(String userId, String username, String roomId, Instant joinTime) {}
record UserInfo(String userId, String username, Instant joinTime) {}
}2.3 消息控制器
@Controller
@Slf4j
public class CollabController {
private final CollabSessionManager sessionManager;
private final ChatClient chatClient;
private final SimpMessagingTemplate messagingTemplate;
@Autowired
public CollabController(
CollabSessionManager sessionManager,
ChatClient chatClient,
SimpMessagingTemplate messagingTemplate) {
this.sessionManager = sessionManager;
this.chatClient = chatClient;
this.messagingTemplate = messagingTemplate;
}
/**
* 用户加入房间
*/
@MessageMapping("/room.join")
public void joinRoom(
@Payload JoinRoomMessage message,
SimpMessageHeaderAccessor headerAccessor) {
String userId = getUserId(headerAccessor);
String sessionId = headerAccessor.getSessionId();
sessionManager.userJoin(userId, message.roomId(), message.username());
// 把用户 ID 存到 WebSocket session attributes
headerAccessor.getSessionAttributes().put("userId", userId);
headerAccessor.getSessionAttributes().put("roomId", message.roomId());
// 广播给房间内所有人:新用户加入
broadcastToRoom(message.roomId(), WsEvent.builder()
.type("USER_JOINED")
.data(Map.of(
"userId", userId,
"username", message.username(),
"roomUsers", sessionManager.getRoomUsers(message.roomId())
))
.build());
// 给加入的用户发送房间历史
sendToUser(userId, WsEvent.builder()
.type("ROOM_HISTORY")
.data(Map.of("history", sessionManager.getRoomPublicHistory(message.roomId())))
.build());
}
/**
* 私人问 AI(只有自己能看到回答)
*/
@MessageMapping("/ai.ask.private")
public void askAIPrivate(
@Payload AskAIMessage message,
SimpMessageHeaderAccessor headerAccessor) {
String userId = getUserId(headerAccessor);
log.info("用户 {} 私人提问: {}", userId, message.question());
// 添加用户消息到私有历史
sessionManager.addToUserHistory(userId, new UserMessage(message.question()));
// 通知用户 AI 开始思考
sendToUser(userId, WsEvent.builder()
.type("AI_START")
.data(Map.of("questionId", message.questionId()))
.build());
// 流式调用 AI
List<Message> history = sessionManager.getUserHistory(userId);
chatClient.prompt()
.messages(history)
.stream()
.content()
.subscribe(
// onNext:每个 token 推送给用户
token -> sendToUser(userId, WsEvent.builder()
.type("AI_TOKEN")
.data(Map.of(
"questionId", message.questionId(),
"token", token
))
.build()),
// onError:推送错误给用户
error -> {
log.error("AI 调用失败: {}", error.getMessage());
sendToUser(userId, WsEvent.builder()
.type("AI_ERROR")
.data(Map.of("error", error.getMessage()))
.build());
},
// onComplete:标记 AI 回答结束
() -> {
sendToUser(userId, WsEvent.builder()
.type("AI_DONE")
.data(Map.of("questionId", message.questionId()))
.build());
// 完整回答需要单独收集(这里简化处理)
}
);
}
/**
* 公开问 AI(房间内所有人可见)
*/
@MessageMapping("/ai.ask.public")
public void askAIPublic(
@Payload AskAIMessage message,
SimpMessageHeaderAccessor headerAccessor) {
String userId = getUserId(headerAccessor);
String username = sessionManager.getUserSession(userId).username();
Optional<String> roomId = sessionManager.getUserRoom(userId);
if (roomId.isEmpty()) {
sendToUser(userId, WsEvent.error("未加入任何房间"));
return;
}
log.info("用户 {} 在房间 {} 公开提问: {}", username, roomId.get(), message.question());
// 广播:有人提问了
broadcastToRoom(roomId.get(), WsEvent.builder()
.type("PUBLIC_QUESTION")
.data(Map.of(
"questionId", message.questionId(),
"asker", username,
"question", message.question()
))
.build());
// 广播 AI 开始回答
broadcastToRoom(roomId.get(), WsEvent.builder()
.type("AI_PUBLIC_START")
.data(Map.of("questionId", message.questionId()))
.build());
StringBuilder fullAnswer = new StringBuilder();
String finalRoomId = roomId.get();
// 公开问答用系统级对话历史(不用某个用户的私有历史)
chatClient.prompt()
.system("你是一个团队协作助手,回答需要清晰、专业,适合团队讨论")
.user(message.question())
.stream()
.content()
.subscribe(
token -> {
fullAnswer.append(token);
// 广播每个 token 给房间内所有人
broadcastToRoom(finalRoomId, WsEvent.builder()
.type("AI_PUBLIC_TOKEN")
.data(Map.of(
"questionId", message.questionId(),
"token", token
))
.build());
},
error -> broadcastToRoom(finalRoomId, WsEvent.builder()
.type("AI_PUBLIC_ERROR")
.data(Map.of("error", error.getMessage()))
.build()),
() -> {
// 广播完成事件,并保存到房间历史
String answer = fullAnswer.toString();
sessionManager.addToRoomPublicHistory(finalRoomId,
new PublicMessage(message.questionId(), username, message.question(), answer));
broadcastToRoom(finalRoomId, WsEvent.builder()
.type("AI_PUBLIC_DONE")
.data(Map.of(
"questionId", message.questionId(),
"fullAnswer", answer
))
.build());
}
);
}
/**
* 广播消息到房间内所有用户
*/
private void broadcastToRoom(String roomId, WsEvent event) {
messagingTemplate.convertAndSend(
"/topic/room." + roomId,
event
);
}
/**
* 发送消息给特定用户
*/
private void sendToUser(String userId, WsEvent event) {
messagingTemplate.convertAndSendToUser(
userId,
"/queue/messages",
event
);
}
private String getUserId(SimpMessageHeaderAccessor headerAccessor) {
// 实际项目里从 JWT Token 或 Principal 获取
return (String) headerAccessor.getSessionAttributes().get("userId");
}
}2.4 WebSocket 连接事件监听
@Component
@Slf4j
public class WebSocketEventListener {
private final CollabSessionManager sessionManager;
private final SimpMessagingTemplate messagingTemplate;
@EventListener
public void handleWebSocketConnectListener(SessionConnectedEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String sessionId = headerAccessor.getSessionId();
log.info("WebSocket 连接建立,sessionId: {}", sessionId);
}
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor headerAccessor = StompHeaderAccessor.wrap(event.getMessage());
String userId = (String) headerAccessor.getSessionAttributes().get("userId");
String roomId = (String) headerAccessor.getSessionAttributes().get("roomId");
if (userId != null) {
sessionManager.userLeave(userId);
if (roomId != null) {
// 广播用户离开事件
messagingTemplate.convertAndSend(
"/topic/room." + roomId,
WsEvent.builder()
.type("USER_LEFT")
.data(Map.of(
"userId", userId,
"roomUsers", sessionManager.getRoomUsers(roomId)
))
.build()
);
}
log.info("用户 {} 断开连接,已从房间 {} 移除", userId, roomId);
}
}
}三、前端实现(React 示例)
// useCollabAI.js - 协作 AI 的 WebSocket Hook
import { useEffect, useRef, useState, useCallback } from 'react';
import SockJS from 'sockjs-client';
import { Client } from '@stomp/stompjs';
export function useCollabAI(roomId, userId, username) {
const clientRef = useRef(null);
const [connected, setConnected] = useState(false);
const [roomUsers, setRoomUsers] = useState([]);
const [publicMessages, setPublicMessages] = useState([]);
const [privateMessages, setPrivateMessages] = useState([]);
// key: questionId, value: 累积的 token 字符串
const [streamingAnswers, setStreamingAnswers] = useState({});
useEffect(() => {
const client = new Client({
webSocketFactory: () => new SockJS('/ws/collab'),
reconnectDelay: 5000,
onConnect: () => {
setConnected(true);
console.log('WebSocket 连接成功');
// 订阅房间广播
client.subscribe(`/topic/room.${roomId}`, (message) => {
const event = JSON.parse(message.body);
handleRoomEvent(event);
});
// 订阅私人消息
client.subscribe(`/user/queue/messages`, (message) => {
const event = JSON.parse(message.body);
handlePrivateEvent(event);
});
// 加入房间
client.publish({
destination: '/app/room.join',
body: JSON.stringify({ roomId, userId, username })
});
},
onDisconnect: () => {
setConnected(false);
console.log('WebSocket 断开连接');
}
});
client.activate();
clientRef.current = client;
return () => {
client.deactivate();
};
}, [roomId, userId, username]);
const handleRoomEvent = useCallback((event) => {
switch (event.type) {
case 'USER_JOINED':
case 'USER_LEFT':
setRoomUsers(event.data.roomUsers);
break;
case 'PUBLIC_QUESTION':
setPublicMessages(prev => [...prev, {
id: event.data.questionId,
type: 'question',
asker: event.data.asker,
content: event.data.question,
answer: '',
streaming: false
}]);
break;
case 'AI_PUBLIC_START':
setPublicMessages(prev => prev.map(msg =>
msg.id === event.data.questionId
? { ...msg, streaming: true }
: msg
));
break;
case 'AI_PUBLIC_TOKEN':
// 追加 token 到对应消息
setPublicMessages(prev => prev.map(msg =>
msg.id === event.data.questionId
? { ...msg, answer: msg.answer + event.data.token }
: msg
));
break;
case 'AI_PUBLIC_DONE':
setPublicMessages(prev => prev.map(msg =>
msg.id === event.data.questionId
? { ...msg, streaming: false, answer: event.data.fullAnswer }
: msg
));
break;
}
}, []);
const handlePrivateEvent = useCallback((event) => {
switch (event.type) {
case 'AI_TOKEN':
setStreamingAnswers(prev => ({
...prev,
[event.data.questionId]:
(prev[event.data.questionId] || '') + event.data.token
}));
break;
case 'AI_DONE':
const fullAnswer = streamingAnswers[event.data.questionId] || '';
setPrivateMessages(prev => [...prev, {
id: event.data.questionId,
type: 'ai',
content: fullAnswer
}]);
// 清理 streaming state
setStreamingAnswers(prev => {
const next = { ...prev };
delete next[event.data.questionId];
return next;
});
break;
}
}, [streamingAnswers]);
// 发送私人 AI 问题
const askPrivate = useCallback((question) => {
const questionId = Date.now().toString();
// 先添加用户消息到私人历史
setPrivateMessages(prev => [...prev, {
id: questionId + '_q',
type: 'user',
content: question
}]);
clientRef.current?.publish({
destination: '/app/ai.ask.private',
body: JSON.stringify({ questionId, question })
});
}, []);
// 发送公开 AI 问题
const askPublic = useCallback((question) => {
const questionId = Date.now().toString();
clientRef.current?.publish({
destination: '/app/ai.ask.public',
body: JSON.stringify({ questionId, question })
});
}, []);
return {
connected,
roomUsers,
publicMessages,
privateMessages,
streamingAnswers,
askPrivate,
askPublic
};
}四、多实例部署:状态共享问题
单机可以用上面的方案。但如果部署了多个实例,一个用户连接到实例 A,另一个用户连接到实例 B,实例 A 的广播到不了实例 B 上的用户。
解决方案:用 Redis Pub/Sub 作为消息代理。
@Configuration
public class MultiInstanceWebSocketConfig {
/**
* 使用 RabbitMQ 作为外部消息代理(比 Redis 更适合 WebSocket 场景)
*/
@Bean
public MessageBrokerRegistry configureBroker(MessageBrokerRegistry config) {
// 使用外部 RabbitMQ 代理
config.enableStompBrokerRelay("/topic", "/queue")
.setRelayHost("${rabbitmq.host}")
.setRelayPort(61613)
.setClientLogin("${rabbitmq.username}")
.setClientPasscode("${rabbitmq.password}")
.setSystemLogin("${rabbitmq.username}")
.setSystemPasscode("${rabbitmq.password}");
config.setApplicationDestinationPrefixes("/app");
config.setUserDestinationPrefix("/user");
return config;
}
}同时,CollabSessionManager 中的内存状态也需要迁移到 Redis:
@Component
public class RedisCollabSessionManager {
private final RedisTemplate<String, Object> redisTemplate;
private static final String ROOM_USERS_KEY = "room:users:";
private static final String USER_SESSION_KEY = "user:session:";
public void userJoin(String userId, String roomId, String username) {
// 房间成员列表
redisTemplate.opsForSet().add(ROOM_USERS_KEY + roomId, userId);
// 用户会话信息
Map<String, String> sessionData = Map.of(
"userId", userId,
"username", username,
"roomId", roomId,
"joinTime", Instant.now().toString()
);
redisTemplate.opsForHash().putAll(USER_SESSION_KEY + userId, sessionData);
redisTemplate.expire(USER_SESSION_KEY + userId, Duration.ofHours(24));
}
// 对话历史用 Redis List
public void addToUserHistory(String userId, String role, String content) {
String key = "user:history:" + userId;
redisTemplate.opsForList().rightPush(key,
Map.of("role", role, "content", content));
// 只保留最近 40 条
redisTemplate.opsForList().trim(key, -40, -1);
redisTemplate.expire(key, Duration.ofHours(24));
}
}五、真实场景:多人协作写作工具
把上面这套技术方案,应用到多人协作写作工具里:
场景描述:
- 4-8 个运营同学同时在线
- 可以互相看到对方的写作进度(实时光标位置)
- 每个人可以私下问 AI「这段话怎么改」
- 可以发起公开讨论「@全体,这个标题哪个更好」
实际问题和解法:
问题 1:AI 流式输出太快,前端渲染卡顿
解法:对 token 做批处理,每 50ms 批量推送一次,而不是每个 token 都触发 WebSocket 发送:
// 使用 buffer 操作批量推送
chatClient.prompt()
.user(question)
.stream()
.content()
.buffer(Duration.ofMillis(50)) // 每50ms收集一次
.subscribe(tokens -> {
String batch = String.join("", tokens);
broadcastToRoom(roomId, WsEvent.builder()
.type("AI_TOKEN_BATCH")
.data(Map.of("questionId", questionId, "tokens", batch))
.build());
});问题 2:用户刷新页面后历史消息丢失
解法:房间公共历史存 Redis,用户重连后先拉历史:
@MessageMapping("/room.reconnect")
public void reconnect(@Payload ReconnectMessage message,
SimpMessageHeaderAccessor header) {
String userId = getUserId(header);
// 重新注册用户
sessionManager.userJoin(userId, message.roomId(), message.username());
// 推送历史(最近50条公共消息)
sendToUser(userId, WsEvent.builder()
.type("ROOM_HISTORY")
.data(Map.of(
"history", sessionManager.getRecentPublicHistory(message.roomId(), 50),
"roomUsers", sessionManager.getRoomUsers(message.roomId())
))
.build());
}问题 3:AI 回答中间网络断了,如何续传
解法:AI 流式输出的中间状态存 Redis,重连后可以续传:
// 存储流式输出的中间状态
private void saveStreamingState(String questionId, String accumulatedAnswer) {
redisTemplate.opsForValue().set(
"streaming:" + questionId,
accumulatedAnswer,
Duration.ofMinutes(5) // 5分钟内有效
);
}
// 用户重连时检查是否有未完成的流式输出
private void checkAndResendIncompleteAnswer(String userId, String questionId) {
String partial = (String) redisTemplate.opsForValue().get("streaming:" + questionId);
if (partial != null) {
sendToUser(userId, WsEvent.builder()
.type("AI_PARTIAL_ANSWER")
.data(Map.of("questionId", questionId, "partial", partial))
.build());
}
}六、性能数据
实际运行数据(100 并发用户,每个用户每分钟 2 次 AI 问答):
- WebSocket 连接数:100
- 每秒消息数峰值:约 800 条(流式 token 批量推送)
- 单次公开 AI 问答广播延迟(服务端到客户端):P50 8ms,P99 45ms
- 内存消耗:约 180MB(含用户历史)
- CPU:峰值 35%(2 core)
这个规模对单台 4 核 8GB 的服务器来说完全没压力。如果用户量增加到 1000+,需要引入 RabbitMQ 做多实例扩展。
七、小结
多人协作 AI 工具的核心工程问题有两个:
状态隔离:私人对话历史按 userId 隔离,公共历史按 roomId 共享,两者不互相干扰。这部分设计清晰就不难实现。
流式 + 广播:AI 的流式 token 通过 WebSocket 消息帧推送,私人回答单播给用户,公开回答广播给房间。需要注意 token 批量处理,避免每个 token 都触发 WebSocket 帧导致开销过大。
多实例部署时,要把内存状态迁移到 Redis,消息代理换成 RabbitMQ 或 Redis Pub/Sub。这是从单机到分布式的必经路,没有捷径。
