Python API 客户端设计实战——重试、限流、认证、SDK 封装最佳实践
Python API 客户端设计实战——重试、限流、认证、SDK 封装最佳实践
适读人群:需要集成第三方 API 的 Python 开发者、SDK 开发者 | 阅读时长:约16分钟 | 核心价值:掌握工程级 API 客户端的设计模式,告别脆弱的 requests 调用
一个因为没有重试机制而损失惨重的故事
去年我有个朋友小范,他们公司开发了一个对接支付平台的服务,由于支付平台接口偶尔会抽风(超时率约0.3%),他们的代码里没有重试机制——请求失败直接返回错误。
在日交易量1万笔的时候,0.3% 的失败率意味着30笔失败,用户投诉了但还在可接受范围。但公司做了一次推广活动,日交易量突增到10万笔,当天有300笔失败,客服接到了大量投诉,损失了不少用户。
更尴尬的是,这300笔失败里有很多是"支付平台处理成功,但返回超时"——用户钱扣了,订单没生成,造成了大量人工客服处理工作。
那次之后,他们重构了整个支付客户端,加了重试(幂等性检查)、超时控制、日志告警,再也没出过类似问题。
今天这篇,我就来系统讲讲工程级 API 客户端的设计。
一、最简单的 API 调用 vs 工程级封装
很多人调用 API 是这样的:
# 脆弱的写法
import requests
response = requests.get("https://api.example.com/users/123")
data = response.json()这种写法的问题:没有超时、没有重试、没有错误处理、没有日志。在生产环境跑一周就会出问题。
工程级的 API 客户端需要处理:
- 认证:Token 过期自动刷新
- 超时:每次请求都有合理的超时时间
- 重试:网络抖动或服务端 5xx 自动重试(幂等接口)
- 限流:遵守 API 的速率限制
- 日志:请求和响应的关键信息记录
- 错误处理:不同错误码有对应的处理策略
二、完整的 API 客户端实现
import time
import logging
import hashlib
import json
from typing import Any, Optional, Callable
from dataclasses import dataclass, field
from urllib.parse import urljoin
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
logger = logging.getLogger(__name__)
class APIError(Exception):
"""API 错误基类"""
def __init__(self, message: str, status_code: int = None, response_data: dict = None):
super().__init__(message)
self.status_code = status_code
self.response_data = response_data or {}
class AuthError(APIError):
"""认证错误"""
pass
class RateLimitError(APIError):
"""频率限制错误"""
def __init__(self, message: str, retry_after: int = 60):
super().__init__(message, status_code=429)
self.retry_after = retry_after
class ServerError(APIError):
"""服务端错误(5xx)"""
pass
@dataclass
class APIClientConfig:
"""客户端配置"""
base_url: str
api_key: str = ""
timeout: int = 30
max_retries: int = 3
retry_delay: float = 1.0
rate_limit_per_second: float = 10.0
verify_ssl: bool = True
headers: dict = field(default_factory=dict)
class TokenBucket:
"""令牌桶限速器"""
def __init__(self, rate: float):
self.rate = rate
self._tokens = rate
self._last_update = time.monotonic()
def acquire(self):
now = time.monotonic()
elapsed = now - self._last_update
self._tokens = min(self.rate, self._tokens + elapsed * self.rate)
self._last_update = now
if self._tokens >= 1:
self._tokens -= 1
return
wait_time = (1 - self._tokens) / self.rate
time.sleep(wait_time)
self._tokens = 0
class BaseAPIClient:
"""
工程级 API 客户端基类
子类继承后实现具体 API 方法
"""
def __init__(self, config: APIClientConfig):
self.config = config
self._rate_limiter = TokenBucket(config.rate_limit_per_second)
self._session = self._create_session()
def _create_session(self) -> requests.Session:
"""创建带重试的 Session"""
session = requests.Session()
# urllib3 内置重试(处理连接级别的失败)
retry_strategy = Retry(
total=self.config.max_retries,
status_forcelist=[502, 503, 504], # 这些状态码自动重试
backoff_factor=self.config.retry_delay,
respect_retry_after_header=True,
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=20,
)
session.mount("https://", adapter)
session.mount("http://", adapter)
# 基础 headers
session.headers.update({
"User-Agent": "MyApp/1.0 Python SDK",
"Accept": "application/json",
"Content-Type": "application/json",
**self.config.headers,
})
return session
def _build_auth_headers(self) -> dict:
"""构建认证头(子类可覆盖)"""
if self.config.api_key:
return {"Authorization": f"Bearer {self.config.api_key}"}
return {}
def _request(
self,
method: str,
endpoint: str,
*,
params: dict = None,
data: dict = None,
json_data: dict = None,
extra_headers: dict = None,
idempotency_key: str = None,
) -> dict:
"""
核心请求方法
:param idempotency_key: 幂等性 Key,用于安全重试
"""
url = urljoin(self.config.base_url, endpoint)
headers = self._build_auth_headers()
if extra_headers:
headers.update(extra_headers)
if idempotency_key:
headers["Idempotency-Key"] = idempotency_key
# 速率限制
self._rate_limiter.acquire()
# 重试逻辑(业务层重试,和 urllib3 重试互补)
last_error = None
for attempt in range(self.config.max_retries + 1):
if attempt > 0:
wait = self.config.retry_delay * (2 ** (attempt - 1))
logger.info(f"重试 {attempt}/{self.config.max_retries},等待 {wait:.1f}s")
time.sleep(wait)
try:
req_id = hashlib.md5(
f"{method}{url}{time.time()}".encode()
).hexdigest()[:8]
logger.debug(f"[{req_id}] {method} {url}")
start = time.monotonic()
response = self._session.request(
method=method,
url=url,
params=params,
data=data,
json=json_data,
headers=headers,
timeout=self.config.timeout,
verify=self.config.verify_ssl,
)
elapsed = time.monotonic() - start
logger.info(
f"[{req_id}] {method} {url} -> {response.status_code} ({elapsed:.2f}s)"
)
return self._handle_response(response)
except requests.exceptions.Timeout:
last_error = APIError(f"请求超时: {url}")
logger.warning(f"超时 (attempt {attempt+1}): {url}")
# 超时可以重试(如果有幂等性保证)
if not idempotency_key:
raise last_error
except requests.exceptions.ConnectionError as e:
last_error = APIError(f"连接失败: {e}")
logger.warning(f"连接错误 (attempt {attempt+1}): {e}")
raise last_error or APIError("请求失败,已重试耗尽")
def _handle_response(self, response: requests.Response) -> dict:
"""统一处理响应"""
if response.status_code == 401:
raise AuthError("认证失败,请检查 API Key", status_code=401)
if response.status_code == 403:
raise AuthError("权限不足", status_code=403)
if response.status_code == 429:
retry_after = int(response.headers.get("Retry-After", 60))
raise RateLimitError(f"请求频率超限", retry_after=retry_after)
if response.status_code >= 500:
raise ServerError(
f"服务端错误 {response.status_code}",
status_code=response.status_code,
)
if not response.ok:
try:
error_data = response.json()
except Exception:
error_data = {"raw": response.text[:500]}
raise APIError(
f"请求失败 {response.status_code}",
status_code=response.status_code,
response_data=error_data,
)
if not response.content:
return {}
return response.json()
def get(self, endpoint: str, params: dict = None) -> dict:
return self._request("GET", endpoint, params=params)
def post(self, endpoint: str, data: dict = None, idempotency_key: str = None) -> dict:
return self._request("POST", endpoint, json_data=data, idempotency_key=idempotency_key)
def put(self, endpoint: str, data: dict = None) -> dict:
return self._request("PUT", endpoint, json_data=data)
def delete(self, endpoint: str) -> dict:
return self._request("DELETE", endpoint)
# 具体 SDK 实现示例
class PaymentClient(BaseAPIClient):
"""支付 API 客户端"""
def __init__(self, api_key: str, sandbox: bool = True):
base_url = (
"https://sandbox.payment.example.com/v1/"
if sandbox else
"https://api.payment.example.com/v1/"
)
config = APIClientConfig(
base_url=base_url,
api_key=api_key,
timeout=15,
max_retries=3,
rate_limit_per_second=20.0,
)
super().__init__(config)
def create_order(
self,
amount: int,
currency: str,
user_id: str,
order_id: str, # 业务侧订单号,用作幂等性 Key
) -> dict:
"""创建支付订单(幂等接口)"""
return self.post(
"/orders",
data={
"amount": amount,
"currency": currency,
"user_id": user_id,
"order_id": order_id,
},
idempotency_key=order_id, # 保证重试安全
)
def get_order(self, order_id: str) -> dict:
return self.get(f"/orders/{order_id}")
def refund(self, order_id: str, amount: int) -> dict:
return self.post(
f"/orders/{order_id}/refund",
data={"amount": amount},
idempotency_key=f"refund-{order_id}",
)
# 使用示例
client = PaymentClient(api_key="your-api-key", sandbox=True)
try:
order = client.create_order(
amount=9900, # 单位:分
currency="CNY",
user_id="user_123",
order_id="biz_order_456",
)
print(f"订单创建成功: {order}")
except AuthError:
logger.error("API Key 无效")
except RateLimitError as e:
logger.warning(f"频率限制,{e.retry_after}秒后重试")
except ServerError:
logger.error("支付平台服务异常")
except APIError as e:
logger.error(f"API 调用失败: {e}")三、踩坑实录
踩坑实录1:幂等性理解错误,重试导致重复扣款
现象:支付超时后重试,用户被扣了两次款。
原因:支付接口不是幂等的,但代码里无脑重试了 POST 请求。
解法:对于写操作(创建、扣款),必须先确认接口是否支持幂等性(通过 Idempotency-Key Header)。不支持的接口宁可不重试,让用户手动重试。
踩坑实录2:Token 过期没有自动刷新,一到整点就报 401
现象:每隔一段时间开始出现大量 401 错误,重启后恢复。
原因:Access Token 有效期2小时,程序启动时获取一次,2小时后就失效了,没有自动刷新。
解法:检测到 401 时,自动调用 refresh_token 重新获取,然后重试原请求。
踩坑实录3:忘记关闭 Session,进程长时间运行后连接泄漏
现象:服务运行几天后,网络连接数持续增长,最终 FD 耗尽。
原因:每次请求创建了新的 requests.Session 但没有关闭,连接池里的连接一直占用。
解法:使用单例 Session,并实现上下文管理器(__enter__/__exit__)确保关闭。
