Spring Boot WebSocket 实战——实时消息推送、心跳保活、集群广播完整方案
Spring Boot WebSocket 实战——实时消息推送、心跳保活、集群广播完整方案
适读人群:需要实现实时消息推送功能的 Java 工程师 | 阅读时长:约20分钟 | 核心价值:掌握 Spring Boot WebSocket 的完整实战方案,覆盖单机推送、心跳保活、多节点集群广播三个核心场景
一、老李的实时通知需求
老李接到一个需求:运营后台要能实时给在线用户推送消息,比如"您有新订单待处理"、"库存预警"这类实时通知,不能让用户刷新才能看到。
他第一反应是做轮询:前端每隔 5 秒请求一次接口,看看有没有新消息。我问他:"你们有多少在线用户?"他说高峰期大概 1000 人同时在线。我算了一下:1000 × 12(每分钟 12 次)= 每分钟 12000 个请求,全是无效请求,大多数时候都没有新消息。
我说:"用 WebSocket。持久连接,有消息时服务端主动推,没消息什么都不发,流量省,延迟低。"
老李问:"那集群怎么办,我们有 3 个节点?"
这个问题问到了点子上。单机 WebSocket 好做,集群广播才是真正的技术难点。这篇文章我把单机到集群的完整方案都讲清楚。
二、Spring Boot WebSocket 基础配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>package com.example.websocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
/**
* WebSocket 配置类,启用 STOMP 协议。
* STOMP(Simple Text Oriented Messaging Protocol)是 WebSocket 上层的消息协议,
* 比原始 WebSocket 更结构化,Spring 对其有完整支持。
*/
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
// 启用简单消息代理,处理 /topic(广播)和 /queue(点对点)前缀的消息
config.enableSimpleBroker("/topic", "/queue");
// 应用目标前缀:客户端发送消息时的前缀(发给 @MessageMapping 的消息)
config.setApplicationDestinationPrefixes("/app");
// 用户目标前缀:点对点消息的前缀
config.setUserDestinationPrefix("/user");
}
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws") // WebSocket 握手端点
.setAllowedOriginPatterns("*") // 跨域配置(生产环境限制具体域名)
.withSockJS(); // 降级支持:不支持 WebSocket 的浏览器用 SockJS 兼容
}
}三、服务端推送消息
3.1 主动推送给指定用户(点对点)
package com.example.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;
/**
* 实时消息推送服务。
* 使用 SimpMessagingTemplate 向 WebSocket 客户端推送消息。
*/
@Service
public class NotificationPushService {
private static final Logger log = LoggerFactory.getLogger(NotificationPushService.class);
private final SimpMessagingTemplate messagingTemplate;
public NotificationPushService(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
}
/**
* 向指定用户推送消息(点对点)。
* 消息会发送到 /user/{userId}/queue/notification,
* 只有该用户的 WebSocket 连接能收到。
*
* @param userId 目标用户 ID
* @param message 消息体
*/
public void pushToUser(String userId, NotificationMessage message) {
try {
messagingTemplate.convertAndSendToUser(
userId,
"/queue/notification", // 相对路径,完整路径是 /user/{userId}/queue/notification
message
);
log.debug("[WS] 推送消息给用户: userId={}, type={}", userId, message.getType());
} catch (Exception e) {
log.error("[WS] 推送消息失败: userId={}", userId, e);
}
}
/**
* 广播消息给所有订阅了 /topic/broadcast 的客户端。
*
* @param message 广播消息体
*/
public void broadcast(NotificationMessage message) {
messagingTemplate.convertAndSend("/topic/broadcast", message);
log.info("[WS] 广播消息: type={}", message.getType());
}
/**
* 消息体 DTO。
*/
public static class NotificationMessage {
private String type; // 消息类型,如 ORDER_UPDATE, STOCK_ALERT
private String title; // 通知标题
private String content; // 通知内容
private long timestamp; // 发送时间戳
private Object data; // 附加数据(可选)
public NotificationMessage() {
this.timestamp = System.currentTimeMillis();
}
public static NotificationMessage of(String type, String title, String content) {
NotificationMessage msg = new NotificationMessage();
msg.type = type;
msg.title = title;
msg.content = content;
return msg;
}
public String getType() { return type; }
public void setType(String type) { this.type = type; }
public String getTitle() { return title; }
public void setTitle(String title) { this.title = title; }
public String getContent() { return content; }
public void setContent(String content) { this.content = content; }
public long getTimestamp() { return timestamp; }
public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
public Object getData() { return data; }
public void setData(Object data) { this.data = data; }
}
}3.2 客户端连接时的身份验证
package com.example.websocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.simp.config.ChannelRegistration;
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.messaging.support.MessageHeaderAccessor;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;
import java.security.Principal;
/**
* WebSocket 握手时的认证拦截器。
* 客户端连接时在请求头里带上 Token,服务端验证 Token 并设置用户 Principal。
* 后续点对点推送根据 Principal.getName() 识别用户。
*/
@Configuration
@Order(Ordered.HIGHEST_PRECEDENCE + 99)
public class WebSocketAuthConfig implements WebSocketMessageBrokerConfigurer {
private final JwtTokenService jwtTokenService;
public WebSocketAuthConfig(JwtTokenService jwtTokenService) {
this.jwtTokenService = jwtTokenService;
}
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(
message, StompHeaderAccessor.class);
// 只处理 CONNECT 命令(握手阶段)
if (accessor != null && StompCommand.CONNECT.equals(accessor.getCommand())) {
String token = accessor.getFirstNativeHeader("Authorization");
if (token != null && token.startsWith("Bearer ")) {
token = token.substring(7);
// 验证 Token,获取用户 ID
String userId = jwtTokenService.getUserIdFromToken(token);
if (userId != null) {
// 设置 Principal,后续点对点推送用这个 name 识别用户
String finalUserId = userId;
accessor.setUser(new Principal() {
@Override
public String getName() { return finalUserId; }
});
}
}
}
return message;
}
});
}
// 占位接口
interface JwtTokenService {
String getUserIdFromToken(String token);
}
}四、心跳保活机制
WebSocket 连接可能因为网络波动或客户端静默而断开,心跳机制确保连接状态可感知。
服务端心跳配置:
@Override
public void configureMessageBroker(MessageBrokerRegistry config) {
config.enableSimpleBroker("/topic", "/queue")
// 设置心跳:[服务端发送间隔(ms), 服务端接收间隔(ms)]
// 服务端每 10 秒发一次心跳,同时期望客户端每 10 秒发一次心跳
.setHeartbeatValue(new long[]{10000, 10000});
// ...
}连接事件监听(检测断连):
package com.example.websocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.simp.stomp.StompHeaderAccessor;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.messaging.SessionConnectedEvent;
import org.springframework.web.socket.messaging.SessionDisconnectEvent;
import org.springframework.web.socket.messaging.SessionSubscribeEvent;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* WebSocket 连接事件监听器。
* 跟踪在线用户数,处理连接和断连事件。
*/
@Component
public class WebSocketEventListener {
private static final Logger log = LoggerFactory.getLogger(WebSocketEventListener.class);
/** 在线连接数统计 */
private final AtomicInteger onlineCount = new AtomicInteger(0);
/** 用户 ID -> 会话 ID 映射(用于了解哪些用户在线) */
private final ConcurrentHashMap<String, String> userSessionMap = new ConcurrentHashMap<>();
@EventListener
public void handleWebSocketConnectListener(SessionConnectedEvent event) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String userId = getUserId(accessor);
String sessionId = accessor.getSessionId();
onlineCount.incrementAndGet();
if (userId != null) {
userSessionMap.put(userId, sessionId);
}
log.info("[WS] 用户连接: userId={}, sessionId={}, 当前在线: {}",
userId, sessionId, onlineCount.get());
}
@EventListener
public void handleWebSocketDisconnectListener(SessionDisconnectEvent event) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
String userId = getUserId(accessor);
String sessionId = accessor.getSessionId();
onlineCount.decrementAndGet();
if (userId != null) {
userSessionMap.remove(userId);
}
log.info("[WS] 用户断连: userId={}, sessionId={}, 当前在线: {}",
userId, sessionId, onlineCount.get());
}
@EventListener
public void handleSubscribeEvent(SessionSubscribeEvent event) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(event.getMessage());
log.debug("[WS] 订阅: destination={}, userId={}",
accessor.getDestination(), getUserId(accessor));
}
public boolean isUserOnline(String userId) {
return userSessionMap.containsKey(userId);
}
public int getOnlineCount() {
return onlineCount.get();
}
private String getUserId(StompHeaderAccessor accessor) {
if (accessor.getUser() != null) {
return accessor.getUser().getName();
}
return null;
}
}五、集群广播方案(Redis Pub/Sub)
多节点部署时,用户 A 连在节点 1,但消息从节点 2 的业务代码触发,如何推送给用户 A?
解法:用 Redis Pub/Sub 做节点间广播。每个节点订阅同一个 Redis 频道,当业务代码触发推送时,向 Redis 发布消息,所有节点都收到消息,各自检查目标用户是否在自己这个节点上,是则推送,不是则忽略。
package com.example.websocket.cluster;
import com.example.websocket.NotificationPushService;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
/**
* 基于 Redis Pub/Sub 的集群 WebSocket 消息广播。
* 任意节点发布消息到 Redis,所有节点收到后检查目标用户是否在本节点,
* 在则推送,不在则忽略。
*/
@Component
public class ClusterWebSocketBroadcast implements MessageListener {
private static final Logger log = LoggerFactory.getLogger(ClusterWebSocketBroadcast.class);
private static final String WS_CHANNEL = "ws:notifications";
private final RedisTemplate<String, Object> redisTemplate;
private final NotificationPushService pushService;
private final WebSocketEventListener eventListener;
private final ObjectMapper objectMapper;
public ClusterWebSocketBroadcast(RedisTemplate<String, Object> redisTemplate,
NotificationPushService pushService,
WebSocketEventListener eventListener,
ObjectMapper objectMapper) {
this.redisTemplate = redisTemplate;
this.pushService = pushService;
this.eventListener = eventListener;
this.objectMapper = objectMapper;
}
/**
* 跨节点推送消息给指定用户。
* 不管目标用户在哪个节点,都能送达。
*/
public void pushToUserAcrossCluster(String userId,
NotificationPushService.NotificationMessage message) {
try {
ClusterMessage clusterMsg = new ClusterMessage(userId, message);
String json = objectMapper.writeValueAsString(clusterMsg);
// 发布到 Redis 频道,所有节点都会收到
redisTemplate.convertAndSend(WS_CHANNEL, json);
} catch (Exception e) {
log.error("[WS Cluster] 发布消息失败: userId={}", userId, e);
}
}
/**
* 接收 Redis Pub/Sub 消息(由 RedisMessageListenerContainer 回调)。
*/
@Override
public void onMessage(Message message, byte[] pattern) {
try {
String json = new String(message.getBody());
ClusterMessage clusterMsg = objectMapper.readValue(json, ClusterMessage.class);
// 检查目标用户是否在当前节点
if (eventListener.isUserOnline(clusterMsg.getUserId())) {
pushService.pushToUser(clusterMsg.getUserId(), clusterMsg.getMessage());
log.debug("[WS Cluster] 本节点推送成功: userId={}", clusterMsg.getUserId());
}
} catch (Exception e) {
log.error("[WS Cluster] 处理广播消息失败", e);
}
}
static class ClusterMessage {
private String userId;
private NotificationPushService.NotificationMessage message;
public ClusterMessage() {}
public ClusterMessage(String userId, NotificationPushService.NotificationMessage message) {
this.userId = userId;
this.message = message;
}
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public NotificationPushService.NotificationMessage getMessage() { return message; }
public void setMessage(NotificationPushService.NotificationMessage message) {
this.message = message;
}
}
}注册 Redis 消息监听器:
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
RedisConnectionFactory factory,
ClusterWebSocketBroadcast broadcast) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(broadcast,
new PatternTopic("ws:notifications"));
return container;
}六、踩坑实录
坑1:客户端 SockJS 在 Nginx 后端无法连接
现象:直连服务没问题,经过 Nginx 代理后 WebSocket 握手失败,报 101 协议升级错误。
原因:Nginx 默认不转发 Connection: Upgrade 和 Upgrade: websocket 头,导致 WebSocket 握手失败。
解法:Nginx 配置添加:
location /ws {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host;
proxy_read_timeout 86400; # 长连接需要很长的 timeout
}这个坑我也踩过,当时对着前端 Network 面板看了很久,才发现是 Nginx 配置问题。
坑2:大量连接时内存飙高
现象:1 万个 WebSocket 连接时,JVM 堆内存从 1G 涨到 6G。
原因:每个 WebSocket 连接有一个 Session 对象,Session 里缓存了连接相关的数据。1 万个连接 × 每个 Session 占几十 KB = 数百 MB。另外 SockJS 轮询模式会为每个连接维护消息队列,如果消息积压,队列占用大量内存。
解法:评估实际并发连接数,合理设置 JVM 堆大小;开启消息队列的容量限制(SimpleBroker 有 setTaskScheduler 和 setSendTimeLimit 配置);对于超大规模连接(> 10万),考虑专用的 WebSocket 服务器(如 Netty)。
坑3:集群广播时用户收到重复消息
现象:一个用户收到了同一条消息两次或多次。
原因:用户同时有两个 WebSocket 连接(比如同时开了两个浏览器 Tab),isUserOnline 只检查有没有连接,两个节点都认为用户在线,都推送了一次。
解法:消息里加唯一的 messageId,客户端做幂等去重(收到已处理的 messageId 直接忽略)。或者服务端在推送时记录 messageId + userId 到 Redis,TTL 设置为消息有效期,重复投递时判断 Redis 里是否已有记录。
七、生产部署建议
- 连接数评估:一个 2C4G 的实例,稳定支持约 5000~8000 个 WebSocket 长连接(取决于消息频率)
- 负载均衡:WebSocket 是有状态的长连接,Nginx 负载均衡需要用 IP Hash 或 Cookie Hash,保证同一客户端每次连到同一个节点
- 心跳超时设置:服务端心跳 10s,客户端心跳 10s,连续 3 次没收到心跳则认为连接断开并清理
- 消息持久化:重要消息(如订单状态变更)应同时存 DB,不只依赖 WebSocket 实时推送。用户断线期间产生的消息,重连时做补推
