第2102篇:AI工作流引擎的设计——让复杂AI管道可视化、可调试、可复用
2026/4/30大约 9 分钟
第2102篇:AI工作流引擎的设计——让复杂AI管道可视化、可调试、可复用
适读人群:需要编排复杂AI处理流程的工程师 | 阅读时长:约9分钟 | 核心价值:掌握DAG工作流引擎的设计模式,解决多步AI处理管道的可维护性和可观测性问题
一个典型的AI内容处理流程:原始文档→文本清洗→分段→Embedding→存储→质量检查。
六个步骤,每一步都有独立的逻辑,步骤之间有数据依赖。用硬编码方式串起来很容易,但这套代码几乎没有可维护性:加一个新步骤要改流程代码,调试时不知道哪一步出了问题,步骤不能复用到其他流程。
工作流引擎解决的正是这个问题:用DAG(有向无环图)描述步骤和依赖,运行时自动调度,内置可观测性。
工作流的核心抽象
/**
 * A workflow node: a reusable unit of processing.
 *
 * Each node consumes an input and produces an output. Nodes are stateless;
 * all shared state lives in the {@link WorkflowContext}.
 */
public interface WorkflowNode<I, O> {

    /** How the engine reacts when this node's processing fails. */
    enum FailureStrategy {
        FAIL_FAST, // abort the whole workflow immediately
        SKIP,      // skip this node, keep running downstream nodes
        RETRY      // retry the node
    }

    /**
     * Unique identifier of this node within a workflow definition.
     */
    String nodeId();

    /**
     * Executes the node's processing logic.
     *
     * @param input   output of the upstream node (or the workflow input)
     * @param context workflow context with readable/writable shared data
     * @return the processing result
     * @throws WorkflowException recoverable or non-recoverable business failure
     */
    O process(I input, WorkflowContext context) throws WorkflowException;

    /**
     * Whether this node may run alongside its layer siblings (default: true).
     */
    default boolean isParallelizable() { return true; }

    /**
     * Strategy applied when {@link #process} fails (default: fail fast).
     */
    default FailureStrategy onFailure() { return FailureStrategy.FAIL_FAST; }

    /**
     * Maximum number of retries; 0 (the default) means no retry.
     */
    default int maxRetries() { return 0; }
}
/**
 * Workflow context: the container for data shared between workflow nodes.
 *
 * Thread-safe for parallel node execution: the variable map and the
 * per-node execution records are both backed by ConcurrentHashMap.
 */
@Slf4j
public class WorkflowContext {

    private final String workflowId;
    private final String executionId;
    private final Map<String, Object> variables = new ConcurrentHashMap<>();
    private final Map<String, NodeExecutionRecord> nodeRecords = new ConcurrentHashMap<>();
    private final long startTimeMs = System.currentTimeMillis();

    public WorkflowContext(String workflowId) {
        this.workflowId = workflowId;
        // FIX: a millisecond timestamp suffix collides when the same workflow
        // is started twice within one ms; a random UUID keeps the
        // "workflowId-..." shape while guaranteeing uniqueness.
        this.executionId = workflowId + "-" + UUID.randomUUID();
    }

    /**
     * Reads a shared variable. The caller chooses the expected type, so a
     * mismatching type surfaces as a ClassCastException at the use site.
     */
    @SuppressWarnings("unchecked")
    public <T> Optional<T> get(String key) {
        return Optional.ofNullable((T) variables.get(key));
    }

    /** Stores a shared variable, visible to all subsequent nodes. */
    public void set(String key, Object value) {
        variables.put(key, value);
    }

    /** Opens a RUNNING execution record for the node. */
    public void recordNodeStart(String nodeId) {
        nodeRecords.put(nodeId, new NodeExecutionRecord(nodeId, System.currentTimeMillis()));
    }

    /** Marks the node's record as SUCCESS and stores its output. */
    public void recordNodeSuccess(String nodeId, Object output) {
        NodeExecutionRecord record = nodeRecords.get(nodeId);
        if (record != null) {
            record.markSuccess(output, System.currentTimeMillis());
        }
    }

    /** Marks the node's record as FAILED and stores the error message. */
    public void recordNodeFailure(String nodeId, Exception error) {
        NodeExecutionRecord record = nodeRecords.get(nodeId);
        if (record != null) {
            record.markFailure(error.getMessage(), System.currentTimeMillis());
        }
    }

