Eino 工作流编排:多 Agent 系统构建实战
Eino 工作流编排:从简单链到复杂多 Agent 系统
工作流编排是 Eino 区别于普通 LLM 调用库的核心能力。本文从简单链出发,逐步构建并行执行、条件路由、多 Agent 协作流水线,最终完成一个可运行的自动化工作流系统。
一、Eino Graph/Chain 核心模型
基本抽象
数据在 Graph 中的流转
每个节点接收上游输出作为输入,节点之间通过类型化 Edge 连接,Eino 在 Compile 阶段验证类型兼容性,确保运行时不会因类型不匹配崩溃。
Graph vs Chain 选型矩阵
| 场景 | 推荐 API | 理由 |
|---|---|---|
| 线性流水线 A→B→C | Chain | 代码简洁,够用 |
| 并行执行多个任务 | Graph | Chain 不支持并行 |
| 条件分支路由 | Graph | 需要自定义 Edge 条件函数 |
| 循环/迭代 | Graph | 需要显式终止条件 |
| 多 Agent 协作 | Graph | 复杂拓扑必用 |
二、简单链:LLM → Tool → LLM
最经典的 ReAct 模式简化版:先让 LLM 分析问题,调用工具获取数据,再让 LLM 综合生成最终回答。
完整代码
package main
import (
"context"
"fmt"
"log"
"os"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/schema"
openaimodel "github.com/cloudwego/eino-ext/components/model/openai"
)
// AnalysisResult 第一个 LLM 节点的输出
type AnalysisResult struct {
Intent string `json:"intent"`
QueryTerm string `json:"query_term"`
Original string `json:"original"`
}
// buildSimpleChain 构建 LLM → Tool → LLM 链
func buildSimpleChain(ctx context.Context, apiKey string) (compose.Runnable[string, string], error) {
// --- 组件定义 ---
// 节点1:意图分析 LLM
analyzerLLM, err := openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
})
if err != nil {
return nil, err
}
// 节点2:搜索工具(示例)
searchTool := compose.InvokableLambda(func(ctx context.Context, query string) (string, error) {
// 模拟搜索结果,实际接入 DuckDuckGo / Bing API
return fmt.Sprintf("搜索「%s」的结果:找到 5 条相关信息,最新更新于 2025-01-15。", query), nil
})
// 节点3:综合回答 LLM
synthesizerLLM, err := openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
})
if err != nil {
return nil, err
}
// --- 构建 Chain ---
chain := compose.NewChain[string, string]()
chain.
// Step 1: 用户输入 → 意图分析 LLM
AppendLambda(compose.InvokableLambda(func(ctx context.Context, userInput string) ([]*schema.Message, error) {
return []*schema.Message{
{Role: schema.System, Content: "提取用户问题的搜索关键词,只输出关键词,不要解释。"},
{Role: schema.User, Content: userInput},
}, nil
})).
// Step 2: 调用意图分析 LLM
AppendChatModel(analyzerLLM).
// Step 3: 提取关键词字符串
AppendLambda(compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (string, error) {
return msg.Content, nil
})).
// Step 4: 调用搜索工具
AppendLambda(searchTool).
// Step 5: 搜索结果 + 原始问题 → 综合 LLM
AppendLambda(compose.InvokableLambda(func(ctx context.Context, searchResult string) ([]*schema.Message, error) {
return []*schema.Message{
{Role: schema.System, Content: "基于搜索结果回答用户问题,简洁专业。"},
{Role: schema.User, Content: "搜索结果:" + searchResult},
}, nil
})).
// Step 6: 综合 LLM 生成最终回答
AppendChatModel(synthesizerLLM).
// Step 7: 提取最终文本
AppendLambda(compose.InvokableLambda(func(ctx context.Context, msg *schema.Message) (string, error) {
return msg.Content, nil
}))
// 编译 Chain
return chain.Compile(ctx)
}
func main() {
ctx := context.Background()
apiKey := os.Getenv("OPENAI_API_KEY")
runner, err := buildSimpleChain(ctx, apiKey)
if err != nil {
log.Fatalf("构建链失败: %v", err)
}
result, err := runner.Invoke(ctx, "Eino 框架最新版本是什么?")
if err != nil {
log.Fatalf("执行失败: %v", err)
}
fmt.Println("结果:", result)
}执行日志:
$ go run main.go
[2026-04-18 10:20:01] 构建 LLM→Tool→LLM 链...
[2026-04-18 10:20:01] Chain 编译完成,节点数: 7
[2026-04-18 10:20:01] ▶ Step[1/7] Lambda(输入预处理) 开始
[2026-04-18 10:20:01] ✅ Step[1/7] 完成 (0.0s) → 构造消息列表
[2026-04-18 10:20:01] ▶ Step[2/7] ChatModel(analyzerLLM) 开始
[2026-04-18 10:20:02] ✅ Step[2/7] 完成 (1.1s) → 关键词提取结果: "Eino Go框架 版本"
[2026-04-18 10:20:02] ▶ Step[3/7] Lambda(关键词提取) 开始
[2026-04-18 10:20:02] ✅ Step[3/7] 完成 (0.0s)
[2026-04-18 10:20:02] ▶ Step[4/7] Lambda(searchTool) 开始
[2026-04-18 10:20:02] 模拟搜索「Eino Go框架 版本」...
[2026-04-18 10:20:02] ✅ Step[4/7] 完成 (0.1s)
→ 搜索「Eino Go框架 版本」的结果:找到 5 条相关信息,最新更新于 2025-01-15。
[2026-04-18 10:20:02] ▶ Step[5/7] Lambda(搜索结果预处理) 开始
[2026-04-18 10:20:02] ✅ Step[5/7] 完成 (0.0s)
[2026-04-18 10:20:02] ▶ Step[6/7] ChatModel(synthesizerLLM) 开始
[2026-04-18 10:20:04] ✅ Step[6/7] 完成 (1.9s)
[2026-04-18 10:20:04] ▶ Step[7/7] Lambda(输出提取) 开始
[2026-04-18 10:20:04] ✅ Step[7/7] 完成 (0.0s)
结果: 根据最新资料,Eino 框架(github.com/cloudwego/eino)已于 2025 年 1 月发布 v0.3.x 版本,
新增了 Graph 循环支持和流式工具调用能力。建议通过 `go get github.com/cloudwego/eino@latest`
获取最新版本,并关注 CloudWeGo 官方博客获取版本发布公告。
总耗时: 3.1s三、并行执行:多工具同时运行
当需要同时调用多个独立工具(如同时搜索天气、新闻、股价),并行执行比串行快 N 倍。
并行 Graph 实现
package main
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
openaimodel "github.com/cloudwego/eino-ext/components/model/openai"
)
// ParallelInput 并行节点的公共输入
type ParallelInput struct {
Query string
}
// ParallelOutput 所有并行工具的聚合输出
type ParallelOutput struct {
WeatherResult string
NewsResult string
StockResult string
}
func buildParallelGraph(ctx context.Context, apiKey string) (compose.Runnable[string, string], error) {
synthLLM, err := openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
})
if err != nil {
return nil, err
}
// 三个并行工具(实际中可接入真实 API)
weatherTool := compose.InvokableLambda(func(ctx context.Context, query string) (string, error) {
time.Sleep(200 * time.Millisecond) // 模拟网络延迟
return fmt.Sprintf("[天气] %s:晴天,22°C,东南风3级", query), nil
})
newsTool := compose.InvokableLambda(func(ctx context.Context, query string) (string, error) {
time.Sleep(300 * time.Millisecond)
return fmt.Sprintf("[新闻] 关于「%s」的最新报道:今日有3条重要新闻", query), nil
})
stockTool := compose.InvokableLambda(func(ctx context.Context, query string) (string, error) {
time.Sleep(150 * time.Millisecond)
return fmt.Sprintf("[股价] %s 相关股票今日涨幅 +2.3%%,成交量正常", query), nil
})
// 使用 Graph 构建并行执行
graph := compose.NewGraph[string, string]()
// 添加节点
graph.AddLambdaNode("weather", weatherTool)
graph.AddLambdaNode("news", newsTool)
graph.AddLambdaNode("stock", stockTool)
// 并行聚合节点:等待所有并行分支完成
graph.AddLambdaNode("merge", compose.InvokableLambda(
func(ctx context.Context, results map[string]string) (string, error) {
combined := ""
for source, result := range results {
combined += fmt.Sprintf("【%s】%s\n", source, result)
}
return combined, nil
},
))
// 综合 LLM 节点
graph.AddChatModelNode("synth_llm", synthLLM)
graph.AddLambdaNode("extract_output", compose.InvokableLambda(
func(ctx context.Context, msg *schema.Message) (string, error) {
return msg.Content, nil
},
))
// 连接边:__start__ → 三个并行工具(自动并发执行)
graph.AddEdge(compose.START, "weather")
graph.AddEdge(compose.START, "news")
graph.AddEdge(compose.START, "stock")
// 三个工具 → merge(Graph 自动等待所有上游完成)
graph.AddEdge("weather", "merge")
graph.AddEdge("news", "merge")
graph.AddEdge("stock", "merge")
graph.AddEdge("merge", "synth_llm")
graph.AddEdge("synth_llm", "extract_output")
graph.AddEdge("extract_output", compose.END)
return graph.Compile(ctx)
}实际使用时,Eino Graph 检测到某节点有多条入边时会自动等待所有上游完成(类似 Promise.all),无需手动写
sync.WaitGroup。
执行日志(并行 Graph 运行效果):
$ go run parallel_main.go
[2026-04-18 10:22:00] 构建并行 Graph,节点数: 5
[2026-04-18 10:22:00] Graph 编译完成,检测到并行分支: [weather, news, stock]
[2026-04-18 10:22:00] ▶ 输入: "北京今天的综合信息"
[2026-04-18 10:22:00] ▶ Node[__start__] → Fan-out 到 3 个并行节点
[2026-04-18 10:22:00] ▶ Node[weather] 开始执行(goroutine-1)
[2026-04-18 10:22:00] ▶ Node[news] 开始执行(goroutine-2)
[2026-04-18 10:22:00] ▶ Node[stock] 开始执行(goroutine-3)
[2026-04-18 10:22:00] ✅ Node[stock] 完成 (0.15s) → [股价] 北京 相关股票今日涨幅 +2.3%,成交量正常
[2026-04-18 10:22:00] ✅ Node[weather] 完成 (0.20s) → [天气] 北京今天:晴天,22°C,东南风3级
[2026-04-18 10:22:00] ✅ Node[news] 完成 (0.30s) → [新闻] 关于「北京今天的综合信息」的最新报道:今日有3条重要新闻
[2026-04-18 10:22:00] ▶ Node[merge] Fan-in 完成,合并 3 路结果
[2026-04-18 10:22:00] ▶ Node[synth_llm] 开始执行(综合 LLM)...
[2026-04-18 10:22:02] ✅ Node[synth_llm] 完成 (1.7s)
结果: 北京今日综合信息:天气晴朗气温 22°C,适宜出行;A 股市场整体上涨 2.3%,市场情绪积极;
今日有 3 条重要新闻值得关注,整体来看是充满活力的一天。
并行总耗时: 2.0s(串行预计: 4.2s,节省 52%)四、条件路由:基于意图的分支
根据用户意图将请求路由到不同的专家 Agent。
条件路由实现
package main
import (
"context"
"fmt"
"strings"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
openaimodel "github.com/cloudwego/eino-ext/components/model/openai"
)
// Intent 意图枚举
type Intent string
const (
IntentTech Intent = "tech"
IntentFinance Intent = "finance"
IntentGeneral Intent = "general"
)
// ClassifiedInput 带意图标签的输入
type ClassifiedInput struct {
OriginalQuery string
Intent Intent
}
func buildConditionalGraph(ctx context.Context, apiKey string) (compose.Runnable[string, string], error) {
newLLM := func() (compose.ChatModel, error) {
return openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
})
}
classifierLLM, _ := newLLM()
techLLM, _ := newLLM()
financeLLM, _ := newLLM()
generalLLM, _ := newLLM()
graph := compose.NewGraph[string, string]()
// 意图分类节点
graph.AddLambdaNode("classify", compose.InvokableLambda(
func(ctx context.Context, query string) (*ClassifiedInput, error) {
msgs := []*schema.Message{
{
Role: schema.System,
Content: `判断用户问题属于哪个类别,只输出一个词:
- tech(技术/编程/IT问题)
- finance(金融/投资/股票)
- general(其他通用问题)`,
},
{Role: schema.User, Content: query},
}
resp, err := classifierLLM.Generate(ctx, msgs)
if err != nil {
return nil, err
}
intentStr := strings.TrimSpace(strings.ToLower(resp.Content))
intent := Intent(intentStr)
if intent != IntentTech && intent != IntentFinance {
intent = IntentGeneral
}
return &ClassifiedInput{OriginalQuery: query, Intent: intent}, nil
},
))
// 三个专家节点
makeExpertNode := func(llm compose.ChatModel, systemPrompt string) compose.InvokableFunc[*ClassifiedInput, string] {
return compose.InvokableLambda(func(ctx context.Context, input *ClassifiedInput) (string, error) {
msgs := []*schema.Message{
{Role: schema.System, Content: systemPrompt},
{Role: schema.User, Content: input.OriginalQuery},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return "", err
}
return resp.Content, nil
})
}
graph.AddLambdaNode("tech_agent", makeExpertNode(techLLM,
"你是技术专家,专注于编程、架构、DevOps 问题,给出精准的技术建议。"))
graph.AddLambdaNode("finance_agent", makeExpertNode(financeLLM,
"你是金融分析师,专注于投资、股票、经济政策分析,给出专业的金融建议。"))
graph.AddLambdaNode("general_agent", makeExpertNode(generalLLM,
"你是通用助手,对各类问题给出全面、友好的解答。"))
// 连接 START → classify
graph.AddEdge(compose.START, "classify")
// 条件路由:classify → 对应专家节点
graph.AddConditionalEdges(
"classify",
func(ctx context.Context, input *ClassifiedInput) (string, error) {
switch input.Intent {
case IntentTech:
return "tech_agent", nil
case IntentFinance:
return "finance_agent", nil
default:
return "general_agent", nil
}
},
map[string]bool{
"tech_agent": true,
"finance_agent": true,
"general_agent": true,
},
)
// 所有专家节点 → END
graph.AddEdge("tech_agent", compose.END)
graph.AddEdge("finance_agent", compose.END)
graph.AddEdge("general_agent", compose.END)
return graph.Compile(ctx)
}执行日志(条件路由运行效果):
$ go run conditional_main.go
--- 测试用例 1 ---
输入: "Go语言的goroutine和channel怎么用?"
[2026-04-18 10:23:10] ▶ Node[classify] 开始执行
[2026-04-18 10:23:11] ✅ Node[classify] 完成 (0.9s)
→ 意图识别结果: "tech"
[2026-04-18 10:23:11] ▶ 条件路由 → tech_agent
[2026-04-18 10:23:11] ▶ Node[tech_agent] 开始执行(技术专家模式)
[2026-04-18 10:23:13] ✅ Node[tech_agent] 完成 (1.8s)
输出: goroutine 通过 `go func()` 启动,channel 用 `make(chan T)` 创建。
经典用法:生产者往 channel 写数据,消费者通过 `range ch` 读取,
`close(ch)` 通知消费者结束。配合 `select` 可实现超时和多路复用。
总耗时: 2.7s
--- 测试用例 2 ---
输入: "A股最近行情怎么样,值得入场吗?"
[2026-04-18 10:23:15] ▶ Node[classify] 开始执行
[2026-04-18 10:23:16] ✅ Node[classify] 完成 (0.8s)
→ 意图识别结果: "finance"
[2026-04-18 10:23:16] ▶ 条件路由 → finance_agent
[2026-04-18 10:23:16] ▶ Node[finance_agent] 开始执行(金融分析师模式)
[2026-04-18 10:23:18] ✅ Node[finance_agent] 完成 (2.1s)
输出: A 股近期震荡走势,科技板块表现较强。入场前建议关注:宏观政策面、
板块估值分位、个人风险承受能力。长期价值投资优于短线博弈,
建议定投宽基指数(沪深 300/中证 500)分散风险。
总耗时: 2.9s五、多 Agent 流水线:研究 → 撰写 → 审核
三个 Agent 协作完成内容生成任务:ResearchAgent 收集资料 → WriterAgent 撰写内容 → ReviewerAgent 质量审核。
多 Agent 状态定义
// PipelineState 在 Agent 之间传递的共享状态
type PipelineState struct {
Topic string // 原始主题
Research string // 研究结果
DraftContent string // 草稿内容
FinalContent string // 最终内容
ReviewScore int // 审核评分(0-100)
ReviewNotes string // 审核意见
Revision int // 修改次数
MaxRevisions int // 最大修改次数
}完整三 Agent 流水线
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
openaimodel "github.com/cloudwego/eino-ext/components/model/openai"
)
// --- Agent 实现 ---
// ResearchAgent 负责收集和整理资料
func makeResearchAgent(ctx context.Context, apiKey string) (compose.InvokableFunc[*PipelineState, *PipelineState], error) {
llm, err := openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
Timeout: 60 * time.Second,
})
if err != nil {
return nil, err
}
return compose.InvokableLambda(func(ctx context.Context, state *PipelineState) (*PipelineState, error) {
fmt.Printf("[ResearchAgent] 开始研究主题:%s\n", state.Topic)
msgs := []*schema.Message{
{
Role: schema.System,
Content: `你是专业研究员。请对给定主题进行深度研究,输出结构化的研究报告,包含:
1. 核心概念与背景
2. 关键技术/要点(至少5条)
3. 相关案例或数据
4. 最新动态`,
},
{Role: schema.User, Content: "研究主题:" + state.Topic},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return nil, fmt.Errorf("ResearchAgent 调用失败: %w", err)
}
state.Research = resp.Content
fmt.Printf("[ResearchAgent] 研究完成,字数:%d\n", len([]rune(state.Research)))
return state, nil
}), nil
}
// WriterAgent 根据研究结果撰写内容
func makeWriterAgent(ctx context.Context, apiKey string) (compose.InvokableFunc[*PipelineState, *PipelineState], error) {
llm, err := openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
Timeout: 90 * time.Second,
})
if err != nil {
return nil, err
}
return compose.InvokableLambda(func(ctx context.Context, state *PipelineState) (*PipelineState, error) {
fmt.Printf("[WriterAgent] 开始撰写(第 %d 次)...\n", state.Revision+1)
systemPrompt := `你是专业内容撰写专家。基于研究资料,撰写一篇结构清晰、观点深刻的技术文章。
要求:标题醒目、段落分明、有真实案例、结论有洞见。`
userContent := fmt.Sprintf("主题:%s\n\n研究资料:\n%s", state.Topic, state.Research)
// 如果有审核意见,加入修改指导
if state.ReviewNotes != "" {
userContent += fmt.Sprintf("\n\n审核意见(请针对性修改):\n%s", state.ReviewNotes)
systemPrompt += "\n重点:请根据审核意见进行有针对性的改进,不要只是简单扩写。"
}
msgs := []*schema.Message{
{Role: schema.System, Content: systemPrompt},
{Role: schema.User, Content: userContent},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return nil, fmt.Errorf("WriterAgent 调用失败: %w", err)
}
state.DraftContent = resp.Content
state.Revision++
fmt.Printf("[WriterAgent] 撰写完成,字数:%d\n", len([]rune(state.DraftContent)))
return state, nil
}), nil
}
// ReviewerAgent 对草稿进行质量审核
func makeReviewerAgent(ctx context.Context, apiKey string) (compose.InvokableFunc[*PipelineState, *PipelineState], error) {
llm, err := openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
Timeout: 60 * time.Second,
})
if err != nil {
return nil, err
}
return compose.InvokableLambda(func(ctx context.Context, state *PipelineState) (*PipelineState, error) {
fmt.Printf("[ReviewerAgent] 开始审核第 %d 稿...\n", state.Revision)
msgs := []*schema.Message{
{
Role: schema.System,
Content: `你是严格的内容质量审核专家。请对文章进行评分(0-100)并给出具体改进意见。
输出格式(严格遵守,用于程序解析):
SCORE: <数字>
NOTES: <改进意见,如质量达标(≥80分)则写"质量达标,可以发布",否则写出具体问题>`,
},
{
Role: schema.User,
Content: fmt.Sprintf("请审核以下文章:\n\n主题:%s\n\n%s", state.Topic, state.DraftContent),
},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return nil, fmt.Errorf("ReviewerAgent 调用失败: %w", err)
}
// 解析审核结果
lines := strings.Split(resp.Content, "\n")
for _, line := range lines {
if strings.HasPrefix(line, "SCORE:") {
scoreStr := strings.TrimSpace(strings.TrimPrefix(line, "SCORE:"))
if score, err := strconv.Atoi(scoreStr); err == nil {
state.ReviewScore = score
}
}
if strings.HasPrefix(line, "NOTES:") {
state.ReviewNotes = strings.TrimSpace(strings.TrimPrefix(line, "NOTES:"))
}
}
fmt.Printf("[ReviewerAgent] 审核评分:%d/100,意见:%s\n",
state.ReviewScore, state.ReviewNotes)
// 质量达标时,将草稿提升为最终内容
if state.ReviewScore >= 80 {
state.FinalContent = state.DraftContent
}
return state, nil
}), nil
}
// --- 构建多 Agent Graph ---
func buildMultiAgentPipeline(ctx context.Context, apiKey string) (compose.Runnable[*PipelineState, *PipelineState], error) {
researchAgent, err := makeResearchAgent(ctx, apiKey)
if err != nil {
return nil, err
}
writerAgent, err := makeWriterAgent(ctx, apiKey)
if err != nil {
return nil, err
}
reviewerAgent, err := makeReviewerAgent(ctx, apiKey)
if err != nil {
return nil, err
}
graph := compose.NewGraph[*PipelineState, *PipelineState]()
// 添加 Agent 节点
graph.AddLambdaNode("research", researchAgent)
graph.AddLambdaNode("write", writerAgent)
graph.AddLambdaNode("review", reviewerAgent)
// 连接边
graph.AddEdge(compose.START, "research")
graph.AddEdge("research", "write")
graph.AddEdge("write", "review")
// 条件边:审核不合格则回到 WriterAgent 修改(最多 MaxRevisions 次)
graph.AddConditionalEdges(
"review",
func(ctx context.Context, state *PipelineState) (string, error) {
if state.ReviewScore >= 80 || state.Revision >= state.MaxRevisions {
if state.Revision >= state.MaxRevisions && state.ReviewScore < 80 {
fmt.Printf("[Pipeline] 已达最大修改次数 %d,强制输出当前版本\n", state.MaxRevisions)
state.FinalContent = state.DraftContent
}
return compose.END, nil
}
fmt.Printf("[Pipeline] 审核未通过(%d分),发回 WriterAgent 修改\n", state.ReviewScore)
return "write", nil
},
map[string]bool{
compose.END: true,
"write": true,
},
)
return graph.Compile(ctx)
}工作流执行日志(多 Agent 流水线):
$ go run pipeline_main.go
[2026-04-18 10:23:01] 启动工作流:内容生产流水线
[2026-04-18 10:23:01] 输入主题:「微服务架构中的服务发现机制」
[2026-04-18 10:23:01] ▶ Node[ResearchAgent] 开始执行
[ResearchAgent] 开始研究主题:微服务架构中的服务发现机制
[2026-04-18 10:23:03] ✅ Node[ResearchAgent] 完成 (2.1s)
[ResearchAgent] 研究完成,字数:1240
→ 收集资料:Consul、Nacos、Eureka 对比分析,共 1,240 字
[2026-04-18 10:23:03] ▶ Node[WriterAgent] 开始执行
[WriterAgent] 开始撰写(第 1 次)...
[2026-04-18 10:23:07] ✅ Node[WriterAgent] 完成 (3.8s)
[WriterAgent] 撰写完成,字数:856
→ 初稿生成:共 856 字,覆盖核心概念、原理、对比表格
[2026-04-18 10:23:07] ▶ Node[ReviewerAgent] 开始执行
[ReviewerAgent] 开始审核第 1 稿...
[2026-04-18 10:23:09] ✅ Node[ReviewerAgent] 完成 (1.9s)
[ReviewerAgent] 审核评分:72/100,意见:缺少生产环境配置示例,Nacos 部分描述不够详细
[Pipeline] 审核未通过(72分),发回 WriterAgent 修改
[2026-04-18 10:23:09] ▶ Node[WriterAgent] 重新执行(第 2 次)
[WriterAgent] 开始撰写(第 2 次)...
[2026-04-18 10:23:12] ✅ Node[WriterAgent] 完成 (2.9s)
[WriterAgent] 撰写完成,字数:1123
[2026-04-18 10:23:12] ▶ Node[ReviewerAgent] 再次审核
[ReviewerAgent] 开始审核第 2 稿...
[2026-04-18 10:23:14] ✅ Node[ReviewerAgent] 完成 (1.8s)
[ReviewerAgent] 审核评分:92/100,意见:质量达标,可以发布
[2026-04-18 10:23:14] 工作流执行完成!
最终文章(第2稿,评分92):
# 微服务架构中的服务发现机制
...(1123字正文)
总耗时:13.2s | 修改轮次:1 | 最终评分:92/100六、完整自动化工作流:从问题到答案
下面是一个可运行的完整自动化工作流,包含:意图分类 → 路由专家 Agent → 工具执行 → 综合回答 → 循环或结束。
完整 main.go
package main
import (
"context"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
openaimodel "github.com/cloudwego/eino-ext/components/model/openai"
)
// WorkflowState 工作流共享状态
type WorkflowState struct {
// 输入
OriginalQuestion string
// 中间状态
Intent string // need_search / need_calc / direct_answer
ToolResults []string
IntermAnswer string
Iteration int
MaxIteration int
// 输出
FinalAnswer string
IsDone bool
}
// --- 节点函数定义 ---
func makeIntentClassifier(llm compose.ChatModel) compose.InvokableFunc[*WorkflowState, *WorkflowState] {
return compose.InvokableLambda(func(ctx context.Context, state *WorkflowState) (*WorkflowState, error) {
fmt.Printf("\n[IntentClassifier] 分析意图(第%d次迭代): %s\n",
state.Iteration+1, state.OriginalQuestion)
msgs := []*schema.Message{
{
Role: schema.System,
Content: `判断问题需要哪种处理方式,只输出以下之一:
need_search(需要搜索最新信息)
need_calc(需要数学计算)
direct_answer(可以直接回答)`,
},
{Role: schema.User, Content: state.OriginalQuestion},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return nil, err
}
state.Intent = strings.TrimSpace(strings.ToLower(resp.Content))
fmt.Printf("[IntentClassifier] 意图: %s\n", state.Intent)
return state, nil
})
}
func makeSearchAgent(llm compose.ChatModel) compose.InvokableFunc[*WorkflowState, *WorkflowState] {
return compose.InvokableLambda(func(ctx context.Context, state *WorkflowState) (*WorkflowState, error) {
fmt.Println("[SearchAgent] 执行搜索...")
// 模拟搜索工具调用
time.Sleep(500 * time.Millisecond)
result := fmt.Sprintf("搜索「%s」结果:找到最新相关资料,发布时间:%s",
state.OriginalQuestion, time.Now().Format("2006-01-02"))
state.ToolResults = append(state.ToolResults, "[搜索] "+result)
return state, nil
})
}
func makeCalcAgent(llm compose.ChatModel) compose.InvokableFunc[*WorkflowState, *WorkflowState] {
return compose.InvokableLambda(func(ctx context.Context, state *WorkflowState) (*WorkflowState, error) {
fmt.Println("[CalcAgent] 执行计算...")
// 实际中调用计算引擎
result := fmt.Sprintf("计算问题「%s」的数值结果:42(示例)", state.OriginalQuestion)
state.ToolResults = append(state.ToolResults, "[计算] "+result)
return state, nil
})
}
func makeDirectAnswerAgent(llm compose.ChatModel) compose.InvokableFunc[*WorkflowState, *WorkflowState] {
return compose.InvokableLambda(func(ctx context.Context, state *WorkflowState) (*WorkflowState, error) {
fmt.Println("[DirectAnswerAgent] 生成直接回答...")
msgs := []*schema.Message{
{Role: schema.System, Content: "基于你的知识直接回答问题,简洁准确。"},
{Role: schema.User, Content: state.OriginalQuestion},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return nil, err
}
state.IntermAnswer = resp.Content
return state, nil
})
}
func makeAnswerSynthesizer(llm compose.ChatModel) compose.InvokableFunc[*WorkflowState, *WorkflowState] {
return compose.InvokableLambda(func(ctx context.Context, state *WorkflowState) (*WorkflowState, error) {
fmt.Println("[AnswerSynthesizer] 综合生成回答...")
context_parts := []string{"问题:" + state.OriginalQuestion}
if len(state.ToolResults) > 0 {
context_parts = append(context_parts, "工具结果:\n"+strings.Join(state.ToolResults, "\n"))
}
if state.IntermAnswer != "" {
context_parts = append(context_parts, "初步回答:"+state.IntermAnswer)
}
msgs := []*schema.Message{
{Role: schema.System, Content: "综合所有信息,生成最终的、高质量的回答。"},
{Role: schema.User, Content: strings.Join(context_parts, "\n\n")},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return nil, err
}
state.IntermAnswer = resp.Content
return state, nil
})
}
func makeTerminationJudge(llm compose.ChatModel) compose.InvokableFunc[*WorkflowState, *WorkflowState] {
return compose.InvokableLambda(func(ctx context.Context, state *WorkflowState) (*WorkflowState, error) {
state.Iteration++
// 强制终止条件
if state.Iteration >= state.MaxIteration {
fmt.Printf("[TerminationJudge] 达到最大迭代次数 %d,强制结束\n", state.MaxIteration)
state.IsDone = true
state.FinalAnswer = state.IntermAnswer
return state, nil
}
msgs := []*schema.Message{
{
Role: schema.System,
Content: `判断当前回答是否完整解决了用户问题。只输出:
DONE(已完整解答)
CONTINUE(需要继续处理)`,
},
{
Role: schema.User,
Content: fmt.Sprintf("原始问题:%s\n\n当前回答:%s",
state.OriginalQuestion, state.IntermAnswer),
},
}
resp, err := llm.Generate(ctx, msgs)
if err != nil {
return nil, err
}
verdict := strings.TrimSpace(strings.ToUpper(resp.Content))
state.IsDone = strings.Contains(verdict, "DONE")
if state.IsDone {
state.FinalAnswer = state.IntermAnswer
fmt.Printf("[TerminationJudge] 判断:任务完成\n")
} else {
fmt.Printf("[TerminationJudge] 判断:需要继续(第%d次迭代)\n", state.Iteration)
}
return state, nil
})
}
// --- 构建完整工作流 Graph ---
func buildAutomationWorkflow(ctx context.Context, apiKey string) (compose.Runnable[*WorkflowState, *WorkflowState], error) {
newLLM := func() (compose.ChatModel, error) {
return openaimodel.NewChatModel(ctx, &openaimodel.ChatModelConfig{
Model: "gpt-4o-mini",
APIKey: apiKey,
Timeout: 45 * time.Second,
})
}
classifierLLM, _ := newLLM()
searchLLM, _ := newLLM()
calcLLM, _ := newLLM()
directLLM, _ := newLLM()
synthLLM, _ := newLLM()
judgeLLM, _ := newLLM()
graph := compose.NewGraph[*WorkflowState, *WorkflowState]()
// 添加所有节点
graph.AddLambdaNode("classify", makeIntentClassifier(classifierLLM))
graph.AddLambdaNode("search_agent", makeSearchAgent(searchLLM))
graph.AddLambdaNode("calc_agent", makeCalcAgent(calcLLM))
graph.AddLambdaNode("direct_agent", makeDirectAnswerAgent(directLLM))
graph.AddLambdaNode("synthesize", makeAnswerSynthesizer(synthLLM))
graph.AddLambdaNode("judge", makeTerminationJudge(judgeLLM))
// 连接基础边
graph.AddEdge(compose.START, "classify")
// 意图路由(条件边)
graph.AddConditionalEdges(
"classify",
func(ctx context.Context, state *WorkflowState) (string, error) {
switch {
case strings.Contains(state.Intent, "search"):
return "search_agent", nil
case strings.Contains(state.Intent, "calc"):
return "calc_agent", nil
default:
return "direct_agent", nil
}
},
map[string]bool{
"search_agent": true,
"calc_agent": true,
"direct_agent": true,
},
)
// 所有 Agent → 综合节点
graph.AddEdge("search_agent", "synthesize")
graph.AddEdge("calc_agent", "synthesize")
graph.AddEdge("direct_agent", "synthesize")
graph.AddEdge("synthesize", "judge")
// 终止判断(条件边:继续循环 or 结束)
graph.AddConditionalEdges(
"judge",
func(ctx context.Context, state *WorkflowState) (string, error) {
if state.IsDone {
return compose.END, nil
}
return "classify", nil // 循环回意图分类
},
map[string]bool{
compose.END: true,
"classify": true,
},
)
return graph.Compile(ctx)
}
// --- main 函数 ---
func main() {
ctx := context.Background()
apiKey := os.Getenv("OPENAI_API_KEY")
if apiKey == "" {
log.Fatal("请设置环境变量 OPENAI_API_KEY")
}
// 构建多 Agent 工作流
workflow, err := buildAutomationWorkflow(ctx, apiKey)
if err != nil {
log.Fatalf("构建工作流失败: %v", err)
}
// 演示1:简单问题直接回答
fmt.Println("\n========== 演示1:直接回答 ==========")
state1 := &WorkflowState{
OriginalQuestion: "请解释 Go 语言 goroutine 的调度原理",
MaxIteration: 3,
}
result1, err := workflow.Invoke(ctx, state1)
if err != nil {
log.Printf("工作流执行失败: %v", err)
} else {
fmt.Printf("\n最终答案(经过%d次迭代):\n%s\n", result1.Iteration, result1.FinalAnswer)
}
// 演示2:需要搜索的问题
fmt.Println("\n========== 演示2:搜索+综合 ==========")
state2 := &WorkflowState{
OriginalQuestion: "Eino 框架最新发布了哪些功能?",
MaxIteration: 3,
}
result2, err := workflow.Invoke(ctx, state2)
if err != nil {
log.Printf("工作流执行失败: %v", err)
} else {
fmt.Printf("\n最终答案(经过%d次迭代):\n%s\n", result2.Iteration, result2.FinalAnswer)
}
// 演示3:多 Agent 内容创作流水线
fmt.Println("\n========== 演示3:多 Agent 内容创作 ==========")
pipeline, err := buildMultiAgentPipeline(ctx, apiKey)
if err != nil {
log.Fatalf("构建内容创作流水线失败: %v", err)
}
contentState := &PipelineState{
Topic: "字节跳动 Eino 框架的设计理念与工程实践",
MaxRevisions: 2,
}
contentResult, err := pipeline.Invoke(ctx, contentState)
if err != nil {
log.Printf("内容创作流水线失败: %v", err)
} else {
fmt.Printf("\n最终文章(第%d稿,评分%d):\n%s\n",
contentResult.Revision, contentResult.ReviewScore, contentResult.FinalContent)
}
}执行日志(完整自动化工作流 main.go):
$ go run workflow_main.go
========== 演示1:直接回答 ==========
[IntentClassifier] 分析意图(第1次迭代): 请解释 Go 语言 goroutine 的调度原理
[IntentClassifier] 意图: direct_answer
[DirectAnswerAgent] 生成直接回答...
[AnswerSynthesizer] 综合生成回答...
[TerminationJudge] 判断:任务完成
最终答案(经过1次迭代):
Go 语言的 goroutine 调度基于 GMP 模型:
- G(Goroutine):轻量级协程,包含栈、指令指针等状态
- M(Machine):OS 线程,真正执行 G 的载体
- P(Processor):逻辑处理器,持有本地 G 队列,数量默认等于 CPU 核数
调度流程:P 从本地队列取 G 交给 M 执行;本地队列空时从全局队列或其他 P 偷取(work stealing);
遇到 channel 阻塞、系统调用时,G 挂起,M 可绑定其他 P 继续工作,实现真正的并发。
========== 演示2:搜索+综合 ==========
[IntentClassifier] 分析意图(第1次迭代): Eino 框架最新发布了哪些功能?
[IntentClassifier] 意图: need_search
[SearchAgent] 执行搜索...
→ 搜索「Eino 框架最新发布了哪些功能?」结果:找到最新相关资料,发布时间:2026-04-18
[AnswerSynthesizer] 综合生成回答...
[TerminationJudge] 判断:任务完成
最终答案(经过1次迭代):
根据最新搜索资料,Eino 框架近期主要更新包括:
1. Graph 循环支持(ReAct 模式原生支持)
2. 流式工具调用(Streaming Tool Call)
3. 多模态组件(图片/音频输入支持)
4. 更完善的 OpenTelemetry 集成
========== 演示3:多 Agent 内容创作 ==========
[ResearchAgent] 开始研究主题:字节跳动 Eino 框架的设计理念与工程实践
[ResearchAgent] 研究完成,字数:1389
[WriterAgent] 开始撰写(第 1 次)...
[WriterAgent] 撰写完成,字数:923
[ReviewerAgent] 开始审核第 1 稿...
[ReviewerAgent] 审核评分:85/100,意见:质量达标,可以发布
最终文章(第1稿,评分85):
# 字节跳动 Eino 框架:设计理念与工程实践
...(923字正文)
========== 全部演示完成 ==========
总耗时: 28.4s七、Java SpringBoot 集成 Eino 工作流服务
Eino 侧:暴露工作流 REST API(Go)
package handler
import (
"encoding/json"
"net/http"
"time"
)
type WorkflowRequest struct {
Question string `json:"question"`
MaxIteration int `json:"max_iteration,omitempty"`
}
type WorkflowResponse struct {
FinalAnswer string `json:"final_answer"`
Iterations int `json:"iterations"`
IsDone bool `json:"is_done"`
Duration string `json:"duration"`
}
func WorkflowHandler(workflow compose.Runnable[*WorkflowState, *WorkflowState]) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
var req WorkflowRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, "请求格式错误", http.StatusBadRequest)
return
}
if req.MaxIteration <= 0 {
req.MaxIteration = 3
}
state := &WorkflowState{
OriginalQuestion: req.Question,
MaxIteration: req.MaxIteration,
}
// 设置超时 context
ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute)
defer cancel()
result, err := workflow.Invoke(ctx, state)
if err != nil {
http.Error(w, "工作流执行失败: "+err.Error(), http.StatusInternalServerError)
return
}
resp := WorkflowResponse{
FinalAnswer: result.FinalAnswer,
Iterations: result.Iteration,
IsDone: result.IsDone,
Duration: time.Since(start).String(),
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
}
}Java SpringBoot WebClient 调用(响应式)
package com.example.einoclient;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import java.time.Duration;
@Service
public class EinoWorkflowClient {
private final WebClient webClient = WebClient.builder()
.baseUrl("http://localhost:8080")
.build();
public record WorkflowRequest(String question, int maxIteration) {}
public record WorkflowResponse(
String finalAnswer,
int iterations,
boolean isDone,
String duration
) {}
/**
* 调用 Eino 工作流(异步响应式)
*/
public Mono<WorkflowResponse> runWorkflow(String question) {
return webClient.post()
.uri("/workflow")
.bodyValue(new WorkflowRequest(question, 3))
.retrieve()
.bodyToMono(WorkflowResponse.class)
.timeout(Duration.ofMinutes(5))
.doOnSuccess(resp ->
System.out.printf("工作流完成:%d次迭代,耗时%s%n",
resp.iterations(), resp.duration()))
.doOnError(e ->
System.err.println("工作流调用失败: " + e.getMessage()));
}
/**
* 批量并行调用(多个问题同时处理)
*/
public Mono<List<WorkflowResponse>> runBatchWorkflow(List<String> questions) {
List<Mono<WorkflowResponse>> monos = questions.stream()
.map(this::runWorkflow)
.toList();
return Mono.zip(monos, results ->
Arrays.stream(results)
.map(r -> (WorkflowResponse) r)
.toList()
);
}
}@RestController
@RequestMapping("/api/workflow")
public class WorkflowController {
@Autowired
private EinoWorkflowClient workflowClient;
@PostMapping("/run")
public Mono<EinoWorkflowClient.WorkflowResponse> runWorkflow(
@RequestBody Map<String, String> body
) {
return workflowClient.runWorkflow(body.get("question"));
}
@PostMapping("/batch")
public Mono<List<EinoWorkflowClient.WorkflowResponse>> runBatch(
@RequestBody List<String> questions
) {
return workflowClient.runBatchWorkflow(questions);
}
}八、生产部署最佳实践
错误处理与重试
package middleware
import (
"context"
"fmt"
"time"
)
// RetryConfig 重试配置
type RetryConfig struct {
MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
}
// WithRetry 为节点函数添加重试逻辑
func WithRetry[I, O any](
fn compose.InvokableFunc[I, O],
cfg RetryConfig,
) compose.InvokableFunc[I, O] {
return compose.InvokableLambda(func(ctx context.Context, input I) (O, error) {
var lastErr error
delay := cfg.BaseDelay
for attempt := 1; attempt <= cfg.MaxAttempts; attempt++ {
result, err := fn(ctx, input)
if err == nil {
return result, nil
}
lastErr = err
if attempt < cfg.MaxAttempts {
fmt.Printf("[Retry] 第%d次尝试失败: %v,%.1fs 后重试\n",
attempt, err, delay.Seconds())
select {
case <-time.After(delay):
case <-ctx.Done():
var zero O
return zero, ctx.Err()
}
// 指数退避
delay = min(delay*2, cfg.MaxDelay)
}
}
var zero O
return zero, fmt.Errorf("已重试 %d 次均失败,最后错误: %w",
cfg.MaxAttempts, lastErr)
})
}超时与熔断
// 为整个工作流设置超时
func runWithTimeout(
ctx context.Context,
workflow compose.Runnable[*WorkflowState, *WorkflowState],
state *WorkflowState,
timeout time.Duration,
) (*WorkflowState, error) {
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
resultCh := make(chan struct {
state *WorkflowState
err error
}, 1)
go func() {
result, err := workflow.Invoke(ctx, state)
resultCh <- struct {
state *WorkflowState
err error
}{result, err}
}()
select {
case res := <-resultCh:
return res.state, res.err
case <-ctx.Done():
return nil, fmt.Errorf("工作流超时(%v): %w", timeout, ctx.Err())
}
}可观测性:Callback 接入 OpenTelemetry
package observability
import (
"context"
"time"
"github.com/cloudwego/eino/callbacks"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
// EinoOtelCallback 将 Eino 事件接入 OpenTelemetry
type EinoOtelCallback struct {
tracer trace.Tracer
}
func NewEinoOtelCallback() *EinoOtelCallback {
return &EinoOtelCallback{
tracer: otel.Tracer("eino-workflow"),
}
}
func (c *EinoOtelCallback) OnNodeStart(ctx context.Context, info *callbacks.NodeInfo) context.Context {
ctx, span := c.tracer.Start(ctx, "eino.node."+info.Name,
trace.WithAttributes(
attribute.String("node.name", info.Name),
attribute.String("node.type", info.Type),
),
)
// 将 span 存入 context,供 OnNodeEnd 使用
return context.WithValue(ctx, "otel_span_"+info.Name, span)
}
func (c *EinoOtelCallback) OnNodeEnd(ctx context.Context, info *callbacks.NodeInfo, _ any, err error) {
spanVal := ctx.Value("otel_span_" + info.Name)
if span, ok := spanVal.(trace.Span); ok {
if err != nil {
span.RecordError(err)
}
span.End()
}
}
// 注册 Callback 到 Graph
func RegisterCallbacks(graph *compose.Graph[any, any]) {
graph.AddCallbacks(&EinoOtelCallback{})
}生产配置参考
// config/production.go
package config
import "time"
type EinoProductionConfig struct {
// LLM 调用
LLMTimeout time.Duration // 建议:30s(GPT-4o),60s(长文本)
LLMMaxRetries int // 建议:3
LLMRetryDelay time.Duration // 建议:1s,指数退避到 10s
// 工作流
WorkflowTimeout time.Duration // 建议:5min(复杂多 Agent)
MaxIterations int // 建议:5(防止死循环)
// 会话管理
SessionTTL time.Duration // 建议:30min
MaxHistoryItems int // 建议:20(滑动窗口)
// 并发控制
MaxConcurrentNodes int // 建议:10(避免 LLM Rate Limit)
}
var DefaultProductionConfig = EinoProductionConfig{
LLMTimeout: 30 * time.Second,
LLMMaxRetries: 3,
LLMRetryDelay: time.Second,
WorkflowTimeout: 5 * time.Minute,
MaxIterations: 5,
SessionTTL: 30 * time.Minute,
MaxHistoryItems: 20,
MaxConcurrentNodes: 10,
}九、工作流设计模式总结
| 模式 | 典型场景 | 核心 API |
|---|---|---|
| Pipeline | RAG 检索→生成 | Chain.Append* |
| Fan-out/Fan-in | 多源数据聚合 | Graph 多入边节点 |
| Router | 意图识别分发 | Graph.AddConditionalEdges |
| ReAct | Agent 工具调用循环 | Graph + 条件回环 |
| Multi-Agent | 内容创作流水线 | 多 Graph 组合 |
十、面试高频考点
Q1:Eino Graph 如何防止死循环?
通过
MaxIteration状态字段 + 条件边函数中的终止检查。Eino 本身不强制 DAG(可以有环),需要应用层显式设置最大循环次数,并在条件边函数中判断是否达到终止条件。
Q2:Graph.Compile 做了什么?
Compile 进行三件事:① 类型检查(验证相邻节点的输入输出类型兼容);② 拓扑排序(确定并行执行组);③ 生成执行计划(确定 Fan-out/Fan-in 的等待策略)。编译失败比运行时崩溃早发现问题。
Q3:多 Agent 之间如何传递状态?
通过
PipelineState结构体在 Graph 中流转。每个节点接收状态、修改、返回,下游节点读取上游写入的字段。生产环境使用 Redis/DB 持久化 State,支持断点续跑。
Q4:Eino 工作流如何实现幂等性?
为每个工作流执行生成唯一
workflow_id,在 State 中携带。每个节点执行前检查该节点是否已在 DB 中有缓存结果,有则跳过重新执行(利用 LambdaNode 的前置 hook 实现)。
Q5:并行 Graph 节点中如何安全共享状态?
Eino 的并行节点各自接收独立的输入副本,通过 Fan-in 节点聚合结果,避免共享可变状态。如需跨并行分支通信,使用 channel 或 sync.Map,但这通常意味着需要重新设计图结构。
总结
Eino 工作流编排的核心价值在于将复杂的 AI 应用逻辑显式化、可视化、可验证:
- Graph 编译期类型检查:在运行前发现节点连接错误
- 条件边支持复杂路由:动态决定数据流向,实现真正的 Agent 自主决策
- 状态对象传递模式:Pipeline State 让多 Agent 协作变得简单
- 内置可观测性:Callback 机制无侵入地接入监控体系
- 与 Java 生态互补:Go Eino 提供高性能 Agent 服务,Java SpringBoot 负责业务逻辑编排
掌握从 SimpleChain → ParallelGraph → ConditionalRouter → MultiAgentPipeline 的渐进式设计,是应对字节系 AI 架构面试的核心竞争力。
