第2147篇:LLM应用的会话管理——多轮对话的上下文持久化与优化
2026/4/30大约 8 分钟
第2147篇:LLM应用的会话管理——多轮对话的上下文持久化与优化
适读人群:构建对话式AI应用的后端工程师 | 阅读时长:约18分钟 | 核心价值:掌握多轮对话的会话状态管理,解决上下文过长、跨会话记忆、并发访问等工程问题
做多轮对话产品时,会话管理是最容易被低估的模块。
Demo阶段:把历史消息全部塞进Prompt,能跑,体验不错。
到了生产阶段:用户和AI聊了20轮,上下文超过了模型的上下文窗口限制,报错;两个浏览器tab同时发请求,消息顺序乱了;服务重启,历史对话全没了;有用户故意发1000字的消息,上下文被占满,AI开始"失忆"。
这些问题一个一个解决就是会话管理要做的事情。
会话管理的核心设计
/**
* 多轮对话会话管理的设计要点
*
* ===== 核心数据结构 =====
*
* Session(会话):
* - sessionId:唯一标识
* - userId:归属用户
* - messages:消息历史(有序列表)
* - metadata:会话元数据(主题、场景等)
* - createdAt/lastActiveAt
*
* Message(消息):
* - messageId:唯一标识
* - role:USER / ASSISTANT / SYSTEM
* - content:消息内容
* - tokenCount:Token数量(预估)
* - timestamp
*
* ===== 关键设计决策 =====
*
* 1. 存储选择:
* Redis:快,但有内存限制,适合活跃会话
* MySQL/PostgreSQL:可靠,适合长期归档
* 推荐:Redis + DB双写,活跃会话在Redis,冷会话归档到DB
*
* 2. 上下文窗口管理:
* 不能无限积累,需要截断或压缩策略
* 参考:article-2124的ConversationHistoryCompressor
*
* 3. 并发控制:
* 同一会话同时发两条消息,消息顺序可能乱
* 需要锁机制保证消息顺序
*
* 4. Token计算:
* 发送前要估算总Token数,超限前主动截断
*/会话存储服务
/**
* 会话存储服务
*
* Redis + 数据库双写策略:
* - 活跃会话(24小时内有操作)存Redis
* - 所有会话持久化到数据库
* - 读取时优先Redis,未命中从数据库加载
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ConversationSessionStore {
private final RedisTemplate<String, String> redis;
private final ConversationSessionRepository sessionRepo;
private final ConversationMessageRepository messageRepo;
private final ObjectMapper objectMapper;
// 活跃会话在Redis的TTL
private static final Duration SESSION_TTL = Duration.ofHours(48);
// 消息历史最大保存条数(数据库层面)
private static final int MAX_MESSAGES_PER_SESSION = 500;
/**
* 获取或创建会话
*/
public ConversationSession getOrCreateSession(String sessionId, String userId) {
// 1. 尝试从Redis读取
String redisKey = "session:" + sessionId;
String cached = redis.opsForValue().get(redisKey);
if (cached != null) {
try {
ConversationSession session = objectMapper.readValue(cached, ConversationSession.class);
// 刷新TTL
redis.expire(redisKey, SESSION_TTL);
return session;
} catch (Exception e) {
log.warn("Redis会话反序列化失败,从DB加载: {}", e.getMessage());
}
}
// 2. 从数据库加载
Optional<ConversationSession> fromDb = sessionRepo.findBySessionId(sessionId);
if (fromDb.isPresent()) {
ConversationSession session = fromDb.get();
// 加载消息历史
List<ConversationMessage> messages = messageRepo.findBySessionIdOrderByTimestamp(sessionId);
session.setMessages(messages);
// 回写Redis
cacheSession(session);
return session;
}
// 3. 创建新会话
ConversationSession newSession = ConversationSession.builder()
.sessionId(sessionId)
.userId(userId)
.messages(new ArrayList<>())
.createdAt(LocalDateTime.now())
.lastActiveAt(LocalDateTime.now())
.build();
sessionRepo.save(newSession);
cacheSession(newSession);
return newSession;
}
/**
* 追加消息(线程安全)
*
* 使用Redis的List原子操作,保证消息顺序
*/
public ConversationMessage appendMessage(
String sessionId, String role, String content) {
ConversationMessage message = ConversationMessage.builder()
.messageId(UUID.randomUUID().toString())
.sessionId(sessionId)
.role(MessageRole.valueOf(role.toUpperCase()))
.content(content)
.tokenCount(estimateTokenCount(content))
.timestamp(LocalDateTime.now())
.build();
// 持久化到数据库
messageRepo.save(message);
// 更新Redis中的会话(追加消息)
updateSessionCache(sessionId, session -> {
session.getMessages().add(message);
session.setLastActiveAt(LocalDateTime.now());
});
// 检查消息数量上限,超过则归档旧消息
long messageCount = messageRepo.countBySessionId(sessionId);
if (messageCount > MAX_MESSAGES_PER_SESSION) {
archiveOldMessages(sessionId);
}
return message;
}
/**
* 归档旧消息
*
* 超过上限时,把最老的消息移到归档表
*/
private void archiveOldMessages(String sessionId) {
// 保留最新的200条,其余归档
int toArchive = (int) messageRepo.countBySessionId(sessionId) - 200;
if (toArchive <= 0) return;
List<ConversationMessage> oldMessages = messageRepo
.findOldestMessages(sessionId, toArchive);
// 归档(移动到历史表)
messageRepo.archiveMessages(oldMessages.stream()
.map(ConversationMessage::getMessageId)
.toList());
log.debug("会话消息归档: sessionId={}, archived={}", sessionId, toArchive);
}
/**
* 更新会话缓存(线程安全)
*/
private void updateSessionCache(String sessionId,
java.util.function.Consumer<ConversationSession> updater) {
String redisKey = "session:" + sessionId;
// 使用Redis的WATCH + MULTI/EXEC做乐观锁更新
// 简化实现:直接加载、修改、回写(生产中应用Lua脚本保证原子性)
String cached = redis.opsForValue().get(redisKey);
if (cached != null) {
try {
ConversationSession session = objectMapper.readValue(cached, ConversationSession.class);
updater.accept(session);
cacheSession(session);
} catch (Exception e) {
// 如果Redis操作失败,以数据库为准(最终一致性)
log.warn("更新会话缓存失败: {}", e.getMessage());
redis.delete(redisKey); // 清除可能损坏的缓存
}
}
}
private void cacheSession(ConversationSession session) {
try {
String json = objectMapper.writeValueAsString(session);
redis.opsForValue().set("session:" + session.getSessionId(), json, SESSION_TTL);
} catch (Exception e) {
log.warn("会话缓存写入失败: {}", e.getMessage());
}
}
/**
* 估算Token数(简单规则:中文每字0.6 token,英文每词1 token)
*/
private int estimateTokenCount(String text) {
if (text == null) return 0;
long chineseChars = text.chars()
.filter(c -> c >= 0x4e00 && c <= 0x9fa5).count();
long englishWords = Arrays.stream(text.split("\\s+"))
.filter(w -> w.matches("[a-zA-Z]+")).count();
return (int) (chineseChars * 0.6 + englishWords * 1.0) + 10;
}
@Builder
@Entity
public static class ConversationSession {
@Id private String sessionId;
private String userId;
@Transient // 不存表,从消息表加载
private List<ConversationMessage> messages;
private LocalDateTime createdAt;
private LocalDateTime lastActiveAt;
private String topicSummary; // LLM生成的话题摘要
}
@Builder
@Entity
public static class ConversationMessage {
@Id private String messageId;
private String sessionId;
@Enumerated(EnumType.STRING)
private MessageRole role;
@Column(columnDefinition = "TEXT")
private String content;
private int tokenCount;
private LocalDateTime timestamp;
}
public enum MessageRole { USER, ASSISTANT, SYSTEM }
}上下文窗口优化
/**
* 智能上下文构建
*
* 在把历史消息发送给LLM之前,
* 根据Token预算智能选择哪些消息放入上下文
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SmartContextBuilder {
private final ChatLanguageModel llm;
// LLM的最大上下文Token数(根据使用的模型设置)
@Value("${llm.max.context.tokens:4096}")
private int maxContextTokens;
// 系统提示预留的Token数
private static final int SYSTEM_PROMPT_RESERVE = 500;
// 当前用户问题和AI回复的预留Token数
private static final int CURRENT_TURN_RESERVE = 1000;
/**
* 构建发送给LLM的消息列表
*
* 策略:
* 1. 必须包含:系统消息 + 最新的用户消息
* 2. 尽量包含:最近的N条对话历史
* 3. Token超限时:压缩或截断早期历史
*/
public List<ChatMessage> buildContext(
String systemPrompt,
List<ConversationSessionStore.ConversationMessage> history,
String currentUserMessage) {
int availableTokens = maxContextTokens - SYSTEM_PROMPT_RESERVE - CURRENT_TURN_RESERVE;
// 倒序遍历历史消息,从最新的开始,直到Token用完
List<ConversationSessionStore.ConversationMessage> selectedMessages =
selectMessagesByTokenBudget(history, availableTokens);
// 构建消息列表
List<ChatMessage> messages = new ArrayList<>();
// 1. 系统消息
if (systemPrompt != null && !systemPrompt.isBlank()) {
messages.add(new SystemMessage(systemPrompt));
}
// 2. 如果有压缩的历史摘要,加在最前面
String historySummary = getOrGenerateHistorySummary(history, selectedMessages);
if (historySummary != null && !historySummary.isBlank()) {
messages.add(new SystemMessage("以下是之前对话的摘要,供参考:\n" + historySummary));
}
// 3. 选中的历史消息
for (ConversationSessionStore.ConversationMessage msg : selectedMessages) {
switch (msg.role()) {
case USER -> messages.add(new UserMessage(msg.content()));
case ASSISTANT -> messages.add(new AiMessage(msg.content()));
case SYSTEM -> messages.add(new SystemMessage(msg.content()));
}
}
// 4. 当前用户消息
messages.add(new UserMessage(currentUserMessage));
int totalMessages = messages.size();
log.debug("上下文构建完成: messages={}, historySelected={}",
totalMessages, selectedMessages.size());
return messages;
}
/**
* 按Token预算选择历史消息
*
* 从最新的消息开始往前选,直到Token用完
*/
private List<ConversationSessionStore.ConversationMessage> selectMessagesByTokenBudget(
List<ConversationSessionStore.ConversationMessage> history, int tokenBudget) {
if (history.isEmpty()) return List.of();
LinkedList<ConversationSessionStore.ConversationMessage> selected = new LinkedList<>();
int usedTokens = 0;
// 从最新的消息往前选
for (int i = history.size() - 1; i >= 0; i--) {
ConversationSessionStore.ConversationMessage msg = history.get(i);
int msgTokens = msg.tokenCount();
if (usedTokens + msgTokens > tokenBudget) {
break; // Token用完
}
selected.addFirst(msg); // 保持顺序
usedTokens += msgTokens;
}
return selected;
}
/**
* 获取或生成历史摘要
*
* 当历史太长,被截断时,把截断部分的内容压缩成摘要,
* 让AI仍然知道更早期的对话要点
*/
private String getOrGenerateHistorySummary(
List<ConversationSessionStore.ConversationMessage> allHistory,
List<ConversationSessionStore.ConversationMessage> selectedHistory) {
if (allHistory.size() == selectedHistory.size()) {
return null; // 没有截断,不需要摘要
}
// 被截断的历史消息
int selectedCount = selectedHistory.size();
List<ConversationSessionStore.ConversationMessage> truncatedHistory =
allHistory.subList(0, allHistory.size() - selectedCount);
if (truncatedHistory.isEmpty()) return null;
// 生成摘要
String historyText = truncatedHistory.stream()
.map(m -> m.role() + ": " + m.content())
.collect(Collectors.joining("\n"));
String summaryPrompt = """
请用100字以内总结以下对话的核心内容和关键信息:
%s
""".formatted(historyText.substring(0, Math.min(3000, historyText.length())));
try {
return llm.generate(summaryPrompt);
} catch (Exception e) {
log.warn("历史摘要生成失败: {}", e.getMessage());
return null;
}
}
}并发访问控制
/**
* 会话的并发访问控制
*
* 问题场景:
* 用户在手机和电脑上同时发消息,
* 两个请求并发访问同一个会话,
* 消息顺序可能乱,上下文可能不一致
*
* 解决方案:分布式会话锁
* 同一个sessionId的请求,串行处理
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class SessionConcurrencyGuard {
private final RedisTemplate<String, String> redis;
// 会话锁的超时时间(防止死锁)
private static final Duration LOCK_TIMEOUT = Duration.ofSeconds(30);
/**
* 在会话锁保护下执行操作
*
* 确保同一个会话的并发请求串行执行
*/
public <T> T withSessionLock(String sessionId, Callable<T> action)
throws SessionBusyException {
String lockKey = "session-lock:" + sessionId;
String lockValue = UUID.randomUUID().toString();
// 尝试获取锁(SET NX PX,原子操作)
Boolean acquired = redis.opsForValue()
.setIfAbsent(lockKey, lockValue, LOCK_TIMEOUT);
if (!Boolean.TRUE.equals(acquired)) {
// 未能获取锁,说明该会话正在被其他请求处理
log.debug("会话正忙,请求排队: sessionId={}", sessionId);
// 等待并重试(最多等3秒)
int retries = 0;
while (retries < 6) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SessionBusyException("等待被中断");
}
acquired = redis.opsForValue()
.setIfAbsent(lockKey, lockValue, LOCK_TIMEOUT);
if (Boolean.TRUE.equals(acquired)) break;
retries++;
}
if (!Boolean.TRUE.equals(acquired)) {
throw new SessionBusyException("会话繁忙,请稍后重试");
}
}
try {
return action.call();
} catch (Exception e) {
if (e instanceof SessionBusyException) throw (SessionBusyException) e;
throw new RuntimeException(e);
} finally {
// 释放锁(只释放自己加的锁,防止误删其他请求的锁)
String currentValue = redis.opsForValue().get(lockKey);
if (lockValue.equals(currentValue)) {
redis.delete(lockKey);
}
}
}
static class SessionBusyException extends RuntimeException {
public SessionBusyException(String message) { super(message); }
}
}实践建议
会话的生命周期管理不能靠用户主动清理
用户很少主动关闭会话,会话会无限积累。必须有自动过期机制:活跃会话TTL(建议7天无操作后过期)、冷会话归档(超过30天的数据移到低成本存储)、消息数量上限(每个会话保留最近500条,更早的压缩为摘要)。这些不是可选的,是生产环境的必备配置。没有这些,几个月后Redis内存就会被撑爆。
并发控制的粒度是单个会话,不是全局
全局锁会让所有用户串行,QPS=1。正确的锁粒度:每个sessionId有独立的锁,不同会话之间完全并行。同一个会话内,最多同时只有一个请求在处理(保证消息顺序)。这样既保证了消息顺序的正确性,又不影响系统整体的并发能力。
Token预算是动态的,要根据当前问题动态调整
不是所有问题都需要带上全部历史。用户问"你好"不需要把前20轮对话都塞进去;用户问"总结一下我们之前聊的内容"才需要尽量多的历史。可以做简单的意图检测:如果问题包含"之前"、"刚才"、"总结"等词,适当增加历史的Token预算;如果是独立的新问题,减少历史Token,给AI的回答留更多空间。
