第1673篇:Agent记忆系统的持久化设计——短期、长期、情节记忆的存储架构
第1673篇:Agent记忆系统的持久化设计——短期、长期、情节记忆的存储架构
有人跟我说:我的Agent每次对话都像失忆了一样,上一轮聊的内容下一轮全不记得,怎么解决?
我说:你有没有想过,"记忆"这件事本身就分很多层?你记得昨天中午吃了什么(情节记忆),但你不需要时刻记着"筷子怎么用"(程序记忆)。Agent的记忆系统也应该分层,不同层的记忆用不同的存储方式,检索方式也不一样。
这篇文章就系统讲一下Agent的记忆架构设计。这是我在项目里摸索出来的一套方案,在两个生产级的对话Agent上跑了半年多,效果不错。
为什么Agent需要分层记忆
在开始讲实现之前,先想清楚为什么需要分层。
最简单粗暴的记忆方案是:把所有历史对话全塞进Context里。这个方案在对话轮数少的时候没问题,但随着轮数增加,问题越来越多:
Token爆炸:GPT-4的Context窗口是128K Token,几百轮对话就撑满了,要么截断,要么付更多钱。
注意力稀释:LLM的注意力机制对过长的上下文处理效果差,早期的重要信息会被"淡化"。
不同信息的时效性不同:用户的名字(长期有效)和上一轮说的"我想看科幻电影"(短期有效)应该用不同的方式处理。
人类记忆系统给了我们很好的启发:
对应到Agent系统,我们设计三层记忆:
- 工作记忆(Working Memory):当前对话的上下文,存在内存里,对话结束就清除
- 情节记忆(Episodic Memory):过去的对话摘要和关键事件,持久化存储,按时间检索
- 语义记忆(Semantic Memory):关于用户的结构化知识(偏好、信息、习惯),持久化存储,精准查询
数据模型设计
先把数据模型设计好,这是整个记忆系统的基础。
// 工作记忆:对话消息
@Data
public class ConversationMessage {
private String id;
private String sessionId;
private String role; // user / assistant / system / tool
private String content;
private Map<String, Object> metadata; // 工具调用参数、token计数等
private LocalDateTime timestamp;
private int tokenCount;
}
// 情节记忆:对话摘要
@Entity
@Table(name = "episodic_memories")
@Data
public class EpisodicMemory {
@Id
private String id;
private String userId;
private String sessionId; // 关联的原始会话
private String summary; // LLM生成的摘要
private String keyTopics; // 关键主题(JSON数组)
private String actionsTaken; // 采取的行动(JSON数组)
private String outcomes; // 结果(JSON数组)
@Column(columnDefinition = "vector(1536)")
private float[] embedding; // 语义向量,用于相似度检索
private LocalDateTime occurredAt;
private LocalDateTime createdAt;
private float importanceScore; // 重要性评分,影响检索优先级
}
// 语义记忆:用户画像
@Entity
@Table(name = "semantic_memories")
@Data
public class SemanticMemory {
@Id
private String id;
private String userId;
private String category; // preference / fact / skill / relationship
private String key; // 记忆的键,如"favorite_language"
private String value; // 记忆的值,如"Python"
private float confidence; // 置信度 0-1
private int mentions; // 被提及/确认的次数
private LocalDateTime firstObservedAt;
private LocalDateTime lastUpdatedAt;
private LocalDateTime expiresAt; // 可选的过期时间
}工作记忆的管理
工作记忆就是当前会话的消息列表,关键问题是当消息超过Token限制时怎么处理。
@Service
public class WorkingMemoryManager {
private final int maxTokens;
private final TokenCounter tokenCounter;
private final Map<String, Deque<ConversationMessage>> sessionMemories
= new ConcurrentHashMap<>();
public WorkingMemoryManager(@Value("${agent.working-memory.max-tokens:8000}")
int maxTokens) {
this.maxTokens = maxTokens;
this.tokenCounter = new TikTokenCounter();
}
/**
* 添加消息到工作记忆
*/
public void addMessage(String sessionId, ConversationMessage message) {
Deque<ConversationMessage> messages = sessionMemories.computeIfAbsent(
sessionId, k -> new ArrayDeque<>()
);
message.setTokenCount(tokenCounter.count(message.getContent()));
messages.addLast(message);
// 检查是否超过Token限制
compactIfNeeded(sessionId, messages);
}
/**
* 压缩工作记忆:保留System消息 + 最近的消息
*/
private void compactIfNeeded(String sessionId,
Deque<ConversationMessage> messages) {
int totalTokens = messages.stream()
.mapToInt(ConversationMessage::getTokenCount)
.sum();
if (totalTokens <= maxTokens) {
return;
}
log.info("工作记忆超限,开始压缩. sessionId={}, tokens={}",
sessionId, totalTokens);
// 策略:保留System消息,移除最老的用户/助手消息
Iterator<ConversationMessage> iter = messages.iterator();
List<ConversationMessage> toRemove = new ArrayList<>();
while (iter.hasNext() && totalTokens > maxTokens * 0.8) {
ConversationMessage msg = iter.next();
if (!"system".equals(msg.getRole())) {
toRemove.add(msg);
totalTokens -= msg.getTokenCount();
}
}
messages.removeAll(toRemove);
// 触发异步的情节记忆提取(把被移除的内容摘要化保存)
if (!toRemove.isEmpty()) {
episodicMemoryExtractor.extractAsync(sessionId, toRemove);
}
}
/**
* 获取当前工作记忆,构建LLM请求的消息列表
*/
public List<ConversationMessage> getWorkingMemory(String sessionId) {
Deque<ConversationMessage> messages = sessionMemories.get(sessionId);
if (messages == null) {
return Collections.emptyList();
}
return new ArrayList<>(messages);
}
/**
* 注入相关的长期记忆到工作记忆中
* 在每次LLM调用前调用这个方法
*/
public List<ConversationMessage> enrichWithLongTermMemory(
String sessionId, String userId, String currentQuery) {
List<ConversationMessage> working = getWorkingMemory(sessionId);
// 检索相关的情节记忆
List<EpisodicMemory> episodic = episodicMemoryService
.searchSimilar(userId, currentQuery, 3);
// 检索相关的语义记忆(用户画像信息)
List<SemanticMemory> semantic = semanticMemoryService
.getRelevant(userId, currentQuery);
if (episodic.isEmpty() && semantic.isEmpty()) {
return working;
}
// 把长期记忆注入为一个System消息
String memoryContext = buildMemoryContext(episodic, semantic);
ConversationMessage memoryMsg = new ConversationMessage();
memoryMsg.setRole("system");
memoryMsg.setContent("以下是关于用户的背景信息,请在回答时参考:\n" + memoryContext);
// 插入到System消息之后,User消息之前
List<ConversationMessage> enriched = new ArrayList<>();
enriched.add(memoryMsg);
enriched.addAll(working);
return enriched;
}
private String buildMemoryContext(List<EpisodicMemory> episodic,
List<SemanticMemory> semantic) {
StringBuilder sb = new StringBuilder();
if (!semantic.isEmpty()) {
sb.append("【用户基本信息】\n");
for (SemanticMemory mem : semantic) {
sb.append(String.format("- %s: %s\n", mem.getKey(), mem.getValue()));
}
}
if (!episodic.isEmpty()) {
sb.append("\n【历史对话摘要】\n");
for (EpisodicMemory mem : episodic) {
sb.append(String.format("- [%s] %s\n",
mem.getOccurredAt().toLocalDate(), mem.getSummary()));
}
}
return sb.toString();
}
}情节记忆的提取与存储
情节记忆是对过去对话的摘要,需要用LLM来提取,然后向量化存储,用相似度检索。
@Service
public class EpisodicMemoryService {
private final LLMClient llmClient;
private final EmbeddingClient embeddingClient;
private final EpisodicMemoryRepository repository;
/**
* 异步提取情节记忆(在工作记忆压缩时触发)
*/
@Async
public void extractAsync(String sessionId,
List<ConversationMessage> messages) {
try {
String conversation = formatConversation(messages);
// 用LLM提取摘要
String prompt = """
请分析以下对话内容,提取关键信息,返回JSON格式:
{
"summary": "对话的简短摘要(50字以内)",
"key_topics": ["主题1", "主题2"],
"actions_taken": ["采取的行动1", "行动2"],
"outcomes": ["结果1", "结果2"],
"importance": 0.8 // 重要性评分,0-1
}
对话内容:
""" + conversation;
String response = llmClient.complete(prompt, "gpt-4o-mini");
EpisodeSummary summary = JSON.parseObject(response, EpisodeSummary.class);
// 生成向量嵌入
float[] embedding = embeddingClient.embed(summary.getSummary());
// 持久化
EpisodicMemory memory = new EpisodicMemory();
memory.setId(UUID.randomUUID().toString());
memory.setSessionId(sessionId);
memory.setSummary(summary.getSummary());
memory.setKeyTopics(JSON.toJSONString(summary.getKeyTopics()));
memory.setEmbedding(embedding);
memory.setImportanceScore(summary.getImportance());
memory.setOccurredAt(messages.get(0).getTimestamp());
repository.save(memory);
log.info("情节记忆提取完成: sessionId={}, summary={}",
sessionId, summary.getSummary());
} catch (Exception e) {
log.error("情节记忆提取失败: sessionId={}", sessionId, e);
}
}
/**
* 语义相似度检索:找出和当前查询最相关的历史记忆
*/
public List<EpisodicMemory> searchSimilar(String userId,
String query,
int topK) {
// 生成查询的向量
float[] queryEmbedding = embeddingClient.embed(query);
// 向量相似度检索(使用pgvector扩展)
return repository.findSimilar(userId, queryEmbedding, topK);
}
}向量检索需要数据库支持,这里用PostgreSQL的pgvector扩展:
-- 建表
CREATE TABLE episodic_memories (
id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(64) NOT NULL,
session_id VARCHAR(64),
summary TEXT NOT NULL,
key_topics JSONB,
actions_taken JSONB,
outcomes JSONB,
embedding vector(1536), -- OpenAI text-embedding-ada-002维度
importance_score FLOAT,
occurred_at TIMESTAMP,
created_at TIMESTAMP DEFAULT NOW()
);
-- 创建向量索引(IVFFlat)
CREATE INDEX idx_episodic_embedding
ON episodic_memories
USING ivfflat (embedding vector_cosine_ops)
WITH (lists = 100);
-- 相似度检索SQL
SELECT *, 1 - (embedding <=> $1::vector) AS similarity
FROM episodic_memories
WHERE user_id = $2
AND importance_score > 0.3 -- 过滤低重要性记忆
ORDER BY embedding <=> $1::vector
LIMIT $3;对应的JPA查询:
@Repository
public interface EpisodicMemoryRepository extends JpaRepository<EpisodicMemory, String> {
@Query(value = """
SELECT *, 1 - (embedding <=> :embedding::vector) AS similarity
FROM episodic_memories
WHERE user_id = :userId
AND importance_score > 0.3
ORDER BY embedding <=> :embedding::vector
LIMIT :topK
""", nativeQuery = true)
List<EpisodicMemory> findSimilar(
@Param("userId") String userId,
@Param("embedding") float[] embedding,
@Param("topK") int topK
);
}语义记忆的更新逻辑
语义记忆是关于用户的结构化知识,更新规则比较复杂——需要处理冲突、合并和过期。
@Service
public class SemanticMemoryService {
private final SemanticMemoryRepository repository;
private final LLMClient llmClient;
/**
* 从对话中提取语义记忆并更新
*/
public void updateFromConversation(String userId,
List<ConversationMessage> messages) {
// 用LLM从对话中提取用户信息
String extractionPrompt = """
从以下对话中提取关于用户的客观信息,返回JSON数组:
[
{
"category": "preference", // preference/fact/habit
"key": "信息的键名",
"value": "信息的值",
"confidence": 0.9,
"expires_in_days": null // 如果是临时信息填天数,否则null
}
]
只提取明确表达的信息,不要推断。如果没有新信息,返回空数组。
对话内容:
""" + formatConversation(messages);
String response = llmClient.complete(extractionPrompt, "gpt-4o-mini");
List<ExtractedFact> facts = JSON.parseArray(response, ExtractedFact.class);
for (ExtractedFact fact : facts) {
upsertSemanticMemory(userId, fact);
}
}
/**
* 插入或更新语义记忆(处理冲突)
*/
@Transactional
public void upsertSemanticMemory(String userId, ExtractedFact fact) {
Optional<SemanticMemory> existing = repository.findByUserIdAndCategoryAndKey(
userId, fact.getCategory(), fact.getKey()
);
if (existing.isPresent()) {
SemanticMemory memory = existing.get();
// 检查是否是冲突信息(值不同)
if (!memory.getValue().equals(fact.getValue())) {
// 用置信度决策:新信息置信度高则更新
if (fact.getConfidence() > memory.getConfidence()) {
memory.setValue(fact.getValue());
memory.setConfidence(fact.getConfidence());
log.info("语义记忆更新: userId={}, key={}, {} -> {}",
userId, fact.getKey(), memory.getValue(), fact.getValue());
}
// 否则记录冲突但不更新
} else {
// 值相同,增加置信度(累积确认)
memory.setConfidence(Math.min(1.0f,
memory.getConfidence() + 0.05f));
memory.setMentions(memory.getMentions() + 1);
}
memory.setLastUpdatedAt(LocalDateTime.now());
repository.save(memory);
} else {
// 新记忆
SemanticMemory memory = new SemanticMemory();
memory.setId(UUID.randomUUID().toString());
memory.setUserId(userId);
memory.setCategory(fact.getCategory());
memory.setKey(fact.getKey());
memory.setValue(fact.getValue());
memory.setConfidence(fact.getConfidence());
memory.setMentions(1);
memory.setFirstObservedAt(LocalDateTime.now());
memory.setLastUpdatedAt(LocalDateTime.now());
if (fact.getExpiresInDays() != null) {
memory.setExpiresAt(
LocalDateTime.now().plusDays(fact.getExpiresInDays()));
}
repository.save(memory);
}
}
/**
* 获取与当前查询相关的语义记忆
*/
public List<SemanticMemory> getRelevant(String userId, String query) {
// 获取所有未过期的记忆
List<SemanticMemory> allMemories = repository.findActiveByUserId(userId);
// 过滤低置信度的记忆
return allMemories.stream()
.filter(m -> m.getConfidence() >= 0.6f)
.sorted(Comparator.comparingDouble(SemanticMemory::getConfidence).reversed())
.limit(20) // 最多取20条,避免Context过长
.collect(Collectors.toList());
}
/**
* 定期清理过期记忆
*/
@Scheduled(cron = "0 0 2 * * ?") // 每天凌晨2点
public void cleanExpiredMemories() {
int deleted = repository.deleteExpired(LocalDateTime.now());
log.info("清理过期语义记忆: {} 条", deleted);
}
}记忆重要性衰减
记忆不是永久有效的,随着时间推移,一些记忆的重要性应该降低。
@Service
public class MemoryDecayService {
/**
* 情节记忆的重要性衰减
* 使用指数衰减模型:importance(t) = importance(0) * e^(-λt)
*/
@Scheduled(cron = "0 0 3 * * ?")
public void decayEpisodicMemories() {
LocalDateTime cutoff = LocalDateTime.now().minusDays(30);
List<EpisodicMemory> oldMemories = episodicRepository
.findOlderThan(cutoff);
for (EpisodicMemory memory : oldMemories) {
long daysOld = ChronoUnit.DAYS.between(
memory.getOccurredAt(), LocalDateTime.now()
);
// 半衰期为90天
double decayFactor = Math.exp(-0.693 / 90.0 * daysOld);
float newImportance = (float) (memory.getImportanceScore() * decayFactor);
if (newImportance < 0.1f) {
// 重要性太低,直接删除
episodicRepository.delete(memory);
} else {
memory.setImportanceScore(newImportance);
episodicRepository.save(memory);
}
}
}
}整合:完整的记忆感知Agent调用链
把上面所有组件整合成一个完整的调用链:
@Service
public class MemoryAwareAgentService {
private final WorkingMemoryManager workingMemory;
private final EpisodicMemoryService episodicMemory;
private final SemanticMemoryService semanticMemory;
private final LLMClient llmClient;
public AgentResponse chat(String userId, String sessionId, String userInput) {
// 1. 添加用户输入到工作记忆
ConversationMessage userMsg = new ConversationMessage();
userMsg.setRole("user");
userMsg.setContent(userInput);
userMsg.setSessionId(sessionId);
workingMemory.addMessage(sessionId, userMsg);
// 2. 检索相关的长期记忆,注入到上下文
List<ConversationMessage> enrichedContext = workingMemory
.enrichWithLongTermMemory(sessionId, userId, userInput);
// 3. 调用LLM
String assistantResponse = llmClient.chat(enrichedContext);
// 4. 把Assistant的回复也加入工作记忆
ConversationMessage assistantMsg = new ConversationMessage();
assistantMsg.setRole("assistant");
assistantMsg.setContent(assistantResponse);
assistantMsg.setSessionId(sessionId);
workingMemory.addMessage(sessionId, assistantMsg);
// 5. 异步更新语义记忆(从最近的对话中提取用户信息)
List<ConversationMessage> recentMessages = List.of(userMsg, assistantMsg);
semanticMemory.updateFromConversation(userId, recentMessages);
return new AgentResponse(assistantResponse, sessionId);
}
/**
* 会话结束时,触发完整的情节记忆提取
*/
public void onSessionEnd(String sessionId, String userId) {
List<ConversationMessage> messages = workingMemory.getWorkingMemory(sessionId);
if (messages.size() >= 4) { // 至少2轮对话才值得提取
episodicMemory.extractAsync(sessionId, messages);
}
// 清理工作记忆
workingMemory.clearSession(sessionId);
}
}几个实际踩过的坑
坑1:向量索引没建,查询慢到崩溃。
早期测试时数据量少,没建向量索引,查询还很快。上线后数据量到10万条,每次语义检索要扫全表,耗时从5ms飙到2秒。赶紧补了IVFFlat索引,降回50ms以内。
坑2:LLM提取语义记忆的错误率比想象高。
我们让LLM从对话里提取用户信息,但LLM有时候会把不确定的信息作为事实提取出来。比如用户说"我好像喜欢Python",LLM可能提取出"喜欢Python:True"。后来在提取Prompt里加了"只提取用户明确确认的信息"的限制,并且设置较高的置信度阈值,效果好很多。
坑3:工作记忆压缩时机不对。
最初在每次添加消息后都检查压缩,压缩发生在消息添加的同步链路里,导致偶尔的高延迟。改成异步压缩,先添加消息返回,后台异步检查和压缩,用户体验明显改善。
坑4:情节记忆摘要质量参差不齐。
LLM生成的摘要有时候太简略("用户咨询了问题"),有时候太啰嗦(复述了整个对话)。后来在Prompt里加了具体的格式要求和长度限制,并且加了摘要质量评分,评分低的触发重新生成。
记忆系统是让Agent从"工具"进化为"助手"的关键一步。短期记忆保证了对话的连贯性,长期记忆让Agent真正了解用户。这套架构在我们的项目里效果不错,用户留存率明显提升,反馈说"这个Agent记得我之前说的,不用每次重复解释"。
