Function Calling 高级设计——复杂工具链的实战经验
Function Calling 高级设计——复杂工具链的实战经验
适读人群:做AI应用的工程师 | 阅读时长:约14分钟 | 核心价值:工具链的工程化设计,不只是"定义函数让模型调用"
第一次做 Function Calling 的时候,我觉得这个东西挺简单——定义几个函数,告诉模型可以调用,完事。
然后在一个生产项目里被它教育了一顿。
那是一个内部工作流系统,模型需要调用七八个工具(查订单、查库存、发通知、更新状态……),工具调用失败率大概 3%,但整个工作流的失败率接近 20%——因为工具调用是串联的,任何一环挂掉整个流程就废了。
那之后我开始认真想工具链的工程设计问题。这篇文章是我摸索出来的一套方案。
基础回顾:工具调用的完整循环
先确保大家在同一起跑线上。一个完整的工具调用循环是这样的:
import anthropic
import json
client = anthropic.Anthropic()
# 工具定义
tools = [{
"name": "get_order_status",
"description": "查询订单状态",
"input_schema": {
"type": "object",
"properties": {
"order_id": {"type": "string", "description": "订单ID"}
},
"required": ["order_id"]
}
}]
def get_order_status(order_id: str) -> dict:
# 实际实现,这里模拟
return {"order_id": order_id, "status": "shipped", "tracking": "SF1234567"}
def run_with_tools(user_message: str):
messages = [{"role": "user", "content": user_message}]
while True:
response = client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1024,
tools=tools,
messages=messages
)
if response.stop_reason == "end_turn":
# 模型决定不再调用工具,输出最终答案
return response.content[0].text
if response.stop_reason == "tool_use":
# 处理工具调用
tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
# 把模型的响应加到消息历史
messages.append({"role": "assistant", "content": response.content})
# 执行工具并收集结果
tool_results = []
for tool_use in tool_use_blocks:
if tool_use.name == "get_order_status":
result = get_order_status(**tool_use.input)
tool_results.append({
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": json.dumps(result, ensure_ascii=False)
})
messages.append({"role": "user", "content": tool_results})这是基础。生产里的问题从这里开始。
问题一:工具调用失败的重试策略
工具会失败——网络超时、下游服务不可用、参数格式问题。不做重试,直接让模型拿到错误结果,它可能做出错误的后续决策。
我的重试策略分两层:
第一层:透明重试(对模型不可见)
对于短暂的技术错误(网络超时、5xx),在工具执行层直接重试,模型感知不到。
import time
import functools
from typing import Callable, TypeVar, Any
T = TypeVar("T")
def with_retry(max_attempts: int = 3, backoff_base: float = 1.0):
"""装饰器:带指数退避的重试"""
def decorator(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs):
last_exception = None
for attempt in range(max_attempts):
try:
return func(*args, **kwargs)
except (ConnectionError, TimeoutError) as e:
last_exception = e
if attempt < max_attempts - 1:
sleep_time = backoff_base * (2 ** attempt)
time.sleep(sleep_time)
raise last_exception
return wrapper
return decorator
@with_retry(max_attempts=3, backoff_base=0.5)
def get_order_status(order_id: str) -> dict:
# 实际调用下游服务
pass第二层:语义重试(让模型知道出了什么问题)
对于业务逻辑错误(参数无效、数据不存在),把详细的错误信息返回给模型,让它自己决定是否需要用不同的参数重试:
def execute_tool_safe(tool_name: str, tool_input: dict) -> dict:
try:
if tool_name == "get_order_status":
result = get_order_status(**tool_input)
return {"success": True, "data": result}
except ValueError as e:
# 参数错误:告诉模型具体错在哪里
return {
"success": False,
"error_type": "invalid_parameter",
"message": str(e),
"hint": "请检查参数格式是否正确"
}
except Exception as e:
# 服务错误:告诉模型这是临时性的
return {
"success": False,
"error_type": "service_unavailable",
"message": "下游服务暂时不可用",
"hint": "可以稍后重试,或使用其他方式完成这个步骤"
}关键点:错误信息里要有 hint,告诉模型可以怎么应对。模型在有提示的情况下恢复能力会好很多。
问题二:并行工具调用
当模型需要调用多个独立工具时,Claude 支持在一次响应里返回多个 tool_use block,可以并行执行。
问题是很多人没注意到这个特性,或者没有正确处理并行调用,变成了不必要的串行。
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def execute_tools_parallel(tool_use_blocks: list) -> list:
"""并行执行所有工具调用"""
loop = asyncio.get_event_loop()
executor = ThreadPoolExecutor(max_workers=len(tool_use_blocks))
async def execute_one(tool_use):
result = await loop.run_in_executor(
executor,
execute_tool_safe,
tool_use.name,
tool_use.input
)
return {
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": json.dumps(result, ensure_ascii=False)
}
tasks = [execute_one(tu) for tu in tool_use_blocks]
return await asyncio.gather(*tasks)实测效果:一个需要调用 3 个独立查询工具的工作流,从 3.2s(串行)降到 1.4s(并行)。对于有响应时间要求的场景,这个差距很明显。
注意:并行调用只适用于无依赖关系的工具。如果工具 B 需要工具 A 的输出,只能串行。这个判断你自己要清楚,不能全靠模型。
问题三:工具版本管理
工具的实现会变,但线上的工作流可能依赖特定版本的行为。不做版本管理,升级工具会触发意外的行为变化。
我的方案:工具名里带版本号,保持旧版本一段时间的兼容:
from typing import Protocol, Dict
class ToolImplementation(Protocol):
def execute(self, **kwargs) -> dict: ...
class ToolRegistry:
def __init__(self):
self._tools: Dict[str, Dict[str, ToolImplementation]] = {}
self._latest: Dict[str, str] = {}
def register(self, name: str, version: str, impl: ToolImplementation):
if name not in self._tools:
self._tools[name] = {}
self._tools[name][version] = impl
# 自动更新 latest 指针(语义版本,选最大的)
self._latest[name] = max(self._tools[name].keys())
def get_tool_definitions(self, use_latest: bool = True) -> list:
"""生成传给 Claude 的工具定义列表"""
definitions = []
for name, versions in self._tools.items():
version = self._latest[name] if use_latest else list(versions.keys())[0]
# 工具名带版本,让模型调用时明确指定
tool_name = f"{name}_v{version.replace('.', '_')}"
definitions.append({
"name": tool_name,
"description": f"[v{version}] {versions[version].description}",
"input_schema": versions[version].schema
})
return definitions
def execute(self, versioned_name: str, **kwargs) -> dict:
# 解析 "get_order_status_v2_0" -> name="get_order_status", version="2.0"
parts = versioned_name.rsplit("_v", 1)
if len(parts) != 2:
raise ValueError(f"无效的工具名格式: {versioned_name}")
name, version_str = parts
version = version_str.replace("_", ".")
return self._tools[name][version].execute(**kwargs)问题四:工具调用的审计日志
生产系统里工具调用的审计日志至少要记录这些:
- 调用时间
- 调用了哪个工具
- 输入参数(注意脱敏!)
- 返回结果摘要
- 耗时
- 关联的会话 ID
import uuid
import logging
from datetime import datetime
from dataclasses import dataclass, asdict
@dataclass
class ToolCallAuditRecord:
session_id: str
call_id: str
timestamp: str
tool_name: str
input_summary: str # 脱敏后的摘要,不是完整输入
success: bool
error_type: str | None
elapsed_ms: int
class AuditLogger:
def __init__(self):
self.logger = logging.getLogger("tool_audit")
def _sanitize_input(self, tool_name: str, inputs: dict) -> str:
"""脱敏处理:隐藏敏感字段"""
SENSITIVE_KEYS = {"password", "token", "secret", "card_number", "id_card"}
sanitized = {
k: "***REDACTED***" if k.lower() in SENSITIVE_KEYS else v
for k, v in inputs.items()
}
return json.dumps(sanitized, ensure_ascii=False)
def log(self, session_id: str, tool_name: str, inputs: dict,
result: dict, elapsed_ms: int):
record = ToolCallAuditRecord(
session_id=session_id,
call_id=str(uuid.uuid4()),
timestamp=datetime.utcnow().isoformat(),
tool_name=tool_name,
input_summary=self._sanitize_input(tool_name, inputs),
success=result.get("success", True),
error_type=result.get("error_type"),
elapsed_ms=elapsed_ms
)
self.logger.info(json.dumps(asdict(record), ensure_ascii=False))把这些整合在一起
class ProductionToolChain:
def __init__(self):
self.registry = ToolRegistry()
self.audit = AuditLogger()
self.client = anthropic.Anthropic()
def run(self, user_message: str, session_id: str) -> str:
messages = [{"role": "user", "content": user_message}]
tool_definitions = self.registry.get_tool_definitions()
while True:
response = self.client.messages.create(
model="claude-sonnet-4-5",
max_tokens=2048,
tools=tool_definitions,
messages=messages
)
if response.stop_reason == "end_turn":
return next(
(b.text for b in response.content if hasattr(b, "text")),
""
)
if response.stop_reason == "tool_use":
messages.append({"role": "assistant", "content": response.content})
tool_use_blocks = [b for b in response.content if b.type == "tool_use"]
# 并行执行
tool_results = asyncio.run(
self._execute_with_audit(tool_use_blocks, session_id)
)
messages.append({"role": "user", "content": tool_results})
async def _execute_with_audit(self, tool_use_blocks, session_id):
async def execute_one(tool_use):
start = time.time()
result = await asyncio.get_event_loop().run_in_executor(
None,
self.registry.execute,
tool_use.name,
**tool_use.input
)
elapsed = int((time.time() - start) * 1000)
self.audit.log(session_id, tool_use.name, tool_use.input, result, elapsed)
return {
"type": "tool_result",
"tool_use_id": tool_use.id,
"content": json.dumps(result, ensure_ascii=False)
}
return await asyncio.gather(*[execute_one(tu) for tu in tool_use_blocks])一个我在项目里遇到的真实坑
工具链里有一个"发送通知"的工具。有一次因为重试逻辑的问题,同一条通知被发了三次给用户。
这暴露了一个设计问题:不是所有工具都适合重试。查询类工具(幂等操作)重试没问题,但写操作(发通知、扣库存、记录日志)如果重试就会产生副作用。
解决方案:在工具定义里增加 idempotent 标记,重试策略根据这个标记决定是否重试。
这个细节很容易被忽略,但在生产里代价很高。
