Python locust 实战——用 Python 代码写压测脚本的完整指南
Python locust 实战——用 Python 代码写压测脚本的完整指南
适读人群:Python开发工程师、测试工程师、全栈开发者 | 阅读时长:约15分钟 | 核心价值:掌握locust从入门到实战的完整用法,用Python代码写出专业级别的压测脚本
Python测试工程师的压测困境
2021年我们团队招了一个测试工程师小张,Python背景,之前做的是数据测试。
让她做接口压测,她安装了JMeter,折腾了两天,用XML配置脚本,各种报错,最终放弃了,跑来找我:"老张,有没有能用Python写压测脚本的工具?"
我说:"locust,你去看看。"
她去看了文档,第二天下午就跑出来了她第一个压测结果。来找我说:"这也太简单了,比JMeter好用多了。"
locust的定位就是这样:让开发者用熟悉的Python代码写压测脚本,无需学习特殊的DSL或XML配置。
它的核心优势:
- Python代码写场景,逻辑清晰可维护
- Web UI实时查看压测进度,直观友好
- 支持分布式压测
- 支持自定义负载形态(阶梯、突增、脉冲等)
安装与快速上手
pip install locust
# 验证
locust --version
# locust 2.32.0第一个 locust 脚本
# locustfile.py
from locust import HttpUser, task, between
class QuickStartUser(HttpUser):
# 每次请求之间等待1-3秒(模拟用户思考时间)
wait_time = between(1, 3)
@task
def view_product_list(self):
"""浏览商品列表"""
self.client.get("/api/products?page=1&size=20")
@task(3) # 权重3:这个任务被选中的概率是其他@task(1)的3倍
def view_product_detail(self):
"""查看商品详情"""
import random
product_id = random.randint(1001, 5000)
self.client.get(f"/api/products/{product_id}")
@task(1)
def search_product(self):
"""搜索商品"""
keywords = ["手机", "电脑", "耳机", "平板"]
import random
kw = random.choice(keywords)
self.client.get(f"/api/search?q={kw}")运行Web UI模式(推荐):
locust -f locustfile.py --host=https://api.example.com然后访问http://localhost:8089,在Web界面里输入用户数和增长速率,点Start启动压测。
命令行无UI模式(适合CI):
locust -f locustfile.py \
--host=https://api.example.com \
--headless \ # 无Web UI
-u 200 \ # 用户总数200
-r 10 \ # 每秒增加10个用户
--run-time 10m \ # 运行10分钟
--html report.html # 生成HTML报告核心 API 详解
HttpUser 类
from locust import HttpUser, task, between, constant, constant_throughput
from locust.exception import RescheduleTask
class OrderUser(HttpUser):
# wait_time 的三种方式
wait_time = between(1, 5) # 随机1-5秒
# wait_time = constant(2) # 固定2秒
# wait_time = constant_throughput(2) # 目标2 RPS(locust自动调整等待时间)
host = "https://api.example.com" # 也可以在启动时用--host覆盖
def on_start(self):
"""用户"登录"时执行,相当于前置准备"""
response = self.client.post("/api/auth/login", json={
"username": f"test_user_{self.user_id}", # 每个用户独立账号
"password": "test_pass"
})
if response.status_code == 200:
self.token = response.json()["data"]["accessToken"]
else:
raise RescheduleTask() # 登录失败,放弃这个用户的任务
def on_stop(self):
"""用户"退出"时执行"""
self.client.post("/api/auth/logout",
headers={"Authorization": f"Bearer {self.token}"})
@task(5)
def browse(self):
"""权重5的任务"""
with self.client.get(
"/api/products",
headers={"Authorization": f"Bearer {self.token}"},
name="GET 商品列表", # name用于统计分组,不同URL可以用同一个name
catch_response=True # 开启手动断言
) as response:
if response.status_code == 200:
data = response.json()
if data.get("code") != 200:
response.failure(f"业务错误: {data.get('msg')}")
else:
response.success()
else:
response.failure(f"HTTP错误: {response.status_code}")
@task(1)
def create_order(self):
"""创建订单"""
import random
with self.client.post(
"/api/order/create",
json={
"userId": self.user_id,
"productId": random.randint(1001, 5000),
"quantity": random.randint(1, 3)
},
headers={
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
},
name="POST 创建订单",
catch_response=True
) as response:
if response.status_code == 200 and response.json().get("code") == 200:
response.success()
else:
response.failure(f"下单失败: {response.text[:200]}")顺序任务(Sequential Task Sets)
有些场景需要按固定顺序执行步骤:
from locust import HttpUser, SequentialTaskSet, task
class CheckoutTaskSet(SequentialTaskSet):
"""下单流程:按顺序执行的任务集"""
@task
def step1_view_cart(self):
"""Step1: 查看购物车"""
response = self.client.get("/api/cart",
headers={"Authorization": f"Bearer {self.user.token}"})
if response.status_code == 200:
items = response.json().get("data", {}).get("items", [])
if items:
self.cart_item_id = items[0]["id"]
@task
def step2_create_order(self):
"""Step2: 创建订单(需要step1的数据)"""
if not hasattr(self, 'cart_item_id'):
return # step1没有成功,跳过
response = self.client.post("/api/order/create",
json={"cartItemIds": [self.cart_item_id]},
headers={"Authorization": f"Bearer {self.user.token}"})
if response.status_code == 200:
self.order_id = response.json()["data"]["orderId"]
@task
def step3_pay_order(self):
"""Step3: 支付订单"""
if not hasattr(self, 'order_id'):
return
self.client.post(f"/api/payment/pay/{self.order_id}",
json={"paymentMethod": "alipay"},
headers={"Authorization": f"Bearer {self.user.token}"})
class ShopUser(HttpUser):
tasks = [CheckoutTaskSet]
wait_time = between(1, 3)自定义负载形态
locust支持完全自定义的负载曲线,不限于简单的线性加压:
# custom_shape.py
from locust import LoadTestShape
class StagesShape(LoadTestShape):
"""
自定义负载形态:
0-2min: 爬升到100用户
2-7min: 保持100用户
7-9min: 爬升到200用户
9-14min: 保持200用户
14-15min: 降到0
"""
stages = [
{"duration": 120, "users": 100, "spawn_rate": 1},
{"duration": 420, "users": 100, "spawn_rate": 1},
{"duration": 540, "users": 200, "spawn_rate": 2},
{"duration": 840, "users": 200, "spawn_rate": 2},
{"duration": 900, "users": 0, "spawn_rate": 5},
]
def tick(self):
run_time = self.get_run_time()
for stage in self.stages:
if run_time < stage["duration"]:
tick_data = (stage["users"], stage["spawn_rate"])
return tick_data
return None # 返回None表示测试结束
class SpikeShape(LoadTestShape):
"""
脉冲测试:模拟流量突增场景
正常流量50用户,每隔2分钟突增到500用户保持30秒
"""
def tick(self):
run_time = self.get_run_time()
cycle = run_time % 150 # 150秒一个周期
if cycle < 30: # 前30秒:突增流量
return (500, 50) # 500用户,每秒增加50
else: # 后120秒:正常流量
return (50, 10) # 50用户,每秒增加10分布式压测
当单机用户数不够时,locust支持Master-Worker分布式架构:
# Master节点(负责协调和统计,不发请求)
locust -f locustfile.py \
--master \
--host=https://api.example.com \
--expect-workers=5 # 等待5个Worker连接后开始
# Worker节点(实际发送请求,每个worker运行在独立机器上)
# 在5台Worker机器上各跑一次:
locust -f locustfile.py \
--worker \
--master-host=10.0.1.100 # Master的IP
# 启动压测(在Master的Web UI里操作,或用headless模式)
# 200用户 = 5台Worker × 每台40个用户完整压测脚本:电商全场景
# ecommerce_load_test.py
"""
电商系统全场景压测
覆盖:浏览、搜索、下单、支付全链路
流量分配:浏览60% | 搜索25% | 下单10% | 支付5%
"""
import random
import logging
from locust import HttpUser, task, between, events
from locust.exception import RescheduleTask
# 测试数据
USER_POOL = [{"username": f"bench_user_{i}", "password": "Test@1234"}
for i in range(1, 50001)]
PRODUCT_IDS = list(range(1001, 11001))
KEYWORDS = ["手机", "电脑", "耳机", "平板电脑", "键盘", "鼠标", "显示器"]
class EcommerceUser(HttpUser):
wait_time = between(1, 4)
def on_start(self):
"""用户上线,登录获取token"""
user = random.choice(USER_POOL)
resp = self.client.post(
"/api/auth/login",
json=user,
name="POST 登录"
)
if resp.status_code != 200 or resp.json().get("code") != 200:
logging.error(f"Login failed: {resp.text[:100]}")
raise RescheduleTask()
data = resp.json()["data"]
self.token = data["accessToken"]
self.user_id = data["userId"]
self.headers = {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
@task(12)
def browse_products(self):
"""浏览商品列表(60%流量)"""
page = random.randint(1, 10)
with self.client.get(
f"/api/products?page={page}&size=20",
headers=self.headers,
name="GET 商品列表",
catch_response=True
) as resp:
if resp.status_code == 200 and resp.json().get("code") == 200:
resp.success()
else:
resp.failure(f"Browse failed: {resp.status_code}")
@task(5)
def search_products(self):
"""搜索商品(25%流量)"""
kw = random.choice(KEYWORDS)
with self.client.get(
f"/api/search?q={kw}&page=1",
headers=self.headers,
name="GET 搜索商品",
catch_response=True
) as resp:
if resp.status_code == 200:
resp.success()
else:
resp.failure(f"Search failed: {resp.status_code}")
@task(2)
def create_order(self):
"""创建订单(10%流量)"""
product_id = random.choice(PRODUCT_IDS)
with self.client.post(
"/api/order/create",
json={
"userId": self.user_id,
"productId": product_id,
"quantity": random.randint(1, 3)
},
headers=self.headers,
name="POST 创建订单",
catch_response=True
) as resp:
if resp.status_code == 200 and resp.json().get("code") == 200:
self._pending_order_id = resp.json()["data"]["orderId"]
resp.success()
else:
resp.failure(f"Order create failed: {resp.status_code}")
@task(1)
def pay_order(self):
"""支付订单(5%流量)"""
order_id = getattr(self, '_pending_order_id', None)
if not order_id:
return # 没有待支付的订单,跳过
with self.client.post(
f"/api/payment/pay",
json={"orderId": order_id, "paymentMethod": "balance"},
headers=self.headers,
name="POST 支付",
catch_response=True
) as resp:
if resp.status_code == 200 and resp.json().get("code") == 200:
self._pending_order_id = None
resp.success()
else:
resp.failure(f"Payment failed: {resp.status_code}")
# 测试结束时输出自定义统计
@events.test_stop.add_listener
def on_test_stop(environment, **kwargs):
stats = environment.stats
print("\n========== Custom Summary ==========")
for name, entry in stats.entries.items():
if entry.num_requests > 0:
print(f"{name[1]:30s} "
f"P50={entry.get_response_time_percentile(0.5):.0f}ms "
f"P99={entry.get_response_time_percentile(0.99):.0f}ms "
f"RPS={entry.total_rps:.1f} "
f"Err={entry.fail_ratio*100:.2f}%")踩坑实录
坑1:locust Web UI在高并发下自身卡顿
现象: 用locust Web UI压测,用户数超过1000时,Web UI页面刷新很慢,数据更新频率从每秒降到每10秒。
原因: locust Master节点在处理大量Worker数据统计的同时,还要响应Web UI的请求,资源竞争导致UI卡顿。
解法: 大规模压测时使用--headless模式,不开Web UI。如果需要实时监控,用--csv=result把统计数据输出到CSV,再另外用脚本可视化。
坑2:wait_time设置错误导致单机用户数上限很低
现象: 设置了wait_time = constant(0.1)(100ms等待),1000个用户跑着跑着变成了500个,locust日志里一直在Warning"Can not spawn more users"。
原因: wait_time太短,每个用户的任务完成很快,locust spawn的速度跟不上。另外每个用户(协程)也有内存开销,用户数过多时单机内存不够。
解法: 单机locust用户数上限通常是500-2000(取决于wait_time和任务复杂度)。更多用户需要分布式。wait_time = between(1, 3)是比较合理的值,接近真实用户行为。
总结
locust是Python工程师做压测的最佳选择。代码即脚本,逻辑清晰,支持复杂场景,Web UI友好,分布式扩展方便。
整个性能测试系列到这里就完整了。从认知(第1篇)到工具(JMeter/Gatling/k6/locust),从场景设计到结果分析,从单接口到全链路,希望这20篇能帮你建立完整的性能测试体系。
性能测试不是玄学,是工程——用数据说话,用工具排查,用优化验证。
祝你的系统在下一次大促里零故障。
