Python polars 实战——比 pandas 快10倍的数据处理框架完整指南
Python polars 实战——比 pandas 快10倍的数据处理框架完整指南
适读人群:用 pandas 处理数据遇到性能瓶颈的工程师 | 阅读时长:约16分钟 | 核心价值:快速掌握 polars 核心用法,学会从 pandas 迁移,真实场景下提升5-20倍处理速度
今年上半年,我接触了一个做用户行为分析的团队。他们每天要处理一批日志数据,大约 1500 万行,用 pandas 处理需要将近 40 分钟,整个数据流水线几乎跑一早上。
他们的数据工程师老胡找到我,问有没有办法提速。我问他用什么工具。他说全是 pandas,但内存快撑不住了,换了台 128GB 内存的服务器,也就勉强能跑。
我说你试试 polars。
他有点怀疑:"真的快这么多?"
我说,我们现场测试。同样的数据,同样的操作,polars 跑了 3 分 20 秒,内存占用从 85GB 降到了 18GB。他当场就决定迁移。
polars 之所以快,有几个核心原因:
- Rust 编写,没有 Python GIL 的限制
- 原生多线程,自动并行处理,充分利用多核 CPU
- 列式存储 + Arrow 格式,缓存友好,内存效率极高
- Lazy 模式,先构建查询计划,优化后再执行(类似 SQL 的查询优化器)
今天我们从实战角度学 polars。
一、安装与基础语法
# pip install polars
import polars as pl
import numpy as np
from datetime import datetime
# 创建 DataFrame
df = pl.DataFrame({
"user_id": [1, 2, 3, 4, 5],
"name": ["Alice", "Bob", "Charlie", "David", "Eve"],
"age": [25, 30, 35, 28, 22],
"city": ["北京", "上海", "北京", "深圳", "上海"],
"score": [85.5, 92.0, 78.3, 88.7, 95.2]
})
print(df)
print(f"\n形状:{df.shape}")
print(f"\n数据类型:\n{df.dtypes}")
# polars 的列操作语法:使用 pl.col()
result = df.select([
pl.col("user_id"),
pl.col("name"),
(pl.col("score") * 1.1).alias("adjusted_score"), # 新列名用 alias
pl.col("city").str.to_lowercase().alias("city_lower")
])
print(result)二、核心操作对比——pandas vs polars
2.1 过滤与选择
import polars as pl
import pandas as pd
import time
# 准备测试数据(500万行)
n = 5_000_000
data = {
"user_id": range(n),
"category": ["A", "B", "C", "D"] * (n // 4),
"amount": [i * 0.5 for i in range(n)],
"is_active": [True, False] * (n // 2),
"created_date": pd.date_range("2023-01-01", periods=n, freq="min").to_list()
}
# pandas 版本
df_pd = pd.DataFrame(data)
start = time.time()
result_pd = df_pd[
(df_pd["category"].isin(["A", "B"])) &
(df_pd["amount"] > 1000) &
(df_pd["is_active"] == True)
][["user_id", "category", "amount"]].sort_values("amount", ascending=False)
print(f"pandas: {time.time()-start:.2f}s, 结果行数: {len(result_pd)}")
# polars 版本
df_pl = pl.DataFrame(data)
start = time.time()
result_pl = (
df_pl
.filter(
pl.col("category").is_in(["A", "B"]) &
(pl.col("amount") > 1000) &
pl.col("is_active")
)
.select(["user_id", "category", "amount"])
.sort("amount", descending=True)
)
print(f"polars: {time.time()-start:.2f}s, 结果行数: {len(result_pl)}")
# 实测:pandas 2.8s,polars 0.4s,快7倍2.2 groupby 聚合
# pandas
start = time.time()
result_pd = df_pd.groupby(["category", "is_active"]).agg({
"amount": ["sum", "mean", "count"],
"user_id": "nunique"
}).round(2)
print(f"pandas groupby: {time.time()-start:.2f}s")
# polars(注意语法差异)
start = time.time()
result_pl = (
df_pl
.group_by(["category", "is_active"])
.agg([
pl.col("amount").sum().alias("amount_sum"),
pl.col("amount").mean().alias("amount_mean"),
pl.col("amount").count().alias("amount_count"),
pl.col("user_id").n_unique().alias("user_count")
])
.sort(["category", "is_active"])
)
print(f"polars groupby: {time.time()-start:.2f}s")
# 实测:pandas 1.2s,polars 0.15s,快8倍三、Lazy API——polars 的杀手级特性
Lazy API 是 polars 比 pandas 高一个层次的地方。它不立即执行,而是先构建执行计划,然后让查询优化器重排、合并操作,最后才执行。
# Lazy 模式:先定义查询,不立即执行
lazy_query = (
pl.scan_csv("large_file.csv") # scan_csv(懒加载,不读入内存)
.filter(pl.col("status") == "active")
.filter(pl.col("amount") > 100)
.group_by("category")
.agg([
pl.col("amount").sum().alias("total"),
pl.col("user_id").n_unique().alias("users")
])
.sort("total", descending=True)
.head(10)
)
# 查看优化后的执行计划
print(lazy_query.explain())
# polars 会自动:
# 1. 把两个 filter 合并成一个(减少遍历次数)
# 2. 只读取用到的列(谓词下推、列裁剪)
# 3. 使用流式处理(chunk by chunk),避免将整个文件加载到内存
# 触发执行
result = lazy_query.collect()
print(result)
# 流式执行(超大文件,内存极省)
result_streaming = lazy_query.collect(streaming=True)3.1 踩坑实录一:collect() 前忘记处理空值
现象:lazy 查询链接了很多操作,最后 collect() 时报错 InvalidOperationError: null values,但不知道是哪一步产生的空值。
原因:polars 在 lazy 模式下不会立即验证每步的合法性,问题到 collect() 才暴露。
解法:
# 在 lazy 查询中提前处理空值
query = (
pl.scan_csv("data.csv")
.with_columns([
pl.col("amount").fill_null(0), # 数值列空值填0
pl.col("category").fill_null("unknown"), # 字符串列填默认值
pl.col("user_id").drop_nulls() # 或者直接删除空值行
])
.filter(pl.col("user_id").is_not_null()) # 关键列必须非空
# ... 后续操作
)
# 也可以在 collect 前先 fetch 少量数据验证
sample = query.fetch(100) # 只取前100行,快速验证逻辑
print(sample)四、处理时序数据
import polars as pl
from datetime import datetime, timedelta
# 创建时序数据
df = pl.DataFrame({
"timestamp": [
datetime(2024, 1, 1, 10, 0),
datetime(2024, 1, 1, 10, 5),
datetime(2024, 1, 2, 11, 0),
datetime(2024, 1, 2, 11, 30),
datetime(2024, 1, 3, 9, 0),
],
"user_id": [1, 1, 2, 2, 1],
"event": ["login", "purchase", "login", "purchase", "login"],
"value": [None, 150.0, None, 89.5, None]
})
# 时间操作
result = df.with_columns([
pl.col("timestamp").dt.date().alias("date"),
pl.col("timestamp").dt.hour().alias("hour"),
pl.col("timestamp").dt.weekday().alias("weekday"),
])
# 滚动窗口聚合(需要按时间排序)
df_sorted = df.sort("timestamp")
# 7天滚动均值(polars 的 rolling 非常快)
rolling_result = df_sorted.with_columns([
pl.col("value")
.rolling_mean(window_size=3, min_periods=1) # 3条记录的滚动均值
.alias("rolling_avg")
])
print(rolling_result)
# 时间重采样(类似 pandas 的 resample)
daily_agg = (
df
.with_columns(pl.col("timestamp").dt.date().alias("date"))
.group_by("date")
.agg([
pl.col("value").sum().alias("daily_total"),
pl.col("user_id").n_unique().alias("daily_users"),
pl.col("event").count().alias("event_count")
])
.sort("date")
)
print(daily_agg)五、踩坑实录二:polars 字符串操作与 pandas 不同
现象:从 pandas 迁移代码时,字符串操作报错 AttributeError: 'Series' object has no attribute 'str.contains',或者结果不对。
原因:polars 的字符串操作在 pl.col("xxx").str 命名空间下,语法和 pandas 有差异。
解法:
# pandas 字符串操作 → polars 对应写法
# 包含检测
# pandas: df["col"].str.contains("pattern")
# polars: pl.col("col").str.contains("pattern")
# 分割
# pandas: df["col"].str.split(",")
# polars: pl.col("col").str.split(",")
# 提取匹配(正则)
# pandas: df["col"].str.extract(r"(\d+)")
# polars: pl.col("col").str.extract(r"(\d+)", group_index=0)
# 替换
# pandas: df["col"].str.replace("old", "new")
# polars: pl.col("col").str.replace_all("old", "new") # 注意是 replace_all
# 转大写
# pandas: df["col"].str.upper()
# polars: pl.col("col").str.to_uppercase()
# 去空格
# pandas: df["col"].str.strip()
# polars: pl.col("col").str.strip_chars()
# 示例
df = pl.DataFrame({"email": ["User@Example.com", "admin@Test.org", " space@test.com "]})
cleaned = df.with_columns([
pl.col("email").str.strip_chars().str.to_lowercase().alias("email_clean"),
pl.col("email").str.extract(r"@(.+)$", group_index=1).alias("domain")
])
print(cleaned)六、踩坑实录三:with_columns 中引用同一表达式新列
现象:想在同一个 with_columns 里,用刚创建的新列计算另一个新列,结果报 ColumnNotFoundError。
原因:with_columns 中的所有表达式是并行计算的,不能在同一个调用中引用刚创建的列。
解法:
# 错误:在同一 with_columns 中引用刚创建的列
# df.with_columns([
# (pl.col("price") * pl.col("quantity")).alias("subtotal"),
# (pl.col("subtotal") * 0.1).alias("tax"), # 报错:subtotal 还不存在
# ])
# 正确:分两步
df = df.with_columns([
(pl.col("price") * pl.col("quantity")).alias("subtotal")
]).with_columns([
(pl.col("subtotal") * 0.1).alias("tax"),
(pl.col("subtotal") * 1.1).alias("total_with_tax")
])
# 或者一步计算到位,不中间引用
df = df.with_columns([
(pl.col("price") * pl.col("quantity")).alias("subtotal"),
(pl.col("price") * pl.col("quantity") * 0.1).alias("tax"), # 重复写表达式
(pl.col("price") * pl.col("quantity") * 1.1).alias("total_with_tax")
])七、pandas 迁移指南(速查表)
| 操作 | pandas | polars |
|---|---|---|
| 读 CSV | pd.read_csv() | pl.read_csv() / pl.scan_csv() |
| 选列 | df[["a","b"]] | df.select(["a","b"]) |
| 过滤 | df[df.a > 1] | df.filter(pl.col("a") > 1) |
| 新建列 | df["c"] = ... | df.with_columns(...) |
| 排序 | df.sort_values("a") | df.sort("a") |
| groupby | df.groupby("a").agg(...) | df.group_by("a").agg(...) |
| join | pd.merge(df1, df2, on="id") | df1.join(df2, on="id") |
| 空值处理 | df.fillna(0) | df.fill_null(0) |
| 转 pandas | - | df.to_pandas() |
我的建议:新项目直接用 polars;老项目里,把性能瓶颈的数据处理部分换成 polars,其余用 pandas 过渡。两者可以通过 df.to_pandas() 和 pl.from_pandas(df) 互转。
