第1637篇:大模型的上下文窗口管理——超长对话的压缩与召回策略
第1637篇:大模型的上下文窗口管理——超长对话的压缩与召回策略
做过多轮对话应用的人都知道,有个问题早晚会遇到:对话一长,要么Token超限崩掉,要么成本高得吓人,要么响应变慢。
我们产品上有个用户,跟AI客服一次连续对话了200多轮,Token消耗超过了10万。那次的成本账单出来后,产品经理特别找我谈,说这种情况如果大规模发生,成本会失控。
从那之后我们认真研究了上下文管理这个话题,今天把思路和代码都分享出来。
先理解问题的本质
大模型是无状态的,每次调用都需要把完整的对话历史传进去。这意味着:
第1轮:发送1条消息,处理1条
第2轮:发送2条消息,处理2条
...
第N轮:发送N条消息,处理N条随着对话轮数增加,Token消耗是二次方增长的,不是线性的。对话越长,单次调用越贵,速度越慢。
主流模型的上下文限制:
- GPT-4o:128K tokens
- Claude 3.5:200K tokens
- Gemini 1.5 Pro:1M tokens
- 私有化部署的7B模型:通常4K-32K tokens
上下文越大不代表可以随便放,超长上下文下模型的注意力会分散,中间部分的信息容易被忽略(这被称为"中间消失"问题)。
四种主流的上下文管理策略
策略一:简单截断(最简单,效果最差)
只保留最近的N条消息:
public class SlidingWindowContextManager {
private final int maxMessages;
public SlidingWindowContextManager(int maxMessages) {
this.maxMessages = maxMessages;
}
public List<Message> trim(List<Message> history) {
if (history.size() <= maxMessages) return history;
// 永远保留第一条System消息(如果有)
List<Message> result = new ArrayList<>();
Message systemMessage = history.stream()
.filter(m -> m instanceof SystemMessage)
.findFirst()
.orElse(null);
if (systemMessage != null) {
result.add(systemMessage);
}
// 取最近的N条(不包含system)
List<Message> nonSystem = history.stream()
.filter(m -> !(m instanceof SystemMessage))
.collect(Collectors.toList());
int startIndex = Math.max(0, nonSystem.size() - maxMessages);
result.addAll(nonSystem.subList(startIndex, nonSystem.size()));
return result;
}
}这种方式的问题是:早期对话的重要信息(用户的偏好、背景介绍等)会被截掉,导致AI"失忆"。
策略二:摘要压缩(经典方案)
定期把早期的对话压缩成摘要,用摘要替代原始历史:
@Service
@Slf4j
public class SummaryCompressingContextManager {
private final ChatClient summaryClient;
private final int compressionThreshold; // 超过多少条消息触发压缩
private final int keepRecentCount; // 压缩后保留最近几条
private static final String SUMMARY_PROMPT = """
你是一个对话摘要专家。请将以下对话历史压缩成简洁的摘要。
要求:
1. 保留所有重要信息:用户的关键诉求、确认的事项、重要结论
2. 去除重复、寒暄、无关内容
3. 用第三人称描述,格式为"用户...,AI..."
4. 摘要不超过500字
对话历史:
{conversation}
请直接输出摘要,不要有前缀说明:
""";
public List<Message> manage(List<Message> history) {
if (history.size() <= compressionThreshold) return history;
// 分离:需要压缩的部分 + 保留的最近部分
List<Message> toCompress = history.subList(0,
history.size() - keepRecentCount);
List<Message> toKeep = history.subList(
history.size() - keepRecentCount, history.size());
// 已经是摘要了就不要重复压缩(检查是否有SummaryMessage标记)
List<Message> compressable = toCompress.stream()
.filter(m -> !(m instanceof SummaryMessage))
.collect(Collectors.toList());
if (compressable.isEmpty()) {
return history;
}
log.info("压缩 {} 条历史消息", compressable.size());
// 生成摘要
String summary = generateSummary(compressable);
// 组装新的历史:旧摘要(如果有)+ 新摘要 + 最近消息
List<Message> result = new ArrayList<>();
// 保留旧摘要
toCompress.stream()
.filter(m -> m instanceof SummaryMessage)
.forEach(result::add);
// 加入新摘要
result.add(new SummaryMessage("[对话摘要]\n" + summary));
// 加入最近消息
result.addAll(toKeep);
return result;
}
private String generateSummary(List<Message> messages) {
String conversation = messages.stream()
.map(m -> formatMessage(m))
.collect(Collectors.joining("\n"));
try {
return summaryClient.prompt()
.system("你是对话摘要专家")
.user(SUMMARY_PROMPT.replace("{conversation}", conversation))
.call()
.content();
} catch (Exception e) {
log.error("生成摘要失败", e);
// 降级:简单截断
return "(前期对话记录较多,已压缩处理)";
}
}
private String formatMessage(Message message) {
String role = message instanceof UserMessage ? "用户" : "AI";
return role + ":" + message.getContent();
}
}策略三:语义召回(RAG风格的上下文管理)
不是简单截断或摘要,而是把历史对话存入向量库,每次根据当前问题召回最相关的历史片段:
@Service
@Slf4j
public class SemanticContextRetriever {
private final EmbeddingModel embeddingModel;
private final VectorStore vectorStore;
private final int topK;
private final double similarityThreshold;
public SemanticContextRetriever(EmbeddingModel embeddingModel,
VectorStore vectorStore) {
this.embeddingModel = embeddingModel;
this.vectorStore = vectorStore;
this.topK = 5;
this.similarityThreshold = 0.7;
}
/**
* 存储对话轮次到向量库
*/
public void storeExchange(String sessionId, String userInput,
String aiResponse, int turnIndex) {
// 把一轮对话存为一个Document
String content = String.format("用户:%s\nAI:%s", userInput, aiResponse);
Document doc = Document.builder()
.content(content)
.metadata(Map.of(
"session_id", sessionId,
"turn_index", turnIndex,
"user_input", userInput,
"timestamp", System.currentTimeMillis()
))
.build();
vectorStore.add(List.of(doc));
}
/**
* 根据当前问题检索相关历史
*/
public List<Message> retrieveRelevantHistory(String sessionId,
String currentInput,
int recentTurns) {
// 用向量相似度找相关历史
SearchRequest searchRequest = SearchRequest.query(currentInput)
.withTopK(topK)
.withSimilarityThreshold(similarityThreshold)
.withFilterExpression("session_id == '" + sessionId + "'");
List<Document> relevantDocs = vectorStore.similaritySearch(searchRequest);
// 转换为消息列表
return relevantDocs.stream()
.sorted(Comparator.comparingInt(
d -> (int) d.getMetadata().get("turn_index")))
.map(doc -> (Message) new SystemMessage(
"[相关历史对话]\n" + doc.getContent()))
.collect(Collectors.toList());
}
}策略四:分层记忆(最接近人类记忆的设计)
人类的记忆有不同层次:短期记忆(最近的事)、工作记忆(当前任务相关的)、长期记忆(重要的经历)。
参考这个结构设计AI的记忆体系:
@Service
@Slf4j
public class HierarchicalMemoryManager {
// 短期记忆:最近几轮,完整保留
private final int shortTermWindow = 10;
// 工作记忆:当前任务相关,动态提取
private final WorkingMemoryExtractor workingMemoryExtractor;
// 长期记忆:跨会话的重要信息,持久化存储
private final LongTermMemoryStore longTermStore;
// 语义检索器:从历史中召回相关内容
private final SemanticContextRetriever semanticRetriever;
public MemoryContext buildContext(String userId, String sessionId,
List<Message> rawHistory,
String currentInput) {
// 1. 短期记忆:取最近的对话
List<Message> shortTermMemory = rawHistory.stream()
.skip(Math.max(0, rawHistory.size() - shortTermWindow))
.collect(Collectors.toList());
// 2. 语义记忆:从历史中找当前问题的相关内容
List<Message> semanticMemory = semanticRetriever
.retrieveRelevantHistory(sessionId, currentInput, shortTermWindow);
// 去除与短期记忆重复的内容
Set<String> recentContents = shortTermMemory.stream()
.map(Message::getContent)
.collect(Collectors.toSet());
semanticMemory = semanticMemory.stream()
.filter(m -> !recentContents.contains(m.getContent()))
.collect(Collectors.toList());
// 3. 长期记忆:用户跨会话的偏好和重要信息
UserLongTermMemory ltm = longTermStore.getMemory(userId);
String longTermContext = formatLongTermMemory(ltm);
// 4. 工作记忆:当前任务的关键信息摘要
String workingMemory = workingMemoryExtractor
.extract(rawHistory, currentInput);
return MemoryContext.builder()
.longTermContext(longTermContext)
.workingMemory(workingMemory)
.semanticHistory(semanticMemory)
.recentHistory(shortTermMemory)
.build();
}
/**
* 更新长期记忆(在对话结束或达到一定轮数时调用)
*/
public void updateLongTermMemory(String userId, List<Message> history) {
// 提取值得长期记忆的信息
UserLongTermMemory ltm = longTermStore.getMemory(userId);
// 提取用户的偏好
extractUserPreferences(history).forEach(pref ->
ltm.addPreference(pref.getKey(), pref.getValue()));
// 提取重要事实
extractKeyFacts(history).forEach(fact -> ltm.addFact(fact));
longTermStore.save(userId, ltm);
}
private String formatLongTermMemory(UserLongTermMemory ltm) {
if (ltm == null || ltm.isEmpty()) return "";
StringBuilder sb = new StringBuilder("[用户记忆]\n");
if (!ltm.getPreferences().isEmpty()) {
sb.append("偏好:").append(String.join(";", ltm.getPreferences())).append("\n");
}
if (!ltm.getKeyFacts().isEmpty()) {
sb.append("重要信息:").append(String.join(";", ltm.getKeyFacts())).append("\n");
}
return sb.toString();
}
}完整的上下文管理Pipeline
把上面的策略组合成一个完整的管理流程:
@Service
@Slf4j
public class ContextAwareChatService {
private final ChatClient chatClient;
private final HierarchicalMemoryManager memoryManager;
private final SessionRepository sessionRepository;
private final int simpleHistoryThreshold = 20; // 超过20轮才启用复杂管理
public String chat(String userId, String sessionId, String userInput) {
// 加载历史
List<Message> history = sessionRepository.getHistory(sessionId);
List<Message> contextMessages;
if (history.size() < simpleHistoryThreshold) {
// 短对话直接用完整历史
contextMessages = history;
} else {
// 长对话用分层记忆
MemoryContext memCtx = memoryManager.buildContext(
userId, sessionId, history, userInput);
contextMessages = buildMessagesFromContext(memCtx);
}
// 估算Token数(粗估)
int estimatedTokens = estimateTokens(contextMessages);
log.debug("上下文Token估算: {}, 消息数: {}", estimatedTokens, contextMessages.size());
// 如果还是太长,做最后的截断保护
if (estimatedTokens > 100000) { // 留一些给响应
contextMessages = emergencyTrim(contextMessages);
}
String response = chatClient.prompt()
.messages(contextMessages)
.user(userInput)
.call()
.content();
// 保存本轮对话
sessionRepository.addMessage(sessionId, new UserMessage(userInput));
sessionRepository.addMessage(sessionId, new AssistantMessage(response));
// 异步更新语义索引
CompletableFuture.runAsync(() ->
memoryManager.storeExchangeAsync(sessionId, userInput, response));
return response;
}
private List<Message> buildMessagesFromContext(MemoryContext ctx) {
List<Message> messages = new ArrayList<>();
// 长期记忆作为额外的System消息
if (!ctx.getLongTermContext().isEmpty()) {
messages.add(new SystemMessage(ctx.getLongTermContext()));
}
// 工作记忆
if (!ctx.getWorkingMemory().isEmpty()) {
messages.add(new SystemMessage("[当前任务上下文]\n" + ctx.getWorkingMemory()));
}
// 相关语义历史
messages.addAll(ctx.getSemanticHistory());
// 最近的对话(最重要,放最后)
messages.addAll(ctx.getRecentHistory());
return messages;
}
private int estimateTokens(List<Message> messages) {
// 粗估:中文约1.5字/token,英文约4字符/token
return messages.stream()
.mapToInt(m -> m.getContent() != null ?
m.getContent().length() / 2 : 0)
.sum();
}
private List<Message> emergencyTrim(List<Message> messages) {
// 紧急截断:只保留SystemMessage和最近10条
List<Message> result = new ArrayList<>();
messages.stream()
.filter(m -> m instanceof SystemMessage)
.forEach(result::add);
List<Message> nonSystem = messages.stream()
.filter(m -> !(m instanceof SystemMessage))
.collect(Collectors.toList());
int start = Math.max(0, nonSystem.size() - 10);
result.addAll(nonSystem.subList(start, nonSystem.size()));
log.warn("紧急截断:消息从 {} 条减少到 {} 条", messages.size(), result.size());
return result;
}
}实际效果和成本对比
我们做了一个对比实验,对比了不同策略在200轮对话下的Token消耗:
| 策略 | 第200轮Token消耗 | 总累计消耗 | 用户体验 |
|---|---|---|---|
| 无管理(直接传递) | 约38,000 tokens | 约380万 tokens | 前期好,后期崩 |
| 简单截断(保留20轮) | 约4,000 tokens | 约80万 tokens | 前期好,后期失忆 |
| 摘要压缩 | 约5,500 tokens | 约110万 tokens | 整体连贯,偶有遗漏 |
| 分层记忆 | 约6,800 tokens | 约136万 tokens | 最佳连贯性 |
分层记忆方案虽然比摘要压缩贵了约20%,但用户的对话连贯性和满意度要高很多。而且相比无管理方案省了约64%的Token,在200轮以上的长对话里这个成本差距非常大。
一个容易被忽视的细节:Token的精确计算
我上面的代码用的是粗估,实际生产中建议用精确的Token计数:
@Component
public class TokenCounter {
// 不同模型用不同的tokenizer
private final Map<String, Encoding> encoders = new HashMap<>();
@PostConstruct
public void init() {
// 使用tiktoken4j库
encoders.put("gpt-4", Encodings.newDefaultEncodingRegistry()
.getEncoding(EncodingType.CL100K_BASE).orElseThrow());
encoders.put("gpt-3.5-turbo", encoders.get("gpt-4"));
}
public int countTokens(String text, String model) {
Encoding encoder = encoders.getOrDefault(model, encoders.get("gpt-4"));
if (encoder == null) {
// fallback粗估
return text.length() / 3;
}
return encoder.encode(text).size();
}
public int countMessagesTokens(List<Message> messages, String model) {
// 每条消息还有固定的overhead(role token等)
int overhead = messages.size() * 4; // 大概每条消息4个token的overhead
return messages.stream()
.mapToInt(m -> countTokens(m.getContent() != null ?
m.getContent() : "", model))
.sum() + overhead;
}
}上下文管理没有最优解,只有最适合你业务场景的解。如果你的对话大多数在10-20轮以内,简单截断就够了。如果有大量的深度长对话,分层记忆才值得投入。关键是要先量化现有的问题,再选择合适的方案。
