LangGraph 实战——用状态机管理复杂 AI 工作流
LangGraph 实战——用状态机管理复杂 AI 工作流
适读人群:有 LangChain 基础、想做复杂 Agent 的工程师 | 阅读时长:约 15 分钟 | 核心价值:用 LangGraph 实现一个有条件分支、人工介入、多步骤的真实 Agent,包含完整踩坑记录
我第一次遇到需要 LangGraph 的场景,是在做一个合同审查 Agent 的时候。
需求很简单,说出来:自动审查合同文本,识别风险条款,给出修改建议,如果风险等级是"高",必须让人工确认才能输出最终报告。
用普通的 LangChain Chain,写着写着就乱了。你要维护"当前到第几步"的状态,条件分支用 if-else 手写,人工介入要自己实现中断和恢复……代码越来越难看,逻辑越来越难追。
后来我改用 LangGraph,代码清晰多了,逻辑也变成了可以画出来的图。
LangGraph 是什么,为什么需要它
LangGraph 是 LangChain 团队做的工作流编排库。核心思想是:把 AI 工作流建模成有向图,节点是处理步骤,边是条件分支,共享状态在整个图里流动。
传统的链式调用(Chain)适合线性流程:A -> B -> C。但稍微复杂一点的场景就不够用了:
- 需要根据中间结果决定走哪条路
- 需要循环(比如"如果质量不达标,重新生成")
- 需要在某个节点暂停,等人工输入
- 需要并行执行多个步骤
这些场景,LangGraph 都能处理,而且你的工作流逻辑是显式可视的,而不是藏在一堆 if-else 里。
从零开始:合同审查 Agent
我把这个项目的核心逻辑提炼出来,做成一个可以直接跑的示例。
工作流是这样的:
输入合同文本
|
v
[提取关键条款]
|
v
[识别风险条款]
|
+--- 无高风险 ---> [生成报告] ---> 结束
|
+--- 有高风险 ---> [等待人工确认] ---> [生成带人工意见的报告] ---> 结束安装依赖
pip install langgraph langchain-openai langchain-core定义状态
LangGraph 里,所有节点共享一个 State 对象。定义好 State 是关键的第一步:
# contract_agent.py
from typing import TypedDict, List, Optional, Annotated
import operator
class RiskClause:
def __init__(self, clause_text: str, risk_level: str, risk_description: str):
self.clause_text = clause_text
self.risk_level = risk_level # "low", "medium", "high"
self.risk_description = risk_description
class ContractReviewState(TypedDict):
# 输入
contract_text: str
# 中间结果(用 Annotated + operator.add 实现追加语义,适合 list 类型)
key_clauses: List[str]
risk_clauses: List[dict] # [{"clause": str, "level": str, "description": str}]
# 人工确认相关
has_high_risk: bool
human_review_required: bool
human_feedback: Optional[str] # 人工输入的意见
# 输出
final_report: Optional[str]
# 工作流控制
current_step: str
error: Optional[str]定义节点
每个节点是一个函数,接收 State,返回 State 的更新:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
import json
llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)
llm_strong = ChatOpenAI(model="gpt-4o", temperature=0)
def extract_key_clauses(state: ContractReviewState) -> dict:
"""节点1:提取关键条款"""
print(">> 正在提取关键条款...")
response = llm.invoke([
SystemMessage(content="""你是一个法律助手。从合同文本中提取关键条款。
每个条款单独一行输出,格式:[类型] 条款内容
类型包括:付款条款、违约条款、保密条款、知识产权条款、终止条款等"""),
HumanMessage(content=f"合同文本:\n{state['contract_text']}")
])
clauses = [line.strip() for line in response.content.split('\n') if line.strip()]
return {
"key_clauses": clauses,
"current_step": "extracted_clauses"
}
def identify_risks(state: ContractReviewState) -> dict:
"""节点2:识别风险条款"""
print(">> 正在分析风险...")
clauses_text = "\n".join(state["key_clauses"])
response = llm_strong.invoke([
SystemMessage(content="""你是一个合同风险分析专家。分析以下条款的风险。
对每个有风险的条款,用JSON格式输出:
{"clause": "条款内容", "level": "low|medium|high", "description": "风险说明"}
多个条款用JSON数组输出。如果没有风险条款,输出空数组[]。
只输出JSON,不要其他内容。"""),
HumanMessage(content=f"关键条款:\n{clauses_text}")
])
try:
risk_clauses = json.loads(response.content)
except json.JSONDecodeError:
# LLM 输出不是纯 JSON,尝试提取
import re
json_match = re.search(r'\[.*\]', response.content, re.DOTALL)
if json_match:
risk_clauses = json.loads(json_match.group())
else:
risk_clauses = []
has_high_risk = any(c.get("level") == "high" for c in risk_clauses)
return {
"risk_clauses": risk_clauses,
"has_high_risk": has_high_risk,
"human_review_required": has_high_risk,
"current_step": "identified_risks"
}
def generate_report(state: ContractReviewState) -> dict:
"""节点3a:生成最终报告(无高风险路径)"""
print(">> 正在生成审查报告...")
risk_summary = "\n".join([
f"- [{c['level'].upper()}] {c['clause']}: {c['description']}"
for c in state.get("risk_clauses", [])
])
report = f"""# 合同审查报告
## 关键条款摘要
{chr(10).join(f'- {c}' for c in state.get('key_clauses', []))}
## 风险分析
{risk_summary if risk_summary else '未发现明显风险条款'}
## 结论
{'该合同存在以下风险,建议在签署前与对方协商修改。' if state.get('risk_clauses') else '该合同整体风险可控,可以签署。'}
"""
return {
"final_report": report,
"current_step": "report_generated"
}
def generate_report_with_human_feedback(state: ContractReviewState) -> dict:
"""节点3b:生成包含人工意见的报告(高风险路径)"""
print(">> 正在生成含人工意见的审查报告...")
high_risk_clauses = [c for c in state.get("risk_clauses", []) if c.get("level") == "high"]
all_risk_clauses = state.get("risk_clauses", [])
risk_summary = "\n".join([
f"- [{c['level'].upper()}] {c['clause']}: {c['description']}"
for c in all_risk_clauses
])
report = f"""# 合同审查报告(含法务审核意见)
## 关键条款摘要
{chr(10).join(f'- {c}' for c in state.get('key_clauses', []))}
## 风险分析
{risk_summary}
## 高风险条款(需重点关注)
{chr(10).join(f'- {c["clause"]}' for c in high_risk_clauses)}
## 法务审核意见
{state.get('human_feedback', '(未提供意见)')}
## 结论
该合同存在高风险条款,已经过法务人工审核。请根据以上意见决定是否签署。
"""
return {
"final_report": report,
"current_step": "report_generated_with_feedback"
}
# 人工确认节点——这是 LangGraph 的关键特性:interrupt
def human_review_node(state: ContractReviewState) -> dict:
"""
这个节点不做任何处理,只是一个占位符
真正的中断逻辑通过 interrupt_before 配置实现
当工作流运行到这里时,会暂停并等待外部输入
"""
return {"current_step": "awaiting_human_review"}定义边的条件逻辑
def should_require_human_review(state: ContractReviewState) -> str:
"""条件边:决定走哪条路"""
if state.get("has_high_risk"):
return "need_human_review"
else:
return "no_risk"组装 Graph
from langgraph.graph import StateGraph, END
from langgraph.checkpoint.memory import MemorySaver
def build_contract_review_graph():
# 创建图
workflow = StateGraph(ContractReviewState)
# 添加节点
workflow.add_node("extract_clauses", extract_key_clauses)
workflow.add_node("identify_risks", identify_risks)
workflow.add_node("human_review", human_review_node)
workflow.add_node("generate_report", generate_report)
workflow.add_node("generate_report_with_feedback", generate_report_with_human_feedback)
# 设置入口
workflow.set_entry_point("extract_clauses")
# 线性边
workflow.add_edge("extract_clauses", "identify_risks")
# 条件边
workflow.add_conditional_edges(
"identify_risks",
should_require_human_review,
{
"need_human_review": "human_review",
"no_risk": "generate_report",
}
)
# 人工审核后继续
workflow.add_edge("human_review", "generate_report_with_feedback")
# 终止边
workflow.add_edge("generate_report", END)
workflow.add_edge("generate_report_with_feedback", END)
# 编译,配置内存持久化(用于中断恢复)
memory = MemorySaver()
app = workflow.compile(
checkpointer=memory,
interrupt_before=["human_review"] # 在 human_review 节点前中断
)
return app运行 Agent
def run_contract_review(contract_text: str):
app = build_contract_review_graph()
# 用 thread_id 标识一次会话
config = {"configurable": {"thread_id": "contract-review-001"}}
initial_state = {
"contract_text": contract_text,
"key_clauses": [],
"risk_clauses": [],
"has_high_risk": False,
"human_review_required": False,
"human_feedback": None,
"final_report": None,
"current_step": "init",
"error": None,
}
print("=== 开始合同审查 ===")
# 第一次运行,会在 human_review 节点前暂停(如果有高风险)
for event in app.stream(initial_state, config):
for node_name, node_output in event.items():
if node_name != "__end__":
print(f" 完成节点: {node_name}")
# 检查当前状态
current_state = app.get_state(config)
if current_state.next == ("human_review",):
# 工作流暂停在 human_review,等待人工输入
print("\n=== 检测到高风险条款,需要人工审核 ===")
print("\n风险条款:")
for clause in current_state.values.get("risk_clauses", []):
if clause.get("level") == "high":
print(f" [HIGH RISK] {clause['clause']}: {clause['description']}")
# 这里在真实场景中,你会把数据展示给审核人员,等待他们输入
# 模拟人工输入
human_feedback = input("\n请输入法务审核意见(直接回车跳过): ").strip()
if not human_feedback:
human_feedback = "已审阅高风险条款,建议在付款条款中明确违约金上限,并要求对方提供履约保证金。"
# 更新状态并继续运行
app.update_state(config, {"human_feedback": human_feedback})
print("\n=== 继续生成报告 ===")
for event in app.stream(None, config):
for node_name, node_output in event.items():
if node_name != "__end__":
print(f" 完成节点: {node_name}")
# 获取最终结果
final_state = app.get_state(config)
report = final_state.values.get("final_report")
print("\n=== 审查完成 ===")
print(report)
return report
# 测试用合同文本
sample_contract = """
本合同由甲方(委托方)和乙方(承包方)签订。
付款条款:甲方在项目完成后90天内付款,若甲方认为工作质量不满意,有权无限期延期付款。
保密条款:乙方需对甲方所有业务信息保密,保密期限为永久,违约赔偿金额不设上限。
知识产权条款:乙方在项目中产生的所有知识产权自动归甲方所有,包括乙方在项目外独立开发的相关技术。
违约条款:若乙方未能按时交付,每日扣除合同金额的1%作为违约金,无上限。
终止条款:甲方有权在任何时候无需理由终止合同,乙方不得要求赔偿。
"""
if __name__ == "__main__":
run_contract_review(sample_contract)踩过的坑
坑1:State 里的 list 字段会被覆盖,不是追加
我刚开始用的时候,以为节点返回的更新会自动追加到 list 里。结果发现每次都是覆盖。
要实现追加语义,需要在 TypedDict 里用 Annotated:
from typing import Annotated
import operator
class MyState(TypedDict):
# 这样写:每个节点的返回会"追加"到 messages,而不是覆盖
messages: Annotated[list, operator.add]
# 普通写法:每次覆盖
result: str坑2:interrupt_before 必须在 compile 时配置,不能运行时改
我一开始想动态决定是否中断,发现不行。中断点必须在 workflow.compile() 时通过 interrupt_before 参数固定下来。如果你需要条件中断,就得在节点里做判断然后直接用条件边绕过中断节点。
坑3:MemorySaver 是进程内存,重启就丢
MemorySaver 只适合开发阶段。生产里要用 SqliteSaver 或 PostgresSaver:
from langgraph.checkpoint.sqlite import SqliteSaver
# 生产里用数据库持久化
memory = SqliteSaver.from_conn_string("./checkpoints.db")
app = workflow.compile(checkpointer=memory, interrupt_before=["human_review"])坑4:stream 和 invoke 的区别
app.invoke() 等到整个图执行完才返回。app.stream() 每个节点完成后立刻 yield 出来。有中断的场景一定要用 stream,否则你不知道什么时候暂停了。
坑5:update_state 后要用 None 触发继续执行
恢复中断的时候,调用 app.stream(None, config) 而不是 app.stream(new_state, config)。前者是"继续当前状态执行",后者是"用新状态重新开始"。我在这里卡了半天。
LangGraph 适合什么场景
适合:
- 工作流有明确的多个步骤,步骤之间有条件分支
- 需要人工介入(Human-in-the-loop)
- 需要循环(比如"生成-评估-如果不够好就重新生成")
- 工作流状态需要持久化(用户中途离开,下次可以继续)
不适合:
- 简单的单轮或双轮 LLM 调用,用普通的 Chain 就够了
- 纯线性的步骤,没有分支也没有循环,LangGraph 引入了额外复杂度
LangGraph 的学习曲线比 LangChain Chain 陡,但一旦需求复杂到一定程度,它带来的清晰度是值得的。那个合同审查 Agent,如果用手写 if-else 实现,代码量差不多,但维护性差很多——你很难从代码里直接看出工作流逻辑,而 LangGraph 的图结构让逻辑一目了然。
