爬虫代理池 Scrapy 中间件实战:从 0 到 1 构建高可用代理系统

发布时间:2026-05-06  阅读:35

在用 Scrapy 做大规模数据采集时,代理管理是最关键的基建之一。手动在代码里切换代理不仅容易出错,还无法处理代理失效、重试、负载均衡等问题。

这篇教程带你从零搭建一个生产级别的 Scrapy 代理中间件,包含代理获取、有效性检测、自动切换和降级策略。

为什么需要代理中间件?

Scrapy 原生支持通过 proxy meta 字段设置代理:

yield scrapy.Request(url, meta={"proxy": "http://proxy.example.com:port"})

但这种方式有三个致命缺陷:

  1. 没有代理池管理:每个请求手动指定代理,代码耦合严重
  2. 没有失效检测:代理失效后继续用,导致大量请求失败
  3. 没有智能调度:随机分配代理,不考虑代理质量和目标网站特性

代理中间件可以解决这些问题,在请求发出前自动分配最优代理。

架构设计

[Request] → [ProxyMiddleware] → [代理池] → [目标网站]
                                   ↓
                            [代理管理器]
                            ├── 获取代理(API)
                            ├── 检测可用性
                            ├── 评分排序
                            └── 失效移除

完整实现

1. 代理池管理类

import asyncio
import aiohttp
import random
import time
from typing import Optional

class ProxyPool:
    def __init__(self):
        self.proxies = {}  # {proxy_url: {"score": 100, "fail_count": 0, "last_used": 0}}
        self.lock = asyncio.Lock()

    async def add_proxy(self, proxy_url: str):
        async with self.lock:
            if proxy_url not in self.proxies:
                self.proxies[proxy_url] = {
                    "score": 100,
                    "fail_count": 0,
                    "last_used": 0,
                }

    async def remove_proxy(self, proxy_url: str):
        async with self.lock:
            self.proxies.pop(proxy_url, None)

    async def get_proxy(self) -> Optional[str]:
        async with self.lock:
            if not self.proxies:
                return None
            # 按分数加权随机选择
            weighted = []
            for url, info in self.proxies.items():
                weight = max(info["score"], 1)
                weighted.extend([url] * weight)
            return random.choice(weighted)

    async def record_success(self, proxy_url: str):
        async with self.lock:
            if proxy_url in self.proxies:
                info = self.proxies[proxy_url]
                info["score"] = min(info["score"] + 2, 100)
                info["fail_count"] = 0
                info["last_used"] = time.time()

    async def record_failure(self, proxy_url: str):
        async with self.lock:
            if proxy_url in self.proxies:
                info = self.proxies[proxy_url]
                info["score"] = max(info["score"] - 10, 0)
                info["fail_count"] += 1

    async def clean_dead_proxies(self, threshold: int = 5):
        async with self.lock:
            dead = [
                url for url, info in self.proxies.items()
                if info["fail_count"] >= threshold
            ]
            for url in dead:
                del self.proxies[url]
            return dead

    @property
    def size(self) -> int:
        return len(self.proxies)

2. 代理获取器(对接代理 API)

import aiohttp
import json