    /** Milliseconds elapsed since this execution started. */
    public long elapsedMs() {
        return System.currentTimeMillis() - startTimeMs;
    }

    public String getWorkflowId() { return workflowId; }
    public String getExecutionId() { return executionId; }

    /** Read-only view of all per-node execution records. */
    public Map<String, NodeExecutionRecord> getNodeRecords() {
        return Collections.unmodifiableMap(nodeRecords);
    }

    /** Execution record of a single node: status, timing, output, error. */
    @Data
    public static class NodeExecutionRecord {
        private final String nodeId;
        private final long startTimeMs;
        private String status = "RUNNING";
        private long endTimeMs;
        private Object output;
        private String errorMessage;

        void markSuccess(Object output, long endTimeMs) {
            this.status = "SUCCESS";
            this.output = output;
            this.endTimeMs = endTimeMs;
        }

        void markFailure(String error, long endTimeMs) {
            this.status = "FAILED";
            this.errorMessage = error;
            this.endTimeMs = endTimeMs;
        }

        /** Node run duration; for a still-RUNNING node, time elapsed so far. */
        public long durationMs() {
            // FIX: while the node is RUNNING, endTimeMs is 0 and the plain
            // subtraction produced a huge negative value.
            return (endTimeMs == 0 ? System.currentTimeMillis() : endTimeMs) - startTimeMs;
        }
    }
}
工作流定义(DAG)
/**
 * Workflow definition (a directed acyclic graph).
 *
 * Built with the Builder pattern: nodes and their dependencies are declared,
 * and {@code build()} validates that every dependency refers to a declared
 * node and that the graph contains no cycles.
 */
@Slf4j
public class WorkflowDefinition {

    private final String workflowId;
    private final Map<String, WorkflowNode<?, ?>> nodes = new LinkedHashMap<>();
    private final Map<String, List<String>> edges = new LinkedHashMap<>(); // nodeId -> dependsOn
    private final Map<String, String> dataBindings = new HashMap<>(); // inter-node data wiring

    private WorkflowDefinition(String workflowId) {
        this.workflowId = workflowId;
    }

    public static Builder builder(String workflowId) {
        return new Builder(workflowId);
    }

    public static class Builder {
        private final WorkflowDefinition definition;

        private Builder(String workflowId) {
            this.definition = new WorkflowDefinition(workflowId);
        }

        /**
         * Adds a node with no dependencies (a start node).
         */
        public Builder addNode(WorkflowNode<?, ?> node) {
            definition.nodes.put(node.nodeId(), node);
            definition.edges.put(node.nodeId(), new ArrayList<>());
            return this;
        }

        /**
         * Adds a node that runs only after all listed nodes have completed.
         */
        public Builder addNode(WorkflowNode<?, ?> node, String... dependsOn) {
            definition.nodes.put(node.nodeId(), node);
            // FIX: Arrays.asList returns a fixed-size view backed by the caller's
            // varargs array; store an independent, mutable copy instead.
            definition.edges.put(node.nodeId(), new ArrayList<>(Arrays.asList(dependsOn)));
            return this;
        }

        /**
         * Declares that {@code toNodeId}'s input named {@code bindingKey} comes
         * from {@code fromNodeId}'s output. By default data flows in DAG order.
         */
        public Builder bindOutput(String fromNodeId, String toNodeId, String bindingKey) {
            definition.dataBindings.put(toNodeId + ":" + bindingKey, fromNodeId);
            return this;
        }

        public WorkflowDefinition build() {
            // Validate the DAG: known dependencies, no cycles.
            validateGraph();
            return definition;
        }

        /**
         * Validates the graph: every dependency must reference a declared node,
         * and the graph must be acyclic (checked via Kahn's algorithm).
         *
         * FIX: the previous implementation computed nothing and accepted any
         * graph, including cyclic ones, which the engine cannot execute.
         *
         * @throws IllegalStateException on an unknown dependency or a cycle
         */
        private void validateGraph() {
            for (Map.Entry<String, List<String>> e : definition.edges.entrySet()) {
                for (String dep : e.getValue()) {
                    if (!definition.nodes.containsKey(dep)) {
                        throw new IllegalStateException(
                            "节点依赖不存在: " + e.getKey() + " -> " + dep);
                    }
                }
            }
            // Kahn's algorithm: repeatedly remove zero-in-degree nodes;
            // anything left over is part of a cycle.
            Map<String, Integer> inDegree = new HashMap<>();
            definition.edges.forEach((nodeId, deps) -> inDegree.put(nodeId, deps.size()));
            Deque<String> ready = new ArrayDeque<>();
            inDegree.forEach((id, degree) -> { if (degree == 0) ready.add(id); });
            int processed = 0;
            while (!ready.isEmpty()) {
                String current = ready.poll();
                processed++;
                definition.edges.forEach((id, deps) -> {
                    if (deps.contains(current) && inDegree.merge(id, -1, Integer::sum) == 0) {
                        ready.add(id);
                    }
                });
            }
            if (processed < definition.nodes.size()) {
                throw new IllegalStateException("工作流包含环: " + definition.workflowId);
            }
        }
    }

