Python 实现 AI 工作流编排——多 Agent 协作的工程化实现
2026/4/30大约 6 分钟
Python 实现 AI 工作流编排——多 Agent 协作的工程化实现
适读人群:在用 LLM 做复杂任务的工程师、想从单 Agent 升级到多 Agent 的开发者 | 阅读时长:约15分钟 | 核心价值:不依赖 LangChain 黑盒,自己实现一个可控的 Multi-Agent 工作流框架
去年下半年我在做一个自动化内容处理系统,需要把一篇原始技术文章经过几个步骤处理:先提取关键信息,然后根据不同受众改写,最后检查质量。
最开始我用的是 LangChain 的 Agent,调了两个星期,跑起来了但完全是黑盒,出了问题根本不知道从哪里查。有次 Agent 陷入了无限循环,花了我 2 分钟才发现,账单多了 23 块钱。
最后我决定自己实现一个简单的 Multi-Agent 编排框架,透明可控,方便 debug。
为什么不用 LangChain/LangGraph?
我不是说这些框架不好,而是对于我的场景来说,它们引入的复杂性超过了它们带来的收益。
用框架的问题:
- 黑盒多,调试困难
- 版本更新频繁,API 不稳定
- 抽象层太多,性能损耗不透明
- 文档质量参差不齐
自己实现的好处:
- 完全掌控执行流程
- 方便加监控、日志、重试
- 容易理解和 review
- 可以按需裁剪
代价是:多写一些代码,但这些代码是你理解的代码。
核心设计:Agent 和工作流
# agents/base.py
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import Any, Optional
from openai import AsyncOpenAI
import json
import logging
import time
logger = logging.getLogger(__name__)
client = AsyncOpenAI()
@dataclass
class AgentMessage:
    """A message passed between agents."""
    # The message text payload.
    content: str
    # Arbitrary sender-attached extras; default_factory gives each instance its own dict.
    metadata: dict = field(default_factory=dict)
    # Chat role of the message; defaults to "assistant".
    role: str = "assistant"
@dataclass
class AgentResult:
    """Outcome of one agent execution, as returned by BaseAgent.run."""
    # True when the agent completed and produced usable output.
    success: bool
    # Agent-specific payload; None on failure.
    output: Any
    # Name of the agent that produced this result.
    agent_name: str
    # Wall-clock runtime in milliseconds; overwritten by BaseAgent.run.
    duration_ms: float
    # LLM tokens consumed (0 when no call was made or not recorded).
    token_used: int = 0
    # Error description when success is False.
    error: Optional[str] = None
class BaseAgent(ABC):
    """Abstract base for all agents: an LLM call wrapped with logging and timing.

    Subclasses implement `process`; callers invoke `run`, which measures
    duration and converts any exception into a failed AgentResult instead
    of raising.
    """

    def __init__(
        self,
        name: str,
        model: str = "gpt-4o-mini",
        system_prompt: str = "",
        temperature: float = 0.7,
        max_tokens: int = 2000,
    ):
        self.name = name
        self.model = model
        self.system_prompt = system_prompt
        self.temperature = temperature
        self.max_tokens = max_tokens

    @abstractmethod
    async def process(self, input_data: Any, context: Optional[dict] = None) -> AgentResult:
        """Process the input and return an AgentResult."""
        pass

    async def _call_llm(self, user_message: str, context: Optional[dict] = None) -> tuple[str, int]:
        """Call the LLM; return (response_text, total_tokens_used)."""
        messages = []
        if self.system_prompt:
            messages.append({"role": "system", "content": self.system_prompt})
        # Optionally inject prior conversation turns carried in the shared context.
        if context and context.get("conversation_history"):
            messages.extend(context["conversation_history"])
        messages.append({"role": "user", "content": user_message})
        response = await client.chat.completions.create(
            model=self.model,
            messages=messages,
            temperature=self.temperature,
            max_tokens=self.max_tokens,
        )
        return response.choices[0].message.content, response.usage.total_tokens

    async def run(self, input_data: Any, context: Optional[dict] = None) -> AgentResult:
        """Monitored wrapper around `process`: logs, times, never raises."""
        start = time.perf_counter()
        # Lazy %-style args: formatting is skipped when the level is disabled.
        logger.info("[%s] Starting, input_type=%s", self.name, type(input_data).__name__)
        try:
            result = await self.process(input_data, context)
            duration = (time.perf_counter() - start) * 1000
            result.duration_ms = duration  # the wrapper owns timing; overwrite whatever process set
            logger.info("[%s] Done in %.1fms, tokens=%s", self.name, duration, result.token_used)
            return result
        except Exception as e:
            duration = (time.perf_counter() - start) * 1000
            logger.error("[%s] Failed after %.1fms: %s", self.name, duration, e, exc_info=True)
            return AgentResult(
                success=False,
                output=None,
                agent_name=self.name,
                duration_ms=duration,
                error=str(e),
            )

