第1907篇:Spring AI的工具调用并发优化——并行执行多个Function Call
第1907篇:Spring AI的工具调用并发优化——并行执行多个Function Call
工具调用(Function Calling)是 AI 应用里非常强大的能力,但有个性能问题往往被忽视:当 LLM 决定调用多个工具时,默认情况下这些工具是串行执行的——执行完工具A,把结果给 LLM,LLM 再决定调用工具B,再执行……
对于彼此没有依赖关系的工具调用,串行执行纯粹是浪费时间。
我们有个项目,用户问一个综合查询("我名下的订单、优惠券和积分情况"),AI 需要调用三个工具:查订单、查优惠券、查积分。这三个查询完全独立,但串行下来要 2-3 秒,并行只需要不到 1 秒。差了 2 倍以上。
这篇文章就聊聊怎么在 Spring AI 里实现工具调用的并发优化。
先理解工具调用的执行流程
要做并发优化,得先清楚默认流程是什么样的。
在 LLM 返回多个 tool_calls 之后,Spring AI 默认会逐一执行这些工具。这是因为框架不知道这些工具之间是否有依赖,保守起见选择串行。
要实现并行,我们需要介入工具的执行环节。
方案一:并发工具执行器
Spring AI 允许我们自定义 FunctionCallback 的执行方式。通过包装原始的函数实现,在执行层面引入并发:
@Configuration
public class ConcurrentFunctionCallConfig {
@Bean
public Executor toolExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("tool-exec-");
executor.setKeepAliveSeconds(60);
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}核心是实现一个并发工具调用处理器,在 Advisor 层捕获 LLM 返回的多个 tool_calls,并行执行:
@Component
public class ParallelToolExecutionAdvisor implements CallAroundAdvisor {
private static final int ORDER = 500; // 在记忆 Advisor 之后,紧贴 LLM
@Autowired
@Qualifier("toolExecutor")
private Executor toolExecutor;
// 工具注册表:工具名 -> 实际执行函数
@Autowired
private FunctionCallbackContext functionCallbackContext;
@Override
public AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
AdvisedResponse response = chain.nextAroundCall(request);
// 检查 LLM 是否返回了工具调用请求
AssistantMessage assistantMessage = response.response()
.getResult()
.getOutput();
if (!assistantMessage.hasToolCalls()) {
return response; // 没有工具调用,直接返回
}
List<AssistantMessage.ToolCall> toolCalls = assistantMessage.getToolCalls();
if (toolCalls.size() == 1) {
// 只有一个工具调用,串行执行即可,不需要并发开销
return executeAndContinue(request, response, toolCalls);
}
// 多个工具调用:分析依赖关系,并行执行独立工具
return executeParallelAndContinue(request, response, toolCalls);
}
/**
* 分析工具依赖并并行执行
*/
private AdvisedResponse executeParallelAndContinue(
AdvisedRequest request,
AdvisedResponse firstResponse,
List<AssistantMessage.ToolCall> toolCalls) {
// 分析工具依赖(基于工具元数据或者简单规则)
DependencyGraph graph = analyzeDependencies(toolCalls);
// 按依赖层次分组执行
List<List<AssistantMessage.ToolCall>> executionLayers = graph.getExecutionLayers();
List<ToolResponseMessage.ToolResponse> allResults = new ArrayList<>();
for (List<AssistantMessage.ToolCall> layer : executionLayers) {
if (layer.size() == 1) {
// 当前层只有一个工具,串行执行
allResults.add(executeSingle(layer.get(0)));
} else {
// 当前层有多个独立工具,并行执行
List<ToolResponseMessage.ToolResponse> layerResults =
executeLayerParallel(layer);
allResults.addAll(layerResults);
}
}
// 把所有工具结果打包成消息,再次调用 LLM
return continueWithResults(request, firstResponse, allResults);
}
private List<ToolResponseMessage.ToolResponse> executeLayerParallel(
List<AssistantMessage.ToolCall> toolCalls) {
List<CompletableFuture<ToolResponseMessage.ToolResponse>> futures =
toolCalls.stream()
.map(toolCall -> CompletableFuture.supplyAsync(
() -> executeSingle(toolCall),
toolExecutor
))
.collect(Collectors.toList());
// 等待所有并行工具执行完成
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
try {
allDone.get(30, TimeUnit.SECONDS); // 整体超时
} catch (TimeoutException e) {
log.error("并行工具执行超时", e);
// 超时的工具返回错误信息
futures.forEach(f -> f.cancel(true));
throw new ToolExecutionTimeoutException("工具并行执行超时");
} catch (Exception e) {
log.error("并行工具执行异常", e);
throw new ToolExecutionException("工具执行失败: " + e.getMessage());
}
return futures.stream()
.map(f -> {
try {
return f.get();
} catch (Exception e) {
// 单个工具失败,返回错误响应,不影响其他工具
return new ToolResponseMessage.ToolResponse(
"error", "工具执行失败: " + e.getMessage(), "error");
}
})
.collect(Collectors.toList());
}
private ToolResponseMessage.ToolResponse executeSingle(
AssistantMessage.ToolCall toolCall) {
long startTime = System.currentTimeMillis();
String toolName = toolCall.name();
try {
// 通过 FunctionCallbackContext 找到实际的函数实现
FunctionCallback callback = functionCallbackContext.resolve(toolName);
if (callback == null) {
throw new ToolNotFoundException("工具不存在: " + toolName);
}
String result = callback.call(toolCall.arguments());
long duration = System.currentTimeMillis() - startTime;
log.debug("工具执行完成: {}, 耗时: {}ms", toolName, duration);
return new ToolResponseMessage.ToolResponse(
toolCall.id(), toolName, result);
} catch (Exception e) {
log.error("工具执行异常: {}, 参数: {}", toolName, toolCall.arguments(), e);
return new ToolResponseMessage.ToolResponse(
toolCall.id(), toolName,
"执行失败: " + e.getMessage());
}
}
@Override
public String getName() { return "ParallelToolExecutionAdvisor"; }
@Override
public int getOrder() { return ORDER; }
}工具依赖分析:智能分层执行
不是所有工具都能无脑并行。比如"先查用户信息,再用用户ID查订单"——这两个工具有数据依赖,必须串行。
通过给工具加依赖声明,可以实现自动分层:
// 工具依赖声明注解
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface ToolDependency {
/**
* 依赖的工具名称列表
* 只有这些工具执行完后,当前工具才能执行
*/
String[] dependsOn() default {};
/**
* 是否允许与其他工具并行
*/
boolean parallelizable() default true;
/**
* 工具执行的超时时间(秒)
*/
int timeoutSeconds() default 10;
}
// 使用示例
@Component
public class UserQueryTools {
@Tool("查询用户基本信息")
@ToolDependency(parallelizable = true, timeoutSeconds = 5)
public UserInfo getUserInfo(@P("用户ID") String userId) {
return userService.getById(userId);
}
@Tool("查询用户订单列表")
@ToolDependency(parallelizable = true, timeoutSeconds = 10)
public List<Order> getUserOrders(@P("用户ID") String userId) {
return orderService.getByUserId(userId);
}
@Tool("查询用户优惠券")
@ToolDependency(parallelizable = true, timeoutSeconds = 5)
public List<Coupon> getUserCoupons(@P("用户ID") String userId) {
return couponService.getByUserId(userId);
}
@Tool("查询用户积分")
@ToolDependency(parallelizable = true, timeoutSeconds = 5)
public PointInfo getUserPoints(@P("用户ID") String userId) {
return pointService.getByUserId(userId);
}
// 这个工具依赖 getUserInfo 的结果
@Tool("基于用户画像推荐商品,需要先获取用户信息")
@ToolDependency(dependsOn = "getUserInfo", parallelizable = false)
public List<Product> recommendProducts(
@P("用户ID") String userId,
@P("推荐数量") int count) {
UserInfo user = userService.getById(userId);
return recommendService.recommend(user.getProfile(), count);
}
}依赖分析器:
@Component
public class ToolDependencyAnalyzer {
@Autowired
private ToolMetadataRegistry toolMetadata;
/**
* 分析工具调用列表,构建执行层次
* 返回按层次分组的工具列表,同一层内可以并行执行
*/
public List<List<String>> analyzeExecutionLayers(List<String> toolNames) {
// 构建依赖图
Map<String, Set<String>> dependencyMap = new HashMap<>();
for (String toolName : toolNames) {
ToolMetadata meta = toolMetadata.get(toolName);
Set<String> deps = new HashSet<>();
if (meta != null && meta.getDependsOn() != null) {
for (String dep : meta.getDependsOn()) {
if (toolNames.contains(dep)) {
deps.add(dep);
}
}
}
dependencyMap.put(toolName, deps);
}
// 拓扑排序,按层次分组
List<List<String>> layers = new ArrayList<>();
Set<String> executed = new HashSet<>();
while (executed.size() < toolNames.size()) {
List<String> currentLayer = new ArrayList<>();
for (String toolName : toolNames) {
if (executed.contains(toolName)) continue;
Set<String> deps = dependencyMap.get(toolName);
if (executed.containsAll(deps)) {
currentLayer.add(toolName);
}
}
if (currentLayer.isEmpty()) {
// 有循环依赖,打破循环
log.warn("检测到工具循环依赖,降级为串行执行");
List<String> remaining = toolNames.stream()
.filter(t -> !executed.contains(t))
.collect(Collectors.toList());
layers.add(remaining);
break;
}
layers.add(currentLayer);
executed.addAll(currentLayer);
}
return layers;
}
}实战:综合查询场景的完整实现
来看一个完整的例子:用户问"给我汇总一下账户情况",需要并行查询多个数据源。
@Component
public class AccountSummaryTools {
@Autowired
private OrderService orderService;
@Autowired
private WalletService walletService;
@Autowired
private CouponService couponService;
@Autowired
private PointService pointService;
@Tool("查询用户最近30天的订单统计")
public OrderStats getOrderStats(@P("用户ID") String userId) {
log.info("开始查询订单统计, userId={}, thread={}",
userId, Thread.currentThread().getName());
// 模拟数据库查询耗时
return orderService.getStats(userId, 30);
}
@Tool("查询用户钱包余额和收支记录")
public WalletInfo getWalletInfo(@P("用户ID") String userId) {
log.info("开始查询钱包信息, userId={}, thread={}",
userId, Thread.currentThread().getName());
return walletService.getWalletInfo(userId);
}
@Tool("查询用户可用优惠券列表")
public CouponSummary getCouponSummary(@P("用户ID") String userId) {
log.info("开始查询优惠券, userId={}, thread={}",
userId, Thread.currentThread().getName());
return couponService.getSummary(userId);
}
@Tool("查询用户积分和等级信息")
public PointInfo getPointInfo(@P("用户ID") String userId) {
log.info("开始查询积分信息, userId={}, thread={}",
userId, Thread.currentThread().getName());
return pointService.getInfo(userId);
}
}配置并发 ChatClient:
@Configuration
public class ParallelChatClientConfig {
@Bean("parallelChatClient")
public ChatClient parallelChatClient(
ChatClient.Builder builder,
ParallelToolExecutionAdvisor parallelAdvisor,
ChatMemory chatMemory) {
return builder
.defaultSystem("""
你是一个账户助手。当用户需要查询多项账户信息时,
你应该同时调用所有相关的查询工具,不要等一个工具返回才调用下一个。
汇总所有信息后给出完整的账户概况。
""")
.defaultAdvisors(
new MessageChatMemoryAdvisor(chatMemory),
parallelAdvisor // 并发工具执行 Advisor
)
.defaultFunctions(
"getOrderStats",
"getWalletInfo",
"getCouponSummary",
"getPointInfo"
)
.build();
}
}测试性能对比:
@SpringBootTest
class ParallelToolExecutionTest {
@Autowired
@Qualifier("parallelChatClient")
private ChatClient parallelClient;
@Autowired
@Qualifier("defaultChatClient")
private ChatClient defaultClient;
@Test
void comparePerformance() {
String userId = "test-user-001";
String query = "请帮我查询一下我的订单、钱包、优惠券和积分情况,做个汇总";
// 测试串行执行
long serialStart = System.currentTimeMillis();
String serialResult = defaultClient.prompt()
.user(query.replace("我", userId))
.call()
.content();
long serialTime = System.currentTimeMillis() - serialStart;
// 测试并行执行
long parallelStart = System.currentTimeMillis();
String parallelResult = parallelClient.prompt()
.user(query.replace("我", userId))
.call()
.content();
long parallelTime = System.currentTimeMillis() - parallelStart;
log.info("串行执行耗时: {}ms", serialTime);
log.info("并行执行耗时: {}ms", parallelTime);
log.info("性能提升: {:.1f}x", (double) serialTime / parallelTime);
// 验证结果完整性
assertThat(parallelResult).contains("订单", "钱包", "优惠券", "积分");
}
}监控并发工具调用的性能
生产环境里要能看到工具调用的并发情况:
@Component
public class ToolExecutionMetrics {
@Autowired
private MeterRegistry meterRegistry;
private final Map<String, AtomicInteger> activeExecutions = new ConcurrentHashMap<>();
public void recordToolStart(String toolName, String executionMode) {
meterRegistry.counter("tool.execution.started",
"tool", toolName,
"mode", executionMode
).increment();
activeExecutions.computeIfAbsent(toolName, k -> new AtomicInteger(0))
.incrementAndGet();
// 当前并发数
meterRegistry.gauge("tool.execution.concurrent",
Tags.of("tool", toolName),
activeExecutions.get(toolName));
}
public void recordToolComplete(String toolName, long durationMs, boolean success) {
meterRegistry.timer("tool.execution.duration",
"tool", toolName,
"success", String.valueOf(success)
).record(durationMs, TimeUnit.MILLISECONDS);
AtomicInteger active = activeExecutions.get(toolName);
if (active != null) active.decrementAndGet();
}
public void recordParallelBatch(int toolCount, long totalDurationMs,
long maxSingleDurationMs) {
meterRegistry.summary("tool.parallel.batch.size").record(toolCount);
meterRegistry.timer("tool.parallel.batch.duration")
.record(totalDurationMs, TimeUnit.MILLISECONDS);
// 并行效率:理论串行时间 vs 实际并行时间
// 这里简化处理,实际可以通过各工具单独计时来计算
double efficiency = (double) maxSingleDurationMs / totalDurationMs;
meterRegistry.summary("tool.parallel.efficiency").record(efficiency);
}
}踩坑记录
坑1:LLM 不一定会一次性返回所有工具调用
并不是所有模型都支持一次返回多个 tool_calls。GPT-4 和 Claude 支持,但一些较小的模型可能会把多工具需求拆成多轮对话(每次只调用一个工具,直到拿到结果再决定下一步)。做并发优化前,要先确认你的目标模型确实会一次性返回多个 tool_calls。
可以通过 prompt 引导模型一次性返回:
当需要查询多项信息时,请同时发起所有必要的工具调用,而不是一次调用一个工具等待结果后再调用下一个。坑2:工具之间的事务问题
有些工具调用有副作用(比如写数据库),并行执行时要注意事务隔离。我们有个案例,两个工具都涉及更新同一个用户积分表,并行执行时出现了数据竞争。
解决方案:有副作用的工具在元数据里标记为 parallelizable = false,或者在工具实现里加分布式锁。
坑3:线程池大小和 LLM 并发限制
工具执行的并发受两个因素限制:一是你的线程池大小,二是后端数据库/服务的并发承受能力。工具执行线程池不要设太大,否则并发访问数据库会拖慢整体速度,甚至触发数据库连接池耗尽。
一般原则:工具线程池大小 ≈ 数据库连接池大小 / 平均每个工具的 DB 查询数。
坑4:超时设置的层次
工具并行执行有多个超时需要设置:
- 单个工具的超时(每个工具自己的 timeout)
- 整个并行批次的超时(所有工具完成的总时限)
- LLM 调用的超时(包括工具结果返回后的 LLM 二次调用)
这三层超时要协调好。我见过设置整体 30 秒超时,但单个工具 20 秒,导致某个慢工具独占时间,其他工具没有足够时间执行。
小结
工具调用并发优化是 AI 应用性能调优里很有价值的一块,特别是在需要聚合多数据源的场景下效果显著。
核心要点:
- LLM 一次性返回多个 tool_calls 时,识别无依赖工具并并行执行
- 通过工具元数据(
@ToolDependency)声明依赖关系,实现智能分层执行 - 单个工具失败不影响其他工具,降级返回错误信息给 LLM
- 线程池和超时设置要结合实际后端能力来调整
- 监控工具执行的并发度和效率,持续优化
