Python 异步最佳实践——常见异步 BUG、死锁、任务泄漏的诊断与预防
2026/4/30大约 7 分钟
Python 异步最佳实践——常见异步 BUG、死锁、任务泄漏的诊断与预防
适读人群:有 asyncio 使用经验、被异步 BUG 折磨过的 Python 工程师 | 阅读时长:约 15 分钟 | 核心价值:建立异步代码的诊断体系,彻底消灭任务泄漏和死锁
那个消失的协程
有一次生产环境出了怪事:一个批量发送通知的任务,日志显示启动了 1000 个协程,但只有 800 条通知发出去了,另外 200 个协程就像人间蒸发了一样,既没有成功日志,也没有报错日志。
我排查了两个小时,最后发现原因很简单:代码里有一行 asyncio.create_task(send_notification(...)),但是没有保存 task 的引用。Python 的垃圾回收发现这些 Task 对象没有被任何变量引用,直接回收了——Task 还没执行完就被销毁了。
这种 BUG 在测试环境很难复现,因为测试环境 GC 频率不一样。
这篇文章,就专门讲异步代码里那些诡异的 BUG:任务泄漏、死锁、竞态条件,以及如何诊断和预防。
一、任务泄漏:最隐蔽的 BUG
1.1 什么是任务泄漏
任务泄漏(Task Leak)是指创建了 Task 但没有等待它完成,导致任务被垃圾回收或在程序退出时没有干净地停止。
# 泄漏场景 1:没有保存 task 引用
async def bad_fire_and_forget():
asyncio.create_task(some_work()) # Task 对象没有引用!
# 如果 GC 在 some_work 完成前运行,任务会被取消
# 泄漏场景 2:gather 没有 await
async def bad_gather():
asyncio.gather(task1(), task2()) # gather 返回的 coroutine 没有 await!Python 3.10+ 会在这种情况下打印警告:
Task was destroyed but it is pending!1.2 正确的"fire and forget"
# 方案1:保存 task 引用集合(推荐)
_background_tasks: set[asyncio.Task] = set()
def create_task_safe(coro) -> asyncio.Task:
"""创建后台任务,防止被 GC 回收"""
task = asyncio.create_task(coro)
_background_tasks.add(task)
# 完成后自动从集合移除
task.add_done_callback(_background_tasks.discard)
return task
# 方案2:在应用关闭时等待所有后台任务
async def shutdown():
if _background_tasks:
await asyncio.gather(*_background_tasks, return_exceptions=True)二、死锁:协程版本的经典问题
2.1 Semaphore 死锁
# 错误:在持有 Semaphore 的情况下,调用另一个也需要相同 Semaphore 的函数
sem = asyncio.Semaphore(1)
async def outer():
async with sem:
await inner() # inner 也要获取 sem → 死锁!
async def inner():
async with sem: # 永远等不到,因为 outer 持有着
print("inner 执行")
# 解法:重构,避免嵌套获取同一个 Semaphore
async def inner_no_sem():
print("inner 执行")
async def outer_fixed():
async with sem:
await inner_no_sem()2.2 Lock 死锁
# 错误:两个协程互相等待对方释放锁
lock_a = asyncio.Lock()
lock_b = asyncio.Lock()
async def coroutine_1():
async with lock_a:
await asyncio.sleep(0.1)
async with lock_b: # 等待 lock_b
print("coroutine_1 完成")
async def coroutine_2():
async with lock_b:
await asyncio.sleep(0.1)
async with lock_a: # 等待 lock_a → 死锁!
print("coroutine_2 完成")
# 解法:按固定顺序获取锁
async def coroutine_1_fixed():
async with lock_a:
async with lock_b: # 总是先 a 后 b
print("coroutine_1 完成")
async def coroutine_2_fixed():
async with lock_a: # 也是先 a 后 b
async with lock_b:
print("coroutine_2 完成")2.3 连接池死锁
# 错误:持有数据库连接的协程,发起另一个需要数据库连接的请求
# 如果连接池只有 1 个连接,就会死锁
async def process_user(user_id: int, pool):
async with pool.acquire() as conn: # 获取连接 1
user_data = await fetch_user(conn, user_id)
# 在持有连接的情况下调用另一个也需要连接的函数
await update_stats(pool, user_id) # 等待连接 1 释放 → 死锁!
async def update_stats(pool, user_id: int):
async with pool.acquire() as conn: # 等待空闲连接
...三、竞态条件
# 错误:非原子的读-改-写操作
counter = 0
async def increment():
global counter
current = counter # 读
await asyncio.sleep(0) # 在这里让出控制权,其他协程也可以读到同样的值
counter = current + 1 # 写
async def main():
await asyncio.gather(*[increment() for _ in range(100)])
print(f"counter = {counter}") # 期望 100,实际可能更小
# 正确:使用 Lock
counter = 0
lock = asyncio.Lock()
async def increment_safe():
global counter
async with lock:
counter += 1
async def main_safe():
await asyncio.gather(*[increment_safe() for _ in range(100)])
print(f"counter = {counter}") # 保证是 100四、完整可运行示例:异步 BUG 诊断工具
#!/usr/bin/env python3
"""
异步 BUG 诊断与演示
展示常见异步 BUG 的现象、原因和修复方案
"""
import asyncio
import sys
import traceback
import weakref
from typing import Any
# ===== 工具 1:任务追踪器 =====
class TaskTracker:
"""追踪所有后台任务,防止泄漏"""
def __init__(self):
self._tasks: set[asyncio.Task] = set()
self._completed = 0
self._failed = 0
def create_task(self, coro, name: str | None = None) -> asyncio.Task:
task = asyncio.create_task(coro, name=name)
self._tasks.add(task)
task.add_done_callback(self._on_done)
return task
def _on_done(self, task: asyncio.Task):
self._tasks.discard(task)
if task.cancelled():
pass
elif task.exception():
self._failed += 1
print(f"[TaskTracker] 任务失败: {task.get_name()} - {task.exception()}")
else:
self._completed += 1
async def wait_all(self, timeout: float = 30.0):
"""等待所有后台任务完成"""
if not self._tasks:
return
await asyncio.wait(list(self._tasks), timeout=timeout)
def stats(self) -> dict:
return {
"pending": len(self._tasks),
"completed": self._completed,
"failed": self._failed,
}
# ===== 工具 2:死锁检测(超时机制)=====
async def with_timeout(coro, timeout: float = 5.0, name: str = "operation"):
"""包装异步操作,超时抛出有意义的错误"""
try:
return await asyncio.wait_for(coro, timeout=timeout)
except asyncio.TimeoutError:
raise asyncio.TimeoutError(f"操作 '{name}' 超时 ({timeout}s),可能发生了死锁")
# ===== 演示 1:任务泄漏 vs 正确写法 =====
async def demo_task_leak():
print("\n=== 演示1:任务泄漏 vs 正确追踪 ===")
completed_without_tracker = []
completed_with_tracker = []
async def work(i: int, results: list):
await asyncio.sleep(0.1)
results.append(i)
tracker = TaskTracker()
# 错误方式:不保存引用(模拟泄漏)
for i in range(5):
asyncio.create_task(work(i, completed_without_tracker))
# 正确方式:通过 tracker 创建
for i in range(5):
tracker.create_task(work(i, completed_with_tracker), name=f"work-{i}")
await tracker.wait_all()
await asyncio.sleep(0.2) # 给没有追踪的任务一点时间
print(f"无追踪完成数: {len(completed_without_tracker)} (可能少于5)")
print(f"有追踪完成数: {len(completed_with_tracker)} (保证等于5)")
print(f"追踪器统计: {tracker.stats()}")
# ===== 演示 2:竞态条件 vs 加锁 =====
async def demo_race_condition():
print("\n=== 演示2:竞态条件 vs Lock =====")
# 无锁(有竞态)
counter_unsafe = 0
async def increment_unsafe():
nonlocal counter_unsafe
val = counter_unsafe
await asyncio.sleep(0) # 让出控制权,制造竞态
counter_unsafe = val + 1
await asyncio.gather(*[increment_unsafe() for _ in range(20)])
print(f"无锁计数器: {counter_unsafe} (期望 20,实际可能更小)")
# 有锁(安全)
counter_safe = 0
lock = asyncio.Lock()
async def increment_safe():
nonlocal counter_safe
async with lock:
val = counter_safe
await asyncio.sleep(0)
counter_safe = val + 1
await asyncio.gather(*[increment_safe() for _ in range(20)])
print(f"有锁计数器: {counter_safe} (保证等于 20)")
# ===== 演示 3:超时保护 =====
async def demo_timeout_protection():
print("\n=== 演示3:超时保护防死锁 ===")
async def potentially_stuck():
"""模拟可能卡住的操作"""
await asyncio.sleep(10) # 模拟很长时间
try:
await with_timeout(potentially_stuck(), timeout=0.5, name="潜在死锁操作")
except asyncio.TimeoutError as e:
print(f"捕获到超时: {e}")
print("程序没有被卡死!")
# ===== 演示 4:gather 异常不丢失 =====
async def demo_gather_exceptions():
print("\n=== 演示4:gather 异常处理 ===")
async def may_fail(i: int):
await asyncio.sleep(0.05)
if i % 2 == 0:
raise ValueError(f"任务 {i} 失败")
return f"任务 {i} 成功"
results = await asyncio.gather(
*[may_fail(i) for i in range(6)],
return_exceptions=True,
)
for i, r in enumerate(results):
if isinstance(r, Exception):
print(f" 任务{i}: 异常 - {r}")
else:
print(f" 任务{i}: {r}")
async def main():
# 启用异步调试模式(开发环境)
asyncio.get_event_loop().set_debug(True)
await demo_task_leak()
await demo_race_condition()
await demo_timeout_protection()
await demo_gather_exceptions()
print("\n所有演示完成!")
if __name__ == "__main__":
asyncio.run(main(), debug=True)五、踩坑实录 1:exception 没有被捕获,静默失败
# 危险:后台 Task 里的异常被吞掉
async def risky_work():
raise ValueError("出错了!")
async def main():
task = asyncio.create_task(risky_work())
await asyncio.sleep(1)
# task 已经失败,但程序没有任何提示!
# 正确:为 Task 添加异常回调
def handle_task_error(task: asyncio.Task):
if not task.cancelled() and task.exception():
print(f"后台任务异常: {task.exception()}")
task = asyncio.create_task(risky_work())
task.add_done_callback(handle_task_error)六、踩坑实录 2:asyncio.run 嵌套调用
# 错误:在已有事件循环的情况下调用 asyncio.run
async def outer():
asyncio.run(inner()) # RuntimeError: This event loop is already running
# 常见于:在 Jupyter notebook 或已有异步框架的环境中
# 解法:用 asyncio.create_task 或直接 await
async def outer_fixed():
await inner()
# 或
task = asyncio.create_task(inner())
await task七、踩坑实录 3:忘记关闭异步资源
# 错误:aiohttp Session 没有关闭,导致资源泄漏和 unclosed connector 警告
session = aiohttp.ClientSession()
# ... 使用 session ...
# 程序结束,session 没有 close()
# 正确:始终用 async with
async with aiohttp.ClientSession() as session:
...
# 或者在 lifespan 里统一管理
@asynccontextmanager
async def lifespan(app):
app.state.session = aiohttp.ClientSession()
yield
await app.state.session.close()总结
异步 BUG 的预防清单:
- Task 必须有引用:要么 await,要么放入 set,要么用 TaskTracker
- Lock 按固定顺序获取:防止循环等待
- 连接池大小要大于最大嵌套深度
- 所有后台 Task 加 done_callback 记录异常
- 给所有等待操作设置超时:
asyncio.wait_for - 开发环境开启 asyncio debug 模式:
asyncio.run(main(), debug=True)
