第1676篇:Human-in-the-loop设计模式——在Agent自动化中保留人工干预点
有个朋友做了个自动化采购Agent,跑了两天,某个供应商API价格更新了,Agent自动帮他采购了一批货,价格比正常高了30%。没有人工干预点,Agent一路绿灯执行完了。
这件事让我想了很多。Agent自动化的价值无需质疑,但把人完全排除在决策链之外,在当前的AI能力水平下是危险的。更合理的设计是:让Agent处理它擅长的重复性、规则性工作,在真正需要判断的关键节点,保留人的参与。
这就是Human-in-the-loop(HITL)。今天系统地聊聊,怎么在生产级Agent系统里设计好这个机制。
HITL的层次:不是一刀切的"每步都问"
很多人误解HITL,以为就是每一步都弹窗让用户确认,那Agent和纯人工有什么区别?
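HITL应该是分级的:
- 低风险操作(如只读查询):Agent全自动执行,只记录日志;
- 中等风险操作:自动执行,事后通知相关负责人;
- 高风险操作(如写入、付款、删除):执行前必须经人工确认;
- 超高风险操作:需要多级审批后才能执行。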
关键是要做好风险分级,让低风险操作顺畅流转,只在真正需要人参与的地方设置检查点。
HITL的四种设计模式
模式一:Interrupt-and-wait(中断等待)
最常见的模式:Agent执行到某个检查点时暂停,等待人工确认后继续。
@Service
public class InterruptableAgentExecutor {
private final HumanInteractionService humanInteraction;
private final AgentStateStore stateStore;
/**
 * Executes the plan created for {@code task} one step at a time, pausing for a human
 * decision on every step whose interrupt policy requires it.
 *
 * <p>When a step requires an interrupt, the execution state is checkpointed, a
 * {@link HumanInteractionRequest} is sent, and the returned decision is applied:
 * <ul>
 *   <li>{@code APPROVE} - run the step, after applying any modifications attached to
 *       the decision;</li>
 *   <li>{@code REJECT} - stop and return a rejected-by-human result;</li>
 *   <li>{@code MODIFY_AND_CONTINUE} - replace the plan, then run the step found at the
 *       current index of the new plan;</li>
 *   <li>{@code ABORT} - roll back the completed steps and return an aborted result;</li>
 *   <li>any other action - fail closed and return a rejected-by-human result.</li>
 * </ul>
 *
 * @param task the task to execute
 * @return the success result with the context outputs, or a rejected/aborted result
 */
public AgentResult execute(AgentTask task) {
    AgentPlan plan = planAgent.createPlan(task);
    AgentExecutionContext context = new AgentExecutionContext(task, plan);

    for (int stepIdx = 0; stepIdx < plan.getSteps().size(); stepIdx++) {
        PlanStep step = plan.getSteps().get(stepIdx);

        // Decide whether this step needs human confirmation.
        InterruptPolicy policy = checkInterruptPolicy(step, context);
        if (policy.requiresInterrupt()) {
            // Save the current execution state before handing over to a human.
            String checkpointId = stateStore.saveCheckpoint(context, stepIdx);

            // Raise the human-intervention request.
            HumanInteractionRequest request = HumanInteractionRequest.builder()
                    .checkpointId(checkpointId)
                    .taskId(task.getTaskId())
                    .pendingAction(formatActionDescription(step))
                    .riskLevel(policy.getRiskLevel())
                    .impact(policy.getEstimatedImpact())
                    .context(summarizeContext(context))
                    .timeout(policy.getTimeout())
                    .build();
            HumanDecision decision = humanInteraction.requestDecision(request);

            switch (decision.getAction()) {
                case APPROVE -> {
                    // Continue; the approval may carry modifications to the step.
                    if (decision.hasModifications()) {
                        step = applyModifications(step, decision.getModifications());
                    }
                }
                case REJECT -> {
                    return AgentResult.rejectedByHuman(
                            task.getTaskId(),
                            decision.getReason()
                    );
                }
                case MODIFY_AND_CONTINUE -> {
                    // The human changed the plan: re-read the current step from the new
                    // plan so that a stale step from the old plan is not executed.
                    plan = applyPlanModification(plan, decision.getPlanChanges(), stepIdx);
                    if (stepIdx >= plan.getSteps().size()) {
                        // The modified plan has no step at this index; the loop
                        // condition ends the run on the next check.
                        continue;
                    }
                    step = plan.getSteps().get(stepIdx);
                }
                case ABORT -> {
                    // Abort the whole task and roll back what was already done.
                    rollbackCompletedSteps(context);
                    return AgentResult.aborted(task.getTaskId());
                }
                default -> {
                    // Fail closed: an action this checkpoint does not understand
                    // must never be treated as permission to execute the step.
                    return AgentResult.rejectedByHuman(
                            task.getTaskId(),
                            decision.getReason()
                    );
                }
            }
        }

        // Execute the step and record its result.
        StepResult result = executeStep(step, context);
        context.recordStepResult(stepIdx, result);
    }
    return AgentResult.success(task.getTaskId(), context.getOutputs());
}
/**
 * Picks the interrupt policy that applies to {@code step}.
 *
 * <p>Among the rules that match the step and context, the one with the smallest
 * priority value wins; on equal priority the rule encountered first is kept. When no
 * rule matches, {@link InterruptPolicy#noInterrupt()} is returned.
 *
 * @param step    the step about to be executed
 * @param context the current execution context
 * @return the policy of the winning rule, or the no-interrupt policy
 */
private InterruptPolicy checkInterruptPolicy(PlanStep step,
                                             AgentExecutionContext context) {
    // Strict "<" keeps the earlier rule on ties, like taking the head of a stable sort.
    InterruptRule strictest = interruptRules.stream()
            .filter(rule -> rule.matches(step, context))
            .reduce((best, next) -> next.getPriority() < best.getPriority() ? next : best)
            .orElse(null);

    if (strictest == null) {
        return InterruptPolicy.noInterrupt();
    }
    return strictest.getPolicy();
}
}
模式二:Approve-before-commit(提交前审批)
所有步骤先模拟执行(dry run),生成执行报告,人工审批后再真正提交。
@Service
public class ApproveBeforeCommitAgent {
/**
 * Runs the task in two phases: a dry run whose change preview is submitted for human
 * approval, followed by the real execution only if the decision is {@code APPROVE}.
 *
 * @param task the task to execute
 * @return the result of the real execution, or a rejected-by-human result for any
 *         decision other than {@code APPROVE}
 */
public AgentResult execute(AgentTask task) {
    final AgentPlan plan = planAgent.createPlan(task);

    // Phase one: dry run, then turn the collected changes into a preview report.
    final DryRunResult rehearsal = executeDryRun(plan);
    final ChangePreviewReport preview = generateChangePreview(rehearsal);

    // Ask a human to approve the previewed changes.
    final HumanDecision verdict = humanInteraction.requestApproval(
            ApprovalRequest.builder()
                    .taskId(task.getTaskId())
                    .changePreview(preview)
                    .summary(preview.getSummary())
                    .affectedResources(preview.getAffectedResources())
                    .estimatedCost(rehearsal.getEstimatedCost())
                    .build()
    );

    // Phase two: execute for real, but only on an explicit approval.
    if (verdict.getAction() == HumanAction.APPROVE) {
        return executeForReal(plan);
    }
    return AgentResult.rejectedByHuman(task.getTaskId(), verdict.getReason());
}
/**
 * Dry run: walks every step of the plan without committing any write.
 *
 * <p>Read-only tools are executed for real so the preview is based on actual data;
 * every other tool is only simulated through its dry-run handler and the simulated
 * outcome is recorded as an intended write.
 *
 * @param plan the plan to rehearse
 * @return the result built from the recorded reads and intended writes
 * @throws java.util.NoSuchElementException if a step names a tool that is not registered
 */
private DryRunResult executeDryRun(AgentPlan plan) {
    DryRunContext dryRunCtx = new DryRunContext(); // writes are recorded, not executed
    for (PlanStep step : plan.getSteps()) {
        // Same exception type as the former bare get(), but the message now names
        // the missing tool instead of the context-free "No value present".
        ToolRegistration tool = toolRegistry.findByName(step.getTool())
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "No tool registered under name: " + step.getTool()));

        if (tool.getCapabilities().isReadOnly()) {
            // Read-only tool: execute it to obtain real data.
            dryRunCtx.recordRead(step, executeStep(step, dryRunCtx));
        } else {
            // Write operation: record the intent only.
            dryRunCtx.recordWrite(step,
                    tool.getDryRunHandler().simulate(step.getParams())
            );
        }
    }
    return dryRunCtx.buildResult();
}
/**
 * Renders the dry-run result as a Markdown change preview.
 *
 * <p>The text lists every planned change (with an extra warning line for
 * {@code DELETE} changes), followed by the estimated token usage and duration.
 *
 * @param dryRun the dry-run result to describe
 * @return a report carrying the rendered text, the affected resources and the
 *         estimated cost
 */
private ChangePreviewReport generateChangePreview(DryRunResult dryRun) {
    StringBuilder preview = new StringBuilder()
            .append("## 执行计划预览\n\n")
            .append("此操作将进行以下变更:\n\n");

    for (PlannedChange planned : dryRun.getPlannedChanges()) {
        preview.append(String.format("- **%s**: %s\n",
                planned.getType(), planned.getDescription()));

        // Deletions get an explicit warning with the number of affected records.
        boolean isDeletion = planned.getType() == ChangeType.DELETE;
        if (isDeletion) {
            preview.append(String.format(" ⚠️ 将删除 %d 条记录\n",
                    planned.getAffectedCount()));
        }
    }

    preview.append(String.format("\n预计Token消耗:%d\n", dryRun.getEstimatedTokens()))
            .append(String.format("预计执行时间:%s\n", dryRun.getEstimatedDuration()));

    return new ChangePreviewReport(
            preview.toString(),
            dryRun.getAffectedResources(),
            dryRun.getEstimatedCost());
}
}
模式三:Exception-based(异常触发)
正常情况下完全自动化,只有在遇到异常情况(超出预期范围)时才触发人工干预。
@Service
public class ExceptionBasedHITL {
private final ExceptionThresholdConfig thresholds;
/**
 * Executes a step fully automatically and involves a human only when something looks
 * wrong: either the result shows anomalies, or an {@code UnexpectedException} is thrown.
 *
 * @param step    the step to execute
 * @param context the current execution context
 * @return the step result when no anomaly is found; otherwise whatever the anomaly
 *         review or the escalation returns
 */
public ToolResult executeWithExceptionHandling(PlanStep step,
                                               AgentExecutionContext context) {
    try {
        final ToolResult outcome = executeStep(step, context);

        // Check whether the result stays within the expected range.
        final List<AnomalyDetection> findings = detectAnomalies(outcome, step, context);

        return findings.isEmpty()
                ? outcome
                : handleAnomalies(findings, step, context, outcome);
    } catch (UnexpectedException unexpected) {
        // Unexpected situation: hand the decision to a human.
        return escalateToHuman(step, context, unexpected);
    }
}
/**
 * Inspects a step result for signs that it falls outside the expected range.
 *
 * <p>Three checks are applied, each only when the corresponding value is present:
 * <ol>
 *   <li>the affected count exceeds twice the expected count for the step;</li>
 *   <li>the accumulated cost plus this step's cost exceeds 90% of the budget limit;</li>
 *   <li>the step duration exceeds the configured maximum step duration.</li>
 * </ol>
 *
 * @param result  the result produced by the step
 * @param step    the step that was executed
 * @param context the current execution context
 * @return the detected anomalies; empty when nothing looks unusual
 */
private List<AnomalyDetection> detectAnomalies(ToolResult result,
                                               PlanStep step,
                                               AgentExecutionContext context) {
    List<AnomalyDetection> anomalies = new ArrayList<>();

    // Check 1: unexpected scale of affected records.
    if (result.getAffectedCount() != null) {
        int expectedCount = context.getExpectedAffectedCount(step.getStepId());
        // Double in long arithmetic so a large expected count cannot overflow int
        // and silently disable the check.
        if (result.getAffectedCount() > 2L * expectedCount) {
            anomalies.add(AnomalyDetection.of(
                    AnomalyType.UNEXPECTED_SCALE,
                    String.format("影响数量(%d)远超预期(%d)",
                            result.getAffectedCount(), expectedCount)
            ));
        }
    }

    // Check 2: cost approaching the budget.
    if (result.getCost() != null) {
        double budgetLimit = context.getBudgetLimit();
        double totalCost = context.getAccumulatedCost() + result.getCost();
        if (totalCost > budgetLimit * 0.9) {
            anomalies.add(AnomalyDetection.of(
                    AnomalyType.BUDGET_ALERT,
                    String.format("累计成本(%.2f)已接近预算上限(%.2f)",
                            totalCost, budgetLimit)
            ));
        }
    }

    // Check 3: suspiciously long execution (the step may have been stuck).
    // Guarded like the other optional fields so that a result without a duration
    // does not abort anomaly detection with a NullPointerException.
    if (result.getDuration() != null
            && result.getDuration().toSeconds() > thresholds.getMaxStepDurationSeconds()) {
        anomalies.add(AnomalyDetection.of(
                AnomalyType.TIMEOUT_CONCERN,
                "步骤执行时间异常,可能存在问题"
        ));
    }
    return anomalies;
}
/**
 * Asks a human to review the detected anomalies and applies the decision.
 *
 * <ul>
 *   <li>{@code CONTINUE} or {@code APPROVE} - keep the step result and carry on;</li>
 *   <li>{@code ROLLBACK_STEP} - roll the step back and return a rolled-back result;</li>
 *   <li>{@code ABORT} - abort the task;</li>
 *   <li>anything else (for example {@code REJECT}) - abort the task as well, because
 *       an anomalous result must not be accepted without explicit clearance.</li>
 * </ul>
 *
 * @param anomalies     the anomalies that triggered the review
 * @param step          the step that produced the result
 * @param context       the current execution context
 * @param partialResult the result produced before the review
 * @return {@code partialResult} when cleared, or a rolled-back result
 * @throws TaskAbortedException if the reviewer aborts or gives no explicit clearance
 */
private ToolResult handleAnomalies(List<AnomalyDetection> anomalies,
                                   PlanStep step,
                                   AgentExecutionContext context,
                                   ToolResult partialResult) {
    // Notify a human reviewer and wait for the decision.
    HumanDecision decision = humanInteraction.requestDecision(
            AnomalyReviewRequest.builder()
                    .anomalies(anomalies)
                    .partialResult(partialResult)
                    .step(step)
                    .options(List.of("继续执行", "回滚此步骤", "中止任务"))
                    .build()
    );

    return switch (decision.getAction()) {
        case CONTINUE, APPROVE -> partialResult;
        case ROLLBACK_STEP -> {
            rollbackStep(step, partialResult);
            yield ToolResult.rolledBack();
        }
        case ABORT -> throw new TaskAbortedException("人工中止任务");
        // Fail closed: the former default returned partialResult, which treated a
        // rejection or an unrecognised action as "continue".
        default -> throw new TaskAbortedException(
                "异常审核未获明确放行(" + decision.getAction() + "),中止任务");
    };
}
}
模式四:Collaborative(协作模式)
Agent和人类真正协作:Agent提出方案,人类选择或调整,双向互动推进任务。
@Service
public class CollaborativeAgent {
/**
 * Collaborative execution: the agent proposes several plans, a human picks or
 * customizes one, and the chosen plan is executed with ongoing human guidance.
 *
 * @param task the task to execute
 * @return the result of the guided execution
 */
public AgentResult execute(AgentTask task) {
    // The agent analyses the task first and offers several options.
    final List<AgentProposal> candidates = generateProposals(task);

    // A human chooses one of them, or supplies custom input.
    final SelectedProposal choice = humanInteraction.selectProposal(
            ProposalSelectionRequest.builder()
                    .taskId(task.getTaskId())
                    .proposals(candidates)
                    .allowCustomization(true)
                    .build()
    );

    final AgentPlan chosenPlan;
    if (choice.isCustomized()) {
        chosenPlan = planAgent.createPlanFromCustomInput(choice.getCustomInput());
    } else {
        chosenPlan = candidates.get(choice.getProposalIndex()).getPlan();
    }

    // The human can still adjust the plan while it is running.
    return executeWithHumanGuidance(task, chosenPlan);
}
/**
 * Produces three alternative proposals for the task, one per strategy:
 * speed-first, cost-saving and balanced, in that order.
 *
 * @param task the task to plan for
 * @return an immutable list holding the three proposals
 */
private List<AgentProposal> generateProposals(AgentTask task) {
    final AgentProposal speedFirst =
            generateProposal(task, "快速策略", "优先速度,可能消耗更多资源");
    final AgentProposal costSaving =
            generateProposal(task, "节约策略", "优先节省成本,但可能需要更长时间");
    final AgentProposal balanced =
            generateProposal(task, "均衡策略", "速度和成本的平衡");

    return List.of(speedFirst, costSaving, balanced);
}
/**
 * Executes the plan step by step while letting a human steer it.
 *
 * <p>Before each step the method polls for guidance for up to five seconds. If
 * guidance arrives, the plan is adjusted and the step at the current index of the
 * adjusted plan is executed. If the adjusted plan no longer has a step at that index,
 * execution stops and the outputs gathered so far are returned. Progress is reported
 * after every executed step.
 *
 * @param task the task being executed
 * @param plan the plan to start from
 * @return a success result carrying the context outputs
 */
private AgentResult executeWithHumanGuidance(AgentTask task, AgentPlan plan) {
    AgentExecutionContext context = new AgentExecutionContext(task, plan);

    for (int i = 0; i < plan.getSteps().size(); i++) {
        PlanStep step = plan.getSteps().get(i);

        // Give the human a chance to step in before the step runs. The poll waits
        // up to 5 seconds; without input the step proceeds automatically.
        Optional<HumanGuidance> guidance = humanInteraction.pollGuidance(
                task.getTaskId(), Duration.ofSeconds(5)
        );
        if (guidance.isPresent()) {
            // Merge the new instruction into the plan.
            plan = adjustPlan(plan, guidance.get(), i);
            if (i >= plan.getSteps().size()) {
                // The adjustment removed the remaining steps; stop here rather
                // than fail with an IndexOutOfBoundsException.
                break;
            }
            step = plan.getSteps().get(i);
        }

        StepResult result = executeStep(step, context);
        context.recordStepResult(i, result);

        // Report progress after the step.
        humanInteraction.updateProgress(
                task.getTaskId(),
                ProgressUpdate.of(i + 1, plan.getSteps().size(),
                        result.getSummary())
        );
    }
    return AgentResult.success(task.getTaskId(), context.getOutputs());
}
}
人工干预的通知渠道
HITL不能只依赖同步等待,需要支持多种通知渠道,让人能在不同场景下响应:
@Service
public class MultiChannelHumanInteractionService {
private final WebSocketNotifier websocketNotifier;
private final SlackNotifier slackNotifier;
private final EmailNotifier emailNotifier;
private final PendingDecisionRepository pendingDecisions;
/**
 * Persists the request, notifies the assignee through the selected channel, then
 * blocks until a decision is available or the request's timeout elapses.
 *
 * @param request the human-interaction request to deliver
 * @return the decision returned by {@code waitForDecision}
 */
public HumanDecision requestDecision(HumanInteractionRequest request) {
    // Persist first so the request is not lost if the service restarts.
    final String decisionId = persistRequest(request);

    // The channel is chosen according to the urgency of the request.
    final NotificationChannel target = selectChannel(request);
    switch (target) {
        // Low priority: send an e-mail.
        case EMAIL -> emailNotifier.send(
                request.getAssigneeEmail(),
                "Agent任务需要您的决策",
                buildEmailContent(decisionId, request));
        // During working hours: send an interactive Slack message.
        case SLACK -> slackNotifier.sendInteractiveMessage(
                request.getAssigneeSlack(),
                buildSlackMessage(decisionId, request));
        // User is online: push a real-time notification.
        case WEBSOCKET -> websocketNotifier.push(
                request.getAssignee(),
                buildWebSocketNotification(decisionId, request));
    }

    // Block for the decision, bounded by the request's timeout.
    final Duration waitLimit = request.getTimeout();
    return waitForDecision(decisionId, waitLimit);
}
/**
 * Polls the pending-decision store until a decision for {@code decisionId} appears or
 * {@code timeout} has elapsed, then falls back to the timeout policy.
 *
 * <p>The store is checked immediately, then about every two seconds, and once more
 * when the deadline is reached, so a decision submitted just before the deadline is
 * still honoured. If the waiting thread is interrupted, the interrupt flag is restored
 * and the timeout policy is applied right away.
 *
 * @param decisionId identifier of the pending decision
 * @param timeout    maximum time to wait
 * @return the submitted decision, or the decision produced by the timeout policy
 */
private HumanDecision waitForDecision(String decisionId, Duration timeout) {
    final long pollIntervalMillis = 2000L;
    // nanoTime is monotonic, so wall-clock adjustments (NTP, manual changes) can
    // neither shorten nor prolong the wait.
    final long deadlineNanos = System.nanoTime() + timeout.toNanos();

    while (true) {
        Optional<HumanDecision> decision = pendingDecisions.getDecision(decisionId);
        if (decision.isPresent()) {
            return decision.get();
        }

        long remainingNanos = deadlineNanos - System.nanoTime();
        if (remainingNanos <= 0) {
            break;
        }
        try {
            // Never sleep past the deadline; the next iteration performs the
            // final check before the timeout policy is applied.
            long remainingMillis = Math.max(1L, remainingNanos / 1_000_000L);
            Thread.sleep(Math.min(pollIntervalMillis, remainingMillis));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            break;
        }
    }

    // Timed out (or interrupted): decide automatically according to the policy.
    return handleTimeout(decisionId);
}
/**
 * Produces the automatic decision for a request whose wait has ended without a
 * human answer, according to the timeout policy stored with the pending decision.
 *
 * @param decisionId identifier of the pending decision
 * @return an auto-approved, auto-rejected or escalated decision
 */
private HumanDecision handleTimeout(String decisionId) {
    final PendingDecision expired = pendingDecisions.findById(decisionId);

    final HumanDecision fallback = switch (expired.getTimeoutPolicy()) {
        case ESCALATE -> {
            // Hand the request to a higher-level approver.
            escalate(expired);
            yield HumanDecision.escalated("已升级审批");
        }
        case AUTO_REJECT -> HumanDecision.autoRejected("超时自动拒绝");
        case AUTO_APPROVE -> HumanDecision.autoApproved("超时自动批准");
    };
    return fallback;
}
/**
 * REST endpoint through which a human submits a decision (called by the web UI or a
 * mobile client).
 *
 * <p>NOTE(review): the enclosing class is annotated {@code @Service}; Spring MVC
 * normally registers handler methods only on {@code @Controller} /
 * {@code @RestController} beans - confirm that this endpoint is actually mapped.
 *
 * @param decisionId      identifier of the pending decision being answered
 * @param decisionRequest the action, reason and optional modifications chosen
 * @return 200 when the decision is stored, 404 when no pending decision exists for
 *         the id, 403 when the current user may not decide on it
 */
@PostMapping("/api/hitl/decisions/{decisionId}")
public ResponseEntity<Void> submitDecision(
        @PathVariable String decisionId,
        @RequestBody HumanDecisionRequest decisionRequest) {
    PendingDecision pending = pendingDecisions.findById(decisionId);
    if (pending == null) {
        // Unknown id: answer 404 instead of failing inside the permission check.
        return ResponseEntity.status(404).build();
    }

    // Resolve the caller once so that the principal recorded in the audit trail is
    // the same one that passed the permission check.
    var currentUser = SecurityContext.getCurrentUser();

    // Verify that the caller is allowed to decide on this request.
    if (!hasPermission(currentUser, pending)) {
        return ResponseEntity.status(403).build();
    }

    HumanDecision decision = HumanDecision.builder()
            .decisionId(decisionId)
            .action(decisionRequest.getAction())
            .reason(decisionRequest.getReason())
            .modifications(decisionRequest.getModifications())
            .decidedBy(currentUser)
            .decidedAt(LocalDateTime.now())
            .build();
    pendingDecisions.saveDecision(decisionId, decision);
    return ResponseEntity.ok().build();
}
}
审批工作流与SLA管理
生产环境里,HITL不只是发通知等确认,还需要完整的审批工作流。
@Service
public class HITLWorkflowService {
/**
 * Builds a multi-level approval process whose depth depends on the risk level of the
 * request.
 *
 * <ul>
 *   <li>Level 1 (always): the direct assignee, 30-minute SLA.</li>
 *   <li>Level 2 ({@code HIGH} and {@code CRITICAL}): the assignee's direct manager,
 *       2-hour SLA.</li>
 *   <li>Level 3 ({@code CRITICAL} only): the risk officer, 8-hour SLA.</li>
 * </ul>
 *
 * <p>Levels are cumulative: a {@code CRITICAL} request passes through levels 1, 2 and
 * 3. Previously level 2 was added only for {@code HIGH}, so a {@code CRITICAL} request
 * received levels 1 and 3 with a gap at level 2.
 *
 * @param request the human-interaction request that needs approval
 * @return the configured approval process
 */
public ApprovalProcess createApprovalProcess(HumanInteractionRequest request) {
    ApprovalProcess process = new ApprovalProcess();
    process.setRequestId(request.getCheckpointId());
    process.setCreatedAt(LocalDateTime.now());

    // Level 1: the person directly responsible.
    process.addLevel(ApprovalLevel.of(
            1,
            request.getAssignee(),
            Duration.ofMinutes(30)
    ));

    RiskLevel riskLevel = request.getRiskLevel();
    boolean critical = riskLevel == RiskLevel.CRITICAL;

    // Level 2: required for high risk, and therefore also for critical risk.
    if (riskLevel == RiskLevel.HIGH || critical) {
        process.addLevel(ApprovalLevel.of(
                2,
                getDirectManager(request.getAssignee()),
                Duration.ofHours(2)
        ));
    }

    // Level 3: critical risk additionally needs the risk officer.
    if (critical) {
        process.addLevel(ApprovalLevel.of(
                3,
                getRiskOfficer(),
                Duration.ofHours(8)
        ));
    }
    return process;
}
/**
 * SLA monitor, scheduled with a fixed delay of 60,000 ms.
 *
 * <p>For every approval reported as overdue, the time spent on the current level is
 * compared with that level's SLA. Once the SLA is exceeded a warning is logged and a
 * reminder is sent; once more than twice the SLA has passed, the approval is escalated
 * to the next level.
 */
@Scheduled(fixedDelay = 60000)
public void checkSLABreaches() {
    List<PendingApproval> candidates =
            pendingApprovalRepository.findOverdue(LocalDateTime.now());

    for (PendingApproval pending : candidates) {
        int levelNo = pending.getCurrentLevel();
        ApprovalLevel activeLevel = pending.getProcess().getLevel(levelNo);

        // Time elapsed since the current level started.
        Duration elapsed = Duration.between(pending.getLevelStartedAt(), LocalDateTime.now());
        Duration allowed = activeLevel.getSla();

        boolean withinSla = elapsed.compareTo(allowed) <= 0;
        if (withinSla) {
            continue;
        }

        log.warn("审批SLA超时: requestId={}, level={}, overdue={}min",
                pending.getRequestId(), levelNo,
                elapsed.toMinutes() - allowed.toMinutes());

        // Send a reminder to the approver.
        notifyOverdue(pending, elapsed, allowed);

        // Beyond twice the SLA: escalate automatically.
        Duration escalationThreshold = allowed.multipliedBy(2);
        if (elapsed.compareTo(escalationThreshold) > 0) {
            escalateToNextLevel(pending);
        }
    }
}
}
HITL的反馈学习
人工决策的数据是宝贵的训练素材,应该反哺到Agent的规划和风险识别能力里:
@Service
public class HITLFeedbackLearner {
/**
 * Stores one human decision as a feedback record so that it can later be used to
 * improve risk assessment.
 *
 * <p>The correctness flag is saved as {@code null}; it is meant to be filled in later.
 *
 * @param decision the decision taken by the human
 * @param request  the request the decision answers
 */
public void recordDecisionFeedback(HumanDecision decision,
                                   HumanInteractionRequest request) {
    final DecisionFeedback record = new DecisionFeedback();

    // Fields taken from the decision itself.
    record.setDecisionId(decision.getDecisionId());
    record.setAction(decision.getAction());
    record.setDecisionReason(decision.getReason());

    // Fields taken from the originating request.
    record.setRiskLevel(request.getRiskLevel());
    record.setToolName(request.getPendingAction().getToolName());

    // Not known yet; can be back-filled afterwards.
    record.setWasCorrect(null);

    feedbackRepository.save(record);
}
/**
 * Looks for operations that humans almost always approve or mostly reject and turns
 * them into policy recommendations. Scheduled with the cron expression
 * {@code 0 0 8 * * MON}.
 *
 * <p>Statistics are aggregated per operation starting four weeks before today.
 * Operations with fewer than 10 decisions are skipped. An approval rate above 0.95
 * yields a "reduce interrupts" recommendation, a rate below 0.3 an "increase rigor"
 * recommendation. The recommendations are sent only when at least one was produced.
 */
@Scheduled(cron = "0 0 8 * * MON")
public void analyzeFeedbackPatterns() {
    // Approval statistics per operation.
    final Map<String, ApprovalStats> statsByOperation = feedbackRepository
            .aggregateByOperation(LocalDate.now().minusWeeks(4));

    final List<PolicyRecommendation> suggestions = new ArrayList<>();

    statsByOperation.forEach((operation, opStats) -> {
        if (opStats.getTotalDecisions() < 10) {
            return; // not enough samples
        }

        final double rate = opStats.getApprovalRate();
        if (rate > 0.95) {
            // Almost always approved: consider lowering the interrupt level.
            suggestions.add(PolicyRecommendation.of(
                    operation,
                    "批准率高达" + (int)(rate * 100) + "%,建议降低为自动执行或仅通知",
                    RecommendationType.REDUCE_INTERRUPTS
            ));
        } else if (rate < 0.3) {
            // Frequently rejected: stricter up-front checks may be needed.
            suggestions.add(PolicyRecommendation.of(
                    operation,
                    "拒绝率高,建议加强执行前验证或提高风险等级",
                    RecommendationType.INCREASE_RIGOR
            ));
        }
    });

    if (suggestions.isEmpty()) {
        return;
    }
    reportService.sendPolicyRecommendations(suggestions);
}
}
几点重要的设计原则
做了这么多HITL项目,总结几个不变的原则:
原则1:默认不信任,需要赢得自主权。 新的Agent功能上线时,所有写操作都先设置为需要人工确认。随着运行时间增长、历史数据积累,逐步放开某些操作的自动化。不要反过来:一开始全自动,出了问题再加限制。
原则2:给人类足够的上下文。 不能只告诉人"确认还是拒绝",要给出清晰的背景信息:这个操作是什么,为什么Agent要执行它,会影响什么。决策不带上下文,人工审批就沦为走形式。
原则3:超时策略要谨慎。 超时自动批准是最危险的设置,只适合真正低风险的场景。高风险操作超时后应该自动拒绝或升级,而不是自动批准。
原则4:记录每一个决策。 人工决策要详细记录:谁决策、何时决策、决策原因、最终结果。这既是审计需要,也是未来改进Agent的数据来源。
原则5:让人类参与得有价值。 如果每次中断都是"是否继续执行",人类审批者很快会形成"默认批准"的惯性,HITL失去意义。中断的场景要精心设计,每次中断都要提供真正有价值的决策信息,让人的判断能带来实质性的价值。
HITL不是Agent能力不足时的妥协,而是在当前技术水平下,保证系统安全可靠的工程选择。等LLM的推理能力和可信度提升到足够高,我们可以逐步减少干预点。但在那一天到来之前,Human-in-the-loop是负责任的工程师应该认真对待的设计模式。
