第1936篇:长文本处理的工程挑战——超出上下文窗口时的分块与合并策略
第1936篇:长文本处理的工程挑战——超出上下文窗口时的分块与合并策略
有一段时间,我们的用户经常反映:让AI总结一份几十页的合同,它要么说"抱歉,文档太长无法处理",要么给出的总结漏掉了很多关键条款。这两种结果都让用户很沮丧。
后来我在一个技术群里聊这个问题,发现这是普遍现象。很多团队第一反应是:换一个上下文更长的模型。Gemini 1.5 Pro有100万Token的上下文窗口,够了吧?但换了之后发现,更长的上下文带来了新问题:成本爆炸、推理变慢、而且"丢失在中间"(lost-in-the-middle)的问题——模型对处于上下文中间位置的信息的利用率远低于头尾。
超长上下文的正确处理姿势,不是简单地塞进更大的窗口,而是有一套工程化的分块与合并策略。
理解上下文窗口的本质限制
在讨论解决方案之前,先把问题的本质说清楚。
LLM的上下文窗口不只是长度限制,它有几个特性:
注意力衰减:Transformer的注意力机制在处理超长序列时,对远端位置的关注度会下降。即使模型声称支持128k Token,在实际效果上,前8000 Token和最后8000 Token的处理质量远好于中间部分。
成本非线性:GPT-4的API调用,128k Token的成本不是4k Token的32倍,而可能更高(大多数模型按输入输出Token计费,输入越长,每次请求成本越高)。
推理延迟线性增长:输入Token每翻倍,推理时间大约增加30%-50%,对实时性要求高的场景影响明显。
知道了这些,就能理解为什么即使上下文够长,合理的分块策略仍然有价值。
分块策略的几种模式
固定大小分块(最简单,效果最差)
public class FixedSizeChunker {
private final int chunkSize; // 每块的字符数
private final int overlapSize; // 相邻块的重叠字符数(保证连贯性)
public List<TextChunk> chunk(String text) {
List<TextChunk> chunks = new ArrayList<>();
int start = 0;
int chunkIndex = 0;
while (start < text.length()) {
int end = Math.min(start + chunkSize, text.length());
TextChunk chunk = TextChunk.builder()
.index(chunkIndex++)
.content(text.substring(start, end))
.startOffset(start)
.endOffset(end)
.build();
chunks.add(chunk);
// 下一块从(end - overlapSize)开始,形成重叠区域
start = end - overlapSize;
if (start >= text.length()) break;
}
return chunks;
}
}固定大小分块的问题:会在句子中间截断,导致语义不完整。比如一段完整的合同条款可能被截成两半,两半分别在不同的块里,模型处理每块时都只看到不完整的条款。
语义边界分块(推荐)
按自然语言边界分块,优先级从高到低:章节 > 段落 > 句子
public class SemanticBoundaryChunker {
private final int maxChunkTokens; // 每块最大Token数
private final TokenCounter tokenCounter;
public List<TextChunk> chunk(String text) {
// 第一步:识别文档结构
List<DocumentSection> sections = parseSections(text);
List<TextChunk> chunks = new ArrayList<>();
int chunkIndex = 0;
for (DocumentSection section : sections) {
if (tokenCounter.count(section.getContent()) <= maxChunkTokens) {
// 整个章节可以放进一个块
chunks.add(TextChunk.builder()
.index(chunkIndex++)
.content(section.getContent())
.sectionTitle(section.getTitle())
.sectionLevel(section.getLevel())
.build());
} else {
// 章节太长,需要按段落进一步切分
List<TextChunk> subChunks = chunkByParagraph(section, chunkIndex);
chunks.addAll(subChunks);
chunkIndex += subChunks.size();
}
}
return chunks;
}
private List<TextChunk> chunkByParagraph(DocumentSection section, int startIndex) {
String[] paragraphs = section.getContent().split("\n\n+");
List<TextChunk> chunks = new ArrayList<>();
StringBuilder currentChunk = new StringBuilder();
int currentChunkIndex = startIndex;
// 每个块的元数据前缀(帮助模型理解当前块的位置)
String contextPrefix = String.format("[%s]\n", section.getTitle());
for (String paragraph : paragraphs) {
String candidate = currentChunk.length() == 0
? contextPrefix + paragraph
: currentChunk + "\n\n" + paragraph;
if (tokenCounter.count(candidate) <= maxChunkTokens) {
currentChunk = new StringBuilder(candidate);
} else {
// 当前段落加进去会超限,先把之前的保存为一个块
if (currentChunk.length() > 0) {
chunks.add(TextChunk.builder()
.index(currentChunkIndex++)
.content(currentChunk.toString())
.sectionTitle(section.getTitle())
.build());
}
// 开始新块
currentChunk = new StringBuilder(contextPrefix + paragraph);
}
}
// 别忘了最后一个块
if (currentChunk.length() > 0) {
chunks.add(TextChunk.builder()
.index(currentChunkIndex)
.content(currentChunk.toString())
.sectionTitle(section.getTitle())
.isLast(true)
.build());
}
return chunks;
}
private List<DocumentSection> parseSections(String text) {
List<DocumentSection> sections = new ArrayList<>();
// 识别Markdown风格的标题
String[] lines = text.split("\n");
DocumentSection currentSection = null;
StringBuilder currentContent = new StringBuilder();
for (String line : lines) {
if (line.startsWith("# ")) {
saveSection(sections, currentSection, currentContent);
currentSection = new DocumentSection(line.substring(2), 1);
currentContent = new StringBuilder();
} else if (line.startsWith("## ")) {
saveSection(sections, currentSection, currentContent);
currentSection = new DocumentSection(line.substring(3), 2);
currentContent = new StringBuilder();
} else {
currentContent.append(line).append("\n");
}
}
saveSection(sections, currentSection, currentContent);
return sections;
}
}递归字符分块(实用折中方案)
这是LangChain等框架里比较常用的方式:按分隔符优先级递归切分,直到每块大小满足要求:
public class RecursiveCharacterChunker {
// 分隔符优先级,从高到低
private static final List<String> SEPARATORS = List.of(
"\n\n\n", // 多行空行(通常是章节分隔)
"\n\n", // 段落分隔
"\n", // 行分隔
"。", // 句子结束(中文)
";", // 分号
",", // 逗号(最后才用,尽量保持完整句子)
" " // 空格(英文)
);
private final int maxChunkSize; // 字符数
private final int minChunkSize; // 避免生成过小的碎片
private final int overlapSize;
public List<String> chunk(String text) {
return splitText(text, SEPARATORS);
}
private List<String> splitText(String text, List<String> separators) {
List<String> finalChunks = new ArrayList<>();
// 找到当前适用的分隔符
String separator = null;
List<String> newSeparators = null;
for (int i = 0; i < separators.size(); i++) {
String s = separators.get(i);
if (text.contains(s)) {
separator = s;
newSeparators = separators.subList(i + 1, separators.size());
break;
}
}
if (separator == null) {
// 找不到任何分隔符,强制切分
return forceChunk(text);
}
// 按当前分隔符切分
String[] splits = text.split(Pattern.quote(separator), -1);
StringBuilder goodSplits = new StringBuilder();
for (String split : splits) {
if (split.length() <= maxChunkSize) {
// 这个片段足够小
if (goodSplits.length() + split.length() + separator.length() <= maxChunkSize) {
if (goodSplits.length() > 0) goodSplits.append(separator);
goodSplits.append(split);
} else {
// 加入后会超限,保存当前积累的内容
if (goodSplits.length() >= minChunkSize) {
finalChunks.add(goodSplits.toString());
}
goodSplits = new StringBuilder(split);
}
} else {
// 这个片段本身太大,递归用更细的分隔符切分
if (goodSplits.length() > 0) {
finalChunks.add(goodSplits.toString());
goodSplits = new StringBuilder();
}
List<String> subChunks = splitText(split, newSeparators != null ? newSeparators : List.of());
finalChunks.addAll(subChunks);
}
}
if (goodSplits.length() > 0) {
finalChunks.add(goodSplits.toString());
}
return finalChunks;
}
}合并策略:从多块结果到最终答案
分块是第一步,各块处理完之后如何合并结果才是真正的难点。常见的合并模式有三种:
Map-Reduce模式(适合摘要类任务)
@Service
public class MapReduceSummarizer {
private final LlmClient llmClient;
private final TextChunker chunker;
public String summarize(String document, String requirement) {
// Map阶段:对每个块单独处理
List<String> chunks = chunker.chunk(document);
log.info("文档分为{}块进行处理", chunks.size());
List<String> chunkSummaries = processChunksInParallel(chunks, requirement);
// 如果摘要列表还是太长,递归Reduce
return recursiveReduce(chunkSummaries, requirement);
}
private List<String> processChunksInParallel(List<String> chunks, String requirement) {
return chunks.parallelStream()
.map(chunk -> summarizeChunk(chunk, requirement))
.collect(Collectors.toList());
}
private String summarizeChunk(String chunk, String requirement) {
String prompt = String.format("""
请对以下文本段落进行摘要。
要求:%s
注意:这是完整文档的一个片段,请忽略片段的不完整性,只处理当前内容。
如果当前片段与要求无关,请回复"此段落与查询无关"。
文本内容:
%s
请给出简洁摘要:
""", requirement, chunk);
return llmClient.complete(prompt);
}
private String recursiveReduce(List<String> summaries, String requirement) {
// 过滤掉无关的段落摘要
List<String> relevantSummaries = summaries.stream()
.filter(s -> !s.contains("此段落与查询无关"))
.collect(Collectors.toList());
// 计算合并后的长度
String combined = String.join("\n\n---\n\n", relevantSummaries);
if (tokenCounter.count(combined) <= MAX_TOKENS_FOR_REDUCTION) {
// 长度合适,直接做最终合并
return finalReduce(combined, requirement);
} else {
// 还是太长,继续分块Reduce
List<String> chunks = chunker.chunk(combined);
List<String> reducedChunks = processChunksInParallel(chunks, requirement);
return recursiveReduce(reducedChunks, requirement);
}
}
private String finalReduce(String combinedSummaries, String requirement) {
String prompt = String.format("""
以下是一份长文档各段落的摘要汇总。请基于这些摘要,综合生成一份完整的最终摘要。
原始需求:%s
各段落摘要:
%s
请生成综合摘要:
""", requirement, combinedSummaries);
return llmClient.complete(prompt);
}
}Refine模式(适合需要连贯性的任务)
Map-Reduce是并行处理,各块之间没有信息传递。对于需要前后关联的任务(比如总结一篇有逻辑递进的文章),用Refine模式更好:
@Service
public class RefineProcessor {
private final LlmClient llmClient;
private final TextChunker chunker;
public String process(String document, String task) {
List<String> chunks = chunker.chunk(document);
if (chunks.isEmpty()) return "";
// 先处理第一块,得到初始结果
String currentAnswer = processFirstChunk(chunks.get(0), task);
// 逐块"精炼"答案
for (int i = 1; i < chunks.size(); i++) {
currentAnswer = refineWithNextChunk(currentAnswer, chunks.get(i), task, i, chunks.size());
}
return currentAnswer;
}
private String processFirstChunk(String chunk, String task) {
String prompt = String.format("""
请根据以下文本内容,完成任务:%s
这是文档的第一部分,后续还有更多内容。
内容:
%s
请给出初步处理结果:
""", task, chunk);
return llmClient.complete(prompt);
}
private String refineWithNextChunk(String currentAnswer, String newChunk,
String task, int chunkIndex, int totalChunks) {
String prompt = String.format("""
你已经基于文档的前面部分给出了一个初步结果。现在有新的内容,请根据新内容更新你的结果。
原始任务:%s
当前进度:第%d块(共%d块)
之前的处理结果:
%s
新的内容(第%d部分):
%s
请根据新内容,更新并完善你的处理结果。如果新内容没有带来有价值的变化,可以保持之前的结果不变。
更新后的结果:
""", task, chunkIndex + 1, totalChunks, currentAnswer, chunkIndex + 1, newChunk);
return llmClient.complete(prompt);
}
}问答模式(适合RAG场景)
对于问答任务,不需要处理所有块,只需要找到相关的块来回答问题:
@Service
public class ChunkedQA {
private final LlmClient llmClient;
private final EmbeddingService embeddingService;
private final TextChunker chunker;
public String answer(String document, String question) {
// 分块
List<TextChunk> chunks = chunker.chunkWithMetadata(document);
// 对所有块生成嵌入向量(可以用向量数据库存储)
Map<TextChunk, float[]> chunkEmbeddings = new HashMap<>();
for (TextChunk chunk : chunks) {
float[] embedding = embeddingService.embed(chunk.getContent());
chunkEmbeddings.put(chunk, embedding);
}
// 对问题生成嵌入向量
float[] questionEmbedding = embeddingService.embed(question);
// 按相似度排序,选Top-K个最相关的块
int topK = 5;
List<TextChunk> relevantChunks = selectTopKChunks(
chunks, chunkEmbeddings, questionEmbedding, topK
);
// 按原文顺序重排(保持上下文连贯性)
relevantChunks.sort(Comparator.comparingInt(TextChunk::getIndex));
// 合并相关块,构建上下文
String context = relevantChunks.stream()
.map(c -> String.format("[段落%d]\n%s", c.getIndex() + 1, c.getContent()))
.collect(Collectors.joining("\n\n"));
// 回答问题
String prompt = String.format("""
请基于以下文档片段回答问题。
问题:%s
相关文档片段:
%s
请给出准确的回答,如果文档中没有相关信息,请明确说明。
""", question, context);
return llmClient.complete(prompt);
}
private List<TextChunk> selectTopKChunks(
List<TextChunk> chunks,
Map<TextChunk, float[]> embeddings,
float[] questionEmbedding,
int k) {
return chunks.stream()
.sorted(Comparator.comparingDouble(chunk ->
-cosineSimilarity(embeddings.get(chunk), questionEmbedding)
))
.limit(k)
.collect(Collectors.toList());
}
private double cosineSimilarity(float[] a, float[] b) {
double dotProduct = 0;
double normA = 0;
double normB = 0;
for (int i = 0; i < a.length; i++) {
dotProduct += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
}
return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}
}合同审查的完整示例
回到文章开头的合同审查问题,用上面的技术来处理:
@Service
public class ContractReviewer {
private final SemanticBoundaryChunker chunker;
private final MapReduceSummarizer summarizer;
private final LlmClient llmClient;
public ContractReviewResult review(String contractText, ReviewRequirement requirement) {
// 第一步:识别合同关键条款(Map-Reduce)
String keyTermsSummary = summarizer.summarize(
contractText,
"提取并总结以下类型的条款:付款条件、违约责任、保密义务、知识产权归属、争议解决方式"
);
// 第二步:针对性的风险扫描(对每一类风险分别处理)
Map<String, String> riskAnalyses = new HashMap<>();
for (String riskType : requirement.getRiskTypes()) {
String analysis = summarizer.summarize(
contractText,
"分析合同中与「" + riskType + "」相关的条款,指出潜在风险点"
);
riskAnalyses.put(riskType, analysis);
}
// 第三步:综合生成审查报告
String prompt = buildFinalReportPrompt(keyTermsSummary, riskAnalyses, requirement);
String finalReport = llmClient.complete(prompt);
return ContractReviewResult.builder()
.keyTermsSummary(keyTermsSummary)
.riskAnalyses(riskAnalyses)
.finalReport(finalReport)
.chunksProcessed(chunker.getLastChunkCount())
.build();
}
}性能优化:并行分块处理
大文档分块后,可以并行处理以减少等待时间:
// 利用CompletableFuture并行处理多个块
public List<String> processChunksInParallel(List<String> chunks, ChunkProcessor processor) {
// 控制并发度,避免超过API速率限制
Semaphore semaphore = new Semaphore(5); // 最多5个并发请求
List<CompletableFuture<String>> futures = chunks.stream()
.map(chunk -> CompletableFuture.supplyAsync(() -> {
try {
semaphore.acquire();
return processor.process(chunk);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
} finally {
semaphore.release();
}
}))
.collect(Collectors.toList());
// 按原顺序收集结果
return futures.stream()
.map(f -> {
try {
return f.get(60, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("分块处理失败", e);
return "[处理失败]";
}
})
.collect(Collectors.toList());
}选择哪种策略的决策树
处理长文本,把握三个原则:按语义边界切,不按字节数切;选对合并模式,Map-Reduce还是Refine取决于任务是否需要前后关联;并行处理能节省大量等待时间。这三条做到位,大多数长文本处理场景都能处理好。
