设计一个消息推送系统:长连接、消息扇出、在线离线的架构方案
设计一个消息推送系统:长连接、消息扇出、在线离线的架构方案
适读人群:Java中高级工程师、即时通讯方向技术人员 | 阅读时长:约20分钟 | 难度:★★★★☆
开篇故事
三年前我们做一个社交类App,有个功能是"关注的人发了新动态,粉丝要立刻收到通知"。产品经理画了个原型:用户发一条帖子,所有粉丝的App上5秒内要出现小红点。
最开始我们用轮询,客户端每5秒请求一次服务器看有没有新消息。上线第一天,在线用户才3万,服务端的接口QPS直接到了6万(3万用户每5秒轮询一次)。DBA打来电话说MySQL连接池耗尽了。
我花了两周时间把架构彻底重做,换成了WebSocket长连接 + 消息扇出的方案,之后系统支撑到50万日活,服务端压力反而比轮询时低了80%。这套架构思路放在今天依然适用,下面把核心设计完整讲一遍。
一、需求分析与规模估算
功能需求
- 实时推送: 平台通知、互动消息(点赞、评论、关注)实时推送到在线用户
- 离线消息: 用户离线期间收到的消息,上线后可以拉取
- 多端同步: 同一账号在手机和Web上同时在线,消息需要同步到所有端
- 消息可靠性: 重要消息(如系统通知)必须确保送达,支持ack确认
- 推送类型: 应用内推送(App打开时)+ APNs/FCM离线推送(App关闭时)
规模估算
以一个中等规模社交平台为例:
用户规模: 1000万注册用户,100万日活(DAU),峰值同时在线30万
消息量估算:
- 互动消息(点赞/评论/关注):每天3000万条
- 平均QPS:3000万 / 86400 ≈ 347 QPS
- 峰值QPS:约3000 QPS(晚8-10点)
消息扇出估算(最复杂的场景):
- 大V用户发一条动态,有200万粉丝需要收到通知
- 每天大V发帖:100条(假设10个大V,各发10条)
- 扇出消息总量:100 × 200万 = 2亿条/天(仅大V部分)
- 峰值扇出QPS:假设大V在同一时段发帖,扇出并发度极高
连接数估算:
- 30万同时在线用户,每个用户平均1.5个连接(部分用户双端)
- 总WebSocket连接数:约45万
- 单台WebSocket服务器维持连接上限:约5万连接
- 需要WebSocket服务器:约10台
存储估算:
- 每条消息约200字节
- 每天2亿条扇出后消息:40GB(仅离线消息存储7天)
- 7天离线消息总量:280GB → 需要分库分表
带宽估算:
- 每条推送消息约200字节
- 峰值3000 QPS推送 × 200字节 = 600KB/s(服务端 → 客户端方向很小)
- 主要带宽消耗在连接维持的心跳包
二、系统架构设计
设计要点说明:
负载均衡层用IP哈希(而非轮询),保证同一用户的多次连接尽量落在同一台WebSocket服务器上(但不强制,因为要支持多端)。
连接路由服务是核心:用Redis维护userId → [wsServer1, wsServer2]的映射,知道把消息推给哪台服务器。
WebSocket服务器之间不直接通信,通过Redis Pub/Sub协调:需要推送给某用户时,往Redis的对应Channel发消息,该用户所在的所有WebSocket服务器都能收到,再通过WebSocket连接推给客户端。
三、核心组件详解
3.1 连接路由机制
用户建立WebSocket连接时,在Redis里写入online:{userId} → {serverId}:{connectionId},带30秒TTL,心跳包续期。
用户如果在多端在线(手机+电脑),用Set存储:online:set:{userId} → {serverId1:connId1, serverId2:connId2}。
3.2 消息扇出策略
这是推送系统最复杂的部分,要根据粉丝数量采用不同策略:
- 普通用户(粉丝 < 1万):推模式,发帖时同步/异步地把消息写入每个粉丝的消息盒子
- 大V用户(粉丝 > 1万):拉模式,不主动扇出,粉丝上线时主动拉取关注的大V的最新动态
- 混合模式:在线粉丝主动推,离线粉丝存消息盒子
3.3 离线消息存储
离线消息不能丢,存MySQL。按userId % 64分64张表,每个用户有自己的消息盒子。上线时拉取最近未读消息,超过7天的消息归档或删除。
3.4 在线/离线状态管理
连接建立时:Redis SETEX online:{userId} 30 {serverId}
心跳包(每20秒一次):EXPIRE online:{userId} 30,续期
连接断开时:主动删除,或等TTL自然过期
判断是否在线:EXISTS online:{userId},O(1)操作
四、关键代码实现
4.1 WebSocket连接管理器
@Component
@Slf4j
public class WebSocketConnectionManager {
@Autowired
private StringRedisTemplate redisTemplate;
// 本地连接存储:userId → List<WebSocketSession>
// 同一用户可能多端在线
private final ConcurrentHashMap<String, CopyOnWriteArrayList<WebSocketSession>>
userConnections = new ConcurrentHashMap<>();
// 反向映射:sessionId → userId(用于连接断开时清理)
private final ConcurrentHashMap<String, String> sessionUserMap =
new ConcurrentHashMap<>();
private static final String SERVER_ID =
System.getenv("SERVER_ID"); // 通过环境变量区分服务器实例
private static final String ONLINE_KEY_PREFIX = "online:";
private static final int ONLINE_TTL_SECONDS = 30;
/**
* 用户连接建立
*/
public void onConnect(String userId, WebSocketSession session) {
// 1. 本地注册连接
userConnections.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>())
.add(session);
sessionUserMap.put(session.getId(), userId);
// 2. 在Redis中注册在线状态
String onlineKey = ONLINE_KEY_PREFIX + userId;
redisTemplate.opsForSet().add(onlineKey, SERVER_ID + ":" + session.getId());
redisTemplate.expire(onlineKey, ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
log.info("用户连接建立, userId={}, sessionId={}, server={}",
userId, session.getId(), SERVER_ID);
}
/**
* 用户连接断开
*/
public void onDisconnect(WebSocketSession session) {
String userId = sessionUserMap.remove(session.getId());
if (userId == null) return;
// 移除本地连接
CopyOnWriteArrayList<WebSocketSession> sessions = userConnections.get(userId);
if (sessions != null) {
sessions.remove(session);
if (sessions.isEmpty()) {
userConnections.remove(userId);
// 所有连接都断了,移除在线状态
redisTemplate.delete(ONLINE_KEY_PREFIX + userId);
} else {
// 还有其他端在线,仅移除本次连接的记录
redisTemplate.opsForSet().remove(
ONLINE_KEY_PREFIX + userId,
SERVER_ID + ":" + session.getId()
);
}
}
log.info("用户连接断开, userId={}, sessionId={}", userId, session.getId());
}
/**
* 向本机上的用户推送消息
* @return true=推送成功,false=用户不在本机
*/
public boolean pushToLocal(String userId, String messageJson) {
CopyOnWriteArrayList<WebSocketSession> sessions = userConnections.get(userId);
if (sessions == null || sessions.isEmpty()) {
return false;
}
TextMessage message = new TextMessage(messageJson);
sessions.forEach(session -> {
if (session.isOpen()) {
try {
session.sendMessage(message);
} catch (IOException e) {
log.warn("WebSocket推送失败, sessionId={}", session.getId(), e);
}
}
});
return true;
}
/**
* 心跳包处理:续期在线状态
*/
public void onHeartbeat(String userId) {
String onlineKey = ONLINE_KEY_PREFIX + userId;
redisTemplate.expire(onlineKey, ONLINE_TTL_SECONDS, TimeUnit.SECONDS);
}
/**
* 检查用户在本机是否有连接
*/
public boolean isLocalOnline(String userId) {
CopyOnWriteArrayList<WebSocketSession> sessions = userConnections.get(userId);
return sessions != null && !sessions.isEmpty();
}
}4.2 消息推送服务(跨服务器分发)
@Service
@Slf4j
public class MessagePushService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private WebSocketConnectionManager connectionManager;
@Autowired
private OfflineMessageService offlineMessageService;
@Autowired
private MobilePushService mobilePushService;
private static final String ONLINE_KEY_PREFIX = "online:";
// Redis Pub/Sub频道前缀:用于跨WebSocket服务器推送
private static final String WS_PUSH_CHANNEL_PREFIX = "ws:push:";
/**
* 推送消息给指定用户
* 自动处理:在线推WebSocket,离线推APNs/FCM,存离线消息
*/
public void pushToUser(String toUserId, PushMessage message) {
String messageJson = JsonUtils.toJson(message);
// 1. 存离线消息(无论在线与否,保证可靠性)
offlineMessageService.save(toUserId, message);
// 2. 检查是否在线
String onlineKey = ONLINE_KEY_PREFIX + toUserId;
Set<String> onlineServers = redisTemplate.opsForSet().members(onlineKey);
if (onlineServers == null || onlineServers.isEmpty()) {
// 用户不在线,发送APNs/FCM推送
mobilePushService.push(toUserId, message);
return;
}
// 3. 用户在线,通过Redis Pub/Sub广播到相关WebSocket服务器
// 每个server记录格式:serverId:sessionId
Set<String> targetServers = onlineServers.stream()
.map(s -> s.split(":")[0])
.collect(Collectors.toSet());
boolean pushedLocally = false;
for (String serverId : targetServers) {
if (serverId.equals(System.getenv("SERVER_ID"))) {
// 用户在本机,直接推
pushedLocally = connectionManager.pushToLocal(toUserId, messageJson);
} else {
// 用户在其他机器,通过Redis Pub/Sub通知
String channel = WS_PUSH_CHANNEL_PREFIX + serverId;
WsForwardMessage forwardMsg = new WsForwardMessage(toUserId, messageJson);
redisTemplate.convertAndSend(channel, JsonUtils.toJson(forwardMsg));
}
}
log.debug("消息已推送, toUserId={}, servers={}", toUserId, targetServers);
}
/**
* 批量推送(用于扇出场景)
* 使用Pipeline批量操作Redis,减少网络RTT
*/
public void pushBatch(List<String> userIds, PushMessage message) {
// 按服务器分组,批量处理
Map<String, List<String>> serverUserMap = new HashMap<>();
userIds.forEach(userId -> {
Set<String> onlineServers = redisTemplate.opsForSet()
.members(ONLINE_KEY_PREFIX + userId);
if (onlineServers != null && !onlineServers.isEmpty()) {
onlineServers.forEach(serverInfo -> {
String serverId = serverInfo.split(":")[0];
serverUserMap.computeIfAbsent(serverId, k -> new ArrayList<>())
.add(userId);
});
} else {
// 离线用户批量推APNs
mobilePushService.pushAsync(userId, message);
}
});
// 按服务器批量推送
serverUserMap.forEach((serverId, users) -> {
if (serverId.equals(System.getenv("SERVER_ID"))) {
users.forEach(userId ->
connectionManager.pushToLocal(userId, JsonUtils.toJson(message)));
} else {
WsBatchForwardMessage batchMsg =
new WsBatchForwardMessage(users, JsonUtils.toJson(message));
redisTemplate.convertAndSend(
WS_PUSH_CHANNEL_PREFIX + serverId,
JsonUtils.toJson(batchMsg)
);
}
});
}
}4.3 消息扇出服务
@Service
@Slf4j
public class FanoutService {
@Autowired
private UserRelationService userRelationService;
@Autowired
private MessagePushService messagePushService;
@Autowired
private OfflineMessageService offlineMessageService;
private static final int BIG_V_THRESHOLD = 10000; // 粉丝数阈值
private static final int BATCH_SIZE = 1000; // 批量扇出大小
/**
* 处理动态发布后的消息扇出
* 根据发布者的粉丝数选择推模式或拉模式
*/
@Async("fanoutExecutor")
public void fanout(String publisherId, FeedContent content) {
long followerCount = userRelationService.getFollowerCount(publisherId);
if (followerCount <= BIG_V_THRESHOLD) {
// 推模式:直接写入每个粉丝的消息盒子
doPushFanout(publisherId, content);
} else {
// 大V用拉模式:仅通知在线粉丝,离线粉丝上线时主动拉取
doPullNotify(publisherId, content);
}
}
/**
* 推模式扇出:批量写入粉丝消息盒子
*/
private void doPushFanout(String publisherId, FeedContent content) {
// 分页获取粉丝列表,避免一次加载太多
int page = 0;
List<String> followerBatch;
do {
followerBatch = userRelationService.getFollowers(publisherId, page++, BATCH_SIZE);
if (followerBatch.isEmpty()) break;
// 批量写离线消息
offlineMessageService.saveBatch(followerBatch, content);
// 批量推送在线消息
PushMessage pushMsg = PushMessage.builder()
.type(MessageType.NEW_FEED)
.senderId(publisherId)
.contentId(content.getId())
.preview(content.getTextPreview())
.timestamp(System.currentTimeMillis())
.build();
messagePushService.pushBatch(followerBatch, pushMsg);
} while (followerBatch.size() == BATCH_SIZE);
}
/**
* 拉模式通知:仅通知当前在线的粉丝"有新内容了"
* 不写每个粉丝的消息盒子,粉丝下次刷新时拉取
*/
private void doPullNotify(String publisherId, FeedContent content) {
// 只推给当前在线的粉丝(从Redis在线集合中取)
int page = 0;
List<String> followerBatch;
do {
followerBatch = userRelationService.getFollowers(publisherId, page++, BATCH_SIZE);
List<String> onlineFollowers = followerBatch.stream()
.filter(this::isOnline)
.collect(Collectors.toList());
if (!onlineFollowers.isEmpty()) {
PushMessage pushMsg = PushMessage.builder()
.type(MessageType.FEED_HINT) // 提示性通知,不含全量内容
.senderId(publisherId)
.contentId(content.getId())
.build();
messagePushService.pushBatch(onlineFollowers, pushMsg);
}
} while (followerBatch.size() == BATCH_SIZE);
}
private boolean isOnline(String userId) {
return Boolean.TRUE.equals(
redisTemplate.hasKey("online:" + userId)
);
}
@Autowired
private StringRedisTemplate redisTemplate;
}4.4 离线消息服务
@Service
public class OfflineMessageService {
@Autowired
private OfflineMessageMapper offlineMessageMapper;
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 用户上线时,拉取离线消息
* 使用游标分页,避免一次拉取太多
*/
public OfflineMessageResult pullMessages(String userId, Long lastMsgId, int limit) {
List<OfflineMessage> messages;
if (lastMsgId == null || lastMsgId == 0) {
// 首次拉取:获取最新的N条
messages = offlineMessageMapper.findLatest(userId, limit);
} else {
// 续拉:获取比lastMsgId更新的消息
messages = offlineMessageMapper.findAfter(userId, lastMsgId, limit);
}
// 更新未读数为0(已读)
if (!messages.isEmpty()) {
redisTemplate.opsForValue().set(
"unread:" + userId, "0", 7, TimeUnit.DAYS
);
}
boolean hasMore = messages.size() == limit;
Long nextCursor = hasMore ? messages.get(messages.size() - 1).getId() : null;
return new OfflineMessageResult(messages, hasMore, nextCursor);
}
/**
* 保存消息到用户的离线消息盒子
*/
public void save(String userId, PushMessage message) {
OfflineMessage entity = OfflineMessage.builder()
.userId(userId)
.type(message.getType().name())
.content(JsonUtils.toJson(message))
.isRead(false)
.createTime(LocalDateTime.now())
.build();
offlineMessageMapper.insert(entity);
// 未读数+1
redisTemplate.opsForValue().increment("unread:" + userId);
}
/**
* 批量保存(扇出场景)
*/
public void saveBatch(List<String> userIds, FeedContent content) {
List<OfflineMessage> entities = userIds.stream()
.map(userId -> OfflineMessage.builder()
.userId(userId)
.type("NEW_FEED")
.content(JsonUtils.toJson(content))
.isRead(false)
.createTime(LocalDateTime.now())
.build())
.collect(Collectors.toList());
// 批量插入,每批500条
Lists.partition(entities, 500)
.forEach(offlineMessageMapper::batchInsert);
// 批量更新未读数(Pipeline操作)
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
userIds.forEach(userId ->
connection.stringCommands()
.incr(("unread:" + userId).getBytes())
);
return null;
});
}
}五、扩展性设计
从10万连接扩展到1000万连接
问题: 单台服务器能维持约5万WebSocket连接(每个连接约20KB内存),10万连接需要2台,1000万连接需要200台。
方案一:连接层与业务层分离
WebSocket服务器只负责维持连接和转发消息,不处理业务逻辑。业务服务器处理消息,通过消息队列与WebSocket服务器通信。这样可以独立扩展连接层和业务层。
方案二:按用户ID分片
把用户ID按哈希分配到不同的WebSocket服务器集群,每个集群只管理一部分用户。路由信息存在Redis中,其他服务通过Redis路由找到目标服务器。
方案三:利用Netty优化连接数
Spring WebSocket默认基于Servlet容器,每个连接会占用一个线程。换用Netty实现WebSocket服务器,用Reactor模型处理连接,单台服务器能维持100万连接(Netty每连接约10KB内存)。
消息扇出的性能优化
大V(200万粉丝)发帖时,如果同步扇出需要写200万条消息,即使每条写入100微秒,也需要200秒,完全不可接受。
解决方案:
- 分级延迟扇出: 在线用户立即通知,离线用户按批次异步扇出(允许5-10分钟延迟)
- 消息合并: 如果用户离线期间同一个大V发了多条帖子,合并成一条"TA发了N条新动态"通知
- 读扩散代替写扩散: 大V的帖子不进粉丝的消息盒子,用户上线时主动拉取关注列表的最新动态
六、踩坑实录
坑1:WebSocket连接在NAT后面频繁断开
用户在手机4G网络下,移动运营商的NAT设备会在2-5分钟内关闭空闲的TCP连接(即使WebSocket还开着)。导致用户以为在线,实际上消息推不过去。
解决方案:客户端每20秒发一次心跳包(Ping帧),服务端收到后回复Pong,同时刷新在线状态的Redis TTL。同时服务端每30秒检查连接活跃度,超时未收到心跳的连接主动关闭。
坑2:Redis Pub/Sub在服务器重启时消息丢失
当WebSocket服务器重启时,订阅的Redis Channel断开,这段时间内其他服务器发来的转发消息全部丢失,导致部分用户错过消息。
解决方案:改用Redis Stream替代Pub/Sub。Stream支持消费者组和消息ACK,服务器重启后可以从上次消费位置继续读取,不丢消息。
// 使用Redis Stream代替Pub/Sub
redisTemplate.opsForStream().add(
"ws:forward:" + serverId,
Collections.singletonMap("msg", messageJson)
);坑3:大V扇出任务堆积导致Kafka消费延迟
某个大V(500万粉丝)突然发了一条爆款帖子,Kafka里积压了500万条扇出任务,消费者处理不过来,导致其他正常消息也被延迟推送。
解决方案:为大V扇出任务设置独立的Kafka Topic,与普通消息的Topic隔离。大V Topic用更多的Consumer实例处理,普通消息Topic不受影响。同时对扇出任务做优先级排序,优先处理在线用户的推送。
坑4:消息重复推送
多端在线时,同一条消息可能被推送两次(手机一次,电脑一次)。虽然这在业务上是预期行为,但有时候Redis Pub/Sub的at-least-once语义加上网络重试,会导致同一端收到重复消息。
解决方案:每条消息带唯一的msgId,客户端收到消息后做去重处理(维护一个最近100条消息ID的LRU缓存)。服务端不处理重复问题,由客户端保证幂等消费。
七、总结
消息推送系统的核心是三个权衡:
- 在线实时性 vs 服务器资源: 长连接比轮询资源效率高10倍以上,但需要处理连接管理的复杂性
- 消息可靠性 vs 推送延迟: 存离线消息保可靠,异步推送保低延迟,二者可以同时实现
- 推模式吞吐 vs 大V扇出成本: 小用户推模式简单,大V必须用拉模式或延迟扇出
| 组件 | 技术选型 | 核心职责 |
|---|---|---|
| 连接维持 | Netty/Spring WebSocket | 管理用户长连接 |
| 跨服务器路由 | Redis Set + Pub/Sub | 连接路由和跨节点转发 |
| 消息扇出 | Kafka + 异步线程池 | 大V消息的批量扇出 |
| 离线消息 | MySQL分库分表 | 可靠的消息持久化 |
| 离线推送 | APNs/FCM | 用户离线时的触达 |
