Python 服务上线前的10个必查项——我们踩过的血泪教训
Python 服务上线前的10个必查项——我们踩过的血泪教训
适读人群:有 Python 服务上线经验、或即将第一次把 Python 服务推上生产的工程师 | 阅读时长:约14分钟 | 核心价值:用别人的教训换自己的安稳,上线前逐条自查
去年11月的某个周四下午3点47分,我们的一个 Python 数据接口服务刚上线不到两小时,就开始大量报 500。监控告警轰炸,运营那边发来消息说用户投诉激增。
我盯着日志看了20分钟,最后发现根本原因是:生产环境的数据库连接池大小配置还是开发时的默认值 5,并发稍一上来就全部等待超时。
就这么一行配置,让我们紧急回滚,连夜修复,第二天重新上线。
那次之后,我整理了一份"上线前必查清单",后来又经历了几次类似事故,不断补充。今天分享出来,10个必查项,每一条背后都有真实的故事。
必查项一:环境变量和配置文件
这是最容易翻车的地方,没有之一。
踩坑实录:
有次新来的同学第一次独立上线,把开发环境的 .env 文件直接打包进了镜像。结果生产环境连的是本地 MySQL,数据全写到他自己笔记本上了。发现的时候已经有427条用户数据写错了地方。
怎么查:
import os
from pathlib import Path
def check_env_config():
"""上线前环境变量自查函数"""
required_vars = [
'DATABASE_URL',
'REDIS_URL',
'SECRET_KEY',
'APP_ENV',
'LOG_LEVEL',
]
missing = []
suspicious = []
for var in required_vars:
val = os.getenv(var)
if not val:
missing.append(var)
else:
# 检查是否还是开发环境的值
dev_keywords = ['localhost', '127.0.0.1', 'dev', 'test', 'local']
if any(kw in val.lower() for kw in dev_keywords):
suspicious.append((var, val))
if missing:
raise EnvironmentError(f"缺少必要环境变量: {missing}")
if suspicious:
print("⚠️ 以下变量看起来像开发环境配置,请确认:")
for var, val in suspicious:
# 脱敏输出,只显示前8个字符
masked = val[:8] + '...' if len(val) > 8 else val
print(f" {var} = {masked}")
app_env = os.getenv('APP_ENV', '').lower()
if app_env not in ('production', 'prod'):
raise ValueError(f"APP_ENV={app_env},上线前必须设为 production")
print("✅ 环境变量检查通过")
if __name__ == '__main__':
check_env_config()把这个检查放在服务启动的最开始,不通过就直接启动失败,强制逼自己把配置搞对。
必查项二:数据库连接池配置
开头说的那次事故就是这个问题。
踩坑实录:
SQLAlchemy 默认连接池大小是 5,max_overflow 默认是 10,也就是说最多 15 个并发连接。你的服务如果有 4 个 worker,每个 worker 处理请求时要拿连接,稍微并发一点就把池子打满了。剩下的请求开始等,等超时就报错。
开发环境因为并发低,从来没出问题,一上生产就挂。
解法:
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool
# 错误做法:直接用默认值
# engine = create_engine(DATABASE_URL)
# 正确做法:根据实际情况配置
engine = create_engine(
DATABASE_URL,
poolclass=QueuePool,
pool_size=20, # 稳定连接数,根据数据库最大连接数决定
max_overflow=10, # 突发时额外允许的连接数
pool_timeout=30, # 等待连接的超时时间(秒)
pool_recycle=3600, # 连接回收时间,避免数据库侧超时断开
pool_pre_ping=True, # 每次取连接先 ping 一下,避免用到死连接
echo=False, # 生产环境关掉 SQL 日志
)pool_pre_ping=True 这个参数非常重要,很多人不知道。数据库空闲一段时间会主动关闭连接,但连接池里还留着这个"死连接",取出来用的时候直接报错。开了 pre_ping 之后,每次取连接会先发一个 ping,如果连接断了会自动重连。
必查项三:日志级别和日志格式
踩坑实录:
生产环境有个服务,DEBUG 日志一直没关。某个接口会把完整的请求参数打印出来,包括用户的手机号、身份证号。日志发到了日志系统里,有权限的人都能看。这属于数据合规问题,处理起来很麻烦。
另外,DEBUG 级别的日志量极大,I/O 压力会很明显地影响服务性能。
怎么查:
import logging
# 生产环境标准配置
logging.basicConfig(
level=logging.INFO, # 生产环境用 INFO,不要用 DEBUG
format='%(asctime)s [%(levelname)s] %(name)s:%(lineno)d - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
# 第三方库的日志通常很啰嗦,单独压低
logging.getLogger('sqlalchemy.engine').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('httpx').setLevel(logging.WARNING)检查点:
LOG_LEVEL是否设为INFO或以上- 日志里有没有打印敏感字段(密码、token、手机号)
- 日志格式是否包含 timestamp 和 logger name,方便排查
必查项四:超时配置
这是我见过最多被忽视的配置。
踩坑实录:
我们有个服务调用第三方 AI 接口,没有设置超时。某次第三方接口变慢,响应从平均0.3秒变成了30秒。我们的服务 worker 一直在等,全被占满了,新请求进来全部排队,整个服务卡死。
必须配置的超时:
import httpx
import requests
# requests 库——必须同时设连接超时和读超时
response = requests.get(
url,
timeout=(5, 30), # (连接超时秒, 读取超时秒),不要只传一个数字
)
# httpx 异步客户端
async with httpx.AsyncClient(
timeout=httpx.Timeout(
connect=5.0,
read=30.0,
write=10.0,
pool=5.0,
)
) as client:
response = await client.get(url)
# 数据库查询超时(以 SQLAlchemy + PostgreSQL 为例)
from sqlalchemy import text, event
@event.listens_for(engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
cursor.execute("SET statement_timeout = '10s'")超时的原则:每一个 I/O 操作都要有超时,没有例外。
必查项五:健康检查接口
很多服务上线了没有 /health 接口,或者有但没有真正检查依赖服务的状态。
踩坑实录:
K8s 的 readinessProbe 配的是 /health,但这个接口只是返回 {"status": "ok"},根本没检查数据库是不是通的。数据库挂了,Pod 还是 Ready 状态,流量继续打进来,全部 500。
一个真正有用的健康检查:
from fastapi import FastAPI, HTTPException
from sqlalchemy import text
import redis
import time
app = FastAPI()
@app.get("/health")
async def health_check():
start = time.time()
checks = {}
all_ok = True
# 检查数据库
try:
with engine.connect() as conn:
conn.execute(text("SELECT 1"))
checks['database'] = 'ok'
except Exception as e:
checks['database'] = f'error: {str(e)}'
all_ok = False
# 检查 Redis
try:
r = redis.from_url(REDIS_URL)
r.ping()
checks['redis'] = 'ok'
except Exception as e:
checks['redis'] = f'error: {str(e)}'
all_ok = False
elapsed = round((time.time() - start) * 1000, 2)
result = {
"status": "ok" if all_ok else "degraded",
"checks": checks,
"elapsed_ms": elapsed,
}
if not all_ok:
raise HTTPException(status_code=503, detail=result)
return result必查项六:异常处理和全局错误兜底
踩坑实录:
有个接口,某个边界输入会触发 ZeroDivisionError,没有被捕获,直接把完整的 Python traceback 返回给前端了。用户能看到我们的代码路径、文件名、具体报错行。这是安全问题。
全局兜底:
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import traceback
import logging
logger = logging.getLogger(__name__)
app = FastAPI()
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
# 日志里记录完整 traceback
logger.error(
f"Unhandled exception on {request.method} {request.url.path}",
exc_info=True,
)
# 返回给用户的只有模糊的错误信息,不暴露内部细节
return JSONResponse(
status_code=500,
content={"error": "Internal server error", "request_id": request.headers.get("X-Request-ID")},
)必查项七:内存和资源泄漏
踩坑实录:
我们有个 Python 服务跑了两三天就开始 OOM 被 K8s kill 掉重启。排查了很久,最后发现是一个列表在全局变量里不断 append,永远没有清空。代码是个新同学写的,在 request handler 里往全局 list 里追加日志,以为之后会处理,结果那个"之后"永远没来。
上线前检查:
- 有没有全局的可变容器(list、dict)在 request 处理中被修改?
- 文件 handle、数据库连接有没有用
with语句或者显式关闭? - 大对象(DataFrame、大列表)处理完有没有
del并调gc.collect()?
必查项八:并发安全
Python 有 GIL,但这不代表没有并发问题。多进程之间共享状态,或者 asyncio 里的竞态条件,都会出问题。
踩坑实录:
Celery worker 用多进程模式,两个 worker 同时处理同一个任务 ID 的消息(消息重复投递),结果同一个订单被处理了两次,扣款扣了两次。
解法:
- 关键操作加分布式锁(Redis SETNX 或 Redlock)
- 数据库层面加唯一索引,作为最后一道防线
- Celery task 加幂等检查:处理前先查任务状态
必查项九:Graceful Shutdown
踩坑实录:
直接 kill -9 或者 K8s 强制终止 Pod,正在处理中的请求直接断了,用户看到连接重置,数据可能写到一半。
正确做法(以 FastAPI + uvicorn 为例):
import signal
import asyncio
from contextlib import asynccontextmanager
shutdown_event = asyncio.Event()
@asynccontextmanager
async def lifespan(app):
# 启动时的初始化
print("Service starting...")
yield
# 收到关闭信号后的清理
print("Service shutting down, waiting for ongoing requests...")
await asyncio.sleep(2) # 给正在处理的请求一点时间完成
print("Cleanup done.")
app = FastAPI(lifespan=lifespan)K8s 的 terminationGracePeriodSeconds 设为 30 秒以上,preStop hook 里加一个 sleep,让 LB 先把流量摘掉再开始关进程。
必查项十:监控和告警
上线没有监控,等于开着飞机没有仪表盘。
最低标准:
- 接口响应时间的 P99(不是平均值,平均值会掩盖尾部问题)
- 错误率(4xx/5xx 占比)
- 进程内存使用
- 数据库连接池使用率
# 用 prometheus_client 暴露基本指标
from prometheus_client import Counter, Histogram, Gauge, make_asgi_app
import time
REQUEST_COUNT = Counter('http_requests_total', 'Total HTTP requests', ['method', 'endpoint', 'status'])
REQUEST_DURATION = Histogram('http_request_duration_seconds', 'HTTP request duration', ['endpoint'])
DB_POOL_SIZE = Gauge('db_pool_size', 'Database connection pool size')
@app.middleware("http")
async def metrics_middleware(request, call_next):
start = time.time()
response = await call_next(request)
duration = time.time() - start
REQUEST_COUNT.labels(
method=request.method,
endpoint=request.url.path,
status=response.status_code,
).inc()
REQUEST_DURATION.labels(endpoint=request.url.path).observe(duration)
return response
# 暴露 /metrics 端点给 Prometheus 抓取
app.mount("/metrics", make_asgi_app())汇总:上线前自查表
| # | 检查项 | 常见问题 | 严重程度 |
|---|---|---|---|
| 1 | 环境变量 | 开发配置混入生产 | 🔴 致命 |
| 2 | 数据库连接池 | 默认值太小,高并发超时 | 🔴 致命 |
| 3 | 日志级别 | DEBUG 未关,打印敏感信息 | 🟠 严重 |
| 4 | 超时配置 | 无超时,下游慢导致雪崩 | 🔴 致命 |
| 5 | 健康检查 | 假健康,依赖挂了还接流量 | 🟠 严重 |
| 6 | 异常处理 | 堆栈信息暴露给用户 | 🟠 严重 |
| 7 | 内存泄漏 | 全局可变对象持续增长 | 🟡 中等 |
| 8 | 并发安全 | 重复处理、数据竞争 | 🔴 致命 |
| 9 | 优雅关闭 | 强制终止导致请求中断 | 🟡 中等 |
| 10 | 监控告警 | 出了问题自己不知道 | 🟠 严重 |
每次上线前我都会把这张表过一遍。不是因为我记性差,是因为每次都可能有遗漏——人是会疲惫的,清单不会。
上线这件事,做过的准备永远不嫌多,出了事故永远都是"早知道……"。
