Python Async Concurrency in Practice: Building High-Concurrency Services with asyncio + aiohttp + aiomysql
Audience: engineers who build high-concurrency Python services handling large volumes of concurrent HTTP requests or database operations | Reading time: ~16 minutes | Core value: production-grade usage of aiohttp and aiomysql for building genuinely high-throughput async services
A Scraping Job That Showed Me the Power of Async
Two years ago, a project needed to scrape price data from 5,000 product pages and write it to a database. My colleague Xiao Xie wrote a multithreaded crawler in Java; running 200 threads, it took nearly 20 minutes.

I rewrote it with Python asyncio + aiohttp: 100 concurrent coroutines finished in 8 minutes, with one tenth the memory footprint of the Java version.

Since then, our team has defaulted to Python async for every I/O-bound batch job.

In this article I lay out the complete recipe for building high-concurrency Python services, focusing on aiohttp (async HTTP) and aiomysql (async MySQL), plus the engineering practice of concurrency control.
1. aiohttp: The Async HTTP Client

1.1 Installation

```bash
pip install aiohttp aiomysql
```

Note: asyncio ships with the standard library (Python 3.4+), so do not install it with pip; the PyPI package of the same name is an obsolete backport that can shadow the stdlib module.

1.2 Basic Usage

```python
import asyncio
import aiohttp
async def fetch(session: aiohttp.ClientSession, url: str) -> dict:
async with session.get(url) as resp:
resp.raise_for_status()
return await resp.json()
async def main():
async with aiohttp.ClientSession() as session:
data = await fetch(session, "https://httpbin.org/get")
        print(data)
```

Important: do not create a new ClientSession for every request.

```python
# Wrong: creating a Session per request (terrible performance; leaks TCP connections)
async def bad_fetch(url: str):
    async with aiohttp.ClientSession() as session:  # created on every call!
        async with session.get(url) as resp:
            return await resp.json()

# Right: reuse one Session (it holds the connection pool)
session: aiohttp.ClientSession | None = None

async def get_session() -> aiohttp.ClientSession:
    global session
    if session is None or session.closed:
        session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30),
            connector=aiohttp.TCPConnector(limit=100),  # max connections
        )
    return session
```
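One thing the snippet above leaves out is shutdown: the shared session must be closed exactly once at application exit, or aiohttp will warn about an unclosed session. A minimal sketch (the close_session helper is mine, not from the original):

```python
async def close_session() -> None:
    """Close the shared ClientSession on application shutdown."""
    global session
    if session is not None and not session.closed:
        await session.close()
        session = None
```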
1.3 Controlling Concurrent Requests

Unbounded concurrency will overwhelm the target server or exhaust local connections. Use asyncio.Semaphore to cap the number of in-flight requests:

```python
import asyncio
import aiohttp
from typing import Any

async def fetch_with_semaphore(
    session: aiohttp.ClientSession,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict[str, Any] | None:
    async with semaphore:  # at most N requests in flight at once
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status == 200:
                    return await resp.json()
                print(f"HTTP {resp.status}: {url}")
                return None
        except asyncio.TimeoutError:
            print(f"Timed out: {url}")
            return None
        except aiohttp.ClientError as e:
            print(f"Request error: {url} - {e}")
            return None

async def batch_fetch(urls: list[str], max_concurrent: int = 50) -> list[dict]:
    semaphore = asyncio.Semaphore(max_concurrent)
    async with aiohttp.ClientSession(
        connector=aiohttp.TCPConnector(limit=max_concurrent),
        timeout=aiohttp.ClientTimeout(total=30),
    ) as session:
        tasks = [
            fetch_with_semaphore(session, url, semaphore)
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return [r for r in results if r is not None and not isinstance(r, Exception)]
```
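To make the entry point concrete, here is a minimal driver for batch_fetch (the URLs are placeholders):

```python
if __name__ == "__main__":
    urls = [f"https://httpbin.org/get?i={i}" for i in range(100)]
    results = asyncio.run(batch_fetch(urls, max_concurrent=50))
    print(f"Fetched {len(results)} of {len(urls)} URLs")
```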
2. aiomysql: Async MySQL Operations

2.1 Connection Pool Setup

```python
import aiomysql
from contextlib import asynccontextmanager

# Global connection pool
pool: aiomysql.Pool | None = None

async def init_pool(
    host: str = "localhost",
    port: int = 3306,
    user: str = "root",
    password: str = "",
    db: str = "mydb",
    maxsize: int = 20,
) -> aiomysql.Pool:
    global pool
    pool = await aiomysql.create_pool(
        host=host,
        port=port,
        user=user,
        password=password,
        db=db,
        maxsize=maxsize,
        minsize=5,
        autocommit=False,  # manage transactions manually
        charset="utf8mb4",
    )
    return pool

async def close_pool():
    global pool
    if pool:
        pool.close()
        await pool.wait_closed()

@asynccontextmanager
async def get_connection():
    """Context manager that borrows a connection from the pool."""
    async with pool.acquire() as conn:
        yield conn
```
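How these pieces fit into an application lifecycle, as a hedged sketch (run_app and its body are illustrative, not from the original):

```python
async def run_app() -> None:
    await init_pool(db="mydb", maxsize=20)
    try:
        ...  # serve requests / run batch jobs here
    finally:
        await close_pool()  # always release pool connections on shutdown

# asyncio.run(run_app())
```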
2.2 Basic CRUD

```python
async def insert_items(items: list[dict]) -> int:
    """Bulk insert."""
    async with get_connection() as conn:
        async with conn.cursor() as cursor:
            sql = "INSERT INTO items (name, price, source_url) VALUES (%s, %s, %s)"
            data = [(item["name"], item["price"], item["url"]) for item in items]
            await cursor.executemany(sql, data)
            await conn.commit()
            return cursor.rowcount

async def get_items_by_category(category: str) -> list[dict]:
    """Query."""
    async with get_connection() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cursor:
            await cursor.execute(
                "SELECT id, name, price FROM items WHERE category = %s ORDER BY id DESC",
                (category,),
            )
            return await cursor.fetchall()

async def update_price(item_id: int, new_price: float) -> bool:
    """Update."""
    async with get_connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.execute(
                "UPDATE items SET price = %s, updated_at = NOW() WHERE id = %s",
                (new_price, item_id),
            )
            await conn.commit()
            return cursor.rowcount > 0
```
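Since the pool is created with autocommit=False, a transaction that fails partway should be rolled back explicitly; the helpers above only ever commit. A minimal sketch of the pattern (transfer_price is a hypothetical example, not from the original):

```python
async def transfer_price(src_id: int, dst_id: int, delta: float) -> None:
    """Two UPDATEs in one transaction; roll back if either fails."""
    async with get_connection() as conn:
        try:
            async with conn.cursor() as cursor:
                await cursor.execute(
                    "UPDATE items SET price = price - %s WHERE id = %s", (delta, src_id)
                )
                await cursor.execute(
                    "UPDATE items SET price = price + %s WHERE id = %s", (delta, dst_id)
                )
            await conn.commit()
        except Exception:
            await conn.rollback()
            raise
```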
3. Complete Runnable Example: Concurrent Fetching + Bulk Inserts

```python
#!/usr/bin/env python3
"""
High-concurrency crawler example with asyncio + aiohttp + aiomysql.
Uses httpbin.org as the test API (avoids depending on an external data source).
"""
import asyncio
import time
from dataclasses import dataclass
from typing import Any
import aiohttp
# ===== Data model =====
@dataclass
class FetchResult:
url: str
status: int
data: dict[str, Any] | None
error: str | None = None
elapsed_ms: float = 0.0
# ===== Fetcher =====
class AsyncFetcher:
def __init__(self, max_concurrent: int = 20, timeout: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.session: aiohttp.ClientSession | None = None
async def __aenter__(self):
self.session = aiohttp.ClientSession(
timeout=self.timeout,
connector=aiohttp.TCPConnector(limit=50),
headers={"User-Agent": "AsyncFetcher/1.0"},
)
return self
async def __aexit__(self, *args):
if self.session:
await self.session.close()
async def fetch_one(self, url: str) -> FetchResult:
start = time.perf_counter()
async with self.semaphore:
try:
async with self.session.get(url) as resp:
elapsed = (time.perf_counter() - start) * 1000
if resp.content_type == "application/json":
data = await resp.json()
else:
data = {"text": await resp.text()}
return FetchResult(url=url, status=resp.status, data=data, elapsed_ms=elapsed)
except asyncio.TimeoutError:
                return FetchResult(url=url, status=0, data=None, error="timeout")
except aiohttp.ClientError as e:
return FetchResult(url=url, status=0, data=None, error=str(e))
async def fetch_all(self, urls: list[str]) -> list[FetchResult]:
tasks = [asyncio.create_task(self.fetch_one(url)) for url in urls]
return await asyncio.gather(*tasks)
# ===== Mock database writes =====
class MockDatabase:
def __init__(self):
self.records: list[dict] = []
self._lock = asyncio.Lock()
async def bulk_insert(self, items: list[dict]) -> int:
        await asyncio.sleep(0.01 * len(items) / 10)  # simulate bulk-write latency
async with self._lock:
self.records.extend(items)
return len(items)
async def get_count(self) -> int:
return len(self.records)
# ===== Main flow =====
async def process_batch(
fetcher: AsyncFetcher,
db: MockDatabase,
urls: list[str],
batch_name: str,
) -> tuple[int, int]:
"""抓取一批 URL 并写入数据库,返回 (成功数, 失败数)"""
print(f"[{batch_name}] 开始抓取 {len(urls)} 个 URL")
start = time.perf_counter()
results = await fetcher.fetch_all(urls)
elapsed = time.perf_counter() - start
success = [r for r in results if r.error is None and r.status == 200]
failed = [r for r in results if r.error or r.status != 200]
    # Bulk-insert the successful results
if success:
records = [{"url": r.url, "status": r.status, "elapsed_ms": r.elapsed_ms} for r in success]
await db.bulk_insert(records)
print(f"[{batch_name}] 完成: 成功={len(success)}, 失败={len(failed)}, 耗时={elapsed:.2f}s")
return len(success), len(failed)
async def main():
    # Generate test URLs (different httpbin.org endpoints)
base_urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/headers",
"https://httpbin.org/user-agent",
]
urls = [f"{url}?index={i}" for i in range(20) for url in base_urls[:2]]
print(f"总计 {len(urls)} 个 URL 待抓取\n")
db = MockDatabase()
total_success = 0
total_failed = 0
start = time.perf_counter()
async with AsyncFetcher(max_concurrent=10) as fetcher:
        # Process in batches of 10
batch_size = 10
batches = [urls[i:i + batch_size] for i in range(0, len(urls), batch_size)]
for idx, batch in enumerate(batches):
            success, failed = await process_batch(fetcher, db, batch, f"Batch {idx+1}")
total_success += success
total_failed += failed
total_elapsed = time.perf_counter() - start
print(f"\n=== 最终统计 ===")
print(f"总成功: {total_success}")
print(f"总失败: {total_failed}")
print(f"数据库记录数: {await db.get_count()}")
print(f"总耗时: {total_elapsed:.2f}s")
print(f"平均 QPS: {total_success / total_elapsed:.1f}")
if __name__ == "__main__":
    asyncio.run(main())
```
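To persist results for real, MockDatabase.bulk_insert maps directly onto the aiomysql helpers from section 2; a hedged sketch (the fetch_log table is hypothetical):

```python
async def bulk_insert_mysql(records: list[dict]) -> int:
    """Persist fetch results through the aiomysql pool from section 2."""
    async with get_connection() as conn:
        async with conn.cursor() as cursor:
            await cursor.executemany(
                "INSERT INTO fetch_log (url, status, elapsed_ms) VALUES (%s, %s, %s)",
                [(r["url"], r["status"], r["elapsed_ms"]) for r in records],
            )
            count = cursor.rowcount
        await conn.commit()
        return count
```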
4. Pitfall 1: Connection Pool Exhaustion Hangs Requests

```python
# Problem: concurrency exceeds the pool's maxsize, and new requests wait forever
pool = await aiomysql.create_pool(maxsize=5)  # only 5 connections

async def query_with_bug():
    async with pool.acquire() as conn:
        # While holding a connection, call another function that also needs one
        await another_query()  # another_query is waiting for a connection too -> deadlock!
```

Symptom: the program hangs with no error at all. Cause: the pool is full, new requests are waiting for a release, while the code holding a connection is itself waiting for a new one. Fix: enlarge the pool, avoid nested acquire, or set a timeout.

```python
# Set a timeout instead of waiting forever (asyncio.timeout requires Python 3.11+)
try:
    async with asyncio.timeout(5.0):
        async with pool.acquire() as conn:
            ...
except asyncio.TimeoutError:
    raise RuntimeError("Timed out acquiring a database connection")
```
5. Pitfall 2: Closing the Connection Before the Response Body Is Read

```python
# Wrong: leaving the async with before the body is read can prevent connection reuse
async with session.get(url) as resp:
    if resp.status != 200:
        return None  # early exit, body never read!

# Right: read explicitly, or use raise_for_status
async with session.get(url) as resp:
    resp.raise_for_status()  # non-200 raises, and the connection is closed cleanly
    return await resp.json()
```
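If you do need to bail out on a non-200 response without raising, one option is to drain the body first so the connection can go back to the pool. A sketch (fetch_or_none is a hypothetical helper):

```python
async def fetch_or_none(session: aiohttp.ClientSession, url: str) -> dict | None:
    async with session.get(url) as resp:
        if resp.status != 200:
            await resp.read()  # drain the body so the connection can be reused
            return None
        return await resp.json()
```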
6. Pitfall 3: Forgetting to Tune the Connector's Connection Limits

```python
# Problem: the default TCPConnector caps total connections at 100 but sets no
# per-host limit, so one slow host can monopolize the pool or exhaust sockets
session = aiohttp.ClientSession()  # defaults: limit=100, limit_per_host=0 (unbounded)

# Right: set explicit connection limits
connector = aiohttp.TCPConnector(
    limit=100,           # cap on total connections
    limit_per_host=30,   # cap on connections to any single host
    ttl_dns_cache=300,   # cache DNS results for 5 minutes
)
session = aiohttp.ClientSession(connector=connector)
```
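Tying the connector settings above to a clean shutdown, here is a minimal app-lifetime wrapper (a sketch; http_session is my name, not from the original). By default the session owns its connector, so closing the session also closes the connector:

```python
from contextlib import asynccontextmanager

import aiohttp

@asynccontextmanager
async def http_session():
    connector = aiohttp.TCPConnector(limit=100, limit_per_host=30, ttl_dns_cache=300)
    session = aiohttp.ClientSession(connector=connector)
    try:
        yield session
    finally:
        await session.close()  # also closes the connector it owns
```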
Summary

Key points for high-concurrency Python services:

- Reuse the aiohttp ClientSession; never create one per request
- Cap concurrency with a Semaphore so you don't overwhelm the target server
- Size connection pools sensibly; maxsize must match your concurrency level
- Batch your writes: executemany for database inserts is 10x+ faster than inserting row by row
- Always set timeouts: both request timeouts and connection-acquisition timeouts