    public String getWorkflowId() { return workflowId; }
    public Map<String, WorkflowNode<?, ?>> getNodes() { return nodes; }
    public Map<String, List<String>> getEdges() { return edges; }
    public Map<String, String> getDataBindings() { return dataBindings; }
}
工作流执行引擎
/**
 * Workflow execution engine.
 *
 * Schedules node execution according to the DAG definition.
 * Supports: parallel execution, dependency waiting, failure handling.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class WorkflowExecutionEngine {

    // Dedicated worker pool for workflow nodes, isolated from business threads.
    private final ExecutorService executorService = Executors.newFixedThreadPool(
        Runtime.getRuntime().availableProcessors() * 2,
        new ThreadFactoryBuilder()
            .setNameFormat("workflow-worker-%d")
            .setDaemon(true)
            .build()
    );
    private final WorkflowEventPublisher eventPublisher;
    private final WorkflowMetrics metrics;

    /**
     * Executes a workflow: layers the DAG via topological sort, runs each
     * layer (in parallel when it holds several nodes), and stops early when a
     * failed node demands FAIL_FAST.
     *
     * @param definition   the validated DAG definition
     * @param initialInput input handed to the start node(s)
     * @return the aggregated execution result
     * @throws WorkflowExecutionException on any unrecoverable engine error
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public WorkflowExecutionResult execute(WorkflowDefinition definition, Object initialInput) {
        WorkflowContext context = new WorkflowContext(definition.getWorkflowId());
        context.set("__input__", initialInput);
        log.info("工作流启动: workflowId={}, executionId={}",
            definition.getWorkflowId(), context.getExecutionId());
        eventPublisher.publishWorkflowStarted(context);
        try {
            // Topological sort; nodes within one layer have no mutual dependencies.
            List<List<String>> executionLayers = topologicalSort(definition);
            Object currentOutput = initialInput;
            for (List<String> layer : executionLayers) {
                currentOutput = executeLayer(layer, definition, context, currentOutput);
                // Abort the remaining layers if a failed node requires FAIL_FAST.
                boolean hasFailFast = layer.stream()
                    .map(nodeId -> context.getNodeRecords().get(nodeId))
                    .filter(r -> r != null && "FAILED".equals(r.getStatus()))
                    .anyMatch(r -> {
                        WorkflowNode<?, ?> node = definition.getNodes().get(r.getNodeId());
                        return node != null &&
                            node.onFailure() == WorkflowNode.FailureStrategy.FAIL_FAST;
                    });
                if (hasFailFast) {
                    log.warn("工作流因节点失败而停止: executionId={}", context.getExecutionId());
                    break;
                }
            }
            // Aggregate per-node outcomes into the overall status.
            long successCount = context.getNodeRecords().values().stream()
                .filter(r -> "SUCCESS".equals(r.getStatus())).count();
            long failCount = context.getNodeRecords().values().stream()
                .filter(r -> "FAILED".equals(r.getStatus())).count();
            WorkflowExecutionResult result = new WorkflowExecutionResult(
                context.getExecutionId(),
                failCount == 0 ? "SUCCESS" : "PARTIAL_FAILURE",
                context.elapsedMs(),
                currentOutput,
                context.getNodeRecords()
            );
            log.info("工作流完成: executionId={}, status={}, elapsed={}ms, success={}, failed={}",
                context.getExecutionId(), result.status(), result.elapsedMs(),
                successCount, failCount);
            eventPublisher.publishWorkflowCompleted(context, result);
            metrics.recordExecution(definition.getWorkflowId(), result);
            return result;
        } catch (Exception e) {
            log.error("工作流执行异常: executionId={}", context.getExecutionId(), e);
            eventPublisher.publishWorkflowFailed(context, e);
            throw new WorkflowExecutionException("工作流执行失败", e);
        }
    }

    /**
     * Runs one layer. Nodes in the same layer have no mutual dependencies,
     * so a multi-node layer is fanned out to the worker pool.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Object executeLayer(
            List<String> nodeIds, WorkflowDefinition definition,
            WorkflowContext context, Object defaultInput) {
        if (nodeIds.size() == 1) {
            // Single-node layer: run inline, no scheduling overhead.
            return executeSingleNode(nodeIds.get(0), definition, context, defaultInput);
        }
        // Multi-node layer: submit each node to the pool.
        List<CompletableFuture<NodeResult>> futures = nodeIds.stream()
            .map(nodeId -> CompletableFuture.supplyAsync(() -> {
                Object result = executeSingleNode(nodeId, definition, context, defaultInput);
                return new NodeResult(nodeId, result);
            }, executorService))
            .toList();
        // Wait for every node in the layer to finish.
        Map<String, Object> layerResults = new LinkedHashMap<>();
        for (CompletableFuture<NodeResult> future : futures) {
            try {
                NodeResult nodeResult = future.get(30, TimeUnit.MINUTES);
                if (nodeResult.output() != null) {
                    layerResults.put(nodeResult.nodeId(), nodeResult.output());
                }
            } catch (InterruptedException ie) {
                // FIX: restore the interrupt flag instead of swallowing it.
                Thread.currentThread().interrupt();
                log.error("并行节点执行失败: {}", ie.getMessage());
            } catch (Exception e) {
                // FIX: log the full exception, not just the message, so the
                // stack trace of a parallel failure is not lost.
                log.error("并行节点执行失败: {}", e.getMessage(), e);
            }
        }
        // Expose all layer outputs so downstream nodes can pick what they need.
        context.set("__layer_results__", layerResults);
        // Default input for the next layer: last node's output (simplification).
        return layerResults.values().stream().reduce((a, b) -> b).orElse(defaultInput);
    }

    /**
     * Runs a single node with retry support; once retries are exhausted the
     * node's failure strategy decides between skipping and raising.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private Object executeSingleNode(
            String nodeId, WorkflowDefinition definition,
            WorkflowContext context, Object input) {
        WorkflowNode node = definition.getNodes().get(nodeId);
        if (node == null) return input;
        context.recordNodeStart(nodeId);
        eventPublisher.publishNodeStarted(context, nodeId);
        int attempt = 0;
        int maxRetries = node.maxRetries();
        Exception lastException = null;
        while (attempt <= maxRetries) {
            try {
                Object output = node.process(input, context);
                context.recordNodeSuccess(nodeId, output);
                eventPublisher.publishNodeCompleted(context, nodeId);
                return output;
            } catch (Exception e) {
                lastException = e;
                attempt++;
                log.warn("节点执行失败 (尝试{}/{}): nodeId={}, error={}",
                    attempt, maxRetries + 1, nodeId, e.getMessage());
                if (attempt <= maxRetries) {
                    // FIX: the old delay (1000ms * attempt) was linear despite
                    // claiming exponential backoff; 1s, 2s, 4s, ... now matches.
                    try { Thread.sleep(1000L << (attempt - 1)); }
                    catch (InterruptedException ie) { Thread.currentThread().interrupt(); break; }
                }
            }
        }
        // All retries exhausted.
        context.recordNodeFailure(nodeId, lastException);
        eventPublisher.publishNodeFailed(context, nodeId, lastException);
        if (node.onFailure() == WorkflowNode.FailureStrategy.SKIP) {
            log.warn("节点失败,跳过执行: nodeId={}", nodeId);
            return input; // pass the original input through so downstream continues
        }
        throw new WorkflowNodeException("节点执行失败: " + nodeId, lastException);
    }

    /**
     * Topological sort: splits the DAG into layers.
     * Nodes within one layer can run in parallel.
     */
    private List<List<String>> topologicalSort(WorkflowDefinition definition) {
        Map<String, List<String>> edges = definition.getEdges();
        Map<String, Integer> inDegree = new HashMap<>();
        // In-degree of each node = number of its dependencies.
        edges.keySet().forEach(id -> inDegree.put(id, 0));
        edges.forEach((nodeId, deps) ->
            deps.forEach(dep -> inDegree.merge(nodeId, 1, Integer::sum)));
        List<List<String>> layers = new ArrayList<>();
        Set<String> processed = new HashSet<>();
        while (processed.size() < edges.size()) {
            // Collect every unprocessed node whose dependencies are satisfied.
            List<String> currentLayer = inDegree.entrySet().stream()
                .filter(e -> e.getValue() == 0 && !processed.contains(e.getKey()))
                .map(Map.Entry::getKey)
                .sorted() // deterministic ordering
                .toList();
            if (currentLayer.isEmpty()) break; // cycle — should be caught at build time
            layers.add(currentLayer);
            processed.addAll(currentLayer);
            // Release the dependents of the nodes just scheduled.
            currentLayer.forEach(nodeId -> {
                edges.forEach((id, deps) -> {
                    if (deps.contains(nodeId)) {
                        inDegree.merge(id, -1, Integer::sum);
                    }
                });
            });
        }
        return layers;
    }

    public record NodeResult(String nodeId, Object output) {}

    public record WorkflowExecutionResult(
        String executionId, String status, long elapsedMs,
        Object finalOutput, Map<String, WorkflowContext.NodeExecutionRecord> nodeRecords
    ) {}

    public static class WorkflowExecutionException extends RuntimeException {
        public WorkflowExecutionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static class WorkflowNodeException extends RuntimeException {
        public WorkflowNodeException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    public static class WorkflowException extends Exception {
        public WorkflowException(String message) { super(message); }
        public WorkflowException(String message, Throwable cause) { super(message, cause); }
    }
}
实际工作流示例:文档处理管道
/**
 * Nodes of the document processing workflow.
 */
// Node 1: document loading
// FIX: @Slf4j and @RequiredArgsConstructor were missing — `log` was used
// without a logger field and the final `loaderService` field had no
// constructor, so the class neither compiled nor supported injection.
@Component
@RequiredArgsConstructor
@Slf4j
public class DocumentLoadNode implements WorkflowNode<String, Document> {

    private final DocumentLoaderService loaderService;

    @Override
    public String nodeId() { return "document_load"; }

    /**
     * Loads the document at {@code filePath} and records its path and size
     * in the workflow context for downstream nodes.
     *
     * @throws WorkflowExecutionEngine.WorkflowException when loading fails
     */
    @Override
    public Document process(String filePath, WorkflowContext context)
            throws WorkflowExecutionEngine.WorkflowException {
        try {
            Document doc = loaderService.load(filePath);
            log.info("文档加载完成: path={}, size={}chars", filePath, doc.text().length());
            context.set("documentPath", filePath);
            context.set("documentSize", doc.text().length());
            return doc;
        } catch (Exception e) {
            throw new WorkflowExecutionEngine.WorkflowException("文档加载失败: " + filePath, e);
        }
    }

    // Loading failure leaves nothing for downstream nodes — abort immediately.
    @Override
    public FailureStrategy onFailure() { return FailureStrategy.FAIL_FAST; }
}
// Node 2: text cleaning (stateless, safe to run in parallel)
@Component
public class TextCleaningNode implements WorkflowNode<Document, Document> {

    @Override
    public String nodeId() { return "text_cleaning"; }

    /**
     * Normalizes the document text: runs of excess whitespace become blank
     * lines, control characters are removed, and the result is trimmed.
     * Metadata is carried over unchanged.
     */
    @Override
    public Document process(Document document, WorkflowContext context) {
        String raw = document.text();
        // Collapse runs of 3+ whitespace characters into a paragraph break.
        String collapsed = raw.replaceAll("\\s{3,}", "\n\n");
        // Strip non-printable control characters (tab and newline excluded).
        String withoutControlChars = collapsed.replaceAll("[\\x00-\\x08\\x0B-\\x1F]", "");
        return Document.from(withoutControlChars.trim(), document.metadata());
    }

    // Pure computation — retrying would never change the outcome.
    @Override
    public int maxRetries() { return 0; }
}
// Node 3: quality check (skippable)
@Component
public class QualityCheckNode implements WorkflowNode<Document, Document> {

    private static final int MIN_CONTENT_LENGTH = 100;

    @Override
    public String nodeId() { return "quality_check"; }

    /**
     * Records the document's quality verdict in the context without ever
     * blocking the pipeline: too-short content is flagged as failed with a
     * reason, otherwise it is flagged as passed. The document itself is
     * always returned unchanged.
     */
    @Override
    public Document process(Document document, WorkflowContext context)
            throws WorkflowExecutionEngine.WorkflowException {
        int length = document.text().length();
        if (length >= MIN_CONTENT_LENGTH) {
            context.set("qualityCheckPassed", true);
            return document;
        }
        // Flag as low quality but do not stop the flow.
        context.set("qualityCheckFailed", true);
        context.set("qualityCheckReason", "内容太短: " + length);
        return document;
    }

    // A failed quality check must never abort the whole workflow.
    @Override
    public FailureStrategy onFailure() { return FailureStrategy.SKIP; }
}
/**
 * Wires the document processing nodes into an executable workflow.
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class DocumentProcessingWorkflow {

    private final WorkflowExecutionEngine engine;
    private final DocumentLoadNode loadNode;
    private final TextCleaningNode cleaningNode;
    private final QualityCheckNode qualityCheckNode;

    /**
     * Runs the full pipeline (load -> clean -> quality check) for one file.
     */
    public WorkflowExecutionEngine.WorkflowExecutionResult process(String filePath) {
        return engine.execute(buildDefinition(), filePath);
    }

    // Declarative DAG: each node is chained after the node it depends on.
    private WorkflowDefinition buildDefinition() {
        return WorkflowDefinition.builder("doc_processing")
            .addNode(loadNode)
            .addNode(cleaningNode, "document_load")      // after loadNode
            .addNode(qualityCheckNode, "text_cleaning")  // after cleaningNode
            .build();
    }
}
工作流执行监控
/**
 * Workflow execution metrics and reporting.
 *
 * Mandatory in production:
 * 1. latency distribution per node
 * 2. error statistics for failed nodes
 * 3. overall success rate
 */