# 具体 Agent 实现
# agents/content_agents.py
from agents.base import BaseAgent, AgentResult
from pydantic import BaseModel
import json
class KeyInfoExtractor(BaseAgent):
    """Extract key information from a technical article into a structured dict."""

    def __init__(self):
        super().__init__(
            name="KeyInfoExtractor",
            model="gpt-4o-mini",
            system_prompt="""你是一个专业的技术文章分析师。
从给定的技术文章中提取关键信息,以 JSON 格式输出:
{
"title": "文章标题",
"main_topic": "核心主题(一句话)",
"key_points": ["要点1", "要点2", "要点3"],
"target_audience": "目标读者",
"difficulty_level": "beginner/intermediate/advanced",
"code_languages": ["语言1", "语言2"],
"summary": "200字摘要"
}
只输出 JSON,不要其他内容。""",
            temperature=0.1,  # low temperature for extraction: favor consistency
        )

    async def process(self, input_data: str, context: dict = None) -> AgentResult:
        response, tokens = await self._call_llm(f"文章内容:\n\n{input_data}")
        # Models sometimes wrap JSON in markdown code fences despite the
        # prompt; strip them before parsing instead of failing outright.
        cleaned = response.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[-1].rstrip()
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
        try:
            extracted = json.loads(cleaned)
            return AgentResult(
                success=True,
                output=extracted,
                agent_name=self.name,
                duration_ms=0,  # filled in by BaseAgent.run
                token_used=tokens,
            )
        except json.JSONDecodeError as e:
            return AgentResult(
                success=False,
                output=None,
                agent_name=self.name,
                duration_ms=0,
                token_used=tokens,  # fix: tokens were consumed even though parsing failed
                error=f"JSON parse failed: {e}. Response was: {response[:200]}",
            )
class ContentRewriter(BaseAgent):
    """Rewrite content to suit a specific target audience."""

    # Per-audience style instructions spliced into the system prompt.
    AUDIENCE_PROMPTS = {
        "junior": "适合初级工程师,避免术语,多举例子,保持简单",
        "senior": "适合高级工程师,可以用技术术语,聚焦实现细节和最佳实践",
        "manager": "适合技术管理者,强调业务价值和风险,弱化实现细节",
    }

    def __init__(self, target_audience: str = "junior"):
        # Unknown audiences fall back to the "junior" instruction.
        instruction = self.AUDIENCE_PROMPTS.get(
            target_audience, self.AUDIENCE_PROMPTS["junior"]
        )
        super().__init__(
            name=f"ContentRewriter_{target_audience}",
            model="gpt-4o",  # rewriting uses the stronger model
            system_prompt=f"""你是一个专业的技术文章改写者。
根据原文关键信息,改写成适合目标受众的版本:{instruction}
改写时保留技术准确性,但调整表达方式和深度。
输出完整的改写后文章,markdown 格式。""",
            temperature=0.7,
        )
        self.target_audience = target_audience

    async def process(self, input_data: dict, context: dict = None) -> AgentResult:
        # input_data is the dict produced by KeyInfoExtractor.
        info_json = json.dumps(input_data, ensure_ascii=False, indent=2)
        prompt = (
            "原文关键信息:\n"
            f"{info_json}\n"
            f"请根据上述信息改写一篇面向 {self.target_audience} 的技术文章,约800-1200字。"
        )
        response, tokens = await self._call_llm(prompt)
        return AgentResult(
            success=True,
            output={"content": response, "audience": self.target_audience},
            agent_name=self.name,
            duration_ms=0,
            token_used=tokens,
        )
