第1672篇:多Agent协作中的死锁与竞争——分布式协调的工程解法
第1672篇:多Agent协作中的死锁与竞争——分布式协调的工程解法
去年做了一个多Agent协作的项目,拼图拼到一半,测试同学跑来说:系统卡住了,啥也不干,日志也不报错。排查了两个小时,最后发现是两个Agent互相等待对方释放资源——死锁。
这次经历让我意识到,多Agent系统其实就是一个分布式并发系统,只不过"线程"变成了"Agent"。分布式系统里的所有经典问题——死锁、竞争条件、饥饿、活锁——在多Agent场景里照样会出现,而且因为Agent的执行是异步的、非确定性的,这些问题往往更难发现和复现。
今天系统地聊聊这个话题。
多Agent协作的基本模型
在正式讨论问题之前,先定义一下我们说的"多Agent协作"是什么样的架构。
常见的有三种模型:
串行管道(Pipeline):Agent A的输出作为Agent B的输入,依次执行。这种模型最简单,竞争问题不多,主要是错误传播的问题。
并行分发(Fan-out):一个调度Agent把任务分发给多个子Agent并行执行,最后汇总结果。这里开始出现并发问题。
协作网络(Mesh):多个Agent互相调用,每个Agent既是服务提供者也是服务消费者。这是最复杂的模型,死锁和竞争问题主要出在这里。
重点讲并行分发和协作网络两种场景下的问题。
死锁的四个必要条件及其在Agent中的体现
经典的死锁四个必要条件:互斥、占有并等待、不可剥夺、循环等待。在Agent场景里,这四个条件是怎么体现的?
互斥:多个Agent不能同时访问同一个工具或资源(比如某个API有并发限制,或者某个文件要独占写入)。
占有并等待:Agent A已经获得了工具T1的使用权,同时在等待工具T2。Agent B已经获得了T2,同时在等待T1。
不可剥夺:Agent A正在使用T1的过程中,不能强制中断它让T2先用。
循环等待:A等B,B等C,C等A,形成环路。
来看一个具体的死锁场景:
// Agent A的执行逻辑:先获取数据库锁,再获取缓存锁
public String agentALogic(String taskId) {
try (DatabaseLock dbLock = lockManager.acquireDbLock("table_orders")) {
// 做一些数据库操作...
String data = dbClient.query("SELECT * FROM orders WHERE id = ?", taskId);
// 再获取缓存锁
try (CacheLock cacheLock = lockManager.acquireCacheLock("cache_orders")) {
cacheClient.set("orders:" + taskId, data);
return "done";
}
}
}
// Agent B的执行逻辑:先获取缓存锁,再获取数据库锁(顺序反了!)
public String agentBLogic(String taskId) {
try (CacheLock cacheLock = lockManager.acquireCacheLock("cache_orders")) {
String cached = cacheClient.get("orders:" + taskId);
// 再获取数据库锁
try (DatabaseLock dbLock = lockManager.acquireDbLock("table_orders")) {
dbClient.update("UPDATE orders SET cached = true WHERE id = ?", taskId);
return cached;
}
}
}A持有数据库锁在等缓存锁,B持有缓存锁在等数据库锁,经典循环等待,必然死锁。
解法一:资源排序,消除循环等待
破坏循环等待条件是最优雅的死锁预防方案。做法是给所有资源编号,Agent获取多个资源时必须按编号从小到大的顺序获取。
@Service
public class OrderedLockManager {
// 给每种资源分配一个全局唯一的顺序编号
private static final Map<String, Integer> RESOURCE_ORDER = Map.of(
"db:table_orders", 1,
"db:table_inventory", 2,
"cache:orders", 3,
"cache:inventory", 4,
"queue:notifications", 5
);
/**
* 按顺序批量获取多个锁,防止死锁
*/
public AutoCloseable acquireMultipleLocks(List<String> resourceIds) {
// 按照预定顺序排序
List<String> sortedResources = resourceIds.stream()
.sorted(Comparator.comparing(id ->
RESOURCE_ORDER.getOrDefault(id, Integer.MAX_VALUE)))
.collect(Collectors.toList());
List<AutoCloseable> acquiredLocks = new ArrayList<>();
try {
for (String resourceId : sortedResources) {
Lock lock = getLock(resourceId);
if (!lock.tryLock(5, TimeUnit.SECONDS)) {
// 获取超时,释放已获得的所有锁
releaseAll(acquiredLocks);
throw new LockTimeoutException("获取锁超时: " + resourceId);
}
acquiredLocks.add(() -> lock.unlock());
}
} catch (InterruptedException e) {
releaseAll(acquiredLocks);
Thread.currentThread().interrupt();
throw new RuntimeException("锁获取被中断", e);
}
return () -> releaseAll(acquiredLocks);
}
private void releaseAll(List<AutoCloseable> locks) {
// 逆序释放锁
for (int i = locks.size() - 1; i >= 0; i--) {
try {
locks.get(i).close();
} catch (Exception ignored) {}
}
}
}
// 改造后的Agent逻辑
public String agentLogic(String taskId) {
// 明确声明需要的资源,框架保证按序获取
try (AutoCloseable locks = lockManager.acquireMultipleLocks(
List.of("db:table_orders", "cache:orders"))) {
// 执行业务逻辑...
return doWork(taskId);
}
}这个方案在静态资源场景下很好用。但问题是,Agent的执行是动态的——在执行之前,它可能并不知道自己需要哪些资源。LLM在推理过程中动态决定调用哪些工具,你没法提前声明资源列表。
解法二:超时+退让机制(Timeout & Backoff)
更适合动态场景的方案:给所有锁获取加上超时,超时后释放当前所有锁并随机等待一段时间再重试。
@Service
public class DeadlockAwareAgentExecutor {
private final Random random = new Random();
public AgentResult executeWithDeadlockProtection(
AgentTask task, int maxRetries) {
int attempt = 0;
while (attempt < maxRetries) {
try {
return executeOnce(task);
} catch (LockTimeoutException e) {
attempt++;
if (attempt >= maxRetries) {
throw new DeadlockException("任务执行失败,可能存在死锁: " + task.getId(), e);
}
// 指数退让 + 随机抖动,避免多个Agent同时重试
long waitMs = (long) (Math.pow(2, attempt) * 100)
+ random.nextInt(200);
log.warn("检测到锁超时,第{}次重试,等待{}ms, taskId={}",
attempt, waitMs, task.getId());
try {
Thread.sleep(waitMs);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
break;
}
}
}
throw new RuntimeException("超过最大重试次数");
}
private AgentResult executeOnce(AgentTask task) {
// 设置执行超时
CompletableFuture<AgentResult> future = CompletableFuture
.supplyAsync(() -> task.execute());
try {
return future.get(task.getTimeoutSeconds(), TimeUnit.SECONDS);
} catch (TimeoutException e) {
future.cancel(true);
throw new LockTimeoutException("Agent执行超时: " + task.getId());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}随机抖动很重要。如果所有Agent都固定等待相同时间再重试,等待结束后它们又会同时竞争同一批资源,死锁概率不降反升。加上随机抖动,各个Agent的重试时间错开,竞争压力自然分散。
竞争条件:比死锁更隐蔽的问题
死锁至少会让系统卡住,容易发现。竞争条件(Race Condition)更危险——系统看起来在正常运行,但数据悄悄地错了。
典型的竞争条件场景:
// 问题代码:多个Agent同时更新同一个计数器
@Tool(name = "increment_task_count")
public int incrementTaskCount(String projectId) {
// Agent A读取:count = 5
// Agent B也读取:count = 5
int current = database.getCount(projectId);
// Agent A计算:newCount = 6
// Agent B也计算:newCount = 6
int newCount = current + 1;
// Agent A写入:count = 6
// Agent B也写入:count = 6(期望是7,实际是6)
database.setCount(projectId, newCount);
return newCount;
}这是经典的"读-改-写"竞争,两个Agent都读到了5,都写了6,结果计数器只加了1而不是2。
方案一:数据库原子操作
@Tool(name = "increment_task_count")
public int incrementTaskCount(String projectId) {
// 用数据库的原子自增,避免读-改-写竞争
return database.atomicIncrement(projectId, 1);
// 对应SQL: UPDATE projects SET task_count = task_count + 1
// WHERE id = ? RETURNING task_count
}方案二:乐观锁(CAS)
@Tool(name = "update_task_status")
public boolean updateTaskStatus(String taskId, String expectedStatus,
String newStatus) {
// 带版本号的更新,只有版本匹配时才成功
int rowsAffected = database.update(
"UPDATE tasks SET status = ?, version = version + 1 " +
"WHERE id = ? AND status = ? AND version = ?",
newStatus, taskId, expectedStatus, currentVersion
);
if (rowsAffected == 0) {
// 更新失败,说明另一个Agent已经修改了这条记录
throw new OptimisticLockException("任务状态已被其他Agent修改,请重试");
}
return true;
}方案三:分布式锁
对于复杂的多步骤操作,用分布式锁把整个操作序列保护起来:
@Service
public class AgentResourceCoordinator {
private final RedissonClient redisson;
public <T> T executeExclusively(String resourceId,
Callable<T> action,
Duration timeout) throws Exception {
RLock lock = redisson.getLock("agent:lock:" + resourceId);
boolean locked = lock.tryLock(
timeout.toMillis(),
timeout.toMillis(),
TimeUnit.MILLISECONDS
);
if (!locked) {
throw new ResourceBusyException("资源被占用,无法获取锁: " + resourceId);
}
try {
return action.call();
} finally {
lock.unlock();
}
}
// Redisson还支持公平锁,防止Agent饥饿
public <T> T executeWithFairLock(String resourceId,
Callable<T> action) throws Exception {
RLock fairLock = redisson.getFairLock("agent:fairlock:" + resourceId);
fairLock.lock();
try {
return action.call();
} finally {
fairLock.unlock();
}
}
}消息队列:用异步解耦代替锁竞争
很多时候,死锁和竞争的根源是Agent之间直接通信、直接共享资源。一个更彻底的解法是引入消息队列,让Agent之间通过消息异步通信,彻底消除直接竞争。
// 生产者Agent:发布任务消息
@Service
public class OrchestratorAgent {
private final RabbitTemplate rabbitTemplate;
public void dispatchSubTasks(List<SubTask> subTasks) {
for (SubTask task : subTasks) {
// 每个子任务发到独立的队列,避免Agent之间竞争同一个任务
rabbitTemplate.convertAndSend(
"agent.tasks",
task.getType(), // 路由键,按类型分发
task,
message -> {
// 设置消息ID,用于幂等处理
message.getMessageProperties()
.setMessageId(task.getId());
return message;
}
);
}
}
}
// 消费者Agent:串行处理队列中的任务,天然避免并发竞争
@Component
public class DataProcessingAgent {
@RabbitListener(
queues = "agent.tasks.data-processing",
concurrency = "1" // 单线程消费,避免竞争
)
public void processTask(SubTask task, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
try {
doProcess(task);
// 手动ACK,确保任务完成后才从队列移除
channel.basicAck(deliveryTag, false);
} catch (RecoverableException e) {
// 可恢复的错误,重新入队
channel.basicNack(deliveryTag, false, true);
} catch (Exception e) {
// 不可恢复的错误,放入死信队列
channel.basicNack(deliveryTag, false, false);
deadLetterProcessor.handle(task, e);
}
}
}消息队列的好处是,每个Agent只处理自己队列里的消息,不同Agent之间完全解耦,天然避免竞争。代价是系统架构变复杂了,引入了消息中间件的依赖。
饥饿问题:公平调度的重要性
除了死锁和竞争,还有一个容易被忽视的问题:饥饿(Starvation)。某个Agent一直抢不到资源,永远轮不到它执行。
在优先级调度场景下,低优先级的Agent可能因为高优先级Agent持续进入而永远得不到执行机会。
@Service
public class FairAgentScheduler {
private final PriorityBlockingQueue<AgentTask> taskQueue;
private final Map<String, Integer> agentWaitTimes = new ConcurrentHashMap<>();
/**
* 动态优先级调整:等待时间越长,优先级越高(老化机制)
*/
public AgentTask pickNextTask() {
List<AgentTask> candidates = new ArrayList<>(taskQueue);
return candidates.stream()
.max(Comparator.comparingDouble(task -> {
// 基础优先级 + 等待时间奖励(老化)
int basePriority = task.getPriority();
long waitTime = System.currentTimeMillis() - task.getEnqueueTime();
double agingBonus = waitTime / 1000.0 * 0.1; // 每等1秒加0.1分
return basePriority + agingBonus;
}))
.orElse(null);
}
/**
* 资源配额管理:每个Agent类型有最大并发数限制
*/
private final Map<String, Semaphore> agentTypeSemaphores = Map.of(
"data-processing", new Semaphore(5), // 数据处理Agent最多5个并发
"llm-inference", new Semaphore(3), // LLM推理Agent最多3个并发(成本考虑)
"file-operation", new Semaphore(10) // 文件操作Agent最多10个并发
);
public void executeWithQuota(String agentType, Runnable task)
throws InterruptedException {
Semaphore semaphore = agentTypeSemaphores.get(agentType);
if (semaphore == null) {
task.run();
return;
}
semaphore.acquire();
try {
task.run();
} finally {
semaphore.release();
}
}
}活锁:比死锁更难发现
活锁比死锁更狡猾。死锁时所有人都不动,很容易发现。活锁时所有人都在"动",但实际上没有进展——就像两个人在走廊里对面走,互相让路,但每次都让向同一方向,结果谁也过不去。
// 活锁示例:两个Agent互相让路
// Agent A
public AgentResult agentA(String resource) {
while (true) {
if (tryAcquire(resource)) {
return doWork(resource);
}
// 发现资源被占用,主动释放自己的其他资源,让对方先走
releaseMyResources();
waitAndRetry(); // 等待后重试,但如果B也在做同样的事...
}
}
// Agent B也做相同的事
public AgentResult agentB(String resource) {
while (true) {
if (tryAcquire(resource)) {
return doWork(resource);
}
releaseMyResources(); // 和A同步释放,然后又同步竞争
waitAndRetry();
}
}破解活锁的关键是引入随机性:
// 加入随机等待时间,打破同步竞争
public void waitAndRetry() {
long waitTime = 100 + random.nextInt(400); // 100-500ms随机等待
Thread.sleep(waitTime);
}
// 或者引入仲裁者:当检测到多次重试失败时,上报给协调者决策
public AgentResult executeWithArbitration(String resource, int maxAttempts) {
for (int i = 0; i < maxAttempts; i++) {
try {
return tryExecute(resource);
} catch (ResourceConflictException e) {
if (i == maxAttempts - 1) {
// 达到最大重试次数,请求仲裁者介入
return coordinator.arbitrate(this.agentId, resource);
}
Thread.sleep(100 + random.nextInt(300));
}
}
throw new RuntimeException("执行失败");
}监控与检测:实时发现死锁
再好的预防机制也需要配套的监控。生产环境里,我们需要能实时检测死锁和竞争的手段。
@Service
public class DeadlockDetector {
private final Map<String, LockInfo> lockWaitGraph = new ConcurrentHashMap<>();
@Scheduled(fixedDelay = 5000) // 每5秒检查一次
public void detectDeadlocks() {
// 构建等待图
Map<String, Set<String>> waitGraph = buildWaitGraph();
// 检测环
List<List<String>> cycles = findCycles(waitGraph);
if (!cycles.isEmpty()) {
for (List<String> cycle : cycles) {
log.error("检测到死锁环: {}", String.join(" -> ", cycle));
// 发送告警
alertService.sendDeadlockAlert(cycle);
// 选择一个牺牲者打破死锁(选等待时间最短的Agent)
String victim = selectVictim(cycle);
agentKiller.interrupt(victim);
}
}
}
private List<List<String>> findCycles(Map<String, Set<String>> graph) {
Set<String> visited = new HashSet<>();
Set<String> inStack = new HashSet<>();
List<List<String>> cycles = new ArrayList<>();
for (String node : graph.keySet()) {
if (!visited.contains(node)) {
dfs(node, graph, visited, inStack, new ArrayList<>(), cycles);
}
}
return cycles;
}
private void dfs(String node, Map<String, Set<String>> graph,
Set<String> visited, Set<String> inStack,
List<String> path, List<List<String>> cycles) {
visited.add(node);
inStack.add(node);
path.add(node);
Set<String> neighbors = graph.getOrDefault(node, Collections.emptySet());
for (String neighbor : neighbors) {
if (inStack.contains(neighbor)) {
// 找到环
int start = path.indexOf(neighbor);
cycles.add(new ArrayList<>(path.subList(start, path.size())));
} else if (!visited.contains(neighbor)) {
dfs(neighbor, graph, visited, inStack, path, cycles);
}
}
path.remove(path.size() - 1);
inStack.remove(node);
}
}我们踩过的坑
坑1:分布式锁的过期时间设置不合理。
我们有个Agent在某种情况下需要执行比较长的操作,但分布式锁的TTL设置了30秒。结果操作还没完成,锁就自动释放了,另一个Agent进来执行了同样的操作,导致数据重复。后来改成了锁续期机制(watchdog),操作未完成时自动延长锁的有效期。
坑2:测试环境从不出现的问题,生产必现。
单机测试时Agent是顺序执行的,根本触发不了竞争条件。上线后并发一大,问题立刻暴露。后来专门建了并发测试套件,用JMeter模拟多Agent并发场景。
坑3:超时时间设置过短导致误报死锁。
某个LLM调用偶尔会慢,我们的死锁检测把它判断成死锁给kill掉了。后来分了两类超时:业务超时(正常业务的最大允许时间)和死锁超时(明显异常的时间),两个阈值分开配置。
多Agent系统的并发问题本质上和多线程程序一样,只是粒度从线程放大到了服务。解决思路也是那些经典招式:资源排序、超时退让、消息解耦、乐观锁。但Agent的特殊性在于它的执行是非确定性的,LLM可能在你没预料到的时机发起工具调用,所以监控和实时检测比预防更重要。
