LLM上下文窗口优化:长文本处理的5种工程化解决方案
LLM上下文窗口优化:长文本处理的5种工程化解决方案
适读人群:需要处理长文档、长对话的Java工程师,遇到"context length exceeded"错误的开发者 阅读时长:约18分钟 文章价值:系统掌握长文本处理的5种方案,针对不同场景选择最优策略
合同审查系统的崩溃
去年有个读者给我发消息,他在做一个合同审查系统,客户上传合同PDF,系统用AI分析风险条款。
开发期间一切顺利,上线后第三天就崩了。原因是某客户上传了一份200页的并购合同,一次性塞进prompt,直接触发了"context length exceeded"。
他问我怎么办,要不要换一个支持更长上下文的模型?
我说换模型是最后手段,不是第一反应。200页合同塞进128K上下文,即使技术上可行,也会有两个问题:费用爆炸(token数量乘以价格)、推理变慢(注意力机制O(n²)复杂度)。
真正的解法是工程化地处理长文本。
这篇文章,我把5种方案从简单到复杂逐一拆解。
先理解问题:上下文窗口的本质限制
主流模型上下文限制:
| 模型 | 上下文窗口 | 约合中文字数 | 成本(每百万输入token) |
|---|---|---|---|
| GPT-4o-mini | 128K | 约19万字 | $0.15 |
| GPT-4o | 128K | 约19万字 | $2.50 |
| Claude 3.5 Sonnet | 200K | 约30万字 | $3.00 |
| Gemini 1.5 Pro | 1M | 约150万字 | $1.25 |
即使1M的Gemini,也有200页合同这样的极端case。更重要的是,窗口越大,成本越高,速度越慢。
方案一:文本分块(Chunking)
最基础的方案,把长文档切成小块分批处理:
@Service
@Slf4j
public class DocumentChunkingService {
private static final int CHUNK_SIZE = 1500; // 块大小(tokens约)
private static final int CHUNK_OVERLAP = 200; // 重叠区(保证上下文连贯)
/**
* 按语义段落分块(比固定长度分块效果好)
*/
public List<String> semanticChunk(String document) {
// 先按自然段落分割
String[] paragraphs = document.split("\n\n+");
List<String> chunks = new ArrayList<>();
StringBuilder currentChunk = new StringBuilder();
for (String paragraph : paragraphs) {
int currentLen = estimateTokens(currentChunk.toString());
int paraLen = estimateTokens(paragraph);
if (currentLen + paraLen > CHUNK_SIZE && currentChunk.length() > 0) {
// 当前块已满,保存并开始新块
chunks.add(currentChunk.toString().trim());
// 保留最后一段作为重叠(保证上下文连贯)
String[] sentences = currentChunk.toString().split("[。!?]");
int overlapChars = 0;
currentChunk = new StringBuilder();
for (int i = sentences.length - 1; i >= 0; i--) {
overlapChars += sentences[i].length();
if (overlapChars >= CHUNK_OVERLAP * 2) break; // 粗略字符到token转换
currentChunk.insert(0, sentences[i] + "。");
}
}
currentChunk.append(paragraph).append("\n\n");
}
if (currentChunk.length() > 0) {
chunks.add(currentChunk.toString().trim());
}
log.info("文档分块完成: totalChunks={}", chunks.size());
return chunks;
}
/**
* Map-Reduce模式:分块处理 → 汇总
*/
public String mapReduceAnalysis(String document, String question) {
List<String> chunks = semanticChunk(document);
// Map阶段:每块独立分析
List<String> chunkResults = chunks.parallelStream()
.map(chunk -> analyzeChunk(chunk, question))
.collect(Collectors.toList());
// Reduce阶段:汇总所有分析结果
return summarizeResults(chunkResults, question);
}
private String analyzeChunk(String chunk, String question) {
return chatClient.prompt()
.user(String.format("""
请分析以下文档片段,重点关注与问题相关的内容:
问题:%s
文档片段:
%s
请提取与问题相关的关键信息(如果本段无相关信息,请回复"本段无相关内容"):
""", question, chunk))
.call()
.content();
}
private String summarizeResults(List<String> results, String question) {
// 过滤掉无相关内容的块
List<String> relevantResults = results.stream()
.filter(r -> !r.contains("本段无相关内容"))
.collect(Collectors.toList());
String combinedResults = IntStream.range(0, relevantResults.size())
.mapToObj(i -> "分析片段" + (i+1) + ":\n" + relevantResults.get(i))
.collect(Collectors.joining("\n\n"));
return chatClient.prompt()
.user(String.format("""
基于以下各段分析结果,给出关于"%s"的综合回答:
%s
""", question, combinedResults))
.call()
.content();
}
private int estimateTokens(String text) {
return (int) (text.length() / 1.5);
}
}适用场景:文档问答、合同分析、报告摘要
局限:Map阶段的并行调用有费用,跨块信息可能断裂
方案二:滑动窗口(对话历史管理)
长对话场景,历史消息会撑爆上下文:
@Service
@Slf4j
public class SlidingWindowChatService {
private final ChatClient chatClient;
// 方案A:保留最近N条消息(最简单)
public String chatWithRecentHistory(String sessionId,
String newMessage,
int maxHistoryMessages) {
List<Message> history = getSessionHistory(sessionId);
// 只保留最近N条
List<Message> trimmedHistory = history.size() > maxHistoryMessages
? history.subList(history.size() - maxHistoryMessages, history.size())
: history;
String response = chatClient.prompt()
.messages(trimmedHistory)
.user(newMessage)
.call()
.content();
// 保存新消息
saveToHistory(sessionId, newMessage, response);
return response;
}
// 方案B:摘要压缩(保留信息完整性)
public String chatWithSummaryCompression(String sessionId, String newMessage) {
List<Message> history = getSessionHistory(sessionId);
// 计算token用量
int totalTokens = estimateTotalTokens(history, newMessage);
if (totalTokens > 80000) { // 阈值:超过80K就压缩
// 把早期历史压缩为摘要
int keepCount = 6; // 保留最近6条
List<Message> oldHistory = history.subList(0, history.size() - keepCount);
List<Message> recentHistory = history.subList(history.size() - keepCount, history.size());
// 生成早期对话摘要
String summary = compressHistory(oldHistory);
// 用摘要替换早期历史
Message summaryMessage = new SystemMessage(
"以下是早期对话的摘要:\n" + summary + "\n\n请基于此继续对话。"
);
List<Message> compressedHistory = new ArrayList<>();
compressedHistory.add(summaryMessage);
compressedHistory.addAll(recentHistory);
return chatClient.prompt()
.messages(compressedHistory)
.user(newMessage)
.call()
.content();
}
return chatClient.prompt()
.messages(history)
.user(newMessage)
.call()
.content();
}
private String compressHistory(List<Message> messages) {
String historyText = messages.stream()
.map(m -> m.getMessageType() + ": " + m.getContent())
.collect(Collectors.joining("\n"));
return chatClient.prompt()
.user("请将以下对话历史压缩为简洁的摘要,保留关键信息和决策点:\n\n" + historyText)
.call()
.content();
}
}适用场景:长对话客服、多轮问答
局限:摘要本身需要一次LLM调用,有额外成本
方案三:RAG动态检索(推荐方案)
不是把全文塞进去,而是只检索最相关的片段:
@Service
@Slf4j
public class LongDocumentRagService {
private final VectorStore vectorStore;
private final ChatClient chatClient;
private final DocumentChunkingService chunkingService;
/**
* 一次性向量化大文档(离线处理)
*/
public void indexDocument(String docId, String content) {
// 分块
List<String> chunks = chunkingService.semanticChunk(content);
// 构建Document列表
List<Document> documents = IntStream.range(0, chunks.size())
.mapToObj(i -> {
Document doc = new Document(chunks.get(i));
doc.getMetadata().put("docId", docId);
doc.getMetadata().put("chunkIndex", i);
doc.getMetadata().put("totalChunks", chunks.size());
return doc;
})
.collect(Collectors.toList());
// 向量化存储
vectorStore.add(documents);
log.info("文档已索引: docId={}, chunks={}", docId, chunks.size());
}
/**
* 在线查询:只取最相关的几个块
*/
public String queryDocument(String docId, String question) {
// 只检索最相关的3-5个块(而不是全文)
Filter.Expression filter = new Filter.Expression(
Filter.ExpressionType.EQ,
new Filter.Key("docId"),
new Filter.Value(docId)
);
List<Document> relevantChunks = vectorStore.similaritySearch(
SearchRequest.query(question)
.withTopK(5)
.withSimilarityThreshold(0.7)
.withFilterExpression(filter)
);
if (relevantChunks.isEmpty()) {
return "在文档中未找到与问题相关的内容。";
}
// 按chunkIndex排序(保证上下文顺序)
relevantChunks.sort(Comparator.comparingInt(
d -> (int) d.getMetadata().getOrDefault("chunkIndex", 0)
));
String context = relevantChunks.stream()
.map(Document::getContent)
.collect(Collectors.joining("\n\n---\n\n"));
return chatClient.prompt()
.user(String.format("""
基于以下文档内容回答问题。请只使用提供的信息,不要凭空推断。
文档内容:
%s
问题:%s
""", context, question))
.call()
.content();
}
}适用场景:知识库问答、文档检索
局限:需要提前索引,不适合实时上传的临时文档分析
方案四:层次摘要(递归压缩)
对需要全文理解的场景(如合同完整性审查),用层次摘要:
@Service
@Slf4j
public class HierarchicalSummaryService {
private final ChatClient chatClient;
private final DocumentChunkingService chunkingService;
/**
* 递归摘要:叶子→中间→根
* 适合需要理解全文的场景
*/
public String hierarchicalSummarize(String document, String focus) {
List<String> chunks = chunkingService.semanticChunk(document);
log.info("开始层次摘要: totalChunks={}", chunks.size());
// 第一层:每个块生成摘要
List<String> level1Summaries = chunks.parallelStream()
.map(chunk -> summarizeChunk(chunk, focus))
.collect(Collectors.toList());
// 如果摘要还是太多,继续递归压缩
List<String> currentLevel = level1Summaries;
int levelCount = 1;
while (currentLevel.size() > 3) {
log.info("继续压缩: level={}, count={}", levelCount, currentLevel.size());
// 每5个摘要合并成一个
List<String> nextLevel = new ArrayList<>();
for (int i = 0; i < currentLevel.size(); i += 5) {
List<String> group = currentLevel.subList(
i, Math.min(i + 5, currentLevel.size())
);
nextLevel.add(mergeSummaries(group, focus));
}
currentLevel = nextLevel;
levelCount++;
}
// 最终汇总
return finalSummary(currentLevel, focus);
}
private String summarizeChunk(String chunk, String focus) {
return chatClient.prompt()
.user(String.format("""
请对以下文本进行摘要,重点关注:%s
文本:
%s
摘要(200字以内):
""", focus, chunk))
.call()
.content();
}
private String mergeSummaries(List<String> summaries, String focus) {
String combined = IntStream.range(0, summaries.size())
.mapToObj(i -> "摘要" + (i+1) + ":" + summaries.get(i))
.collect(Collectors.joining("\n\n"));
return chatClient.prompt()
.user(String.format("""
请将以下多个摘要合并为一个连贯的摘要,重点关注:%s
%s
合并摘要(300字以内):
""", focus, combined))
.call()
.content();
}
private String finalSummary(List<String> summaries, String focus) {
String combined = String.join("\n\n", summaries);
return chatClient.prompt()
.user(String.format("""
基于以下摘要,给出完整的分析报告,重点关注:%s
%s
""", focus, combined))
.call()
.content();
}
}适用场景:合同完整性审查、长篇报告分析
局限:多次LLM调用,成本和时间随文档长度线性增长
方案五:Agent工具调用(最灵活)
让LLM自己决定读哪些部分:
@Service
@Slf4j
public class DocumentAgentService {
private final ChatClient chatClient;
private final Map<String, List<String>> documentChunks = new ConcurrentHashMap<>();
public DocumentAgentService(ChatClient.Builder builder) {
this.chatClient = builder
.defaultSystem("""
你是一个文档分析助手。
你有工具可以读取文档的不同部分。
分析问题时,先判断需要读哪些部分,再有针对性地读取,不要读取不必要的部分。
""")
.build();
}
public String analyzeDocument(String docId, String question) {
// 先索引文档
// documentChunks.put(docId, chunks);
return chatClient.prompt()
.user("请分析文档 " + docId + " 中关于以下问题的内容:" + question)
.functions(
FunctionCallback.builder()
.function("getDocumentOutline",
(GetOutlineRequest req) -> getOutline(req.docId()))
.description("获取文档的章节大纲,了解文档结构")
.inputType(GetOutlineRequest.class)
.build(),
FunctionCallback.builder()
.function("readDocumentSection",
(ReadSectionRequest req) -> readSection(req.docId(), req.chunkIndex()))
.description("读取文档的指定章节内容")
.inputType(ReadSectionRequest.class)
.build(),
FunctionCallback.builder()
.function("searchInDocument",
(SearchRequest req) -> searchInDoc(req.docId(), req.keyword()))
.description("在文档中搜索关键词,返回相关段落")
.inputType(SearchRequest.class)
.build()
)
.call()
.content();
}
private String getOutline(String docId) {
List<String> chunks = documentChunks.get(docId);
if (chunks == null) return "文档未找到";
return IntStream.range(0, chunks.size())
.mapToObj(i -> "第" + (i+1) + "部分: " +
chunks.get(i).substring(0, Math.min(50, chunks.get(i).length())) + "...")
.collect(Collectors.joining("\n"));
}
private String readSection(String docId, int chunkIndex) {
List<String> chunks = documentChunks.get(docId);
if (chunks == null || chunkIndex >= chunks.size()) return "章节未找到";
return chunks.get(chunkIndex);
}
private String searchInDoc(String docId, String keyword) {
List<String> chunks = documentChunks.get(docId);
if (chunks == null) return "文档未找到";
return chunks.stream()
.filter(chunk -> chunk.contains(keyword))
.limit(3)
.collect(Collectors.joining("\n---\n"));
}
record GetOutlineRequest(String docId) {}
record ReadSectionRequest(String docId, int chunkIndex) {}
record SearchRequest(String docId, String keyword) {}
}适用场景:复杂文档分析、需要灵活跳转章节的场景
局限:多轮Function Calling有延迟,不适合简单查询
五种方案对比
| 方案 | 适用场景 | 成本 | 速度 | 信息完整性 |
|---|---|---|---|---|
| 分块+MapReduce | 临时文档,特定问题 | 中 | 中(可并行) | 一般 |
| 滑动窗口 | 长对话历史 | 低 | 快 | 近期完整 |
| RAG检索 | 可索引知识库 | 低 | 快 | 高(相关片段) |
| 层次摘要 | 全文理解场景 | 高 | 慢 | 高但有信息损失 |
| Agent工具调用 | 复杂动态分析 | 中 | 慢 | 按需 |
小结
上下文窗口不够用,解法有5种,按实际场景选择:
- 首选RAG:能索引的知识库,RAG是王道
- 对话场景:滑动窗口+摘要压缩,平衡成本和连贯性
- 全文分析:层次摘要,接受成本换完整性
- 动态场景:Agent工具调用,灵活但慢
合同审查那个案例,最终选择了方案四(层次摘要)加上方案三(关键条款预索引)的组合:先对常见条款建索引快速检索,对于需要全文理解的完整性审查用层次摘要。两者配合,既快又准。
