设计一个文件存储系统:大文件分片上传、断点续传、秒传实现
设计一个文件存储系统:大文件分片上传、断点续传、秒传实现
适读人群:Java中高级工程师、需要做文件上传的技术人员 | 阅读时长:约18分钟 | 难度:★★★☆☆
开篇故事
做过网盘类产品的工程师都懂那种折磨:用户上传一个10GB的视频文件,传到90%时网络断了,然后要从头传。用户投诉的邮件塞满了工单系统。
我们第一版用的是最简单的方案:前端直接POST,后端接收MultipartFile。这个方案对5MB以下的文件还好,但对大文件来说问题一大堆:HTTP连接超时、内存溢出(服务器把整个文件load进内存)、断网后必须重传。
花了三个月重新设计了文件上传系统,支持分片上传、断点续传、秒传(相同文件秒级完成)。这三个功能实现下来,系统的文件上传成功率从78%提升到了99.6%,用户投诉量减少了95%。这篇文章把核心实现全部公开。
一、需求分析与规模估算
核心功能
- 分片上传: 把大文件切成N个小块,逐块上传,最后在服务端合并
- 断点续传: 上传中断后,重新上传时从断点继续,已上传的分片不重传
- 秒传: 服务端已有相同文件时(通过MD5识别),跳过上传直接返回文件URL
规模估算
文件上传规模:
- 每天上传文件数:100万个
- 平均文件大小:50MB
- 日上传总量:100万 × 50MB = 50TB
- 平均上传带宽:50TB / 86400 = 600MB/s(峰值约3倍 = 1.8GB/s)
分片策略:
- 分片大小:5MB(平衡网络效率和断点续传粒度)
- 50MB文件 = 10个分片
- 每天上传分片数:100万 × 10 = 1000万个分片
存储估算:
- 原始文件:50TB/天(使用对象存储,如OSS/S3)
- 上传记录(元数据):1000万 × 100字节 = 1GB/天(MySQL)
- 分片临时文件(合并后删除):按峰值同时上传量估算,约1TB临时空间
并发估算:
- 同时上传的用户:假设峰值1万用户
- 每个用户平均10个分片并行上传
- 并发分片上传请求:10万 QPS
二、系统架构设计
三、关键代码实现
3.1 初始化上传任务
@RestController
@RequestMapping("/api/upload")
@Slf4j
public class FileUploadController {
@Autowired
private FileUploadService uploadService;
/**
* 第一步:初始化上传任务(支持秒传检查)
* 客户端先发送文件的MD5和基本信息
*/
@PostMapping("/init")
public ApiResponse<InitUploadResponse> initUpload(
@RequestBody InitUploadRequest request) {
return ApiResponse.success(uploadService.initUpload(request));
}
/**
* 第二步:上传单个分片
* Content-Type: multipart/form-data
*/
@PostMapping("/chunk")
public ApiResponse<Void> uploadChunk(
@RequestParam("taskId") String taskId,
@RequestParam("chunkIndex") int chunkIndex,
@RequestParam("chunk") MultipartFile chunk) {
uploadService.uploadChunk(taskId, chunkIndex, chunk);
return ApiResponse.success();
}
/**
* 第三步:查询已上传的分片(用于断点续传)
*/
@GetMapping("/chunks/{taskId}")
public ApiResponse<List<Integer>> getUploadedChunks(@PathVariable String taskId) {
return ApiResponse.success(uploadService.getUploadedChunks(taskId));
}
/**
* 第四步:通知服务端合并分片
*/
@PostMapping("/merge")
public ApiResponse<String> merge(@RequestBody MergeRequest request) {
String fileUrl = uploadService.merge(request.getTaskId());
return ApiResponse.success(fileUrl);
}
}3.2 上传服务核心逻辑
@Service
@Slf4j
public class FileUploadService {
@Autowired
private FileTaskMapper taskMapper;
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private StorageService storageService; // 对象存储(OSS/MinIO)
private static final int CHUNK_SIZE = 5 * 1024 * 1024; // 5MB
private static final String CHUNK_PROGRESS_KEY = "upload:chunks:";
private static final String MD5_URL_KEY = "upload:md5:";
private static final long TASK_EXPIRE_DAYS = 3; // 3天内完成上传
/**
* 初始化上传任务(含秒传检查)
*/
public InitUploadResponse initUpload(InitUploadRequest request) {
String fileMd5 = request.getFileMd5();
// 秒传检查:Redis中查MD5映射
String existingUrl = redisTemplate.opsForValue().get(MD5_URL_KEY + fileMd5);
if (existingUrl != null) {
log.info("秒传命中, md5={}", fileMd5);
return InitUploadResponse.instantUpload(existingUrl);
}
// 数据库也查一下(Redis可能过期但文件还在)
FileMetadata existing = taskMapper.findByMd5(fileMd5);
if (existing != null && existing.getStatus() == FileStatus.COMPLETE) {
// 重新写入Redis缓存
redisTemplate.opsForValue().set(
MD5_URL_KEY + fileMd5, existing.getFileUrl(), 7, TimeUnit.DAYS
);
return InitUploadResponse.instantUpload(existing.getFileUrl());
}
// 查询是否有未完成的上传任务(断点续传)
FileUploadTask existingTask = taskMapper.findPendingTask(
fileMd5, request.getUserId());
if (existingTask != null) {
// 返回已有的taskId,客户端根据这个taskId查询已上传的分片
List<Integer> uploadedChunks = getUploadedChunks(existingTask.getTaskId());
return InitUploadResponse.resume(existingTask.getTaskId(), uploadedChunks);
}
// 创建新上传任务
int totalChunks = (int) Math.ceil((double) request.getFileSize() / CHUNK_SIZE);
String taskId = UUID.randomUUID().toString().replace("-", "");
FileUploadTask task = FileUploadTask.builder()
.taskId(taskId)
.userId(request.getUserId())
.fileName(request.getFileName())
.fileMd5(fileMd5)
.fileSize(request.getFileSize())
.totalChunks(totalChunks)
.status(UploadStatus.UPLOADING)
.createTime(LocalDateTime.now())
.expireTime(LocalDateTime.now().plusDays(TASK_EXPIRE_DAYS))
.build();
taskMapper.insert(task);
// 在Redis中初始化分片进度集合
String progressKey = CHUNK_PROGRESS_KEY + taskId;
redisTemplate.expire(progressKey, TASK_EXPIRE_DAYS, TimeUnit.DAYS);
return InitUploadResponse.newTask(taskId, totalChunks);
}
/**
* 上传单个分片
*/
public void uploadChunk(String taskId, int chunkIndex, MultipartFile chunk) {
// 1. 校验任务是否有效
FileUploadTask task = taskMapper.findByTaskId(taskId);
if (task == null || task.getStatus() != UploadStatus.UPLOADING) {
throw new BusinessException("上传任务不存在或已完成");
}
if (chunkIndex < 0 || chunkIndex >= task.getTotalChunks()) {
throw new BusinessException("分片索引越界");
}
// 2. 校验分片大小(最后一片可以小于CHUNK_SIZE)
boolean isLastChunk = (chunkIndex == task.getTotalChunks() - 1);
long expectedSize = isLastChunk
? task.getFileSize() % CHUNK_SIZE
: CHUNK_SIZE;
if (expectedSize == 0) expectedSize = CHUNK_SIZE;
// 3. 存储分片到临时目录
String chunkPath = buildChunkPath(taskId, chunkIndex);
try {
storageService.saveChunk(chunkPath, chunk.getInputStream());
} catch (IOException e) {
throw new BusinessException("分片存储失败");
}
// 4. 记录分片上传进度
String progressKey = CHUNK_PROGRESS_KEY + taskId;
redisTemplate.opsForSet().add(progressKey, String.valueOf(chunkIndex));
log.debug("分片上传成功, taskId={}, chunkIndex={}/{}",
taskId, chunkIndex, task.getTotalChunks());
}
/**
* 获取已上传的分片列表(断点续传用)
*/
public List<Integer> getUploadedChunks(String taskId) {
String progressKey = CHUNK_PROGRESS_KEY + taskId;
Set<String> uploaded = redisTemplate.opsForSet().members(progressKey);
if (uploaded == null || uploaded.isEmpty()) {
// Redis可能过期,从临时存储目录扫描实际存在的分片
return storageService.listChunks(taskId);
}
return uploaded.stream()
.map(Integer::parseInt)
.sorted()
.collect(Collectors.toList());
}
/**
* 合并分片,生成最终文件
*/
public String merge(String taskId) {
FileUploadTask task = taskMapper.findByTaskId(taskId);
if (task == null) throw new BusinessException("任务不存在");
// 校验所有分片是否都已上传
List<Integer> uploadedChunks = getUploadedChunks(taskId);
if (uploadedChunks.size() != task.getTotalChunks()) {
throw new BusinessException(
String.format("分片未全部上传,已上传%d/%d",
uploadedChunks.size(), task.getTotalChunks())
);
}
// 合并分片(按顺序)
String finalPath = buildFinalPath(task);
List<String> chunkPaths = IntStream.range(0, task.getTotalChunks())
.mapToObj(i -> buildChunkPath(taskId, i))
.collect(Collectors.toList());
String fileUrl;
try {
// 合并并上传到对象存储
fileUrl = storageService.mergeAndUpload(chunkPaths, finalPath);
} catch (Exception e) {
log.error("文件合并失败, taskId={}", taskId, e);
throw new BusinessException("文件合并失败");
}
// 更新任务状态
taskMapper.updateComplete(taskId, fileUrl);
// MD5→URL 映射写入Redis(秒传用)
redisTemplate.opsForValue().set(
MD5_URL_KEY + task.getFileMd5(), fileUrl, 30, TimeUnit.DAYS
);
// 清理临时分片文件
cleanupChunks(taskId, task.getTotalChunks());
log.info("文件合并完成, taskId={}, fileUrl={}", taskId, fileUrl);
return fileUrl;
}
private String buildChunkPath(String taskId, int chunkIndex) {
return "chunks/" + taskId + "/" + chunkIndex;
}
private String buildFinalPath(FileUploadTask task) {
String ext = getFileExtension(task.getFileName());
return "files/" + task.getFileMd5() + ext;
}
private void cleanupChunks(String taskId, int totalChunks) {
// 异步清理,不阻塞合并响应
CompletableFuture.runAsync(() -> {
for (int i = 0; i < totalChunks; i++) {
storageService.delete(buildChunkPath(taskId, i));
}
redisTemplate.delete(CHUNK_PROGRESS_KEY + taskId);
});
}
}3.3 前端分片逻辑(JavaScript示意)
// 前端分片上传逻辑(伪代码,展示核心流程)
async function uploadFile(file) {
const CHUNK_SIZE = 5 * 1024 * 1024; // 5MB
// 1. 计算文件MD5
const fileMd5 = await calculateMD5(file);
// 2. 初始化上传任务
const initResp = await api.initUpload({
fileName: file.name,
fileSize: file.size,
fileMd5: fileMd5
});
// 秒传成功
if (initResp.instantUpload) {
console.log('秒传成功:', initResp.fileUrl);
return initResp.fileUrl;
}
const { taskId, totalChunks, uploadedChunks } = initResp;
// 3. 确定需要上传的分片(断点续传:跳过已上传的)
const pendingChunks = Array.from({length: totalChunks}, (_, i) => i)
.filter(i => !uploadedChunks.includes(i));
// 4. 并发上传分片(控制并发数为3,避免占满带宽)
const CONCURRENCY = 3;
await uploadChunksConcurrently(file, taskId, pendingChunks, CHUNK_SIZE, CONCURRENCY);
// 5. 通知服务端合并
const mergeResp = await api.merge({ taskId });
return mergeResp.fileUrl;
}3.4 MinIO存储服务封装
@Service
@Slf4j
public class MinioStorageService implements StorageService {
@Autowired
private MinioClient minioClient;
@Value("${minio.bucket.temp}")
private String tempBucket;
@Value("${minio.bucket.final}")
private String finalBucket;
@Override
public void saveChunk(String path, InputStream inputStream) {
try {
minioClient.putObject(PutObjectArgs.builder()
.bucket(tempBucket)
.object(path)
.stream(inputStream, -1, 5 * 1024 * 1024)
.build());
} catch (Exception e) {
throw new StorageException("分片存储失败", e);
}
}
@Override
public String mergeAndUpload(List<String> chunkPaths, String targetPath) {
// MinIO的Compose方法支持服务端合并,不需要下载到本地
List<ComposeSource> sources = chunkPaths.stream()
.map(path -> ComposeSource.builder()
.bucket(tempBucket).object(path).build())
.collect(Collectors.toList());
try {
minioClient.composeObject(ComposeObjectArgs.builder()
.bucket(finalBucket)
.object(targetPath)
.sources(sources)
.build());
// 返回访问URL
return minioClient.getPresignedObjectUrl(GetPresignedObjectUrlArgs.builder()
.bucket(finalBucket)
.object(targetPath)
.method(Method.GET)
.expiry(7, TimeUnit.DAYS)
.build());
} catch (Exception e) {
throw new StorageException("文件合并失败", e);
}
}
}四、扩展性设计
上传加速(多节点并行写入)
大文件上传时,可以把不同分片路由到不同的存储节点,充分利用多节点的写入带宽。
// 根据分片索引路由到不同的存储节点
public String getStorageNode(String taskId, int chunkIndex) {
int nodeCount = storageNodes.size();
int nodeIndex = (int) (Math.abs(taskId.hashCode()) + chunkIndex) % nodeCount;
return storageNodes.get(nodeIndex);
}文件去重(节省存储)
秒传本质上就是文件去重的一种实现。进一步优化:同一个文件在OSS上只存一份,多个用户上传同一文件时,只写一条元数据指向同一个物理文件。
五、踩坑实录
坑1:MD5计算在前端很慢
10GB文件的MD5计算在浏览器里需要2-3分钟,用户体验很差。解决方案:先采样计算(取文件头尾和中间各若干字节的组合MD5),作为快速指纹。只有命中秒传时,才提示用户等待计算完整MD5验证。
坑2:合并时内存溢出
早期合并分片时,把所有分片读入内存再合并,10个5MB分片 = 50MB内存,100个并发合并 = 5GB内存,直接OOM。解决方案:用流式合并(SequenceInputStream),每次只读一个分片写入输出流,内存占用降到分片大小级别。更好的方案是用MinIO的ComposeObject API,服务端合并不占应用内存。
坑3:过期任务的分片文件没有清理
用户上传到一半放弃了,临时分片文件永远留在存储上,3天后任务过期但分片文件没有删除。运行6个月后,临时存储里积累了几十TB的垃圾文件。解决方案:每天定时扫描过期的上传任务(3天以上未完成),把对应的分片文件全部删除。
六、总结
文件上传系统的三大核心技术:
| 功能 | 实现方案 | 核心数据结构 |
|---|---|---|
| 分片上传 | 前端切片 + 并发POST | 分片索引 + 临时存储路径 |
| 断点续传 | 初始化返回已上传分片列表 | Redis Set记录分片进度 |
| 秒传 | MD5指纹比对 | Redis Hash存MD5→URL映射 |
三个功能组合在一起,可以极大提升大文件上传的成功率和用户体验。关键是前端要做好MD5计算、并发控制和进度显示,后端要处理好分片存储、进度追踪和文件合并。
