Multi-step Function Call:复杂任务分解与工具链式调用
Multi-step Function Call:复杂任务分解与工具链式调用
适读人群:构建LLM Agent系统的Java工程师,希望处理多步骤复杂任务 | 阅读时长:约18分钟
开篇故事
用单个工具调用能解决的需求其实挺有限。真实的业务场景往往是这样的:"帮我分析一下上个月北京和上海的销售数据对比,找出哪些品类增长最快,并给出下个月的备货建议。"
这个任务拆开来要好多步:查北京数据、查上海数据、对比分析、查历史趋势、结合库存情况、给出建议。每一步都可能是一个工具调用,而且有些步骤有先后依赖(先查数据才能分析),有些可以并行执行(北京和上海的数据可以同时查)。
这就是Multi-step Function Call面对的场景。今天我们聊怎么在Java里构建能处理复杂多步骤任务的Agent系统。
一、多步骤调用的两种模式
顺序调用(Sequential):步骤有依赖,前一步的结果是后一步的输入
查用户信息 → 查用户订单 → 查订单商品 → 生成报告并行调用(Parallel):步骤之间独立,可以同时执行
查北京数据 ┐
查上海数据 ┤ → 数据对比分析 → 生成建议
查广州数据 ┘LLM本身支持在一次响应中返回多个tool_calls(并行调用),但处理并行结果然后继续下一步需要我们在代码层面做好编排。
二、多步骤调用的核心问题
三、完整代码示例
3.1 支持并行工具调用的Agent框架
@Service
public class MultiStepAgent {
@Autowired
private OpenAIClient openAIClient;
@Autowired
private Map<String, FunctionCallback> toolRegistry;
@Autowired
private ObjectMapper objectMapper;
private static final int MAX_ROUNDS = 10;
private static final int TOOL_TIMEOUT_SECONDS = 30;
public AgentResult execute(AgentRequest request) {
List<ChatCompletionMessageParam> messages = new ArrayList<>();
// system prompt
messages.add(ChatCompletionSystemMessageParam.builder()
.role(SYSTEM)
.content(request.systemPrompt())
.build());
// user message
messages.add(ChatCompletionUserMessageParam.builder()
.role(USER)
.content(request.userMessage())
.build());
// 构建工具列表
List<ChatCompletionTool> tools = buildTools(request.toolNames());
List<String> executionLog = new ArrayList<>();
for (int round = 0; round < MAX_ROUNDS; round++) {
log.info("[Agent] Round {}, messages count: {}", round + 1, messages.size());
// 调用LLM
ChatCompletion response = openAIClient.chat().completions().create(
ChatCompletionCreateParams.builder()
.model(request.model())
.messages(messages)
.tools(tools)
.build()
);
ChatCompletionMessage assistantMessage = response.choices().get(0).message();
FinishReason finishReason = response.choices().get(0).finishReason();
// 把assistant消息加入历史
messages.add(buildAssistantMessage(assistantMessage));
if (finishReason != FinishReason.TOOL_CALLS) {
// 不需要工具调用,返回最终答案
return AgentResult.success(
assistantMessage.content().orElse(""),
executionLog,
round + 1
);
}
// 处理工具调用(支持并行执行)
List<ChatCompletionMessageToolCall> toolCalls =
assistantMessage.toolCalls().orElse(Collections.emptyList());
log.info("[Agent] Round {}, tool calls: {}", round + 1,
toolCalls.stream().map(tc -> tc.function().name()).collect(toList()));
// 并行执行所有工具
List<ToolExecutionResult> results = executeToolsInParallel(toolCalls, executionLog);
// 把工具结果加入消息历史
for (ToolExecutionResult result : results) {
messages.add(ChatCompletionToolMessageParam.builder()
.role(TOOL)
.toolCallId(result.callId())
.content(result.content())
.build());
}
}
return AgentResult.error("Exceeded maximum rounds (" + MAX_ROUNDS + ")");
}
/**
* 并行执行工具调用,提高效率
*/
private List<ToolExecutionResult> executeToolsInParallel(
List<ChatCompletionMessageToolCall> toolCalls,
List<String> executionLog) {
// 使用CompletableFuture并行执行
List<CompletableFuture<ToolExecutionResult>> futures = toolCalls.stream()
.map(toolCall -> CompletableFuture.supplyAsync(() -> {
long start = System.currentTimeMillis();
String toolName = toolCall.function().name();
String callId = toolCall.id();
try {
FunctionCallback callback = toolRegistry.get(toolName);
if (callback == null) {
return new ToolExecutionResult(callId,
"{\"error\": \"Unknown tool: " + toolName + "\"}");
}
String result = executeWithTimeout(callback,
toolCall.function().arguments());
long elapsed = System.currentTimeMillis() - start;
executionLog.add(String.format("[%s] %dms: %s",
toolName, elapsed, summarize(result)));
return new ToolExecutionResult(callId, result);
} catch (Exception e) {
log.error("[Agent] Tool {} failed: {}", toolName, e.getMessage());
return new ToolExecutionResult(callId,
"{\"error\": \"" + e.getMessage() + "\"}");
}
}))
.collect(toList());
// 等待所有工具执行完成
return futures.stream()
.map(f -> {
try {
return f.get(TOOL_TIMEOUT_SECONDS + 5, TimeUnit.SECONDS);
} catch (Exception e) {
throw new RuntimeException("Tool execution failed", e);
}
})
.collect(toList());
}
private String executeWithTimeout(FunctionCallback callback, String arguments) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(
() -> callback.call(arguments));
try {
return future.get(TOOL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
} catch (java.util.concurrent.TimeoutException e) {
future.cancel(true);
return "{\"error\": \"Tool execution timed out\"}";
} catch (Exception e) {
return "{\"error\": \"" + e.getMessage() + "\"}";
}
}
private String summarize(String json) {
if (json.length() <= 100) return json;
return json.substring(0, 100) + "...";
}
}3.2 复杂业务任务:销售数据分析Agent
// 定义分析所需的工具
@Configuration
public class SalesAnalysisToolsConfig {
@Bean
@Description("查询指定城市和时间段的销售汇总数据,包括销售额、订单量、客户数等。" +
"支持按品类分组统计。")
public Function<SalesQueryRequest, SalesQueryResponse> querySalesData() {
return request -> salesAnalysisService.query(
request.city(), request.startDate(), request.endDate(),
request.groupByCategory()
);
}
@Bean
@Description("查询指定城市和品类的历史销售趋势,返回近N个月的月度数据。" +
"用于分析增长趋势和季节性规律。")
public Function<TrendQueryRequest, TrendQueryResponse> queryHistoricalTrend() {
return request -> trendService.query(
request.city(), request.category(), request.months()
);
}
@Bean
@Description("查询指定品类的当前库存状况,包括各仓库的库存量和预计消耗天数。")
public Function<InventoryQueryRequest, InventoryQueryResponse> queryInventory() {
return request -> inventoryService.query(request.category(), request.city());
}
@Bean
@Description("生成进货建议报告。基于销售数据和库存情况," +
"计算建议的补货数量和优先级。需要先查询销售数据和库存数据再调用此工具。")
public Function<ProcurementRequest, ProcurementResponse> generateProcurementSuggestion() {
return request -> procurementService.generate(
request.salesData(), request.inventoryData(), request.targetDays()
);
}
}
// 使用Agent执行复杂任务
@RestController
@RequestMapping("/api/analysis")
public class SalesAnalysisController {
@Autowired
private MultiStepAgent agent;
@PostMapping("/sales-report")
public ResponseEntity<AgentResult> generateSalesReport(
@RequestBody SalesReportRequest request) {
String systemPrompt = """
你是一个销售数据分析专家。你需要完成以下分析任务:
1. 并行查询各城市的当月和上月销售数据
2. 识别增长最快的品类(同比增长率最高)
3. 查询增长最快品类的历史趋势,验证是否持续增长
4. 查询这些品类的当前库存状况
5. 生成具体的备货建议
分析时请注意:
- 城市数据可以并行查询以提高效率
- 增长率 = (当月-上月) / 上月 × 100%
- 备货建议要结合库存消耗速度和采购周期
""";
String userMessage = String.format(
"请分析%s的%d年%d月销售数据," +
"重点关注与上月对比增长最快的品类," +
"并给出下月备货建议。",
String.join("、", request.cities()),
request.year(), request.month()
);
AgentResult result = agent.execute(AgentRequest.builder()
.systemPrompt(systemPrompt)
.userMessage(userMessage)
.toolNames(List.of("querySalesData", "queryHistoricalTrend",
"queryInventory", "generateProcurementSuggestion"))
.model("gpt-4o")
.build());
return ResponseEntity.ok(result);
}
}四、踩坑实录
坑1:并行工具调用时结果顺序错乱
LLM返回多个tool_calls,我们并行执行后,把结果以错误的顺序(或错误的tool_call_id)返回给LLM,导致LLM把结果张冠李戴。
解决:严格按callId对应工具结果,不能按执行完成的顺序返回:
// 使用Map按callId存储结果,再按toolCalls顺序组装
Map<String, String> resultMap = new ConcurrentHashMap<>();
// ... 并行执行,结果存入resultMap
// 按toolCalls的原始顺序组装
toolCalls.forEach(tc -> messages.add(buildToolMessage(tc.id(), resultMap.get(tc.id()))));坑2:LLM陷入循环:A的结果总是触发B,B又触发A
检测循环:记录最近N轮的工具调用序列,发现重复就中断:
Deque<String> recentCalls = new ArrayDeque<>(5);
for (round in rounds) {
String callSignature = toolCalls.stream()
.map(tc -> tc.function().name())
.sorted().collect(Collectors.joining(","));
if (recentCalls.contains(callSignature)) {
return AgentResult.error("Detected tool call loop: " + callSignature);
}
recentCalls.addLast(callSignature);
if (recentCalls.size() > 5) recentCalls.pollFirst();
}坑3:长对话导致token超限
多轮工具调用会积累大量消息历史,当消息超过模型context window时会报错。
解决:实现滑动窗口,保留最近N条消息:
// 保留system prompt + 最近10条消息
if (messages.size() > 12) {
List<ChatCompletionMessageParam> truncated = new ArrayList<>();
truncated.add(messages.get(0)); // 始终保留system prompt
truncated.addAll(messages.subList(messages.size() - 10, messages.size()));
messages = truncated;
}坑4:工具执行结果太大塞满context
某些工具可能返回大量数据(比如SQL查询返回1000行),直接塞入消息历史会浪费大量token。
解决:对工具结果做摘要处理,或者只返回前N条记录加上统计摘要:
// 对大型结果做摘要
if (result.length() > 2000) {
result = summarizeToolResult(toolName, result);
// summarize:提取关键统计信息,而不是全部数据
}五、总结与延伸
Multi-step Function Call的核心设计原则:
- 并行能并行:多个独立的工具调用同时执行,提高响应速度
- 限制最大轮次:防止无限循环消耗资源
- 循环检测:发现相同的工具调用序列时中断
- 结果压缩:大型工具结果摘要化,不把context window撑爆
- 执行日志:记录每步执行情况,便于调试和审计
真正复杂的Agent系统还需要:持久化对话历史、中间结果缓存、任务进度反馈、人工确认节点等。这些是生产级Agent系统必须考虑的。
下一篇聊Function Call的超时和重试机制,生产级工具调用的可靠性设计。