class QualityChecker(BaseAgent):
    """Check a rewritten article against quality criteria."""

    def __init__(self):
        super().__init__(
            name="QualityChecker",
            model="gpt-4o-mini",
            system_prompt="""你是一个技术内容质量检查员。
检查文章是否符合要求,以 JSON 格式输出:
{
"passed": true/false,
"score": 1-10,
"issues": ["问题1", "问题2"],
"suggestions": ["建议1", "建议2"]
}""",
            temperature=0.1,
        )

    async def process(self, input_data: dict, context: dict = None) -> AgentResult:
        content = input_data.get("content", "")
        # Fix: SequentialWorkflow stores step outputs under their step names,
        # and no caller shown here ever sets "original_info" — fall back to
        # the "extract" step output. NOTE(review): step name assumed to be
        # "extract"; confirm against the workflow definition.
        original_info = {}
        if context:
            original_info = context.get("original_info") or context.get("extract") or {}
        prompt = f"""检查以下文章的质量:
目标受众:{input_data.get('audience', '未知')}
原文关键信息:{json.dumps(original_info, ensure_ascii=False)}
文章内容:
{content}
检查标准:技术准确性、表达清晰度、适合目标受众、内容完整性"""
        response, tokens = await self._call_llm(prompt)
        # Tolerate markdown-fenced JSON from the model before parsing.
        cleaned = response.strip()
        if cleaned.startswith("```"):
            cleaned = cleaned.split("\n", 1)[-1].rstrip()
            if cleaned.endswith("```"):
                cleaned = cleaned[:-3]
        try:
            check_result = json.loads(cleaned)
        except json.JSONDecodeError:
            check_result = {"passed": False, "score": 0, "issues": ["JSON parse failed"], "suggestions": []}
        return AgentResult(
            # success mirrors the quality verdict, so a failing article
            # triggers the workflow's retry logic upstream.
            success=check_result.get("passed", False),
            output=check_result,
            agent_name=self.name,
            duration_ms=0,
            token_used=tokens,
        )

# 工作流编排
# workflow/pipeline.py
import asyncio
import logging
from typing import Any, Callable, Optional

from agents.base import AgentResult
logger = logging.getLogger(__name__)
class WorkflowStep:
    """One workflow step: an agent plus how it is wired into the pipeline."""

    def __init__(
        self,
        name: str,
        agent,
        input_transform: Optional[Callable] = None,  # maps (prev_output, context) -> this step's input
        condition: Optional[Callable] = None,  # predicate on the context; step runs only when true
        retry_count: int = 2,
    ):
        self.name = name
        self.agent = agent
        # Defaults: pass the previous output through unchanged / always run.
        self.input_transform = input_transform or (lambda x, _: x)
        self.condition = condition or (lambda _: True)
        self.retry_count = retry_count
