WebSocket 实时 AI 协作——多人同时用一个 AI 工具
WebSocket 实时 AI 协作——多人同时用一个 AI 工具
适读人群:全栈工程师 / AI 应用开发者 | 阅读时长:约18分钟 | 核心价值:多人实时协作 AI 工具的完整技术方案,含并发冲突处理
去年我帮一个内容团队做了个内部工具,需求很简单:多个编辑可以同时打开同一个 AI 生成的文档,协同编辑修改。
我当时想,这能有多难,不就是 WebSocket + 广播嘛。
然后我花了三周时间解决各种奇怪的问题:两个人同时让 AI 重写同一段话,结果互相覆盖;一个人在编辑,另一个人触发了 AI 生成,AI 生成完了把第一个人的编辑冲掉;AI 正在流式输出,第三个用户加入,他应该看到什么?
实时协作这个问题,本质比它看起来复杂得多。今天把我当时的技术方案和踩坑经历写出来。
核心挑战在哪里
普通的聊天室(多人发消息)比较好做,因为消息之间是独立的,不互相影响。
但多人协作编辑一个 AI 文档就不一样了:
- 状态共享:所有用户看到的文档内容必须一致
- 操作冲突:多人同时修改同一段,怎么合并?
- AI 生成的特殊性:AI 是流式输出的,生成过程中其他人能做什么?
- AI 操作的触发:谁可以触发 AI?触发时其他人知道吗?
- 用户加入时的状态同步:新加入的用户需要看到当前状态
系统架构
客户端 A ──┐
客户端 B ──┼──> WebSocket 服务器 ──> 协作会话管理器
客户端 C ──┘ |
|──> AI 服务(流式)
|──> Redis(会话状态)
|──> PostgreSQL(持久化)关键设计决策:
- WebSocket 服务器是有状态的,一个文档的所有连接最好在同一台服务器上
- 用 Redis 存储会话状态,支持多实例部署
- 操作日志持久化,支持重放和回滚
消息协议设计
好的消息协议是整个系统的基础。我设计了以下几种消息类型:
// 消息基类
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes({
@JsonSubTypes.Type(value = UserEditMessage.class, name = "USER_EDIT"),
@JsonSubTypes.Type(value = AiGenerateRequest.class, name = "AI_GENERATE_REQUEST"),
@JsonSubTypes.Type(value = AiChunkMessage.class, name = "AI_CHUNK"),
@JsonSubTypes.Type(value = AiCompleteMessage.class, name = "AI_COMPLETE"),
@JsonSubTypes.Type(value = UserJoinMessage.class, name = "USER_JOIN"),
@JsonSubTypes.Type(value = UserLeaveMessage.class, name = "USER_LEAVE"),
@JsonSubTypes.Type(value = CursorUpdateMessage.class, name = "CURSOR_UPDATE"),
@JsonSubTypes.Type(value = LockMessage.class, name = "LOCK"),
@JsonSubTypes.Type(value = SyncStateMessage.class, name = "SYNC_STATE"),
})
public abstract class CollabMessage {
private String documentId;
private String userId;
private long timestamp;
private String messageId; // 用于去重
}
// 用户编辑操作
public class UserEditMessage extends CollabMessage {
private EditOperation operation; // INSERT / DELETE / REPLACE
private int position; // 操作位置(字符偏移量)
private int length; // 操作长度(删除/替换用)
private String content; // 插入/替换的内容
private int baseVersion; // 基于哪个版本做的操作(用于冲突检测)
}
// 请求 AI 生成
public class AiGenerateRequest extends CollabMessage {
private String prompt; // AI 指令(比如"把这段改得更简洁")
private int selectionStart; // 要处理的文本范围(-1 表示整篇)
private int selectionEnd;
private String requestId; // 这次 AI 请求的唯一 ID
}
// AI 流式输出的一个 chunk
public class AiChunkMessage extends CollabMessage {
private String requestId; // 对应哪次 AI 请求
private String delta; // 增量内容
private int insertPosition; // 插入位置
private boolean isFinal;
}
// 段落锁(AI 生成时锁定相关段落,防止冲突)
public class LockMessage extends CollabMessage {
private LockAction action; // LOCK / UNLOCK
private int startPos;
private int endPos;
private String lockReason; // "AI_GENERATING" / "USER_EDITING"
private String holderId; // 持有锁的用户或请求 ID
}
// 新用户加入时发送的完整状态
public class SyncStateMessage extends CollabMessage {
private String content; // 当前文档完整内容
private int version; // 当前版本号
private List<ActiveUser> activeUsers; // 在线用户列表
private AiStatus aiStatus; // AI 当前状态
}Spring Boot WebSocket 服务端
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private CollabWebSocketHandler handler;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(handler, "/collab/{documentId}")
.setAllowedOrigins("*")
// 开启 SockJS 降级支持
.withSockJS();
}
}
@Component
@Slf4j
public class CollabWebSocketHandler extends TextWebSocketHandler {
@Autowired
private DocumentSessionManager sessionManager;
@Autowired
private ConflictResolver conflictResolver;
@Autowired
private AiCollabService aiCollabService;
private final ObjectMapper objectMapper = new ObjectMapper();
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
String documentId = extractDocumentId(session);
String userId = extractUserId(session);
log.info("User {} connected to document {}", userId, documentId);
// 加入协作会话
sessionManager.addSession(documentId, userId, session);
// 给新加入用户发送当前状态
DocumentState state = sessionManager.getDocumentState(documentId);
SyncStateMessage syncMsg = SyncStateMessage.builder()
.documentId(documentId)
.userId("system")
.content(state.getContent())
.version(state.getVersion())
.activeUsers(sessionManager.getActiveUsers(documentId))
.aiStatus(state.getAiStatus())
.build();
session.sendMessage(new TextMessage(objectMapper.writeValueAsString(syncMsg)));
// 广播给其他用户:有新人加入
UserJoinMessage joinMsg = new UserJoinMessage(documentId, userId, extractUserName(session));
broadcastToOthers(documentId, userId, joinMsg);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
String documentId = extractDocumentId(session);
String userId = extractUserId(session);
CollabMessage msg = objectMapper.readValue(message.getPayload(), CollabMessage.class);
switch (msg) {
case UserEditMessage editMsg -> handleUserEdit(documentId, userId, editMsg);
case AiGenerateRequest aiReq -> handleAiGenerateRequest(documentId, userId, aiReq);
case CursorUpdateMessage cursorMsg -> handleCursorUpdate(documentId, userId, cursorMsg);
default -> log.warn("Unknown message type: {}", msg.getClass().getSimpleName());
}
}
/**
* 处理用户编辑操作
* 关键:需要做操作变换(Operational Transformation)解决冲突
*/
private void handleUserEdit(String documentId, String userId, UserEditMessage editMsg) {
DocumentState state = sessionManager.getDocumentState(documentId);
// 检查是否有锁冲突
if (state.isLocked(editMsg.getPosition(), editMsg.getPosition() + editMsg.getLength())) {
// 告诉用户这段正在被 AI 修改,不能编辑
sendError(userId, documentId, "POSITION_LOCKED",
"该位置正在由 AI 修改,请稍后再试");
return;
}
// 版本冲突检查和解决
int currentVersion = state.getVersion();
if (editMsg.getBaseVersion() < currentVersion) {
// 客户端的操作基于旧版本,需要变换
EditOperation transformed = conflictResolver.transform(
editMsg.getOperation(),
state.getOperationsSince(editMsg.getBaseVersion())
);
if (transformed == null) {
// 无法解决冲突,拒绝操作
sendError(userId, documentId, "CONFLICT_UNRESOLVABLE",
"操作冲突无法自动解决,请刷新后重试");
return;
}
// 用变换后的操作
editMsg.setOperation(transformed);
}
// 应用操作到文档
state.applyOperation(editMsg);
// 广播给所有用户(包括操作者,让操作者确认服务器接受了)
broadcastToAll(documentId, editMsg);
}
/**
* 处理 AI 生成请求
* 关键设计:AI 生成时,相关位置需要加锁
*/
private void handleAiGenerateRequest(String documentId, String userId, AiGenerateRequest req) {
DocumentState state = sessionManager.getDocumentState(documentId);
// 检查是否已有 AI 操作在进行
if (state.isAiGenerating()) {
sendError(userId, documentId, "AI_BUSY",
"AI 正在处理另一个请求,请稍后再试");
return;
}
// 锁定相关区域
LockMessage lockMsg = LockMessage.builder()
.documentId(documentId)
.userId("system")
.action(LockMessage.LockAction.LOCK)
.startPos(req.getSelectionStart())
.endPos(req.getSelectionEnd())
.lockReason("AI_GENERATING")
.holderId(req.getRequestId())
.build();
broadcastToAll(documentId, lockMsg);
state.setLock(req.getSelectionStart(), req.getSelectionEnd(), req.getRequestId());
// 异步启动 AI 生成
aiCollabService.generateAsync(documentId, userId, req)
.doOnNext(chunk -> {
// 每来一个 token,广播给所有用户
AiChunkMessage chunkMsg = AiChunkMessage.builder()
.documentId(documentId)
.requestId(req.getRequestId())
.delta(chunk.getDelta())
.insertPosition(calculateInsertPosition(state, req))
.isFinal(chunk.isFinal())
.build();
broadcastToAll(documentId, chunkMsg);
// 同步更新服务端文档状态
if (!chunk.isFinal()) {
state.appendAiContent(chunk.getDelta());
}
})
.doOnComplete(() -> {
// AI 生成完成,解锁
state.clearLock(req.getRequestId());
LockMessage unlockMsg = LockMessage.builder()
.documentId(documentId)
.action(LockMessage.LockAction.UNLOCK)
.holderId(req.getRequestId())
.build();
broadcastToAll(documentId, unlockMsg);
// 持久化
sessionManager.saveDocumentState(documentId, state);
})
.doOnError(error -> {
// AI 生成失败,解锁并通知
state.clearLock(req.getRequestId());
state.rollbackAiContent(req.getRequestId());
broadcastToAll(documentId, createAiErrorMessage(documentId, req.getRequestId(), error));
})
.subscribe();
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) {
String documentId = extractDocumentId(session);
String userId = extractUserId(session);
sessionManager.removeSession(documentId, userId, session);
// 广播用户离开
UserLeaveMessage leaveMsg = new UserLeaveMessage(documentId, userId);
broadcastToOthers(documentId, userId, leaveMsg);
// 检查是否有未完成的 AI 请求持有锁
sessionManager.releaseLocksHeldBy(documentId, userId);
}
/**
* 广播给文档里的所有用户
* 注意:WebSocketSession.sendMessage 不是线程安全的,需要同步
*/
private void broadcastToAll(String documentId, CollabMessage message) {
String payload;
try {
payload = objectMapper.writeValueAsString(message);
} catch (JsonProcessingException e) {
log.error("Failed to serialize message", e);
return;
}
TextMessage textMessage = new TextMessage(payload);
sessionManager.getSessions(documentId).forEach(session -> {
if (session.isOpen()) {
synchronized (session) { // WebSocketSession 的 sendMessage 需要同步
try {
session.sendMessage(textMessage);
} catch (IOException e) {
log.warn("Failed to send to session {}", session.getId(), e);
}
}
}
});
}
private void broadcastToOthers(String documentId, String excludeUserId, CollabMessage message) {
// 类似 broadcastToAll,但跳过 excludeUserId 的会话
}
private String extractDocumentId(WebSocketSession session) {
// 从 URI 路径里提取,比如 /collab/{documentId}
String path = session.getUri().getPath();
return path.substring(path.lastIndexOf('/') + 1);
}
private String extractUserId(WebSocketSession session) {
// 从 HTTP Header 或 Query Parameter 提取已认证的用户 ID
return (String) session.getAttributes().get("userId");
}
}冲突解决:操作变换(OT)的简化实现
完整的 OT 算法非常复杂,Google Docs 背后有一整个学术分支。对于大多数 AI 协作工具,不需要完整的 OT,一个简化版足够了。
@Service
public class ConflictResolver {
/**
* 将一个操作变换,使其可以在已有一系列操作之后应用
* 简化实现:只处理最常见的插入/删除冲突
*/
public EditOperation transform(EditOperation op, List<EditOperation> concurrentOps) {
EditOperation current = op;
for (EditOperation concurrent : concurrentOps) {
current = transformAgainst(current, concurrent);
if (current == null) return null; // 无法解决
}
return current;
}
/**
* 将 op 变换,使其在 concurrent 已经执行之后仍然正确
*/
private EditOperation transformAgainst(EditOperation op, EditOperation concurrent) {
// 两个操作都是插入
if (op.isInsert() && concurrent.isInsert()) {
if (concurrent.getPosition() <= op.getPosition()) {
// concurrent 在 op 之前插入了内容,op 的位置需要后移
return op.withPosition(op.getPosition() + concurrent.getContent().length());
}
return op; // concurrent 在 op 之后,不影响
}
// op 是插入,concurrent 是删除
if (op.isInsert() && concurrent.isDelete()) {
int delStart = concurrent.getPosition();
int delEnd = concurrent.getPosition() + concurrent.getLength();
if (op.getPosition() <= delStart) {
return op; // op 在删除区域之前,不影响
} else if (op.getPosition() >= delEnd) {
return op.withPosition(op.getPosition() - concurrent.getLength());
} else {
// op 要插入的位置被删掉了,插到删除位置的开头
return op.withPosition(delStart);
}
}
// op 是删除,concurrent 是插入
if (op.isDelete() && concurrent.isInsert()) {
if (concurrent.getPosition() <= op.getPosition()) {
return op.withPosition(op.getPosition() + concurrent.getContent().length());
}
// 判断 concurrent 是否在 op 的删除范围内
if (concurrent.getPosition() < op.getPosition() + op.getLength()) {
// 需要扩展删除范围
return op.withLength(op.getLength() + concurrent.getContent().length());
}
return op;
}
// 两个都是删除:这是最复杂的情况,简化处理
if (op.isDelete() && concurrent.isDelete()) {
int opStart = op.getPosition();
int opEnd = op.getPosition() + op.getLength();
int concStart = concurrent.getPosition();
int concEnd = concurrent.getPosition() + concurrent.getLength();
// 完全不重叠
if (opEnd <= concStart) return op;
if (opStart >= concEnd) return op.withPosition(opStart - concurrent.getLength());
// 有重叠:两个人同时删同一段,选择合并(取并集)
// 返回 null 让上层处理
return null;
}
return op;
}
}前端 JavaScript 客户端关键代码
class CollabEditor {
constructor(documentId, userId) {
this.documentId = documentId;
this.userId = userId;
this.version = 0;
this.pendingOps = []; // 还未被服务器确认的操作
this.ws = null;
}
connect() {
this.ws = new WebSocket(`/collab/${this.documentId}?userId=${this.userId}`);
this.ws.onmessage = (event) => {
const msg = JSON.parse(event.data);
this.handleMessage(msg);
};
}
handleMessage(msg) {
switch(msg.type) {
case 'SYNC_STATE':
// 收到初始状态,填充编辑器
this.editor.setValue(msg.content);
this.version = msg.version;
this.renderActiveUsers(msg.activeUsers);
this.renderAiStatus(msg.aiStatus);
break;
case 'USER_EDIT':
if (msg.userId !== this.userId) {
// 其他用户的操作,应用到本地编辑器
this.applyRemoteEdit(msg);
}
// 无论是谁的操作,更新版本号
this.version++;
break;
case 'AI_CHUNK':
// AI 流式输出,实时更新编辑器
this.applyAiChunk(msg);
break;
case 'LOCK':
// 高亮锁定区域,提示其他用户
if (msg.action === 'LOCK') {
this.editor.addMark(msg.startPos, msg.endPos, 'ai-generating');
this.showNotification(`AI 正在修改这段内容...`);
} else {
this.editor.removeMark('ai-generating');
}
break;
case 'USER_JOIN':
this.addActiveUser(msg.userId, msg.userName);
break;
case 'USER_LEAVE':
this.removeActiveUser(msg.userId);
break;
case 'CURSOR_UPDATE':
// 显示其他用户的光标位置
this.renderRemoteCursor(msg.userId, msg.position);
break;
}
}
// 用户本地编辑时
onLocalEdit(operation) {
const msg = {
type: 'USER_EDIT',
documentId: this.documentId,
userId: this.userId,
operation: operation,
baseVersion: this.version,
messageId: generateUUID()
};
this.pendingOps.push(msg);
this.ws.send(JSON.stringify(msg));
}
// 请求 AI 生成
requestAiGenerate(prompt, selectionStart, selectionEnd) {
const msg = {
type: 'AI_GENERATE_REQUEST',
documentId: this.documentId,
userId: this.userId,
prompt: prompt,
selectionStart: selectionStart,
selectionEnd: selectionEnd,
requestId: generateUUID()
};
this.ws.send(JSON.stringify(msg));
}
applyAiChunk(msg) {
const editor = this.editor;
if (!msg.isFinal) {
// 在指定位置插入增量内容
editor.insertAt(msg.insertPosition, msg.delta);
}
}
}几个设计细节
1. AI 生成时为什么要加锁
不加锁会发生什么:AI 正在流式输出第 3 段,同时用户 B 在编辑第 3 段的开头。AI 输出完毕后,用户 B 的编辑和 AI 的内容交叉在一起,变成乱码。
锁的粒度是段落级别,不是整个文档,这样用户还可以编辑其他部分。
2. 新用户加入时看到 AI 正在生成
SyncStateMessage 里包含 aiStatus,如果 AI 在生成,客户端收到初始状态后会显示"AI 正在生成中..."的状态,并等待后续的 AI_CHUNK 消息。不会出现"加入了看到一半内容不知道发生什么"的情况。
3. 断线重连后的状态恢复
WebSocket 断线后,客户端重连时发送自己的最后版本号,服务端返回这之后所有的操作增量,让客户端追上来。这比发送完整文档内容要高效,特别是文档很大的时候。
多人协作这个需求看起来简单,但细节非常多。我在这个项目上花的时间比预估的多了 50%,几乎全在处理边界情况。
如果你也在做类似的项目,建议先把消息协议设计好,把所有的消息类型和触发条件都想清楚,再动手写代码。消息协议是整个系统的契约,改起来牵一发动全身。