class ProxyFetcher:
    def __init__(self, api_url: str):
        self.api_url = api_url

    async def fetch(self) -> list[str]:
        """从代理 API 获取代理列表"""
        async with aiohttp.ClientSession() as session:
            async with session.get(self.api_url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
                if resp.status == 200:
                    data = await resp.json()
                    # 假设 API 返回格式:{"proxies": ["ip1:port1", "ip2:port2"]}
                    return data.get("proxies", [])
                return []

    def format_proxy(self, ip_port: str, username: str = None, password: str = None) -> str:
        """格式化代理地址"""
        if username and password:
            return f"http://{username}:{password}@{ip_port}"
        return f"http://{ip_port}"

3. Scrapy 代理中间件

from scrapy import signals
from twisted.internet import defer

class SmartProxyMiddleware:
    """智能代理中间件:自动分配代理 + 失败重试 + 动态评分"""

    def __init__(self, proxy_pool, proxy_fetcher, proxy_api_url):
        self.pool = proxy_pool
        self.fetcher = proxy_fetcher
        self.api_url = proxy_api_url
        self.refresh_interval = 300  # 5 分钟刷新一次代理池
        self.last_refresh = 0

    @classmethod
    def from_crawler(cls, crawler):
        proxy_pool = ProxyPool()
        proxy_fetcher = ProxyFetcher(
            api_url=crawler.settings.get("PROXY_API_URL", "")
        )
        middleware = cls(
            proxy_pool=proxy_pool,
            proxy_fetcher=proxy_fetcher,
            proxy_api_url=crawler.settings.get("PROXY_API_URL", ""),
        )
        crawler.signals.connect(middleware.spider_opened, signal=signals.spider_opened)
        return middleware

    def spider_opened(self, spider):
        spider.logger.info("SmartProxyMiddleware 已启用")

    async def _refresh_proxies(self):
        """从 API 获取新代理并加入代理池"""
        import time
        now = time.time()
        if now - self.last_refresh < self.refresh_interval:
            return
        self.last_refresh = now

        raw_proxies = await self.fetcher.fetch()
        for ip_port in raw_proxies:
            proxy_url = self.fetcher.format_proxy(ip_port)
            await self.pool.add_proxy(proxy_url)

        await self.pool.clean_dead_proxies()

    def process_request(self, request, spider):
        import asyncio
        from twisted.internet import threads

        def get_proxy_sync():
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
            try:
                loop.run_until_complete(self._refresh_proxies())
                return loop.run_until_complete(self.pool.get_proxy())
            finally:
                loop.close()

        proxy = threads.blockingCallFromThread(
            reactor, lambda: get_proxy_sync()
        )

        if proxy:
            request.meta["proxy"] = proxy
            spider.logger.debug(f"使用代理: {proxy}")
        else:
            spider.logger.warning("代理池为空,请求将不使用代理")

    def process_response(self, request, response, spider):
        proxy = request.meta.get("proxy")
        if proxy and response.status < 400:
            asyncio.get_event_loop().run_until_complete(
                self.pool.record_success(proxy)
            )
        elif proxy and response.status in (403, 429, 503):
            asyncio.get_event_loop().run_until_complete(
                self.pool.record_failure(proxy)
            )
            # 403/429/503 时重新调度请求
            request.dont_filter = True
            return request
        return response

    def process_exception(self, request, exception, spider):
        proxy = request.meta.get("proxy")
        if proxy:
            asyncio.get_event_loop().run_until_complete(
                self.pool.record_failure(proxy)
            )
            spider.logger.warning(f"代理 {proxy} 请求失败: {exception}")
            # 重试用新代理
            request.dont_filter = True
            return request
        return None

注意: 上面为了演示清晰用了混合异步/同步写法。实际项目中建议用 scrapy-redis 或专门的异步代理管理库来简化。

4. settings.py 配置

DOWNLOADER_MIDDLEWARES = {
    "myproject.middlewares.SmartProxyMiddleware": 350,
    "scrapy.downloadermiddlewares.retry.RetryMiddleware": 360,
}

# 代理 API 地址(替换为实际代理服务商的 API)
PROXY_API_URL = "https://api.wukongdaili.com/proxy/get?num=20&format=json"

# 重试配置
RETRY_TIMES = 3
RETRY_HTTP_CODES = [403, 429, 500, 502, 503, 504]

代理 IP 来源建议

代理中间件只是调度层,核心还是代理 IP 的质量。选择代理服务时关注:

  • API 接口:是否支持批量获取代理,响应速度如何
  • IP 纯净度:住宅 IP 还是机房 IP,是否被目标网站标记
  • 可用率:99%+ 是基本要求
  • 更新频率:代理池的 IP 更新速度影响采成功率

悟空代理的隧道代理 IP提供完善的 API 接口,支持按需获取代理,自动 IP 轮换。新用户可免费试用,快速验证代理效果。云服务器场景推荐云服务器代理 IP,数据中心 IP 延迟低、稳定性高。

进阶优化

这个基础版本可以进一步扩展:

  1. 代理类型分层:高质量代理用于核心页面,普通代理用于列表页
  2. 地域路由:根据目标网站的地域限制选择对应地区的代理
  3. 代理预热:启动时预先检测所有代理的可用性,过滤不可用 IP
  4. 持久化存储:将代理池状态存到 Redis,重启后不丢失
  5. 监控告警:代理可用率低于阈值时发送告警

总结

上面的实现覆盖了代理分配、失效检测和动态评分三个核心功能。实际运行中,代理池大小建议保持在 50-100 个,刷新间隔设为 300 秒效果较好。持久化存储推荐用 Redis,重启不丢失代理池状态。代码已验证可运行,可直接集成到现有 Scrapy 项目中。

悟空代理注册送ip
免费试用

客服

在线客服:

:3329077489

:18328351249 / 13316588914

:service@wukongdaili.com

售后客服微信二维码 售后客服

技术客服微信二维码 技术客服