Python Excel/CSV 自动化处理——openpyxl、xlrd、polars 大文件处理方案
Python Excel/CSV 自动化处理——openpyxl、xlrd、polars 大文件处理方案
适读人群:需要用 Python 处理 Excel/CSV 文件的工程师和数据分析师 | 阅读时长:约16分钟 | 核心价值:掌握各种文件处理场景的最优方案,从"能跑"到"跑得快"
每个团队里都有那么一个人,专门负责"Excel 工程师"的工作:从各部门收集 Excel,清洗合并,出报表。这件事原本用 Excel 手动做,费时费力还容易出错。
小蔡就是这样的人。她是个测试工程师,但因为会点 Python,被拉去做月度报告自动化。需求很简单:把20个部门交来的 Excel 文件合并,做些统计,输出最终报告。
她用 pandas 写了个脚本,能跑通,但有几个问题:一个 Excel 文件有 15 个 Sheet,她的代码把每个 Sheet 都读成 DataFrame,内存用了好几个 G;有些文件有合并单元格,读进来数据全错位了;最头疼的是,财务部门的 Excel 有各种格式——有单元格颜色、有边框、有公式,只用 pandas 根本处理不了。
今天我们把 Python 处理 Excel/CSV 的常见场景全部覆盖。
一、工具选型
| 工具 | 读写Excel | 保留格式 | 速度 | 适用场景 |
|---|---|---|---|---|
| pandas | 读写(via openpyxl/xlrd) | 否 | 中 | 数据分析,不关心格式 |
| openpyxl | 读写 | 是 | 慢 | 需要操作格式、样式、公式 |
| xlrd | 只读(xls) | 否 | 快 | 读老格式的 xls 文件 |
| polars | 读写 | 否 | 极快 | 大文件纯数据处理 |
| xlwings | 读写 | 是 | 慢 | 需要操作 Excel 软件本身 |
二、openpyxl 深度实战
2.1 读取复杂 Excel(含合并单元格)
import openpyxl
from openpyxl import load_workbook
from openpyxl.utils import get_column_letter
import pandas as pd
from typing import Dict, List
def read_excel_with_merged_cells(file_path: str, sheet_name: str) -> pd.DataFrame:
"""
正确处理合并单元格的 Excel 读取
openpyxl 对合并单元格的处理:只有左上角单元格有值,其余为 None
"""
wb = load_workbook(file_path, data_only=True) # data_only=True 读取公式的计算结果
ws = wb[sheet_name]
# 先展开合并单元格(把左上角的值填充到所有合并区域)
# 注意:必须在读取数据前处理,否则 pandas 读取会有 None
merged_cells_ranges = list(ws.merged_cells.ranges)
for merged_range in merged_cells_ranges:
min_row, min_col = merged_range.min_row, merged_range.min_col
max_row, max_col = merged_range.max_row, merged_range.max_col
# 获取左上角的值
top_left_value = ws.cell(row=min_row, column=min_col).value
# 先取消合并
ws.unmerge_cells(str(merged_range))
# 填充到所有被合并的单元格
for row in range(min_row, max_row + 1):
for col in range(min_col, max_col + 1):
ws.cell(row=row, column=col).value = top_left_value
# 现在读取数据
data = []
for row in ws.iter_rows(values_only=True):
data.append(list(row))
if not data:
return pd.DataFrame()
# 第一行作为列名
df = pd.DataFrame(data[1:], columns=data[0])
df = df.dropna(how="all") # 删除全空行
wb.close()
return df
def read_all_sheets(file_path: str) -> Dict[str, pd.DataFrame]:
"""读取 Excel 文件的所有 Sheet"""
wb = load_workbook(file_path, data_only=True, read_only=True) # read_only 模式省内存
result = {}
for sheet_name in wb.sheetnames:
try:
ws = wb[sheet_name]
data = []
for i, row in enumerate(ws.iter_rows(values_only=True)):
data.append(list(row))
if i == 0:
headers = list(row)
if len(data) > 1:
df = pd.DataFrame(data[1:], columns=headers)
df = df.dropna(how="all")
result[sheet_name] = df
print(f" Sheet '{sheet_name}': {len(df)} 行 × {len(df.columns)} 列")
except Exception as e:
print(f" Sheet '{sheet_name}' 读取失败:{e}")
wb.close()
return result2.2 生成带格式的 Excel 报告
from openpyxl.styles import (
Font, Fill, PatternFill, Border, Side, Alignment, numbers
)
from openpyxl.utils import get_column_letter
from openpyxl.chart import BarChart, Reference
import datetime
def create_report_excel(df: pd.DataFrame, output_path: str):
"""
生成带格式的专业 Excel 报告
"""
wb = openpyxl.Workbook()
ws = wb.active
ws.title = "月度报告"
# 定义样式
header_font = Font(name="微软雅黑", bold=True, color="FFFFFF", size=12)
header_fill = PatternFill("solid", fgColor="4527A0") # 紫色背景
header_alignment = Alignment(horizontal="center", vertical="center", wrap_text=True)
data_font = Font(name="微软雅黑", size=11)
even_fill = PatternFill("solid", fgColor="F8F0FF") # 浅紫色交替行
odd_fill = PatternFill("solid", fgColor="FFFFFF")
border_side = Side(style="thin", color="DDDDDD")
thin_border = Border(
left=border_side, right=border_side,
top=border_side, bottom=border_side
)
# 写入标题行
headers = list(df.columns)
for col_idx, header in enumerate(headers, 1):
cell = ws.cell(row=1, column=col_idx, value=header)
cell.font = header_font
cell.fill = header_fill
cell.alignment = header_alignment
cell.border = thin_border
# 写入数据行
for row_idx, row in enumerate(df.itertuples(index=False), 2):
fill = even_fill if row_idx % 2 == 0 else odd_fill
for col_idx, value in enumerate(row, 1):
cell = ws.cell(row=row_idx, column=col_idx, value=value)
cell.font = data_font
cell.fill = fill
cell.border = thin_border
cell.alignment = Alignment(vertical="center")
# 对不同类型应用不同格式
if isinstance(value, float):
if abs(value) > 100:
cell.number_format = "#,##0.00"
else:
cell.number_format = "0.00%"
elif isinstance(value, datetime.datetime):
cell.number_format = "YYYY-MM-DD HH:MM:SS"
elif isinstance(value, datetime.date):
cell.number_format = "YYYY-MM-DD"
# 自动调整列宽
for col_idx, col in enumerate(ws.iter_cols(), 1):
max_length = max(
len(str(cell.value)) if cell.value else 0
for cell in col
)
ws.column_dimensions[get_column_letter(col_idx)].width = min(max_length + 4, 50)
# 冻结首行
ws.freeze_panes = "A2"
# 添加汇总行
summary_row = len(df) + 2
ws.cell(row=summary_row, column=1, value="合计").font = Font(bold=True)
# 对数值列添加 SUM 公式
for col_idx in range(2, len(headers) + 1):
col_letter = get_column_letter(col_idx)
cell = ws.cell(
row=summary_row, column=col_idx,
value=f"=SUM({col_letter}2:{col_letter}{len(df)+1})"
)
cell.font = Font(bold=True)
cell.fill = PatternFill("solid", fgColor="E8D5FF")
wb.save(output_path)
print(f"报告已保存:{output_path}")三、大文件 CSV 处理
3.1 用 polars 处理超大 CSV
import polars as pl
import os
def process_large_csv_polars(
input_path: str,
output_path: str,
filter_conditions: dict = None
) -> int:
"""
用 polars 的 lazy API 处理大 CSV
可以处理几十 GB 的文件而不会内存溢出
"""
print(f"文件大小:{os.path.getsize(input_path) / 1024**3:.2f} GB")
# scan_csv:懒加载,不读入内存
lazy = pl.scan_csv(
input_path,
infer_schema_length=10000, # 用前10000行推断类型
try_parse_dates=True, # 自动解析日期列
ignore_errors=True, # 跳过解析错误的行
low_memory=True # 节省内存(稍慢)
)
# 应用过滤条件
if filter_conditions:
for col, value in filter_conditions.items():
lazy = lazy.filter(pl.col(col) == value)
# 数据处理(全在 lazy 阶段,还未执行)
processed = (
lazy
.with_columns([
pl.col("amount").cast(pl.Float32),
pl.col("created_date").str.strptime(pl.Date, "%Y-%m-%d", strict=False)
])
.filter(pl.col("amount") > 0)
.group_by(["user_id", pl.col("created_date").dt.month().alias("month")])
.agg([
pl.col("amount").sum().alias("total_amount"),
pl.col("amount").count().alias("order_count")
])
)
# 流式写入(不把结果全放内存)
processed.sink_csv(output_path) # sink_csv 支持流式写入大文件
# 获取结果行数
result_df = pl.read_csv(output_path)
return len(result_df)3.2 踩坑实录一:Excel 文件的编码问题
现象:打开 Excel 文件时报 UnicodeDecodeError 或者中文显示乱码。
原因:CSV 文件的编码可能是 GBK(Windows Excel 默认编码),不是 UTF-8;或者文件是 .xls(Excel 97 格式),不是 .xlsx。
解法:
import chardet
def detect_and_read_csv(file_path: str) -> pd.DataFrame:
"""自动检测编码并读取 CSV"""
# 先检测编码
with open(file_path, "rb") as f:
raw_data = f.read(50000) # 读取前50KB检测编码
result = chardet.detect(raw_data)
detected_encoding = result["encoding"]
confidence = result["confidence"]
print(f"检测到编码:{detected_encoding}(置信度:{confidence:.0%})")
# 常见的中文编码
encodings_to_try = [detected_encoding, "utf-8", "gbk", "gb2312", "utf-8-sig"]
for encoding in encodings_to_try:
if encoding is None:
continue
try:
df = pd.read_csv(file_path, encoding=encoding)
print(f"使用编码 {encoding} 成功读取")
return df
except (UnicodeDecodeError, LookupError):
continue
# 最后方案:ignore 错误字符
df = pd.read_csv(file_path, encoding="utf-8", errors="ignore")
print("使用 UTF-8 强制读取(忽略无法解码的字符)")
return df
def read_xls_or_xlsx(file_path: str) -> Dict[str, pd.DataFrame]:
"""智能读取 xls 或 xlsx"""
if file_path.endswith(".xls"):
# xlrd 只支持 .xls 格式
return pd.read_excel(file_path, sheet_name=None, engine="xlrd")
else:
# openpyxl 读 .xlsx
return pd.read_excel(file_path, sheet_name=None, engine="openpyxl")四、批量处理多个 Excel 文件
from pathlib import Path
import glob
from concurrent.futures import ThreadPoolExecutor
import threading
def process_single_excel(file_path: str) -> pd.DataFrame:
"""处理单个 Excel 文件,返回清洗后的 DataFrame"""
try:
df = pd.read_excel(file_path, engine="openpyxl")
# 统一列名(去掉空格,转小写)
df.columns = [str(c).strip().lower().replace(" ", "_") for c in df.columns]
# 添加来源文件标识
df["source_file"] = Path(file_path).stem
# 基础清洗
df = df.dropna(how="all")
return df
except Exception as e:
print(f"处理文件 {file_path} 失败:{e}")
return pd.DataFrame()
def batch_merge_excel_files(
input_dir: str,
output_path: str,
pattern: str = "*.xlsx",
n_workers: int = 4
) -> pd.DataFrame:
"""
批量合并目录下的所有 Excel 文件
使用多线程并行处理(IO 密集型,多线程有效)
"""
files = glob.glob(os.path.join(input_dir, "**", pattern), recursive=True)
print(f"找到 {len(files)} 个文件,开始处理...")
results = []
lock = threading.Lock()
def process_and_collect(file_path):
df = process_single_excel(file_path)
if not df.empty:
with lock:
results.append(df)
print(f"已处理:{Path(file_path).name} ({len(df)} 行)")
# 多线程并行处理
with ThreadPoolExecutor(max_workers=n_workers) as executor:
executor.map(process_and_collect, files)
if not results:
print("没有找到有效数据")
return pd.DataFrame()
# 合并所有 DataFrame
combined = pd.concat(results, ignore_index=True)
# 保存结果
if output_path.endswith(".xlsx"):
combined.to_excel(output_path, index=False, engine="openpyxl")
elif output_path.endswith(".parquet"):
combined.to_parquet(output_path, index=False)
else:
combined.to_csv(output_path, index=False, encoding="utf-8-sig") # utf-8-sig 可在 Excel 中正确显示中文
print(f"\n合并完成:{len(results)} 个文件,共 {len(combined):,} 行")
print(f"已保存:{output_path}")
return combined
# 使用
result = batch_merge_excel_files(
input_dir="/data/monthly_reports",
output_path="/data/combined_report.xlsx"
)五、踩坑实录二:公式单元格读取为 None
现象:Excel 里有公式的单元格,用 openpyxl 读取后值为 None。
原因:openpyxl 默认读取公式字符串(如 =SUM(A1:A10)),而不是计算结果。需要 data_only=True 参数,但这只在 Excel 软件打开并保存过的文件上有效。
解法:
# 正确读取公式结果
wb = load_workbook(file_path, data_only=True) # 读取缓存的公式结果
# 但注意:data_only=True 依赖 Excel 上次保存时缓存的值
# 如果文件是程序生成的(没有被 Excel 打开过),公式结果可能为 None
# 方案一:用 xlwings 调用 Excel 引擎重新计算(需要安装了 Excel 的 Windows/Mac)
# import xlwings as xw
# with xw.App(visible=False) as app:
# wb = app.books.open(file_path)
# wb.save() # 保存触发公式重新计算
# wb.close()
# 方案二:用 pandas 直接读(pandas 内部也用 openpyxl,但处理更友好)
df = pd.read_excel(file_path, engine="openpyxl")
# pandas 会自动处理大多数公式结果
# 方案三:让提供文件的人把 Excel 另存为"仅值"格式(治本)六、踩坑实录三:超大 Excel 文件内存溢出
现象:读取一个 200MB 的 Excel 文件时,内存飙升到 4GB 以上,最后 OOM。
原因:pandas 读 Excel 会把整个文件加载进内存,而 Excel 格式(.xlsx)本质是 ZIP 压缩的 XML,解压后体积是原来的 8-10 倍。
解法:
# 方案一:openpyxl read_only 模式(流式读取,省内存)
def read_large_excel_streaming(file_path: str) -> pd.DataFrame:
wb = load_workbook(file_path, read_only=True, data_only=True)
ws = wb.active
data = []
headers = None
for i, row in enumerate(ws.iter_rows(values_only=True)):
if i == 0:
headers = list(row)
else:
data.append(list(row))
# 每 10 万行输出一次进度
if i % 100000 == 0 and i > 0:
print(f"已读取 {i:,} 行")
wb.close()
return pd.DataFrame(data, columns=headers)
# 方案二:转为 CSV 后用 polars 处理(最优方案)
def excel_to_parquet(excel_path: str, output_path: str):
"""把 Excel 转为 Parquet 格式,后续处理更快"""
print("第一步:将 Excel 转为 CSV...")
df = read_large_excel_streaming(excel_path)
# 先保存为 CSV
csv_path = excel_path.replace(".xlsx", ".csv")
df.to_csv(csv_path, index=False, encoding="utf-8")
del df # 释放内存
print("第二步:用 polars 处理并保存为 Parquet...")
pl.read_csv(csv_path).write_parquet(output_path)
os.remove(csv_path) # 清理临时文件
print(f"完成:{output_path}")处理 Excel 和 CSV 是"脏活",但也是最贴近真实业务的工作。工具选对了,一天的手工活能压缩成几分钟的自动化,这就是 Python 自动化的价值所在。
