Java 大文件处理实战——百GB 文件的读取、分块处理、内存控制策略
Java 大文件处理实战——百GB 文件的读取、分块处理、内存控制策略
适读人群:有数据处理需求的 Java 开发者,特别是遇到过大文件内存溢出的同学 | 阅读时长:约 16 分钟 | 核心价值:系统掌握大文件处理的核心技术,包括流式读取、分块处理、并行加速的完整方案
去年我们有一个数据迁移任务:把一个 120GB 的日志文件按照某个规则拆分成多个小文件,分发给不同的下游系统。
产品给我一天时间。
第一个方案是直接读:Files.readAllBytes()。我知道这不行,但我想看看会报什么错:
java.lang.OutOfMemoryError: Required array length 128849018880 is too large128849018880 是 120 × 1024³,120GB 的字节数。连 long 都快装不下了。
然后我用了正确的方法,花了 3 个多小时处理完,产出了 47 个分片文件。这篇文章把整个过程写下来。
核心原则:流式处理,永远不把整个文件加载到内存
大文件处理的根本原则:像流水一样处理数据,读一块、处理一块、写出去、释放内存,然后读下一块。
内存里同时存在的数据应该只有当前处理的那一块,不是整个文件。
package com.example.bigfile;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
/**
* 大文件流式处理的基础示例
* 内存始终只保留当前行,不管文件多大
*/
public class StreamFileProcessor {
/**
* 逐行处理大文件
* 内存占用 = 单行最大长度,与文件大小无关
*/
public static void processLineByLine(Path filePath, LineProcessor processor)
throws IOException {
// BufferedReader 默认 8KB 缓冲区,读完一行处理一行
try (BufferedReader reader = Files.newBufferedReader(filePath, StandardCharsets.UTF_8)) {
String line;
long lineCount = 0;
while ((line = reader.readLine()) != null) {
processor.process(line, ++lineCount);
// 处理完的 line 对象立刻可以被 GC,不会积压
}
}
}
@FunctionalInterface
public interface LineProcessor {
void process(String line, long lineNumber) throws IOException;
}
}实战:按规则分片写出大文件
我的实际任务是:按日志里的用户 ID 哈希,把日志分成 64 个桶,每个桶一个文件。
package com.example.bigfile;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.HashMap;
import java.util.Map;
/**
* 大文件分片处理:按规则路由到不同输出文件
* 关键:同时打开多个输出流(64 个),每条数据写入对应的桶
*/
public class FileSplitter {
private static final int BUCKET_COUNT = 64;
public static void split(Path inputFile, Path outputDir) throws IOException {
Files.createDirectories(outputDir);
// 同时打开 64 个输出流
// 注意:文件描述符是有限的,64 个一般没问题(默认限制通常是 1024)
// 如果桶数量很大(比如 1000),需要考虑分批写
Map<Integer, BufferedWriter> writers = new HashMap<>();
try {
// 初始化所有输出 writer
for (int i = 0; i < BUCKET_COUNT; i++) {
Path bucketFile = outputDir.resolve("bucket-" + i + ".log");
writers.put(i, Files.newBufferedWriter(bucketFile, StandardCharsets.UTF_8,
StandardOpenOption.CREATE, StandardOpenOption.APPEND));
}
// 流式读取输入文件,路由每一行
try (BufferedReader reader = Files.newBufferedReader(inputFile, StandardCharsets.UTF_8)) {
String line;
long totalLines = 0;
while ((line = reader.readLine()) != null) {
int bucket = getBucket(line);
writers.get(bucket).write(line);
writers.get(bucket).newLine();
totalLines++;
if (totalLines % 1_000_000 == 0) {
System.out.println("已处理 " + totalLines / 1_000_000 + "M 行");
// 定期 flush,避免数据在缓冲区积压太久
for (BufferedWriter w : writers.values()) w.flush();
}
}
}
} finally {
// 关闭所有 writer,无论是否异常
for (BufferedWriter w : writers.values()) {
try { w.close(); } catch (IOException ignored) {}
}
}
}
/**
* 根据行内容计算桶 ID
* 这里假设行格式是 "userId|timestamp|action|..."
*/
private static int getBucket(String line) {
int separatorPos = line.indexOf('|');
if (separatorPos <= 0) return 0;
String userId = line.substring(0, separatorPos);
// 用 hashCode 的绝对值取模,注意 Math.abs(Integer.MIN_VALUE) 仍然是负数,要处理
int hash = userId.hashCode();
return Math.abs(hash == Integer.MIN_VALUE ? 0 : hash) % BUCKET_COUNT;
}
}踩坑实录一:BufferedWriter 缓冲区大,进程结束前数据没 flush
这是我第一次运行时踩的坑。程序运行完,输出文件大小比预期小了很多。
BufferedWriter 默认缓冲区是 8KB,程序结束时如果没有 close() 或 flush(),缓冲区里的数据就丢了。
我当时用了 try-with-resources,理论上应该自动 close。检查了一下,发现是中途有个未捕获的异常导致 try-with-resources 的 close 出了问题(close 时又抛了异常,被吞掉了)。
最终用 try-finally 明确关闭每个 writer,确保关闭逻辑不被意外吞掉(如上面代码所示)。
用 MappedByteBuffer 处理随机访问需求
如果需要随机访问大文件(比如根据索引跳转到任意位置),用 FileChannel + MappedByteBuffer(内存映射文件):
package com.example.bigfile;
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.file.*;
/**
* 内存映射文件:适合需要随机访问的大文件
* OS 按需加载页(4KB),不会一次性把整个文件加载到内存
*/
public class MappedFileReader {
// 每次映射的块大小:256MB(不要太大,防止内存压力)
private static final long CHUNK_SIZE = 256L * 1024 * 1024;
/**
* 按块映射读取大文件
* 适合需要在文件中随机跳转的场景
*/
public static void readInChunks(Path filePath) throws IOException {
try (FileChannel channel = FileChannel.open(filePath, StandardOpenOption.READ)) {
long fileSize = channel.size();
long position = 0;
while (position < fileSize) {
long chunkSize = Math.min(CHUNK_SIZE, fileSize - position);
// 映射这一块到内存
MappedByteBuffer buffer = channel.map(
FileChannel.MapMode.READ_ONLY,
position,
chunkSize
);
processBuffer(buffer);
position += chunkSize;
// 建议:处理完之后尝试强制 unmap(JDK 没有提供 public API,需要反射)
// 否则这块内存要等到 GC 才释放
tryUnmap(buffer);
}
}
}
private static void processBuffer(MappedByteBuffer buffer) {
// 处理这块映射数据
while (buffer.hasRemaining()) {
byte b = buffer.get();
// 处理每个字节...
}
}
/**
* 强制释放 MappedByteBuffer 占用的内存
* MappedByteBuffer 不受 GC 直接控制,需要显式释放
*/
private static void tryUnmap(MappedByteBuffer buffer) {
try {
java.lang.reflect.Method cleanerMethod = buffer.getClass().getMethod("cleaner");
cleanerMethod.setAccessible(true);
Object cleaner = cleanerMethod.invoke(buffer);
if (cleaner != null) {
java.lang.reflect.Method cleanMethod = cleaner.getClass().getMethod("clean");
cleanMethod.setAccessible(true);
cleanMethod.invoke(cleaner);
}
} catch (Exception ignored) {
// 释放失败,等 GC 处理
}
}
}并行处理大文件:多线程加速
单线程处理 120GB 文件,受限于 IO 速度和 CPU 处理速度。如果 CPU 处理是瓶颈,可以用多线程。
我的方案是:主线程负责读取,把读到的行批量放入阻塞队列,多个工作线程从队列里取出处理。
package com.example.bigfile;
import java.io.*;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.*;
/**
* 生产者-消费者模式处理大文件
* 主线程读文件(生产者),多个线程处理(消费者)
*/
public class ParallelFileProcessor {
private static final int BATCH_SIZE = 1000; // 每批多少行
private static final int QUEUE_CAPACITY = 50; // 队列容量(批次数)
private static final List<String> POISON_PILL = Collections.emptyList(); // 结束信号
public static void process(Path filePath, int workerCount, BatchProcessor processor)
throws IOException, InterruptedException {
BlockingQueue<List<String>> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
ExecutorService workers = Executors.newFixedThreadPool(workerCount);
List<Future<?>> futures = new ArrayList<>();
// 启动 workerCount 个消费者线程
for (int i = 0; i < workerCount; i++) {
futures.add(workers.submit(() -> {
try {
while (true) {
List<String> batch = queue.take(); // 阻塞等待
if (batch == POISON_PILL) break; // 收到结束信号
processor.process(batch);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
}
// 主线程作为生产者:读文件,分批放入队列
try (BufferedReader reader = Files.newBufferedReader(filePath)) {
List<String> batch = new ArrayList<>(BATCH_SIZE);
String line;
while ((line = reader.readLine()) != null) {
batch.add(line);
if (batch.size() == BATCH_SIZE) {
queue.put(new ArrayList<>(batch)); // 队列满时会阻塞,控制内存使用
batch.clear();
}
}
if (!batch.isEmpty()) queue.put(new ArrayList<>(batch)); // 最后一批
}
// 发送结束信号(每个 worker 一个)
for (int i = 0; i < workerCount; i++) {
queue.put(POISON_PILL);
}
workers.shutdown();
workers.awaitTermination(1, TimeUnit.HOURS);
}
@FunctionalInterface
public interface BatchProcessor {
void process(List<String> batch);
}
}踩坑实录二:并行处理时,结果顺序和输入顺序不一致
用多线程处理后,输出的顺序可能和输入顺序不同(线程调度是不确定的)。
如果业务要求保持顺序,有两个方案:
- 处理时给每行加上行号,处理完之后按行号排序输出(但这要把所有结果放到内存里排序,适合中等规模)
- 用有序的分区(比如用行号 % workerCount 决定由哪个线程处理),每个线程只写自己的分区文件,最后合并
我那次任务不要求顺序,所以直接并行就行了。
踩坑实录三:没有处理 BOM 头,UTF-8 文件第一行解析异常
Windows 上生成的 UTF-8 文件可能有 BOM(\uFEFF),Linux 生成的一般没有。
我处理的那个 120GB 文件是从一台 Windows 服务器导出的,第一行开头有 BOM,导致第一行的第一个字段解析出来带了一个不可见字符,查了半天才发现。
package com.example.bigfile;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
/**
* 处理 UTF-8 BOM 头
*/
public class BomAwareReader {
public static BufferedReader openWithBomSkip(Path filePath) throws IOException {
InputStream is = Files.newInputStream(filePath);
// 检查并跳过 BOM
byte[] bom = new byte[3];
int read = is.read(bom, 0, 3);
if (read == 3 && bom[0] == (byte)0xEF && bom[1] == (byte)0xBB && bom[2] == (byte)0xBF) {
// 是 UTF-8 BOM,已经跳过了 3 个字节
System.out.println("检测到 UTF-8 BOM,已跳过");
} else {
// 不是 BOM,把读走的字节推回去(用 PushbackInputStream)
// 重新打开文件更简单:
is.close();
is = Files.newInputStream(filePath);
}
return new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8));
}
}内存控制的几个关键参数
处理大文件时,JVM 参数的配置很重要:
# 处理 120GB 文件时,我用的 JVM 参数
java \
-Xms512m -Xmx2g \ # 2GB 堆内存,完全足够处理任意大小的文件(因为流式读取)
-XX:+UseG1GC \
-XX:MaxGCPauseMillis=500 \
-XX:+UseCompressedOops \ # 64位 JVM 启用指针压缩,节约内存
-jar bigfile-processor.jar注意:堆大小 2GB 就够了,因为内存里同时存在的数据最多就是:
- 一批数据(BATCH_SIZE × 单行平均长度)
- 工作线程栈
- 类元数据
与文件大小无关。
