Agent 的并行执行——哪些子任务可以同时跑
Agent 的并行执行——哪些子任务可以同时跑
去年我接手了一个遗留项目里的数据收集 Agent,第一次看到它跑完一个完整任务,我以为自己的电脑卡了。
整整 2 分钟 17 秒。
我打开日志一看,这个 Agent 的任务是收集一个商品的完整信息:价格数据、库存数据、用户评论、竞品对比、历史趋势……一共 5 个来源,每个来源大概需要 25-28 秒。
然后我发现了问题所在:这 5 个任务是串行执行的。
一个接一个,严格按顺序来。没有任何一个任务依赖前一个任务的结果,但就是串行跑。
改完之后,同样的 5 个任务并行执行,耗时降到了 38 秒。快了将近 4 倍。
这篇文章讲的就是这件事:Agent 的并行执行,哪些任务可以并行,怎么做,以及有哪些陷阱。
一、串行 vs 并行:两种 Agent 设计的根本差异
先说清楚什么情况下串行是正确的,什么情况下串行是浪费。
必须串行的情况:有数据依赖
步骤1:搜索相关文档
步骤2:对搜索结果进行摘要(依赖步骤1的输出)
步骤3:根据摘要生成最终答案(依赖步骤2的输出)这三步天然串行,因为每一步都依赖上一步的结果。这里强行并行是错的。
完全可以并行的情况:无数据依赖
任务A:查询价格数据
任务B:查询库存数据
任务C:查询用户评论
任务D:查询竞品信息
任务E:查询历史趋势这 5 个任务都是独立的,谁的结果都不依赖其他人,完全可以同时跑。
混合情况:部分依赖
步骤1A:获取用户基本信息
步骤1B:获取订单历史(可与1A并行)
步骤2:基于用户信息和订单历史,分析购买偏好(依赖1A和1B)
步骤3A:推荐商品(依赖步骤2)
步骤3B:生成个性化文案(依赖步骤2)
步骤4:整合推荐结果和文案(依赖3A和3B)这是最常见的情况——部分并行,部分串行。关键是识别出哪些任务之间有依赖,哪些没有。
二、依赖分析:并行化的前提
在代码层面实现并行之前,必须先做任务依赖分析。
我用一个简单的方法:依赖矩阵。
把所有子任务列出来,每个任务问自己:"我的输入需要哪个任务的输出?"
以数据收集 Agent 为例:
| 任务 | 依赖 |
|---|---|
| 获取价格数据(A) | 无 |
| 获取库存数据(B) | 无 |
| 获取用户评论(C) | 无 |
| 获取竞品信息(D) | 无 |
| 获取历史趋势(E) | 无 |
| 综合分析报告(F) | A、B、C、D、E |
结论:A-E 完全并行,F 等待所有前置完成后执行。
这是最理想的并行结构。
三、基于 CompletableFuture 的并行实现
Java 里做并行任务,CompletableFuture 是最自然的选择。
/**
* 并行 Agent 任务执行器
* 核心思路:分析依赖关系 -> 按层批次执行 -> 每层内并行
*/
@Service
@Slf4j
public class ParallelAgentExecutor {
// 使用独立线程池,避免占用公共线程池
private final ExecutorService executorService = Executors.newFixedThreadPool(
Runtime.getRuntime().availableProcessors() * 2
);
@Autowired
private Map<String, AgentTool> toolRegistry;
/**
* 执行一批可并行的子任务
*
* @param tasks 任务列表(已确认互相无依赖)
* @return 任务ID -> 执行结果的映射
*/
public Map<String, Object> executeParallel(List<SubTask> tasks) {
long startTime = System.currentTimeMillis();
log.info("开始并行执行{}个子任务", tasks.size());
// 为每个任务创建一个 CompletableFuture
Map<String, CompletableFuture<Object>> futures = new LinkedHashMap<>();
for (SubTask task : tasks) {
CompletableFuture<Object> future = CompletableFuture
.supplyAsync(() -> executeSubTask(task), executorService)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("子任务失败: {}", task.getTaskId(), ex);
} else {
log.info("子任务完成: {},耗时{}ms",
task.getTaskId(),
System.currentTimeMillis() - startTime);
}
});
futures.put(task.getTaskId(), future);
}
// 等待所有任务完成,收集结果
Map<String, Object> results = new LinkedHashMap<>();
List<String> failedTasks = new ArrayList<>();
for (Map.Entry<String, CompletableFuture<Object>> entry : futures.entrySet()) {
try {
Object result = entry.getValue().get(60, TimeUnit.SECONDS);
results.put(entry.getKey(), result);
} catch (TimeoutException e) {
log.error("子任务超时: {}", entry.getKey());
failedTasks.add(entry.getKey());
} catch (ExecutionException e) {
log.error("子任务执行异常: {}", entry.getKey(), e.getCause());
failedTasks.add(entry.getKey());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("并行执行被中断", e);
}
}
long elapsed = System.currentTimeMillis() - startTime;
log.info("并行执行完成,成功{}个,失败{}个,总耗时{}ms",
results.size(), failedTasks.size(), elapsed);
if (!failedTasks.isEmpty()) {
log.warn("失败的子任务: {}", failedTasks);
// 根据业务需求决定是否抛出异常
}
return results;
}
private Object executeSubTask(SubTask task) {
log.debug("开始执行子任务: {}", task.getTaskId());
AgentTool tool = toolRegistry.get(task.getToolName());
if (tool == null) {
throw new ToolNotFoundException("工具不存在: " + task.getToolName());
}
return tool.execute(task.getParams());
}
}带依赖管理的完整执行器
真实场景里,任务之间往往有部分依赖,需要一个能处理 DAG(有向无环图)依赖的执行器:
/**
* 基于 DAG 依赖的并行任务调度器
*/
@Service
@Slf4j
public class DagScheduler {
@Autowired
private ParallelAgentExecutor parallelExecutor;
/**
* 按 DAG 依赖关系调度执行
*
* 算法:拓扑排序,按"层"批次执行
* 同一层内的任务并行,层与层之间串行
*/
public Map<String, Object> scheduleAndExecute(List<DagTask> allTasks) {
Map<String, Object> allResults = new HashMap<>();
// 构建依赖图
Map<String, DagTask> taskMap = allTasks.stream()
.collect(Collectors.toMap(DagTask::getTaskId, t -> t));
// 拓扑排序,得到执行层次
List<List<String>> executionLayers = topologicalSort(allTasks);
log.info("任务DAG分析完成,共{}层执行计划:", executionLayers.size());
for (int i = 0; i < executionLayers.size(); i++) {
log.info(" 第{}层(并行): {}", i + 1, executionLayers.get(i));
}
// 按层执行
for (int layerIndex = 0; layerIndex < executionLayers.size(); layerIndex++) {
List<String> layerTaskIds = executionLayers.get(layerIndex);
log.info("开始执行第{}层,共{}个并行任务", layerIndex + 1, layerTaskIds.size());
// 将已完成的前置任务结果注入到当前层任务的参数中
List<SubTask> subTasks = layerTaskIds.stream()
.map(taskId -> {
DagTask dagTask = taskMap.get(taskId);
// 把依赖任务的结果作为输入参数
Map<String, Object> enrichedParams = new HashMap<>(dagTask.getParams());
dagTask.getDependencies().forEach(depId -> {
if (allResults.containsKey(depId)) {
enrichedParams.put("dep_" + depId, allResults.get(depId));
}
});
return SubTask.builder()
.taskId(taskId)
.toolName(dagTask.getToolName())
.params(enrichedParams)
.build();
})
.collect(Collectors.toList());
// 当前层并行执行
Map<String, Object> layerResults = parallelExecutor.executeParallel(subTasks);
allResults.putAll(layerResults);
}
return allResults;
}
/**
* 拓扑排序:把 DAG 分层,同层的任务可以并行
*/
private List<List<String>> topologicalSort(List<DagTask> tasks) {
// 计算每个任务的入度(有几个前置依赖)
Map<String, Integer> inDegree = new HashMap<>();
Map<String, List<String>> dependents = new HashMap<>(); // 谁依赖了我
for (DagTask task : tasks) {
inDegree.put(task.getTaskId(), task.getDependencies().size());
dependents.put(task.getTaskId(), new ArrayList<>());
}
for (DagTask task : tasks) {
for (String dep : task.getDependencies()) {
dependents.get(dep).add(task.getTaskId());
}
}
List<List<String>> layers = new ArrayList<>();
Set<String> processed = new HashSet<>();
while (processed.size() < tasks.size()) {
// 找出所有入度为0的任务(可以执行的任务)
List<String> currentLayer = tasks.stream()
.map(DagTask::getTaskId)
.filter(id -> !processed.contains(id) && inDegree.get(id) == 0)
.collect(Collectors.toList());
if (currentLayer.isEmpty()) {
throw new CyclicDependencyException("任务依赖存在环,无法执行");
}
layers.add(currentLayer);
// 处理这一层的任务,更新依赖图
for (String taskId : currentLayer) {
processed.add(taskId);
for (String dependent : dependents.get(taskId)) {
inDegree.put(dependent, inDegree.get(dependent) - 1);
}
}
}
return layers;
}
}
/**
* DAG 任务定义
*/
@Data
@Builder
public class DagTask {
private String taskId;
private String toolName;
private Map<String, Object> params;
/** 前置依赖任务ID列表 */
private List<String> dependencies;
}四、实战:把串行改并行,时间从 2 分钟降到 40 秒
让我完整演示那个数据收集 Agent 的改造过程。
改造前(串行版本)
@Service
public class ProductInfoAgentV1 {
public ProductReport collectInfo(String productId) {
long start = System.currentTimeMillis();
// 串行执行,一个接一个
PriceData priceData = priceCollector.collect(productId); // 25s
StockData stockData = stockCollector.collect(productId); // 22s
ReviewData reviewData = reviewCollector.collect(productId); // 28s
CompetitorData competitorData = competitorCollector.collect(productId); // 30s
TrendData trendData = trendCollector.collect(productId); // 27s
// 生成报告(依赖以上所有数据)
ProductReport report = reportGenerator.generate(
priceData, stockData, reviewData, competitorData, trendData);
log.info("总耗时: {}ms", System.currentTimeMillis() - start);
// 输出:总耗时: 137000ms(约2分17秒)
return report;
}
}改造后(并行版本)
@Service
@Slf4j
public class ProductInfoAgentV2 {
private final ExecutorService pool = Executors.newFixedThreadPool(10);
public ProductReport collectInfo(String productId) {
long start = System.currentTimeMillis();
// 5个数据收集任务同时启动
CompletableFuture<PriceData> priceFuture = CompletableFuture
.supplyAsync(() -> priceCollector.collect(productId), pool);
CompletableFuture<StockData> stockFuture = CompletableFuture
.supplyAsync(() -> stockCollector.collect(productId), pool);
CompletableFuture<ReviewData> reviewFuture = CompletableFuture
.supplyAsync(() -> reviewCollector.collect(productId), pool);
CompletableFuture<CompetitorData> competitorFuture = CompletableFuture
.supplyAsync(() -> competitorCollector.collect(productId), pool);
CompletableFuture<TrendData> trendFuture = CompletableFuture
.supplyAsync(() -> trendCollector.collect(productId), pool);
// 等待所有数据收集完成,然后生成报告
ProductReport report = CompletableFuture
.allOf(priceFuture, stockFuture, reviewFuture, competitorFuture, trendFuture)
.thenApply(v -> {
// 所有 future 都完成后,汇总结果
try {
return reportGenerator.generate(
priceFuture.get(),
stockFuture.get(),
reviewFuture.get(),
competitorFuture.get(),
trendFuture.get()
);
} catch (Exception e) {
throw new RuntimeException("报告生成失败", e);
}
})
.get(120, TimeUnit.SECONDS); // 最多等2分钟
log.info("总耗时: {}ms", System.currentTimeMillis() - start);
// 输出:总耗时: 32000ms(约32秒,取5个任务中最慢的那个)
return report;
}
}实测结果对比:
| 指标 | 串行版本 | 并行版本 | 提升 |
|---|---|---|---|
| 平均耗时 | 137 秒 | 32 秒 | 4.3x |
| P95 耗时 | 162 秒 | 41 秒 | 3.95x |
| 超时失败率 | 8% | 2% | -75% |
| 资源利用率 | 低 | 高 | 显著提升 |
五、任务依赖图 + 并行执行计划
六、并行执行的三个陷阱
陷阱一:共享状态竞争
多个并行任务如果访问同一个可变状态,会出现并发问题:
// 危险:多个并行任务都在修改同一个 List
List<String> results = new ArrayList<>(); // 非线程安全
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> results.add(fetchA())),
CompletableFuture.runAsync(() -> results.add(fetchB()))
).join();
// 可能丢数据!
// 正确做法:每个任务独立返回,最后汇总
List<String> safeResults = Stream.of(
CompletableFuture.supplyAsync(() -> fetchA()),
CompletableFuture.supplyAsync(() -> fetchB())
).map(CompletableFuture::join)
.collect(Collectors.toList());陷阱二:线程池耗尽
把所有并行任务都扔进同一个线程池,当并发量高时会导致线程池耗尽,反而比串行更慢:
// 危险:用 ForkJoinPool.commonPool()(默认线程池)
CompletableFuture.supplyAsync(() -> heavyNetworkCall());
// 正确:用独立线程池,根据任务特性调整大小
ExecutorService ioPool = Executors.newCachedThreadPool(); // IO 密集
ExecutorService cpuPool = Executors.newFixedThreadPool(cores); // CPU 密集
CompletableFuture.supplyAsync(() -> heavyNetworkCall(), ioPool);陷阱三:部分失败的处理
当 5 个并行任务有 1 个失败时,你希望怎么处理?
- 全部失败,重新来?
- 只用成功的那 4 个结果继续?
- 标记失败,跳过,用默认值?
// 容错模式:允许部分失败,收集成功结果
public Map<String, Object> executeWithFallback(List<SubTask> tasks) {
Map<String, CompletableFuture<Optional<Object>>> futures = tasks.stream()
.collect(Collectors.toMap(
SubTask::getTaskId,
task -> CompletableFuture
.supplyAsync(() -> executeSubTask(task))
.handle((result, ex) -> {
if (ex != null) {
log.warn("子任务失败(已降级): {}", task.getTaskId(), ex);
return Optional.empty(); // 失败时返回空值
}
return Optional.of(result);
})
));
// 等待所有任务,无论成功失败
CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();
// 收集所有结果,过滤掉失败的
return futures.entrySet().stream()
.filter(e -> e.getValue().join().isPresent())
.collect(Collectors.toMap(
Map.Entry::getKey,
e -> e.getValue().join().get()
));
}七、什么任务不适合并行
并行化有收益,但也有代价:代码复杂度提升、调试难度增加、资源消耗增大。
以下情况建议保持串行:
- 任务执行时间本身很短(< 1 秒):并行的调度开销可能比节省的时间还多
- 任务之间有强依赖:无法并行,勉强并行只会出 bug
- 下游服务有严格的 QPS 限制:多任务并行可能触发限流
- 调试和可观测性要求高:串行日志更容易追踪问题
八、总结
那个从 2 分钟降到 38 秒的改造,其实改动的代码量很少,核心变化只有一件事:把 5 个相互独立的任务从串行改成了并行。
做好这件事只需要两步:
- 分析依赖:识别哪些任务之间没有数据依赖
- 选择工具:Java 里用
CompletableFuture,LLM 调用本身可以用流式输出减少感知延迟
Agent 的执行效率很大程度上取决于你有没有充分利用并行能力。遇到长耗时任务,先画依赖图,然后问自己:有没有可以同时跑的?通常都有。
