Python 大规模数据处理实战——千万行数据的批处理方案与内存控制
适读人群:需要处理大量数据的 Python 工程师、写过"跑完就 OOM"脚本的开发者 | 阅读时长:约14分钟 | 核心价值:具体可落地的大数据批处理方案,彻底解决内存不够用的问题
去年初,我接到一个任务:对业务数据库里 1800 万条历史订单做数据清洗和特征提取,结果写到另一个库里,供 AI 模型训练用。
同事之前用 pandas.read_sql() 跑过一次:把整张表读进来,DataFrame 操作,然后写出去。跑了20分钟之后进程被 OOM Killer 杀掉了——内存峰值超过了 24GB。
我来搞这件事的时候,机器只有 8GB 内存。
这篇文章就是我把这个任务跑完的完整方案。
核心原则:不要把数据全读进内存
这句话说起来很简单,但真正做到需要理解数据流动的每一个环节。
错误的思维: 数据库 → 全部读入 DataFrame → 处理 → 写出
正确的思维: 数据库 → 流式读取一批 → 处理这批 → 写出这批 → 读下一批
关键词是批处理(Batch Processing)和流式处理(Streaming)。
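用一段骨架代码表达这个思路(示意用,read_batches、transform、write 都是假设的占位函数,按你的数据源和存储自行实现):

```python
# 批处理管道的骨架示意:任何时刻内存里只保留"一批"数据
# read_batches / transform / write 为假设的占位函数,不是某个库的固定 API
def run_pipeline(read_batches, transform, write) -> int:
    total = 0
    for batch in read_batches():   # 生成器:按批产出数据
        write(transform(batch))    # 处理完一批立刻写出
        total += len(batch)        # 内存占用只和批大小有关,和总量无关
    return total
```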
方案一:分批读取数据库
不要用 fetchall(),改用 fetchmany() 或游标。
```python
import psycopg2
from contextlib import contextmanager
from typing import Generator, List, Dict
import logging

logger = logging.getLogger(__name__)


@contextmanager
def get_db_connection(dsn: str):
    conn = psycopg2.connect(dsn)
    try:
        yield conn
    finally:
        conn.close()


def fetch_orders_in_batches(
    dsn: str,
    batch_size: int = 10000,
    start_id: int = 0,
) -> Generator[List[Dict], None, None]:
    """
    按批次生成器读取订单数据。
    使用服务端游标流式拉取,客户端每次只保留一批;
    WHERE id > start_id 基于主键(keyset)定位起点,配合断点续跑,避免 OFFSET 深分页的性能退化。
    """
    with get_db_connection(dsn) as conn:
        cursor = conn.cursor("orders_cursor")  # 服务端游标,不把数据全部发到客户端
        cursor.execute("""
            SELECT id, user_id, amount, status, created_at, items
            FROM orders
            WHERE id > %s
            ORDER BY id ASC
        """, (start_id,))
        while True:
            rows = cursor.fetchmany(batch_size)
            if not rows:
                break
            # 转换成字典列表
            columns = [desc[0] for desc in cursor.description]
            batch = [dict(zip(columns, row)) for row in rows]
            logger.info(f"Fetched batch: {len(batch)} rows, last_id={batch[-1]['id']}")
            yield batch
        cursor.close()

def process_batch(batch: List[Dict]) -> List[Dict]:
    """处理一批数据"""
    results = []
    for row in batch:
        # 特征提取逻辑
        features = {
            'order_id': row['id'],
            'user_id': row['user_id'],
            'amount': float(row['amount']),
            'hour_of_day': row['created_at'].hour,
            'day_of_week': row['created_at'].weekday(),
            'item_count': len(row['items']) if row['items'] else 0,
            'is_high_value': float(row['amount']) > 500,
        }
        results.append(features)
    return results


def bulk_insert(conn, records: List[Dict]):
    """批量写入,比逐条 INSERT 快几十倍"""
    if not records:
        return
    columns = list(records[0].keys())
    values = [[r[c] for c in columns] for r in records]
    with conn.cursor() as cursor:
        # 使用 execute_values 批量插入
        from psycopg2.extras import execute_values
        execute_values(
            cursor,
            f"INSERT INTO order_features ({','.join(columns)}) VALUES %s ON CONFLICT (order_id) DO UPDATE SET " +
            ", ".join(f"{c}=EXCLUDED.{c}" for c in columns if c != 'order_id'),
            values,
            page_size=1000,
        )
    conn.commit()

# 主流程
def main():
    src_dsn = "postgresql://user:pass@src-host/srcdb"
    dst_dsn = "postgresql://user:pass@dst-host/dstdb"
    processed = 0
    errors = 0
    with get_db_connection(dst_dsn) as dst_conn:
        for batch in fetch_orders_in_batches(src_dsn, batch_size=10000):
            try:
                features = process_batch(batch)
                bulk_insert(dst_conn, features)
                processed += len(batch)
                if processed % 100000 == 0:
                    logger.info(f"Progress: {processed:,} rows processed")
            except Exception as e:
                logger.error(f"Batch processing failed: {e}", exc_info=True)
                dst_conn.rollback()  # 回滚失败的事务,否则后续批次的写入会一直报错
                errors += len(batch)
                continue  # 跳过这批,继续处理下一批
    logger.info(f"Done: {processed:,} processed, {errors:,} errors")


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    main()
```

这个方案跑 1800 万条数据,内存峰值稳定在 420MB 左右,跑了约 2.5 小时完成。
踩坑实录一:OFFSET 分页的性能陷阱
现象: 用 LIMIT 10000 OFFSET 0 取第一批很快,LIMIT 10000 OFFSET 1000000 取第100批时,单次查询需要17秒。
原因: OFFSET N 的实现是:先扫描前 N 行,然后扔掉,只返回之后的行。N 越大,扫描越多,越慢。这是数据库分页的经典性能陷阱。
解法: 用 Keyset Pagination(基于 ID 的游标翻页):
```sql
-- 错误方式:OFFSET 分页,越到后面越慢
SELECT * FROM orders ORDER BY id LIMIT 10000 OFFSET 1000000;

-- 正确方式:Keyset 分页,每次都很快
SELECT * FROM orders WHERE id > {last_id} ORDER BY id LIMIT 10000;
```

Keyset 分页每次查询的代价是固定的:id 上有索引,数据库可以直接定位到起始位置,不需要扫描并丢弃前面的行。
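如果不方便用服务端游标,也可以在应用层手动做 keyset 翻页。下面是一个最小示意(假设 id 为带索引的自增主键,列名仅作举例):

```python
# Keyset 翻页的最小示意:每次以上一批最后一条的 id 作为下界继续取
def iter_orders_keyset(conn, batch_size: int = 10000, last_id: int = 0):
    while True:
        with conn.cursor() as cur:  # 普通客户端游标即可
            cur.execute(
                "SELECT id, user_id, amount FROM orders "
                "WHERE id > %s ORDER BY id LIMIT %s",
                (last_id, batch_size),
            )
            rows = cur.fetchall()
        if not rows:
            break
        yield rows
        last_id = rows[-1][0]  # 下一批从本批最后一条 id 之后开始
```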
踩坑实录二:pandas 的内存陷阱
我们有些处理逻辑必须用 pandas,怎么在内存有限的情况下用?
```python
import pandas as pd
import gc
from typing import Dict, List


def process_with_pandas(batch: List[Dict]) -> pd.DataFrame:
    """在批次内使用 pandas,处理完立即释放"""
    df = pd.DataFrame(batch)
    try:
        # pandas 操作
        df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
        df['created_at'] = pd.to_datetime(df['created_at'])
        df['hour_of_day'] = df['created_at'].dt.hour
        df['day_of_week'] = df['created_at'].dt.dayofweek
        # 用完立即删除中间变量
        result = df[['id', 'user_id', 'amount', 'hour_of_day', 'day_of_week']].copy()
        return result
    finally:
        del df
        gc.collect()  # 强制垃圾回收,立即释放内存


# 在循环里监控内存使用
import psutil
import os


def get_memory_mb() -> float:
    process = psutil.Process(os.getpid())
    return process.memory_info().rss / 1024 / 1024


for i, batch in enumerate(fetch_orders_in_batches(src_dsn)):
    result_df = process_with_pandas(batch)
    save_results(result_df)  # save_results 按你的落库方式实现,例如上文的 bulk_insert
    if i % 10 == 0:
        mem = get_memory_mb()
        logger.info(f"Batch {i}, memory: {mem:.1f} MB")
        if mem > 6000:  # 超过 6GB 就报警
            logger.warning("Memory usage high!")
```

踩坑实录三:断点续跑
现象: 1800万数据处理到一半(大概第 960 万条),机器突然重启,之前所有进度都没了,要从头来。
解法: 加检查点(Checkpoint),定期保存进度。
```python
import json
from datetime import datetime
from pathlib import Path

CHECKPOINT_FILE = Path("/tmp/data_processing_checkpoint.json")


def save_checkpoint(last_processed_id: int, processed_count: int):
    checkpoint = {
        "last_processed_id": last_processed_id,
        "processed_count": processed_count,
        "timestamp": datetime.utcnow().isoformat(),
    }
    CHECKPOINT_FILE.write_text(json.dumps(checkpoint))


def load_checkpoint() -> dict:
    if CHECKPOINT_FILE.exists():
        return json.loads(CHECKPOINT_FILE.read_text())
    return {"last_processed_id": 0, "processed_count": 0}


def main():
    checkpoint = load_checkpoint()
    start_id = checkpoint["last_processed_id"]
    processed = checkpoint["processed_count"]
    if start_id > 0:
        logger.info(f"Resuming from id={start_id}, already processed {processed:,}")
    with get_db_connection(dst_dsn) as dst_conn:
        for batch in fetch_orders_in_batches(src_dsn, start_id=start_id):
            features = process_batch(batch)
            bulk_insert(dst_conn, features)
            processed += len(batch)
            last_id = batch[-1]['id']
            # 每处理 10 万条保存一次检查点
            if processed % 100000 == 0:
                save_checkpoint(last_id, processed)
                logger.info(f"Checkpoint saved at id={last_id}")
    # 完成后删除检查点
    CHECKPOINT_FILE.unlink(missing_ok=True)
    logger.info(f"Complete: {processed:,} rows")
```
并行化方案
单线程太慢?用多进程并行处理(注意要按 ID 范围分片):
```python
from multiprocessing import Pool
from typing import List, Tuple


def get_id_ranges(dsn: str, num_workers: int) -> List[Tuple[int, int]]:
    """把数据按 ID 分成 N 段(左闭右开区间)"""
    with get_db_connection(dsn) as conn:
        with conn.cursor() as cur:
            cur.execute("SELECT MIN(id), MAX(id) FROM orders")
            min_id, max_id = cur.fetchone()
    step = (max_id - min_id) // num_workers
    ranges = []
    for i in range(num_workers):
        start = min_id + i * step
        end = min_id + (i + 1) * step if i < num_workers - 1 else max_id + 1
        ranges.append((start, end))
    return ranges


def worker_process(args: Tuple[int, int, str, str]):
    start_id, end_id, src_dsn, dst_dsn = args
    for batch in fetch_orders_in_batches(src_dsn, start_id=start_id - 1):
        if batch[0]['id'] >= end_id:
            break
        # 丢掉越过本分片右边界的行,避免和下一个分片重复处理
        batch = [row for row in batch if row['id'] < end_id]
        features = process_batch(batch)
        # 写入...


def main_parallel():
    ranges = get_id_ranges(src_dsn, num_workers=4)
    with Pool(processes=4) as pool:
        pool.map(worker_process, [(s, e, src_dsn, dst_dsn) for s, e in ranges])
```

最终效果
| 方案 | 内存峰值 | 耗时 |
|---|---|---|
| 全量读入 DataFrame | 24GB+ (OOM) | 未跑完 |
| 单线程批处理(batch=10000) | 420MB | 2.5小时 |
| 4进程并行批处理 | 1.6GB | 42分钟 |
大规模数据处理的核心不是用什么库,而是建立正确的数据流动方式:永远不要让数据全部驻留在内存里。