class SequentialWorkflow:
    """Run steps in order, threading each step's output into the next.

    Execution stops at the first step that still fails after its retries.
    Each step's output is also stored in a shared context dict under the
    step name, so later steps can reference earlier results.
    """

    def __init__(self, name: str, steps: list["WorkflowStep"]):
        self.name = name
        self.steps = steps

    async def run(self, initial_input: Any) -> dict:
        # fix: the original annotated this parameter with the builtin `any`
        # (a function), not `typing.Any`.
        context = {"initial_input": initial_input}
        step_results: dict = {}
        current_input = initial_input
        for step in self.steps:
            # Conditional steps may be skipped based on accumulated context.
            if not step.condition(context):
                logger.info("[Workflow:%s] Step %s skipped (condition not met)", self.name, step.name)
                continue
            # Adapt the previous step's output into this step's input.
            step_input = step.input_transform(current_input, context)
            # Execute with retries and exponential backoff.
            result = None
            for attempt in range(step.retry_count + 1):
                result = await step.agent.run(step_input, context)
                if result.success:
                    break
                if attempt < step.retry_count:
                    logger.warning(
                        "[Workflow:%s] Step %s failed (attempt %d), retrying...",
                        self.name, step.name, attempt + 1,
                    )
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
            step_results[step.name] = result
            context[step.name] = result.output
            if not result.success:
                logger.error("[Workflow:%s] Step %s failed after %d retries", self.name, step.name, step.retry_count)
                return {
                    "success": False,
                    "failed_step": step.name,
                    "error": result.error,
                    "step_results": step_results,
                }
            current_input = result.output
        # Aggregate usage across all executed (non-skipped) steps.
        total_tokens = sum(r.token_used for r in step_results.values())
        total_duration = sum(r.duration_ms for r in step_results.values())
        return {
            "success": True,
            "final_output": current_input,
            "step_results": step_results,
            "total_tokens": total_tokens,
            "total_duration_ms": total_duration,
        }

# 踩坑实录:并行 Agent 的输出合并
有些步骤可以并行执行(比如同时为多个受众改写):
class ParallelWorkflow:
    """Run several agents concurrently on the same input, then merge the successes."""

    def __init__(self, agents: list, merge_fn: Callable):
        self.agents = agents
        self.merge_fn = merge_fn

    async def run(self, input_data: Any, context: dict = None) -> dict:
        # Fan out: every agent receives the same input concurrently;
        # return_exceptions keeps one crash from cancelling the rest.
        tasks = [agent.run(input_data, context) for agent in self.agents]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error("Agent %s threw exception: %s", self.agents[i].name, result)
            elif result.success:
                successful_results.append(result)
            else:
                # fix: failed-but-returned results were previously dropped silently
                logger.error("Agent %s failed: %s", self.agents[i].name, result.error)
        if not successful_results:
            return {"success": False, "error": "All parallel agents failed"}
        merged = self.merge_fn(successful_results)
        return {"success": True, "output": merged}
# Usage example: rewrite for three audiences at the same time.
rewriters = [ContentRewriter(aud) for aud in ("junior", "senior", "manager")]
parallel = ParallelWorkflow(
    agents=rewriters,
    # Merge: map each rewriter's audience to its rewritten content.
    merge_fn=lambda results: {
        r.output["audience"]: r.output["content"] for r in results
    },
)

# 完整的工作流使用示例
# main_workflow.py
from agents.content_agents import KeyInfoExtractor, ContentRewriter, QualityChecker
from workflow.pipeline import SequentialWorkflow, WorkflowStep
async def process_article(article_text: str) -> dict:
    """Run an article through extract -> rewrite -> quality-check and return the workflow result."""
    steps = [
        WorkflowStep(
            name="extract",
            agent=KeyInfoExtractor(),
            retry_count=2,
        ),
        WorkflowStep(
            name="rewrite",
            agent=ContentRewriter(target_audience="senior"),
            input_transform=lambda prev, ctx: prev,  # feed the extractor's JSON output straight through
            retry_count=1,
        ),
        WorkflowStep(
            name="quality_check",
            agent=QualityChecker(),
            input_transform=lambda prev, ctx: prev,
            condition=lambda ctx: ctx.get("extract") is not None,
        ),
    ]
    pipeline = SequentialWorkflow(name="ArticleProcessing", steps=steps)
    return await pipeline.run(article_text)
# 运行
import asyncio
result = asyncio.run(process_article("# Python 入门\n...文章内容..."))
print(f"Success: {result['success']}")
print(f"Total tokens: {result.get('total_tokens', 0)}")这个框架几百行代码,但完全透明、可扩展,每一步的输入输出都清晰可见,出了问题立刻能定位到哪个 Agent。比用黑盒框架省下来的调试时间,远超多写这几百行代码的时间。
