第2334篇:Java AI应用的内存优化——大文档处理和向量计算的内存管理
大约 6 分钟
第2334篇:Java AI应用的内存优化——大文档处理和向量计算的内存管理
适读人群:在AI项目中遇到内存溢出、大文档处理缓慢或向量计算占用过多内存的工程师 | 阅读时长:约17分钟 | 核心价值:掌握AI应用的内存优化核心技术,解决大规模文档处理的内存瓶颈
有一次我们在处理一批PDF文档——大约500个,每个50-200页。在测试环境里没问题,一到生产环境就OOM。
当时的第一反应是加JVM内存,从4G加到8G,还是OOM。
后来用MAT分析堆转储才发现问题:所有文档被TikaDocumentReader一次性读入内存,500个文档同时驻留堆里,加上每个文档分块后的TextSegment数组、Embedding向量的float[]数组……堆里的数据是原始文档大小的6-8倍。
这不是内存不够,是设计有问题。
问题的根源:全量加载 vs 流式处理
大多数AI框架的文档处理示例代码是这样写的:
// 典型的示例代码——适合小数据量,不适合生产
List<Document> documents = documentReader.get(); // 全部读进内存
List<Document> chunks = textSplitter.apply(documents); // 分块,内存翻倍
embeddingModel.embedAll(chunks); // 向量化,内存再涨
vectorStore.add(chunks); // 入库这个流程的内存峰值是:原始文档 × 2(分块后的TextSegment复制) + 向量数组。
500个文档,每个平均100KB,就是:50MB × 2 + 向量(500文档 × 100块/文档 × 6KB/向量) = 100MB + 300MB = 400MB。
这还只是这批文档,如果有多个批次同时在处理……
解法一:流式处理文档
不要一次性把所有文档加载进内存,用迭代器逐个处理:
@Service
@Slf4j
public class StreamingDocumentIngestor {
private final EmbeddingModel embeddingModel;
private final VectorStore vectorStore;
// 每批处理的文档数(控制内存峰值)
private static final int BATCH_SIZE = 10;
// 每个文档分块后的最大文本段数
private static final int MAX_SEGMENTS_PER_DOC = 100;
/**
* 流式处理文档目录
* 任何时刻内存里只有BATCH_SIZE个文档的数据
*/
public IngestResult ingestDirectory(Path directory) {
int totalDocs = 0;
int totalSegments = 0;
int failedDocs = 0;
try (Stream<Path> files = Files.walk(directory)) {
// 只处理PDF和TXT文件,过滤其他
Iterator<Path> fileIterator = files
.filter(path -> isSupported(path))
.sorted()
.iterator();
List<Path> batch = new ArrayList<>(BATCH_SIZE);
while (fileIterator.hasNext()) {
batch.add(fileIterator.next());
if (batch.size() >= BATCH_SIZE || !fileIterator.hasNext()) {
// 处理当前批次
BatchResult result = processBatch(batch);
totalDocs += result.processedDocs();
totalSegments += result.processedSegments();
failedDocs += result.failedDocs();
// 清理批次列表,让GC回收
batch.clear();
// 主动建议GC(不强制,但给JVM一个信号)
System.gc();
log.info("进度:已处理{}个文档,{}个分段", totalDocs, totalSegments);
}
}
} catch (IOException e) {
throw new DocumentIngestException("遍历目录失败:" + directory, e);
}
return new IngestResult(totalDocs, totalSegments, failedDocs);
}
private BatchResult processBatch(List<Path> paths) {
int processed = 0;
int totalSegments = 0;
int failed = 0;
for (Path path : paths) {
try {
int segments = processOneDocument(path);
totalSegments += segments;
processed++;
} catch (Exception e) {
log.error("处理文档失败:{}", path, e);
failed++;
}
}
return new BatchResult(processed, totalSegments, failed);
}
private int processOneDocument(Path path) {
// 读取单个文档
Resource resource = new FileSystemResource(path.toFile());
TikaDocumentReader reader = new TikaDocumentReader(resource);
List<Document> rawDocs = reader.get();
// 分块
TokenTextSplitter splitter = new TokenTextSplitter(500, 50, 5, 10000, true);
List<Document> segments = splitter.apply(rawDocs);
// 限制每个文档的最大段数(防止超长文档耗尽内存)
if (segments.size() > MAX_SEGMENTS_PER_DOC) {
log.warn("文档分块数过多:{},截断为{}个", path, MAX_SEGMENTS_PER_DOC);
segments = segments.subList(0, MAX_SEGMENTS_PER_DOC);
}
// 加入来源元数据
segments.forEach(seg -> seg.getMetadata().put("source", path.toString()));
// 向量化并入库
vectorStore.add(segments);
// segments在这里已经没有引用了,GC可以回收
int count = segments.size();
segments = null; // 显式清除引用(可选,帮助读者理解意图)
return count;
}
private boolean isSupported(Path path) {
String name = path.getFileName().toString().toLowerCase();
return name.endsWith(".pdf") || name.endsWith(".txt") || name.endsWith(".md");
}
public record IngestResult(int totalDocs, int totalSegments, int failedDocs) {}
private record BatchResult(int processedDocs, int processedSegments, int failedDocs) {}
}解法二:向量计算的内存优化
Embedding向量是AI应用内存消耗的大头:
// 问题:embedAll一次性计算所有向量
// 如果有10000个TextSegment,内存里同时存在所有向量
List<Embedding> allEmbeddings = embeddingModel.embedAll(allSegments).content();
// allEmbeddings = 10000 × 1536维 × 4字节 = 60MB
// 优化:分批计算,计算完立即入库,不积累
@Service
public class EfficientEmbeddingService {
private final EmbeddingModel embeddingModel;
private final EmbeddingStore<TextSegment> embeddingStore;
private static final int EMBED_BATCH_SIZE = 100; // 每批向量化的段数
public void embedAndStore(List<TextSegment> segments) {
// 分批处理
for (int i = 0; i < segments.size(); i += EMBED_BATCH_SIZE) {
int end = Math.min(i + EMBED_BATCH_SIZE, segments.size());
List<TextSegment> batch = segments.subList(i, end);
// 计算这批向量
List<Embedding> embeddings = embeddingModel.embedAll(batch).content();
// 立即存入向量数据库(内存马上可以释放)
embeddingStore.addAll(embeddings, batch);
// 不保留引用
embeddings = null;
log.debug("向量化进度:{}/{}", end, segments.size());
}
}
}解法三:大文档的惰性加载
对于超大文档(单个文件100MB以上),需要用流式读取而不是一次性加载:
@Service
public class LargeDocumentProcessor {
/**
* 用流式方式处理大PDF(不把整个文件读进内存)
*/
public void processLargePdf(Path pdfPath, VectorStore vectorStore) {
// Tika的SAX解析器支持流式处理
try (InputStream inputStream = Files.newInputStream(pdfPath)) {
// 用ContentHandler逐页处理,不一次性加载全文
PageByPageHandler handler = new PageByPageHandler(
page -> processPage(page, pdfPath.toString(), vectorStore));
AutoDetectParser parser = new AutoDetectParser();
Metadata metadata = new Metadata();
ParseContext context = new ParseContext();
parser.parse(inputStream, handler, metadata, context);
} catch (Exception e) {
throw new DocumentProcessException("处理大文档失败:" + pdfPath, e);
}
}
private void processPage(String pageContent, String source, VectorStore vectorStore) {
if (pageContent.trim().isEmpty()) return;
// 对单页内容进行分块和向量化
Document doc = new Document(pageContent, Map.of("source", source));
TokenTextSplitter splitter = new TokenTextSplitter(300, 30, 3, 5000, true);
List<Document> chunks = splitter.apply(List.of(doc));
vectorStore.add(chunks);
}
// 自定义的逐页处理ContentHandler
static class PageByPageHandler extends DefaultHandler {
private final Consumer<String> pageProcessor;
private final StringBuilder currentPage = new StringBuilder();
public PageByPageHandler(Consumer<String> pageProcessor) {
this.pageProcessor = pageProcessor;
}
@Override
public void characters(char[] ch, int start, int length) {
currentPage.append(ch, start, length);
}
@Override
public void endElement(String uri, String localName, String qName) {
if ("div".equals(localName) || "p".equals(localName)) {
String content = currentPage.toString().trim();
if (content.length() > 100) { // 过滤太短的内容
pageProcessor.accept(content);
}
currentPage.setLength(0); // 清空,节省内存
}
}
}
}解法四:向量缓存的内存控制
向量缓存是AI应用的常见优化,但如果不控制缓存大小,会导致内存持续增长:
@Component
public class BoundedEmbeddingCache {
// 使用Caffeine的带大小限制和过期时间的缓存
private final Cache<String, float[]> cache;
public BoundedEmbeddingCache() {
this.cache = Caffeine.newBuilder()
// 最多缓存10000个向量
// 10000 × 1536 × 4 bytes = 60MB 内存上限
.maximumSize(10_000)
// 1小时未访问就过期
.expireAfterAccess(Duration.ofHours(1))
// 软引用(内存紧张时允许GC回收)
.softValues()
// 记录缓存统计
.recordStats()
.build();
}
public float[] getOrCompute(String text, EmbeddingModel model) {
return cache.get(text, key -> {
// 计算Embedding
Embedding embedding = model.embed(TextSegment.from(key)).content();
return embedding.vector();
});
}
// 监控缓存内存使用
public CacheStats getStats() {
return cache.stats();
}
// 计算缓存当前内存使用量(估算)
public long estimatedMemoryBytes() {
long count = cache.estimatedSize();
// 每个向量约6KB(1536维 × 4字节 + 对象头开销)
return count * 6 * 1024;
}
}监控和告警:发现内存问题
@Component
@Slf4j
public class MemoryMonitor {
private final MeterRegistry meterRegistry;
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void checkMemory() {
MemoryMXBean memBean = ManagementFactory.getMemoryMXBean();
MemoryUsage heapUsage = memBean.getHeapMemoryUsage();
long used = heapUsage.getUsed();
long max = heapUsage.getMax();
double ratio = (double) used / max;
// 记录到监控指标
meterRegistry.gauge("jvm.heap.usage.ratio", ratio);
// 超过80%告警
if (ratio > 0.8) {
log.warn("[内存告警] 堆内存使用率:{:.1f}%({}MB / {}MB)",
ratio * 100,
used / 1024 / 1024,
max / 1024 / 1024);
}
// 超过90%触发GC
if (ratio > 0.9) {
log.error("[内存严重告警] 触发主动GC,当前使用率:{:.1f}%", ratio * 100);
System.gc();
}
}
}内存优化没有终点,关键是在设计阶段就考虑内存模型:数据在什么时候进内存,什么时候应该离开内存。流式处理、分批计算、有界缓存——这三个模式解决了AI应用95%的内存问题。
