WebSocket Real-Time Communication for AI Applications: Building a Two-Way AI Interaction Experience
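1. A True Story: The Product Crisis Behind a 60% Bounce Rate
In February 2025, Li Xue, product manager of an enterprise SaaS product, put a chart on the big screen at the morning stand-up.
The chart had one red line: the bounce rate of the AI chat page, at 60%.
"Users open our AI assistant and then wait. Ten seconds pass with no response at all, and they close it."
While Li Xue spoke, tech lead Wang Qiang kept his head down, because he knew where the problem was: their AI chat used a request-response model. The user clicks send, the system calls the LLM API, and the LLM returns the complete answer in one piece once generation finishes. Average wait: 9.3 seconds.
9.3 seconds. For an interactive product, that is a disastrous user experience.
Wang Qiang and his team spent 3 weeks rebuilding the AI chat on WebSocket plus streaming output. Instead of roughly 10 seconds of blank waiting, users now see the AI type out its answer piece by piece, the way a person would.
Post-launch data:
- Bounce rate: 60% → 15% (down 75%)
- Session duration: 2.1 minutes on average → 5.8 minutes (up 176%)
- User satisfaction (NPS): 28 → 62 (up 121%)
- Paid conversion rate: 3.2% → 5.7% (up 78%)
Streaming output is not just a technical detail; it directly determines whether users are willing to use your AI product.
This article is the complete technical implementation by Wang Qiang's team.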
2. WebSocket vs SSE vs Long Polling: Choosing for AI Scenarios
2.1 Architecture Comparison of the Three Approaches
2.2 Detailed Selection Matrix
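| Feature | Long polling | SSE | WebSocket |
|---|---|---|---|
| Direction | Simulated two-way | Server → client | Full-duplex, two-way |
| Browser support | All browsers | Not supported in IE | IE10+ |
| Server resources | High (many connections) | Medium | Low (persistent connection) |
| Proxies / firewalls | Friendly | Fairly friendly | Sometimes blocked |
| Implementation complexity | Low | Low | Medium |
| AI streaming output | Possible, but with latency | Best fit for single-turn | Best fit for multi-turn |
| Recovery after a drop | Complex | Built-in EventSource reconnect | Must be implemented manually |
| Multi-instance deployment | Simple | Needs a message queue | Needs a message queue |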
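2.3 Selection Advice for AI Scenarios
Scenario A: single-shot AI Q&A (no conversation history, users do not need to interrupt in real time)
→ Recommended: SSE
→ Simplest to implement, natively supported by Spring AI
Scenario B: multi-turn conversation (conversation history required, users need to interrupt the AI output)
→ Recommended: WebSocket + STOMP
→ Two-way communication, so users can interrupt at any time and send a new message
Scenario C: AI collaborative editing (multiple people collaborating in real time, with AI-assisted writing)
→ Recommended: WebSocket
→ Needs two-way real-time sync and awareness of multiple users
Scenario D: mobile AI apps
→ Recommended: SSE (uses less battery)
→ WebSocket keep-alive heartbeats add to battery drain
Wang Qiang's team had a multi-turn conversation scenario, so they chose WebSocket + STOMP.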
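3. Implementing AI Chat with Spring WebSocket + STOMP
3.1 Project Dependencies
<dependencies>
    <!-- Spring WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
    <!-- Spring AI (streaming support) -->
    <!-- Version note: the StreamingChatClient imports used in section 4 belong to the
         0.8.x API. Spring AI 1.0.0 renamed both this starter and those interfaces, so
         0.8.1 is used here; it may have to be resolved from the Spring milestone repository. -->
    <dependency>
        <groupId>org.springframework.ai</groupId>
        <artifactId>spring-ai-openai-spring-boot-starter</artifactId>
        <version>0.8.1</version>
    </dependency>
    <!-- Redis (message distribution across instances) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <!-- Security (WebSocket authentication) -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-security</artifactId>
    </dependency>
</dependencies>
3.2 WebSocket Configuration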
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    @Value("${websocket.allowed-origins:*}")
    private String allowedOrigins;

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws/ai")
            // Allowed origins (tighten this in production)
            .setAllowedOriginPatterns(allowedOrigins.split(","))
            // SockJS fallback for clients without native WebSocket support
            .withSockJS()
            .setStreamBytesLimit(512 * 1024)   // 512 KB
            .setHttpMessageCacheSize(1000)
            .setDisconnectDelay(30 * 1000);    // 30-second disconnect delay
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        // Enable the in-memory message broker
        registry.enableSimpleBroker(
            "/topic",   // broadcast
            "/queue"    // point-to-point
        );
        // Destination prefix for messages sent by clients
        registry.setApplicationDestinationPrefixes("/app");
        // Prefix for per-user private destinations
        registry.setUserDestinationPrefix("/user");
    }

    @Override
    public void configureWebSocketTransport(WebSocketTransportRegistration registration) {
        registration
            .setMessageSizeLimit(256 * 1024)      // max 256 KB per message
            .setSendBufferSizeLimit(512 * 1024)   // 512 KB send buffer
            .setSendTimeLimit(15 * 1000)          // 15-second send timeout
            .setTimeToFirstMessage(30 * 1000);    // 30-second timeout for the first message
    }

    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        // Inbound thread pool (handles messages from clients)
        registration.taskExecutor()
            .corePoolSize(4)
            .maxPoolSize(8)
            .queueCapacity(100);
    }

    @Override
    public void configureClientOutboundChannel(ChannelRegistration registration) {
        // Outbound thread pool (pushes messages to clients)
        registration.taskExecutor()
            .corePoolSize(8)
            .maxPoolSize(20)
            .queueCapacity(500);
    }
}
3.3 Message Protocol Design
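// Message sent by the client
@Data
@NoArgsConstructor
@AllArgsConstructor
public class AiChatMessage {
    private String messageId;       // unique message ID
    private String conversationId;  // conversation ID
    private String content;         // user message text
    private String type;            // CHAT | INTERRUPT | HEARTBEAT
}

// Message pushed by the server
@Data
@Builder
@NoArgsConstructor   // needed so Jackson can deserialize it (see section 7.2)
@AllArgsConstructor  // required by @Builder once a no-args constructor exists
public class AiChatResponse {
    private String messageId;       // messageId of the request this answers
    private String conversationId;
    private String type;            // CHUNK | DONE | ERROR | INTERRUPTED
    private String content;         // content fragment carried by this push
    private String fullContent;     // full content, filled only on DONE
    private int chunkIndex;         // chunk sequence number
    private long timestamp;
    private Map<String, Object> metadata;  // model, token counts, etc.
}
4. Wrapping Streaming Output over WebSocket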
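On the receiving side, the CHUNK/DONE protocol from section 3.3 could be consumed roughly as follows. This is a minimal sketch in plain Java, not the team's client code; `ChunkAssembler` and its method names are hypothetical. It leans on one detail of the server code in section 4.1: the DONE frame's `chunkIndex` carries the number of chunks that were sent, so a receiver can check for gaps before trusting its own concatenation.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical receiver-side helper: reassembles CHUNK frames by chunkIndex. */
final class ChunkAssembler {
    // TreeMap keeps chunks sorted by index even if frames arrive out of order
    private final Map<Integer, String> chunks = new TreeMap<>();

    /** Stores one CHUNK payload; a duplicate index overwrites the earlier copy. */
    void onChunk(int chunkIndex, String content) {
        chunks.put(chunkIndex, content);
    }

    /** Concatenates the chunks received so far in index order. */
    String assemble() {
        StringBuilder sb = new StringBuilder();
        for (String part : chunks.values()) {
            sb.append(part);
        }
        return sb.toString();
    }

    /** True when every index in 0..expectedCount-1 has arrived. */
    boolean isComplete(int expectedCount) {
        if (chunks.size() != expectedCount) {
            return false;
        }
        for (int i = 0; i < expectedCount; i++) {
            if (!chunks.containsKey(i)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        ChunkAssembler assembler = new ChunkAssembler();
        assembler.onChunk(1, "lo");   // arrives first
        assembler.onChunk(0, "Hel");  // arrives late
        System.out.println(assembler.assemble() + " complete=" + assembler.isComplete(2));
    }
}
```

When `isComplete` returns false on DONE, a client can fall back to the `fullContent` field instead of its own concatenation.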
4.1 Core Implementation of AI Streaming Output
package com.saas.ai.websocket;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.StreamingChatClient;
import org.springframework.ai.chat.messages.AssistantMessage;
import org.springframework.ai.chat.messages.Message;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

@Service
@Slf4j
@RequiredArgsConstructor
public class AiStreamingWebSocketService {
    private static final String RESPONSE_QUEUE = "/queue/ai-response";

    private final StreamingChatClient streamingChatClient;
    private final SimpMessagingTemplate messagingTemplate;
    private final ConversationHistoryService historyService;

    // Streams currently in flight (enables interruption)
    // key: conversationId, value: subscription of the running stream
    private final ConcurrentHashMap<String, Disposable> activeStreams = new ConcurrentHashMap<>();

    /**
     * Handles an AI chat message and pushes the answer as a stream of chunks.
     */
    public void handleChatMessage(String userId, AiChatMessage userMessage) {
        String conversationId = userMessage.getConversationId();
        String messageId = userMessage.getMessageId();

        // If this conversation already has a stream in flight, cancel it first
        cancelActiveStream(conversationId);

        // Work on a copy so the stored history list is never mutated here, then
        // persist the user turn explicitly (assumes addMessage accepts any Message)
        UserMessage userTurn = new UserMessage(userMessage.getContent());
        List<Message> history = new ArrayList<>(historyService.getHistory(conversationId));
        history.add(userTurn);
        historyService.addMessage(conversationId, userTurn);
        Prompt prompt = new Prompt(history);

        // Buffer for the AI output (used to send the full content on DONE)
        StringBuilder fullContent = new StringBuilder();
        AtomicInteger chunkIndex = new AtomicInteger(0);

        // Start streaming. mapNotNull drops chunks without content;
        // a plain map would fail the stream on a null result.
        Flux<String> stream = streamingChatClient.stream(prompt)
            .mapNotNull(r -> r.getResult() == null ? null : r.getResult().getOutput().getContent())
            .filter(chunk -> !chunk.isEmpty());

        Disposable subscription = stream.subscribe(
            chunk -> {
                fullContent.append(chunk);
                // Push each chunk to the user's private queue
                push(userId, AiChatResponse.builder()
                    .messageId(messageId)
                    .conversationId(conversationId)
                    .type("CHUNK")
                    .content(chunk)
                    .chunkIndex(chunkIndex.getAndIncrement())
                    .timestamp(System.currentTimeMillis())
                    .build());
            },
            error -> {
                log.error("Conversation [{}] streaming error: {}", conversationId, error.getMessage());
                push(userId, AiChatResponse.builder()
                    .messageId(messageId)
                    .conversationId(conversationId)
                    .type("ERROR")
                    .content("The AI service ran into a problem, please try again later")
                    .timestamp(System.currentTimeMillis())
                    .build());
                activeStreams.remove(conversationId);
            },
            () -> {
                String finalContent = fullContent.toString();
                // Push the DONE message (with the full content)
                push(userId, AiChatResponse.builder()
                    .messageId(messageId)
                    .conversationId(conversationId)
                    .type("DONE")
                    .content("")
                    .fullContent(finalContent)
                    .chunkIndex(chunkIndex.get())
                    .timestamp(System.currentTimeMillis())
                    .build());
                // Save the AI answer to the conversation history
                historyService.addMessage(conversationId, new AssistantMessage(finalContent));
                // Clear the active-stream record
                activeStreams.remove(conversationId);
                log.info("Conversation [{}] streaming finished, total characters: {}",
                    conversationId, finalContent.length());
            });

        // Record the active stream (enables interruption)
        activeStreams.put(conversationId, subscription);
    }

    /**
     * Interrupts the AI output currently in progress.
     */
    public void interruptStream(String userId, String conversationId) {
        if (cancelActiveStream(conversationId)) {
            push(userId, AiChatResponse.builder()
                .conversationId(conversationId)
                .type("INTERRUPTED")
                .content("")
                .timestamp(System.currentTimeMillis())
                .build());
            log.info("Conversation [{}] interrupted", conversationId);
        }
    }

    private void push(String userId, AiChatResponse response) {
        messagingTemplate.convertAndSendToUser(userId, RESPONSE_QUEUE, response);
    }

    private boolean cancelActiveStream(String conversationId) {
        Disposable existing = activeStreams.remove(conversationId);
        if (existing != null && !existing.isDisposed()) {
            existing.dispose();
            return true;
        }
        return false;
    }
}
4.2 WebSocket Controller
@Controller
@Slf4j
@RequiredArgsConstructor
public class AiWebSocketController {
    private final AiStreamingWebSocketService streamingService;

    /**
     * Handles AI chat messages sent by the user.
     * Clients send to /app/ai/chat
     */
    @MessageMapping("/ai/chat")
    public void handleChat(AiChatMessage message, Principal principal) {
        String userId = principal.getName();
        log.debug("Received message from user [{}]: conversationId={}, type={}",
            userId, message.getConversationId(), message.getType());
        // A switch on a null String throws, so treat a missing type as unknown
        String type = message.getType() == null ? "" : message.getType();
        switch (type) {
            case "CHAT" -> streamingService.handleChatMessage(userId, message);
            case "INTERRUPT" -> streamingService.interruptStream(userId,
                message.getConversationId());
            default -> log.warn("Unknown message type: {}", message.getType());
        }
    }

    /**
     * Handles heartbeat messages.
     * Clients send to /app/heartbeat
     */
    @MessageMapping("/heartbeat")
    @SendToUser("/queue/heartbeat")
    public Map<String, Object> handleHeartbeat(Principal principal) {
        return Map.of(
            "type", "PONG",
            "timestamp", System.currentTimeMillis(),
            "userId", principal.getName()
        );
    }
}
5. Front-End JavaScript Client
5.1 Complete WebSocket Client (with Reconnect Logic)
// ai-websocket-client.js
class AiWebSocketClient {
  constructor(options = {}) {
    this.wsUrl = options.wsUrl || '/ws/ai';
    this.token = options.token;
    this.maxReconnectAttempts = options.maxReconnectAttempts || 5;
    this.reconnectInterval = options.reconnectInterval || 3000;   // 3 seconds
    this.heartbeatInterval = options.heartbeatInterval || 30000;  // 30 seconds
    this.stompClient = null;
    this.reconnectAttempts = 0;
    this.heartbeatTimer = null;
    this.isManualDisconnect = false;
    // Callbacks
    this.onChunk = options.onChunk || (() => {});
    this.onDone = options.onDone || (() => {});
    this.onError = options.onError || (() => {});
    this.onConnected = options.onConnected || (() => {});
    this.onDisconnected = options.onDisconnected || (() => {});
  }

  /**
   * Opens the WebSocket connection
   */
  connect() {
    this.isManualDisconnect = false;
    const sockJS = new SockJS(this.wsUrl);
    this.stompClient = Stomp.over(sockJS);
    // Silence debug logging (production)
    this.stompClient.debug = () => {};
    // STOMP heartbeats: send every 25 s, expect one every 25 s.
    // stomp.js builds the heart-beat header from this object, so a
    // 'heart-beat' entry in the connect headers would be overwritten.
    this.stompClient.heartbeat.outgoing = 25000;
    this.stompClient.heartbeat.incoming = 25000;
    const connectHeaders = {
      Authorization: `Bearer ${this.token}`
    };
    this.stompClient.connect(
      connectHeaders,
      (frame) => this._onConnected(frame),
      (error) => this._onConnectionError(error)
    );
  }

  /**
   * Called once the connection is established
   */
  _onConnected(frame) {
    console.log('WebSocket connected');
    this.reconnectAttempts = 0;  // reset the reconnect counter
    // Subscribe to the AI response queue
    this.stompClient.subscribe('/user/queue/ai-response', (message) => {
      this._handleMessage(JSON.parse(message.body));
    });
    // Subscribe to heartbeat replies
    this.stompClient.subscribe('/user/queue/heartbeat', (message) => {
      const pong = JSON.parse(message.body);
      console.debug('Heartbeat reply:', pong.timestamp);
    });
    // Start the heartbeat
    this._startHeartbeat();
    this.onConnected();
  }

  /**
   * Dispatches messages from the server
   */
  _handleMessage(data) {
    switch (data.type) {
      case 'CHUNK':
        this.onChunk(data.content, data.chunkIndex, data.conversationId);
        break;
      case 'DONE':
        this.onDone(data.fullContent, data.conversationId, data.metadata);
        break;
      case 'ERROR':
        this.onError(data.content, data.conversationId);
        break;
      case 'INTERRUPTED':
        console.log('AI output interrupted');
        break;
      default:
        console.warn('Unknown message type:', data.type);
    }
  }

  /**
   * Sends an AI chat message
   */
  sendMessage(conversationId, content) {
    if (!this.isConnected()) {
      console.error('WebSocket is not connected, cannot send message');
      return;
    }
    const message = {
      messageId: this._generateId(),
      conversationId,
      content,
      type: 'CHAT'
    };
    this.stompClient.send('/app/ai/chat', {}, JSON.stringify(message));
    return message.messageId;
  }

  /**
   * Interrupts the AI output
   */
  interrupt(conversationId) {
    if (!this.isConnected()) return;
    const message = {
      messageId: this._generateId(),
      conversationId,
      type: 'INTERRUPT'
    };
    this.stompClient.send('/app/ai/chat', {}, JSON.stringify(message));
  }

  /**
   * Application-level heartbeat
   */
  _startHeartbeat() {
    this._stopHeartbeat();  // never leave two timers running after a reconnect
    this.heartbeatTimer = setInterval(() => {
      if (this.isConnected()) {
        this.stompClient.send('/app/heartbeat', {},
          JSON.stringify({ type: 'PING', timestamp: Date.now() }));
      }
    }, this.heartbeatInterval);
  }

  _stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer);
      this.heartbeatTimer = null;
    }
  }

  /**
   * Connection error handling (with reconnect logic)
   */
  _onConnectionError(error) {
    console.error('WebSocket connection error:', error);
    this._stopHeartbeat();
    if (this.isManualDisconnect) return;
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      // Exponential backoff: 3 s for the 1st attempt, 6 s for the 2nd, 12 s for the 3rd...
      const delay = this.reconnectInterval * Math.pow(2, this.reconnectAttempts - 1);
      console.log(`Reconnect attempt ${this.reconnectAttempts} in ${delay / 1000} s...`);
      setTimeout(() => {
        this.connect();
      }, delay);
    } else {
      console.error('WebSocket reconnect limit reached, please refresh the page');
      this.onDisconnected('max_reconnect_exceeded');
    }
  }

  /**
   * Disconnects on purpose
   */
  disconnect() {
    this.isManualDisconnect = true;
    this._stopHeartbeat();
    if (this.stompClient && this.stompClient.connected) {
      this.stompClient.disconnect(() => {
        console.log('WebSocket disconnected');
        this.onDisconnected('manual');
      });
    }
  }

  isConnected() {
    return this.stompClient && this.stompClient.connected;
  }

  _generateId() {
    return Date.now().toString(36) + Math.random().toString(36).slice(2);
  }
}

// Usage example
const client = new AiWebSocketClient({
  wsUrl: '/ws/ai',
  token: localStorage.getItem('access_token'),
  onChunk: (chunk, index, conversationId) => {
    // Append to the display area
    document.getElementById('ai-response').textContent += chunk;
  },
  onDone: (fullContent, conversationId) => {
    console.log('AI answer complete,', fullContent.length, 'characters');
    document.getElementById('send-btn').disabled = false;
    document.getElementById('stop-btn').style.display = 'none';
  },
  onError: (errorMsg) => {
    alert('The AI service is temporarily unavailable: ' + errorMsg);
  },
  onConnected: () => {
    document.getElementById('status').textContent = 'Connected';
  },
  onDisconnected: (reason) => {
    document.getElementById('status').textContent = 'Disconnected: ' + reason;
  }
});

// Connect when the page loads
client.connect();

// Send a message
document.getElementById('send-btn').addEventListener('click', () => {
  const content = document.getElementById('user-input').value;
  const conversationId = getCurrentConversationId();
  // Clear the display area and disable the send button
  document.getElementById('ai-response').textContent = '';
  document.getElementById('send-btn').disabled = true;
  document.getElementById('stop-btn').style.display = 'inline';
  client.sendMessage(conversationId, content);
});

// Interrupt the AI output
document.getElementById('stop-btn').addEventListener('click', () => {
  client.interrupt(getCurrentConversationId());
  document.getElementById('send-btn').disabled = false;
  document.getElementById('stop-btn').style.display = 'none';
});
6. Heartbeat Keep-Alive: The WebSocket Heartbeat Mechanism
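6.1 Why a Heartbeat Is Mandatory
In production, the following situations leave a WebSocket connection "half-dead" (the connection is actually gone, but neither side knows it):
- Idle connection timeout on the load balancer (60 seconds by default on AWS ALB)
- Connection timeout on NAT gateways (typically 60 to 120 seconds)
- Connection timeout on corporate firewalls
A heartbeat sends packets at a regular interval so that intermediaries keep treating the connection as active, and so that each side notices when the other has gone silent.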
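The timeout bookkeeping behind a heartbeat check can be sketched without any framework. The class below is a minimal illustration in plain Java, under the assumption that some caller reports activity per session and runs the sweep periodically; `HeartbeatTracker` and its method names are hypothetical and not part of Spring.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers when each session was last heard from. */
final class HeartbeatTracker {
    private final Map<String, Instant> lastSeen = new ConcurrentHashMap<>();
    private final Duration timeout;

    HeartbeatTracker(Duration timeout) {
        this.timeout = timeout;
    }

    /** Call for every inbound frame (heartbeat or not) from the session. */
    void touch(String sessionId, Instant now) {
        lastSeen.put(sessionId, now);
    }

    /** Call when the session closes normally, so it is never reported as timed out. */
    void remove(String sessionId) {
        lastSeen.remove(sessionId);
    }

    /** Removes and returns the sessions whose last activity is older than the timeout. */
    List<String> evictExpired(Instant now) {
        Instant cutoff = now.minus(timeout);
        List<String> expired = new ArrayList<>();
        lastSeen.forEach((sessionId, seen) -> {
            // remove(key, value) only succeeds if no newer touch() raced with the sweep
            if (seen.isBefore(cutoff) && lastSeen.remove(sessionId, seen)) {
                expired.add(sessionId);
            }
        });
        return expired;
    }

    public static void main(String[] args) {
        HeartbeatTracker tracker = new HeartbeatTracker(Duration.ofSeconds(90));
        Instant start = Instant.now();
        tracker.touch("session-a", start);
        System.out.println(tracker.evictExpired(start.plusSeconds(120)));
    }
}
```

Passing the clock in as a parameter keeps the sweep deterministic and easy to test; a production version would read the current time itself.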
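6.2 Server-Side Heartbeat Implementation
// Requires @EnableScheduling on a configuration class for @Scheduled to run
@Component
@Slf4j
public class WebSocketHeartbeatHandler
        implements ChannelInterceptor, ApplicationEventPublisherAware {
    private final ConcurrentHashMap<String, Instant> lastHeartbeatMap
        = new ConcurrentHashMap<>();

    @Value("${websocket.heartbeat.timeout-seconds:90}")
    private int heartbeatTimeoutSeconds;

    // Injected by Spring through ApplicationEventPublisherAware
    private ApplicationEventPublisher applicationEventPublisher;

    @Override
    public void setApplicationEventPublisher(ApplicationEventPublisher publisher) {
        this.applicationEventPublisher = publisher;
    }

    /**
     * Intercepts messages and refreshes the heartbeat timestamp.
     * Only takes effect once registered through registration.interceptors(...)
     * in configureClientInboundChannel.
     */
    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
            message, StompHeaderAccessor.class
        );
        if (accessor != null) {
            String sessionId = accessor.getSessionId();
            if (sessionId != null) {
                if (StompCommand.DISCONNECT.equals(accessor.getCommand())) {
                    // A clean disconnect must not be reported as a timeout later
                    lastHeartbeatMap.remove(sessionId);
                } else {
                    // Any message (including PING) refreshes the heartbeat time
                    lastHeartbeatMap.put(sessionId, Instant.now());
                }
            }
        }
        return message;
    }

    /**
     * Periodically checks for connections whose heartbeat has timed out
     */
    @Scheduled(fixedDelay = 30000)  // every 30 seconds
    public void checkHeartbeats() {
        Instant timeout = Instant.now().minusSeconds(heartbeatTimeoutSeconds);
        lastHeartbeatMap.entrySet().removeIf(entry -> {
            if (entry.getValue().isBefore(timeout)) {
                String sessionId = entry.getKey();
                log.warn("WebSocket session [{}] heartbeat timed out, closing it", sessionId);
                // Publish a session-timeout event
                applicationEventPublisher.publishEvent(
                    new WebSocketHeartbeatTimeoutEvent(sessionId)
                );
                return true;  // remove from the map
            }
            return false;
        });
    }
}
7. Message Queue Integration: Distributing Messages Across Instances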
7.1 The Problem with Multi-Instance Deployment
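The problem this heading names can be illustrated with a small simulation: each instance only knows the sockets that terminate on it, so a response has to be fanned out to every instance and delivered by whichever one holds the user's connection. The sketch below is plain Java with no Redis involved; `InstanceSim` and `BusSim` are hypothetical stand-ins for an application instance and for a publish/subscribe channel, and the `ai:response:{userId}` channel format is taken from section 7.2.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical stand-in for one application instance behind the load balancer. */
final class InstanceSim {
    static final String CHANNEL_PREFIX = "ai:response:";

    final String name;
    final Set<String> localUsers = new HashSet<>();    // users whose socket terminates here
    final List<String> delivered = new ArrayList<>();  // what this instance pushed to sockets

    InstanceSim(String name) {
        this.name = name;
    }

    /** Ignores channels for users that are not connected to this instance. */
    void onChannelMessage(String channel, String payload) {
        String userId = channel.substring(CHANNEL_PREFIX.length());
        if (localUsers.contains(userId)) {
            delivered.add(userId + ":" + payload);
        }
    }
}

/** Hypothetical in-memory pub/sub: every subscriber sees every published message. */
final class BusSim {
    private final List<InstanceSim> subscribers = new ArrayList<>();

    void subscribe(InstanceSim instance) {
        subscribers.add(instance);
    }

    void publish(String channel, String payload) {
        for (InstanceSim instance : subscribers) {
            instance.onChannelMessage(channel, payload);
        }
    }

    public static void main(String[] args) {
        InstanceSim a = new InstanceSim("A");
        InstanceSim b = new InstanceSim("B");
        b.localUsers.add("u1");  // the user's socket lives on B
        BusSim bus = new BusSim();
        bus.subscribe(a);
        bus.subscribe(b);
        bus.publish(InstanceSim.CHANNEL_PREFIX + "u1", "chunk");  // published from anywhere
        System.out.println("A delivered " + a.delivered + ", B delivered " + b.delivered);
    }
}
```

The trade-off of this fan-out design is that every instance receives every message and discards most of them, which is simple but scales with total traffic rather than with local traffic.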
7.2 Redis Pub/Sub Implementation
@Configuration
public class RedisWebSocketBrokerConfig {
    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            WebSocketMessageDistributor distributor) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // Subscribe to the AI response channels
        container.addMessageListener(
            distributor,
            new PatternTopic("ai:response:*")  // wildcard subscription covering all users
        );
        return container;
    }
}

@Component
@Slf4j
@RequiredArgsConstructor
public class WebSocketMessageDistributor implements MessageListener {
    private static final String CHANNEL_PREFIX = "ai:response:";

    private final SimpMessagingTemplate messagingTemplate;
    private final ObjectMapper objectMapper;
    // Spring's registry of users with a STOMP session on this instance. It replaces
    // the original hand-rolled userId → sessionIds map, which nothing ever populated.
    private final SimpUserRegistry userRegistry;

    /**
     * Receives a message from Redis and forwards it to the WebSocket user on this instance.
     * Message here is org.springframework.data.redis.connection.Message.
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
            String userId = extractUserIdFromChannel(channel);
            // Check whether the user is connected to this instance
            if (userRegistry.getUser(userId) == null) {
                return;  // user is not on this instance, ignore
            }
            // Deserialize and push
            AiChatResponse response = objectMapper.readValue(
                message.getBody(), AiChatResponse.class
            );
            messagingTemplate.convertAndSendToUser(
                userId,
                "/queue/ai-response",
                response
            );
        } catch (Exception e) {
            log.error("Failed to forward Redis message", e);
        }
    }

    private String extractUserIdFromChannel(String channel) {
        // Channel format: ai:response:{userId}
        return channel.substring(CHANNEL_PREFIX.length());
    }
}

@Service
@Slf4j
@RequiredArgsConstructor
public class DistributedAiStreamingService {
    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;

    /**
     * Publishes an AI response to Redis (whichever instance handled the AI stream publishes it)
     */
    public void publishAiResponse(String userId, AiChatResponse response) {
        try {
            String channel = "ai:response:" + userId;
            String message = objectMapper.writeValueAsString(response);
            redisTemplate.convertAndSend(channel, message);
        } catch (Exception e) {
            log.error("Failed to publish message to Redis", e);
        }
    }
}
8. Concurrency Management: Resource Control for 10,000 WebSocket Connections
8.1 Connection Limit Design
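Independent of Spring, the counting logic behind a per-instance cap and a per-user cap can be sketched as follows. This is a minimal illustration in plain Java, assuming the caller invokes `tryAcquire` when a connection is attempted and `release` when one closes; `ConnectionQuota` and its method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: enforces a global cap and a per-user cap on open connections. */
final class ConnectionQuota {
    private final int maxTotal;
    private final int maxPerUser;
    private final AtomicInteger total = new AtomicInteger();
    private final ConcurrentHashMap<String, Integer> perUser = new ConcurrentHashMap<>();

    ConnectionQuota(int maxTotal, int maxPerUser) {
        this.maxTotal = maxTotal;
        this.maxPerUser = maxPerUser;
    }

    /** Reserves a slot; returns false (and reserves nothing) when either cap is hit. */
    boolean tryAcquire(String userId) {
        if (total.incrementAndGet() > maxTotal) {
            total.decrementAndGet();
            return false;
        }
        boolean[] granted = {false};
        // compute() runs atomically per key, so the check and the increment cannot interleave
        perUser.compute(userId, (key, count) -> {
            int current = (count == null) ? 0 : count;
            if (current >= maxPerUser) {
                return count;  // leave the mapping unchanged
            }
            granted[0] = true;
            return current + 1;
        });
        if (!granted[0]) {
            total.decrementAndGet();  // give the global slot back
        }
        return granted[0];
    }

    /** Frees a slot; unknown users are ignored so the total cannot drift below reality. */
    void release(String userId) {
        boolean[] released = {false};
        perUser.computeIfPresent(userId, (key, count) -> {
            released[0] = true;
            return count > 1 ? count - 1 : null;  // null removes the entry
        });
        if (released[0]) {
            total.decrementAndGet();
        }
    }

    int total() {
        return total.get();
    }

    public static void main(String[] args) {
        ConnectionQuota quota = new ConnectionQuota(5000, 3);
        System.out.println("granted=" + quota.tryAcquire("user-1") + " total=" + quota.total());
    }
}
```

Tying `release` to a successful earlier `tryAcquire` matters: if a rejected connection also triggers a disconnect callback, a plain decrement would undercount.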
@Component
@Slf4j
public class WebSocketConnectionLimiter {
    // A class cannot implement ApplicationListener twice with different type
    // arguments, so the two events are handled by @EventListener methods instead.
    private static final int MAX_CONNECTIONS_PER_INSTANCE = 5000;
    private static final int MAX_CONNECTIONS_PER_USER = 3;

    private final AtomicInteger totalConnections = new AtomicInteger(0);
    // Connection count per user
    private final ConcurrentHashMap<String, AtomicInteger> userConnections
        = new ConcurrentHashMap<>();

    @EventListener
    public void onConnected(SessionConnectedEvent event) {
        // Check the total connection count
        int total = totalConnections.incrementAndGet();
        if (total > MAX_CONNECTIONS_PER_INSTANCE) {
            totalConnections.decrementAndGet();
            // Intended to refuse the connection. Caveat: this event fires after the
            // session is already connected, so the exception may only be logged; a hard
            // limit is better enforced in a ChannelInterceptor on the CONNECT frame.
            throw new WebSocketConnectionLimitException(
                "The server has reached its connection limit, please try again later"
            );
        }
        // Check the per-user connection count
        String userId = getUserId(event.getUser());
        if (userId != null) {
            AtomicInteger userCount = userConnections.computeIfAbsent(
                userId, k -> new AtomicInteger(0)
            );
            if (userCount.incrementAndGet() > MAX_CONNECTIONS_PER_USER) {
                userCount.decrementAndGet();
                totalConnections.decrementAndGet();
                throw new WebSocketConnectionLimitException(
                    "A single user may not hold more than "
                        + MAX_CONNECTIONS_PER_USER + " connections"
                );
            }
        }
        log.debug("WebSocket connection opened: total={}", total);
    }

    @EventListener
    public void onDisconnect(SessionDisconnectEvent event) {
        int total = totalConnections.decrementAndGet();
        String userId = getUserId(event.getUser());
        if (userId != null) {
            AtomicInteger userCount = userConnections.get(userId);
            if (userCount != null) {
                int remaining = userCount.decrementAndGet();
                if (remaining <= 0) {
                    userConnections.remove(userId);
                }
            }
        }
        log.debug("WebSocket connection closed: total={}", total);
    }

    private static String getUserId(Principal user) {
        return user != null ? user.getName() : null;
    }

    /**
     * Connection statistics (for monitoring)
     */
    public ConnectionStats getStats() {
        return ConnectionStats.builder()
            .totalConnections(totalConnections.get())
            .uniqueUsers(userConnections.size())
            .maxConnections(MAX_CONNECTIONS_PER_INSTANCE)
            .utilizationPercent((double) totalConnections.get()
                / MAX_CONNECTIONS_PER_INSTANCE * 100)
            .build();
    }
}
8.2 Memory Estimate
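Estimated memory footprint per WebSocket connection:
- STOMP session object: ~2 KB
- Message buffers (send + receive): ~32 KB
- Conversation history (20 messages): ~20 KB
- Thread context: ~1 KB (asynchronous, not one thread per connection)
- Total: ~55 KB per connection
10,000 connections: ~550 MB
Recommended server: 8 GB of memory (to leave headroom)
9. Security: WebSocket Authentication and Authorization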
9.1 JWT Token Validation
@Component
@Slf4j
@RequiredArgsConstructor
public class WebSocketAuthenticationInterceptor implements ChannelInterceptor {
    private final JwtTokenValidator jwtValidator;
    private final WebSocketSessionRegistry sessionRegistry;

    @Override
    public Message<?> preSend(Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
            message, StompHeaderAccessor.class
        );
        if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
            // Extract the JWT from the headers of the STOMP CONNECT frame
            String authHeader = accessor.getFirstNativeHeader("Authorization");
            if (authHeader == null || !authHeader.startsWith("Bearer ")) {
                log.warn("WebSocket connection is missing the Authorization header");
                // Spring Security's AuthenticationException is abstract, so a concrete
                // subclass is thrown here
                throw new BadCredentialsException("Unauthorized WebSocket connection");
            }
            String token = authHeader.substring(7);
            try {
                // Validate the JWT
                JwtClaims claims = jwtValidator.validate(token);
                String userId = claims.getSubject();
                String tenantId = claims.get("tenant_id");
                // Attach the user information as the Principal
                UsernamePasswordAuthenticationToken auth =
                    new UsernamePasswordAuthenticationToken(
                        userId, null,
                        buildAuthorities(claims)
                    );
                auth.setDetails(Map.of("tenantId", tenantId));
                accessor.setUser(auth);
                // Register the session
                sessionRegistry.register(
                    accessor.getSessionId(), userId, tenantId
                );
                log.debug("WebSocket authentication succeeded: userId={}", userId);
            } catch (TokenExpiredException e) {
                throw new BadCredentialsException("Token has expired, please log in again");
            } catch (Exception e) {
                log.error("WebSocket authentication failed", e);
                throw new BadCredentialsException("Token validation failed");
            }
        }
        return message;
    }
}
9.2 WebSocket Security Configuration
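@Configuration
public class WebSocketSecurityConfig
        extends AbstractSecurityWebSocketMessageBrokerConfigurer {
    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages
            // Frames without a destination are allowed for everyone
            // (CONNECT is authenticated in the interceptor)
            .simpTypeMatchers(SimpMessageType.CONNECT, SimpMessageType.HEARTBEAT,
                SimpMessageType.UNSUBSCRIBE, SimpMessageType.DISCONNECT).permitAll()
            // Authenticated users may subscribe to their private queues; without this
            // rule the final denyAll would also block the client's SUBSCRIBE frames
            .simpSubscribeDestMatchers("/user/queue/**").authenticated()
            // Heartbeat messages are allowed for authenticated users
            .simpDestMatchers("/app/heartbeat").authenticated()
            // AI chat messages need authentication plus a specific authority
            .simpDestMatchers("/app/ai/**").hasAuthority("ai:chat:basic")
            // Everything else is denied
            .anyMessage().denyAll();
    }

    @Override
    protected boolean sameOriginDisabled() {
        // Skips the CSRF token check on CONNECT so that a separately deployed
        // front end can connect (front end and back end are split)
        return true;
    }
}
10. Monitoring: Connection Count and Message Throughput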
10.1 Exposing Micrometer Metrics
@Component
public class WebSocketMetrics {
    private final MeterRegistry meterRegistry;
    private final WebSocketConnectionLimiter connectionLimiter;
    // Custom metrics
    private final Counter messagesSentCounter;
    private final Counter messagesReceivedCounter;
    private final DistributionSummary messageSizeDistribution;
    private final Timer aiChunkLatencyTimer;

    public WebSocketMetrics(MeterRegistry meterRegistry,
                            WebSocketConnectionLimiter connectionLimiter) {
        this.meterRegistry = meterRegistry;
        this.connectionLimiter = connectionLimiter;
        // Message counters
        this.messagesSentCounter = Counter.builder("websocket.messages.sent")
            .description("Total WebSocket messages sent")
            .tag("type", "all")
            .register(meterRegistry);
        this.messagesReceivedCounter = Counter.builder("websocket.messages.received")
            .description("Total WebSocket messages received")
            .register(meterRegistry);
        // Message size distribution
        this.messageSizeDistribution = DistributionSummary.builder("websocket.message.size")
            .description("WebSocket message size distribution (bytes)")
            .publishPercentiles(0.5, 0.95, 0.99)
            .register(meterRegistry);
        // Chunk latency; the histogram buckets are what histogram_quantile needs in 10.2
        this.aiChunkLatencyTimer = Timer.builder("websocket.ai.chunk.latency")
            .description("Latency of AI chunk delivery")
            .publishPercentileHistogram()
            .register(meterRegistry);
        // Register gauges (live connection count)
        Gauge.builder("websocket.connections.active",
                connectionLimiter, c -> c.getStats().getTotalConnections())
            .description("Current number of active WebSocket connections")
            .register(meterRegistry);
        Gauge.builder("websocket.connections.utilization",
                connectionLimiter, c -> c.getStats().getUtilizationPercent())
            .description("WebSocket connection utilization (%)")
            .register(meterRegistry);
    }

    public void recordMessageSent(String type, int sizeBytes) {
        meterRegistry.counter("websocket.messages.sent", "type", type).increment();
        messageSizeDistribution.record(sizeBytes);
    }

    public void recordAiChunkLatency(long latencyMs) {
        aiChunkLatencyTimer.record(latencyMs, TimeUnit.MILLISECONDS);
    }
}
10.2 Alerting Configuration (Prometheus + Alertmanager)
# prometheus-rules.yml
groups:
  - name: websocket_alerts
    rules:
      # WebSocket connections above 80%
      - alert: WebSocketConnectionsHigh
        expr: websocket_connections_utilization > 80
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "WebSocket connections above 80%"
          description: "Current connection utilization: {{ $value }}%, consider scaling out"
      # WebSocket connections above 95%
      - alert: WebSocketConnectionsCritical
        expr: websocket_connections_utilization > 95
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "WebSocket connections close to the limit"
          description: "Scale out now! Current utilization: {{ $value }}%"
      # P99 of AI chunk delivery latency above 5 seconds
      # (histogram_quantile needs per-bucket rates aggregated by le)
      - alert: AiChunkLatencyHigh
        expr: histogram_quantile(0.99, sum by (le) (rate(websocket_ai_chunk_latency_seconds_bucket[5m]))) > 5
        for: 3m
        labels:
          severity: warning
        annotations:
          summary: "AI streaming output latency too high"
          description: "P99 latency: {{ $value }} s"
10.3 Summary of Key Monitoring Metrics
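| Metric | Normal range | Alert threshold | Notes |
|---|---|---|---|
| Active connections | < 4000 | > 4500 | Limit of 5000 per instance |
| Connection failure rate | < 0.5% | > 2% | Authentication / rate-limiting issues |
| Message send latency P99 | < 100ms | > 500ms | Network / server load |
| AI time to first byte P99 | < 3s | > 5s | State of the LLM service |
| Message loss rate | 0% | > 0.1% | Serious problem |
| Heartbeat timeout rate | < 1% | > 5% | Network issues |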
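FAQ
Q1: WebSocket or HTTP/2 Server Push, which should I choose?
HTTP/2 Server Push was designed for sending resources ahead of a request, not as an application messaging channel, and major browsers have dropped it (Chrome disabled it by default in version 106). It is not recommended for AI scenarios. WebSocket is mature and widely supported, which makes it a solid choice for real-time AI interaction.
Q2: WebSocket does not work behind an Nginx reverse proxy. What should I do?
Nginx does not upgrade HTTP connections by default, so this configuration has to be added:
location /ws/ {
    proxy_pass http://backend;
    proxy_http_version 1.1;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_read_timeout 3600s;  # timeout for long-lived WebSocket connections
    proxy_send_timeout 3600s;
}
Q3: Behind an AWS ALB, WebSocket connections drop after 60 seconds. How do I handle that?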
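The idle connection timeout of AWS ALB defaults to 60 seconds. Solutions:
- Set the heartbeat interval to 30 seconds (half the ALB timeout)
- Or raise the idle timeout in the ALB settings to 3600 seconds
Q4: On mobile, the WebSocket drops after a network switch (WiFi → 4G). How do I handle that?
Handle it with reconnect logic:
- Listen for network status change events
- Trigger a reconnect as soon as the network is back
- Restore the conversation state after reconnecting (the conversationId stays the same)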
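The reconnect schedule used by the client in section 5.1 (3 s, 6 s, 12 s, doubling on each attempt) can be written as a small pure function. The sketch below is plain Java; `ReconnectBackoff` is a hypothetical name, and the upper bound `capMillis` is an added assumption that the original client does not have.

```java
/** Hypothetical helper: computes the wait before the n-th reconnect attempt. */
final class ReconnectBackoff {
    private ReconnectBackoff() {
    }

    /**
     * Doubles the base delay for each attempt after the first and clamps it to capMillis.
     * attempt is 1-based: attempt 1 waits baseMillis.
     */
    static long delayMillis(int attempt, long baseMillis, long capMillis) {
        if (attempt < 1) {
            throw new IllegalArgumentException("attempt starts at 1");
        }
        long delay = baseMillis;
        // Stop doubling once the cap is reached, which also avoids overflow
        for (int i = 1; i < attempt && delay < capMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, capMillis);
    }

    public static void main(String[] args) {
        for (int attempt = 1; attempt <= 5; attempt++) {
            System.out.println("attempt " + attempt + " waits "
                + delayMillis(attempt, 3_000, 30_000) + " ms");
        }
    }
}
```

After a network-change event, a client would typically reset the attempt counter to 1 so the first retry happens quickly instead of inheriting a long delay from earlier failures.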
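Q5: How many servers are needed for 100,000 online users?
100,000 users / 5,000 connections per instance = 20 instances
Plus 20% redundancy = 24 instances
Suggested configuration: 24 servers with 8 cores and 16 GB each (enough to support rolling releases without disconnecting users)
Summary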
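WebSocket plus streaming output is standard infrastructure for AI products. It is not a nice-to-have; it is the line between a usable experience and an abandoned one.
With 3 weeks of rework, Wang Qiang's team brought the bounce rate down from 60% to 15%, which supports one core conclusion: users who wait more than 5 seconds for the AI will leave.
Core technical path:
- Spring WebSocket + STOMP: the most mature WebSocket option in the Java ecosystem
- Spring AI Streaming: Reactor Flux supports streaming output naturally
- Redis Pub/Sub: message distribution for multi-instance deployment
- Heartbeats: implemented on both the front end and the back end, as a double safeguard
- JWT authentication: validated on the CONNECT frame, authenticated once for the whole session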
