Python 异步框架选型——asyncio vs trio vs anyio,我的选择和理由
Python 异步框架选型——asyncio vs trio vs anyio,我的选择和理由
适读人群:准备用 Python 写异步服务的开发者、在 asyncio 里踩过坑想了解替代品的工程师 | 阅读时长:约13分钟 | 核心价值:不是抄文档,是我真正用过这三个框架后的真实感受和选择逻辑
去年我在写一个需要大量并发 HTTP 请求的数据采集服务时,认真对比研究了 asyncio、trio 和 anyio。最后的结论是选 asyncio + anyio,但这个过程让我对 Python 异步的设计哲学有了很多新的理解。
今天把这些想法完整写出来。
先说我的立场
我的选择:
新项目:asyncio + anyio
已有 asyncio 代码:原地不动
纯绿地、对正确性要求极高的项目:考虑 trio
接下来解释为什么。
asyncio:Python 原生,但有很多历史包袱
asyncio 是 Python 标准库的一部分,从 Python 3.4 开始加入,到 3.11 已经相当成熟。大部分 Python 异步生态(FastAPI、httpx、SQLAlchemy async、Redis async 等)都以 asyncio 为基础。
asyncio 的主要问题:
一、取消(Cancellation)语义混乱
import asyncio
async def fetch_data():
try:
await asyncio.sleep(10) # 模拟长时间 I/O
return "data"
except asyncio.CancelledError:
# 这里可以选择不重新 raise,从而"吞掉"取消请求
print("Cancelled, but I'll ignore it")
return "fake data" # 这是允许的,但会让调用方困惑
async def main():
task = asyncio.create_task(fetch_data())
await asyncio.sleep(1)
task.cancel()
try:
result = await task
print(f"Got: {result}") # 可能打印 "Got: fake data"
except asyncio.CancelledError:
print("Task was cancelled")在 asyncio 里,CancelledError 可以被吞掉,这会导致取消语义不可靠。你以为取消了,但任务可能还在跑。
二、结构化并发不够强制
# asyncio 里很容易"飞出去"的后台任务
async def bad_pattern():
task = asyncio.create_task(background_work()) # 创建了任务但没等它
return "done" # 函数返回了,但 background_work 还在跑
# 如果 background_work 里有异常,可能被静默吞掉trio:结构化并发的最佳实践
trio 是由 Nathaniel J. Smith 开发的第三方异步库,核心思想是结构化并发(Structured Concurrency)。
trio 的核心设计:Nursery(托儿所)
import trio
async def fetch_url(url: str) -> str:
async with trio.open_tcp_stream(host, port) as stream:
# ...
return response
async def main():
results = {}
async with trio.open_nursery() as nursery:
# 在 nursery 里启动的所有任务
# nursery 不退出,除非所有任务都完成(或者有任务出错)
nursery.start_soon(fetch_url, "https://api1.example.com")
nursery.start_soon(fetch_url, "https://api2.example.com")
# 走到这里,nursery 里的所有任务都完成了
# 如果任何一个任务出了异常,nursery 会取消其他任务,然后把异常传播出来trio 的取消语义是严格的:
# trio 里的 Cancelled 不能被吞掉
async def fetch_with_timeout():
with trio.move_on_after(5) as cancel_scope:
result = await do_slow_io()
if cancel_scope.cancelled_caught:
# 能走到这里说明超时了,result 没拿到
return None
return resulttrio 的优点:
- 取消语义清晰,不会"吞掉"取消
- 结构化并发强制你思考任务的生命周期
- 更容易写出正确的并发代码
- 更好的错误传播(ExceptionGroup)
trio 的缺点:
- 生态远不如 asyncio,很多库不支持
- 和 asyncio 不兼容,混用很麻烦
- 社区相对小
踩坑实录一:asyncio 任务泄漏
现象: 服务跑了一段时间后,内存一直涨,重启后恢复正常。用 tracemalloc 排查,发现有大量 Task 对象没有被回收。
原因:
# 有问题的代码(实际排查出来的)
async def process_request(request_data):
# 创建了后台任务但没有存引用
asyncio.create_task(log_to_analytics(request_data)) # ← 这里
return await do_main_processing(request_data)
# log_to_analytics 里如果出了异常,异常会被吞掉,还会报一个
# "Task exception was never retrieved" 的警告create_task 创建的任务如果没有被 await 或者存起来,任务完成后如果有异常,异常会被静默吞掉,只打印一个警告。更严重的是,如果任务在某种情况下一直不完成,它对应的 Task 对象会一直活着。
解法:
# 方式一:保存引用,在合适的地方等待
background_tasks: set = set()
async def process_request(request_data):
task = asyncio.create_task(log_to_analytics(request_data))
background_tasks.add(task)
task.add_done_callback(background_tasks.discard) # 完成后自动从集合移除
return await do_main_processing(request_data)
# 方式二:用 anyio TaskGroup(更好)
async def process_request(request_data):
async with anyio.create_task_group() as tg:
tg.start_soon(log_to_analytics, request_data)
result = await do_main_processing(request_data)
return resultanyio:两全其美的方案
anyio 是一个兼容层,让你能用统一的 API 写异步代码,底层可以切换 asyncio 或 trio。
anyio 的 TaskGroup(来自 trio 的结构化并发思想):
import anyio
async def fetch_multiple_apis():
results = {}
async def fetch_and_store(key: str, url: str):
async with anyio.from_thread.run_sync(get_url, url):
pass
# 简化的例子
results[key] = await fetch_url(url)
async with anyio.create_task_group() as tg:
tg.start_soon(fetch_and_store, "api1", "https://api1.example.com")
tg.start_soon(fetch_and_store, "api2", "https://api2.example.com")
tg.start_soon(fetch_and_store, "api3", "https://api3.example.com")
# 走到这里,三个任务都完成了
# 如果有任何一个抛异常,其余任务会被取消,异常会传播
return resultsanyio 的超时(cancel scope):
import anyio
async def fetch_with_timeout(url: str, timeout: float = 5.0):
with anyio.move_on_after(timeout) as scope:
result = await fetch_url(url)
return result
if scope.cancelled_caught:
# 超时了
return None我为什么选 anyio:
- 可以运行在 asyncio 后端(兼容现有生态)
- TaskGroup 语义比
asyncio.gather更清晰、更安全 - 取消语义更可靠
- FastAPI、Starlette、httpx 都已经用了 anyio
- 如果未来需要切到 trio,只需要改一行
anyio.run(main, backend='trio')
踩坑实录二:asyncio.gather 的坑
import asyncio
async def main():
# gather 的默认行为:一个任务出错,其他任务仍然继续
results = await asyncio.gather(
task1(),
task2(), # 假设这个抛了异常
task3(),
return_exceptions=False, # 默认值
)
# task2 抛了 ValueError,但 task1 和 task3 继续运行
# gather 会等它们都完成,然后把 ValueError 传播出来
# 但如果 task1/task3 里申请了资源,可能没有被正确清理用 anyio TaskGroup 替代:
import anyio
async def main():
async with anyio.create_task_group() as tg:
tg.start_soon(task1)
tg.start_soon(task2) # 如果这个抛了异常
tg.start_soon(task3)
# task2 一抛异常,tg 立刻取消 task1 和 task3
# 取消是结构化的,task1/task3 的清理代码(finally 块)会被执行
# 整个 with 块抛出异常(Python 3.11+ 的 ExceptionGroup)踩坑实录三:在 asyncio 里同步代码阻塞事件循环
现象: async 服务里有个接口,调用了一个读取大文件的同步函数,结果整个服务在这段时间内无法响应任何其他请求。
原因: 同步阻塞调用直接在事件循环里运行,阻塞了整个事件循环,所有协程都挂起了。
解法: 用线程池执行同步阻塞操作:
import anyio
import asyncio
from functools import partial
def sync_read_file(path: str) -> bytes:
"""同步的文件读取(阻塞操作)"""
with open(path, 'rb') as f:
return f.read()
async def async_read_file(path: str) -> bytes:
"""把同步操作放到线程池,不阻塞事件循环"""
return await anyio.to_thread.run_sync(sync_read_file, path)
# 或者用 asyncio 原生方式
async def async_read_file_v2(path: str) -> bytes:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, sync_read_file, path)总结:怎么选
新项目,想写出高质量的并发代码:
→ anyio(底层 asyncio),用 TaskGroup 替代 create_task
新项目,团队对异步不熟悉,追求正确性优先:
→ trio,结构化并发会强制你写对
已有 asyncio 代码,跑得还可以:
→ 保持不动,按需引入 anyio 的 TaskGroup
追求最大生态兼容性,要接各种第三方库:
→ 纯 asyncio,anyio 兼容 asyncio 所以也可以Python 的异步生态在这几年成熟了很多,但底层的一些设计缺陷(特别是取消语义)会一直存在,因为向后兼容的压力。了解这些坑,比换框架更重要。
