Go 文件上传下载实战——大文件分片、断点续传、进度回调完整方案
Go 文件上传下载实战——大文件分片、断点续传、进度回调完整方案
适读人群:需要在 Go 服务中处理大文件上传下载的工程师 | 阅读时长:约18分钟 | 核心价值:从小文件到 GB 级别大文件,Go 处理文件上传下载的完整技术方案
那次上传 2.3GB 视频文件的噩梦
做内容管理系统的时候,用户反馈上传大视频文件经常失败。我去测试,上传一个 2.3GB 的 MP4,在公司网络环境下(带宽还挺好的),需要大约 7 分钟,然后有大概 30% 的概率在 6 分钟左右失败——服务端 timeout 了。
问题很明显:我们的文件上传是一次性把整个文件读到内存里,然后上传到对象存储,超时设的是 5 分钟。2.3GB 的文件,5 分钟传不完。
一个特别蠢的修法:把 timeout 改成 30 分钟。我就是这么干的,然后又出现了另一个问题:30 分钟内如果网络抖动导致连接断开,前面所有的传输都白费,用户得重头开始。
那是真的蠢。
正确方案:分片上传 + 断点续传
大文件处理的正确思路:
- 客户端把文件分成若干分片(如每片 5MB)
- 分片逐个上传,每个分片独立成功或失败
- 记录已上传的分片,失败时只重传失败的分片
- 所有分片上传完成后,服务端合并
这就是分片上传+断点续传。主流对象存储(S3、OSS、COS)都原生支持这个功能叫做 Multipart Upload。
完整实现
分片上传接口设计
POST /upload/init # 初始化上传,返回 upload_id
POST /upload/:upload_id/chunks/:chunk_index # 上传单个分片
POST /upload/:upload_id/complete # 完成上传,合并分片
GET /upload/:upload_id/status # 查询上传进度(断点续传用)
DELETE /upload/:upload_id # 取消上传服务端实现
package upload
import (
"context"
"crypto/md5"
"encoding/hex"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"sort"
"strconv"
"sync"
"time"
)
// UploadSession 上传会话
type UploadSession struct {
ID string `json:"id"`
FileName string `json:"file_name"`
TotalSize int64 `json:"total_size"`
ChunkSize int64 `json:"chunk_size"`
TotalChunks int `json:"total_chunks"`
Chunks map[int]*ChunkInfo `json:"chunks"` // key: chunk index
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
mu sync.RWMutex
}
// ChunkInfo 分片信息
type ChunkInfo struct {
Index int `json:"index"`
Size int64 `json:"size"`
MD5 string `json:"md5"`
FilePath string `json:"file_path"` // 分片临时存储路径
Done bool `json:"done"`
}
// UploadService 处理文件上传
type UploadService struct {
tempDir string // 临时文件目录
sessions sync.Map // upload_id -> *UploadSession
storage ObjectStorage // 最终存储接口(S3/OSS 等)
logger *slog.Logger
maxChunkSize int64 // 最大分片大小(服务端限制)
minChunkSize int64 // 最小分片大小
}
// InitUpload 初始化上传会话
func (s *UploadService) InitUpload(ctx context.Context, req InitUploadRequest) (*UploadSession, error) {
if req.TotalSize <= 0 {
return nil, fmt.Errorf("invalid total size: %d", req.TotalSize)
}
if req.ChunkSize < s.minChunkSize || req.ChunkSize > s.maxChunkSize {
return nil, fmt.Errorf("chunk size %d out of range [%d, %d]",
req.ChunkSize, s.minChunkSize, s.maxChunkSize)
}
totalChunks := int((req.TotalSize + req.ChunkSize - 1) / req.ChunkSize)
session := &UploadSession{
ID: generateUploadID(),
FileName: sanitizeFileName(req.FileName),
TotalSize: req.TotalSize,
ChunkSize: req.ChunkSize,
TotalChunks: totalChunks,
Chunks: make(map[int]*ChunkInfo, totalChunks),
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
// 存储到内存(生产环境应该存到 Redis,支持重启后续传)
s.sessions.Store(session.ID, session)
s.logger.Info("upload session created",
slog.String("upload_id", session.ID),
slog.String("file_name", session.FileName),
slog.Int64("total_size", session.TotalSize),
slog.Int("total_chunks", session.TotalChunks),
)
return session, nil
}
// UploadChunk 上传单个分片
func (s *UploadService) UploadChunk(ctx context.Context, uploadID string, chunkIndex int, reader io.Reader, chunkSize int64) (*ChunkInfo, error) {
session, err := s.getSession(uploadID)
if err != nil {
return nil, err
}
// 校验分片索引
if chunkIndex < 0 || chunkIndex >= session.TotalChunks {
return nil, fmt.Errorf("invalid chunk index %d (total: %d)", chunkIndex, session.TotalChunks)
}
// 计算预期的分片大小
expectedSize := session.ChunkSize
if chunkIndex == session.TotalChunks-1 {
// 最后一片可能更小
expectedSize = session.TotalSize - int64(chunkIndex)*session.ChunkSize
}
if chunkSize != expectedSize {
return nil, fmt.Errorf("chunk size mismatch: expected %d, got %d", expectedSize, chunkSize)
}
// 写入临时文件
tempPath := filepath.Join(s.tempDir, fmt.Sprintf("%s_%d.part", uploadID, chunkIndex))
tempFile, err := os.Create(tempPath)
if err != nil {
return nil, fmt.Errorf("create temp file: %w", err)
}
defer tempFile.Close()
// 同时计算 MD5 用于完整性校验
hash := md5.New()
written, err := io.Copy(io.MultiWriter(tempFile, hash), reader)
if err != nil {
os.Remove(tempPath)
return nil, fmt.Errorf("write chunk: %w", err)
}
if written != chunkSize {
os.Remove(tempPath)
return nil, fmt.Errorf("written %d bytes, expected %d", written, chunkSize)
}
md5sum := hex.EncodeToString(hash.Sum(nil))
chunk := &ChunkInfo{
Index: chunkIndex,
Size: written,
MD5: md5sum,
FilePath: tempPath,
Done: true,
}
// 记录分片完成
session.mu.Lock()
session.Chunks[chunkIndex] = chunk
session.UpdatedAt = time.Now()
session.mu.Unlock()
s.logger.Info("chunk uploaded",
slog.String("upload_id", uploadID),
slog.Int("chunk_index", chunkIndex),
slog.Int64("size", written),
slog.String("md5", md5sum),
)
return chunk, nil
}
// CompleteUpload 完成上传,合并所有分片
func (s *UploadService) CompleteUpload(ctx context.Context, uploadID string) (string, error) {
session, err := s.getSession(uploadID)
if err != nil {
return "", err
}
session.mu.RLock()
defer session.mu.RUnlock()
// 检查所有分片是否都已上传
for i := 0; i < session.TotalChunks; i++ {
chunk, ok := session.Chunks[i]
if !ok || !chunk.Done {
return "", fmt.Errorf("chunk %d not uploaded", i)
}
}
// 按顺序合并分片,用进度回调
fileKey := generateFileKey(session.FileName)
pr, pw := io.Pipe()
// 在后台 goroutine 里合并分片并写入 pipe
go func() {
defer pw.Close()
for i := 0; i < session.TotalChunks; i++ {
chunk := session.Chunks[i]
f, err := os.Open(chunk.FilePath)
if err != nil {
pw.CloseWithError(fmt.Errorf("open chunk %d: %w", i, err))
return
}
if _, err := io.Copy(pw, f); err != nil {
f.Close()
pw.CloseWithError(fmt.Errorf("copy chunk %d: %w", i, err))
return
}
f.Close()
}
}()
// 上传到对象存储
if err := s.storage.Put(ctx, fileKey, pr, session.TotalSize); err != nil {
return "", fmt.Errorf("upload to storage: %w", err)
}
// 清理临时文件
go s.cleanup(uploadID, session)
return fileKey, nil
}
// GetUploadStatus 获取上传状态(断点续传时调用,获取已完成的分片列表)
func (s *UploadService) GetUploadStatus(ctx context.Context, uploadID string) (*UploadStatus, error) {
session, err := s.getSession(uploadID)
if err != nil {
return nil, err
}
session.mu.RLock()
defer session.mu.RUnlock()
completedChunks := make([]int, 0, len(session.Chunks))
for idx, chunk := range session.Chunks {
if chunk.Done {
completedChunks = append(completedChunks, idx)
}
}
sort.Ints(completedChunks)
return &UploadStatus{
UploadID: uploadID,
TotalChunks: session.TotalChunks,
CompletedChunks: completedChunks,
Progress: float64(len(completedChunks)) / float64(session.TotalChunks),
}, nil
}
func (s *UploadService) getSession(uploadID string) (*UploadSession, error) {
val, ok := s.sessions.Load(uploadID)
if !ok {
return nil, fmt.Errorf("upload session %s not found", uploadID)
}
return val.(*UploadSession), nil
}
func (s *UploadService) cleanup(uploadID string, session *UploadSession) {
for _, chunk := range session.Chunks {
os.Remove(chunk.FilePath)
}
s.sessions.Delete(uploadID)
}
// 辅助类型
type InitUploadRequest struct {
FileName string
TotalSize int64
ChunkSize int64
}
type UploadStatus struct {
UploadID string `json:"upload_id"`
TotalChunks int `json:"total_chunks"`
CompletedChunks []int `json:"completed_chunks"`
Progress float64 `json:"progress"`
}
type ObjectStorage interface {
Put(ctx context.Context, key string, reader io.Reader, size int64) error
Get(ctx context.Context, key string) (io.ReadCloser, error)
}
func generateUploadID() string {
return fmt.Sprintf("upload_%d", time.Now().UnixNano())
}
func generateFileKey(fileName string) string {
return fmt.Sprintf("uploads/%d/%s", time.Now().Unix(), fileName)
}
func sanitizeFileName(name string) string {
return filepath.Base(name)
}踩坑实录
坑一:分片上传并发时,临时文件目录满了
现象: 多个用户同时上传大文件时,临时目录占了 40GB 磁盘,服务器磁盘告警。
原因: 没有清理超时的上传会话,也没有限制最大同时进行的上传数量。
解法: 加两个机制:1) 定时清理超过 24 小时没有活动的上传会话和临时文件;2) 限制并发上传会话数量。
坑二:大文件下载时把整个文件读到内存里,OOM
现象: 下载 500MB 文件时,进程内存瞬间涨 500MB,然后 OOM。
原因: 代码里用了 io.ReadAll(storageReader) 把整个文件读到 []byte 里,然后 w.Write(data)。
解法: 用流式响应,直接把 storage 的 reader 复制到 response writer:
func (h *DownloadHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
reader, size, err := h.storage.Get(r.Context(), fileKey)
if err != nil {
http.Error(w, "file not found", http.StatusNotFound)
return
}
defer reader.Close()
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
w.Header().Set("Content-Disposition", "attachment; filename="+fileName)
// 流式传输,不把整个文件读入内存
// 支持 Range 请求(断点续传下载)
http.ServeContent(w, r, fileName, time.Now(),
&seekableReader{reader: reader, size: size})
}坑三:没有校验 Content-Type,被上传了可执行文件
现象: 安全扫描发现有 .exe、.sh 等可执行文件被上传进来。
解法: 除了校验 Content-Type header(可以伪造),还要读取文件头魔数(magic bytes)来识别真实格式:
func detectFileType(reader io.Reader) (string, io.Reader, error) {
// 读取前512字节用于检测
buf := make([]byte, 512)
n, err := reader.Read(buf)
if err != nil && err != io.EOF {
return "", nil, err
}
contentType := http.DetectContentType(buf[:n])
// 把读取的字节放回 reader
combined := io.MultiReader(bytes.NewReader(buf[:n]), reader)
return contentType, combined, nil
}