第2100篇:上下文窗口的工程优化——让LLM用更少Token做更多事
2026/4/30大约 11 分钟
第2100篇:上下文窗口的工程优化——让LLM用更少Token做更多事
适读人群:关注LLM调用成本和效果的工程师 | 阅读时长:约20分钟 | 核心价值:掌握上下文压缩、历史对话管理、动态截断等策略,在保持对话质量的前提下大幅降低Token消耗
第2100篇了,写这个系列不知不觉过了一百篇。
这篇聊一个所有LLM应用都会碰到的问题:上下文窗口的工程管理。
新手的典型做法是把所有对话历史都塞进Prompt,结果token消耗直线增长,到对话后期每次调用可能消耗几万token。更要命的是,过多的无关历史会干扰LLM对当前问题的关注,反而使效果下降。
上下文不是越多越好,也不是越少越好,而是需要精心管理。
上下文膨胀的代价
/**
* 上下文膨胀的真实成本
*
* 场景:用户和AI的连续20轮对话
* 每轮平均用户500token + AI回复1000token = 1500token
*
* 朴素方案(保留所有历史):
* - 第1轮:1500 tokens
* - 第10轮:15000 tokens
* - 第20轮:30000 tokens
* - 平均每轮:15750 tokens
* - 20轮总消耗:315000 tokens
* - 按GPT-4o价格 $0.005/1K tokens:$1.57/次对话
*
* 优化后(智能上下文管理):
* - 保持每轮约3000-5000 tokens
* - 20轮总消耗:80000 tokens(减少75%)
* - 成本:$0.40/次对话
*
* 对于日活10万用户,成本差距是每天$117 vs $30
* 年化差距接近$32000
*
* 而且长上下文还有另一个代价:LLM在超长context里的注意力会稀释,
* 重要信息被淹没,效果反而不如短而精准的上下文
*/对话历史管理策略
/**
* 对话历史管理器
*
* 核心思路:不是保留所有历史,而是保留最有价值的历史
* 价值 = 相关性 × 时效性
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ConversationHistoryManager {
// 最大保留token数(用于历史)
private static final int MAX_HISTORY_TOKENS = 4000;
// 最大保留轮数
private static final int MAX_ROUNDS = 10;
private final TokenCounter tokenCounter;
private final MessageSummarizer summarizer;
/**
* 获取优化后的历史消息列表
*
* 策略:
* 1. 最近N轮:完整保留(保证连续性)
* 2. 早期重要消息:压缩摘要
* 3. 超出token限制:继续删减
*/
public List<ChatMessage> getOptimizedHistory(
String sessionId,
List<ChatMessage> fullHistory,
String currentQuestion) {
if (fullHistory.isEmpty()) return List.of();
// 策略1:最近N轮直接保留
int keepRecent = Math.min(6, fullHistory.size());
List<ChatMessage> recentMessages = fullHistory.subList(
fullHistory.size() - keepRecent, fullHistory.size());
// 计算最近消息的token数
int recentTokens = countTokens(recentMessages);
if (recentTokens >= MAX_HISTORY_TOKENS) {
// 最近N轮就超限了,只保留最近3轮
int keep = Math.min(3 * 2, fullHistory.size()); // 3轮 = 6条消息
return fullHistory.subList(fullHistory.size() - keep, fullHistory.size());
}
// 策略2:早期消息做摘要
List<ChatMessage> olderMessages = fullHistory.subList(
0, fullHistory.size() - keepRecent);
if (!olderMessages.isEmpty()) {
int remainingBudget = MAX_HISTORY_TOKENS - recentTokens;
// 如果早期消息也在预算内,全保留
int olderTokens = countTokens(olderMessages);
if (olderTokens <= remainingBudget) {
return fullHistory;
}
// 早期消息超预算,生成摘要
String summary = summarizer.summarize(olderMessages);
List<ChatMessage> result = new ArrayList<>();
result.add(SystemMessage.from("【之前对话摘要】\n" + summary));
result.addAll(recentMessages);
log.debug("历史压缩: sessionId={}, original={}条, after={}条",
sessionId, fullHistory.size(), result.size());
return result;
}
return recentMessages;
}
/**
* 基于相关性的历史过滤
*
* 不保留所有历史,只保留和当前问题相关的历史
* 适用于非连续对话(用户跳跃式提问)
*/
public List<ChatMessage> getRelevantHistory(
List<ChatMessage> fullHistory,
String currentQuestion,
EmbeddingModel embeddingModel,
EmbeddingStore historyEmbeddingStore) {
// 嵌入当前问题
Embedding questionEmbedding = embeddingModel.embed(currentQuestion).content();
// 检索最相关的历史消息
EmbeddingSearchRequest request = EmbeddingSearchRequest.builder()
.queryEmbedding(questionEmbedding)
.maxResults(4)
.minScore(0.65)
.build();
List<EmbeddingMatch<TextSegment>> relevant =
historyEmbeddingStore.search(request).matches();
// 始终保留最近2轮(保证对话连续性)
int keepRecent = Math.min(4, fullHistory.size());
List<ChatMessage> recent = fullHistory.subList(
fullHistory.size() - keepRecent, fullHistory.size());
// 合并相关历史和最近历史(去重,按时间排序)
Set<String> recentContents = recent.stream()
.map(m -> m.text())
.collect(Collectors.toSet());
List<ChatMessage> result = new ArrayList<>();
// 添加相关但不在最近列表里的历史
relevant.stream()
.map(m -> m.embedded().text())
.filter(t -> !recentContents.contains(t))
.map(SystemMessage::from) // 简化处理,实际要保留原始角色
.forEach(result::add);
result.addAll(recent);
return result;
}
private int countTokens(List<ChatMessage> messages) {
return messages.stream()
.mapToInt(m -> tokenCounter.count(m.text()))
.sum();
}
}消息压缩摘要
/**
* 对话历史摘要生成
*
* 将大量历史对话压缩成精简摘要
* 同时保留关键信息:用户身份、已解决的问题、重要上下文
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class MessageSummarizer {
private final ChatLanguageModel llm;
/**
* 将历史对话压缩成摘要
*/
public String summarize(List<ChatMessage> messages) {
if (messages.isEmpty()) return "";
String conversationText = formatMessages(messages);
String prompt = """
请对以下对话历史进行压缩摘要。
对话历史:
%s
**摘要要求**:
1. 保留用户的核心诉求和偏好
2. 保留已经明确的事实信息(如:用户的角色、已确认的条件等)
3. 保留已达成的共识和结论
4. 去掉过程性对话(比如"好的"、"明白了"等无实质内容的回复)
5. 摘要控制在300字以内
直接输出摘要内容,不要额外格式。
""".formatted(conversationText);
try {
return llm.generate(prompt).trim();
} catch (Exception e) {
log.warn("摘要生成失败,使用简单截断: {}", e.getMessage());
// 降级:取最后1000字符
return conversationText.length() > 1000
? conversationText.substring(conversationText.length() - 1000)
: conversationText;
}
}
/**
* 渐进式摘要:每N轮自动压缩一次
*
* 策略:保持一个滚动摘要,每隔N轮更新一次
* 不是每次都重新摘要全部历史,而是把新的历史追加到已有摘要
*/
public String updateRollingSummary(String existingSummary,
List<ChatMessage> newMessages) {
String newConversation = formatMessages(newMessages);
String prompt = """
已有的对话摘要:
%s
新增的对话(需要融入摘要):
%s
请更新摘要,把新增内容融入已有摘要,保持摘要300字以内。
重点:记录新增的事实、结论和用户需求,删除已过时的信息。
直接输出更新后的摘要。
""".formatted(
existingSummary.isBlank() ? "(无历史摘要)" : existingSummary,
newConversation
);
try {
return llm.generate(prompt).trim();
} catch (Exception e) {
log.warn("摘要更新失败: {}", e.getMessage());
return existingSummary;
}
}
private String formatMessages(List<ChatMessage> messages) {
return messages.stream()
.map(m -> {
String role = m instanceof UserMessage ? "用户" : "助手";
return role + ":" + m.text();
})
.collect(Collectors.joining("\n"));
}
}动态上下文预算分配
/**
* 动态上下文预算分配
*
* 不同组件竞争有限的上下文窗口
* 需要智能分配:System Prompt / 历史 / RAG检索 / 当前问题
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ContextBudgetAllocator {
private final TokenCounter tokenCounter;
// 总上下文窗口(留出输出Token的空间)
// GPT-4o 128K context,留20K给输出,可用108K
// 但实践中建议不用满,保持在60-80K以内效果更好
private static final int TOTAL_CONTEXT_BUDGET = 60000;
// 各部分的最低和最高Token分配
private static final ContextLimits SYSTEM_PROMPT_LIMITS = new ContextLimits(500, 3000);
private static final ContextLimits HISTORY_LIMITS = new ContextLimits(500, 10000);
private static final ContextLimits RAG_CONTEXT_LIMITS = new ContextLimits(1000, 20000);
private static final ContextLimits CURRENT_QUESTION_LIMITS = new ContextLimits(10, 4000);
/**
* 分配上下文预算
*/
public ContextBudget allocate(ContextRequest request) {
int questionTokens = tokenCounter.count(request.getCurrentQuestion());
int systemPromptTokens = tokenCounter.count(request.getSystemPrompt());
// 确保必要的部分有足够空间
questionTokens = clamp(questionTokens,
CURRENT_QUESTION_LIMITS.min(), CURRENT_QUESTION_LIMITS.max());
systemPromptTokens = clamp(systemPromptTokens,
SYSTEM_PROMPT_LIMITS.min(), SYSTEM_PROMPT_LIMITS.max());
int remainingBudget = TOTAL_CONTEXT_BUDGET - questionTokens - systemPromptTokens;
if (remainingBudget <= 0) {
log.warn("上下文预算严重不足: question={}, systemPrompt={}",
questionTokens, systemPromptTokens);
return new ContextBudget(systemPromptTokens, 0, 0, questionTokens);
}
// 动态分配剩余预算给历史和RAG
// 根据任务类型调整比例
double historyRatio, ragRatio;
switch (request.getTaskType()) {
case MULTI_TURN_CHAT -> {
// 对话任务:历史更重要
historyRatio = 0.5;
ragRatio = 0.5;
}
case DOCUMENT_QA -> {
// 文档问答:RAG更重要
historyRatio = 0.2;
ragRatio = 0.8;
}
case CODE_GENERATION -> {
// 代码生成:历史和RAG都重要(代码库检索)
historyRatio = 0.4;
ragRatio = 0.6;
}
default -> {
historyRatio = 0.35;
ragRatio = 0.65;
}
}
int historyBudget = clamp(
(int)(remainingBudget * historyRatio),
HISTORY_LIMITS.min(), HISTORY_LIMITS.max());
int ragBudget = clamp(
remainingBudget - historyBudget,
RAG_CONTEXT_LIMITS.min(), RAG_CONTEXT_LIMITS.max());
log.debug("上下文预算分配: system={}, history={}, rag={}, question={}",
systemPromptTokens, historyBudget, ragBudget, questionTokens);
return new ContextBudget(systemPromptTokens, historyBudget, ragBudget, questionTokens);
}
private int clamp(int value, int min, int max) {
return Math.max(min, Math.min(max, value));
}
public record ContextLimits(int min, int max) {}
public record ContextBudget(
int systemPromptTokens, int historyTokens,
int ragContextTokens, int questionTokens
) {
public int total() {
return systemPromptTokens + historyTokens + ragContextTokens + questionTokens;
}
}
@Data
@Builder
public static class ContextRequest {
private String currentQuestion;
private String systemPrompt;
private TaskType taskType;
public enum TaskType {
MULTI_TURN_CHAT, DOCUMENT_QA, CODE_GENERATION, TASK_EXECUTION
}
}
}RAG上下文截断策略
/**
* RAG检索结果的智能截断
*
* 问题:RAG检索出来的文档片段超出预算怎么办?
* 不是简单的硬截断,而是按相关性降序截断
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class RagContextTruncator {
private final TokenCounter tokenCounter;
/**
* 在预算内选取最有价值的RAG片段
*/
public List<TextSegment> truncateToFit(
List<EmbeddingMatch<TextSegment>> matches, int tokenBudget) {
List<TextSegment> selected = new ArrayList<>();
int usedTokens = 0;
// 按相关度降序排列(通常已经是这个顺序)
for (EmbeddingMatch<TextSegment> match : matches) {
TextSegment segment = match.embedded();
int segmentTokens = tokenCounter.count(segment.text());
if (usedTokens + segmentTokens <= tokenBudget) {
selected.add(segment);
usedTokens += segmentTokens;
} else {
// 剩余预算能否放下截断版本
int remainingBudget = tokenBudget - usedTokens;
if (remainingBudget > 200) {
// 截断片段(保留前半部分,通常最重要的内容在开头)
String truncated = truncateText(segment.text(), remainingBudget);
selected.add(TextSegment.from(truncated, segment.metadata()));
usedTokens += tokenCounter.count(truncated);
}
break; // 预算用完
}
}
log.debug("RAG上下文截断: 输入{}个片段, 保留{}个, 使用{}tokens/{}预算",
matches.size(), selected.size(), usedTokens, tokenBudget);
return selected;
}
/**
* 按句子边界截断文本
*
* 避免在句子中间截断,影响语义完整性
*/
private String truncateText(String text, int maxTokens) {
// 按句子分割
String[] sentences = text.split("(?<=[。!?.!?])");
StringBuilder sb = new StringBuilder();
for (String sentence : sentences) {
if (tokenCounter.count(sb.toString() + sentence) <= maxTokens) {
sb.append(sentence);
} else {
break;
}
}
String result = sb.toString().trim();
// 如果连一个句子都放不下,强制截断
if (result.isEmpty() && !text.isEmpty()) {
result = text.substring(0, Math.min(text.length(), 200)) + "...";
}
return result;
}
/**
* 去重:RAG结果可能有重复内容(来自不同文档的相似段落)
*/
public List<EmbeddingMatch<TextSegment>> deduplicate(
List<EmbeddingMatch<TextSegment>> matches, double similarityThreshold) {
List<EmbeddingMatch<TextSegment>> deduped = new ArrayList<>();
for (EmbeddingMatch<TextSegment> candidate : matches) {
boolean isDuplicate = deduped.stream()
.anyMatch(existing ->
contentSimilarity(existing.embedded().text(),
candidate.embedded().text()) > similarityThreshold);
if (!isDuplicate) {
deduped.add(candidate);
}
}
return deduped;
}
private double contentSimilarity(String a, String b) {
// 简单的词重叠率(实际可以用更复杂的相似度)
Set<String> wordsA = Set.of(a.split("\\s+"));
Set<String> wordsB = Set.of(b.split("\\s+"));
long intersection = wordsA.stream().filter(wordsB::contains).count();
long union = wordsA.size() + wordsB.size() - intersection;
return union > 0 ? (double) intersection / union : 0;
}
}Token计数器
/**
* Token计数器
*
* 在截断之前需要知道文本有多少token
* 不同模型的tokenizer不同
*/
@Component
@Slf4j
public class TokenCounter {
/**
* 近似计数(不需要精确时用)
*
* 经验规则:
* - 英文:1 token ≈ 4字符
* - 中文:1 token ≈ 1.5字符(中文字符基本是1-2 token)
*/
public int count(String text) {
if (text == null || text.isEmpty()) return 0;
// 简单但够用的近似算法
// 英文单词计数(按空格分割)
long englishWords = Arrays.stream(text.split("\\s+"))
.filter(w -> w.matches("[a-zA-Z]+"))
.count();
// 中文字符计数
long chineseChars = text.chars()
.filter(c -> c >= 0x4E00 && c <= 0x9FFF)
.count();
// 其他字符
long otherChars = text.length() - englishWords * 4 - chineseChars;
// 近似计算
return (int)(englishWords + chineseChars + otherChars / 3);
}
/**
* 精确计数(使用Tiktoken或模型SDK)
*
* 注意:精确计数需要加载tokenizer,有启动开销
* 高频调用时建议用近似方法
*/
public int countExact(String text, String modelName) {
// 实际使用时集成tiktoken4j或模型SDK的tokenizer
// 这里用近似值代替
return count(text);
}
/**
* 估算整个Prompt的Token数
*/
public int countPrompt(List<ChatMessage> messages) {
// 每条消息有额外的overhead(角色标记等)
int baseOverhead = messages.size() * 4;
return baseOverhead + messages.stream()
.mapToInt(m -> count(m.text()))
.sum();
}
}上下文优化的整合
/**
* 完整的上下文优化流程
*
* 在每次LLM调用前自动处理上下文
*/
@Service
@RequiredArgsConstructor
@Slf4j
public class ContextOptimizationService {
private final ConversationHistoryManager historyManager;
private final ContextBudgetAllocator budgetAllocator;
private final RagContextTruncator ragTruncator;
private final MessageSummarizer summarizer;
private final TokenCounter tokenCounter;
/**
* 构建优化后的完整上下文
*/
public OptimizedContext build(ContextBuildRequest request) {
// 1. 分配预算
ContextBudgetAllocator.ContextBudget budget = budgetAllocator.allocate(
ContextBudgetAllocator.ContextRequest.builder()
.currentQuestion(request.getCurrentQuestion())
.systemPrompt(request.getSystemPrompt())
.taskType(request.getTaskType())
.build()
);
// 2. 优化历史消息
List<ChatMessage> optimizedHistory = historyManager.getOptimizedHistory(
request.getSessionId(),
request.getFullHistory(),
request.getCurrentQuestion()
);
// 确保历史在预算内
while (tokenCounter.countPrompt(optimizedHistory) > budget.historyTokens()
&& optimizedHistory.size() > 2) {
// 继续删减:去掉最早的消息对
optimizedHistory = optimizedHistory.subList(2, optimizedHistory.size());
}
// 3. 截断RAG结果
List<TextSegment> ragContext = request.getRagResults() != null
? ragTruncator.truncateToFit(request.getRagResults(), budget.ragContextTokens())
: List.of();
// 4. 构建最终结果
int totalTokens = budget.systemPromptTokens() +
tokenCounter.countPrompt(optimizedHistory) +
ragContext.stream().mapToInt(s -> tokenCounter.count(s.text())).sum() +
budget.questionTokens();
log.debug("上下文优化完成: totalTokens={}, history={}条, rag={}个片段",
totalTokens, optimizedHistory.size(), ragContext.size());
return new OptimizedContext(
request.getSystemPrompt(),
optimizedHistory,
ragContext,
request.getCurrentQuestion(),
totalTokens
);
}
public record ContextBuildRequest(
String sessionId, String systemPrompt, String currentQuestion,
List<ChatMessage> fullHistory,
List<EmbeddingMatch<TextSegment>> ragResults,
ContextBudgetAllocator.ContextRequest.TaskType taskType
) {}
public record OptimizedContext(
String systemPrompt, List<ChatMessage> history,
List<TextSegment> ragContext, String currentQuestion, int estimatedTokens
) {}
}实践经验
渐进式压缩优于突然截断
我见过一些实现在达到上下文限制时直接删掉最早的N条消息,用户体验非常差(AI突然"失忆")。更好的做法是渐进式的:先尝试摘要早期消息,摘要后还超限再删除。用户感知到的是AI"记住了要点但忘了细节",比完全失忆体验好很多。
不同任务有不同的最优上下文长度
经过多个项目的调优,我发现:纯聊天任务上下文6-10轮就够,再多对效果几乎没影响;RAG问答任务历史不重要,把预算都给检索内容效果最好;代码辅助任务需要更多历史(代码上下文很重要),但也要过滤掉无关的对话。
监控Token消耗是基本功
必须对每次LLM调用的token消耗做监控(输入token + 输出token)。接入Prometheus,按接口、按用户、按时间段统计。一旦某个接口的平均token突然上涨,通常意味着上下文管理出了问题或者有滥用行为。我们就靠这个监控发现过一个接口的Token消耗在某天翻了5倍——后来查出来是一个prompt里误加了一个大文件的内容。
