设计一个IM即时通讯系统:WebSocket、协议设计、消息可靠性保障
设计一个IM即时通讯系统:WebSocket、协议设计、消息可靠性保障
适读人群:Java中高级工程师、需要做IM功能的技术人员 | 阅读时长:约22分钟 | 难度:★★★★☆
开篇故事
做过企业内部IM的同学都懂,这个东西看起来不就是个聊天室吗,能有多复杂?
我们第一版用HTTP长轮询做的,效果惨不忍睹:一个100人的群聊,每次轮询100人各自发一个请求,服务器每秒要处理2000个轮询请求(100人×每人每秒20次),大部分都是空响应。消息从发出到接收延迟平均1.5秒,业务方直接拒绝上线。
换成WebSocket重做之后,同样100人群聊,服务器维持100个长连接,消息送达延迟降到100ms以内。但新问题来了:消息有时候会丢(WebSocket连接断开时在途的消息丢失),偶尔会重复(断线重连后重发了已送达的消息)。
花了几个月把消息可靠性做好,这篇文章把IM系统从协议设计到消息可靠性的完整方案讲透。
一、需求分析与规模估算
功能需求
- 单聊: 点对点私信
- 群聊: 最大支持1000人的群组聊天
- 消息类型: 文字、图片、文件、系统消息
- 消息可靠性: 消息不丢失、不重复(exactly-once语义,至少best effort once)
- 消息顺序: 同一会话内消息按发送时间排序
- 已读回执: 对方看过的消息显示"已读"
- 历史消息: 可以翻看历史聊天记录
规模估算
以一个企业IM为例:
用户规模: 10万员工,峰值同时在线5万
消息量:
- 每天消息数:1000万条
- 平均QPS:116 QPS
- 峰值QPS:约1000 QPS(早9点、下午2点高峰)
连接数:
- 5万同时在线用户
- 每用户平均1.2个连接(部分用户多端登录)
- 总连接数:6万
- 需要WebSocket服务器数量:6万 / 5万(单台上限) = 约2台(生产中建议至少3台)
存储估算:
- 每条消息约200字节
- 每天1000万条 × 200字节 = 2GB/天
- 保留历史消息1年:730GB(MySQL分库分表)
二、消息协议设计
IM系统的协议设计是最关键也最容易被忽视的部分。协议要解决的核心问题:
- 消息类型怎么区分(心跳、业务消息、ACK)
- 如何实现消息的可靠送达(序列号 + ACK机制)
- 如何防止消息重复(去重ID)
/**
* IM消息协议包
* 所有WebSocket消息都用这个格式(JSON序列化)
*/
@Data
@Builder
public class ImMessage {
private String msgId; // 客户端生成的唯一ID(UUID),用于去重
private Long msgSeq; // 服务端分配的序列号,用于排序和ACK
private Integer msgType; // 消息类型(见MessageType枚举)
private String fromUserId; // 发送者ID
private String toId; // 接收者ID(单聊=用户ID,群聊=群ID)
private String sessionType; // 会话类型:"single"/"group"
private Object content; // 消息内容(根据msgType不同结构不同)
private Long timestamp; // 发送时间戳(毫秒)
private String ackId; // ACK时对应的msgSeq(ACK类型消息使用)
public enum MessageType {
TEXT(1), // 文本消息
IMAGE(2), // 图片消息
FILE(3), // 文件消息
AUDIO(4), // 语音消息
HEARTBEAT(10), // 心跳
ACK(11), // 消息确认
READ_RECEIPT(12),// 已读回执
SYSTEM(20); // 系统消息
private final int code;
MessageType(int code) { this.code = code; }
}
}三、系统架构设计
四、消息可靠性保障(核心难题)
消息可靠性是IM系统最难的部分,要处理三个场景:
- 消息发出后,服务端没收到(网络丢包)
- 服务端收到后,推送给接收方时,接收方掉线了(消息丢失)
- 消息发出后,网络不稳定,重发导致重复(消息重复)
解决方案:客户端重传 + 服务端去重 + ACK机制
五、关键代码实现
5.1 WebSocket消息处理器
@Component
@Slf4j
public class ImWebSocketHandler extends TextWebSocketHandler {
@Autowired
private ImMessageService messageService;
@Autowired
private ConnectionManager connectionManager;
@Autowired
private MessageDeduplicator deduplicator;
@Override
public void afterConnectionEstablished(WebSocketSession session) {
String userId = extractUserId(session);
connectionManager.onConnect(userId, session);
// 连接成功后,拉取离线期间的消息
pullOfflineMessages(userId, session);
}
@Override
protected void handleTextMessage(
WebSocketSession session, TextMessage message) {
try {
ImMessage imMsg = JsonUtils.fromJson(
message.getPayload(), ImMessage.class);
String userId = extractUserId(session);
imMsg.setFromUserId(userId);
imMsg.setTimestamp(System.currentTimeMillis());
switch (imMsg.getMsgType()) {
case 10: // 心跳
handleHeartbeat(userId, session);
break;
case 11: // ACK
handleAck(userId, imMsg);
break;
case 12: // 已读回执
handleReadReceipt(userId, imMsg);
break;
default: // 业务消息
handleBusinessMessage(userId, imMsg, session);
}
} catch (Exception e) {
log.error("消息处理异常, session={}", session.getId(), e);
}
}
private void handleBusinessMessage(
String fromUserId, ImMessage msg, WebSocketSession session) {
// 1. 服务端去重:同一个msgId只处理一次
if (!deduplicator.isNew(msg.getMsgId())) {
log.debug("重复消息,忽略。msgId={}", msg.getMsgId());
// 即使重复也要回复ACK,告知客户端已收到
sendAck(session, msg.getMsgId(), 0L);
return;
}
// 2. 存储消息并分配序列号
Long msgSeq = messageService.saveAndRoute(msg);
// 3. 回复发送者ACK(告知消息已被服务端接收,附带服务端序列号)
sendAck(session, msg.getMsgId(), msgSeq);
}
private void sendAck(WebSocketSession session, String msgId, Long msgSeq) {
ImMessage ack = ImMessage.builder()
.msgType(ImMessage.MessageType.ACK.getCode())
.ackId(msgId)
.msgSeq(msgSeq)
.timestamp(System.currentTimeMillis())
.build();
try {
session.sendMessage(new TextMessage(JsonUtils.toJson(ack)));
} catch (IOException e) {
log.warn("ACK发送失败, msgId={}", msgId);
}
}
private void pullOfflineMessages(String userId, WebSocketSession session) {
// 用户上线时,推送离线期间的消息
Long lastReceivedSeq = getLastReceivedSeq(userId);
List<ImMessage> offlineMessages = messageService.getOfflineMessages(
userId, lastReceivedSeq);
offlineMessages.forEach(msg -> {
try {
session.sendMessage(new TextMessage(JsonUtils.toJson(msg)));
} catch (IOException e) {
log.warn("离线消息推送失败, userId={}, msgSeq={}",
userId, msg.getMsgSeq());
}
});
}
}5.2 消息存储与路由服务
@Service
@Slf4j
public class ImMessageService {
@Autowired
private MessageMapper messageMapper;
@Autowired
private SessionMapper sessionMapper;
@Autowired
private ConnectionManager connectionManager;
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private KafkaTemplate<String, ImMessage> kafkaTemplate;
/**
* 保存消息并路由推送
* @return 消息序列号
*/
@Transactional
public Long saveAndRoute(ImMessage msg) {
// 1. 生成消息序列号(全局单调递增)
Long msgSeq = generateMsgSeq(msg.getToId());
msg.setMsgSeq(msgSeq);
// 2. 持久化消息到MySQL
messageMapper.insert(buildMessageEntity(msg));
// 3. 更新会话信息(最新消息、未读数)
updateSession(msg);
// 4. 确定接收者列表
List<String> receivers = getReceivers(msg);
// 5. 路由推送消息
for (String receiverId : receivers) {
if (!receiverId.equals(msg.getFromUserId())) {
routeAndPush(receiverId, msg);
}
}
return msgSeq;
}
/**
* 消息路由:在线直推,离线存消息盒子并走APNs/FCM
*/
private void routeAndPush(String userId, ImMessage msg) {
// 查询用户在哪台WebSocket服务器上
String onlineServerId = redisTemplate.opsForValue()
.get("im:online:" + userId);
if (onlineServerId == null) {
// 用户离线:存消息盒子,发APNs推送
kafkaTemplate.send("im-offline-push", userId, msg);
} else if (onlineServerId.equals(getLocalServerId())) {
// 用户在本机:直接推送
connectionManager.pushToLocal(userId, JsonUtils.toJson(msg));
} else {
// 用户在其他服务器:通过Redis Pub/Sub转发
String channel = "im:push:" + onlineServerId;
ImPushCommand cmd = new ImPushCommand(userId, msg);
redisTemplate.convertAndSend(channel, JsonUtils.toJson(cmd));
}
}
/**
* 生成消息序列号
* 使用Redis自增保证单调递增(按会话维度)
*/
private Long generateMsgSeq(String sessionId) {
String key = "im:seq:" + sessionId;
return redisTemplate.opsForValue().increment(key);
}
/**
* 获取离线消息(用户上线时拉取)
* @param userId 用户ID
* @param lastSeq 客户端本地最大序列号
*/
public List<ImMessage> getOfflineMessages(String userId, Long lastSeq) {
if (lastSeq == null) lastSeq = 0L;
return messageMapper.findAfterSeq(userId, lastSeq, 100); // 最多拉100条
}
private void updateSession(ImMessage msg) {
// 更新发件人会话
sessionMapper.upsertSession(msg.getFromUserId(), msg.getToId(),
msg.getSessionType(), msg);
// 更新收件人会话(单聊:收件人=toId;群聊:所有群成员)
List<String> receivers = getReceivers(msg);
receivers.forEach(userId -> {
if (!userId.equals(msg.getFromUserId())) {
sessionMapper.upsertSession(userId, msg.getToId(),
msg.getSessionType(), msg);
// 未读数+1
redisTemplate.opsForValue().increment(
"im:unread:" + userId + ":" + msg.getToId()
);
}
});
}
private List<String> getReceivers(ImMessage msg) {
if ("single".equals(msg.getSessionType())) {
return Collections.singletonList(msg.getToId());
} else {
// 群聊:查询群成员列表
return groupMemberMapper.findMemberIds(msg.getToId());
}
}
}5.3 消息去重器
@Component
public class MessageDeduplicator {
@Autowired
private StringRedisTemplate redisTemplate;
private static final long DEDUP_TTL_SECONDS = 300; // 5分钟去重窗口
/**
* 检查消息是否是新消息(首次接收)
* 用Redis的SETNX实现,同一个msgId在5分钟内只处理一次
*/
public boolean isNew(String msgId) {
String key = "im:dedup:" + msgId;
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(key, "1", DEDUP_TTL_SECONDS, TimeUnit.SECONDS);
return Boolean.TRUE.equals(isNew);
}
}六、扩展性设计
群聊扇出优化
1000人群聊,每发一条消息需要推送给999个人。在线用户多时,这个扇出量很大。
优化方案:懒推送
- 在线人数 < 100:同步推送给所有在线成员
- 在线人数 >= 100:只更新群消息序列号,不主动推送;客户端定期轮询(每5秒)检查新消息序列号,有变化时拉取
这样把大群的消息推送改为"拉"模式,避免大量并发推送。
七、踩坑实录
坑1:消息乱序问题
同一个对话中,消息A先发,消息B后发,但B先到,显示顺序出错了。原因是两条消息走了不同的网络路径,延迟不一样。解决方案:客户端按服务端分配的msgSeq排序显示,不按到达时间排序。
坑2:群聊消息存储量爆炸
100人群聊,每发一条消息要存100条(每个人一份)。1万个群 × 每群每天1000条 × 100人 = 10亿条/天,存储完全不可行。解决方案:群聊消息只存一份,在session_message表中记录"这条群消息还没被哪些用户读取",读取时用会话序列号判断是否有新消息。
坑3:消息通知风暴
1000人群聊有人@全体时,服务端要同时推送1000条"@通知"消息,一下子产生1000个并发写Redis/DB操作,导致服务端延迟飙升。解决方案:@全体通知不走单条推送,而是在群会话信息里设一个"全体通知时间戳",客户端拉取消息时根据这个时间戳判断是否有@全体通知,把push变成pull。
八、总结
IM系统的核心设计要点:
- 协议设计: msgId(客户端去重)+ msgSeq(服务端排序)+ ACK机制(可靠性)
- 消息可靠性三步走: 客户端超时重传 → 服务端去重 → ACK确认送达
- 连接管理: 心跳保活 + 断线重连 + 离线消息拉取
- 大群优化: 懒推送(拉模式)代替主动推送,避免扇出风暴
IM系统是所有分布式系统中对实时性和可靠性要求都很高的系统,把这套设计吃透,其他实时系统的设计就会容易很多。
