第1636篇:Chaos Engineering在AI系统中的实践——故意让模型"出错"来验证容错
2026/4/30大约 9 分钟
第1636篇:Chaos Engineering在AI系统中的实践——故意让模型"出错"来验证容错
前阵子我做了一件事:在测试环境里,我故意让我们的AI服务随机返回错误、随机增加延迟、随机截断响应。看着系统在这些"故障"下的表现,比任何代码审查都让我学到东西多。
这就是混沌工程(Chaos Engineering)的核心思路:主动注入故障,观察系统的真实行为,提前发现那些"以为不会出现"的问题。
Netflix的工程师在2011年发明了这个概念,用来测试他们的微服务系统。但在AI系统里,这套思路有一些新的维度——模型的行为本来就有不确定性,要在这个基础上再加故障注入,需要更精细的设计。
AI系统的故障模式分析
在做混沌实验之前,先要清楚AI系统可能有哪些故障模式:
普通微服务的混沌实验主要关注网络层和应用层,但AI系统还有独特的"模型层故障",这是需要专门测试的。
故障注入框架设计
我们自己实现了一个轻量的故障注入框架,核心是一个FaultInjector接口:
/**
 * Strategy for injecting one kind of fault into a request.
 */
public interface FaultInjector {

    /**
     * Reports the kind of fault this injector produces.
     *
     * @return the fault type
     */
    FaultType getType();

    /**
     * Decides whether a fault should be injected for the current request.
     *
     * @param context description of the request being processed
     * @return {@code true} if {@link #inject(FaultContext)} should be called
     */
    boolean shouldInject(FaultContext context);

    /**
     * Injects the fault.
     *
     * <p>An implementation may throw (to simulate an error) or modify the
     * request/response state (to simulate abnormal data).
     *
     * @param context description of the request being processed
     * @throws FaultException when the fault is simulated as an error
     */
    void inject(FaultContext context) throws FaultException;
}
@Data
@Builder
public class FaultContext {
private String requestId;
private String userId;
private String serviceType; // "chat" / "embedding" / "agent"
private String environment; // "test" / "staging"
private Map<String, Object> metadata;
}
/**
 * Kinds of faults the chaos framework can inject.
 */
public enum FaultType {
    TIMEOUT,             // timeout
    CONNECTION_ERROR,    // connection error
    SLOW_RESPONSE,       // slow response
    EMPTY_RESPONSE,      // empty response
    MALFORMED_RESPONSE,  // response with a broken format
    PARTIAL_RESPONSE,    // truncated response
    RATE_LIMIT,          // rate limiting
    HARMFUL_CONTENT,     // harmful content (exercises content filtering)
    VECTOR_STORE_ERROR   // vector store failure; referenced by the RAG degradation scenario
}
然后实现几个具体的注入器:
/**
 * Simulates a model timeout: blocks for the configured delay, then fails the call.
 */
@Component
@Slf4j
public class TimeoutFaultInjector implements FaultInjector {
    private final FaultConfig config;
    private final Random random = new Random();

    /**
     * @param config fault-injection settings (enabled flag, probability, delay)
     */
    public TimeoutFaultInjector(FaultConfig config) {
        // The final field had no initializer and no constructor, which does not compile.
        this.config = config;
    }

    /**
     * Injects only when chaos is enabled, the request environment is "test" or
     * "staging", and a random draw falls below the configured timeout probability.
     */
    @Override
    public boolean shouldInject(FaultContext context) {
        if (!config.isEnabled()) {
            return false;
        }
        if (!"test".equals(context.getEnvironment()) &&
                !"staging".equals(context.getEnvironment())) {
            return false; // inject in test/staging environments only
        }
        return random.nextDouble() < config.getTimeoutProbability();
    }

