Python 爬虫反爬实战进阶——指纹识别、行为模拟、TLS 指纹绕过
Python 爬虫反爬实战进阶——指纹识别、行为模拟、TLS 指纹绕过
适读人群:有爬虫基础经验、遇到反爬拦截不知如何应对的工程师 | 阅读时长:约14分钟 | 核心价值:理解现代反爬体系的运作方式,掌握对应的绕过技术和工程化实现
先说清楚使用场景:我们的爬虫全部用于自有数据采集和已授权的数据合作,不做任何违法违规的数据抓取。本文讲的技术也是爬虫工程师的基础能力,不涉及任何违法场景。
去年做了一个价格监控系统,需要定期采集几十个电商平台的商品价格数据。前三周一切顺利,第四周开始陆续被各平台封掉了:有的直接返回 403,有的返回假数据,有的让你填验证码,还有一个更狡猾的,一直返回 200,但内容是完全虚假的价格——你还不知道被发现了。
这次被封让我系统研究了现代反爬体系,学到很多。
现代反爬的四个层次
理解反爬,先理解它要识别什么:
第一层:IP 维度
- 同一 IP 请求频率异常
- 数据中心 IP(IDC IP)直接封
第二层:HTTP 特征
- User-Agent 不像真实浏览器
- 缺少某些必要的 headers(如
Accept-Language、Referer) - headers 顺序不对(真实浏览器 headers 有固定顺序)
第三层:TLS 指纹
- TLS 握手时的密码套件顺序、扩展顺序——这个在 Python requests 和真实浏览器之间是不一样的
- 这是很多人不了解的一层,也是 2023 年以后越来越常见的反爬手段
第四层:行为特征
- 页面停留时间(爬虫通常很短)
- 鼠标移动轨迹(无头浏览器没有自然的移动轨迹)
- 点击位置(爬虫点击坐标往往太精确)
- JavaScript 执行环境(有些特征只在真实浏览器里存在)
解决 TLS 指纹问题
这是很多人不了解的反爬手段,也是最近几年逐渐普及的技术。
TLS 握手时,客户端会发送 ClientHello 消息,里面包含:
- 支持的密码套件(cipher suites)
- 支持的 TLS 扩展及其顺序
- 椭圆曲线参数
Python 的 requests 库用的是操作系统的 SSL 库,密码套件顺序和真实 Chrome 是不一样的。网站可以通过分析 TLS 指纹来判断是爬虫还是真实浏览器。
# 使用 curl_cffi 模拟真实浏览器的 TLS 指纹
# pip install curl-cffi
from curl_cffi import requests as curl_requests
session = curl_requests.Session(impersonate="chrome120") # 模拟 Chrome 120 的 TLS 指纹
response = session.get(
"https://target-site.com",
headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xhtml+json;q=0.9,image/avif,image/webp,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "document",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "none",
"Sec-Fetch-User": "?1",
}
)curl_cffi 底层用的是 libcurl,可以完整模拟各种浏览器的 TLS 指纹,这是目前绕过 TLS 指纹检测最有效的方案。
踩坑实录一:被检测到的不是 IP,是 headers 顺序
现象: 换了IP代理,还是被封,但换成 Selenium 就能访问。
排查过程: 用了一个 TLS 指纹检测网站(howsmyssl.com 或 tls.peet.ws)对比了 Python requests、curl_cffi 和真实 Chrome 的指纹,发现差异。
然后看了被封时的请求 headers,发现了一个细节:Python requests 默认的 headers 顺序是:
Host, User-Agent, Accept-Encoding, Accept, Connection真实 Chrome 的顺序是:
Host, Connection, sec-ch-ua, sec-ch-ua-Mobile, sec-ch-ua-Platform, Upgrade-Insecure-Requests, User-Agent, Accept, ...反爬系统会检查 headers 顺序,如果顺序不像 Chrome,直接封。
解法: 用 curl_cffi 并指定 impersonate,它会自动设置正确的 headers 顺序和 TLS 指纹。
行为模拟:让爬虫"像人"
对于 JavaScript 渲染的页面,有时候必须用 Playwright 或 Puppeteer。但无头浏览器也有指纹,需要处理:
# pip install playwright
from playwright.async_api import async_playwright
import asyncio
import random
import time
async def human_like_delay(min_ms: int = 800, max_ms: int = 3000):
"""人类操作间隔,使用正态分布而不是均匀分布"""
mean = (min_ms + max_ms) / 2
std = (max_ms - min_ms) / 6
delay_ms = max(min_ms, min(max_ms, random.gauss(mean, std)))
await asyncio.sleep(delay_ms / 1000)
async def human_like_scroll(page, target_y: int):
"""模拟人类滚动行为:先快后慢,有随机停顿"""
current_y = await page.evaluate("window.scrollY")
remaining = target_y - current_y
steps = random.randint(5, 15)
for i in range(steps):
progress = (i + 1) / steps
# 缓动函数:先快后慢
eased = 1 - (1 - progress) ** 3
scroll_to = current_y + remaining * eased + random.randint(-20, 20)
await page.evaluate(f"window.scrollTo(0, {scroll_to})")
await asyncio.sleep(random.uniform(0.05, 0.15))
# 偶尔在中间停顿,像人在"读内容"
if random.random() < 0.2:
await asyncio.sleep(random.uniform(0.5, 2.0))
async def stealth_launch():
"""启动隐身浏览器,去掉自动化特征"""
playwright = await async_playwright().start()
browser = await playwright.chromium.launch(
headless=True,
args=[
"--disable-blink-features=AutomationControlled",
"--disable-infobars",
f"--window-size={random.randint(1200, 1920)},{random.randint(800, 1080)}",
],
)
context = await browser.new_context(
viewport={"width": random.randint(1200, 1920), "height": random.randint(800, 1080)},
user_agent="Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
locale="zh-CN",
timezone_id="Asia/Shanghai",
# 注入 WebGL 指纹噪声
extra_http_headers={
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
},
)
# 注入反检测 JavaScript
await context.add_init_script("""
// 覆盖 navigator.webdriver
Object.defineProperty(navigator, 'webdriver', {
get: () => undefined,
});
// 模拟 Chrome 插件
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});
// 修正 Chrome 自动化标志
window.chrome = {
runtime: {},
};
""")
return playwright, browser, context踩坑实录二:假数据陷阱
现象: 爬虫一直运行正常,但某天发现数据库里的价格全都是 9999.99 元,这显然不对,但爬虫日志显示 HTTP 200,没有任何报错。
原因: 被检测到后,对方返回了假数据而不是封锁。这是一种更高明的反爬策略:不告诉你"我知道你是爬虫",而是悄悄给你假的数据,让你以为自己在正常工作。
解法: 数据验证层,对采集到的数据做合理性检查:
from pydantic import BaseModel, field_validator
from typing import Optional
import statistics
class ProductPrice(BaseModel):
product_id: str
price: float
currency: str = "CNY"
@field_validator('price')
@classmethod
def validate_price(cls, v):
if v <= 0:
raise ValueError(f"Price must be positive, got {v}")
if v > 100000:
raise ValueError(f"Price seems unreasonably high: {v}")
return v
class PriceAnomalyDetector:
"""价格异常检测:识别假数据"""
def __init__(self, history_window: int = 30):
self.history: dict[str, list[float]] = {}
self.window = history_window
def check_price(self, product_id: str, new_price: float) -> bool:
"""返回 False 表示价格异常(可能是假数据)"""
history = self.history.get(product_id, [])
if len(history) >= 3:
mean = statistics.mean(history)
stdev = statistics.stdev(history) if len(history) > 1 else mean * 0.1
# 价格偏离历史均值超过 3 个标准差,可能是假数据
z_score = abs(new_price - mean) / (stdev + 0.01)
if z_score > 3:
return False # 异常
# 更新历史记录
history.append(new_price)
if len(history) > self.window:
history.pop(0)
self.history[product_id] = history
return True # 正常
detector = PriceAnomalyDetector()
async def scrape_price(product_id: str) -> Optional[float]:
# ... 爬取逻辑 ...
raw_price = extract_price_from_html(html)
if not detector.check_price(product_id, raw_price):
logger.warning(f"Anomalous price detected for {product_id}: {raw_price}, possibly honeypot data")
return None # 丢弃可疑数据
return raw_price踩坑实录三:IP 代理池的坑
现象: 买了一个代理池服务,里面有几千个 IP。但用了一天后,可用 IP 越来越少,到了第三天,几乎所有 IP 都被封了。
原因: 这个代理池里的 IP 是"共享代理",其他爬虫用户也在用同一批 IP,而且他们频率更高,先把这批 IP 的信誉值搞坏了,然后大家都用不了了。
解法:
- 用独享代理或者住宅代理(Residential IP),不和其他人共享
- 控制每个 IP 的使用频率,不要把一个 IP 用到被封才换
- 实现 IP 质量评分,自动淘汰低质量 IP
import time
from dataclasses import dataclass, field
from typing import Optional
import random
import asyncio
@dataclass
class ProxyInfo:
ip: str
port: int
success_count: int = 0
fail_count: int = 0
last_used: float = 0
banned_until: float = 0
@property
def success_rate(self) -> float:
total = self.success_count + self.fail_count
if total == 0:
return 1.0
return self.success_count / total
@property
def is_banned(self) -> bool:
return time.time() < self.banned_until
@property
def url(self) -> str:
return f"http://{self.ip}:{self.port}"
class SmartProxyPool:
def __init__(self, proxies: list[dict], min_interval: float = 2.0):
self.proxies = [ProxyInfo(p["ip"], p["port"]) for p in proxies]
self.min_interval = min_interval # 同一 IP 最少间隔时间
def get_best_proxy(self) -> Optional[ProxyInfo]:
"""选择最优代理:优先成功率高、最近没用过的"""
now = time.time()
available = [
p for p in self.proxies
if not p.is_banned and (now - p.last_used) >= self.min_interval
]
if not available:
return None
# 按综合评分排序
available.sort(key=lambda p: (p.success_rate, -(now - p.last_used)), reverse=True)
# 从 Top 5 里随机选,避免总用同一个
top = available[:min(5, len(available))]
return random.choice(top)
def mark_success(self, proxy: ProxyInfo):
proxy.success_count += 1
proxy.last_used = time.time()
def mark_failure(self, proxy: ProxyInfo, banned: bool = False):
proxy.fail_count += 1
proxy.last_used = time.time()
if banned:
# 封禁1小时
proxy.banned_until = time.time() + 3600工程化:一个完整的爬虫任务框架
import asyncio
from typing import AsyncIterator
class ScraperTask:
def __init__(
self,
proxy_pool: SmartProxyPool,
anomaly_detector: PriceAnomalyDetector,
max_concurrent: int = 10,
request_delay: tuple = (1.0, 3.0),
):
self.proxy_pool = proxy_pool
self.anomaly_detector = anomaly_detector
self.semaphore = asyncio.Semaphore(max_concurrent)
self.request_delay = request_delay
async def scrape_one(self, url: str, product_id: str) -> Optional[float]:
async with self.semaphore:
proxy = self.proxy_pool.get_best_proxy()
await asyncio.sleep(random.uniform(*self.request_delay))
try:
async with curl_requests.AsyncSession(impersonate="chrome120") as session:
response = await session.get(url, proxy=proxy.url if proxy else None)
price = extract_price(response.text)
if not self.anomaly_detector.check_price(product_id, price):
logger.warning(f"Suspicious price for {product_id}")
return None
if proxy:
self.proxy_pool.mark_success(proxy)
return price
except Exception as e:
if proxy:
is_banned = "403" in str(e) or "429" in str(e)
self.proxy_pool.mark_failure(proxy, banned=is_banned)
logger.error(f"Scrape failed for {url}: {e}")
return None
async def scrape_many(self, tasks: list[tuple]) -> dict:
"""批量爬取,返回 {product_id: price} 字典"""
coros = [self.scrape_one(url, pid) for url, pid in tasks]
results = await asyncio.gather(*coros, return_exceptions=True)
return {
pid: price
for (_, pid), price in zip(tasks, results)
if isinstance(price, float) and price is not None
}反爬是一场持续的"攻防博弈",技术在快速演进,没有一招永远管用的方案。理解对方在检测什么,针对性地绕过,这才是正确的思路。
