Python asyncio 深度解析——事件循环、协程、Task、Future 核心原理
Python asyncio 深度解析——事件循环、协程、Task、Future 核心原理
适读人群:想真正搞懂 Python 异步机制的工程师,不只是会用 async/await | 阅读时长:约 18 分钟 | 核心价值:从原理层面理解 asyncio,能解释异步代码的行为,能排查异步 BUG
那个"为什么我的代码没有并发"的困惑
刚接触 asyncio 的时候,我犯过一个很蠢的错误。我写了这样的代码:
import asyncio
async def task1():
print("task1 开始")
await asyncio.sleep(1)
print("task1 结束")
async def task2():
print("task2 开始")
await asyncio.sleep(1)
print("task2 结束")
async def main():
await task1()
await task2()
asyncio.run(main())我以为这样写就会并发执行,但实际运行结果是 task1 完全跑完之后 task2 才开始,总共花了 2 秒,和同步代码没有区别。
我当时很困惑:我不是用了 async/await 吗?
后来我搞清楚了——await 不等于"并发执行",它只是"等待这个操作完成,期间可以切换到别的任务"。要真正并发,你需要用 asyncio.create_task 或 asyncio.gather。
这个误解,我发现几乎每一个从同步语言转过来的工程师都会有。这篇文章,我们从事件循环的底层开始,把 asyncio 彻底讲清楚。
一、事件循环:asyncio 的心脏
1.1 什么是事件循环
事件循环(Event Loop)是 asyncio 的核心调度器。它的工作非常简单,用伪代码描述就是:
while 还有任务或等待的 I/O:
检查有没有 I/O 事件就绪(select/epoll/kqueue)
如果有,唤醒等待这些 I/O 的协程
运行所有准备好的协程,直到它们主动让出控制权(await)
处理定时器关键词是"主动让出"——协程不会被强制中断(不像操作系统线程),它必须自己执行 await 语句,才会把控制权交还给事件循环。
这和 Java 的线程模型有根本区别:
| 维度 | Java 线程 | Python 协程 |
|---|---|---|
| 调度方式 | 操作系统抢占式调度 | 协程主动让出(协作式) |
| 切换时机 | 随时(时间片耗尽) | 只在 await 处 |
| 内存开销 | ~1MB/线程 | ~几 KB/协程 |
| 并发数量 | 通常几百个 | 可达数万个 |
| 真并行 | 多核并行 | 单线程(受 GIL) |
1.2 启动事件循环
import asyncio
# 方式一:最常用(Python 3.7+)
asyncio.run(main())
# 方式二:手动管理(高级场景)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.close()
# 获取当前事件循环(在协程内部)
async def some_coroutine():
loop = asyncio.get_event_loop()
print(f"当前线程的事件循环: {loop}")二、协程(Coroutine):可暂停的函数
2.1 协程的本质
async def 定义的函数不是普通函数——调用它不会直接执行,而是返回一个协程对象:
async def greet(name: str) -> str:
await asyncio.sleep(0.1)
return f"你好,{name}"
# 错误理解:调用 async 函数直接执行
result = greet("老张") # 返回的是协程对象,不是字符串!
print(type(result)) # <class 'coroutine'>
print(result) # <coroutine object greet at 0x...>
# RuntimeWarning: coroutine 'greet' was never awaited
# 正确:必须 await 才能执行
async def main():
result = await greet("老张")
print(result) # 你好,老张2.2 协程的内部状态
协程本质上是一个状态机,有三个状态:
import inspect
async def simple():
await asyncio.sleep(0.1)
return 42
coro = simple()
print(inspect.getcoroutinestate(coro)) # CORO_CREATED
# 开始运行后
async def main():
task = asyncio.create_task(simple())
# task 在运行中...
await asyncio.sleep(0) # 让出控制权三、Task:并发的关键
3.1 Task vs 直接 await
理解这个区别是理解 asyncio 并发的关键:
import asyncio
import time
async def work(name: str, seconds: float) -> str:
print(f"{name} 开始")
await asyncio.sleep(seconds)
print(f"{name} 完成")
return f"{name} 结果"
# ===== 串行:总时间 = 各任务时间之和 =====
async def serial():
start = time.perf_counter()
await work("任务A", 1.0) # 等 A 完成
await work("任务B", 1.0) # 再等 B 完成
print(f"串行总耗时: {time.perf_counter() - start:.2f}s") # ~2.0s
# ===== 并发:总时间 = 最长任务时间 =====
async def concurrent():
start = time.perf_counter()
# create_task 立即调度,不等待
task_a = asyncio.create_task(work("任务A", 1.0))
task_b = asyncio.create_task(work("任务B", 1.0))
# 等待两个任务都完成
result_a = await task_a
result_b = await task_b
print(f"并发总耗时: {time.perf_counter() - start:.2f}s") # ~1.0s
asyncio.run(concurrent())3.2 asyncio.gather:批量并发
async def fetch_data(url: str) -> dict:
"""模拟异步 HTTP 请求"""
await asyncio.sleep(0.5) # 模拟网络延迟
return {"url": url, "data": f"来自 {url} 的数据"}
async def main():
urls = [
"https://api1.example.com",
"https://api2.example.com",
"https://api3.example.com",
]
# 并发请求所有 URL,等待全部完成
results = await asyncio.gather(*[fetch_data(url) for url in urls])
# results 是有序列表,对应 urls 的顺序
for r in results:
print(r)
# 允许部分失败
results_with_errors = await asyncio.gather(
*[fetch_data(url) for url in urls],
return_exceptions=True, # 异常作为结果返回,不抛出
)3.3 asyncio.wait vs gather
import asyncio
async def main():
tasks = [asyncio.create_task(work(f"任务{i}", i * 0.1)) for i in range(5)]
# gather:等待全部完成,返回结果列表
results = await asyncio.gather(*tasks)
# wait:更灵活,可以设置超时、等待第一个完成
done, pending = await asyncio.wait(tasks, timeout=0.3)
print(f"超时内完成: {len(done)},未完成: {len(pending)}")
# 取消未完成的任务
for task in pending:
task.cancel()
# 等待第一个完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)四、Future:低级原语
Future 是 asyncio 的底层异步原语,Task 是 Future 的子类。一般不需要直接操作 Future,但理解它有助于看懂源码。
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
async def set_result():
await asyncio.sleep(1)
future.set_result("异步结果")
asyncio.create_task(set_result())
# 等待 future 有结果
result = await future
print(result) # 异步结果五、完整可运行示例:理解事件循环调度
#!/usr/bin/env python3
"""
asyncio 事件循环、协程、Task、Future 完整演示
"""
import asyncio
import time
from typing import Any
# ===== 演示 1:协程的串行 vs 并发 =====
async def demo_serial_vs_concurrent():
print("\n=== 演示1:串行 vs 并发 ===")
async def slow_task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name}完成"
# 串行
start = time.perf_counter()
r1 = await slow_task("A", 0.3)
r2 = await slow_task("B", 0.3)
serial_time = time.perf_counter() - start
print(f"串行: {r1}, {r2}, 耗时: {serial_time:.2f}s")
# 并发
start = time.perf_counter()
r1, r2 = await asyncio.gather(
slow_task("C", 0.3),
slow_task("D", 0.3),
)
concurrent_time = time.perf_counter() - start
print(f"并发: {r1}, {r2}, 耗时: {concurrent_time:.2f}s")
print(f"并发比串行快: {serial_time / concurrent_time:.1f}倍")
# ===== 演示 2:Task 生命周期 =====
async def demo_task_lifecycle():
print("\n=== 演示2:Task 生命周期 ===")
async def worker(name: str) -> str:
print(f" {name} 开始执行")
await asyncio.sleep(0.2)
print(f" {name} 执行完毕")
return f"{name}的结果"
task = asyncio.create_task(worker("工人甲"), name="worker-1")
print(f"Task 状态(创建后): done={task.done()}, cancelled={task.cancelled()}")
await asyncio.sleep(0) # 让出控制权,让 task 开始执行
print(f"Task 状态(执行中): done={task.done()}")
result = await task
print(f"Task 状态(完成后): done={task.done()}, result={result}")
# ===== 演示 3:gather 并发 + 异常处理 =====
async def demo_gather_with_errors():
print("\n=== 演示3:gather 异常处理 ===")
async def risky_task(i: int) -> str:
await asyncio.sleep(0.1)
if i == 2:
raise ValueError(f"任务 {i} 出错了")
return f"任务 {i} 成功"
results = await asyncio.gather(
*[risky_task(i) for i in range(5)],
return_exceptions=True,
)
for i, r in enumerate(results):
if isinstance(r, Exception):
print(f" 任务 {i}: 异常 - {r}")
else:
print(f" 任务 {i}: {r}")
# ===== 演示 4:asyncio.wait 超时控制 =====
async def demo_wait_with_timeout():
print("\n=== 演示4:wait 超时控制 ===")
async def variable_task(name: str, delay: float) -> str:
await asyncio.sleep(delay)
return f"{name}完成"
tasks = [
asyncio.create_task(variable_task(f"任务{i}", i * 0.2))
for i in range(1, 6)
]
done, pending = await asyncio.wait(tasks, timeout=0.5)
print(f"0.5s 内完成: {len(done)} 个,未完成: {len(pending)} 个")
for t in pending:
t.cancel()
# 等待取消操作完成(避免 ResourceWarning)
await asyncio.gather(*pending, return_exceptions=True)
print(f"已取消 {len(pending)} 个未完成任务")
# ===== 演示 5:Future 手动控制 =====
async def demo_future():
print("\n=== 演示5:Future 手动控制 ===")
future: asyncio.Future[str] = asyncio.get_event_loop().create_future()
async def resolver():
await asyncio.sleep(0.3)
future.set_result("Future 被解决了!")
asyncio.create_task(resolver())
result = await future
print(f"Future 结果: {result}")
async def main():
await demo_serial_vs_concurrent()
await demo_task_lifecycle()
await demo_gather_with_errors()
await demo_wait_with_timeout()
await demo_future()
print("\n所有演示完成!")
if __name__ == "__main__":
asyncio.run(main())六、踩坑实录 1:忘记 await Task 导致任务泄漏
# 错误:create_task 之后忘记 await,任务在后台悄悄运行
async def main():
task = asyncio.create_task(some_work())
# 忘记 await task
return "done" # main 退出,task 可能还没完成
# RuntimeWarning: Enable tracemalloc to get the object allocation traceback
# 正确:保存 task 引用并 await
async def main():
task = asyncio.create_task(some_work())
result = await task
return result
# 如果是"fire and forget",使用 shield 或在 lifespan 里管理任务集合
background_tasks: set[asyncio.Task] = set()
def create_background_task(coro):
task = asyncio.create_task(coro)
background_tasks.add(task)
task.add_done_callback(background_tasks.discard) # 完成后从集合移除
return task七、踩坑实录 2:在同步代码中调用异步函数
# 错误:同步函数里无法直接 await
def sync_function():
result = await some_async_func() # SyntaxError
# 正确方案1:将同步函数改为异步
async def async_function():
result = await some_async_func()
# 正确方案2:在没有事件循环的情况下运行协程
def sync_function():
result = asyncio.run(some_async_func()) # 创建新事件循环并运行
# 正确方案3:如果已有事件循环(比如在 FastAPI 里)
def sync_in_async_context():
loop = asyncio.get_event_loop()
future = asyncio.run_coroutine_threadsafe(some_async_func(), loop)
return future.result(timeout=5)八、踩坑实录 3:在协程里用 time.sleep 阻塞整个循环
# 错误:time.sleep 是同步的,阻塞整个事件循环
async def bad_delay():
import time
time.sleep(1) # 整个程序卡死 1 秒!
# 正确:用 asyncio.sleep
async def good_delay():
await asyncio.sleep(1) # 释放控制权,其他协程可以运行总结
asyncio 核心概念一图理清:
- 事件循环:调度器,轮询 I/O 事件,驱动所有协程
- 协程(coroutine):
async def函数的返回值,可暂停的执行单元 - Task:被调度到事件循环的协程,用
create_task创建,实现真正的并发 - Future:代表异步操作结果的低级原语,
Task继承自它 - await:暂停当前协程,把控制权交给事件循环
记住这条规则:所有 I/O 用 await,所有并发用 create_task 或 gather,所有阻塞调用放线程池。
