第1902篇:LangChain4j的内存模块深度定制——自定义记忆策略的实现
第1902篇:LangChain4j的内存模块深度定制——自定义记忆策略的实现
我记得第一次把一个 LangChain4j 的聊天机器人推上测试环境,产品经理试了没五分钟就来找我:"怎么我问了个问题,它回答完就忘了?再问它还是重头解释。"
这就是不加会话记忆的效果——每次请求都是孤立的,LLM 眼里每条消息都是全新对话。
加上默认的 MessageWindowChatMemory 之后确实解决了遗忘问题,但新问题来了:窗口太小上下文丢失,窗口太大 token 费用飙升。更麻烦的是,默认内存策略对所有会话一视同仁,没有办法针对不同场景做差异化处理——技术支持场景需要保留完整对话,闲聊场景保留最近几轮就够了。
这篇文章就从底层聊聊 LangChain4j 的内存模块,以及怎么实现真正符合业务需求的自定义记忆策略。
LangChain4j 内存模块的架构概览
先看一眼整体架构,不然后面聊自定义会觉得很割裂。
LangChain4j 的内存系统围绕三个核心抽象:
ChatMemory:内存的核心接口,定义了消息的增删查操作ChatMemoryStore:消息的持久化存储接口,负责实际的读写ChatMemoryProvider:内存的工厂接口,根据 memoryId 返回对应的 ChatMemory 实例
// ChatMemory 接口核心方法
public interface ChatMemory {
Object id();
void add(ChatMessage message);
List<ChatMessage> messages();
void clear();
}框架内置了两个实现:
MessageWindowChatMemory:保留最近 N 条消息TokenWindowChatMemory:保留不超过 N 个 token 的消息
这两个都有明显局限。MessageWindowChatMemory 按条数截断,会把一些长消息占满窗口,或者把重要的系统消息挤掉。TokenWindowChatMemory 按 token 数截断,相对好一点,但同样是纯粹的滑动窗口,没有任何语义感知。
自定义 ChatMemory 的第一步:理解消息类型
在实现自定义记忆前,必须理解消息的类型体系:
// LangChain4j 的消息类型
SystemMessage // 系统提示词,通常放在最前面
UserMessage // 用户输入
AiMessage // AI 回复,可能包含 tool calls
ToolExecutionResultMessage // 工具执行结果这里有个非常重要的约束:消息顺序必须合法。大多数 LLM API 要求:
- SystemMessage 只能在最前面,且最多一条
- UserMessage 和 AiMessage 必须交替出现
ToolExecutionResultMessage必须紧跟在包含 tool call 的AiMessage后面
如果你自定义内存,把消息列表搞乱了,API 调用会报错,而且错误信息往往不够明确,排查起来很痛苦——我当年就是这样,对着一个莫名其妙的 400 错误折腾了半天。
实现一个带摘要压缩的记忆策略
当对话轮次很多时,滑动窗口会丢失早期重要信息。更好的做法是:把早期历史压缩成摘要,保留最近的完整对话。
public class SummaryWindowChatMemory implements ChatMemory {
private final Object id;
private final int maxRecentMessages; // 保留最近几条完整消息
private final int summaryThreshold; // 超过这个数量才开始压缩
private final ChatLanguageModel summaryModel;
private final ChatMemoryStore store;
private String currentSummary = null; // 压缩后的历史摘要
public SummaryWindowChatMemory(Object id, int maxRecentMessages,
int summaryThreshold,
ChatLanguageModel summaryModel,
ChatMemoryStore store) {
this.id = id;
this.maxRecentMessages = maxRecentMessages;
this.summaryThreshold = summaryThreshold;
this.summaryModel = summaryModel;
this.store = store;
// 从持久化存储恢复摘要
this.currentSummary = store.getSummary(id);
}
@Override
public Object id() {
return id;
}
@Override
public void add(ChatMessage message) {
List<ChatMessage> messages = store.getMessages(id);
messages.add(message);
// 如果超过阈值,触发压缩
if (messages.size() > summaryThreshold) {
compressHistory(messages);
} else {
store.updateMessages(id, messages);
}
}
@Override
public List<ChatMessage> messages() {
List<ChatMessage> recentMessages = store.getMessages(id);
// 如果有摘要,把摘要作为 SystemMessage 前置
if (currentSummary != null && !currentSummary.isEmpty()) {
List<ChatMessage> result = new ArrayList<>();
// 保留原来的 SystemMessage(如果有)
recentMessages.stream()
.filter(m -> m instanceof SystemMessage)
.findFirst()
.ifPresent(result::add);
// 插入摘要作为上下文
result.add(SystemMessage.from(
"以下是之前对话的摘要,请记住这些背景信息:\n\n" + currentSummary
));
// 加入最近的非系统消息
recentMessages.stream()
.filter(m -> !(m instanceof SystemMessage))
.forEach(result::add);
return result;
}
return recentMessages;
}
@Override
public void clear() {
store.deleteMessages(id);
store.deleteSummary(id);
currentSummary = null;
}
private void compressHistory(List<ChatMessage> messages) {
// 取出需要压缩的早期消息(保留最近 maxRecentMessages 条)
int compressCount = messages.size() - maxRecentMessages;
List<ChatMessage> toCompress = messages.subList(0, compressCount);
List<ChatMessage> toKeep = messages.subList(compressCount, messages.size());
// 过滤掉 SystemMessage,它们不参与压缩
List<ChatMessage> nonSystemMessages = toCompress.stream()
.filter(m -> !(m instanceof SystemMessage))
.collect(Collectors.toList());
if (!nonSystemMessages.isEmpty()) {
String newSummary = generateSummary(nonSystemMessages, currentSummary);
currentSummary = newSummary;
store.updateSummary(id, newSummary);
}
// 更新存储,只保留最近消息
store.updateMessages(id, new ArrayList<>(toKeep));
log.info("记忆压缩完成,会话 {}: 压缩了 {} 条消息,当前摘要长度: {} 字符",
id, compressCount, currentSummary != null ? currentSummary.length() : 0);
}
private String generateSummary(List<ChatMessage> messages, String existingSummary) {
StringBuilder prompt = new StringBuilder();
if (existingSummary != null && !existingSummary.isEmpty()) {
prompt.append("现有对话摘要:\n").append(existingSummary).append("\n\n");
}
prompt.append("请将以下对话内容总结为简洁的要点,保留关键信息、用户意图和重要结论:\n\n");
for (ChatMessage msg : messages) {
if (msg instanceof UserMessage) {
prompt.append("用户:").append(((UserMessage) msg).singleText()).append("\n");
} else if (msg instanceof AiMessage) {
AiMessage aiMsg = (AiMessage) msg;
if (!aiMsg.hasToolExecutionRequests()) {
prompt.append("助手:").append(aiMsg.text()).append("\n");
}
}
}
Response<AiMessage> response = summaryModel.generate(
List.of(UserMessage.from(prompt.toString()))
);
return response.content().text();
}
}自定义 ChatMemoryStore:持久化到 Redis
默认的 InMemoryChatMemoryStore 服务重启就丢了。生产环境必须持久化,Redis 是最常见的选择。
@Component
public class RedisChatMemoryStore implements ChatMemoryStore {
private static final String MESSAGE_KEY_PREFIX = "chat:memory:msg:";
private static final String SUMMARY_KEY_PREFIX = "chat:memory:sum:";
private static final Duration DEFAULT_TTL = Duration.ofHours(24);
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private ObjectMapper objectMapper;
@Override
public List<ChatMessage> getMessages(Object memoryId) {
String key = MESSAGE_KEY_PREFIX + memoryId;
List<String> jsonList = redisTemplate.opsForList().range(key, 0, -1);
if (jsonList == null || jsonList.isEmpty()) {
return new ArrayList<>();
}
return jsonList.stream()
.map(this::deserializeMessage)
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
@Override
public void updateMessages(Object memoryId, List<ChatMessage> messages) {
String key = MESSAGE_KEY_PREFIX + memoryId;
// 用事务保证原子性
redisTemplate.execute(new SessionCallback<Void>() {
@Override
public Void execute(RedisOperations operations) throws DataAccessException {
operations.multi();
operations.delete(key);
if (!messages.isEmpty()) {
List<String> serialized = messages.stream()
.map(RedisChatMemoryStore.this::serializeMessage)
.collect(Collectors.toList());
operations.opsForList().rightPushAll(key, serialized);
operations.expire(key, DEFAULT_TTL);
}
operations.exec();
return null;
}
});
}
@Override
public void deleteMessages(Object memoryId) {
redisTemplate.delete(MESSAGE_KEY_PREFIX + memoryId);
}
public String getSummary(Object memoryId) {
return redisTemplate.opsForValue().get(SUMMARY_KEY_PREFIX + memoryId);
}
public void updateSummary(Object memoryId, String summary) {
redisTemplate.opsForValue().set(
SUMMARY_KEY_PREFIX + memoryId,
summary,
DEFAULT_TTL
);
}
public void deleteSummary(Object memoryId) {
redisTemplate.delete(SUMMARY_KEY_PREFIX + memoryId);
}
private String serializeMessage(ChatMessage message) {
try {
Map<String, Object> map = new HashMap<>();
map.put("type", message.getClass().getSimpleName());
if (message instanceof SystemMessage) {
map.put("text", ((SystemMessage) message).text());
} else if (message instanceof UserMessage) {
map.put("text", ((UserMessage) message).singleText());
} else if (message instanceof AiMessage) {
AiMessage ai = (AiMessage) message;
map.put("text", ai.text());
if (ai.hasToolExecutionRequests()) {
map.put("toolRequests", ai.toolExecutionRequests());
}
} else if (message instanceof ToolExecutionResultMessage) {
ToolExecutionResultMessage tool = (ToolExecutionResultMessage) message;
map.put("id", tool.id());
map.put("toolName", tool.toolName());
map.put("text", tool.text());
}
return objectMapper.writeValueAsString(map);
} catch (JsonProcessingException e) {
log.error("消息序列化失败", e);
return null;
}
}
private ChatMessage deserializeMessage(String json) {
try {
Map<String, Object> map = objectMapper.readValue(json,
new TypeReference<Map<String, Object>>() {});
String type = (String) map.get("type");
String text = (String) map.get("text");
return switch (type) {
case "SystemMessage" -> SystemMessage.from(text);
case "UserMessage" -> UserMessage.from(text);
case "AiMessage" -> AiMessage.from(text != null ? text : "");
case "ToolExecutionResultMessage" -> ToolExecutionResultMessage.from(
(String) map.get("id"),
(String) map.get("toolName"),
text
);
default -> {
log.warn("未知消息类型: {}", type);
yield null;
}
};
} catch (Exception e) {
log.error("消息反序列化失败: {}", json, e);
return null;
}
}
}记忆策略工厂:按场景分配不同策略
不同业务场景需要不同的记忆策略,通过工厂模式统一管理:
@Component
public class AdaptiveChatMemoryProvider implements ChatMemoryProvider {
@Autowired
private RedisChatMemoryStore store;
@Autowired
private ChatLanguageModel summaryModel;
@Autowired
private SessionMetadataService sessionMetadata;
@Override
public ChatMemory get(Object memoryId) {
String sessionType = sessionMetadata.getSessionType(memoryId.toString());
return switch (sessionType) {
// 技术支持:保留完整对话,使用摘要压缩
case "TECH_SUPPORT" -> new SummaryWindowChatMemory(
memoryId, 20, 40, summaryModel, store
);
// 客服场景:中等窗口
case "CUSTOMER_SERVICE" -> MessageWindowChatMemory.builder()
.id(memoryId)
.maxMessages(15)
.chatMemoryStore(store)
.build();
// 闲聊:短窗口,省 token
case "CASUAL_CHAT" -> MessageWindowChatMemory.builder()
.id(memoryId)
.maxMessages(6)
.chatMemoryStore(store)
.build();
// 默认:中等窗口
default -> MessageWindowChatMemory.builder()
.id(memoryId)
.maxMessages(10)
.chatMemoryStore(store)
.build();
};
}
}记忆注入拦截:在消息存储前做预处理
有时候你需要在消息被存入内存之前做一些处理,比如敏感信息脱敏、格式标准化等。LangChain4j 没有直接提供拦截器,但可以通过装饰器模式实现:
public class FilteringChatMemory implements ChatMemory {
private final ChatMemory delegate;
private final List<MessageFilter> filters;
public FilteringChatMemory(ChatMemory delegate, List<MessageFilter> filters) {
this.delegate = delegate;
this.filters = filters;
}
@Override
public Object id() {
return delegate.id();
}
@Override
public void add(ChatMessage message) {
ChatMessage processed = message;
for (MessageFilter filter : filters) {
processed = filter.process(processed);
if (processed == null) {
// 过滤器决定丢弃这条消息
log.debug("消息被过滤器丢弃: {}", filter.getClass().getSimpleName());
return;
}
}
delegate.add(processed);
}
@Override
public List<ChatMessage> messages() {
return delegate.messages();
}
@Override
public void clear() {
delegate.clear();
}
}
// 敏感信息脱敏过滤器
public class SensitiveInfoFilter implements MessageFilter {
private static final Pattern PHONE_PATTERN =
Pattern.compile("1[3-9]\\d{9}");
private static final Pattern ID_CARD_PATTERN =
Pattern.compile("\\d{17}[0-9Xx]");
@Override
public ChatMessage process(ChatMessage message) {
if (message instanceof UserMessage) {
String text = ((UserMessage) message).singleText();
String desensitized = desensitize(text);
return UserMessage.from(desensitized);
}
return message;
}
private String desensitize(String text) {
text = PHONE_PATTERN.matcher(text).replaceAll("1**********");
text = ID_CARD_PATTERN.matcher(text).replaceAll("***");
return text;
}
}记忆管理的流程图
踩坑经验
坑1:摘要模型的选择很有讲究
不要用和主对话一样的大模型做摘要,太贵了。我们用 GPT-3.5 或者部署在本地的小模型做摘要,主对话用 GPT-4。摘要的质量不需要那么高,能抓住关键信息就行。
坑2:摘要时机的把握
太频繁压缩会增加成本(每次压缩都要调 LLM),太晚压缩会导致 token 超限。我的经验是:当消息数量达到 maxMessages * 1.5 时触发压缩,压缩后保留 maxMessages * 0.5 条完整消息。
坑3:工具调用消息不能单独出现
ToolExecutionResultMessage 必须跟在有 tool call 的 AiMessage 后面。在压缩历史时,如果把一个 AiMessage(含 tool call)压缩进摘要了,对应的 ToolExecutionResultMessage 也必须一起压缩,不能留在消息列表里。
实现时要成对处理:
// 找出所有需要一起压缩的消息组
private List<List<ChatMessage>> groupMessages(List<ChatMessage> messages) {
List<List<ChatMessage>> groups = new ArrayList<>();
List<ChatMessage> currentGroup = new ArrayList<>();
for (ChatMessage msg : messages) {
if (msg instanceof UserMessage && !currentGroup.isEmpty()) {
groups.add(new ArrayList<>(currentGroup));
currentGroup.clear();
}
currentGroup.add(msg);
}
if (!currentGroup.isEmpty()) {
groups.add(currentGroup);
}
return groups;
}坑4:多实例部署的并发写入
多个服务实例同时处理同一个 session 的请求时,会有并发写入 Redis 的问题。解决方案是加分布式锁,或者用 Redis 的 MULTI/EXEC 乐观锁:
// 使用 Redisson 分布式锁
public void add(ChatMessage message) {
RLock lock = redissonClient.getLock("chat:lock:" + id);
try {
lock.lock(5, TimeUnit.SECONDS);
// ... 正常的 add 逻辑
} finally {
lock.unlock();
}
}小结
LangChain4j 的内存模块看似简单,但往深了做有很多细节:
- 消息类型和顺序约束,搞错了 API 报错很难排查
- 摘要压缩是解决长对话 token 成本的有效手段
- 持久化必须用 Redis 等外部存储,且要处理并发安全
- 不同业务场景配不同策略,通过
ChatMemoryProvider统一管理
记忆模块是 AI 应用的"大脑短期记忆",设计好了用户体验天差地别。
