混沌工程在 AI 系统中的实践——主动找脆弱点
混沌工程在 AI 系统中的实践——主动找脆弱点
有一次我们做季度容灾演练,模拟 AI 推理服务不可用,看应用的降级逻辑是否正常。
切断推理服务之后,前端果然弹出了"服务暂时不可用,请稍后再试"——降级逻辑生效了,看起来没问题。
但有个同事突然说:等等,我刚才试了一下搜索功能,返回的是空列表,没有降级提示。
我们去查了一下,搜索功能用的是另一个推理服务的 Embedding 接口,那个接口超时了,但代码里没有对应的 fallback 逻辑,直接返回了空结果,没有任何错误提示,用户以为搜不到东西,但实际上是服务故障。
这个问题,如果不是当天演练,不知道要等到什么时候才能发现。
混沌工程的价值就在这里:主动制造故障,在生产出问题之前找到系统的脆弱点。
AI 系统特有的混沌实验
混沌工程不是新概念,Netflix 的 Chaos Monkey 十多年前就有了。但 AI 系统有一些普通微服务没有的特殊故障模式:
故障类型一:AI 服务高延迟。普通服务的 P99 延迟可能是 200ms,超过 1 秒就是明显异常。但 AI 推理服务的正常响应就可能要 5-10 秒,特别是流式输出的场景。高延迟如果处理不好,连接池很快就会打满,导致连锁故障。
故障类型二:响应截断。流式输出的场景中,服务可能在中途断开,只输出了一半内容。应用要能处理不完整的响应,不能直接把半截 JSON 或半截句子展示给用户。
故障类型三:Token 超限。用户输入了一段超长的文本,超过了模型的上下文窗口,API 返回 400 错误。应用如何优雅处理这种错误?是告知用户还是自动截断?
故障类型四:模型返回格式错误。我们依赖模型返回 JSON,但模型有时候会在 JSON 外面加一段自然语言("以下是 JSON 格式的结果:"),或者 JSON 里有转义问题导致解析失败。应用能否容忍解析失败?
故障类型五:AI 服务限流。OpenAI 的 Rate Limit 是真实存在的,特别是用量高峰期。突然大量请求进来,第一批可能成功,后续的请求被 429 限流,应用的重试策略和排队逻辑是否合理?
混沌实验的设计原则
在开始写代码之前,需要明确几个原则,否则混沌实验很容易变成"随机破坏":
原则一:先定义稳态假设。混沌实验的前提是你知道系统的正常行为是什么。比如"AI 服务不可用时,应用应该返回 503,并在 Response Header 里带降级标记"。如果你不知道正常行为是什么,注入故障之后也不知道结果对不对。
原则二:从测试环境开始,不要直接在生产做。除非你的系统设计允许(有流量镜像、有完善的监控),否则先在 staging 做混沌实验,观察结果,确认行为符合预期之后再考虑生产环境的小范围实验。
原则三:每次只注入一种故障。如果同时注入延迟 + 限流 + 截断响应,出了问题你不知道是哪个导致的。混沌实验要有控制变量的思维。
原则四:有快速终止机制。实验要能随时中止,特别是在生产环境。任何混沌实验都应该设置自动终止条件(比如错误率超过 5% 就停止)。
基于 Spring AOP 的 AI 故障注入
Spring AOP 是实现故障注入最干净的方式:不修改业务代码,通过切面在 AI 调用的前后注入各种故障行为,可以通过配置开关控制。
Step 1:定义故障注入配置
// 故障注入配置,通过 Feature Flag 或配置中心动态控制
@Data
@ConfigurationProperties(prefix = "chaos")
@Configuration
public class ChaosConfig {
// 全局开关,生产环境必须默认关闭
private boolean enabled = false;
// 各种故障的配置
private DelayConfig delay = new DelayConfig();
private TruncationConfig truncation = new TruncationConfig();
private TokenLimitConfig tokenLimit = new TokenLimitConfig();
private ErrorConfig error = new ErrorConfig();
private RateLimitConfig rateLimit = new RateLimitConfig();
@Data
public static class DelayConfig {
private boolean enabled = false;
// 注入延迟的概率(0.0 - 1.0)
private double probability = 0.1;
// 延迟时间范围(毫秒)
private int minDelayMs = 3000;
private int maxDelayMs = 10000;
}
@Data
public static class TruncationConfig {
private boolean enabled = false;
private double probability = 0.05;
// 截断位置(输出内容的比例,0.3 = 截断到 30%)
private double truncateAt = 0.3;
}
@Data
public static class TokenLimitConfig {
private boolean enabled = false;
private double probability = 0.1;
// 模拟 Token 超限,抛出 400 错误
}
@Data
public static class ErrorConfig {
private boolean enabled = false;
private double probability = 0.05;
// 模拟的错误类型:TIMEOUT, RATE_LIMIT, SERVICE_UNAVAILABLE
private String errorType = "TIMEOUT";
}
@Data
public static class RateLimitConfig {
private boolean enabled = false;
// 每分钟允许的最大请求数
private int maxRequestsPerMinute = 10;
}
}Step 2:实现故障注入切面
@Aspect
@Component
@Slf4j
@ConditionalOnProperty(name = "chaos.enabled", havingValue = "true")
public class AICallChaosAspect {
private final ChaosConfig chaosConfig;
private final MeterRegistry meterRegistry;
private final Random random = new Random();
// 限流计数器
private final RateLimiter rateLimiter;
public AICallChaosAspect(ChaosConfig chaosConfig, MeterRegistry meterRegistry) {
this.chaosConfig = chaosConfig;
this.meterRegistry = meterRegistry;
this.rateLimiter = RateLimiter.create(
chaosConfig.getRateLimit().getMaxRequestsPerMinute() / 60.0
);
}
// 拦截所有打了 @AIOperation 注解的方法
@Around("@annotation(aiOperation)")
public Object injectChaos(ProceedingJoinPoint joinPoint, AIOperation aiOperation) throws Throwable {
String operationType = aiOperation.value();
// 1. 延迟注入
if (shouldInject(chaosConfig.getDelay())) {
injectDelay(operationType);
}
// 2. Token 超限模拟
if (shouldInject(chaosConfig.getTokenLimit())) {
injectTokenLimitError(operationType);
}
// 3. 限流模拟
if (chaosConfig.getRateLimit().isEnabled()) {
if (!rateLimiter.tryAcquire()) {
injectRateLimitError(operationType);
}
}
// 4. 通用错误注入
if (shouldInject(chaosConfig.getError())) {
injectGenericError(operationType);
}
// 执行原方法
Object result = joinPoint.proceed();
// 5. 响应截断(在返回之前处理)
if (shouldInject(chaosConfig.getTruncation())) {
result = truncateResponse(result, operationType);
}
return result;
}
private boolean shouldInject(Object config) {
// 利用反射读取 enabled 和 probability 字段
try {
boolean enabled = (boolean) config.getClass().getMethod("isEnabled").invoke(config);
if (!enabled) return false;
double probability = (double) config.getClass().getMethod("getProbability").invoke(config);
boolean inject = random.nextDouble() < probability;
if (inject) {
log.warn("CHAOS: 触发 {} 故障注入 (probability={})",
config.getClass().getSimpleName(), probability);
}
return inject;
} catch (Exception e) {
return false;
}
}
private void injectDelay(String operationType) throws InterruptedException {
ChaosConfig.DelayConfig delayConfig = chaosConfig.getDelay();
int delay = delayConfig.getMinDelayMs() +
random.nextInt(delayConfig.getMaxDelayMs() - delayConfig.getMinDelayMs());
log.warn("CHAOS: 注入 {}ms 延迟到 AI 操作: {}", delay, operationType);
meterRegistry.counter("chaos.injected",
"type", "delay",
"operation", operationType
).increment();
Thread.sleep(delay);
}
private void injectTokenLimitError(String operationType) {
log.warn("CHAOS: 注入 Token 超限错误到 AI 操作: {}", operationType);
meterRegistry.counter("chaos.injected",
"type", "token_limit",
"operation", operationType
).increment();
// 模拟 OpenAI 的 Token 超限错误
throw new AITokenLimitException(
"This model's maximum context length is 8192 tokens. " +
"However, your messages resulted in 9500 tokens. " +
"Please reduce the length of your messages.",
400
);
}
private void injectRateLimitError(String operationType) {
log.warn("CHAOS: 注入限流错误到 AI 操作: {}", operationType);
meterRegistry.counter("chaos.injected",
"type", "rate_limit",
"operation", operationType
).increment();
throw new AIRateLimitException(
"Rate limit reached for model gpt-4o. " +
"Limit: 10 RPM. Please try again in 6 seconds.",
429
);
}
private void injectGenericError(String operationType) {
String errorType = chaosConfig.getError().getErrorType();
log.warn("CHAOS: 注入 {} 错误到 AI 操作: {}", errorType, operationType);
meterRegistry.counter("chaos.injected",
"type", errorType.toLowerCase(),
"operation", operationType
).increment();
switch (errorType) {
case "TIMEOUT" -> throw new AITimeoutException("AI service timeout after 30000ms");
case "SERVICE_UNAVAILABLE" -> throw new AIServiceUnavailableException("AI service is unavailable");
case "NETWORK_ERROR" -> throw new AINetworkException("Connection reset by peer");
default -> throw new RuntimeException("CHAOS: Unknown error type: " + errorType);
}
}
private Object truncateResponse(Object result, String operationType) {
if (!(result instanceof String response)) {
return result;
}
double truncateAt = chaosConfig.getTruncation().getTruncateAt();
int truncateIndex = (int) (response.length() * truncateAt);
String truncated = response.substring(0, truncateIndex);
log.warn("CHAOS: 截断 AI 响应,原长度: {}, 截断后: {}, 操作: {}",
response.length(), truncated.length(), operationType);
meterRegistry.counter("chaos.injected",
"type", "truncation",
"operation", operationType
).increment();
return truncated;
}
}Step 3:在业务代码上打注解
@Service
public class AIDocAnalysisService {
private final ChatClient chatClient;
@AIOperation("doc-analysis") // 打上注解,切面就会拦截这个方法
public DocAnalysisResult analyzeDocument(String document) {
String response = chatClient.prompt()
.system("你是文档分析助手,请以 JSON 格式输出分析结果")
.user("分析以下文档:\n" + document)
.call()
.content();
// 注意:这里要有健壮的解析逻辑
// 如果混沌注入了截断响应,这里会收到不完整的 JSON
return parseWithFallback(response);
}
private DocAnalysisResult parseWithFallback(String response) {
try {
return objectMapper.readValue(response, DocAnalysisResult.class);
} catch (JsonProcessingException e) {
log.error("Failed to parse AI response, using fallback: {}", response);
// 降级:返回一个空的分析结果,不是 throw exception
return DocAnalysisResult.empty()
.withErrorMessage("分析结果解析失败,请重试");
}
}
}混沌实验的观测
注入了故障,需要同步观测系统的行为。核心指标:
// 混沌实验期间的观测指标
@Component
public class ChaosObserver {
private final MeterRegistry meterRegistry;
// 记录应用对各种故障的响应
public void recordFaultResponse(String faultType, String responseType, long latencyMs) {
// responseType: degraded/error/success(正常降级/报错/成功恢复)
Timer.builder("chaos.response.latency")
.description("故障注入后的响应延迟")
.tags("fault_type", faultType, "response_type", responseType)
.register(meterRegistry)
.record(latencyMs, TimeUnit.MILLISECONDS);
meterRegistry.counter("chaos.response.count",
"fault_type", faultType,
"response_type", responseType
).increment();
}
}混沌实验流程
我们发现的真实脆弱点
做了混沌实验之后,我们找到了几个不做实验不会发现的问题:
脆弱点一:Embedding 服务超时没有降级。前面提到的那个搜索功能,Embedding 超时后直接返回空列表,用户无感知。修复方案:加了 try-catch 和 fallback 逻辑,超时后返回"搜索暂时不可用"的明确提示。
脆弱点二:截断响应导致 JSON 解析崩溃。我们有几个地方直接 objectMapper.readValue() 没有 try-catch,流式响应被截断后,应用直接 500。修复方案:统一封装了 parseWithFallback,解析失败走降级逻辑。
脆弱点三:Token 超限错误没有用户友好提示。用户粘贴了一篇很长的文档,Token 超限,应用返回了内部错误码。修复方案:捕获 400 Token Limit 错误,提示用户"文档过长,请拆分后重试,建议每次不超过 3000 字"。
脆弱点四:限流错误的重试风暴。429 限流后,我们的代码立刻重试,结果重试又触发限流,形成循环。修复方案:加了指数退避,429 后等 6 秒再重试,最多重试 3 次。
总结
混沌工程的核心不是破坏,而是在可控条件下暴露问题,而不是等到生产事故时才发现。
AI 系统特有的混沌实验维度:高延迟注入、响应截断、Token 超限模拟、限流模拟。这些场景在生产中都会发生,只是频率不高,容易被忽视。
Spring AOP 的故障注入方案侵入性很低,不需要修改业务代码,通过配置开关控制,可以做到精准的概率化注入。配合 Prometheus 指标观测,可以量化应用在故障场景下的降级效果。
建议每季度做一次混沌演练,把 AI 服务的各类故障场景都过一遍。第一次做往往会有很多惊喜(都是坏的)。
