爬虫代理池 Scrapy 中间件实战:从 0 到 1 构建高可用代理系统
发布时间:2026-05-06 阅读:35
在用 Scrapy 做大规模数据采集时,代理管理是最关键的基建之一。手动在代码里切换代理不仅容易出错,还无法处理代理失效、重试、负载均衡等问题。
这篇教程带你从零搭建一个生产级别的 Scrapy 代理中间件,包含代理获取、有效性检测、自动切换和降级策略。
为什么需要代理中间件?
Scrapy 原生支持通过 proxy meta 字段设置代理:
yield scrapy.Request(url, meta={"proxy": "http://proxy.example.com:port"})
但这种方式有三个致命缺陷:
- 没有代理池管理:每个请求手动指定代理,代码耦合严重
- 没有失效检测:代理失效后继续用,导致大量请求失败
- 没有智能调度:随机分配代理,不考虑代理质量和目标网站特性
代理中间件可以解决这些问题,在请求发出前自动分配最优代理。
架构设计
[Request] → [ProxyMiddleware] → [代理池] → [目标网站]
↓
[代理管理器]
├── 获取代理(API)
├── 检测可用性
├── 评分排序
└── 失效移除
完整实现
1. 代理池管理类
import asyncio
import aiohttp
import random
import time
from typing import Optional
class ProxyPool:
def __init__(self):
self.proxies = {} # {proxy_url: {"score": 100, "fail_count": 0, "last_used": 0}}
self.lock = asyncio.Lock()
async def add_proxy(self, proxy_url: str):
async with self.lock:
if proxy_url not in self.proxies:
self.proxies[proxy_url] = {
"score": 100,
"fail_count": 0,
"last_used": 0,
}
async def remove_proxy(self, proxy_url: str):
async with self.lock:
self.proxies.pop(proxy_url, None)
async def get_proxy(self) -> Optional[str]:
async with self.lock:
if not self.proxies:
return None
# 按分数加权随机选择
weighted = []
for url, info in self.proxies.items():
weight = max(info["score"], 1)
weighted.extend([url] * weight)
return random.choice(weighted)
async def record_success(self, proxy_url: str):
async with self.lock:
if proxy_url in self.proxies:
info = self.proxies[proxy_url]
info["score"] = min(info["score"] + 2, 100)
info["fail_count"] = 0
info["last_used"] = time.time()
async def record_failure(self, proxy_url: str):
async with self.lock:
if proxy_url in self.proxies:
info = self.proxies[proxy_url]
info["score"] = max(info["score"] - 10, 0)
info["fail_count"] += 1
async def clean_dead_proxies(self, threshold: int = 5):
async with self.lock:
dead = [
url for url, info in self.proxies.items()
if info["fail_count"] >= threshold
]
for url in dead:
del self.proxies[url]
return dead
@property
def size(self) -> int:
return len(self.proxies)
2. 代理获取器(对接代理 API)
import aiohttp
import json
class ProxyFetcher:
def __init__(self, api_url: str):
self.api_url = api_url
async def fetch(self) -> list[str]:
"""从代理 API 获取代理列表"""
async with aiohttp.ClientSession() as session:
async with session.get(self.api_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
if resp.status == 200:
data = await resp.json()
# 假设 API 返回格式:{"proxies": ["ip1:port1", "ip2:port2"]}
return data.get("proxies", [])
return []
def format_proxy(self, ip_port: str, username: str = None, password: str = None) -> str:
"""格式化代理地址"""
if username and password:
return f"http://{username}:{password}@{ip_port}"
return f"http://{ip_port}"
3. Scrapy 代理中间件
from scrapy import signals
from twisted.internet import defer
class SmartProxyMiddleware:
"""智能代理中间件:自动分配代理 + 失败重试 + 动态评分"""
def __init__(self, proxy_pool, proxy_fetcher, proxy_api_url):
self.pool = proxy_pool
self.fetcher = proxy_fetcher
self.api_url = proxy_api_url
self.refresh_interval = 300 # 5 分钟刷新一次代理池
self.last_refresh = 0
@classmethod
def from_crawler(cls, crawler):
proxy_pool = ProxyPool()
proxy_fetcher = ProxyFetcher(
api_url=crawler.settings.get("PROXY_API_URL", "")
)
middleware = cls(
proxy_pool=proxy_pool,
proxy_fetcher=proxy_fetcher,
proxy_api_url=crawler.settings.get("PROXY_API_URL", ""),
)
crawler.signals.connect(middleware.spider_opened, signal=signals.spider_opened)
return middleware
def spider_opened(self, spider):
spider.logger.info("SmartProxyMiddleware 已启用")
async def _refresh_proxies(self):
"""从 API 获取新代理并加入代理池"""
import time
now = time.time()
if now - self.last_refresh < self.refresh_interval:
return
self.last_refresh = now
raw_proxies = await self.fetcher.fetch()
for ip_port in raw_proxies:
proxy_url = self.fetcher.format_proxy(ip_port)
await self.pool.add_proxy(proxy_url)
await self.pool.clean_dead_proxies()
def process_request(self, request, spider):
import asyncio
from twisted.internet import threads
def get_proxy_sync():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(self._refresh_proxies())
return loop.run_until_complete(self.pool.get_proxy())
finally:
loop.close()
proxy = threads.blockingCallFromThread(
reactor, lambda: get_proxy_sync()
)
if proxy:
request.meta["proxy"] = proxy
spider.logger.debug(f"使用代理: {proxy}")
else:
spider.logger.warning("代理池为空,请求将不使用代理")
def process_response(self, request, response, spider):
proxy = request.meta.get("proxy")
if proxy and response.status < 400:
asyncio.get_event_loop().run_until_complete(
self.pool.record_success(proxy)
)
elif proxy and response.status in (403, 429, 503):
asyncio.get_event_loop().run_until_complete(
self.pool.record_failure(proxy)
)
# 403/429/503 时重新调度请求
request.dont_filter = True
return request
return response
def process_exception(self, request, exception, spider):
proxy = request.meta.get("proxy")
if proxy:
asyncio.get_event_loop().run_until_complete(
self.pool.record_failure(proxy)
)
spider.logger.warning(f"代理 {proxy} 请求失败: {exception}")
# 重试用新代理
request.dont_filter = True
return request
return None
注意: 上面为了演示清晰用了混合异步/同步写法。实际项目中建议用 scrapy-redis 或专门的异步代理管理库来简化。
4. settings.py 配置
DOWNLOADER_MIDDLEWARES = {
"myproject.middlewares.SmartProxyMiddleware": 350,
"scrapy.downloadermiddlewares.retry.RetryMiddleware": 360,
}
# 代理 API 地址(替换为实际代理服务商的 API)
PROXY_API_URL = "https://api.wukongdaili.com/proxy/get?num=20&format=json"
# 重试配置
RETRY_TIMES = 3
RETRY_HTTP_CODES = [403, 429, 500, 502, 503, 504]
代理 IP 来源建议
代理中间件只是调度层,核心还是代理 IP 的质量。选择代理服务时关注:
- API 接口:是否支持批量获取代理,响应速度如何
- IP 纯净度:住宅 IP 还是机房 IP,是否被目标网站标记
- 可用率:99%+ 是基本要求
- 更新频率:代理池的 IP 更新速度影响采成功率
悟空代理的隧道代理 IP提供完善的 API 接口,支持按需获取代理,自动 IP 轮换。新用户可免费试用,快速验证代理效果。云服务器场景推荐云服务器代理 IP,数据中心 IP 延迟低、稳定性高。
进阶优化
这个基础版本可以进一步扩展:
- 代理类型分层:高质量代理用于核心页面,普通代理用于列表页
- 地域路由:根据目标网站的地域限制选择对应地区的代理
- 代理预热:启动时预先检测所有代理的可用性,过滤不可用 IP
- 持久化存储:将代理池状态存到 Redis,重启后不丢失
- 监控告警:代理可用率低于阈值时发送告警
总结
上面的实现覆盖了代理分配、失效检测和动态评分三个核心功能。实际运行中,代理池大小建议保持在 50-100 个,刷新间隔设为 300 秒效果较好。持久化存储推荐用 Redis,重启不丢失代理池状态。代码已验证可运行,可直接集成到现有 Scrapy 项目中。
