Python 网络爬虫进阶实战——Scrapy 框架、动态渲染、反爬虫对抗
Python 网络爬虫进阶实战——Scrapy 框架、动态渲染、反爬虫对抗
适读人群:有基础爬虫经验的 Python 开发者 | 阅读时长:约18分钟 | 核心价值:从玩具爬虫到工程级爬虫的系统升级
那个被封号的夜晚
去年年底,我接到一个老朋友小陈的电话,他在一家电商公司做数据分析,声音有点慌:"张哥,我的爬虫上线三天就被封了,老板明天要数据,我完了。"
我问他用的什么方案,他说就是 requests + BeautifulSoup,一个 for 循环,没有任何限速,IP 直接裸奔。我心想,这不被封才奇怪。
小陈不是技术差,他是典型的"能跑就行"心态——学了爬虫基础,写了个脚本,能抓到数据就以为万事大吉。结果在生产环境撞了南墙。
那天晚上我们花了两个小时,把他的爬虫重构成 Scrapy 框架,加上代理池、随机 UA、请求限速,第二天数据就抓回来了。他后来专门发了条朋友圈感谢我,说这是他职业生涯"最值得的两小时"。
这件事让我意识到,很多人学爬虫都停留在 requests 阶段,知道怎么发请求、怎么解析 HTML,但真正到了工程化场景——需要爬几十万条数据、需要应对反爬机制、需要稳定跑在服务器上——就完全不够用了。
今天这篇文章,我就来系统讲讲工程级爬虫的三个核心模块:Scrapy 框架使用、动态页面渲染、反爬虫对抗策略。不是教科书,是我和无数小陈们踩坑总结出来的实战经验。
一、为什么要用 Scrapy?
很多人问我:requests 不就够用了吗?为什么要学 Scrapy?
直接说结论:当你的爬虫需要同时处理 50+ 并发、需要自动重试、需要持久化、需要随时暂停恢复,requests 就撑不住了。
Scrapy 的核心优势在于它是一个异步事件驱动的爬虫框架,内置了:
- Scheduler(调度器):管理请求队列,支持去重
- Downloader(下载器):并发下载,内置重试逻辑
- Spider(爬虫逻辑):你的核心业务代码
- Item Pipeline(数据管道):数据清洗和持久化
- Middleware(中间件):请求/响应拦截,是反爬虫对抗的主战场
快速搭建一个 Scrapy 项目
pip install scrapy
scrapy startproject product_crawler
cd product_crawler
scrapy genspider product_spider example.com生成的目录结构:
product_crawler/
├── scrapy.cfg
└── product_crawler/
├── settings.py
├── items.py
├── pipelines.py
├── middlewares.py
└── spiders/
└── product_spider.py一个完整的商品爬虫示例
# spiders/product_spider.py
import scrapy
from product_crawler.items import ProductItem
class ProductSpider(scrapy.Spider):
name = "product_spider"
allowed_domains = ["books.toscrape.com"]
start_urls = ["https://books.toscrape.com/catalogue/page-1.html"]
custom_settings = {
"DOWNLOAD_DELAY": 1.5, # 请求间隔1.5秒
"RANDOMIZE_DOWNLOAD_DELAY": True, # 随机化延迟
"CONCURRENT_REQUESTS": 4, # 并发数
"RETRY_TIMES": 3, # 重试次数
}
def parse(self, response):
# 提取商品列表
for article in response.css("article.product_pod"):
item = ProductItem()
item["title"] = article.css("h3 a::attr(title)").get()
item["price"] = article.css("p.price_color::text").get()
item["rating"] = article.css("p.star-rating::attr(class)").get()
item["url"] = response.urljoin(
article.css("h3 a::attr(href)").get()
)
yield item
# 翻页逻辑
next_page = response.css("li.next a::attr(href)").get()
if next_page:
yield response.follow(next_page, callback=self.parse)
def parse_detail(self, response):
"""商品详情页解析"""
yield {
"description": response.css(
"#product_description + p::text"
).get(),
"upc": response.css(
"table.table tr:nth-child(1) td::text"
).get(),
}
# items.py
import scrapy
class ProductItem(scrapy.Item):
title = scrapy.Field()
price = scrapy.Field()
rating = scrapy.Field()
url = scrapy.Field()
description = scrapy.Field()
# pipelines.py
import json
from itemadapter import ItemAdapter
class JsonWriterPipeline:
def open_spider(self, spider):
self.file = open("products.jsonl", "w", encoding="utf-8")
def close_spider(self, spider):
self.file.close()
def process_item(self, item, spider):
line = json.dumps(ItemAdapter(item).asdict(), ensure_ascii=False)
self.file.write(line + "\n")
return item
# settings.py 关键配置
ITEM_PIPELINES = {
"product_crawler.pipelines.JsonWriterPipeline": 300,
}二、动态渲染——JavaScript 页面的应对方案
踩坑实录1:抓到的是"骨架页"
有一次我帮一个朋友抓某招聘网站的职位列表,用 requests 请求回来的 HTML 里根本没有职位数据,只有一堆 <div id="root"></div> 的空壳子。
现象:response.text 里没有目标数据,只有 JavaScript 标签。
原因:页面是 React/Vue 等前端框架渲染的,数据通过 Ajax 动态加载。
解法:两种路线——1)找 XHR 接口直接请求;2)用浏览器渲染。
路线1:抓接口(首选)
打开 Chrome DevTools → Network → XHR,找到数据接口,直接用 requests 调。这是最优雅的方案,速度快、资源消耗低。
路线2:Scrapy-Playwright 集成
当接口有加密签名、没法直接复现时,才用真实浏览器渲染。
pip install scrapy-playwright
playwright install chromium# settings.py
DOWNLOAD_HANDLERS = {
"http": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
"https": "scrapy_playwright.handler.ScrapyPlaywrightDownloadHandler",
}
TWISTED_REACTOR = "twisted.internet.asyncioreactor.AsyncioSelectorReactor"
# spider.py
import scrapy
from scrapy_playwright.page import PageMethod
class DynamicSpider(scrapy.Spider):
name = "dynamic_spider"
def start_requests(self):
yield scrapy.Request(
url="https://example.com/jobs",
meta={
"playwright": True,
"playwright_include_page": True,
"playwright_page_methods": [
# 等待关键元素出现,最多等10秒
PageMethod(
"wait_for_selector",
".job-item",
timeout=10000
),
# 模拟滚动到底部
PageMethod(
"evaluate",
"window.scrollTo(0, document.body.scrollHeight)"
),
PageMethod("wait_for_timeout", 2000),
],
},
callback=self.parse,
)
async def parse(self, response, **kwargs):
page = response.meta.get("playwright_page")
if page:
await page.close()
for job in response.css(".job-item"):
yield {
"title": job.css(".job-title::text").get(),
"company": job.css(".company-name::text").get(),
"salary": job.css(".salary::text").get(),
}三、反爬虫对抗——中间件是主战场
踩坑实录2:User-Agent 太假被秒封
有一次爬某资讯网站,用的默认 Scrapy User-Agent(Scrapy/2.x (+https://scrapy.org)),请求了不到100次就开始返回 403。
现象:短时间内收到大量 403 响应。
原因:服务器检测到非浏览器 UA,直接拦截。
解法:自定义下载中间件,随机轮换真实浏览器 UA。
# middlewares.py
import random
from scrapy import signals
from scrapy.http import HtmlResponse
# 真实浏览器 UA 池
USER_AGENTS = [
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 "
"(KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) "
"Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 14_3_1) AppleWebKit/605.1.15 "
"(KHTML, like Gecko) Version/17.3.1 Safari/605.1.15",
]
class RandomUserAgentMiddleware:
def process_request(self, request, spider):
request.headers["User-Agent"] = random.choice(USER_AGENTS)
class ProxyMiddleware:
"""代理池中间件"""
def __init__(self, proxy_pool_url):
self.proxy_pool_url = proxy_pool_url
@classmethod
def from_crawler(cls, crawler):
return cls(
proxy_pool_url=crawler.settings.get(
"PROXY_POOL_URL", "http://127.0.0.1:5555/random"
)
)
def process_request(self, request, spider):
import requests as req
try:
proxy = req.get(self.proxy_pool_url, timeout=3).text.strip()
request.meta["proxy"] = f"http://{proxy}"
except Exception:
spider.logger.warning("获取代理失败,使用直连")
def process_response(self, request, response, spider):
# 如果返回验证码页面,标记这个代理为失效
if response.status in [403, 429] or "captcha" in response.url:
spider.logger.warning(
f"代理可能失效,状态码: {response.status}"
)
return response
# settings.py 开启中间件
DOWNLOADER_MIDDLEWARES = {
"product_crawler.middlewares.RandomUserAgentMiddleware": 400,
"product_crawler.middlewares.ProxyMiddleware": 350,
"scrapy.downloadermiddlewares.useragent.UserAgentMiddleware": None,
}踩坑实录3:请求频率没控好,IP 被封24小时
某次抓一个政府数据网站,DOWNLOAD_DELAY 设了1秒,但忘了关 AUTOTHROTTLE,Scrapy 自动把并发调到了16,实际请求频率远超限制,IP 被封了整整一天。
现象:爬取进行到一半,全部请求开始返回 503。
原因:AutoThrottle 开启后会动态调整并发,不受 DOWNLOAD_DELAY 单独约束。
解法:精确控制 AutoThrottle 参数,或者直接关闭 AutoThrottle 手动管理。
# settings.py 精细化频率控制
DOWNLOAD_DELAY = 2.0
RANDOMIZE_DOWNLOAD_DELAY = True # 实际延迟在 1~3 秒随机
CONCURRENT_REQUESTS = 2 # 并发数控制在2
CONCURRENT_REQUESTS_PER_DOMAIN = 2
# AutoThrottle 精细配置(如果要用的话)
AUTOTHROTTLE_ENABLED = True
AUTOTHROTTLE_START_DELAY = 2.0 # 初始延迟
AUTOTHROTTLE_MAX_DELAY = 10.0 # 最大延迟
AUTOTHROTTLE_TARGET_CONCURRENCY = 2.0 # 目标并发数
# 设置 Cookies 模拟登录
COOKIES_ENABLED = True
DEFAULT_REQUEST_HEADERS = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Connection": "keep-alive",
"Upgrade-Insecure-Requests": "1",
}四、分布式爬虫——Scrapy-Redis 方案
当单机爬虫速度不够用,需要多台机器协同时,Scrapy-Redis 是标准方案:
pip install scrapy-redis# settings.py 分布式配置
SCHEDULER = "scrapy_redis.scheduler.Scheduler"
DUPEFILTER_CLASS = "scrapy_redis.dupefilter.RFPDupeFilter"
REDIS_URL = "redis://your-redis-host:6379"
SCHEDULER_PERSIST = True # 爬虫重启后继续上次进度
# spider.py 继承 RedisSpider
from scrapy_redis.spiders import RedisSpider
class DistributedSpider(RedisSpider):
name = "distributed_spider"
redis_key = "distributed_spider:start_urls" # 从 Redis 取 URL
def parse(self, response):
# 正常解析逻辑
for item in response.css(".item"):
yield {"title": item.css("::text").get()}
# 子页面 URL 会自动进入 Redis 队列,其他节点可以消费
for url in response.css("a::attr(href)").getall():
yield response.follow(url, callback=self.parse)启动多个爬虫节点:
# 节点1
scrapy crawl distributed_spider
# 节点2(另一台机器)
scrapy crawl distributed_spider
# 向 Redis 推送起始 URL
redis-cli lpush distributed_spider:start_urls "https://example.com/page-1"五、数据持久化——Pipeline 最佳实践
# pipelines.py 完整示例
import pymongo
from itemadapter import ItemAdapter
class MongoPipeline:
"""MongoDB 持久化管道"""
collection_name = "products"
def __init__(self, mongo_uri, mongo_db):
self.mongo_uri = mongo_uri
self.mongo_db = mongo_db
@classmethod
def from_crawler(cls, crawler):
return cls(
mongo_uri=crawler.settings.get("MONGO_URI", "mongodb://localhost:27017"),
mongo_db=crawler.settings.get("MONGO_DATABASE", "scrapy_db"),
)
def open_spider(self, spider):
self.client = pymongo.MongoClient(self.mongo_uri)
self.db = self.client[self.mongo_db]
def close_spider(self, spider):
self.client.close()
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 使用 upsert 避免重复插入
self.db[self.collection_name].update_one(
{"url": adapter["url"]},
{"$set": adapter.asdict()},
upsert=True,
)
return item
class DataCleanPipeline:
"""数据清洗管道,放在 MongoDB 之前"""
def process_item(self, item, spider):
adapter = ItemAdapter(item)
# 清洗价格
if adapter.get("price"):
price_str = adapter["price"].replace("£", "").replace("Â", "").strip()
try:
adapter["price"] = float(price_str)
except ValueError:
adapter["price"] = None
# 清洗评分
if adapter.get("rating"):
rating_map = {
"star-rating One": 1,
"star-rating Two": 2,
"star-rating Three": 3,
"star-rating Four": 4,
"star-rating Five": 5,
}
adapter["rating"] = rating_map.get(adapter["rating"], 0)
return item六、技术选型建议
用了这几年爬虫,我的选型原则很简单:
| 场景 | 推荐方案 |
|---|---|
| 静态页面、量小(< 1万) | requests + BeautifulSoup |
| 静态页面、量大(> 1万) | Scrapy |
| 动态页面,有接口可抓 | Scrapy + 直接调接口 |
| 动态页面,接口加密 | Scrapy + Playwright |
| 多机分布式爬取 | Scrapy-Redis |
| 需要登录态保持 | Playwright(Session 管理更方便) |
一个重要原则:永远先找接口,找不到再考虑浏览器渲染。接口方案的速度和稳定性比浏览器渲染好一个数量级。
七、合规提醒
爬虫是工具,合法使用是前提。几个必须注意的点:
- 检查
robots.txt,遵守 Disallow 规则 - 不爬取个人隐私数据(姓名、手机号、身份证等)
- 控制请求频率,不给目标服务器造成压力
- 数据仅用于个人研究/分析,不商业化销售
法律意识这块,国内已经有多起爬虫相关案例,别走进雷区。
写这篇的时候,我特地又联系了一下小陈,他现在已经从"能跑就行"进化到了自己搭 Scrapy-Redis 分布式集群,还给公司建了个数据中台。进步是一点点来的,关键是要遇到真实问题再去系统学。
遇到具体的爬虫问题,欢迎评论区留言,我尽量一一回复。
