AI 应用的分布式锁——并发调用时的资源竞争问题
AI 应用的分布式锁——并发调用时的资源竞争问题
那是一个普通的周五下午,我正准备收拾东西下班。监控告警响了。
告警内容是:知识库文档数量异常增长,同一个文档被索引了三遍。
我打开 Milvus 管理控制台,过滤了一下,确认了:某个用户上传了一份 PDF,系统里出现了完全一模一样的三份向量数据。不是内容相似——是 word for word 完全相同,连 chunk 的切分方式都一样。
原因很快就找到了:那个用户在上传 PDF 之后,网络稍微有点慢,页面 loading 了两三秒没有反应,他以为没成功,连续点了三次提交按钮。
三个请求几乎同时到达服务器,三个都通过了"文档是否已存在"的检查(因为这时候都还没有写入),然后三个都发起了 Embedding 生成 + 向量写入的流程。最终写进去了三份重复数据。
这个事故让我意识到:AI 应用面临的并发问题和普通 CRUD 应用有本质的不同。
AI 应用特有的并发场景
普通 CRUD 应用的并发控制,我们很熟悉:数据库乐观锁、悲观锁、唯一索引约束。这些手段在 AI 场景里能用的很少,因为 AI 操作的特点是:
1. 操作链路长,耗时几秒到几十秒 文档上传 → 解析 → 分块 → Embedding(调用 API,2-5 秒)→ 向量写入 → 元数据写入
整个流程跑完要 10-30 秒,这期间如果有第二个相同请求进来,数据库层面根本没有可以加锁的"原子操作"。
2. 涉及多个异构存储 关系型数据库(MySQL)+ 向量数据库(Milvus)+ 对象存储(OSS)——这三个存储之间没有分布式事务,任何一步失败都可能留下不一致的数据。
3. 调用成本高 Embedding API 不是免费的,重复调用三次不只是数据问题,还是钱的问题。三份重复 Embedding,白白花了三倍的 Token 费用。
4. 向量库缺乏幂等性保障 关系型数据库可以用唯一索引保证幂等,Milvus 默认不做这个,同样的 ID 可以反复插入。
5. 同一用户同时发多个 AI 对话请求 用户疯狂点发送按钮(网速慢时很常见),多个并发的对话请求可能互相干扰,导致对话历史混乱。
基于 Redis 的 AI 调用去重锁
解决方案思路是:在 AI 操作的入口处加一个分布式锁,确保同一个"操作单元"(文档上传、对话请求等)同一时间只有一个实例在执行。
先定义一个通用的分布式锁工具类:
@Service
public class RedisDistributedLock {
@Autowired
private StringRedisTemplate redisTemplate;
private static final String LOCK_PREFIX = "ai:lock:";
/**
* 尝试获取锁
* @param lockKey 锁的唯一标识
* @param requestId 当前请求 ID(用于识别锁的持有者)
* @param expireMs 锁的过期时间(ms),防止死锁
* @return 是否获取成功
*/
public boolean tryLock(String lockKey, String requestId, long expireMs) {
String fullKey = LOCK_PREFIX + lockKey;
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(fullKey, requestId, expireMs, TimeUnit.MILLISECONDS);
return Boolean.TRUE.equals(success);
}
/**
* 释放锁(只有持有者才能释放)
*/
public boolean unlock(String lockKey, String requestId) {
String fullKey = LOCK_PREFIX + lockKey;
// Lua 脚本保证原子性:判断 + 删除
String luaScript = """
if redis.call('GET', KEYS[1]) == ARGV[1] then
return redis.call('DEL', KEYS[1])
else
return 0
end
""";
Long result = redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
List.of(fullKey),
requestId
);
return Long.valueOf(1L).equals(result);
}
/**
* 等待获取锁(轮询,适用于需要排队执行而不是直接拒绝的场景)
*/
public boolean tryLockWithWait(String lockKey, String requestId,
long expireMs, long waitMs) {
long deadline = System.currentTimeMillis() + waitMs;
long interval = 100; // 每 100ms 重试一次
while (System.currentTimeMillis() < deadline) {
if (tryLock(lockKey, requestId, expireMs)) {
return true;
}
try {
Thread.sleep(interval);
interval = Math.min(interval * 2, 1000); // 指数退避,最多 1 秒
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
return false;
}
/**
* 检查锁是否存在(用于查询操作,不推荐用于控制逻辑)
*/
public boolean isLocked(String lockKey) {
return Boolean.TRUE.equals(redisTemplate.hasKey(LOCK_PREFIX + lockKey));
}
}文档上传的幂等性控制
针对文档上传场景,封装一个专门的幂等控制:
@Service
public class DocumentUploadService {
@Autowired
private RedisDistributedLock distributedLock;
@Autowired
private EmbeddingService embeddingService;
@Autowired
private VectorStoreService vectorStoreService;
@Autowired
private DocumentRepository documentRepository;
/**
* 文档上传的核心方法,带幂等性保护
*/
public DocumentUploadResult upload(DocumentUploadRequest request) {
String tenantId = request.getTenantId();
String userId = request.getUserId();
// 生成文档指纹(基于文件内容 hash,而不是文件名)
String contentHash = computeContentHash(request.getFileContent());
// 锁的 key:租户维度 + 文档内容 hash
// 同一租户下,相同内容的文档同一时间只允许一个上传流程
String lockKey = String.format("doc-upload:%s:%s", tenantId, contentHash);
String requestId = request.getRequestId(); // 每个请求有唯一 ID
// 获取锁,等待最多 5 秒,锁持有时间最多 60 秒(对应整个上传流程的超时时间)
boolean locked = distributedLock.tryLockWithWait(lockKey, requestId, 60_000, 5_000);
if (!locked) {
// 获取锁失败,说明有并发的相同文档正在处理
// 检查是否已经有结果(上一个请求可能已经完成了)
Optional<Document> existing = documentRepository
.findByTenantIdAndContentHash(tenantId, contentHash);
if (existing.isPresent()) {
// 文档已存在,直接返回已有结果(幂等)
return DocumentUploadResult.alreadyExists(existing.get());
}
// 文档不存在,但有并发请求正在处理,返回"处理中"状态
return DocumentUploadResult.processing("Document is being processed concurrently");
}
try {
// 双重检查:获取到锁之后,再检查一次是否已存在
Optional<Document> existing = documentRepository
.findByTenantIdAndContentHash(tenantId, contentHash);
if (existing.isPresent()) {
return DocumentUploadResult.alreadyExists(existing.get());
}
// 执行实际的上传流程
return doUpload(request, contentHash);
} finally {
// 确保锁一定被释放
distributedLock.unlock(lockKey, requestId);
}
}
private DocumentUploadResult doUpload(DocumentUploadRequest request, String contentHash) {
// 1. 解析和分块
List<DocumentChunk> chunks = documentParser.parseAndChunk(request.getFileContent());
// 2. 生成 Embedding(耗时,API 调用)
List<float[]> embeddings = embeddingService.embedBatch(
chunks.stream().map(DocumentChunk::getContent).collect(Collectors.toList())
);
// 3. 写入向量库
for (int i = 0; i < chunks.size(); i++) {
chunks.get(i).setEmbedding(embeddings.get(i));
}
vectorStoreService.insertChunks(chunks, request.getTenantId());
// 4. 写入关系型数据库
Document document = Document.builder()
.id(UUID.randomUUID().toString())
.tenantId(request.getTenantId())
.userId(request.getUserId())
.filename(request.getFilename())
.contentHash(contentHash)
.chunkCount(chunks.size())
.status(DocumentStatus.INDEXED)
.createdAt(Instant.now())
.build();
documentRepository.save(document);
return DocumentUploadResult.success(document);
}
private String computeContentHash(byte[] content) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
byte[] hash = md.digest(content);
return HexFormat.of().formatHex(hash);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("SHA-256 not available", e);
}
}
}对话请求的并发控制
对话请求的并发问题稍微复杂一些。用户连续发送多条消息,必须保证:后一条消息使用的对话历史,包含前一条消息的响应。
@Service
public class ConversationService {
@Autowired
private RedisDistributedLock distributedLock;
@Autowired
private ConversationHistoryService historyService;
@Autowired
private AICapabilityService aiService;
/**
* 发送消息并获取 AI 回复
* 保证同一会话的消息串行处理(不能并发)
*/
public ChatResponse sendMessage(String conversationId, String userMessage, String userId) {
// 锁的粒度:会话 ID(而不是用户 ID,因为用户可能有多个并发会话)
String lockKey = "conversation:" + conversationId;
String requestId = UUID.randomUUID().toString();
// 对话请求通常不需要排队等待,直接拒绝并发请求
// 告知用户"上一条消息正在处理中"
boolean locked = distributedLock.tryLock(lockKey, requestId, 30_000);
if (!locked) {
throw new ConcurrentMessageException(
"Previous message is still being processed, please wait"
);
}
try {
// 获取历史对话(有了锁保证,读到的历史一定是最新的)
List<Message> history = historyService.getHistory(conversationId);
// 构建请求
ChatRequest chatRequest = ChatRequest.builder()
.userMessage(userMessage)
.history(history)
.build();
// 调用 AI
ChatResponse response = aiService.chat(chatRequest);
// 更新对话历史(加锁保证这个更新不会丢失)
historyService.appendMessage(conversationId,
Message.user(userMessage),
Message.assistant(response.getContent())
);
return response;
} finally {
distributedLock.unlock(lockKey, requestId);
}
}
}那次真实事故的完整复盘
回到开头说的那次事故,复盘一下根因和修复过程。
事故时间线:
- T+0ms:用户点第一次提交
- T+200ms:用户点第二次提交(以为没反应)
- T+400ms:用户点第三次提交
- T+0ms~T+200ms:三个请求到达服务器,全部查询"文档是否存在",全部返回"不存在"
- T+200ms~T+8000ms:三个请求并发执行 Embedding 生成
- T+8000ms~T+9000ms:三个请求依次写入向量库
- T+9000ms:数据库里出现三份完全相同的向量数据
直接损失:
- 向量库空间:三倍于正常的向量数据
- API 费用:白白花了两倍的 Embedding 费用(一份文档 500 个 chunk,大概 $0.08,两次白花了 $0.16)
- 查询质量:相同文档被召回三次,查询结果重复,对用户来说体验很差
修复方案:
- 前端加防抖:提交按钮点击后立即 disable,loading 状态期间不允许重复提交。这是第一道防线。
- 服务端加分布式锁:如上面的代码,基于文档内容 hash 的锁,保证同一内容不会并发处理。这是第二道防线。
- 向量库加幂等写入:写入前检查是否已存在相同 document_id 的数据。这是第三道防线。
// 向量库幂等写入
public void insertChunksIdempotent(List<DocumentChunk> chunks, String tenantId) {
// 检查是否已存在
List<String> existingIds = vectorStoreService.queryExistingIds(
chunks.stream().map(DocumentChunk::getId).collect(Collectors.toList()),
tenantId
);
// 只插入不存在的
List<DocumentChunk> newChunks = chunks.stream()
.filter(c -> !existingIds.contains(c.getId()))
.collect(Collectors.toList());
if (!newChunks.isEmpty()) {
vectorStoreService.insertChunks(newChunks, tenantId);
}
log.info("Idempotent insert: total={}, new={}, skipped={}",
chunks.size(), newChunks.size(), existingIds.size());
}数据修复:
发现事故后,我写了一个数据清理脚本:
@Component
public class DuplicateVectorCleaner {
@Autowired
private VectorStoreService vectorStoreService;
@Autowired
private DocumentRepository documentRepository;
/**
* 清理重复的向量数据
* 保留 createdAt 最早的那份,删除后来的重复项
*/
public CleanupResult cleanDuplicates(String tenantId) {
// 从关系型数据库找出有重复 contentHash 的文档
List<String> duplicateHashes = documentRepository
.findDuplicateContentHashesByTenantId(tenantId);
int deletedChunks = 0;
for (String hash : duplicateHashes) {
List<Document> docs = documentRepository
.findByTenantIdAndContentHashOrderByCreatedAt(tenantId, hash);
// 保留第一个(最早创建的),删除其余的
Document keeper = docs.get(0);
List<Document> toDelete = docs.subList(1, docs.size());
for (Document doc : toDelete) {
// 删除向量库中的数据
int deleted = vectorStoreService.deleteByDocumentId(doc.getId(), tenantId);
deletedChunks += deleted;
// 删除关系型数据库中的记录
documentRepository.delete(doc);
log.info("Deleted duplicate document: {}, chunks: {}", doc.getId(), deleted);
}
}
return new CleanupResult(duplicateHashes.size(), deletedChunks);
}
}Embedding 并发写入的竞争条件
还有一个更隐蔽的并发问题:增量更新场景下的竞争条件。
假设一个文档被修改了,系统要删除旧的向量、写入新的向量。如果删除和写入之间有个时间窗口,这期间发来的查询请求会查到"空洞"(旧数据已删、新数据未入)。
@Service
public class DocumentUpdateService {
@Autowired
private RedisDistributedLock distributedLock;
@Autowired
private VectorStoreService vectorStoreService;
/**
* 文档更新:先写新数据,再删旧数据("蓝绿切换"策略)
* 避免先删后写导致的短暂空洞
*/
public void updateDocument(String documentId, String tenantId,
List<DocumentChunk> newChunks) {
String lockKey = "doc-update:" + documentId;
String requestId = UUID.randomUUID().toString();
boolean locked = distributedLock.tryLockWithWait(lockKey, requestId, 120_000, 10_000);
if (!locked) {
throw new DocumentUpdateException("Document is being updated concurrently");
}
try {
// 关键:先写入新版本(带新的版本号标识)
String newVersion = UUID.randomUUID().toString();
for (DocumentChunk chunk : newChunks) {
chunk.setMetadata("version", newVersion);
chunk.setMetadata("document_id", documentId);
}
vectorStoreService.insertChunks(newChunks, tenantId);
// 原子切换:更新数据库中的"当前版本"
documentRepository.updateCurrentVersion(documentId, newVersion);
// 异步删除旧版本(不影响查询可用性)
String oldVersion = getOldVersion(documentId);
if (oldVersion != null) {
asyncCleanupService.scheduleCleanup(documentId, oldVersion, tenantId);
}
} finally {
distributedLock.unlock(lockKey, requestId);
}
}
}注意这里用了"先写新、再删旧"而不是"先删旧、再写新",这样即使中途失败,也不会出现查询空洞——最坏的结果是新旧版本短暂共存,这比出现空洞要好得多。
锁的粒度设计原则
锁的粒度选择直接影响系统的吞吐量:
- 太粗:用户维度的锁 → 同一用户只能串行处理所有操作 → 并发性极差
- 太细:chunk 维度的锁 → 锁太多,管理成本高,还可能产生死锁
- 刚好:以"完整的业务操作单元"为粒度 → 文档上传锁到文档内容 hash,对话锁到会话 ID
锁粒度参考:
文档上传 → lock key = "doc-upload:{tenantId}:{contentHash}"
文档删除 → lock key = "doc-update:{documentId}"
会话对话 → lock key = "conversation:{conversationId}"
配额写入 → 用 Redis INCRBY 原子操作,不需要独立锁
向量集合操作 → lock key = "collection-op:{collectionName}"小结
AI 应用的并发控制,比普通 CRUD 应用要复杂,因为:
- 操作链路长:从 Embedding 生成到向量写入,整个流程跑完要十几秒,期间并发请求进来拿到的状态是旧状态
- 涉及多存储:无法用单一数据库事务解决,需要分布式锁保证原子性
- 成本高:重复的 Embedding 调用不只是数据问题,是真实的 API 费用
解决方案的核心思路:以分布式锁保证同一业务单元串行处理,以内容 hash 实现幂等写入,以"先写新后删旧"策略避免查询空洞。
三道防线组合起来,才能真正防住生产环境里的并发问题。
