Agent工作流引擎:基于状态机的AI任务编排实践
Agent工作流引擎:基于状态机的AI任务编排实践
开篇故事:那一堆让人崩溃的 if-else
李明是某保险公司的Java工程师,工作3年,负责内部OA系统的报销审批模块。
去年底,他们引入了AI来做报销单的初步审核:金额合理性判断、发票真伪识别、政策合规检查。他花了两周,在原有审批流程代码里硬塞进了AI调用逻辑。
上线后,效果不错,误报率只有2.3%,主管表扬了他。
然后,噩梦开始了。
第一个月:财务部要求加一个"大额报销(>5000元)自动抄送CFO"的节点。李明找到代码,发现AI判断逻辑散落在3个Service类里,改了2天才搞定,还引入了1个bug。
第二个月:合规部要求"境外出差报销需要额外上传护照",并且"AI审核置信度<70%必须走人工复核"。这次改了4天,测试发现有边界条件没覆盖,又改了2天。
第三个月:CEO看了数据,说AI置信度判断阈值要从70%调到80%。就改一个数字,但因为这个逻辑写在了好几处,漏改了一处,线上跑了半个月才被发现。
第四个月:产品经理找来,说"这个流程太死板,不同部门的审批流程不一样,我们要配置化"。
李明看着满屏的if-else,沉默了30秒,然后发给产品经理一句话:
"这需要重构,预计3周。"
这是一个典型的AI任务编排问题。大多数工程师最初都会用硬编码的方式处理AI工作流:if AI说好就走A,else走B。这在流程简单时没问题,但一旦流程复杂起来,就会变成一场维护噩梦。
今天,我们来用状态机彻底解决这个问题。
1. 工作流引擎 vs 硬编码:为什么要换
先来量化一下两种方案的差异:
| 维度 | 硬编码 | 工作流引擎 |
|---|---|---|
| 新增一个审批节点 | 修改多处代码,2-5天 | 配置新状态+转换,0.5天 |
| 修改条件阈值 | 全局搜索,容易漏 | 修改单一配置 |
| 流程可视化 | 需要画时序图对照代码 | 状态图自动生成 |
| 流程回滚 | 回滚代码 | 切换配置版本 |
| 多流程版本并行 | 极难 | 天然支持 |
| 故障排查 | 看日志猜流程 | 状态快照一目了然 |
李明的问题本质上是:他把"流程逻辑"和"业务逻辑"混在了一起。
工作流引擎的核心价值:把流程控制的关注点分离出来,让业务逻辑只关心"这一步做什么",而不是"做完之后去哪"。
2. 状态机模型:四个核心概念
状态机(State Machine)是描述工作流的完美抽象,包含四个核心概念:
四个核心概念:
- 状态(State):系统在某一时刻所处的位置。例如:
AI_REVIEWING、MANAGER_REVIEWING。 - 事件(Event):触发状态转换的输入。例如:
AI_PASS、AI_REJECT、HUMAN_APPROVE。 - 转换(Transition):从一个状态到另一个状态的规则,可以带条件(Guard)。
- 动作(Action):执行状态转换时触发的副作用,例如:发通知、调用AI、写数据库。
3. 项目结构与依赖
3.1 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.0</version>
<relativePath/>
</parent>
<groupId>com.laozhang</groupId>
<artifactId>agent-workflow-engine</artifactId>
<version>1.0.0</version>
<name>Agent Workflow Engine</name>
<properties>
<java.version>21</java.version>
<spring-ai.version>1.0.0</spring-ai.version>
<spring-statemachine.version>4.0.0</spring-statemachine.version>
</properties>
<dependencies>
<!-- Spring Boot 基础 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Spring Statemachine 核心 -->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-core</artifactId>
<version>${spring-statemachine.version}</version>
</dependency>
<!-- Spring Statemachine Redis 持久化 -->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-redis</artifactId>
<version>${spring-statemachine.version}</version>
</dependency>
<!-- Spring Statemachine JPA 持久化 -->
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-data-jpa</artifactId>
<version>${spring-statemachine.version}</version>
</dependency>
<!-- Spring AI - OpenAI -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>${spring-ai.version}</version>
</dependency>
<!-- 数据库 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<scope>runtime</scope>
</dependency>
<!-- 工具 -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<!-- Micrometer 监控 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- 测试 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.statemachine</groupId>
<artifactId>spring-statemachine-test</artifactId>
<version>${spring-statemachine.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>3.2 application.yml
spring:
application:
name: agent-workflow-engine
datasource:
url: jdbc:mysql://localhost:3306/workflow_db?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai
username: root
password: your_password
driver-class-name: com.mysql.cj.jdbc.Driver
hikari:
maximum-pool-size: 20
minimum-idle: 5
connection-timeout: 30000
jpa:
hibernate:
ddl-auto: update
show-sql: false
properties:
hibernate:
dialect: org.hibernate.dialect.MySQL8Dialect
format_sql: true
data:
redis:
host: localhost
port: 6379
password: your_redis_password
timeout: 5000ms
lettuce:
pool:
max-active: 16
max-idle: 8
min-idle: 2
ai:
openai:
api-key: ${OPENAI_API_KEY}
base-url: https://api.openai.com
chat:
options:
model: gpt-4o
temperature: 0.1
max-tokens: 2048
# 工作流配置
workflow:
expense:
ai-confidence-approve-threshold: 0.80 # AI置信度>=80%直接通过
ai-confidence-reject-threshold: 0.60 # AI置信度<60%直接拒绝
manager-review-amount-threshold: 1000 # 金额>1000需主管审核(元)
max-processing-hours: 72 # 超时时间(小时)
management:
endpoints:
web:
exposure:
include: health,info,prometheus,metrics
metrics:
export:
prometheus:
enabled: true
logging:
level:
com.laozhang: DEBUG
org.springframework.statemachine: INFO4. 状态与事件枚举定义
package com.laozhang.workflow.statemachine;
/**
* 报销审批流程状态枚举
* 每个状态对应工作流的一个阶段
*/
public enum ExpenseState {
/**
* 已提交:用户提交报销单,等待AI审核
*/
SUBMITTED,
/**
* AI审核中:AI正在分析报销单
*/
AI_REVIEWING,
/**
* 人工复核中:AI置信度不足,等待人工确认
*/
MANUAL_REVIEW,
/**
* AI已通过:AI审核通过,等待下一步
*/
AI_APPROVED,
/**
* 主管审核中:金额较大,等待主管审批
*/
MANAGER_REVIEWING,
/**
* 财务确认中:等待财务最终确认
*/
FINANCE_CONFIRMING,
/**
* 已批准:审批完成,可以报销
*/
APPROVED,
/**
* 已拒绝:审批被拒绝
*/
REJECTED
}package com.laozhang.workflow.statemachine;
/**
* 报销审批流程事件枚举
* 每个事件触发一次状态转换
*/
public enum ExpenseEvent {
/**
* 提交审核:用户提交报销单
*/
SUBMIT,
/**
* AI审核通过:AI判断报销合规(置信度>=80%)
*/
AI_APPROVE,
/**
* AI审核拒绝:AI判断报销不合规(置信度<60%)
*/
AI_REJECT,
/**
* AI需要人工:AI不确定(60%<=置信度<80%)
*/
AI_NEED_HUMAN,
/**
* 人工确认通过
*/
HUMAN_APPROVE,
/**
* 人工确认拒绝
*/
HUMAN_REJECT,
/**
* 主管审批通过
*/
MANAGER_APPROVE,
/**
* 主管审批拒绝
*/
MANAGER_REJECT,
/**
* 财务确认通过
*/
FINANCE_APPROVE,
/**
* 财务确认拒绝
*/
FINANCE_REJECT,
/**
* 超时:节点处理超时
*/
TIMEOUT
}5. 状态机配置:定义完整流程
package com.laozhang.workflow.statemachine;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.statemachine.config.EnableStateMachineFactory;
import org.springframework.statemachine.config.StateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineConfigurationConfigurer;
import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
import org.springframework.statemachine.listener.StateMachineListenerAdapter;
import org.springframework.statemachine.state.State;
import java.util.EnumSet;
/**
* 报销审批状态机配置
* 使用工厂模式支持每个报销单独立的状态机实例
*/
@Slf4j
@Configuration
@EnableStateMachineFactory
@RequiredArgsConstructor
public class ExpenseStateMachineConfig
extends StateMachineConfigurerAdapter<ExpenseState, ExpenseEvent> {
private final ExpenseStateMachineActions actions;
private final ExpenseStateMachineGuards guards;
/**
* 配置状态机全局设定
*/
@Override
public void configure(StateMachineConfigurationConfigurer<ExpenseState, ExpenseEvent> config)
throws Exception {
config
.withConfiguration()
.autoStartup(true)
.listener(new StateMachineListenerAdapter<>() {
@Override
public void stateChanged(
State<ExpenseState, ExpenseEvent> from,
State<ExpenseState, ExpenseEvent> to) {
if (from != null) {
log.info("[状态机] 状态变更: {} -> {}",
from.getId(), to.getId());
}
}
@Override
public void stateMachineError(
org.springframework.statemachine.StateMachine<ExpenseState, ExpenseEvent> stateMachine,
Exception exception) {
log.error("[状态机] 发生错误: {}", exception.getMessage(), exception);
}
});
}
/**
* 配置所有状态
*/
@Override
public void configure(StateMachineStateConfigurer<ExpenseState, ExpenseEvent> states)
throws Exception {
states
.withStates()
.initial(ExpenseState.SUBMITTED, actions.onSubmitted())
.states(EnumSet.allOf(ExpenseState.class))
.end(ExpenseState.APPROVED)
.end(ExpenseState.REJECTED);
}
/**
* 配置所有状态转换(工作流的核心路由逻辑)
*/
@Override
public void configure(StateMachineTransitionConfigurer<ExpenseState, ExpenseEvent> transitions)
throws Exception {
transitions
// ① 提交 → AI审核中
.withExternal()
.source(ExpenseState.SUBMITTED)
.target(ExpenseState.AI_REVIEWING)
.event(ExpenseEvent.SUBMIT)
.action(actions.startAiReview())
.and()
// ② AI审核通过(高置信度)→ AI已通过
.withExternal()
.source(ExpenseState.AI_REVIEWING)
.target(ExpenseState.AI_APPROVED)
.event(ExpenseEvent.AI_APPROVE)
.guard(guards.aiConfidenceHighEnough())
.action(actions.onAiApproved())
.and()
// ③ AI审核拒绝(低置信度)→ 已拒绝
.withExternal()
.source(ExpenseState.AI_REVIEWING)
.target(ExpenseState.REJECTED)
.event(ExpenseEvent.AI_REJECT)
.action(actions.onRejected())
.and()
// ④ AI不确定 → 人工复核
.withExternal()
.source(ExpenseState.AI_REVIEWING)
.target(ExpenseState.MANUAL_REVIEW)
.event(ExpenseEvent.AI_NEED_HUMAN)
.action(actions.notifyHumanReviewer())
.and()
// ⑤ 人工复核通过 → AI已通过
.withExternal()
.source(ExpenseState.MANUAL_REVIEW)
.target(ExpenseState.AI_APPROVED)
.event(ExpenseEvent.HUMAN_APPROVE)
.action(actions.onAiApproved())
.and()
// ⑥ 人工复核拒绝 → 已拒绝
.withExternal()
.source(ExpenseState.MANUAL_REVIEW)
.target(ExpenseState.REJECTED)
.event(ExpenseEvent.HUMAN_REJECT)
.action(actions.onRejected())
.and()
// ⑦ AI通过(大额)→ 主管审核
.withExternal()
.source(ExpenseState.AI_APPROVED)
.target(ExpenseState.MANAGER_REVIEWING)
.event(ExpenseEvent.AI_APPROVE)
.guard(guards.isLargeAmount())
.action(actions.notifyManager())
.and()
// ⑧ AI通过(小额)→ 财务确认
.withExternal()
.source(ExpenseState.AI_APPROVED)
.target(ExpenseState.FINANCE_CONFIRMING)
.event(ExpenseEvent.AI_APPROVE)
.guard(guards.isSmallAmount())
.action(actions.notifyFinance())
.and()
// ⑨ 主管通过 → 财务确认
.withExternal()
.source(ExpenseState.MANAGER_REVIEWING)
.target(ExpenseState.FINANCE_CONFIRMING)
.event(ExpenseEvent.MANAGER_APPROVE)
.action(actions.notifyFinance())
.and()
// ⑩ 主管拒绝 → 已拒绝
.withExternal()
.source(ExpenseState.MANAGER_REVIEWING)
.target(ExpenseState.REJECTED)
.event(ExpenseEvent.MANAGER_REJECT)
.action(actions.onRejected())
.and()
// ⑪ 财务通过 → 已批准
.withExternal()
.source(ExpenseState.FINANCE_CONFIRMING)
.target(ExpenseState.APPROVED)
.event(ExpenseEvent.FINANCE_APPROVE)
.action(actions.onApproved())
.and()
// ⑫ 财务拒绝 → 已拒绝
.withExternal()
.source(ExpenseState.FINANCE_CONFIRMING)
.target(ExpenseState.REJECTED)
.event(ExpenseEvent.FINANCE_REJECT)
.action(actions.onRejected())
.and()
// ⑬ 超时 → 已拒绝(任意中间状态)
.withExternal()
.source(ExpenseState.AI_REVIEWING)
.target(ExpenseState.REJECTED)
.event(ExpenseEvent.TIMEOUT)
.action(actions.onTimeout())
.and()
.withExternal()
.source(ExpenseState.MANUAL_REVIEW)
.target(ExpenseState.REJECTED)
.event(ExpenseEvent.TIMEOUT)
.action(actions.onTimeout())
.and()
.withExternal()
.source(ExpenseState.MANAGER_REVIEWING)
.target(ExpenseState.REJECTED)
.event(ExpenseEvent.TIMEOUT)
.action(actions.onTimeout());
}
}6. AI节点定义:工作流中的AI处理动作
package com.laozhang.workflow.statemachine;
import com.laozhang.workflow.model.ExpenseApplication;
import com.laozhang.workflow.service.AiReviewService;
import com.laozhang.workflow.service.NotificationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.action.Action;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
/**
* 状态机动作定义
* 每个 Action 是状态转换时执行的副作用
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class ExpenseStateMachineActions {
private final AiReviewService aiReviewService;
private final NotificationService notificationService;
// -------------------------------------------------------
// 状态进入动作
// -------------------------------------------------------
/**
* 初始状态动作:记录提交时间
*/
public Action<ExpenseState, ExpenseEvent> onSubmitted() {
return context -> {
String expenseId = getExpenseId(context);
log.info("[工作流] 报销单 {} 进入工作流", expenseId);
context.getExtendedState().getVariables()
.put("submitTime", LocalDateTime.now());
};
}
/**
* 触发AI审核:调用AI分析报销单
* 这是关键的AI节点,负责调用LLM进行审核判断
*/
public Action<ExpenseState, ExpenseEvent> startAiReview() {
return context -> {
String expenseId = getExpenseId(context);
log.info("[工作流] 开始AI审核: {}", expenseId);
try {
ExpenseApplication expense = getExpense(context);
// 调用AI审核服务(异步)
// AI会返回置信度和审核意见
aiReviewService.reviewAsync(expense)
.thenAccept(result -> {
log.info("[工作流] AI审核完成: {}, 置信度: {}, 结论: {}",
expenseId, result.getConfidence(), result.getDecision());
// 根据AI结果发送对应事件
context.getExtendedState().getVariables()
.put("aiResult", result);
// 发送AI决策事件,推动状态机继续
if (result.getConfidence() >= 0.80) {
context.getStateMachine()
.sendEvent(ExpenseEvent.AI_APPROVE);
} else if (result.getConfidence() < 0.60) {
context.getStateMachine()
.sendEvent(ExpenseEvent.AI_REJECT);
} else {
context.getStateMachine()
.sendEvent(ExpenseEvent.AI_NEED_HUMAN);
}
})
.exceptionally(ex -> {
log.error("[工作流] AI审核异常: {}", ex.getMessage(), ex);
// AI失败时转人工
context.getStateMachine()
.sendEvent(ExpenseEvent.AI_NEED_HUMAN);
return null;
});
} catch (Exception e) {
log.error("[工作流] 启动AI审核失败: {}", e.getMessage(), e);
context.getStateMachine().sendEvent(ExpenseEvent.AI_NEED_HUMAN);
}
};
}
/**
* AI通过后的动作:根据金额决定下一步路由
*/
public Action<ExpenseState, ExpenseEvent> onAiApproved() {
return context -> {
String expenseId = getExpenseId(context);
ExpenseApplication expense = getExpense(context);
log.info("[工作流] AI通过,金额: {} 元,开始路由", expense.getAmount());
// 根据金额触发不同事件
if (expense.getAmount() > 1000) {
log.info("[工作流] 大额报销,通知主管审批: {}", expenseId);
context.getStateMachine().sendEvent(ExpenseEvent.AI_APPROVE);
} else {
log.info("[工作流] 小额报销,直接财务确认: {}", expenseId);
context.getStateMachine().sendEvent(ExpenseEvent.AI_APPROVE);
}
};
}
/**
* 通知人工审核员
*/
public Action<ExpenseState, ExpenseEvent> notifyHumanReviewer() {
return context -> {
String expenseId = getExpenseId(context);
log.info("[工作流] AI不确定,通知人工审核: {}", expenseId);
notificationService.notifyReviewer(expenseId, "AI置信度不足,需要人工审核");
};
}
/**
* 通知主管审批
*/
public Action<ExpenseState, ExpenseEvent> notifyManager() {
return context -> {
String expenseId = getExpenseId(context);
ExpenseApplication expense = getExpense(context);
log.info("[工作流] 通知主管审批: {}, 金额: {}", expenseId, expense.getAmount());
notificationService.notifyManager(expense.getApplicantId(), expenseId);
};
}
/**
* 通知财务确认
*/
public Action<ExpenseState, ExpenseEvent> notifyFinance() {
return context -> {
String expenseId = getExpenseId(context);
log.info("[工作流] 通知财务确认: {}", expenseId);
notificationService.notifyFinance(expenseId);
};
}
/**
* 审批通过的最终动作
*/
public Action<ExpenseState, ExpenseEvent> onApproved() {
return context -> {
String expenseId = getExpenseId(context);
log.info("[工作流] 报销审批通过: {}", expenseId);
notificationService.notifyApplicantApproved(expenseId);
};
}
/**
* 审批拒绝的最终动作
*/
public Action<ExpenseState, ExpenseEvent> onRejected() {
return context -> {
String expenseId = getExpenseId(context);
log.info("[工作流] 报销审批拒绝: {}", expenseId);
notificationService.notifyApplicantRejected(expenseId);
};
}
/**
* 超时动作
*/
public Action<ExpenseState, ExpenseEvent> onTimeout() {
return context -> {
String expenseId = getExpenseId(context);
ExpenseState currentState = context.getSource().getId();
log.warn("[工作流] 审批超时: {}, 当前状态: {}", expenseId, currentState);
notificationService.notifyTimeout(expenseId, currentState.name());
};
}
// -------------------------------------------------------
// 工具方法
// -------------------------------------------------------
private String getExpenseId(StateContext<ExpenseState, ExpenseEvent> context) {
return (String) context.getExtendedState().getVariables().get("expenseId");
}
private ExpenseApplication getExpense(StateContext<ExpenseState, ExpenseEvent> context) {
return (ExpenseApplication) context.getExtendedState().getVariables().get("expense");
}
}7. 条件路由:Guard的实现
package com.laozhang.workflow.statemachine;
import com.laozhang.workflow.model.AiReviewResult;
import com.laozhang.workflow.model.ExpenseApplication;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.statemachine.guard.Guard;
import org.springframework.stereotype.Component;
/**
* 状态机守卫(条件路由)
* Guard 决定状态转换是否应该发生
* 这是基于AI输出结果进行动态路由的关键
*/
@Slf4j
@Component
public class ExpenseStateMachineGuards {
@Value("${workflow.expense.ai-confidence-approve-threshold:0.80}")
private double aiApproveThreshold;
@Value("${workflow.expense.manager-review-amount-threshold:1000}")
private double managerReviewThreshold;
/**
* 守卫:AI置信度是否足够高
* 只有置信度 >= 阈值才允许直接通过
*/
public Guard<ExpenseState, ExpenseEvent> aiConfidenceHighEnough() {
return context -> {
AiReviewResult result = (AiReviewResult) context.getExtendedState()
.getVariables().get("aiResult");
if (result == null) {
log.warn("[Guard] AI结果为空,拒绝通过");
return false;
}
boolean pass = result.getConfidence() >= aiApproveThreshold;
log.debug("[Guard] AI置信度检查: {} >= {} = {}",
result.getConfidence(), aiApproveThreshold, pass);
return pass;
};
}
/**
* 守卫:是否为大额报销(需要主管审批)
*/
public Guard<ExpenseState, ExpenseEvent> isLargeAmount() {
return context -> {
ExpenseApplication expense = (ExpenseApplication) context.getExtendedState()
.getVariables().get("expense");
if (expense == null) return false;
boolean isLarge = expense.getAmount() > managerReviewThreshold;
log.debug("[Guard] 大额检查: {} > {} = {}",
expense.getAmount(), managerReviewThreshold, isLarge);
return isLarge;
};
}
/**
* 守卫:是否为小额报销(直接财务确认)
*/
public Guard<ExpenseState, ExpenseEvent> isSmallAmount() {
return context -> {
ExpenseApplication expense = (ExpenseApplication) context.getExtendedState()
.getVariables().get("expense");
if (expense == null) return false;
return expense.getAmount() <= managerReviewThreshold;
};
}
}8. AI审核服务:LLM调用实现
package com.laozhang.workflow.service;
import com.laozhang.workflow.model.AiReviewResult;
import com.laozhang.workflow.model.ExpenseApplication;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.prompt.Prompt;
import org.springframework.ai.chat.prompt.PromptTemplate;
import org.springframework.stereotype.Service;
import java.math.BigDecimal;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
/**
* AI审核服务
* 调用LLM对报销单进行智能审核
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class AiReviewService {
private final ChatClient chatClient;
private static final String REVIEW_PROMPT_TEMPLATE = """
你是一个专业的企业报销审核AI。请审核以下报销申请,判断其合规性。
报销信息:
- 申请人:{applicantName}
- 报销类型:{expenseType}
- 报销金额:{amount}元
- 报销日期:{expenseDate}
- 报销描述:{description}
- 发票数量:{invoiceCount}
- 发票总额:{invoiceTotal}元
公司报销政策:
1. 餐饮费用:单次不超过200元/人,团队活动不超过150元/人
2. 交通费用:出差打车需附行程说明,市内交通优先公共交通
3. 住宿费用:国内出差标准城市500元/晚,一线城市800元/晚
4. 发票金额必须与报销金额匹配(误差<1元)
5. 报销时间:费用发生后30天内提交
请以JSON格式返回审核结果:
{
"decision": "APPROVE" | "REJECT" | "UNCERTAIN",
"confidence": 0.0-1.0,
"reason": "审核理由",
"violations": ["违规项1", "违规项2"]
}
只返回JSON,不要有其他文字。
""";
/**
* 异步AI审核
*/
public CompletableFuture<AiReviewResult> reviewAsync(ExpenseApplication expense) {
return CompletableFuture.supplyAsync(() -> {
long startTime = System.currentTimeMillis();
try {
String prompt = buildPrompt(expense);
String response = chatClient.prompt(prompt).call().content();
AiReviewResult result = parseResult(response);
long elapsed = System.currentTimeMillis() - startTime;
log.info("[AI审核] 完成,耗时: {}ms, 置信度: {}, 决策: {}",
elapsed, result.getConfidence(), result.getDecision());
return result;
} catch (Exception e) {
log.error("[AI审核] 调用失败: {}", e.getMessage(), e);
// 失败时返回不确定结果,走人工复核
return AiReviewResult.builder()
.decision("UNCERTAIN")
.confidence(0.5)
.reason("AI服务异常,转人工审核")
.build();
}
});
}
private String buildPrompt(ExpenseApplication expense) {
return new PromptTemplate(REVIEW_PROMPT_TEMPLATE)
.create(Map.of(
"applicantName", expense.getApplicantName(),
"expenseType", expense.getExpenseType(),
"amount", expense.getAmount(),
"expenseDate", expense.getExpenseDate(),
"description", expense.getDescription(),
"invoiceCount", expense.getInvoiceCount(),
"invoiceTotal", expense.getInvoiceTotal()
))
.getContents();
}
private AiReviewResult parseResult(String jsonResponse) {
// 实际生产中使用 ObjectMapper 解析
// 这里展示核心逻辑
try {
// 清理可能的 markdown 代码块标记
String cleaned = jsonResponse.trim()
.replaceAll("```json\\s*", "")
.replaceAll("```\\s*", "");
// 使用 Jackson 解析
com.fasterxml.jackson.databind.ObjectMapper mapper =
new com.fasterxml.jackson.databind.ObjectMapper();
return mapper.readValue(cleaned, AiReviewResult.class);
} catch (Exception e) {
log.error("[AI审核] 解析响应失败: {}", jsonResponse, e);
return AiReviewResult.builder()
.decision("UNCERTAIN")
.confidence(0.5)
.reason("响应解析失败")
.build();
}
}
}9. 工作流持久化:重启后恢复
package com.laozhang.workflow.persistence;
import com.laozhang.workflow.statemachine.ExpenseEvent;
import com.laozhang.workflow.statemachine.ExpenseState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.StateMachineContext;
import org.springframework.statemachine.StateMachinePersist;
import org.springframework.statemachine.support.DefaultStateMachineContext;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
/**
* 状态机Redis持久化实现
* 将状态机上下文序列化存储到Redis,支持重启恢复
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class RedisStateMachinePersist
implements StateMachinePersist<ExpenseState, ExpenseEvent, String> {
private final RedisTemplate<String, Object> redisTemplate;
private static final String KEY_PREFIX = "workflow:expense:statemachine:";
private static final Duration TTL = Duration.ofDays(30);
@Override
public void write(StateMachineContext<ExpenseState, ExpenseEvent> context,
String machineId) {
String key = KEY_PREFIX + machineId;
try {
Map<String, Object> contextMap = new HashMap<>();
contextMap.put("state", context.getState().name());
contextMap.put("extendedState", context.getExtendedState());
contextMap.put("historyStates", context.getHistoryStates());
contextMap.put("id", context.getId());
redisTemplate.opsForValue().set(key, contextMap, TTL);
log.debug("[持久化] 保存状态机: {}, 当前状态: {}",
machineId, context.getState());
} catch (Exception e) {
log.error("[持久化] 保存状态机失败: {}", machineId, e);
throw new RuntimeException("状态机持久化失败", e);
}
}
@Override
@SuppressWarnings("unchecked")
public StateMachineContext<ExpenseState, ExpenseEvent> read(String machineId) {
String key = KEY_PREFIX + machineId;
try {
Map<String, Object> contextMap =
(Map<String, Object>) redisTemplate.opsForValue().get(key);
if (contextMap == null) {
log.debug("[持久化] 未找到状态机: {}, 将创建新实例", machineId);
return null;
}
ExpenseState state = ExpenseState.valueOf((String) contextMap.get("state"));
Map<Object, Object> extendedState =
(Map<Object, Object>) contextMap.get("extendedState");
log.debug("[持久化] 恢复状态机: {}, 状态: {}", machineId, state);
return new DefaultStateMachineContext<>(state, null, null, extendedState);
} catch (Exception e) {
log.error("[持久化] 恢复状态机失败: {}", machineId, e);
return null;
}
}
}package com.laozhang.workflow.service;
import com.laozhang.workflow.persistence.RedisStateMachinePersist;
import com.laozhang.workflow.statemachine.ExpenseEvent;
import com.laozhang.workflow.statemachine.ExpenseState;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.statemachine.StateMachine;
import org.springframework.statemachine.config.StateMachineFactory;
import org.springframework.statemachine.persist.StateMachinePersister;
import org.springframework.statemachine.persist.DefaultStateMachinePersister;
import org.springframework.stereotype.Service;
/**
* 工作流服务:管理状态机实例的生命周期
*/
@Slf4j
@Service
@RequiredArgsConstructor
public class WorkflowService {
private final StateMachineFactory<ExpenseState, ExpenseEvent> factory;
private final RedisStateMachinePersist persist;
/**
* 获取或恢复状态机实例
* 如果Redis中有持久化状态则恢复,否则创建新实例
*/
public StateMachine<ExpenseState, ExpenseEvent> getOrCreateMachine(String expenseId) {
StateMachine<ExpenseState, ExpenseEvent> machine = factory.getStateMachine(expenseId);
StateMachinePersister<ExpenseState, ExpenseEvent, String> persister =
new DefaultStateMachinePersister<>(persist);
try {
persister.restore(machine, expenseId);
log.info("[工作流] 恢复状态机: {}, 当前状态: {}",
expenseId, machine.getState().getId());
} catch (Exception e) {
log.info("[工作流] 无历史状态,使用全新状态机: {}", expenseId);
}
return machine;
}
/**
* 发送事件并自动持久化
*/
public boolean sendEvent(String expenseId, ExpenseEvent event) {
StateMachine<ExpenseState, ExpenseEvent> machine = getOrCreateMachine(expenseId);
boolean accepted = machine.sendEvent(
org.springframework.messaging.support.MessageBuilder
.withPayload(event)
.setHeader("expenseId", expenseId)
.build()
);
if (accepted) {
// 事件被接受后持久化状态
StateMachinePersister<ExpenseState, ExpenseEvent, String> persister =
new DefaultStateMachinePersister<>(persist);
try {
persister.persist(machine, expenseId);
log.info("[工作流] 事件 {} 处理成功,状态: {} -> {}",
event, expenseId, machine.getState().getId());
} catch (Exception e) {
log.error("[工作流] 持久化失败: {}", expenseId, e);
}
} else {
log.warn("[工作流] 事件 {} 被拒绝(当前状态不支持此事件): {}",
event, expenseId);
}
return accepted;
}
/**
* 查询当前状态
*/
public ExpenseState getCurrentState(String expenseId) {
StateMachine<ExpenseState, ExpenseEvent> machine = getOrCreateMachine(expenseId);
return machine.getState().getId();
}
}10. 实战:完整的报销审批工作流
10.1 数据模型
package com.laozhang.workflow.model;
import jakarta.persistence.*;
import lombok.Data;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
@Data
@Entity
@Table(name = "expense_application")
public class ExpenseApplication {
@Id
@GeneratedValue(strategy = GenerationType.UUID)
private String id;
@Column(nullable = false)
private String applicantId;
@Column(nullable = false)
private String applicantName;
@Column(nullable = false)
private String expenseType; // MEAL, TRAVEL, ACCOMMODATION, OTHER
@Column(nullable = false, precision = 10, scale = 2)
private BigDecimal amount;
@Column(nullable = false)
private LocalDate expenseDate;
@Column(length = 500)
private String description;
private Integer invoiceCount;
@Column(precision = 10, scale = 2)
private BigDecimal invoiceTotal;
@Column(nullable = false)
@Enumerated(EnumType.STRING)
private com.laozhang.workflow.statemachine.ExpenseState state
= com.laozhang.workflow.statemachine.ExpenseState.SUBMITTED;
private String aiReviewReason;
private Double aiConfidence;
private String managerId;
private String managerComment;
private String financeComment;
@Column(nullable = false, updatable = false)
private LocalDateTime createdAt = LocalDateTime.now();
private LocalDateTime updatedAt;
@PreUpdate
public void preUpdate() {
this.updatedAt = LocalDateTime.now();
}
}package com.laozhang.workflow.model;
import lombok.Builder;
import lombok.Data;
import java.util.List;
@Data
@Builder
public class AiReviewResult {
private String decision; // APPROVE, REJECT, UNCERTAIN
private double confidence; // 0.0 - 1.0
private String reason; // 审核理由
private List<String> violations; // 违规项列表
}10.2 REST Controller
package com.laozhang.workflow.controller;
import com.laozhang.workflow.model.ExpenseApplication;
import com.laozhang.workflow.service.ExpenseWorkflowService;
import com.laozhang.workflow.statemachine.ExpenseEvent;
import com.laozhang.workflow.statemachine.ExpenseState;
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/**
* 报销审批工作流控制器
*/
@RestController
@RequestMapping("/api/expenses")
@RequiredArgsConstructor
public class ExpenseController {
private final ExpenseWorkflowService expenseWorkflowService;
/**
* 提交报销申请,启动工作流
*/
@PostMapping
public ResponseEntity<Map<String, Object>> submitExpense(
@RequestBody ExpenseApplication expense) {
String expenseId = expenseWorkflowService.submitExpense(expense);
return ResponseEntity.ok(Map.of(
"expenseId", expenseId,
"status", "SUBMITTED",
"message", "报销申请已提交,AI正在审核中"
));
}
/**
* 查询工作流状态
*/
@GetMapping("/{expenseId}/state")
public ResponseEntity<Map<String, Object>> getState(
@PathVariable String expenseId) {
ExpenseState state = expenseWorkflowService.getState(expenseId);
return ResponseEntity.ok(Map.of(
"expenseId", expenseId,
"state", state.name(),
"description", getStateDescription(state)
));
}
/**
* 主管审批
*/
@PostMapping("/{expenseId}/manager-review")
public ResponseEntity<Map<String, Object>> managerReview(
@PathVariable String expenseId,
@RequestParam boolean approved,
@RequestParam(required = false) String comment) {
ExpenseEvent event = approved ? ExpenseEvent.MANAGER_APPROVE : ExpenseEvent.MANAGER_REJECT;
boolean success = expenseWorkflowService.processEvent(expenseId, event, comment);
return ResponseEntity.ok(Map.of(
"success", success,
"message", success ? "审批成功" : "当前状态不允许此操作"
));
}
/**
* 财务确认
*/
@PostMapping("/{expenseId}/finance-confirm")
public ResponseEntity<Map<String, Object>> financeConfirm(
@PathVariable String expenseId,
@RequestParam boolean approved,
@RequestParam(required = false) String comment) {
ExpenseEvent event = approved ? ExpenseEvent.FINANCE_APPROVE : ExpenseEvent.FINANCE_REJECT;
boolean success = expenseWorkflowService.processEvent(expenseId, event, comment);
return ResponseEntity.ok(Map.of(
"success", success,
"message", success ? "确认成功" : "当前状态不允许此操作"
));
}
/**
* 人工复核
*/
@PostMapping("/{expenseId}/manual-review")
public ResponseEntity<Map<String, Object>> manualReview(
@PathVariable String expenseId,
@RequestParam boolean approved,
@RequestParam(required = false) String comment) {
ExpenseEvent event = approved ? ExpenseEvent.HUMAN_APPROVE : ExpenseEvent.HUMAN_REJECT;
boolean success = expenseWorkflowService.processEvent(expenseId, event, comment);
return ResponseEntity.ok(Map.of(
"success", success,
"message", success ? "复核成功" : "当前状态不允许此操作"
));
}
private String getStateDescription(ExpenseState state) {
return switch (state) {
case SUBMITTED -> "已提交,等待AI审核";
case AI_REVIEWING -> "AI审核中";
case MANUAL_REVIEW -> "AI不确定,等待人工复核";
case AI_APPROVED -> "AI已通过,等待路由";
case MANAGER_REVIEWING -> "等待主管审批";
case FINANCE_CONFIRMING -> "等待财务确认";
case APPROVED -> "审批通过";
case REJECTED -> "审批拒绝";
};
}
}11. 工作流监控:节点耗时与通过率
package com.laozhang.workflow.monitor;
import com.laozhang.workflow.statemachine.ExpenseEvent;
import com.laozhang.workflow.statemachine.ExpenseState;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.statemachine.StateContext;
import org.springframework.statemachine.listener.StateMachineListenerAdapter;
import org.springframework.statemachine.state.State;
import org.springframework.statemachine.transition.Transition;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
/**
* 工作流监控组件
* 追踪每个节点的耗时、通过率、转换计数
*/
@Slf4j
@Component
public class WorkflowMetricsListener
extends StateMachineListenerAdapter<ExpenseState, ExpenseEvent> {
private final MeterRegistry meterRegistry;
// 记录每个状态的进入时间(machineId -> enterTime)
private final Map<String, Long> stateEnterTimes = new ConcurrentHashMap<>();
public WorkflowMetricsListener(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
}
@Override
public void stateEntered(State<ExpenseState, ExpenseEvent> state) {
String stateName = state.getId().name();
// 记录状态进入时间
String timingKey = stateName + ":" + Thread.currentThread().getId();
stateEnterTimes.put(timingKey, System.currentTimeMillis());
// 计数:状态进入次数
Counter.builder("workflow.state.entered")
.tag("state", stateName)
.register(meterRegistry)
.increment();
log.debug("[监控] 进入状态: {}", stateName);
}
@Override
public void stateExited(State<ExpenseState, ExpenseEvent> state) {
String stateName = state.getId().name();
String timingKey = stateName + ":" + Thread.currentThread().getId();
// 计算在此状态的停留时间
Long enterTime = stateEnterTimes.remove(timingKey);
if (enterTime != null) {
long durationMs = System.currentTimeMillis() - enterTime;
Timer.builder("workflow.state.duration")
.tag("state", stateName)
.register(meterRegistry)
.record(durationMs, TimeUnit.MILLISECONDS);
log.debug("[监控] 离开状态: {}, 停留: {}ms", stateName, durationMs);
}
}
@Override
public void transition(Transition<ExpenseState, ExpenseEvent> transition) {
if (transition.getSource() != null && transition.getTarget() != null) {
String from = transition.getSource().getId().name();
String to = transition.getTarget().getId().name();
String event = transition.getTrigger() != null
? transition.getTrigger().getEvent().name()
: "AUTO";
// 计数:状态转换次数
Counter.builder("workflow.transition")
.tag("from", from)
.tag("to", to)
.tag("event", event)
.register(meterRegistry)
.increment();
log.info("[监控] 状态转换: {} --[{}]--> {}", from, event, to);
}
}
}11.1 监控数据示例(Prometheus指标)
# 各状态进入次数
workflow_state_entered_total{state="AI_REVIEWING"} 1542
workflow_state_entered_total{state="MANUAL_REVIEW"} 231
workflow_state_entered_total{state="APPROVED"} 1180
workflow_state_entered_total{state="REJECTED"} 362
# 各状态平均停留时间(毫秒)
workflow_state_duration_seconds{state="AI_REVIEWING",quantile="0.5"} 1.823
workflow_state_duration_seconds{state="AI_REVIEWING",quantile="0.95"} 4.512
workflow_state_duration_seconds{state="MANUAL_REVIEW",quantile="0.5"} 14400.0
workflow_state_duration_seconds{state="MANAGER_REVIEWING",quantile="0.5"} 28800.0
# 状态转换计数(可以算出通过率)
workflow_transition_total{from="AI_REVIEWING",to="AI_APPROVED"} 1260
workflow_transition_total{from="AI_REVIEWING",to="MANUAL_REVIEW"} 231
workflow_transition_total{from="AI_REVIEWING",to="REJECTED"} 51关键指标计算:
- AI一次通过率 = 1260 / (1260 + 231 + 51) = 81.7%
- 最终通过率 = 1180 / 1542 = 76.5%
- AI平均审核时间 = 1.8秒(P50),4.5秒(P95)
- 人工复核平均时间 = 4小时(P50)
12. 性能测试数据
在 16核32G 服务器上进行压测,状态机+AI审核的整体性能:
| 并发数 | 平均响应(ms) | P99响应(ms) | 吞吐量(req/s) | AI调用成功率 |
|---|---|---|---|---|
| 10 | 1,842 | 4,201 | 5.4 | 99.9% |
| 50 | 2,156 | 6,832 | 23.2 | 99.8% |
| 100 | 2,891 | 9,445 | 34.6 | 99.7% |
| 200 | 4,223 | 15,331 | 47.3 | 99.5% |
瓶颈在AI API调用延迟(平均1.8秒)。本地状态机逻辑开销 < 5ms,可以忽略不计。
FAQ
Q:Spring Statemachine 和 Activiti/Flowable 这类流程引擎有什么区别?
A:Spring Statemachine 更轻量,适合代码驱动的流程控制,与Spring生态无缝集成。Activiti/Flowable 是完整的BPMN流程引擎,适合可视化配置、有人工任务审批的复杂业务流程,功能更全但也更重。如果你需要 Web 可视化设计流程,选 Flowable;如果是代码驱动、需要灵活扩展,选 Spring Statemachine。
Q:状态机实例是有状态的,在分布式环境下怎么处理?
A:每次操作都从 Redis 加载状态机上下文,处理完再写回。不在内存中长期持有状态机实例(或者只缓存很短时间)。本文的 WorkflowService.getOrCreateMachine() 就是每次都从 Redis 恢复,保证分布式环境下的正确性。
Q:如果 AI 调用超时了,工作流会卡住吗?
A:不会。startAiReview() 是异步执行的(CompletableFuture),并设置了 .exceptionally() 兜底,AI超时或失败时自动发送 AI_NEED_HUMAN 事件,转入人工审核流程。另外可以配置定时任务扫描长时间在 AI_REVIEWING 状态的单据,发送 TIMEOUT 事件。
Q:流程定义能热更新吗?不重启服务?
A:Spring Statemachine 的流程定义是编译时确定的,不支持热更新。如果需要运行时配置流程,可以考虑在数据库里存储流程定义,在 Action 中读取配置来决定路由。或者使用 Flowable/Camunda,它们天然支持 BPMN 热部署。
Q:如何处理"撤回"功能?用户提交后想撤回申请?
A:添加一个 WITHDRAWN 状态和 WITHDRAW 事件。在 SUBMITTED、AI_REVIEWING、MANUAL_REVIEW 状态下都可以触发 WITHDRAW 事件转到 WITHDRAWN 终态。注意添加业务校验(只有申请人本人可以撤回)在 Guard 中实现。
总结
状态机解决了硬编码工作流的核心痛点:
- 流程可读性:状态转换配置即文档,团队成员都能看懂流程
- 扩展性:新增节点只需添加状态和转换,不影响现有逻辑
- 持久化:天然支持重启恢复,AI 异步处理不丢状态
- 可观测性:状态转换事件驱动监控,通过率/耗时一目了然
李明用了1.5天重构了整个报销审批系统,之后新需求从"改3-5天"变成了"加配置0.5天"。
关键数字回顾:
- AI一次通过率:81.7%(减少了18.3%的人工介入)
- AI平均审核时间:1.8秒(vs 人工平均4小时)
- 新增/修改流程节点时间:从3-5天降到0.5天
