Part 1904: Advanced Advisor Chain Composition in Spring AI: Building a Multi-Layer Interception and Enhancement Pipeline
Have you run into this? Your AI service ships, then product asks for logging, so you add it; then content moderation, added; then caching, added; then rate limiting, added... and in the end your Service method is stuffed with all kinds of cross-cutting logic, while the AI call itself drowns in a pile of pre- and post-processing.
Spring AI's Advisor mechanism exists precisely to solve this problem. Its design follows the same lineage as Spring AOP: pull cross-cutting concerns out of the business logic, have each Advisor do exactly one thing, and compose them into a processing pipeline by chaining.
This article starts with how Advisors work, then focuses on how to combine multiple Advisors sensibly and on techniques for implementing custom ones.
How Advisors Work
Start with the core interfaces. Spring AI's Advisors come in two flavors:
// Advisor for the full request-response call (for modifying the prompt or post-processing the result)
public interface CallAroundAdvisor extends Ordered {
AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain);
String getName();
}
// Advisor for streaming responses
public interface StreamAroundAdvisor extends Ordered {
Flux<AdvisedResponse> aroundStream(AdvisedRequest request, StreamAroundAdvisorChain chain);
String getName();
}
AdvisedRequest carries everything that will be sent to the LLM: the user message, system prompt, tool definitions, call options, and so on. AdvisedResponse carries the LLM's response plus context information.
The chaining mechanism resembles the chain-of-responsibility pattern: each Advisor does its pre-processing inside aroundCall, calls chain.nextAroundCall(request) to trigger the next Advisor (or the final LLM call), and then post-processes the returned result.
@Override
public AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
// === Pre-processing ===
AdvisedRequest modifiedRequest = doPreProcess(request);
// Invoke the next Advisor in the chain (or the final LLM call)
AdvisedResponse response = chain.nextAroundCall(modifiedRequest);
// === Post-processing ===
AdvisedResponse modifiedResponse = doPostProcess(response);
return modifiedResponse;
}
Advisor Execution Order
This is where people trip up most. Execution order is determined by the getOrder() return value: smaller values run earlier (outer layer), larger values run later (inner layer, closer to the LLM).
Picture an onion: outer Advisors run their pre-processing before inner ones do; after the LLM returns, inner Advisors run their post-processing before outer ones do.
This ordering has a logic of its own:
- Rate limiting goes outermost: a rejected request is refused immediately without consuming any downstream resources
- Auditing needs to record the complete request and response
- Content safety must check data before it reaches the LLM and filter the response after it comes back
- Caching goes before RAG and memory, since a cache hit skips those expensive steps entirely
- RAG and memory sit right next to the LLM, because they directly modify the prompt
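The onion model and the order-value rule can be sketched with a minimal, self-contained chain. Everything below (MiniAdvisor, tracing, run) is a simplified stand-in invented for illustration, not a Spring AI type; only the sort-then-wrap mechanics mirror the real chain:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;

public class OnionDemo {
    // Simplified stand-in for CallAroundAdvisor: wraps the rest of the chain.
    interface MiniAdvisor {
        String around(String request, Function<String, String> chain);
        int order();
    }

    // An advisor that records when its pre/post phases run.
    static MiniAdvisor tracing(String name, int order, List<String> events) {
        return new MiniAdvisor() {
            public String around(String req, Function<String, String> chain) {
                events.add(name + " pre");      // pre-processing
                String resp = chain.apply(req); // next advisor, or the stubbed "LLM"
                events.add(name + " post");     // post-processing
                return resp;
            }
            public int order() { return order; }
        };
    }

    /** Builds a two-advisor chain and returns the observed pre/post order. */
    public static List<String> run() {
        List<String> events = new ArrayList<>();
        List<MiniAdvisor> advisors = new ArrayList<>(List.of(
                tracing("Audit(200)", 200, events),
                tracing("RateLimit(100)", 100, events)));
        // Smaller order value = outer layer, matching Spring's Ordered contract.
        advisors.sort(Comparator.comparingInt(MiniAdvisor::order));
        // Build from the innermost element (the stubbed LLM call) outward.
        Function<String, String> chain = req -> "LLM response";
        for (int i = advisors.size() - 1; i >= 0; i--) {
            MiniAdvisor advisor = advisors.get(i);
            Function<String, String> next = chain;
            chain = req -> advisor.around(req, next);
        }
        chain.apply("hello");
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running this prints RateLimit(100) pre, Audit(200) pre, Audit(200) post, RateLimit(100) post: the low-order advisor brackets the high-order one, which is exactly the onion described above.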
Implementing a Production-Grade Advisor Suite
Below I implement a few Advisors that are genuinely useful in production.
1. Rate-Limiting Advisor
@Component
public class RateLimitAdvisor implements CallAroundAdvisor, StreamAroundAdvisor {
private static final int ORDER = 100;
@Autowired
private RateLimiterRegistry rateLimiterRegistry;
@Autowired
private UserContextService userContext;
@Override
public AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
String userId = userContext.getCurrentUserId();
RateLimiter limiter = rateLimiterRegistry.rateLimiter("ai-chat-" + userId);
if (!limiter.acquirePermission()) {
throw new RateLimitExceededException(
"Too many AI requests; please try again later. At most 10 requests per minute.");
}
return chain.nextAroundCall(request);
}
@Override
public Flux<AdvisedResponse> aroundStream(AdvisedRequest request,
StreamAroundAdvisorChain chain) {
String userId = userContext.getCurrentUserId();
RateLimiter limiter = rateLimiterRegistry.rateLimiter("ai-stream-" + userId);
if (!limiter.acquirePermission()) {
return Flux.error(new RateLimitExceededException("Too many requests"));
}
return chain.nextAroundStream(request);
}
@Override
public String getName() { return "RateLimitAdvisor"; }
@Override
public int getOrder() { return ORDER; }
}
2. Audit Logging Advisor
@Component
public class AuditAdvisor implements CallAroundAdvisor {
private static final int ORDER = 200;
@Autowired
private AuditLogService auditLogService;
@Override
public AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
String requestId = UUID.randomUUID().toString();
long startTime = System.currentTimeMillis();
// Log the request
AuditLogEntry requestEntry = AuditLogEntry.builder()
.requestId(requestId)
.userId(UserContext.getCurrentUserId())
.sessionId(getSessionId(request))
.userMessage(extractUserMessage(request))
.timestamp(Instant.now())
.type(AuditType.REQUEST)
.build();
auditLogService.save(requestEntry);
// Pass the requestId down to subsequent Advisors
AdvisedRequest enrichedRequest = request.mutate()
.adviseContext(Map.of("auditRequestId", requestId))
.build();
try {
AdvisedResponse response = chain.nextAroundCall(enrichedRequest);
long duration = System.currentTimeMillis() - startTime;
// Log the successful response
AuditLogEntry responseEntry = AuditLogEntry.builder()
.requestId(requestId)
.aiResponse(response.response().getResult().getOutput().getContent())
.durationMs(duration)
.tokensUsed(extractTokenUsage(response))
.type(AuditType.RESPONSE)
.build();
auditLogService.save(responseEntry);
return response;
} catch (Exception e) {
long duration = System.currentTimeMillis() - startTime;
// Log the failure
AuditLogEntry errorEntry = AuditLogEntry.builder()
.requestId(requestId)
.errorMessage(e.getMessage())
.durationMs(duration)
.type(AuditType.ERROR)
.build();
auditLogService.save(errorEntry);
throw e;
}
}
private String getSessionId(AdvisedRequest request) {
return (String) request.adviseContext().get(
AbstractChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY);
}
private String extractUserMessage(AdvisedRequest request) {
return request.userText();
}
private Integer extractTokenUsage(AdvisedResponse response) {
Usage usage = response.response().getMetadata().getUsage();
return usage != null ? usage.getTotalTokens() : null;
}
@Override
public String getName() { return "AuditAdvisor"; }
@Override
public int getOrder() { return ORDER; }
}
3. Response Cache Advisor
@Component
public class ResponseCacheAdvisor implements CallAroundAdvisor {
private static final int ORDER = 400;
@Autowired
private CacheManager cacheManager;
// Only cache responses for these scene types
private static final Set<String> CACHEABLE_SCENE_TYPES =
Set.of("FAQ", "KNOWLEDGE_QUERY", "DOCUMENT_SUMMARY");
@Override
public AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
String sceneType = (String) request.adviseContext().get("sceneType");
// Skip non-cacheable scenes (guard against null: Set.of(...) throws NPE on contains(null))
if (sceneType == null || !CACHEABLE_SCENE_TYPES.contains(sceneType)) {
return chain.nextAroundCall(request);
}
String cacheKey = buildCacheKey(request);
Cache cache = cacheManager.getCache("ai-responses");
// Cache lookup
if (cache != null) {
Cache.ValueWrapper cached = cache.get(cacheKey);
if (cached != null) {
log.debug("Cache hit, key: {}", cacheKey);
return (AdvisedResponse) cached.get();
}
}
// Cache miss: proceed with the normal call
AdvisedResponse response = chain.nextAroundCall(request);
// Write to cache (successful responses only)
if (cache != null && isSuccessResponse(response)) {
cache.put(cacheKey, response);
}
return response;
}
private String buildCacheKey(AdvisedRequest request) {
// Hash the system + user messages; the MD5 digest becomes the cache key
String userMsg = request.userText();
String systemMsg = request.systemText();
return DigestUtils.md5DigestAsHex(
(systemMsg + "||" + userMsg).getBytes(StandardCharsets.UTF_8)
);
}
private boolean isSuccessResponse(AdvisedResponse response) {
return response.response() != null
&& response.response().getResult() != null
&& response.response().getResult().getOutput() != null;
}
@Override
public String getName() { return "ResponseCacheAdvisor"; }
@Override
public int getOrder() { return ORDER; }
}
4. Content Safety Advisor
@Component
public class ContentSafetyAdvisor implements CallAroundAdvisor {
private static final int ORDER = 300;
@Autowired
private ContentModerationService moderationService;
@Override
public AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
// Pre: check the user input
String userInput = request.userText();
ModerationResult inputCheck = moderationService.check(userInput);
if (inputCheck.isViolation()) {
log.warn("User input flagged: userId={}, category={}",
UserContext.getCurrentUserId(), inputCheck.getCategory());
throw new ContentViolationException(
"Your input contains inappropriate content; please revise and retry. Category: " + inputCheck.getCategory());
}
AdvisedResponse response = chain.nextAroundCall(request);
// Post: check the AI output
String aiOutput = response.response().getResult().getOutput().getContent();
ModerationResult outputCheck = moderationService.check(aiOutput);
if (outputCheck.isViolation()) {
log.error("AI output flagged; blocked. category={}", outputCheck.getCategory());
// Return a sanitized replacement response
return buildSafeResponse(response, "Sorry, I can't answer that right now.");
}
return response;
}
private AdvisedResponse buildSafeResponse(AdvisedResponse original, String safeContent) {
// Build a response object with the content replaced
AssistantMessage safeMessage = new AssistantMessage(safeContent);
ChatResponse safeResponse = new ChatResponse(
List.of(new Generation(safeMessage)),
original.response().getMetadata()
);
return new AdvisedResponse(safeResponse, original.adviseContext());
}
@Override
public String getName() { return "ContentSafetyAdvisor"; }
@Override
public int getOrder() { return ORDER; }
}
Dynamic Advisor Assembly
Different endpoints and different users need different Advisor combinations. Rather than hard-coding them, assemble them dynamically:
@Service
public class ChatClientFactory {
@Autowired
private ChatClient.Builder baseBuilder;
@Autowired
private List<CallAroundAdvisor> allAdvisors;
@Autowired
private ChatMemoryRepository memoryRepository;
/**
* Create a ChatClient tailored to the given scene type
*/
public ChatClient createForScene(String sceneType, String tenantId) {
List<Advisor> advisors = new ArrayList<>();
// Every scene gets rate limiting and auditing
advisors.add(new RateLimitAdvisor(getRateLimitConfig(sceneType, tenantId)));
advisors.add(new AuditAdvisor(auditLogService));
// External-facing scenes add content safety
if (isExternalScene(sceneType)) {
advisors.add(new ContentSafetyAdvisor(moderationService));
}
// FAQ and knowledge-query scenes add caching
if (Set.of("FAQ", "KNOWLEDGE_QUERY").contains(sceneType)) {
advisors.add(new ResponseCacheAdvisor(cacheManager));
}
// Knowledge-base scenes add RAG
if (Set.of("KNOWLEDGE_QUERY", "DOCUMENT_SUMMARY").contains(sceneType)) {
VectorStore vectorStore = getVectorStore(tenantId);
advisors.add(new QuestionAnswerAdvisor(vectorStore));
}
// Scenes with conversational context add memory
if (needsMemory(sceneType)) {
ChatMemory memory = memoryRepository.getOrCreate(sceneType);
advisors.add(new MessageChatMemoryAdvisor(memory));
}
// Sort by order so the chain runs in the right sequence
advisors.sort(Comparator.comparingInt(a -> {
if (a instanceof Ordered) return ((Ordered) a).getOrder();
return Ordered.LOWEST_PRECEDENCE;
}));
return baseBuilder
.defaultAdvisors(advisors)
.defaultSystem(getSystemPrompt(sceneType))
.build();
}
}
Context Passing Between Custom Advisors
Advisors can pass information to one another through adviseContext, a Map&lt;String, Object&gt; shared across the entire request lifecycle:
@Component
public class UserContextEnrichAdvisor implements CallAroundAdvisor {
@Override
public AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
UserInfo user = UserContext.getCurrentUser();
// Put user info into the context for downstream Advisors to use
Map<String, Object> enrichedContext = new HashMap<>(request.adviseContext());
enrichedContext.put("userId", user.getId());
enrichedContext.put("userRole", user.getRole());
enrichedContext.put("tenantId", user.getTenantId());
enrichedContext.put("userLanguage", user.getPreferredLanguage());
// Adjust the system prompt based on the user's preferred language
String systemPrompt = request.systemText();
if ("zh".equals(user.getPreferredLanguage())) {
systemPrompt = systemPrompt + "\n请用中文回答。"; // literally: "Please answer in Chinese."
}
AdvisedRequest enrichedRequest = AdvisedRequest.from(request)
.withAdviseContext(enrichedContext)
.withSystemText(systemPrompt)
.build();
return chain.nextAroundCall(enrichedRequest);
}
@Override
public String getName() { return "UserContextEnrichAdvisor"; }
@Override
public int getOrder() { return 50; } // Outermost: enrich the context before anything else runs
}
Downstream Advisors can then read this context:
// In AuditAdvisor: read the userId
String userId = (String) request.adviseContext().get("userId");
// In RateLimitAdvisor: read the tenantId
String tenantId = (String) request.adviseContext().get("tenantId");
Common Pitfalls
Pitfall 1: Forgetting to implement StreamAroundAdvisor
Many people implement only CallAroundAdvisor, and their streaming endpoints bypass the Advisor entirely. If your AI service supports both synchronous and streaming calls, an Advisor generally needs to implement both interfaces, or do equivalent work in aroundStream.
Pitfall 2: Exceptions thrown inside an Advisor
An exception thrown during pre-processing propagates to the caller as expected. But if you throw during post-processing (after chain.nextAroundCall returns), the LLM call has already completed: tokens are spent, the cache may already have been touched, the log may be half-written. Pay special attention to exception safety in post-processing.
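One defensive pattern: once the LLM call has succeeded, treat post-processing as best-effort, logging the failure and falling back to the unmodified response instead of discarding a response you already paid tokens for. A minimal sketch with plain JDK types; callWithSafePost and its functional parameters are illustrative stand-ins, not Spring AI API:

```java
import java.util.function.Function;
import java.util.function.UnaryOperator;

public class SafePostProcess {
    // Wraps a post-processing step so a failure AFTER the LLM call degrades
    // to the original response instead of surfacing as a caller-visible error.
    static String callWithSafePost(String request,
                                   Function<String, String> chain,
                                   UnaryOperator<String> postProcess) {
        String response = chain.apply(request); // tokens are already spent here
        try {
            return postProcess.apply(response);
        } catch (RuntimeException e) {
            // Degrade gracefully: the raw answer beats losing the whole call.
            System.err.println("post-processing failed: " + e.getMessage());
            return response;
        }
    }

    public static void main(String[] args) {
        String out = callWithSafePost("hi",
                req -> "raw answer",
                resp -> { throw new IllegalStateException("enrichment service down"); });
        System.out.println(out); // falls back to "raw answer"
    }
}
```

Whether to swallow or rethrow depends on the advisor: a content-safety check must still fail closed, but an auditing or enrichment step usually should not void a completed response.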
Pitfall 3: AdvisedRequest immutability
AdvisedRequest is immutable; to modify it, create a new instance with .mutate() or AdvisedRequest.from(). Mutating the original directly fails at compile time, but some people then pass the original object to chain.nextAroundCall, believing they have modified it... they haven't.
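The failure mode is easy to reproduce with a tiny immutable stand-in; MiniRequest and withUserText below are hypothetical, mimicking the create-a-new-instance style of mutate():

```java
public class ImmutableRequestDemo {
    // Hypothetical stand-in for the immutable AdvisedRequest.
    record MiniRequest(String userText) {
        // Like mutate()/from(): returns a NEW instance, never edits this one.
        MiniRequest withUserText(String newText) {
            return new MiniRequest(newText);
        }
    }

    public static void main(String[] args) {
        MiniRequest request = new MiniRequest("hello");

        // The bug: calling the builder method and discarding its result.
        request.withUserText("hello [enriched]");
        System.out.println(request.userText()); // still "hello"

        // The fix: keep the RETURNED instance and pass THAT down the chain.
        MiniRequest enriched = request.withUserText("hello [enriched]");
        System.out.println(enriched.userText()); // "hello [enriched]"
    }
}
```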
Pitfall 4: Shared state under concurrency
If your Advisor is a Spring @Component (a singleton), it must not hold request-level state. I've seen someone store a requestId in an Advisor field; under concurrency, every request shared that one requestId, and the resulting bug took ages to track down. Request-level state belongs in a ThreadLocal or gets passed via adviseContext.
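A plain-JDK sketch of the safe pattern (RequestStateDemo, BadAdvisor, and handle are illustrative names): a mutable singleton field gets clobbered by concurrent requests, while a ThreadLocal keeps each request thread's value isolated:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RequestStateDemo {
    // WRONG: a mutable field on a singleton advisor is shared by ALL requests.
    static class BadAdvisor {
        String requestId; // concurrent requests overwrite each other's value
    }

    // Safer: thread-confined state; each request thread sees only its own value.
    static final ThreadLocal<String> REQUEST_ID = new ThreadLocal<>();

    static String handle(String id) {
        REQUEST_ID.set(id);
        try {
            // Simulate work while another request runs on a different thread.
            try { Thread.sleep(10); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return REQUEST_ID.get(); // still this thread's own id
        } finally {
            REQUEST_ID.remove(); // always clean up on pooled threads
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> a = pool.submit(() -> handle("req-A"));
        Future<String> b = pool.submit(() -> handle("req-B"));
        System.out.println(a.get() + " / " + b.get()); // req-A / req-B
        pool.shutdown();
    }
}
```

In an Advisor, prefer adviseContext over ThreadLocal where possible; streaming responses can hop threads inside a Reactor pipeline, where ThreadLocal values do not follow.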
Wrap-Up
Spring AI's Advisor chain is an elegant design: it pulls the cross-cutting concerns of an AI application (rate limiting, auditing, safety, caching) out of the business logic, leaving each one independently testable and reusable.
The key points to master:
- Execution order is determined by getOrder(); smaller values sit on the outer layer
- Pass context between Advisors via adviseContext
- Implement the synchronous and streaming variants separately, and don't forget the streaming interface
- Advisor singletons must hold no mutable state; pass request-scoped state through adviseContext
