Python pandas 深度实战——大数据集处理、内存优化、复杂数据清洗
Python pandas 深度实战——大数据集处理、内存优化、复杂数据清洗
适读人群:用 Python 做数据分析的工程师,尤其是从 Java 转过来的 | 阅读时长:约18分钟 | 核心价值:彻底解决 pandas 内存爆炸和速度慢两大顽疾,给你实用的大数据集处理方案
去年我在一家电商公司做技术咨询,认识了个叫小潘的工程师。他之前做 Java,去年公司让他转到数据团队,主要用 Python 做数据分析。
头一个月他挺顺的,pandas 学起来比 Java 爽多了,感觉灵活自由。但有一天他来找我,脸色有点难看:"老张,我处理一个 2GB 的 CSV,Python 直接跑死了,内存吃了 30 多 GB。服务器就 32GB 内存,差点把整个机器搞崩。"
我问他怎么加载的。他说:df = pd.read_csv('data.csv')。
就这一行,没有任何优化。
pandas 有个"默认行为"陷阱:它会把所有数据加载到内存,自动推断数据类型,但推断结果往往非常低效。一个整数列会被推断成 int64(8字节),一个只有"是/否"两种值的列会被推断成 object(字符串),内存消耗轻松翻4-8倍。
今天我们系统地把 pandas 大数据处理的核心技巧讲透。
一、内存优化:从"加载就崩"到"流畅处理"
1.1 诊断内存使用情况
import pandas as pd
import numpy as np
def diagnose_memory(df: pd.DataFrame) -> pd.DataFrame:
"""
诊断 DataFrame 的内存使用情况
找出内存占用大的列和可以优化的列
"""
info = []
for col in df.columns:
col_info = {
"column": col,
"dtype": str(df[col].dtype),
"null_count": df[col].isna().sum(),
"unique_count": df[col].nunique(),
"memory_mb": df[col].memory_usage(deep=True) / 1024 / 1024,
}
# 对数值列,检查实际值范围
if df[col].dtype in ['int64', 'float64']:
col_info["min"] = df[col].min()
col_info["max"] = df[col].max()
info.append(col_info)
result = pd.DataFrame(info).sort_values("memory_mb", ascending=False)
total_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
print(f"DataFrame 总内存:{total_mb:.1f} MB")
print(f"行数:{len(df):,},列数:{len(df.columns)}")
return result
# 使用示例
df = pd.read_csv("sales_data.csv")
memory_report = diagnose_memory(df)
print(memory_report.head(10))1.2 智能类型优化
def optimize_dtypes(df: pd.DataFrame) -> pd.DataFrame:
"""
自动优化 DataFrame 的数据类型,通常可减少 50-70% 内存
"""
df_optimized = df.copy()
for col in df.columns:
col_type = df[col].dtype
# 整数列:根据值范围选择最小类型
if col_type in ['int64', 'int32', 'int16']:
col_min = df[col].min()
col_max = df[col].max()
if col_min >= 0: # 无符号整数
if col_max < 255:
df_optimized[col] = df[col].astype(np.uint8)
elif col_max < 65535:
df_optimized[col] = df[col].astype(np.uint16)
elif col_max < 4294967295:
df_optimized[col] = df[col].astype(np.uint32)
else: # 有符号整数
if col_min > -128 and col_max < 127:
df_optimized[col] = df[col].astype(np.int8)
elif col_min > -32768 and col_max < 32767:
df_optimized[col] = df[col].astype(np.int16)
elif col_min > -2147483648 and col_max < 2147483647:
df_optimized[col] = df[col].astype(np.int32)
# 浮点列:float64 → float32(大多数场景精度足够)
elif col_type == 'float64':
df_optimized[col] = df[col].astype(np.float32)
# 字符串列:低基数的用 category
elif col_type == 'object':
unique_ratio = df[col].nunique() / len(df)
if unique_ratio < 0.05: # 唯一值不超过 5%,适合 category
df_optimized[col] = df[col].astype("category")
# 统计优化效果
original_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
optimized_mb = df_optimized.memory_usage(deep=True).sum() / 1024 / 1024
reduction = (1 - optimized_mb / original_mb) * 100
print(f"原始内存:{original_mb:.1f} MB")
print(f"优化后:{optimized_mb:.1f} MB")
print(f"节省了 {reduction:.1f}%")
return df_optimized
df_opt = optimize_dtypes(df)1.3 分块读取大文件
def process_large_csv(
file_path: str,
chunk_size: int = 100000,
output_path: str = None
) -> pd.DataFrame:
"""
分块处理大 CSV,避免内存溢出
适合超过可用内存大小的文件
"""
results = []
# 指定列的数据类型(避免自动推断的内存浪费)
dtype_spec = {
"user_id": np.int32,
"product_id": np.int32,
"quantity": np.int16,
"price": np.float32,
"category": "category",
"status": "category",
}
reader = pd.read_csv(
file_path,
chunksize=chunk_size,
dtype=dtype_spec,
parse_dates=["created_at"], # 直接解析日期列
usecols=["user_id", "product_id", "quantity", "price", "category", "created_at", "status"] # 只读需要的列
)
for i, chunk in enumerate(reader):
# 对每个块进行处理(这里做简单的聚合)
monthly_agg = (
chunk
.assign(month=chunk["created_at"].dt.to_period("M"))
.groupby(["month", "category"])
.agg(
total_revenue=("price", "sum"),
order_count=("user_id", "count"),
avg_quantity=("quantity", "mean")
)
.reset_index()
)
results.append(monthly_agg)
if i % 10 == 0:
print(f"已处理 {(i+1) * chunk_size:,} 行...")
# 合并所有块的结果
final_df = pd.concat(results, ignore_index=True)
final_df = final_df.groupby(["month", "category"]).sum().reset_index()
if output_path:
final_df.to_parquet(output_path, index=False)
print(f"结果已保存:{output_path}")
return final_df二、向量化操作:告别 for 循环
2.1 用 numpy 向量化替代循环
import pandas as pd
import numpy as np
import time
# 示例:根据消费金额计算折扣
df = pd.DataFrame({
"user_id": range(1000000),
"total_spent": np.random.uniform(0, 10000, 1000000)
})
# 方法一:Python 循环(极慢,千万别用)
def calc_discount_loop(df):
discounts = []
for spent in df["total_spent"]:
if spent >= 5000:
discounts.append(0.85)
elif spent >= 2000:
discounts.append(0.90)
elif spent >= 500:
discounts.append(0.95)
else:
discounts.append(1.0)
return discounts
# 方法二:apply(比循环快,但仍然慢)
def get_discount(spent):
if spent >= 5000: return 0.85
elif spent >= 2000: return 0.90
elif spent >= 500: return 0.95
return 1.0
# 方法三:np.select 向量化(最快,推荐)
def calc_discount_vectorized(df):
conditions = [
df["total_spent"] >= 5000,
df["total_spent"] >= 2000,
df["total_spent"] >= 500,
]
choices = [0.85, 0.90, 0.95]
return np.select(conditions, choices, default=1.0)
# 性能对比
start = time.time()
df["discount_loop"] = calc_discount_loop(df)
print(f"循环方法:{time.time()-start:.2f}s")
start = time.time()
df["discount_apply"] = df["total_spent"].apply(get_discount)
print(f"apply 方法:{time.time()-start:.2f}s")
start = time.time()
df["discount_vec"] = calc_discount_vectorized(df)
print(f"向量化方法:{time.time()-start:.2f}s")
# 实测(100万行):循环 3.2s,apply 0.8s,向量化 0.02s
# 向量化比循环快 160 倍!三、复杂数据清洗实战
3.1 处理混乱的真实数据
def clean_sales_data(df: pd.DataFrame) -> pd.DataFrame:
"""
清洗真实销售数据中的各种脏数据问题
"""
df = df.copy()
# 1. 处理重复行
print(f"发现 {df.duplicated().sum()} 行完全重复数据")
df = df.drop_duplicates()
# 2. 清洗手机号(格式不统一)
if "phone" in df.columns:
df["phone"] = (
df["phone"]
.astype(str)
.str.replace(r"[^\d]", "", regex=True) # 只保留数字
.str.zfill(11) # 补零到11位
)
# 过滤不合法手机号
valid_phone = df["phone"].str.match(r"^1[3-9]\d{9}$")
print(f"无效手机号:{(~valid_phone).sum()} 个")
df.loc[~valid_phone, "phone"] = np.nan
# 3. 清洗金额列(可能含有货币符号)
if "amount" in df.columns:
df["amount"] = (
df["amount"]
.astype(str)
.str.replace(r"[¥$,\s]", "", regex=True)
.str.strip()
.replace("", np.nan)
.astype(float)
)
# 异常值处理:负数或极大值标记为异常
amount_99p = df["amount"].quantile(0.99)
outlier_mask = (df["amount"] < 0) | (df["amount"] > amount_99p * 3)
print(f"金额异常值:{outlier_mask.sum()} 个")
df.loc[outlier_mask, "amount"] = np.nan
# 4. 标准化地址(省市分离)
if "address" in df.columns:
# 简单的省份提取
provinces = ["北京", "上海", "广州", "深圳", "杭州", "成都", "武汉"]
df["city"] = df["address"].apply(
lambda x: next((p for p in provinces if str(p) in str(x)), "其他")
)
# 5. 处理日期格式不统一
if "order_date" in df.columns:
df["order_date"] = pd.to_datetime(
df["order_date"],
format="mixed", # 自动识别混合格式
errors="coerce" # 无法解析的设为 NaT
)
invalid_dates = df["order_date"].isna().sum()
print(f"无效日期:{invalid_dates} 个")
print(f"\n清洗完成:{len(df):,} 行有效数据")
return df3.2 踩坑实录一:chained indexing 警告导致修改无效
现象:代码运行正常没有报错,但修改后的数据没有生效,原 DataFrame 没有变化。有时还出现 SettingWithCopyWarning。
原因:对 DataFrame 切片后再赋值,pandas 不确定你是在修改副本还是原始数据。
解法:
# 错误方式(可能不生效)
df[df["status"] == "pending"]["amount"] = 0
# 正确方式一:用 .loc 明确指定
df.loc[df["status"] == "pending", "amount"] = 0
# 正确方式二:先复制再操作
pending_df = df[df["status"] == "pending"].copy()
pending_df["amount"] = 0
# 正确方式三:用 assign 创建新列(函数式风格,最干净)
df = df.assign(
amount=lambda x: np.where(x["status"] == "pending", 0, x["amount"])
)四、踩坑实录二:groupby + apply 巨慢
现象:对 500 万行数据做 groupby().apply(custom_func),运行了 10 分钟还没出结果。
原因:apply 在每个分组上逐个执行 Python 函数,有极高的函数调用开销。
解法:
# 慢方法:groupby + apply 自定义函数
def calc_group_stats(group):
return pd.Series({
"revenue": group["amount"].sum(),
"orders": len(group),
"avg_amount": group["amount"].mean(),
"max_amount": group["amount"].max()
})
# slow
# result = df.groupby("user_id").apply(calc_group_stats)
# 快方法:用 agg 字典,让 pandas 用优化过的底层代码
result = df.groupby("user_id").agg(
revenue=("amount", "sum"),
orders=("amount", "count"),
avg_amount=("amount", "mean"),
max_amount=("amount", "max")
)
# 实测:500万行,apply 约 8分钟,agg 约 3秒,快 160 倍五、踩坑实录三:merge 产生意外的行数膨胀
现象:两个 DataFrame,一个 100 万行,一个 50 万行,merge 后变成了 800 万行。
原因:右表的 key 列有重复值,导致笛卡尔积效应。merge 是多对多的,不是你以为的一对一。
解法:
def safe_merge(left: pd.DataFrame, right: pd.DataFrame,
on: str, how: str = "left") -> pd.DataFrame:
"""安全的 merge,merge 前先检查 key 的唯一性"""
# 检查右表的 key 是否有重复
dup_count = right[on].duplicated().sum()
if dup_count > 0:
print(f"警告:右表 '{on}' 列有 {dup_count} 个重复值!")
print("这可能导致行数膨胀,请确认是否符合预期")
# 如果不需要多对多,先去重
right = right.drop_duplicates(subset=[on])
print(f"已自动对右表按 '{on}' 去重")
result = pd.merge(left, right, on=on, how=how)
if len(result) > len(left) * 1.05: # 行数增加超过5%
print(f"注意:merge 后行数从 {len(left):,} 变为 {len(result):,}")
return result六、保存格式选择
# 不同格式的性能对比(100万行数据)
# CSV: 写入 3.2s,文件大小 180MB,读取 4.1s
# Parquet: 写入 0.8s,文件大小 28MB, 读取 0.3s ← 推荐
# Feather: 写入 0.3s,文件大小 42MB, 读取 0.2s ← 速度最快
# Excel: 写入 45s, 文件大小 220MB, 读取 12s ← 避免用于大数据
# 推荐保存格式
df.to_parquet("data.parquet", index=False, compression="snappy")
# 读取时可以只读需要的列,进一步节省内存
df_partial = pd.read_parquet("data.parquet", columns=["user_id", "amount", "date"])