Go 文件处理实战——大文件分块读取、CSV/Excel 解析、并发处理
Go 文件处理实战——大文件分块读取、CSV/Excel 解析、并发处理
适读人群:需要处理大文件的 Go 工程师、做数据导入导出功能的开发者 | 阅读时长:约18分钟 | 核心价值:掌握 Go 处理 GB 级文件的内存友好方案,以及 CSV/Excel 的高效解析技巧
那次让服务器内存爆掉的"文件导入"需求
2023年4月,运营同学找到我,说要做一个"用户数据导入"功能,把一个10万行的 Excel 文件导入系统。
"多大的文件?"我问。
"Excel 文件,200MB 左右。"
我想了想,感觉不大,就直接写了一个:读取文件 → 解析 Excel → 循环入库。
上线当天,运营上传了文件,服务器内存直接从2GB涨到8GB,然后 OOM,服务崩了。
同事跑来问我:"你把整个 Excel 都加载到内存里了?"
我那时才意识到:excelize.OpenFile 会把整个 Excel 加载到内存,200MB 的 Excel 解析后可能膨胀到 4-8GB 内存。
那次事故是我学习 Go 文件处理的起点。这篇文章把正确的方法都写出来。
大文件读取:不要一次性 ReadAll
Java 工程师转 Go 后,很多人的直觉是:
// 错误做法(小文件无所谓,大文件内存爆炸)
data, err := os.ReadFile("bigfile.txt")正确做法是流式读取,把文件当流处理,不把整个文件加载到内存。
按行读取(文本文件)
package main
import (
"bufio"
"fmt"
"log"
"os"
"time"
)
// ReadFileByLine 按行读取大文件,内存占用始终只有一行的大小
func ReadFileByLine(filePath string, handler func(line string) error) error {
f, err := os.Open(filePath)
if err != nil {
return fmt.Errorf("打开文件失败: %w", err)
}
defer f.Close()
scanner := bufio.NewScanner(f)
// 默认 scanner 每行最大 64KB,如果有超长行需要扩大 buffer
scanner.Buffer(make([]byte, 1024*1024), 1024*1024) // 最大 1MB/行
lineNum := 0
for scanner.Scan() {
lineNum++
line := scanner.Text()
if err := handler(line); err != nil {
return fmt.Errorf("处理第 %d 行失败: %w", lineNum, err)
}
}
return scanner.Err()
}
// 分块读取(适合二进制文件)
func ReadFileInChunks(filePath string, chunkSize int, handler func(chunk []byte) error) error {
f, err := os.Open(filePath)
if err != nil {
return err
}
defer f.Close()
buf := make([]byte, chunkSize)
for {
n, err := f.Read(buf)
if n > 0 {
if err := handler(buf[:n]); err != nil {
return err
}
}
if err != nil {
if err.Error() == "EOF" {
return nil
}
return err
}
}
}CSV 解析:流式 vs 全量
正确的 CSV 流式读取
package main
import (
"encoding/csv"
"fmt"
"io"
"log"
"os"
"strconv"
)
// UserRecord CSV 行结构
type UserRecord struct {
UserID int64
Username string
Email string
Age int
}
// ParseCSVStream 流式解析 CSV(内存占用恒定)
func ParseCSVStream(filePath string, handler func(user *UserRecord) error) (int, int, error) {
f, err := os.Open(filePath)
if err != nil {
return 0, 0, err
}
defer f.Close()
reader := csv.NewReader(f)
reader.FieldsPerRecord = -1 // -1 表示允许每行字段数不一致(更宽容)
reader.LazyQuotes = true // 允许不严格的引号(处理脏数据)
// 跳过标题行
if _, err := reader.Read(); err != nil {
return 0, 0, fmt.Errorf("读取标题行失败: %w", err)
}
var successCount, failCount int
for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
failCount++
log.Printf("解析行失败: %v,跳过", err)
continue
}
if len(record) < 4 {
failCount++
continue
}
userID, _ := strconv.ParseInt(record[0], 10, 64)
age, _ := strconv.Atoi(record[3])
user := &UserRecord{
UserID: userID,
Username: record[1],
Email: record[2],
Age: age,
}
if err := handler(user); err != nil {
failCount++
log.Printf("处理用户 %d 失败: %v,跳过", userID, err)
continue
}
successCount++
}
return successCount, failCount, nil
}Excel 解析:流式 API(解决内存爆炸问题)
回到文章开头的问题,excelize.OpenFile 会加载整个文件到内存。解决方案是用 excelize 的流式读取 API:
package main
import (
"fmt"
"log"
"github.com/xuri/excelize/v2"
)
// ParseExcelStream 流式读取 Excel(推荐用于 > 10MB 的文件)
func ParseExcelStream(filePath string, sheetName string, handler func(row []string, rowNum int) error) error {
f, err := excelize.OpenFile(filePath,
excelize.Options{
// 不把所有 sheet 加载到内存,只按需读取
UnzipSizeLimit: 1024 * 1024 * 100, // 100MB 限制
},
)
if err != nil {
return err
}
defer f.Close()
// 使用 Rows() 流式迭代,而不是 GetRows()(GetRows会全部加载到内存)
rows, err := f.Rows(sheetName)
if err != nil {
return err
}
rowNum := 0
for rows.Next() {
rowNum++
row, err := rows.Columns()
if err != nil {
log.Printf("读取第 %d 行失败: %v,跳过", rowNum, err)
continue
}
// 跳过标题行
if rowNum == 1 {
continue
}
if err := handler(row, rowNum); err != nil {
return fmt.Errorf("处理第 %d 行失败: %w", rowNum, err)
}
}
return rows.Close()
}
// 使用示例
func importUsersFromExcel(filePath string) error {
return ParseExcelStream(filePath, "Sheet1", func(row []string, rowNum int) error {
if len(row) < 4 {
return nil // 跳过空行
}
log.Printf("处理第 %d 行: %v", rowNum, row)
// 这里调用数据库插入...
return nil
})
}写 Excel 也用流式 API(写大文件时节省内存):
func WriteExcelStream(filePath string, headers []string, dataFunc func(w *excelize.StreamWriter) error) error {
f := excelize.NewFile()
defer f.Close()
// 创建流式写入器
sw, err := f.NewStreamWriter("Sheet1")
if err != nil {
return err
}
// 写标题行
headerRow := make([]interface{}, len(headers))
for i, h := range headers {
headerRow[i] = h
}
sw.SetRow("A1", headerRow)
// 写数据
if err := dataFunc(sw); err != nil {
return err
}
sw.Flush() // 必须调用 Flush 才能写入
return f.SaveAs(filePath)
}并发处理:Worker Pool 模式
大文件的行数可能有几十万,串行处理太慢。用 Worker Pool 并发处理:
package main
import (
"sync"
"log"
)
// WorkerPool 通用 Worker Pool(Java 类比:ThreadPoolExecutor)
type WorkerPool struct {
workerCount int
jobs chan *UserRecord
wg sync.WaitGroup
errCh chan error
}
func NewWorkerPool(workerCount, bufferSize int) *WorkerPool {
return &WorkerPool{
workerCount: workerCount,
jobs: make(chan *UserRecord, bufferSize),
errCh: make(chan error, bufferSize),
}
}
func (wp *WorkerPool) Start(handler func(*UserRecord) error) {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go func(workerID int) {
defer wp.wg.Done()
for user := range wp.jobs {
if err := handler(user); err != nil {
log.Printf("Worker %d 处理失败: userID=%d, err=%v", workerID, user.UserID, err)
// 非阻塞地写入错误
select {
case wp.errCh <- err:
default:
}
}
}
}(i)
}
}
func (wp *WorkerPool) Submit(user *UserRecord) {
wp.jobs <- user
}
func (wp *WorkerPool) Wait() {
close(wp.jobs)
wp.wg.Wait()
close(wp.errCh)
}
// 完整的并发导入流程
func ConcurrentImportCSV(filePath string, db *sql.DB) error {
pool := NewWorkerPool(20, 1000) // 20个并发 worker,缓冲1000条
pool.Start(func(user *UserRecord) error {
// 批量插入或单条插入
return insertUser(db, user)
})
// 流式读取 CSV,提交任务给 worker pool
_, failCount, err := ParseCSVStream(filePath, func(user *UserRecord) error {
pool.Submit(user)
return nil
})
pool.Wait()
if failCount > 0 {
log.Printf("共有 %d 行处理失败", failCount)
}
return err
}踩坑实录
坑1:并发写数据库时,没有控制并发数,连接池耗尽
现象: 并发 goroutine 数设置得很大(100+),数据库连接池配置了20个连接,并发写时大量请求等待连接,不但没加速,反而比串行还慢,还出现了大量超时错误。
原因: goroutine 数远超数据库连接数时,大量 goroutine 在等锁,上下文切换开销巨大。
解法: Worker 数量 ≈ 数据库连接池大小。同时开启批量插入,把单条 INSERT 改为批量 INSERT INTO ... VALUES (...), (...),10条一批,吞吐量提升10倍以上。
坑2:CSV 里有中文,读出来是乱码
现象: Excel 另存为的 CSV,用 Go 读取后中文全是乱码。
原因: Excel 在 Windows 上另存为 CSV 默认是 GBK/GB18030 编码,不是 UTF-8。
解法: 用 golang.org/x/text/encoding/simplifiedchinese 做转码:
import "golang.org/x/text/encoding/simplifiedchinese"
import "golang.org/x/text/transform"
reader := transform.NewReader(f, simplifiedchinese.GBK.NewDecoder())
csvReader := csv.NewReader(reader)坑3:Excel 流式 API 读取时,数字格式的日期显示为数字
现象: Excel 里的"2023/01/01"日期列,用 rows.Columns() 读出来是 "44927"(Excel 日期序列号)。
原因: Excel 的日期底层是一个数字(从1900-01-01起的天数),流式读取时 excelize 没有自动格式化。
解法: 用 f.GetCellValue 时会自动格式化,或者手动转换:
import "github.com/xuri/excelize/v2"
// 把 Excel 日期序列号转为 time.Time
t, err := excelize.ExcelDateToTime(44927, false)
// t = 2023-01-01 00:00:00总结:大文件处理的核心原则
- 流式 > 全量:任何超过几十MB的文件都不要全部加载进内存。
- 并发处理 + 批量入库:把串行的"读一条存一条"改成"并发处理 + 批量入库",性能差10-100倍。
- 失败行记录,不影响正常行:大文件处理必然有脏数据,用错误记录机制跳过失败行,不要因为一行失败整个任务失败。
- 进度反馈:几十万行的任务,要有进度日志,否则没法判断任务卡住还是在正常跑。
