Spring AI Source Deep Dive: Inside the ChatClient Implementation
The Bug That Drove Him Crazy
One afternoon in March 2025, senior engineer Zhang Wei ran into a bug that kept him busy for two days straight.
Their intelligent Q&A system showed a strange symptom: conversations occasionally "leaked memory" across users — after user A asked a question, fragments of A's question showed up in user B's answer. In a customer-service scenario this is a serious problem, because one user's order details could surface in another user's response.
Zhang Wei spent a full day on it, adding logs everywhere, suspecting the prompt-building logic and the cache implementation, but he couldn't find the root cause.
On the second day he decided to dig into the Spring AI source code and look at the ChatMemory-related implementation.
Twenty minutes later, he found the problem:
// His code
@Bean
public ChatClient chatClient(ChatClient.Builder builder) {
    return builder
        .defaultAdvisors(new MessageChatMemoryAdvisor(chatMemory))
        .build();
}
This looks fine, but his chatMemory was an InMemoryChatMemory bean injected as a singleton. All users' chat histories lived in the same Map, keyed by conversationId.
That wasn't the problem, though, since each conversation has its own key.
He kept reading the MessageChatMemoryAdvisor source…
// Key code in the Spring AI source
public class MessageChatMemoryAdvisor implements RequestResponseAdvisor {
    private static final String DEFAULT_CONVERSATION_ID = "default";
    // Note: if a call doesn't pass a conversationId, the default is "default"
    // ...
}
Then it hit him: his calling code never passed a conversationId with each request, so every user's conversation ended up under the same "default" conversation ID!
// Wrong call (no conversationId passed)
chatClient.prompt()
    .user(userMessage)
    .call()
    .content();
// Correct call
chatClient.prompt()
    .user(userMessage)
    .advisors(spec -> spec.param(
        AbstractChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY,
        sessionId // unique ID for each user session
    ))
    .call()
    .content();
The bug was found, and the fix was a single line. But without reading the source and knowing about this default behavior, the hunt could have dragged on much longer.
This is why the ability to read source code matters so much for AI engineers.
ChatClient's Overall Architecture
Before diving into details, it helps to understand ChatClient's overall design.
ChatClient.Builder: A Clean Implementation of the Builder Pattern
Where to find it
In Spring AI 1.0, the Builder is defined inside the org.springframework.ai.chat.client.ChatClient interface:
// Source: spring-ai-core/src/main/java/org/springframework/ai/chat/client/ChatClient.java
public interface ChatClient {
    /**
     * Static factory method: create a Builder
     */
    static Builder builder(ChatModel chatModel) {
        return new DefaultChatClientBuilder(chatModel);
    }
    /**
     * The Builder interface
     */
    interface Builder {
        // Set the default system prompt
        Builder defaultSystem(String text);
        Builder defaultSystem(Resource text);
        Builder defaultSystem(Consumer<PromptSystemSpec> systemSpecConsumer);
        // Set the default user message
        Builder defaultUser(String text);
        Builder defaultUser(Consumer<PromptUserSpec> userSpecConsumer);
        // Add default Advisors (the key method)
        Builder defaultAdvisors(Advisor... advisors);
        Builder defaultAdvisors(List<Advisor> advisors);
        Builder defaultAdvisors(Consumer<AdvisorSpec> advisorSpecConsumer);
        // Set default ChatOptions (model parameters)
        Builder defaultOptions(ChatOptions chatOptions);
        // Build the final ChatClient instance
        ChatClient build();
    }
}
The core of DefaultChatClientBuilder
// Source: spring-ai-core/.../DefaultChatClientBuilder.java (simplified, key logic only)
public class DefaultChatClientBuilder implements ChatClient.Builder {
    private final ChatModel chatModel;
    // Storage for the default configuration
    private String defaultSystemText;
    private Map<String, Object> defaultSystemParams = new HashMap<>();
    private List<Advisor> defaultAdvisors = new ArrayList<>();
    private ChatOptions defaultOptions;
    private List<Message> defaultMessages = new ArrayList<>();
    public DefaultChatClientBuilder(ChatModel chatModel) {
        Assert.notNull(chatModel, "ChatModel must not be null");
        this.chatModel = chatModel;
    }
    @Override
    public Builder defaultAdvisors(Advisor... advisors) {
        // Key point: advisors are stored in insertion order; execution order is decided by the Ordered interface
        this.defaultAdvisors.addAll(Arrays.asList(advisors));
        return this;
    }
    @Override
    public ChatClient build() {
        // Create a DefaultChatClient with all the collected configuration
        return new DefaultChatClient(
            this.chatModel,
            this.defaultSystemText,
            this.defaultSystemParams,
            Collections.unmodifiableList(this.defaultAdvisors),
            this.defaultOptions,
            this.defaultMessages
        );
    }
}
Key design point: defaults vs. runtime overrides
/**
 * Understanding the Builder's default-override mechanism
 *
 * Priority: runtime settings > Builder defaults
 */
@Configuration
public class ChatClientConfig {
    @Bean
    public ChatClient chatClient(ChatClient.Builder builder) {
        return builder
            // All requests use gpt-4o-mini by default
            .defaultOptions(OpenAiChatOptions.builder()
                .withModel("gpt-4o-mini")
                .withTemperature(0.7f)
                .build())
            // All requests get the memory advisor (chatMemory() is assumed to be another @Bean method)
            .defaultAdvisors(new MessageChatMemoryAdvisor(chatMemory()))
            // Default system prompt for all requests
            .defaultSystem("You are a professional e-commerce customer-service assistant.")
            .build();
    }
    // At call time:
    public void callExample(ChatClient chatClient) {
        // Uses the defaults (gpt-4o-mini, default system prompt)
        String response1 = chatClient.prompt()
            .user("How do I get a refund?")
            .call().content();
        // Runtime override: switch to gpt-4o and override the temperature
        String response2 = chatClient.prompt()
            .options(OpenAiChatOptions.builder()
                .withModel("gpt-4o")       // override the model
                .withTemperature(0.1f)     // override the temperature
                .build())
            .system("You are a rigorous legal advisor.") // override the system prompt
            .user("Analyze the risks in this contract.")
            .call().content();
    }
}
The Advisor Chain: Chain of Responsibility Done Right
The advisor chain is one of Spring AI's most elegant designs, and understanding it is the key to customizing Spring AI's behavior.
The Advisor interface
// Source: RequestResponseAdvisor.java
public interface RequestResponseAdvisor extends Ordered {
    /**
     * Around-call processing: runs before the request reaches the LLM.
     * Can modify the prompt, inject context, record logs, and so on.
     *
     * @param request the original request
     * @param chain the chain (call chain.nextAroundCall() to pass control to the next advisor)
     * @return the processed response (possibly modified)
     */
    AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain);
    /**
     * Streaming variant of the advisor
     */
    Flux<AdvisedResponse> aroundStream(AdvisedRequest request, StreamAroundAdvisorChain chain);
    /**
     * Execution order of the advisor (lower values run earlier).
     * Default: Integer.MAX_VALUE (runs last).
     */
    default int getOrder() {
        return Integer.MAX_VALUE;
    }
}
How the chain executes (source walkthrough)
/**
 * Key logic of the advisor chain inside DefaultChatClient (simplified analysis)
 *
 * Example execution order (3 advisors with order values 100, 200, 300):
 *
 * Request phase (pre-processing):
 * Advisor-100.before → Advisor-200.before → Advisor-300.before → ChatModel
 *
 * Response phase (post-processing, reverse order):
 * ChatModel → Advisor-300.after → Advisor-200.after → Advisor-100.after
 */
public class AdvisorChainDemo {
    /**
     * Simulated chain execution logic (for understanding only, not the real source)
     */
    static AdvisedResponse executeChain(AdvisedRequest request,
                                        List<RequestResponseAdvisor> advisors,
                                        ChatModel chatModel) {
        // Sort by order
        List<RequestResponseAdvisor> sortedAdvisors = advisors.stream()
            .sorted(Comparator.comparing(Ordered::getOrder))
            .toList();
        // Build the call chain; the chain keeps its own internal cursor
        CallAroundAdvisorChain chain = new DefaultCallAroundAdvisorChain(
            sortedAdvisors, chatModel);
        // Kick off execution; the chain advances through the advisors itself and
        // falls through to the model when none are left. (Starting a separate
        // iterator here, as an earlier draft did, would invoke the first advisor twice.)
        return chain.nextAroundCall(request);
    }
}
DefaultCallAroundAdvisorChain: inside the chain
/**
 * Internal implementation of the call chain (based on the Spring AI 1.0 source)
 *
 * Key mechanism: an AtomicInteger acts as a cursor; each call to nextAroundCall()
 * advances to the next advisor, and once the advisors are exhausted the actual
 * ChatModel is invoked.
 */
public class DefaultCallAroundAdvisorChain implements CallAroundAdvisorChain {
    private final AtomicInteger currentAdvisorIndex;
    private final List<RequestResponseAdvisor> advisors;
    private final ChatModel chatModel;
    DefaultCallAroundAdvisorChain(List<RequestResponseAdvisor> advisors,
                                  ChatModel chatModel) {
        this.advisors = advisors;
        this.chatModel = chatModel;
        this.currentAdvisorIndex = new AtomicInteger(0);
    }
    @Override
    public AdvisedResponse nextAroundCall(AdvisedRequest advisedRequest) {
        int index = currentAdvisorIndex.getAndIncrement();
        if (index < advisors.size()) {
            // More advisors left: invoke the next one
            return advisors.get(index).aroundCall(advisedRequest, this);
        } else {
            // All advisors done: invoke the actual model
            return callChatModel(advisedRequest);
        }
    }
    private AdvisedResponse callChatModel(AdvisedRequest advisedRequest) {
        Prompt prompt = advisedRequest.toPrompt();
        ChatResponse response = chatModel.call(prompt);
        return new AdvisedResponse(response, advisedRequest.adviseContext());
    }
}
A Source-Level Look at the Built-in Advisors
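Before walking through the built-in advisors, the cursor-based chain above is worth verifying with a dependency-free sketch. This is plain Java — every type here is a stand-in, not a Spring AI class — that reproduces the AtomicInteger mechanism and makes the nested before/after ordering visible in the output string:

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class MiniAdvisorChain {
    public interface Advisor { String aroundCall(String request, MiniAdvisorChain chain); }

    private final AtomicInteger cursor = new AtomicInteger(0);
    private final List<Advisor> advisors;

    public MiniAdvisorChain(List<Advisor> advisors) { this.advisors = advisors; }

    public String nextAroundCall(String request) {
        int i = cursor.getAndIncrement();
        if (i < advisors.size()) {
            return advisors.get(i).aroundCall(request, this);
        }
        // No advisors left: stand-in for the real ChatModel call
        return "model(" + request + ")";
    }

    public static Advisor named(String name) {
        // Each advisor tags the request on the way in and wraps the response on the
        // way out, so the nesting order shows up in the final string
        return (req, chain) -> name + "[" + chain.nextAroundCall(req + "+" + name) + "]";
    }

    public static void main(String[] args) {
        MiniAdvisorChain chain = new MiniAdvisorChain(List.of(named("A"), named("B")));
        // A's pre-processing runs first, then B's; post-processing unwinds in reverse
        System.out.println(chain.nextAroundCall("q")); // prints A[B[model(q+A+B)]]
    }
}
```

Note how the single shared cursor means a chain instance is one-shot per request, which is why the real implementation constructs a fresh chain for every call.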
MessageChatMemoryAdvisor (the most commonly used):
/**
 * Analysis of the key parts of the MessageChatMemoryAdvisor source
 *
 * Role: inject the conversation history before each request, save the new exchange afterwards
 */
public class MessageChatMemoryAdvisor implements RequestResponseAdvisor {
    public static final String CHAT_MEMORY_CONVERSATION_ID_KEY =
        "chat_memory_conversation_id";
    public static final String CHAT_MEMORY_RETRIEVE_SIZE_KEY =
        "chat_memory_retrieve_size";
    private static final String DEFAULT_CONVERSATION_ID = "default";
    // Note: this is the root cause of Zhang Wei's bug!
    private final ChatMemory chatMemory;
    private final int defaultRetrieveSize;
    @Override
    public AdvisedResponse aroundCall(AdvisedRequest advisedRequest,
                                      CallAroundAdvisorChain chain) {
        // 1. Get the conversation ID (from the request parameters; falls back to "default")
        String conversationId = (String) advisedRequest.adviseContext()
            .getOrDefault(CHAT_MEMORY_CONVERSATION_ID_KEY, DEFAULT_CONVERSATION_ID);
        // 2. Fetch the conversation history from ChatMemory
        List<Message> memoryMessages = chatMemory.get(conversationId, defaultRetrieveSize);
        // 3. Inject the history into the request (after the system message, before the user message)
        AdvisedRequest processedRequest = AdvisedRequest.from(advisedRequest)
            .withMessages(buildMessagesWithMemory(advisedRequest.messages(), memoryMessages))
            .build();
        // 4. Invoke the next handler in the chain
        AdvisedResponse response = chain.nextAroundCall(processedRequest);
        // 5. Post-processing: save the new user message and the AI reply to ChatMemory
        chatMemory.add(conversationId, advisedRequest.messages()); // save the user message
        chatMemory.add(conversationId, List.of(
            new AssistantMessage(response.response().getResult().getOutput().getContent())
        )); // save the AI reply
        return response;
    }
    private List<Message> buildMessagesWithMemory(List<Message> currentMessages,
                                                  List<Message> memoryMessages) {
        List<Message> combined = new ArrayList<>();
        // History goes first (as context)
        combined.addAll(memoryMessages);
        // The current request's messages go last
        combined.addAll(currentMessages);
        return combined;
    }
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE + 1; // run early so the history is injected first
    }
}
QuestionAnswerAdvisor (the heart of RAG):
/**
 * QuestionAnswerAdvisor source analysis
 *
 * This is Spring AI's built-in RAG advisor: it automatically retrieves relevant
 * documents from a VectorStore and injects them into the prompt.
 */
public class QuestionAnswerAdvisor implements RequestResponseAdvisor {
    private static final String DEFAULT_USER_TEXT_ADVISE = """
        Context information is below.
        ---------------------
        {question_answer_context}
        ---------------------
        Given the context and provided history information and not prior knowledge,
        reply to the user comment. If the answer is not in the context, inform
        the user that you can't answer the question.
        """;
    private final VectorStore vectorStore;
    private final SearchRequest searchRequest;
    private final String userTextAdvise;
    @Override
    public AdvisedResponse aroundCall(AdvisedRequest advisedRequest,
                                      CallAroundAdvisorChain chain) {
        // 1. Get the user's original question
        String userQuery = extractUserQuery(advisedRequest);
        // 2. Vector search: find the relevant documents
        SearchRequest queryRequest = SearchRequest.from(this.searchRequest)
            .withQuery(userQuery)
            .build();
        List<Document> documents = vectorStore.similaritySearch(queryRequest);
        // 3. Build the context text
        String documentContext = documents.stream()
            .map(Document::getContent)
            .collect(Collectors.joining("\n"));
        // 4. Inject the document context into the prompt variables
        Map<String, Object> advisorContext = new HashMap<>(advisedRequest.adviseContext());
        advisorContext.put("question_answer_context", documentContext);
        // 5. Modify the user message: prepend the retrieved context to the question
        AdvisedRequest processedRequest = AdvisedRequest.from(advisedRequest)
            .withAdviseContext(advisorContext)
            .build();
        // 6. Continue down the chain
        AdvisedResponse response = chain.nextAroundCall(processedRequest);
        // 7. Record which documents were used in the response metadata (for traceability)
        Map<String, Object> responseContext = new HashMap<>(response.adviseContext());
        responseContext.put("retrieved_documents", documents);
        return new AdvisedResponse(response.response(), responseContext);
    }
}
MessageConverter: How Requests Are Translated
Every LLM provider's API format is different; MessageConverter is responsible for translating Spring AI's unified message format into each vendor's format.
/**
 * MessageConverter design: the Strategy pattern
 *
 * Unified format (Spring AI) → vendor formats (OpenAI/Anthropic, etc.)
 */
// Spring AI's unified message types:
// UserMessage / SystemMessage / AssistantMessage / ToolResponseMessage
// OpenAI format (via OpenAiApi.ChatCompletionMessage):
// {"role": "user", "content": "..."}
// Anthropic format (via AnthropicApi.ContentBlock):
// {"type": "text", "text": "..."}
/**
 * The conversion logic inside the OpenAI ChatModel (key parts of the source)
 *
 * Full source: spring-ai-openai/src/main/java/org/springframework/ai/openai/OpenAiChatModel.java
 */
public class OpenAiChatModelConversion {
    /**
     * Convert Spring AI's unified messages → OpenAI API messages
     */
    public List<OpenAiApi.ChatCompletionMessage> toOpenAiMessages(List<Message> messages) {
        return messages.stream()
            .map(this::toOpenAiMessage)
            .toList();
    }
    private OpenAiApi.ChatCompletionMessage toOpenAiMessage(Message message) {
        if (message instanceof SystemMessage systemMsg) {
            return new OpenAiApi.ChatCompletionMessage(
                systemMsg.getContent(),
                OpenAiApi.ChatCompletionMessage.Role.SYSTEM
            );
        } else if (message instanceof UserMessage userMsg) {
            // A user message may carry images (multimodal)
            if (userMsg.getMedia() != null && !userMsg.getMedia().isEmpty()) {
                // Multimodal: text + images
                List<OpenAiApi.ChatCompletionMessage.MediaContent> content =
                    buildMultiModalContent(userMsg);
                return new OpenAiApi.ChatCompletionMessage(
                    content,
                    OpenAiApi.ChatCompletionMessage.Role.USER
                );
            }
            return new OpenAiApi.ChatCompletionMessage(
                userMsg.getContent(),
                OpenAiApi.ChatCompletionMessage.Role.USER
            );
        } else if (message instanceof AssistantMessage assistantMsg) {
            return new OpenAiApi.ChatCompletionMessage(
                assistantMsg.getContent(),
                OpenAiApi.ChatCompletionMessage.Role.ASSISTANT
            );
        } else if (message instanceof ToolResponseMessage toolMsg) {
            // Result of a function/tool call
            return new OpenAiApi.ChatCompletionMessage(
                toolMsg.getContent(),
                OpenAiApi.ChatCompletionMessage.Role.TOOL
            );
        }
        throw new IllegalArgumentException("Unknown message type: " + message.getClass());
    }
}
StreamingResponseHandler: How Streaming Output Works
Understanding how streaming responses are implemented is crucial for debugging TTFT (time-to-first-token) latency problems.
/**
 * How Spring AI's streaming responses work internally
 *
 * Built on:
 * - Spring WebFlux / Project Reactor (Flux)
 * - Server-Sent Events (SSE)
 * - OpenAI's text/event-stream format
 */
public class StreamingMechanismAnalysis {
    /**
     * The full streaming call path
     */
    public Flux<ChatResponse> analyzeStreamChain(ChatClient chatClient, String question) {
        // 1. chatClient.stream() returns a "cold" Flux (cold observable):
        //    nothing executes until it is subscribed!
        Flux<ChatResponse> coldFlux = chatClient.prompt()
            .user(question)
            .stream()
            .chatResponse();
        // 2. The HTTP request is only issued on subscription.
        //    subscribe() triggers the following:
        //    a. the advisor chain's pre-processing runs
        //    b. ChatModel.stream() is called
        //    c. an HTTP connection to the OpenAI API is established
        //    d. the SSE data stream is received
        //    e. each SSE event → one Flux element
        return coldFlux
            .doOnSubscribe(sub ->
                log.debug("Starting streaming request"))
            .doOnNext(chunk -> {
                String content = chunk.getResult().getOutput().getContent();
                if (content != null && !content.isEmpty()) {
                    // Each non-empty chunk is one token (or a few tokens)
                    log.debug("Received token: '{}'", content);
                }
            })
            .doOnComplete(() ->
                log.debug("Streaming response complete"))
            .doOnError(e ->
                log.error("Streaming response failed", e));
    }
}
Special Handling for Streaming Advisors
/**
 * Key points of implementing a streaming advisor
 *
 * The crucial difference from the synchronous version:
 * in a streaming scenario, post-processing (saving to the memory store)
 * must wait until the stream completes.
 */
public class StreamingMemoryAdvisor implements RequestResponseAdvisor {
    private final ChatMemory chatMemory;
    @Override
    public Flux<AdvisedResponse> aroundStream(AdvisedRequest advisedRequest,
                                              StreamAroundAdvisorChain chain) {
        // Pre-processing: inject the history (a synchronous operation)
        String conversationId = getConversationId(advisedRequest);
        List<Message> history = chatMemory.get(conversationId, 10);
        AdvisedRequest enrichedRequest = enrichWithHistory(advisedRequest, history);
        // Invoke the chain (returns a Flux)
        Flux<AdvisedResponse> responseFlux = chain.nextAroundStream(enrichedRequest);
        // Key point: collect the complete streamed output, then save it asynchronously.
        // Simplified here: the real source uses scan() or collectList() to accumulate the chunks.
        return responseFlux
            .doOnComplete(() -> {
                // Note: all received chunks must be joined into the full response before saving;
                // the real source accumulates them with a StringBuilder
                log.debug("Stream complete, saving to memory");
            });
    }
    private String getConversationId(AdvisedRequest request) {
        return (String) request.adviseContext()
            .getOrDefault(MessageChatMemoryAdvisor.CHAT_MEMORY_CONVERSATION_ID_KEY,
                "default");
    }
    private AdvisedRequest enrichWithHistory(AdvisedRequest request,
                                             List<Message> history) {
        List<Message> allMessages = new ArrayList<>(history);
        allMessages.addAll(request.messages());
        return AdvisedRequest.from(request)
            .withMessages(allMessages)
            .build();
    }
}
RetryTemplate Integration: The Retry Mechanism
/**
 * How Spring AI integrates Spring Retry
 *
 * Source location: DefaultChatClient or the individual ChatModel implementations
 */
public class RetryMechanismAnalysis {
    /**
     * Spring AI's built-in retry configuration (OpenAiChatModel as an example)
     *
     * Defaults in the source:
     * - at most 3 attempts
     * - exponential backoff, 500ms initial interval
     * - retried on: rate limiting (429), service unavailable (503)
     */
    @Bean
    public RetryTemplate aiRetryTemplate() {
        return RetryTemplate.builder()
            .maxAttempts(3)
            .exponentialBackoff(
                Duration.ofMillis(500),  // initial interval: 500ms
                2.0,                     // multiplier
                Duration.ofSeconds(10)   // max interval: 10s
            )
            .retryOn(Arrays.asList(
                // These exceptions trigger a retry
                RateLimitException.class,
                ServerErrorException.class
            ))
            .noRetryOn(Arrays.asList(
                // These do not (retrying bad parameters or failed auth is pointless)
                InvalidRequestException.class,
                AuthenticationException.class
            ))
            .withListener(new RetryListenerSupport() {
                @Override
                public <T, E extends Throwable> void onError(RetryContext context,
                                                             RetryCallback<T, E> callback,
                                                             Throwable throwable) {
                    log.warn("AI call failed, preparing retry #{}, reason: {}",
                        context.getRetryCount(), throwable.getMessage());
                }
            })
            .build();
    }
    /**
     * Integrating RetryTemplate into a custom ChatModel
     */
    public ChatResponse callWithRetry(Prompt prompt, RetryTemplate retryTemplate,
                                      ChatModel chatModel) {
        return retryTemplate.execute(context -> {
            try {
                return chatModel.call(prompt);
            } catch (Exception e) {
                // Attach context to help analyze the failure after retries
                context.setAttribute("original_error", e.getMessage());
                throw e;
            }
        }, context -> {
            // Recovery strategy once every retry has failed
            log.error("AI call ultimately failed after {} retries", context.getRetryCount());
            // A degraded fallback response can be returned here
            return createFallbackResponse("The service is temporarily unavailable, please try again later.");
        });
    }
}
ChatMemory: A Deep Dive into Conversation History
/**
 * The ChatMemory interface and its implementations
 */
// The interface (deliberately small)
public interface ChatMemory {
    void add(String conversationId, List<Message> messages);
    List<Message> get(String conversationId, int lastN);
    void clear(String conversationId);
}
/**
 * InMemoryChatMemory: the in-memory implementation
 *
 * Good for: development, testing, single-node deployments
 * Not good for: clusters (instances don't share data)
 */
public class InMemoryChatMemory implements ChatMemory {
    // Key point: ConcurrentHashMap guarantees thread safety
    private final Map<String, List<Message>> conversationHistory =
        new ConcurrentHashMap<>();
    @Override
    public void add(String conversationId, List<Message> messages) {
        conversationHistory.compute(conversationId, (id, existingMessages) -> {
            List<Message> updatedMessages = existingMessages != null
                ? new ArrayList<>(existingMessages)
                : new ArrayList<>();
            updatedMessages.addAll(messages);
            return updatedMessages;
        });
    }
    @Override
    public List<Message> get(String conversationId, int lastN) {
        List<Message> messages = conversationHistory.getOrDefault(
            conversationId, new ArrayList<>());
        // Return only the last N messages (keeps the context from growing too long)
        int fromIndex = Math.max(0, messages.size() - lastN);
        return new ArrayList<>(messages.subList(fromIndex, messages.size()));
    }
    @Override
    public void clear(String conversationId) {
        conversationHistory.remove(conversationId);
    }
}
A Production-Grade Redis ChatMemory Implementation
package com.laozhang.ai.memory;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.ai.chat.memory.ChatMemory;
import org.springframework.ai.chat.messages.*;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.time.Duration;
import java.util.*;
/**
 * A Redis-backed ChatMemory implementation
 *
 * Supports clustered deployments and automatic session TTL expiry.
 * Key format: ai:chat:memory:{conversationId}
 */
@Component
public class RedisChatMemory implements ChatMemory {
    private static final String KEY_PREFIX = "ai:chat:memory:";
    private static final Duration DEFAULT_TTL = Duration.ofHours(24);
    private static final int MAX_MESSAGES_PER_CONVERSATION = 100;
    private final StringRedisTemplate redisTemplate;
    private final ObjectMapper objectMapper;
    public RedisChatMemory(StringRedisTemplate redisTemplate,
                           ObjectMapper objectMapper) {
        this.redisTemplate = redisTemplate;
        this.objectMapper = objectMapper;
    }
    @Override
    public void add(String conversationId, List<Message> messages) {
        String key = buildKey(conversationId);
        try {
            // Fetch the existing messages
            List<MessageDTO> existing = getMessageDTOs(key);
            // Append the new messages
            for (Message message : messages) {
                existing.add(MessageDTO.from(message));
            }
            // Cap the message count (prevents unbounded growth)
            if (existing.size() > MAX_MESSAGES_PER_CONVERSATION) {
                existing = existing.subList(
                    existing.size() - MAX_MESSAGES_PER_CONVERSATION,
                    existing.size());
            }
            // Serialize and store
            String json = objectMapper.writeValueAsString(existing);
            redisTemplate.opsForValue().set(key, json, DEFAULT_TTL);
        } catch (Exception e) {
            throw new RuntimeException("Failed to save conversation history: " + conversationId, e);
        }
    }
    @Override
    public List<Message> get(String conversationId, int lastN) {
        String key = buildKey(conversationId);
        try {
            List<MessageDTO> allMessages = getMessageDTOs(key);
            // Return the last N
            int fromIndex = Math.max(0, allMessages.size() - lastN);
            List<MessageDTO> recent = allMessages.subList(fromIndex, allMessages.size());
            return recent.stream()
                .map(MessageDTO::toMessage)
                .toList();
        } catch (Exception e) {
            // On read failure: return an empty list (degrade gracefully, don't break the main flow)
            log.warn("Failed to read conversation history, returning empty list: {}", conversationId, e);
            return List.of();
        }
    }
    @Override
    public void clear(String conversationId) {
        redisTemplate.delete(buildKey(conversationId));
    }
    /**
     * Refresh the TTL (extend the session while the user is active)
     */
    public void refreshTtl(String conversationId) {
        redisTemplate.expire(buildKey(conversationId), DEFAULT_TTL);
    }
    /**
     * Get conversation statistics
     */
    public ConversationStats getStats(String conversationId) {
        String key = buildKey(conversationId);
        List<MessageDTO> messages = getMessageDTOs(key);
        Long ttl = redisTemplate.getExpire(key);
        return new ConversationStats(
            conversationId,
            messages.size(),
            messages.stream().filter(m -> "user".equals(m.role())).count(),
            messages.stream().filter(m -> "assistant".equals(m.role())).count(),
            ttl != null ? Duration.ofSeconds(ttl) : Duration.ZERO
        );
    }
    private String buildKey(String conversationId) {
        return KEY_PREFIX + conversationId;
    }
    private List<MessageDTO> getMessageDTOs(String key) {
        String json = redisTemplate.opsForValue().get(key);
        if (json == null || json.isBlank()) {
            return new ArrayList<>();
        }
        try {
            return objectMapper.readValue(json, new TypeReference<List<MessageDTO>>() {});
        } catch (Exception e) {
            log.warn("Failed to deserialize conversation history, resetting: {}", key);
            return new ArrayList<>();
        }
    }
    // DTO used for JSON serialization
    record MessageDTO(String role, String content, long timestamp) {
        static MessageDTO from(Message message) {
            String role = switch (message) {
                case UserMessage ignored -> "user";
                case AssistantMessage ignored -> "assistant";
                case SystemMessage ignored -> "system";
                default -> "unknown";
            };
            return new MessageDTO(role, message.getContent(), System.currentTimeMillis());
        }
        Message toMessage() {
            return switch (role) {
                case "user" -> new UserMessage(content);
                case "assistant" -> new AssistantMessage(content);
                case "system" -> new SystemMessage(content);
                default -> new UserMessage(content);
            };
        }
    }
    record ConversationStats(
        String conversationId,
        int totalMessages,
        long userMessages,
        long assistantMessages,
        Duration remainingTtl
    ) {}
    private static final org.slf4j.Logger log =
        org.slf4j.LoggerFactory.getLogger(RedisChatMemory.class);
}
Custom Advisors: Extending Spring AI from the Source
Once you understand the advisor mechanism internally, writing a custom advisor comes naturally.
package com.laozhang.ai.advisor;
import org.springframework.ai.chat.client.advisor.api.*;
import org.springframework.ai.chat.messages.UserMessage;
import org.springframework.core.Ordered;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
/**
 * A production-grade custom advisor example
 *
 * Features:
 * 1. Per-request token rate limiting (stops individual users from abusing the service)
 * 2. Real-time cost tracking
 * 3. Timeout control (a business-level timeout independent of the HTTP timeout)
 * 4. Sensitive-content filtering
 */
public class ProductionAdvisor implements RequestResponseAdvisor {
    private final RateLimiter rateLimiter;
    private final CostTracker costTracker;
    private final ContentFilter contentFilter;
    public ProductionAdvisor(RateLimiter rateLimiter,
                             CostTracker costTracker,
                             ContentFilter contentFilter) {
        this.rateLimiter = rateLimiter;
        this.costTracker = costTracker;
        this.contentFilter = contentFilter;
    }
    @Override
    public AdvisedResponse aroundCall(AdvisedRequest advisedRequest,
                                      CallAroundAdvisorChain chain) {
        String userId = (String) advisedRequest.adviseContext()
            .getOrDefault("user_id", "anonymous");
        // 1. Rate-limit check
        if (!rateLimiter.tryAcquire(userId)) {
            throw new RateLimitException("User " + userId + " is sending requests too quickly, please retry later");
        }
        // 2. Filter the user input
        String userInput = extractUserInput(advisedRequest);
        ContentFilterResult filterResult = contentFilter.check(userInput);
        if (filterResult.isBlocked()) {
            throw new ContentPolicyException("Input violates the usage policy: " + filterResult.reason());
        }
        // 3. Record the start time
        Instant startTime = Instant.now();
        // 4. Invoke the chain
        AdvisedResponse response = chain.nextAroundCall(advisedRequest);
        // 5. Post-processing: record the cost
        long durationMs = Duration.between(startTime, Instant.now()).toMillis();
        if (response.response().getMetadata().getUsage() != null) {
            var usage = response.response().getMetadata().getUsage();
            costTracker.record(userId, usage.getPromptTokens(),
                usage.getGenerationTokens(), durationMs);
        }
        return response;
    }
    @Override
    public Flux<AdvisedResponse> aroundStream(AdvisedRequest advisedRequest,
                                              StreamAroundAdvisorChain chain) {
        String userId = (String) advisedRequest.adviseContext()
            .getOrDefault("user_id", "anonymous");
        if (!rateLimiter.tryAcquire(userId)) {
            return Flux.error(new RateLimitException("Too many requests"));
        }
        return chain.nextAroundStream(advisedRequest);
    }
    @Override
    public int getOrder() {
        // Run before the memory advisor (lower order values run earlier)
        return Ordered.HIGHEST_PRECEDENCE;
    }
    private String extractUserInput(AdvisedRequest request) {
        return request.messages().stream()
            .filter(m -> m instanceof UserMessage)
            .map(m -> m.getContent())
            .findFirst()
            .orElse("");
    }
    // Placeholder interfaces (real implementations depend on your business needs)
    public interface RateLimiter {
        boolean tryAcquire(String userId);
    }
    public interface CostTracker {
        void record(String userId, int inputTokens, int outputTokens, long durationMs);
    }
    public interface ContentFilter {
        ContentFilterResult check(String content);
    }
    public record ContentFilterResult(boolean isBlocked, String reason) {}
    public static class RateLimitException extends RuntimeException {
        public RateLimitException(String message) { super(message); }
    }
    public static class ContentPolicyException extends RuntimeException {
        public ContentPolicyException(String message) { super(message); }
    }
}
Spring AI 1.0: What Changed from 0.x
Key migration changes
/**
 * 0.x → 1.0 migration guide
 */
public class MigrationGuide {
    // ========== Change 1: the ChatClient Builder API ==========
    // 0.x style (deprecated)
    @Deprecated
    void oldWay_0x(OpenAiChatClient chatClient) {
        // A concrete implementation was injected directly, so the model couldn't be swapped
        String response = chatClient.generate("Hello");
    }
    // 1.0 style
    void newWay_1x(ChatClient chatClient) {
        // Fluent API, far more configurable
        String response = chatClient.prompt()
            .user("Hello")
            .call()
            .content();
    }
    // ========== Change 2: streaming responses ==========
    // 0.x: Flux<ChatCompletionChunk> (tied to OpenAI-specific types)
    @Deprecated
    Flux<?> oldStreaming(OpenAiChatClient client) {
        return client.generateStream(
            new Prompt("question"));
    }
    // 1.0: a unified Flux<ChatResponse>
    Flux<String> newStreaming(ChatClient chatClient) {
        return chatClient.prompt()
            .user("question")
            .stream()
            .content(); // yields the text stream directly
    }
    // ========== Change 3: options configuration ==========
    // 0.x: set in the constructor or global config
    @Deprecated
    void oldOptions() {
        OpenAiChatClient client = new OpenAiChatClient(api,
            OpenAiChatOptions.builder()
                .withModel("gpt-4")
                .build());
    }
    // 1.0: Builder defaults + runtime overrides
    void newOptions(ChatClient.Builder builder) {
        ChatClient client = builder
            .defaultOptions(OpenAiChatOptions.builder()
                .withModel("gpt-4o-mini")
                .build())
            .build();
        // Runtime override
        client.prompt()
            .options(OpenAiChatOptions.builder()
                .withModel("gpt-4o") // temporarily use the stronger model
                .build())
            .user("An important question")
            .call()
            .content();
    }
    // ========== Change 4: Tool/Function Calling ==========
    // New in 1.0: the unified @Tool annotation
    void functionCallingExample(ChatClient chatClient) {
        String result = chatClient.prompt()
            .user("What's the weather in Beijing today?")
            .tools(new WeatherService()) // register the tool
            .call()
            .content();
    }
    class WeatherService {
        @Tool(description = "Look up the current weather for a given city")
        public String getWeather(String city) {
            return "Beijing is sunny today, 25°C";
        }
    }
}
Learning Design Patterns from the Source
Spring AI's source code is an excellent textbook for Java design patterns — and it's a production-grade implementation, not toy code.
| Design pattern | Where Spring AI uses it | Key classes |
|---|---|---|
| Builder | Creating a ChatClient | DefaultChatClientBuilder |
| Chain of Responsibility | Advisor chain execution | DefaultCallAroundAdvisorChain |
| Strategy | MessageConverter | OpenAiChatModel, AnthropicChatModel |
| Template Method | AbstractChatModel | Skeleton defined once; subclasses do the concrete conversion |
| Decorator | Advisors enriching requests | RequestResponseAdvisor |
| Observer | StreamingResponseHandler | Project Reactor's Flux |
| Facade | ChatClient wrapping the lower layers | The ChatClient interface |
| Factory Method | Creating the various ChatModels | Spring Auto-Configuration |
/**
 * A closer look at the Chain of Responsibility pattern
 *
 * Spring AI's advisor chain is an elegant take on the pattern.
 * Difference from a traditional filter chain: logic can run both before and after the call point.
 */
public class AdvisorChainPatternAnalysis {
    // Traditional chain: only fixed before/after hooks
    interface TraditionalFilter {
        void before(Request request);
        Response process(Request request);
        void after(Response response);
    }
    // Spring AI's advisor: more flexible, with full control over whether the next link runs
    interface SpringAIAdvisor {
        default AdvisedResponse aroundCall(AdvisedRequest request, CallAroundAdvisorChain chain) {
            // An advisor can:
            // 1. modify the request, then call chain.nextAroundCall(modifiedRequest)
            // 2. skip the chain entirely (short-circuit and return a custom response)
            // 3. call the chain, then modify the response before returning it
            // 4. call the chain multiple times (e.g. a retry advisor)
            return chain.nextAroundCall(request);
        }
    }
}
Debugging the Source Locally
# 1. Clone the Spring AI source
git clone https://github.com/spring-projects/spring-ai.git
cd spring-ai
git checkout v1.0.0
# 2. Import into IntelliJ IDEA
# File → Open → select the spring-ai directory
# 3. Key source locations
# spring-ai-core/src/main/java/org/springframework/ai/chat/client/
# ├── ChatClient.java              # interface definition
# ├── DefaultChatClient.java       # core implementation
# └── advisor/
#     ├── api/                     # advisor interface definitions
#     ├── MessageChatMemoryAdvisor.java
#     ├── QuestionAnswerAdvisor.java
#     └── SimpleLoggerAdvisor.java
# 4. Attach the sources in your own project
# Add to pom.xml (IDEA will download the sources automatically)
# <dependency>
#     <groupId>org.springframework.ai</groupId>
#     <artifactId>spring-ai-core</artifactId>
#     <classifier>sources</classifier>
# </dependency>
FAQ
Q: What's the difference between MessageChatMemoryAdvisor and the other ChatMemory advisors?
A: Spring AI 1.0 ships several memory advisors. MessageChatMemoryAdvisor injects the history as ordinary messages (they count against the context window); PromptChatMemoryAdvisor injects the history into the system prompt (more context-efficient); VectorStoreChatMemoryAdvisor stores the history in a vector store and retrieves only the relevant parts rather than everything (suited to very long conversations). Rule of thumb: use the Message version for conversations with few turns, and the VectorStore version for many turns.
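The practical difference between the first two strategies is where the history lands in the final message list. A minimal sketch of the two injection styles — plain Java with stand-in types, not the actual Spring AI classes:

```java
import java.util.ArrayList;
import java.util.List;

public class MemoryInjectionStyles {
    public record Msg(String role, String text) {}

    // Style 1 (like MessageChatMemoryAdvisor): history becomes ordinary messages,
    // so each turn occupies its own slot in the message list
    public static List<Msg> asMessages(List<Msg> history, Msg current) {
        List<Msg> out = new ArrayList<>(history);
        out.add(current);
        return out;
    }

    // Style 2 (like PromptChatMemoryAdvisor): history is folded into the system prompt,
    // leaving only two messages regardless of how many turns have happened
    public static List<Msg> asSystemPrompt(String systemText, List<Msg> history, Msg current) {
        StringBuilder sb = new StringBuilder(systemText).append("\nConversation so far:");
        for (Msg m : history) {
            sb.append("\n").append(m.role()).append(": ").append(m.text());
        }
        return List.of(new Msg("system", sb.toString()), current);
    }
}
```

With two turns of history plus the current question, style 1 produces a three-message list while style 2 always produces two messages, which is why the prompt-based variant is the more context-efficient shape.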
Q: How do I control the execution order of multiple advisors?
A: Implement getOrder() from the Ordered interface; lower values run earlier. For the built-in advisors, QuestionAnswerAdvisor defaults to Integer.MAX_VALUE - 1 and MessageChatMemoryAdvisor to Integer.MAX_VALUE — though the exact defaults have shifted across releases, so check the source of your version. Suggested ranges for custom advisors: 0-100 for safety checks, 100-1000 for business enrichment, and Integer.MAX_VALUE - 100 for logging.
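The sorting rule itself is just a comparator over getOrder(). A self-contained sketch with stand-in types and illustrative order values from the ranges suggested above:

```java
import java.util.Comparator;
import java.util.List;

public class AdvisorOrderDemo {
    public record NamedAdvisor(String name, int order) {}

    // Lower order values run earlier, mirroring Spring's Ordered contract
    public static List<String> executionOrder(List<NamedAdvisor> advisors) {
        return advisors.stream()
            .sorted(Comparator.comparingInt(NamedAdvisor::order))
            .map(NamedAdvisor::name)
            .toList();
    }

    public static void main(String[] args) {
        List<NamedAdvisor> advisors = List.of(
            new NamedAdvisor("logging", Integer.MAX_VALUE - 100),
            new NamedAdvisor("safety", 50),
            new NamedAdvisor("memory", 500));
        System.out.println(executionOrder(advisors)); // prints [safety, memory, logging]
    }
}
```

Registration order in defaultAdvisors() doesn't matter here; only the order values decide who runs first.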
Q: Can an advisor access beans from the Spring container?
A: Yes. An advisor is itself a Spring bean, so you can inject anything it needs through its constructor. Mind thread safety, though: advisors are singletons, so their state must either be immutable or come from a ThreadLocal or the per-request context.
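That thread-safety point is worth a sketch. The pattern below — plain Java with illustrative names — keeps the singleton's fields immutable and pulls all per-request state out of a context map, which is the same discipline adviseContext() encourages:

```java
import java.util.Map;

public class SafeAdvisorSketch {
    // Immutable collaborator set once at construction time (like a constructor-injected bean)
    private final String serviceName;

    public SafeAdvisorSketch(String serviceName) {
        this.serviceName = serviceName; // never mutated after construction
    }

    // All per-request state travels in the context map, never in instance fields,
    // so concurrent requests through the same singleton can't interfere
    public String handle(Map<String, Object> adviseContext) {
        String userId = (String) adviseContext.getOrDefault("user_id", "anonymous");
        return serviceName + " handled request for " + userId;
    }
}
```

The same singleton instance can serve any number of concurrent requests because nothing request-specific ever lands in a field.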
Q: How do I debug the execution order of an advisor chain?
A: Add a SimpleLoggerAdvisor, which prints the full request and response (don't enable it in production — it logs user content). Or write a small debugging advisor of your own: print advisedRequest.adviseContext().keySet() in every aroundCall, and you can see what each advisor has placed into the context.
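A dependency-free sketch of that debugging idea (stand-in types; the "advisors" here are just steps over a shared map): each upstream step adds a context entry, and a probe placed later in the chain snapshots the keys it can see:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class ContextDebugDemo {
    // Each "advisor" mutates the shared context; the probe snapshots the visible keys
    public static List<String> run() {
        Map<String, Object> context = new LinkedHashMap<>();
        List<String> snapshot = new ArrayList<>();

        List<Consumer<Map<String, Object>>> chain = List.of(
            ctx -> ctx.put("user_id", "u1"),                     // e.g. an auth advisor
            ctx -> ctx.put("chat_memory_conversation_id", "s1"), // e.g. a memory advisor
            ctx -> snapshot.addAll(ctx.keySet())                 // the debug probe
        );
        chain.forEach(step -> step.accept(context));
        return snapshot;
    }
}
```

Moving the probe earlier in the list shrinks the snapshot, which is exactly how a real debug advisor reveals each advisor's position in the chain.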
Summary
The root cause of Zhang Wei's "leaking memory" bug came down to a single line: no conversationId was passed, so every user shared the "default" conversation.
But that bug got him to open the Spring AI source, where he came to understand:
- The Builder pattern: how runtime settings take priority over defaults
- The advisor chain: a sharp Chain of Responsibility implementation that can intercept both before and after the call
- ChatMemory: the singleton-bean-with-multiple-users trap, and how to do it properly with Redis
- Streaming responses: how cold Flux subscription works, and why post-processing advisors need special handling
Source code is the most authoritative documentation. When Spring AI behaves strangely, ten minutes in the source beats two hours of guessing.
