Python 测试并行执行实战——pytest-xdist、进程级并行、测试隔离
Python 测试并行执行实战——pytest-xdist、进程级并行、测试隔离
适读人群:测试套件较慢、希望提速的 Python 工程师 | 阅读时长:约 14 分钟 | 核心价值:掌握 pytest-xdist 的并行执行配置,让测试套件速度提升 3-8 倍
一个测试套件从 18 分钟到 3 分钟的历程
我们项目有 350 个测试用例,全部跑完需要 18 分钟。每次 PR 等 CI,等到想睡觉。
我仔细分析了原因:
- 其中 200 个是 API 接口测试,每个测试都要发 HTTP 请求
- 测试用例之间没有依赖,完全可以并行
于是引入了 pytest-xdist,并花了 2 天做了必要的隔离改造。
最终结果:4 个 worker 并行,18 分钟 → 5 分钟。再加上优化了数据库 fixture 的 scope,最终降到 3 分钟。
今天把全过程写出来。
pytest-xdist 快速入门
pip install pytest-xdist# 使用 N 个 worker 并行运行
pytest -n 4
# 自动检测 CPU 核数
pytest -n auto
# 使用逻辑 CPU 数量(含超线程)
pytest -n logical就这两步——安装、加一个参数,很多测试套件就能获得几倍的速度提升。
但如果你的测试有共享状态,并行后可能出现随机失败。所以并行化前必须做隔离分析。
并行化的前提:测试隔离
并行执行要求每个测试完全独立,不能有任何共享的可变状态。
常见的隔离问题
问题一:共享数据库数据
# 危险:两个测试用同一个用户名
def test_register_user_a():
create_user("alice@test.com") # Worker 1
assert user_exists("alice@test.com")
def test_register_user_b():
create_user("alice@test.com") # Worker 2 同时运行
# 可能失败:用户已存在解法: 每个测试使用带 UUID 的唯一数据:
def test_register_user_a():
email = f"test_{uuid.uuid4().hex[:8]}@test.com"
create_user(email)
assert user_exists(email)问题二:共享文件
# 危险:多个 worker 同时读写同一个文件
def test_process_file():
with open("test_input.csv", "w") as f:
f.write("data")
result = process_file("test_input.csv")
# Worker 之间可能产生竞态解法: 使用 pytest 内置的 tmp_path fixture,每个测试有独立的临时目录:
def test_process_file(tmp_path):
input_file = tmp_path / "input.csv"
input_file.write_text("data")
result = process_file(str(input_file))
assert result.row_count == 1问题三:共享内存状态
# 危险:全局变量在进程内共享
_cached_config = None
def get_config():
global _cached_config
if _cached_config is None:
_cached_config = load_config()
return _cached_config
def test_config_loading():
config = get_config()
config["timeout"] = 999 # 修改了全局配置!影响后续测试解法: 使用不可变配置对象,或者每个测试创建独立的配置副本。
数据库并行隔离方案
方案一:每个 worker 使用独立数据库
# conftest.py
import pytest
from xdist import get_unique_id
@pytest.fixture(scope="session")
def worker_id(request):
"""获取当前 worker 的唯一 ID(master/gw0/gw1/...)"""
if hasattr(request.config, "workerinput"):
return request.config.workerinput["workerid"]
return "master"
@pytest.fixture(scope="session")
def db_url(worker_id):
"""每个 worker 使用独立的测试数据库"""
if worker_id == "master":
db_name = "testdb"
else:
db_name = f"testdb_{worker_id}" # testdb_gw0, testdb_gw1, ...
return f"postgresql://localhost/{db_name}"
@pytest.fixture(scope="session", autouse=True)
def create_worker_database(db_url, worker_id):
"""为每个 worker 创建和初始化数据库"""
from myapp.database import create_database, drop_database
db_name = db_url.split("/")[-1]
create_database(db_name)
yield
drop_database(db_name)方案二:使用事务隔离(不需要多个数据库)
更轻量的方案:每个测试函数用独立的事务,测试结束后回滚:
@pytest.fixture
def db_session(db_engine):
"""
每个测试用独立事务,结束后回滚
并行安全:每个事务相互隔离(数据库行级锁)
"""
connection = db_engine.connect()
transaction = connection.begin()
session = Session(bind=connection)
yield session
session.close()
transaction.rollback() # 无论测试结果如何,都回滚
connection.close()注意:事务隔离方案需要数据库支持 SERIALIZABLE 或 READ COMMITTED 隔离级别,且并发测试间不会有严重的行锁竞争。
pytest-xdist 的分发策略
xdist 支持多种测试分发策略:
# 默认:负载均衡(worker 完成一个测试后领取下一个)
pytest -n 4
# 按文件分组(同一文件的测试在同一 worker 运行)
# 适合:同一文件的测试共享 session scope fixture,避免重复初始化
pytest -n 4 --dist=loadfile
# 按模块分组
pytest -n 4 --dist=loadgroup
# 指定分组(通过 marker)# 使用 xdist_group marker 控制哪些测试在同一 worker 运行
@pytest.mark.xdist_group("payment-tests")
def test_payment_flow_a():
...
@pytest.mark.xdist_group("payment-tests")
def test_payment_flow_b():
...
# 这两个测试一定在同一 worker 上运行并行 + 覆盖率报告
并行执行时,每个 worker 生成自己的覆盖率数据,需要合并:
# 安装 coverage 的并行支持
pip install pytest-cov coverage
# 运行并自动合并覆盖率
pytest -n 4 --cov=myapp --cov-report=term-missing
# pytest-cov 会自动处理并行覆盖率合并如果需要手动合并:
# 每个 worker 生成 .coverage.gw0, .coverage.gw1 等文件
# 合并所有 coverage 数据
coverage combine
# 生成报告
coverage report
coverage html处理无法并行的测试
有些测试不能并行化(比如需要独占某个端口,或者有顺序依赖):
# 方案一:用 marker 标记为顺序执行
@pytest.mark.serial # 自定义 marker
# 方案二:分开跑
pytest tests/unit/ -n auto # 单元测试并行
pytest tests/integration/ -n 2 # 集成测试限制并发
pytest tests/e2e/ -n 1 # E2E 测试顺序执行(-n 1 等于不并行)在 Makefile 中:
test-parallel:
pytest tests/unit tests/api -n auto --cov=myapp
pytest tests/e2e -n 2
test-ci:
pytest tests/ -n auto --cov=myapp --cov-fail-under=80 -v踩坑实录
坑一:session scope fixture 在每个 worker 中都重新创建
现象: session scope 的数据库初始化 fixture,在 4 个 worker 的情况下执行了 4 次。
原因: xdist 的每个 worker 是独立的进程,session scope 只在单个进程内有效,多进程之间无法共享。
解法: 如果需要只初始化一次,用文件锁或 pytest-xdist 的 tmp_path_factory:
@pytest.fixture(scope="session")
def db_schema(tmp_path_factory, worker_id):
"""只让 master 进程(或第一个 worker)初始化 schema"""
if worker_id == "master":
# 非并行运行,正常初始化
init_db_schema()
return
# 使用文件锁确保只有一个 worker 初始化 schema
lock_file = tmp_path_factory.getbasetemp().parent / "db_init.lock"
done_file = tmp_path_factory.getbasetemp().parent / "db_init.done"
with FileLock(str(lock_file)):
if not done_file.exists():
init_db_schema()
done_file.touch()坑二:并行测试中的端口冲突
现象: 测试中启动了一个本地服务器监听固定端口,并行时多个 worker 冲突。
解法: 使用随机端口:
import socket
@pytest.fixture
def free_port():
"""获取一个未被占用的随机端口"""
with socket.socket() as s:
s.bind(("", 0))
return s.getsockname()[1]
@pytest.fixture
def test_server(free_port):
server = start_test_server(port=free_port)
yield f"http://localhost:{free_port}"
server.stop()坑三:xdist 并行时的日志混乱
现象: 并行测试时,多个 worker 的日志输出混在一起,难以阅读。
解法: 配置每个 worker 的日志输出到独立文件:
# conftest.py
def pytest_configure(config):
worker_id = getattr(config, "workerinput", {}).get("workerid", "master")
logging.basicConfig(
filename=f"logs/test_{worker_id}.log",
level=logging.DEBUG,
format="%(asctime)s %(name)s %(levelname)s %(message)s"
)性能对比数据(参考)
基于我们的实际项目(350 个测试用例,API 测试为主):
| 配置 | 耗时 |
|---|---|
| 单进程(-n 1) | 18 分钟 |
| 2 个 worker | 10 分钟 |
| 4 个 worker | 5 分钟 |
| 4 个 worker + scope 优化 | 3 分钟 |
达到 6x 加速。你的项目结果取决于测试的 I/O 密集程度和隔离成本。
小结
pytest-xdist 并行化的步骤:
- 安装:
pip install pytest-xdist - 先诊断:找出共享状态(数据库、文件、全局变量)
- 做隔离:唯一化数据、使用
tmp_path、事务回滚 - 选策略:
--dist=loadfile适合共享 fixture 多的情况 - 渐进式:先加
-n 2,稳定后再加-n auto
线程安全的 fixture 设计
并行测试中,fixture 的线程安全问题值得特别关注:
# 危险:module scope 的 fixture 中有可变状态,多个 worker 共享进程内的 module scope
@pytest.fixture(scope="module")
def counter():
return {"count": 0} # 在同一进程的不同线程中,这个 dict 是共享的
def test_increment_a(counter):
counter["count"] += 1 # Worker 线程 A 修改
assert counter["count"] == 1 # 可能因为并发修改而失败
# 安全:function scope,每个测试有独立对象
@pytest.fixture
def counter():
return {"count": 0} # 每个测试有自己的 dict重要提醒:xdist 的多 worker 是多进程,不是多线程。进程间不共享内存,所以大多数 Python 对象是安全的。但共享资源(数据库、文件、网络端口)才是并行测试的真正挑战。
调试并行测试失败
并行测试的调试比串行更复杂,以下技巧有用:
# 方法一:把并行缩减到 2 个 worker,更容易复现
pytest -n 2 tests/problematic_test.py -v
# 方法二:先串行跑,确认问题是否只在并行时出现
pytest -n 1 tests/ # 等同于不并行
# 方法三:打印 worker ID,确认问题的模式# conftest.py 中打印每个测试所在的 worker
@pytest.fixture(autouse=True)
def log_worker_id(worker_id, request):
print(f"\n[Worker: {worker_id}] Running: {request.node.name}")# 方法四:把失败的测试单独隔离出来运行
pytest tests/test_order.py::TestCheckout::test_payment -v -s最大并行数的经验规律
并不是 worker 越多越快,有一个最优区间:
CPU 密集型测试:worker 数 ≈ CPU 核数
I/O 密集型测试(API 调用、数据库):worker 数 ≈ CPU 核数 × 2
E2E 测试(浏览器):worker 数 ≈ 2-4(受内存限制)我们团队 API 测试在 8 核 CI 机器上的最佳配置:
pytest -n 16 # 8核 × 2,I/O 密集型,CPU 有大量空闲时间超过 16 就没有提升了,因为数据库连接池和 I/O 成了瓶颈。
