Python 安全编码实战——输入验证、SQL 注入防御、敏感数据处理
Python 安全编码实战——输入验证、SQL 注入防御、敏感数据处理
适读人群:Python 后端工程师、对安全编码感兴趣的开发者、负责安全 review 的工程师 | 阅读时长:约14分钟 | 核心价值:用真实案例讲安全漏洞怎么产生、怎么防,不是泛泛而谈
2023年春节前,我做了一次内部安全代码 review,扫了公司几个 Python 项目。结果找出了 23 个安全问题,其中有 4 个是我认为如果被人发现、会造成严重损失的高危漏洞。
最让我难受的是,这些代码不是新人写的,有两个问题是在老同学写的"成熟"代码里。
安全问题不是特别高深的领域,大部分漏洞的原因非常简单:没有对输入做验证,或者信任了不该信任的数据。
SQL 注入防御
SQL 注入在 2024 年还是最常见的 Web 漏洞之一。Python 里主要有两种写法:一种安全,一种不安全。
不安全的写法(会被 SQL 注入):
# 千万不要这样写!!!
def get_user_by_name(conn, username: str):
# 如果 username = "' OR '1'='1" 就完蛋了
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor = conn.execute(query)
return cursor.fetchone()
# 同样危险的 % 拼接
query = "SELECT * FROM users WHERE username = '%s'" % username安全的写法(参数化查询):
# 正确:使用参数化查询,数据库驱动自动转义
def get_user_by_name(conn, username: str):
cursor = conn.execute(
"SELECT * FROM users WHERE username = %s",
(username,) # 参数作为元组传入,不要用字符串拼接!
)
return cursor.fetchone()
# SQLAlchemy ORM(自动参数化)
from sqlalchemy import select
async def get_user_by_name(db: AsyncSession, username: str):
result = await db.execute(
select(User).where(User.username == username)
)
return result.scalar_one_or_none()
# SQLAlchemy 文本查询(必须显式用 bindparams)
from sqlalchemy import text
async def search_users(db: AsyncSession, search_term: str):
result = await db.execute(
text("SELECT * FROM users WHERE username LIKE :term"),
{"term": f"%{search_term}%"} # 参数用 :name 占位符
)
return result.fetchall()踩坑实录: 我们 review 到的一个真实案例,一个搜索接口:
# 有漏洞的代码(脱敏处理,保留结构)
@app.get("/search")
async def search(q: str, db = Depends(get_db)):
# q 来自用户输入,直接拼进 SQL
results = await db.execute(f"SELECT * FROM products WHERE name LIKE '%{q}%'")
return results.fetchall()攻击者可以传入 q = "'; DROP TABLE products; --",你懂的。
输入验证
原则:所有来自用户的输入都是不可信的,包括路径参数、查询参数、请求体、请求头。
用 Pydantic 做严格的输入校验:
from pydantic import BaseModel, Field, field_validator, EmailStr
from typing import Optional
import re
class UserCreateRequest(BaseModel):
username: str = Field(
min_length=3,
max_length=50,
pattern=r'^[a-zA-Z0-9_-]+$', # 只允许字母数字下划线横杠
description="用户名,3-50个字符,只允许字母数字下划线横杠"
)
email: EmailStr # Pydantic 内置的邮箱格式验证
password: str = Field(min_length=8, max_length=128)
age: Optional[int] = Field(None, ge=0, le=150) # 年龄 0-150
@field_validator('password')
@classmethod
def validate_password_strength(cls, v: str) -> str:
"""密码强度验证"""
if not re.search(r'[A-Z]', v):
raise ValueError('密码必须包含至少一个大写字母')
if not re.search(r'[a-z]', v):
raise ValueError('密码必须包含至少一个小写字母')
if not re.search(r'\d', v):
raise ValueError('密码必须包含至少一个数字')
return v
@field_validator('username')
@classmethod
def validate_username_no_reserved(cls, v: str) -> str:
"""禁止使用保留用户名"""
reserved = {'admin', 'root', 'system', 'api', 'null', 'undefined'}
if v.lower() in reserved:
raise ValueError(f'用户名 {v} 是保留词,请换一个')
return v
# 路径参数也要验证
@app.get("/users/{user_id}")
async def get_user(
user_id: int = Path(gt=0, lt=2**31), # 确保是正整数,不超过 int32 上限
db: AsyncSession = Depends(get_db_session),
):
...踩坑实录一:文件上传的安全漏洞
现象: 我们有个文件上传接口,本意只允许上传图片,但一个测试人员上传了一个 .php 文件,发现居然能访问,而且服务器执行了文件内容(这是远程代码执行漏洞)。
原因: 只检查了文件扩展名(filename.endswith('.jpg')),没有检查文件的实际内容类型,而且上传目录在 Web 可直接访问的路径下。
解法:
import magic # python-magic 库,读取真实的文件类型
from pathlib import Path
import uuid
import os
ALLOWED_MIME_TYPES = {'image/jpeg', 'image/png', 'image/gif', 'image/webp'}
MAX_FILE_SIZE = 10 * 1024 * 1024 # 10MB
@app.post("/upload")
async def upload_file(file: UploadFile, current_user: User = Depends(get_current_user)):
# 1. 检查文件大小
content = await file.read()
if len(content) > MAX_FILE_SIZE:
raise HTTPException(400, "文件超过10MB限制")
# 2. 检查真实的文件类型(不信任扩展名)
mime = magic.from_buffer(content, mime=True)
if mime not in ALLOWED_MIME_TYPES:
raise HTTPException(400, f"不支持的文件类型: {mime}")
# 3. 生成随机文件名,避免路径遍历攻击和覆盖攻击
ext = {'image/jpeg': 'jpg', 'image/png': 'png', 'image/gif': 'gif', 'image/webp': 'webp'}[mime]
safe_filename = f"{uuid.uuid4().hex}.{ext}"
# 4. 存储到非 Web 可访问目录(或对象存储)
# 不要存在 /var/www/uploads/,要存在 /data/uploads/ 或 S3
upload_path = Path("/data/uploads") / safe_filename
upload_path.write_bytes(content)
return {"filename": safe_filename, "size": len(content)}敏感数据处理
密码哈希:
from passlib.context import CryptContext
# bcrypt 是目前密码哈希的最佳选择
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto",
bcrypt__rounds=12, # 迭代次数,越高越慢(安全),建议10-14
)
def hash_password(plain_password: str) -> str:
return pwd_context.hash(plain_password)
def verify_password(plain_password: str, hashed: str) -> bool:
return pwd_context.verify(plain_password, hashed)
# 绝对不要:
# 用 MD5/SHA1/SHA256 哈希密码(可被彩虹表攻击)
# 明文存储密码
# 用加密而不是哈希(加密可以解密,哈希不行)敏感信息脱敏:
import re
from typing import Any
def mask_phone(phone: str) -> str:
"""手机号脱敏:138****1234"""
if len(phone) == 11:
return phone[:3] + '****' + phone[-4:]
return '***'
def mask_email(email: str) -> str:
"""邮箱脱敏:zh***@example.com"""
parts = email.split('@')
if len(parts) != 2:
return '***'
name = parts[0]
masked_name = name[:2] + '***' if len(name) > 2 else '***'
return masked_name + '@' + parts[1]
def mask_id_card(id_card: str) -> str:
"""身份证脱敏:110***********1234"""
if len(id_card) == 18:
return id_card[:3] + '*' * 11 + id_card[-4:]
return '***'
# 日志脱敏中间件
class SensitiveDataFilter(logging.Filter):
PATTERNS = [
(re.compile(r'"password"\s*:\s*"[^"]*"'), '"password": "***"'),
(re.compile(r'"token"\s*:\s*"[^"]*"'), '"token": "***"'),
(re.compile(r'1[3-9]\d{9}'), lambda m: mask_phone(m.group())),
]
def filter(self, record):
msg = str(record.getMessage())
for pattern, replacement in self.PATTERNS:
if callable(replacement):
msg = pattern.sub(replacement, msg)
else:
msg = pattern.sub(replacement, msg)
record.msg = msg
record.args = ()
return True踩坑实录二:JWT token 泄露
现象: 某个接口在出错时,把完整的请求上下文打印到了日志里,其中包含了 Authorization: Bearer eyJhbG... 这个请求头。日志系统有日志查看权限的人(包括运维、部分业务人员)都能看到别人的 token。
原因: 异常处理太粗暴,直接 logger.error(f"Request failed: {request}"),把整个请求对象序列化了。
解法:
# 日志里永远不要打印:
# - Authorization header
# - Cookie
# - 请求体(可能包含密码)
# 安全的请求日志
def safe_log_request(request: Request) -> dict:
safe_headers = {
k: v for k, v in request.headers.items()
if k.lower() not in ('authorization', 'cookie', 'x-api-key')
}
return {
"method": request.method,
"path": request.url.path,
"safe_headers": safe_headers,
"client_ip": request.client.host if request.client else None,
}踩坑实录三:IDOR(对象直接引用漏洞)
现象: 一个测试用户发现,把请求 URL 里的 order_id=12345 改成 order_id=12346,能看到别人的订单详情。
原因: 接口只检查了用户是否登录,没有检查当前用户是否有权限访问这个特定订单。
解法:
@app.get("/orders/{order_id}")
async def get_order(
order_id: int,
current_user: User = Depends(get_current_user),
db: AsyncSession = Depends(get_db_session),
):
order = await db.get(Order, order_id)
if not order:
raise HTTPException(404, "订单不存在")
# 关键检查:这个订单是否属于当前用户?
if order.user_id != current_user.id:
# 注意:返回 404 而不是 403,避免暴露"这个订单存在但你没权限"的信息
raise HTTPException(404, "订单不存在")
return order对所有涉及资源归属的接口,都要做这个检查。这是最容易被遗漏、也最容易被利用的漏洞。
安全检查清单
| 风险 | 防御措施 |
|---|---|
| SQL 注入 | 参数化查询,禁止字符串拼接 SQL |
| XSS | 输出时 HTML 转义,Content-Security-Policy |
| IDOR | 每次资源访问都验证归属权 |
| 暴力破解 | 接口限流(rate limiting) |
| 文件上传 RCE | 检查 MIME 类型,随机文件名,不可执行目录 |
| 敏感信息泄露 | 日志脱敏,异常不返回内部细节 |
| 弱密码 | bcrypt 哈希,强度验证 |
| Token 泄露 | HTTPS,不在 URL 中传 token,短过期时间 |
安全不是一次性任务,是每次写代码时的习惯。最好的安全代码,是写的时候就想好了边界,而不是出了事故再来打补丁。