@Service
@RequiredArgsConstructor
@Slf4j
public class WorkflowMetrics {

    private final MeterRegistry meterRegistry;

    /**
     * Records one finished execution: an overall duration timer, one timer
     * per node, and a human-readable summary in the log (for debugging).
     */
    public void recordExecution(
            String workflowId,
            WorkflowExecutionEngine.WorkflowExecutionResult result) {
        recordTimers(workflowId, result);
        logSummary(workflowId, result);
    }

    // Micrometer timers: one for the whole run, one per node.
    private void recordTimers(
            String workflowId,
            WorkflowExecutionEngine.WorkflowExecutionResult result) {
        Timer.builder("workflow.execution.duration")
            .tag("workflow", workflowId)
            .tag("status", result.status())
            .register(meterRegistry)
            .record(result.elapsedMs(), TimeUnit.MILLISECONDS);
        result.nodeRecords().forEach((nodeId, record) ->
            Timer.builder("workflow.node.duration")
                .tag("workflow", workflowId)
                .tag("node", nodeId)
                .tag("status", record.getStatus())
                .register(meterRegistry)
                .record(record.durationMs(), TimeUnit.MILLISECONDS));
    }

    // Log a summary sorted by node id, so output order is deterministic.
    private void logSummary(
            String workflowId,
            WorkflowExecutionEngine.WorkflowExecutionResult result) {
        log.info("=== 工作流执行摘要 [{}] ===", workflowId);
        log.info("总耗时: {}ms, 状态: {}", result.elapsedMs(), result.status());
        result.nodeRecords().entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .map(Map.Entry::getValue)
            .forEach(record -> {
                boolean succeeded = "SUCCESS".equals(record.getStatus());
                String suffix = record.getErrorMessage() != null
                    ? " ERROR: " + record.getErrorMessage() : "";
                log.info(" {} {} [{}ms]{}",
                    succeeded ? "✓" : "✗", record.getNodeId(), record.durationMs(), suffix);
            });
    }
}
实践建议
工作流引擎不是银弹
不是所有AI处理流程都需要工作流引擎。两三步的简单流程直接写顺序代码更清晰。只有当流程步骤超过5步、步骤可以复用、需要可视化追踪时,工作流引擎才发挥价值。
节点粒度要合适
节点太细(每个操作一个节点)会导致调度开销远大于实际计算。节点太粗又失去了灵活性。经验法则:一个节点的执行时间在50ms以上时才值得单独拆出来;通常把一个完整的业务步骤(如"调用LLM摘要")作为一个节点。
生产环境要持久化执行状态
上面的实现是纯内存的,如果JVM挂了,执行记录就丢了。生产环境需要把WorkflowContext持久化到数据库:节点开始时写入,节点完成时更新状态,支持从中断处恢复。尤其是长时间运行的工作流(几分钟到几小时),断点续传能力是必须的。