    /**
     * Sleeps for the configured delay and then throws.
     *
     * @throws FaultException always, with type {@link FaultType#TIMEOUT}
     */
    @Override
    public void inject(FaultContext context) throws FaultException {
        int delayMs = config.getTimeoutDelayMs();
        log.info("[混沌] 注入超时故障,requestId={}, 延迟={}ms",
                context.getRequestId(), delayMs);
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            // Restore the interrupt flag; the fault is still reported below.
            Thread.currentThread().interrupt();
        }
        throw new FaultException(FaultType.TIMEOUT, "模拟超时故障");
    }

    @Override
    public FaultType getType() {
        return FaultType.TIMEOUT;
    }
}
/**
 * Marks a chat request so that a later step corrupts the response format.
 *
 * <p>This injector does not throw. It only sets the
 * {@code inject_malformed_response} flag in the context metadata, which is used
 * to test how downstream code copes with a badly formatted response.
 */
@Component
@Slf4j
public class MalformedResponseFaultInjector implements FaultInjector {
    private final FaultConfig config;

    /**
     * @param config fault-injection settings (enabled flag, probability)
     */
    public MalformedResponseFaultInjector(FaultConfig config) {
        // The final field had no initializer and no constructor, which does not compile.
        this.config = config;
    }

    /**
     * Injects only for "chat" requests, when chaos is enabled and a random draw
     * falls below the configured probability.
     */
    @Override
    public boolean shouldInject(FaultContext context) {
        if (!config.isEnabled()) {
            return false;
        }
        return "chat".equals(context.getServiceType()) &&
                Math.random() < config.getMalformedResponseProbability();
    }

    /**
     * Sets the {@code inject_malformed_response} flag; never throws.
     */
    @Override
    public void inject(FaultContext context) throws FaultException {
        log.info("[混沌] 注入格式错误响应故障,requestId={}", context.getRequestId());
        Map<String, Object> metadata = context.getMetadata();
        if (metadata == null) {
            // A context built without metadata would otherwise fail here with an NPE.
            metadata = new HashMap<>();
            context.setMetadata(metadata);
        }
        // No exception: the flag tells the later interceptor to rewrite the response.
        metadata.put("inject_malformed_response", true);
    }

    @Override
    public FaultType getType() {
        return FaultType.MALFORMED_RESPONSE;
    }
}
/**
 * Slows a request down by 2-5 seconds without failing it.
 */
@Component
@Slf4j
public class SlowResponseFaultInjector implements FaultInjector {
    private final FaultConfig config;

    /**
     * @param config fault-injection settings (enabled flag, probability)
     */
    public SlowResponseFaultInjector(FaultConfig config) {
        // The final field had no initializer and no constructor, which does not compile.
        this.config = config;
    }

    /**
     * Injects when chaos is enabled and a random draw falls below the configured
     * slow-response probability.
     */
    @Override
    public boolean shouldInject(FaultContext context) {
        if (!config.isEnabled()) {
            return false;
        }
        // NOTE(review): the original comment said this only targets requests without
        // a timeout configured, but no such check exists; the decision is purely
        // probabilistic.
        return Math.random() < config.getSlowResponseProbability();
    }

