Python HTTP 爬虫优化——aiohttp 异步并发、代理池、速率控制
Python HTTP 爬虫优化——aiohttp 异步并发、代理池、速率控制
适读人群:有 requests 基础、想提升爬虫性能的 Python 开发者 | 阅读时长:约16分钟 | 核心价值:掌握异步爬虫全套工程化实践
那次让我刻骨铭心的性能事故
两年前,我接了一个数据采集项目,需要爬取约30万条商品数据。
我用 requests 写了个脚本,顺序请求,每条请求约1.2秒,简单算了一下:300,000 × 1.2 = 360,000秒 = 100小时。
客户给的deadline是两天内交付,也就是48小时。
当时我几乎崩溃,重写代码已经来不及,只能硬着头皮上多线程。用 ThreadPoolExecutor 开了50个线程,速度确实快多了,但跑了三个小时后,内存涨到了快6GB,机器开始频繁 GC,最后直接 OOM 崩掉。
这件事让我深刻认识到:Python 多线程爬虫有天然上限,IO 密集型任务的正确解法是异步(asyncio + aiohttp),不是多线程。
用 aiohttp 重写后,同样50个并发,内存只用了400MB,速度反而更快,最终20小时完成了30万条数据的采集。
一、aiohttp 基础——从 requests 迁移
最简单的对比
# requests 同步版本
import requests
def fetch(url: str) -> str:
response = requests.get(url, timeout=10)
return response.text
# aiohttp 异步版本
import aiohttp
import asyncio
async def fetch(url: str) -> str:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
return await resp.text()
asyncio.run(fetch("https://example.com"))为什么异步比多线程快
- 多线程:每个线程都有独立的栈空间(约1MB),50个线程 = 50MB 栈 + 频繁的线程切换开销
- 异步:所有任务共享一个线程,通过事件循环切换,切换开销极低,内存消耗极少
对于网络IO这种"大部分时间在等待"的场景,异步的优势非常明显。
二、高性能异步爬虫完整实现
import asyncio
import logging
import time
from dataclasses import dataclass, field
from typing import Optional
from urllib.parse import urlencode
import aiohttp
from aiohttp import ClientSession, TCPConnector
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
@dataclass
class CrawlConfig:
"""爬虫配置"""
concurrency: int = 20 # 并发数
timeout: int = 15 # 单请求超时(秒)
retry_times: int = 3 # 重试次数
retry_delay: float = 2.0 # 重试间隔(秒)
rate_limit: float = 0.1 # 最小请求间隔(秒),10 req/s
headers: dict = field(default_factory=lambda: {
"User-Agent": (
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) "
"AppleWebKit/537.36 (KHTML, like Gecko) "
"Chrome/122.0.0.0 Safari/537.36"
),
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9",
})
class RateLimiter:
"""令牌桶限速器"""
def __init__(self, rate: float):
"""
:param rate: 每秒最大请求数
"""
self.rate = rate
self._tokens = rate
self._last_update = time.monotonic()
self._lock = asyncio.Lock()
async def acquire(self):
async with self._lock:
now = time.monotonic()
# 根据时间流逝补充令牌
elapsed = now - self._last_update
self._tokens = min(self.rate, self._tokens + elapsed * self.rate)
self._last_update = now
if self._tokens >= 1:
self._tokens -= 1
return
else:
# 等待直到有令牌
wait_time = (1 - self._tokens) / self.rate
await asyncio.sleep(wait_time)
self._tokens = 0
class AsyncCrawler:
"""高性能异步爬虫"""
def __init__(self, config: Optional[CrawlConfig] = None):
self.config = config or CrawlConfig()
self.rate_limiter = RateLimiter(rate=1.0 / self.config.rate_limit)
self._semaphore = asyncio.Semaphore(self.config.concurrency)
self._success_count = 0
self._fail_count = 0
async def _fetch_one(
self,
session: ClientSession,
url: str,
params: Optional[dict] = None,
proxy: Optional[str] = None,
) -> Optional[dict]:
"""单个 URL 的抓取(含重试)"""
timeout = aiohttp.ClientTimeout(total=self.config.timeout)
for attempt in range(self.config.retry_times):
await self.rate_limiter.acquire()
try:
async with session.get(
url,
params=params,
proxy=proxy,
timeout=timeout,
allow_redirects=True,
ssl=False,
) as response:
if response.status == 200:
self._success_count += 1
return {
"url": str(response.url),
"status": response.status,
"content": await response.text(errors="replace"),
"headers": dict(response.headers),
}
elif response.status in (429, 503):
# 频率限制,等待更长时间
wait = self.config.retry_delay * (attempt + 2)
logger.warning(
f"频率限制 {response.status},等待 {wait}s: {url}"
)
await asyncio.sleep(wait)
else:
logger.warning(
f"HTTP {response.status} (第{attempt+1}次): {url}"
)
if attempt < self.config.retry_times - 1:
await asyncio.sleep(self.config.retry_delay)
except asyncio.TimeoutError:
logger.warning(f"超时 (第{attempt+1}次): {url}")
if attempt < self.config.retry_times - 1:
await asyncio.sleep(self.config.retry_delay)
except aiohttp.ClientConnectionError as e:
logger.warning(f"连接错误 {e} (第{attempt+1}次): {url}")
if attempt < self.config.retry_times - 1:
await asyncio.sleep(self.config.retry_delay * (attempt + 1))
self._fail_count += 1
logger.error(f"最终失败: {url}")
return None
async def _fetch_with_sem(self, session, url, **kwargs):
"""带 Semaphore 限制的抓取"""
async with self._semaphore:
return await self._fetch_one(session, url, **kwargs)
async def batch_fetch(
self,
urls: list[str],
proxies: Optional[list[str]] = None,
) -> list[Optional[dict]]:
"""批量并发抓取"""
connector = TCPConnector(
limit=self.config.concurrency * 2, # 连接池大小
ttl_dns_cache=300, # DNS 缓存5分钟
ssl=False,
)
start_time = time.monotonic()
results = []
async with ClientSession(
connector=connector,
headers=self.config.headers,
) as session:
tasks = []
for i, url in enumerate(urls):
proxy = proxies[i % len(proxies)] if proxies else None
task = self._fetch_with_sem(session, url, proxy=proxy)
tasks.append(task)
# 使用 asyncio.gather 并发执行所有任务
results = await asyncio.gather(*tasks, return_exceptions=True)
elapsed = time.monotonic() - start_time
logger.info(
f"完成 {len(urls)} 个 URL | "
f"成功: {self._success_count} | "
f"失败: {self._fail_count} | "
f"耗时: {elapsed:.1f}s | "
f"速率: {len(urls)/elapsed:.1f} req/s"
)
return [r for r in results if isinstance(r, dict)]
# 使用示例
async def main():
config = CrawlConfig(
concurrency=15,
timeout=10,
retry_times=3,
rate_limit=0.05, # 20 req/s
)
crawler = AsyncCrawler(config)
urls = [f"https://httpbin.org/get?id={i}" for i in range(100)]
results = await crawler.batch_fetch(urls)
print(f"成功获取 {len(results)} 条数据")
asyncio.run(main())三、代理池集成
踩坑实录1:所有请求走同一个代理,代理很快被封
现象:买了100个代理IP,用了一天就全部被封了。
原因:没有轮换代理,所有请求集中打到少数几个代理上。
解法:实现代理池,自动轮换,并对失效代理进行标记和剔除。
import random
import asyncio
from typing import Optional
class ProxyPool:
"""代理池管理"""
def __init__(self, proxies: list[str]):
self._proxies = list(proxies)
self._failed: dict[str, int] = {} # 记录失败次数
self._lock = asyncio.Lock()
async def get(self) -> Optional[str]:
"""随机获取一个可用代理"""
async with self._lock:
available = [
p for p in self._proxies
if self._failed.get(p, 0) < 3 # 失败3次以上的不用
]
if not available:
# 全部代理都失效了,重置计数(可能是临时问题)
self._failed.clear()
available = self._proxies
return random.choice(available) if available else None
async def mark_failed(self, proxy: str):
"""标记代理失败"""
async with self._lock:
self._failed[proxy] = self._failed.get(proxy, 0) + 1
if self._failed[proxy] >= 3:
# 从池中移除
self._proxies = [p for p in self._proxies if p != proxy]
self._failed.pop(proxy, None)
print(f"代理已移除: {proxy},剩余 {len(self._proxies)} 个")
def size(self) -> int:
return len(self._proxies)
# 从免费代理源获取代理(示例)
async def fetch_proxy_list() -> list[str]:
"""从代理池服务获取代理列表"""
# 实际项目中对接你的代理服务商API
# 这里仅作示例
async with aiohttp.ClientSession() as session:
async with session.get(
"http://your-proxy-service/api/proxies",
timeout=aiohttp.ClientTimeout(total=5)
) as resp:
data = await resp.json()
return [f"http://{p['ip']}:{p['port']}" for p in data["proxies"]]四、踩坑实录
踩坑实录2:aiohttp Session 每次请求都新建,性能差
现象:代码能跑,但速度没有明显提升。
原因:在每个请求里都 async with aiohttp.ClientSession() 创建新 Session,没有复用连接池。
解法:Session 和 Connector 应该在整个批量任务里共享,只创建一次。
错误示例:
# 错误写法:每次请求都创建新Session,连接无法复用
async def bad_fetch(url):
async with aiohttp.ClientSession() as session: # 每次都新建,非常低效
async with session.get(url) as resp:
return await resp.text()踩坑实录3:asyncio.gather 中有一个异常就全部中断
现象:100个任务中有1个抛了异常,gather 立即中断,其他99个任务全部被取消。
原因:asyncio.gather 默认 return_exceptions=False,一个任务失败就整体失败。
解法:始终设置 return_exceptions=True,然后在结果里过滤异常对象。
results = await asyncio.gather(*tasks, return_exceptions=True)
# 过滤掉异常
ok_results = [r for r in results if not isinstance(r, Exception)]
errors = [r for r in results if isinstance(r, Exception)]
print(f"成功: {len(ok_results)}, 失败: {len(errors)}")五、性能调优参数参考
根据实际经验总结的调优参数建议:
| 目标网站类型 | 推荐并发数 | 请求间隔 | 超时时间 |
|---|---|---|---|
| 小型个人网站 | 2-5 | 2-3秒 | 15秒 |
| 中型企业网站 | 5-10 | 0.5-1秒 | 10秒 |
| 大型平台(有反爬) | 3-8 | 1-2秒(随机) | 10秒 |
| 公开API接口 | 10-30 | 按 rate limit | 5秒 |
| 内部系统 | 20-50 | 0.1秒 | 5秒 |
那次100小时变20小时的经历,是我真正理解异步编程价值的转折点。性能优化不是玄学,背后有清晰的技术原理。理解了原理,调参才不是瞎猜。