    /**
     * Sleeps for a random extra delay and then lets the normal flow continue.
     */
    @Override
    public void inject(FaultContext context) throws FaultException {
        // No exception, only added latency: 2000 ms plus up to (but excluding) 3000 ms.
        int additionalDelayMs = 2000 + (int) (Math.random() * 3000);
        log.info("[混沌] 注入慢响应故障,requestId={}, 额外延迟={}ms",
                context.getRequestId(), additionalDelayMs);
        try {
            Thread.sleep(additionalDelayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // The call proceeds normally afterwards, just later.
    }

    @Override
    public FaultType getType() {
        return FaultType.SLOW_RESPONSE;
    }
}
故障注入Advisor
把故障注入整合到Spring AI的Advisor链里:
@Component
@Slf4j
@ConditionalOnProperty(name = "chaos.engineering.enabled", havingValue = "true")
public class ChaosEngineeringAdvisor implements AroundAdvisor {
private final List<FaultInjector> faultInjectors;
private final ChaosMetricsCollector metricsCollector;
/**
 * Wires the advisor with its collaborators.
 *
 * @param faultInjectors   injectors consulted, in list order, for every call
 * @param metricsCollector sink that records each injected fault
 */
public ChaosEngineeringAdvisor(List<FaultInjector> faultInjectors,
                               ChaosMetricsCollector metricsCollector) {
    this.metricsCollector = metricsCollector;
    this.faultInjectors = faultInjectors;
}
/**
 * Runs the fault-injection step for a chat call, then continues the advisor chain.
 *
 * <p>The first injector whose {@code shouldInject} returns true is used; at most
 * one fault is injected per call. A {@code FaultException} from the injector is
 * rethrown as a {@code RuntimeException}.
 *
 * <p>NOTE(review): the environment is read from the JVM system property
 * {@code spring.profiles.active} and falls back to "unknown". Profiles activated
 * some other way (environment variable, config file) would not show up here —
 * confirm this matches how the test and staging environments are started.
 */
@Override
public AdvisedResponse aroundCall(AdvisedRequest advisedRequest,
                                  CallAroundAdvisorChain chain) {
    FaultContext faultContext = FaultContext.builder()
            .requestId(UUID.randomUUID().toString())
            .userId((String) advisedRequest.adviseContext().get("userId"))
            .serviceType("chat")
            .environment(System.getProperty("spring.profiles.active", "unknown"))
            .metadata(new HashMap<>())
            .build();

    // Pick the first injector that wants to fire (normally only one fault is injected).
    FaultInjector selected = null;
    for (FaultInjector candidate : faultInjectors) {
        if (candidate.shouldInject(faultContext)) {
            selected = candidate;
            break;
        }
    }

    if (selected != null) {
        metricsCollector.recordFaultInjection(selected.getType());
        try {
            selected.inject(faultContext);
        } catch (FaultException e) {
            log.info("[混沌] 故障注入导致异常: {}", e.getMessage());
            throw new RuntimeException("Chaos fault: " + e.getMessage(), e);
        }
    }

    // A malformed-response injector only sets a flag; the response is rewritten here.
    Object malformedFlag = faultContext.getMetadata().get("inject_malformed_response");
    if (!Boolean.TRUE.equals(malformedFlag)) {
        return chain.nextAroundCall(advisedRequest);
    }
    return injectMalformedResponse(advisedRequest, chain);
}
/**
 * Calls the rest of the chain and returns a deliberately damaged copy of the response.
 *
 * <p>Only content that looks like a JSON object (starts with "{" after trimming)
 * is damaged, by cutting it to its first half. Any other content is passed
 * through unchanged inside a rebuilt response.
 *
 * @param request the advised request to forward
 * @param chain   the remaining advisor chain
 * @return the original response when there is nothing to damage, otherwise a new
 *         response that carries the (possibly truncated) content and the original
 *         advise context
 */
private AdvisedResponse injectMalformedResponse(AdvisedRequest request,
                                                CallAroundAdvisorChain chain) {
    AdvisedResponse normalResponse = chain.nextAroundCall(request);
    if (normalResponse.response() == null ||
            normalResponse.response().getResults().isEmpty()) {
        return normalResponse;
    }

    String originalContent = normalResponse.response().getResults()
            .get(0).getOutput().getContent();
    if (originalContent == null) {
        // Nothing to corrupt; without this guard trim() below would throw an NPE.
        return normalResponse;
    }

    // Simulate a broken JSON payload (only when the content is JSON).
    String malformedContent = originalContent;
    if (originalContent.trim().startsWith("{")) {
        // Truncate the JSON in the middle.
        malformedContent = originalContent.substring(0, originalContent.length() / 2);
    }

    AssistantMessage malformedMessage = new AssistantMessage(malformedContent);
    ChatResponse malformedResponse = new ChatResponse(
            List.of(new Generation(malformedMessage))
    );
    return new AdvisedResponse(malformedResponse, normalResponse.adviseContext());
}
/**
 * Returns an order value 100 below {@code Ordered.LOWEST_PRECEDENCE}; the
 * intent is for this advisor to run after the other advisors.
 */
@Override
public int getOrder() {
    final int offsetFromLowest = 100;
    return Ordered.LOWEST_PRECEDENCE - offsetFromLowest;
}
}
自动化混沌实验场景
有了故障注入能力,下一步是设计结构化的实验场景:
@Service
@Slf4j
public class ChaosExperimentRunner {
private final AIService aiService;
private final FaultConfig faultConfig;
private final ChaosExperimentRepository repository;
/**
 * Runs the complete chaos experiment suite, one scenario after another.
 *
 * @param testUserId user id passed to every AI call made by the scenarios
 * @return a report with the per-scenario results; {@code overallPassed} is true
 *         only when every scenario passed
 */
public ChaosExperimentReport runFullSuite(String testUserId) {
    List<ChaosScenario> scenarios = List.of(
            buildTimeoutScenario(),
            buildSlowResponseScenario(),
            buildEmptyResponseScenario(),
            buildHighConcurrencyScenario(),
            buildCascadeFailureScenario()
    );
    List<ScenarioResult> results = scenarios.stream()
            .map(scenario -> runScenario(scenario, testUserId))
            .collect(Collectors.toList());
    return ChaosExperimentReport.builder()
            .runAt(LocalDateTime.now())
            .scenarios(results)
            .overallPassed(results.stream().allMatch(ScenarioResult::isPassed))
            .build();
}

/**
 * Slow-response scenario. The suite referenced this builder without defining it.
 * TODO(review): probability, sample size and thresholds are placeholders — tune them.
 */
private ChaosScenario buildSlowResponseScenario() {
    return ChaosScenario.builder()
            .name("慢响应容错测试")
            .faultType(FaultType.SLOW_RESPONSE)
            .probability(0.5)
            .sampleSize(20) // each injected call sleeps 2-5 s, so keep the sample small
            .testInput("帮我写一首关于春天的诗")
            .expectations(List.of(
                    // a slow response is still a response, so calls should succeed
                    ScenarioExpectation.minSuccessRate(0.95),
                    ScenarioExpectation.maxP99Latency(10000)
            ))
            .build();
}

/**
 * Empty-response scenario. The suite referenced this builder without defining it.
 * The 90% success threshold follows the sample report in the article.
 * TODO(review): probability and sample size are placeholders — tune them.
 */
private ChaosScenario buildEmptyResponseScenario() {
    return ChaosScenario.builder()
            .name("空响应处理测试")
            .faultType(FaultType.EMPTY_RESPONSE)
            .probability(1.0)
            .sampleSize(50)
            .testInput("什么是机器学习?")
            .expectations(List.of(
                    ScenarioExpectation.minSuccessRate(0.9),
                    ScenarioExpectation.gracefulDegradation()
            ))
            .build();
}
/**
 * Runs one scenario: enables its fault, issues {@code sampleSize} calls one after
 * another, always disables the fault again, then evaluates the results.
 *
 * @param scenario the scenario to run; its fault type may be null (no fault)
 * @param userId   user id passed to every AI call
 * @return the evaluated result for this scenario
 */
private ScenarioResult runScenario(ChaosScenario scenario, String userId) {
    log.info("开始混沌场景: {}", scenario.getName());

    // The high-concurrency scenario declares no fault. Do not hand a null fault
    // type to FaultConfig, whose null handling is not visible here.
    FaultType faultType = scenario.getFaultType();
    boolean faultEnabled = faultType != null;
    if (faultEnabled) {
        faultConfig.enableFault(faultType, scenario.getProbability());
    }

    List<CallResult> callResults = new ArrayList<>();
    try {
        for (int i = 0; i < scenario.getSampleSize(); i++) {
            CallResult result = executeAndMeasure(userId,
                    "测试问题 " + i + ": " + scenario.getTestInput());
            callResults.add(result);
        }
    } finally {
        // Always switch the fault off so it cannot leak into other tests.
        if (faultEnabled) {
            faultConfig.disableFault(faultType);
        }
    }
    return analyzeResults(scenario, callResults);
}
/**
 * Makes one AI call and records whether it succeeded and how long it took.
 *
 * <p>Any exception is caught and turned into a failed result; in that case
 * {@code response} stays null and {@code error} holds "SimpleClassName: message".
 *
 * @param userId user id passed to the AI service
 * @param input  prompt text
 * @return the measured outcome of the call
 */
private CallResult executeAndMeasure(String userId, String input) {
    // nanoTime is monotonic; the wall clock can jump and distort a duration.
    long startNanos = System.nanoTime();
    boolean success = true;
    String error = null;
    String response = null;
    try {
        response = aiService.chat(userId, input);
    } catch (Exception e) {
        success = false;
        error = e.getClass().getSimpleName() + ": " + e.getMessage();
    }
    long latencyMs = (System.nanoTime() - startNanos) / 1_000_000L;
    return CallResult.builder()
            .success(success)
            .latencyMs(latencyMs)
            .error(error)
            .response(response)
            .build();
}
/**
 * Computes success rate, average latency and P99 latency for a scenario and
 * checks them against the scenario's expectations.
 *
 * <p>An empty sample is reported as a failure instead of passing vacuously.
 * P99 uses the nearest-rank definition: the value at rank ceil(0.99 * n).
 *
 * @param scenario the scenario that produced the results
 * @param results  one entry per call made
 * @return the scenario result, including any expectation violations
 */
private ScenarioResult analyzeResults(ChaosScenario scenario,
                                      List<CallResult> results) {
    boolean passed = true;
    List<String> violations = new ArrayList<>();

    int sampleCount = results.size();
    if (sampleCount == 0) {
        // 0/0 would give NaN, and NaN compares false against every threshold.
        passed = false;
        violations.add("没有采集到任何调用结果");
    }

    long successCount = results.stream().filter(CallResult::isSuccess).count();
    double successRate = sampleCount == 0 ? 0.0 : (double) successCount / sampleCount;
    long avgLatency = (long) results.stream()
            .mapToLong(CallResult::getLatencyMs)
            .average().orElse(0);

    long[] sortedLatencies = results.stream()
            .mapToLong(CallResult::getLatencyMs)
            .sorted()
            .toArray();
    long p99Latency = 0;
    if (sampleCount > 0) {
        // Integer form of ceil(0.99 * n); avoids floating-point rounding surprises.
        int rank = (sampleCount * 99 + 99) / 100;
        p99Latency = sortedLatencies[rank - 1];
    }

    // Check the expected behavior.
    for (ScenarioExpectation expectation : scenario.getExpectations()) {
        switch (expectation.getType()) {
            case MIN_SUCCESS_RATE -> {
                if (successRate < expectation.getValue()) {
                    passed = false;
                    violations.add(String.format(
                            "成功率 %.1f%% 低于期望值 %.1f%%",
                            successRate * 100, expectation.getValue() * 100));
                }
            }
            case MAX_P99_LATENCY -> {
                if (p99Latency > expectation.getValue()) {
                    passed = false;
                    violations.add(String.format(
                            "P99延迟 %dms 超过期望值 %.0fms",
                            p99Latency, expectation.getValue()));
                }
            }
            case GRACEFUL_DEGRADATION -> {
                // Every failed call must still carry a non-empty response.
                // NOTE(review): executeAndMeasure marks a call failed only when it
                // threw, and then leaves response null, so as written this check
                // fails whenever any call failed — confirm that is intended.
                boolean hasGracefulDegradation = results.stream()
                        .filter(r -> !r.isSuccess())
                        .allMatch(r -> r.getResponse() != null &&
                                !r.getResponse().isEmpty());
                if (!hasGracefulDegradation) {
                    passed = false;
                    violations.add("部分失败请求没有优雅降级");
                }
            }
        }
    }

    log.info("场景 {} 完成: {}, 成功率={}%, P99={}ms, 违规={}",
            scenario.getName(), passed ? "通过" : "失败",
            (int) (successRate * 100), p99Latency, violations);
    return ScenarioResult.builder()
            .scenarioName(scenario.getName())
            .passed(passed)
            .successRate(successRate)
            .avgLatency(avgLatency)
            .p99Latency(p99Latency)
            .violations(violations)
            .build();
}
/**
 * Builds the model-timeout scenario: half of 100 calls are hit by a timeout fault.
 */
private ChaosScenario buildTimeoutScenario() {
    return ChaosScenario.builder()
            .faultType(FaultType.TIMEOUT)
            .name("模型超时容错测试")
            .sampleSize(100)
            .probability(0.5) // 50% of the requests time out
            .expectations(List.of(
                    // After a timeout the system should degrade, not become unusable:
                    // timeouts are allowed, but at least 40% must succeed via the fallback.
                    ScenarioExpectation.minSuccessRate(0.4),
                    // P99 must stay within 10 seconds (the timeout wait).
                    ScenarioExpectation.maxP99Latency(10000),
                    // A timeout must produce a friendly message.
                    ScenarioExpectation.gracefulDegradation()
            ))
            .testInput("帮我写一首关于春天的诗")
            .build();
}
/**
 * Builds the load scenario: 500 calls with no fault injected.
 *
 * <p>NOTE(review): runScenario issues its calls in a plain loop, one after
 * another, so despite the name this does not generate concurrent load — confirm
 * whether a parallel driver was intended.
 */
private ChaosScenario buildHighConcurrencyScenario() {
    return ChaosScenario.builder()
            .testInput("什么是机器学习?")
            .sampleSize(500) // fire 500 requests in quick succession
            .name("高并发压力测试")
            .faultType(null) // no fault injection, load only
            .probability(0)
            .expectations(List.of(
                    // with no fault the success rate should be close to 100%
                    ScenarioExpectation.minSuccessRate(0.99),
                    ScenarioExpectation.maxP99Latency(8000)
            ))
            .build();
}
/**
 * Builds the cascade-failure scenario: the vector store fails on every call,
 * to see whether the AI service can degrade.
 */
private ChaosScenario buildCascadeFailureScenario() {
    return ChaosScenario.builder()
            .faultType(FaultType.VECTOR_STORE_ERROR)
            .probability(1.0) // fails 100% of the time
            .name("RAG组件失败降级测试")
            .testInput("根据公司文档,假期制度是什么?")
            .sampleSize(50)
            .expectations(List.of(
                    // the service is expected to fall back to answering without RAG
                    ScenarioExpectation.minSuccessRate(0.9),
                    ScenarioExpectation.gracefulDegradation()
            ))
            .build();
}
}
让AI系统具备"被动免疫"能力
混沌工程不只是找bug,更重要的是让系统具备自愈能力。下面是几种AI系统特有的容错模式:
模型熔断器
@Component
@Slf4j
public class AICircuitBreaker {
private final CircuitBreakerRegistry circuitBreakerRegistry;
/**
 * Stores the registry that callWithFallback uses to look up the
 * "primary-model" circuit breaker.
 *
 * @param circuitBreakerRegistry registry of circuit breakers
 */
public AICircuitBreaker(CircuitBreakerRegistry circuitBreakerRegistry) {
this.circuitBreakerRegistry = circuitBreakerRegistry;
}
/**
 * Calls the primary model through the "primary-model" circuit breaker and
 * switches to the fallback model when the breaker is open or the call fails.
 *
 * @param userId         forwarded to the fallback path only; the primary call does not use it
 * @param input          user prompt
 * @param primaryClient  client for the primary model
 * @param fallbackClient client for the backup model
 * @return the primary model's content, or whatever the fallback path returns
 */
public String callWithFallback(String userId, String input,
                               ChatClient primaryClient,
                               ChatClient fallbackClient) {
    CircuitBreaker circuitBreaker = circuitBreakerRegistry
            .circuitBreaker("primary-model");
    try {
        return circuitBreaker.executeSupplier(() ->
                primaryClient.prompt()
                        .user(input)
                        .call()
                        .content()
        );
    } catch (CallNotPermittedException e) {
        // Breaker is open: go straight to the fallback.
        log.warn("主模型熔断器已打开,切换到备用模型");
        return callFallback(userId, input, fallbackClient);
    } catch (Exception e) {
        // Pass the exception as the last argument so the stack trace is logged too.
        log.error("主模型调用失败,切换到备用模型: {}", e.getMessage(), e);
        return callFallback(userId, input, fallbackClient);
    }
}
/**
 * Calls the backup model; if that fails too, returns a fixed apology message
 * instead of throwing.
 *
 * @param userId         currently unused
 * @param input          user prompt
 * @param fallbackClient client for the backup model
 * @return the backup model's content, or the fixed apology text
 */
private String callFallback(String userId, String input, ChatClient fallbackClient) {
    try {
        return fallbackClient.prompt()
                .user(input)
                .call()
                .content();
    } catch (Exception e) {
        // Pass the exception as the last argument so the stack trace is logged too.
        log.error("备用模型也失败了: {}", e.getMessage(), e);
        return "抱歉,AI服务暂时不可用,请稍后再试。如有紧急需求,请联系客服。";
    }
}
}
熔断器配置:
resilience4j:
  circuitbreaker:
    instances:
      primary-model:
        failure-rate-threshold: 50                        # open the breaker at a 50% failure rate
        wait-duration-in-open-state: 30s                  # stay open for 30 seconds before retrying
        sliding-window-size: 20                           # failure rate is computed over the last 20 calls
        minimum-number-of-calls: 5                        # need at least 5 calls before computing it
        permitted-number-of-calls-in-half-open-state: 3
混沌实验的观测指标
跑完实验,要能看到清晰的报告:
=== 混沌实验报告 ===
运行时间: 2024-01-15 14:30:00
总体结论: 未通过 (5/6 个场景通过)
场景1: 模型超时容错测试
状态: 通过
成功率: 48.2% (期望: >=40%)
P99延迟: 8,234ms (期望: <=10,000ms)
优雅降级: 是
发现: 超时后正确返回了友好提示
场景2: 空响应处理测试
状态: 失败 ⚠
成功率: 0% (期望: >=90%)
发现: 模型返回空字符串时下游发生NullPointerException
建议: 在ResponseParser中添加空值检查
场景3: 格式错误响应测试
状态: 通过
...
=== 需要修复的问题 ===
1. 高优先级: 空响应NPE(场景2)
2. 中优先级: 高并发下P99达到7.2秒,建议优化
定期自动化运行
混沌实验不是一次性的,要集成到CI/CD流程里:
# .github/workflows/chaos-test.yml
name: Chaos Engineering Tests
on:
  schedule:
    - cron: '0 2 * * 1'    # every Monday at 02:00 (GitHub evaluates cron schedules in UTC)
  workflow_dispatch:         # manual trigger
jobs:
  chaos-test:
    runs-on: self-hosted     # needs network access to the test environment
    steps:
      - name: Run Chaos Tests
        run: |
          # --fail makes an HTTP error status fail the step
          curl --fail -X POST http://test-env/admin/chaos/run-suite \
            -H "Authorization: Bearer ${{ secrets.CHAOS_TOKEN }}"
      - name: Check Results
        run: |
          # wait for the experiment to finish, then fetch the report
          sleep 300
          RESULT=$(curl --fail -s http://test-env/admin/chaos/latest-report)
          # Fail closed: only an explicit overallPassed=true counts as success, so an
          # empty or unexpected report can no longer slip through as a pass.
          if ! echo "$RESULT" | grep -Eq '"overallPassed"[[:space:]]*:[[:space:]]*true'; then
            echo "混沌实验失败!"
            exit 1
          fi
混沌工程不是什么高大上的东西,本质就是系统化地"找自己的茬"。AI系统因为有模型这个不确定的外部依赖,比普通微服务更需要这套方法论。我们现在每周跑一次完整的混沌实验套件,已经找到了7个生产前可能造成事故的问题,感觉很值。
