Spaces:
Paused
Paused
| """代理池管理器 - 从URL动态获取代理IP""" | |
| import asyncio | |
| import aiohttp | |
| import time | |
| from typing import Optional, List | |
| from app.core.logger import logger | |
class ProxyPool:
    """Proxy pool manager.

    Hands out either a fixed ("static") proxy or, when a pool API URL is
    configured, a proxy address fetched from that URL and cached for
    ``_fetch_interval`` seconds.
    """

    # URL schemes accepted by ``_validate_proxy``.
    _VALID_SCHEMES = ("http://", "https://", "socks5://", "socks5h://")

    def __init__(self):
        self._pool_url: Optional[str] = None
        self._static_proxy: Optional[str] = None
        self._current_proxy: Optional[str] = None
        self._last_fetch_time: float = 0
        self._fetch_interval: int = 300  # refresh every 5 minutes by default
        self._enabled: bool = False
        self._lock = asyncio.Lock()

    def configure(self, proxy_url: str, proxy_pool_url: str = "", proxy_pool_interval: int = 300) -> None:
        """Configure the static proxy and/or the proxy pool.

        Args:
            proxy_url: Static proxy URL (socks5h://xxx or http://xxx).
            proxy_pool_url: Proxy pool API URL whose response body is a
                single proxy address.
            proxy_pool_interval: Pool refresh interval in seconds.
        """
        previous_pool_url = self._pool_url

        # ``or None`` keeps a whitespace-only value from being stored as "".
        self._static_proxy = (self._normalize_proxy(proxy_url) or None) if proxy_url else None

        pool_url = (proxy_pool_url.strip() or None) if proxy_pool_url else None
        if pool_url and self._looks_like_proxy_url(pool_url):
            # The "pool URL" is itself a proxy address: treat it as a static
            # proxy unless an explicit static proxy was already supplied.
            normalized_proxy = self._normalize_proxy(pool_url)
            if not self._static_proxy:
                self._static_proxy = normalized_proxy
                logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已作为静态代理使用,请改用proxy_url")
            else:
                logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已忽略(使用proxy_url)")
            pool_url = None

        self._pool_url = pool_url
        self._fetch_interval = proxy_pool_interval
        self._enabled = bool(self._pool_url)

        # Drop anything cached under the previous configuration so that
        # get_current_proxy() cannot keep returning a proxy that is no
        # longer configured.
        if not self._enabled:
            self._current_proxy = self._static_proxy
            self._last_fetch_time = 0
        elif self._pool_url != previous_pool_url:
            self._current_proxy = None
            self._last_fetch_time = 0

        if self._enabled:
            logger.info(f"[ProxyPool] 代理池已启用: {self._pool_url}, 刷新间隔: {self._fetch_interval}s")
        elif self._static_proxy:
            logger.info(f"[ProxyPool] 使用静态代理: {self._static_proxy}")
        else:
            logger.info("[ProxyPool] 未配置代理")

    async def get_proxy(self) -> Optional[str]:
        """Return the proxy to use, refreshing from the pool when due.

        Returns:
            The proxy URL, or None when no proxy is available.
        """
        # Without a pool there is nothing to refresh.
        if not self._enabled:
            return self._static_proxy

        if self._needs_refresh():
            async with self._lock:
                # Check again after acquiring the lock, using a fresh clock
                # reading: another coroutine may have refreshed the proxy
                # while this one was waiting.
                if self._needs_refresh():
                    await self._fetch_proxy()

        return self._current_proxy

    async def force_refresh(self) -> Optional[str]:
        """Fetch a new proxy regardless of the refresh interval.

        Intended for retrying after a 403 response.

        Returns:
            The proxy URL after the refresh attempt, or None.
        """
        if not self._enabled:
            return self._static_proxy

        async with self._lock:
            await self._fetch_proxy()

        return self._current_proxy

    def _needs_refresh(self) -> bool:
        """Return True when no proxy is cached or the cached one has expired."""
        if not self._current_proxy:
            return True
        return (time.time() - self._last_fetch_time) >= self._fetch_interval

    def _fallback_to_static(self) -> None:
        """Fall back to the static proxy when no proxy is currently cached."""
        if not self._current_proxy:
            self._current_proxy = self._static_proxy

    async def _fetch_proxy(self) -> None:
        """Fetch a new proxy from the pool URL and cache it if it is valid.

        On any failure (timeout, non-200 status, invalid address, other
        exception) the error is logged and the previously cached proxy is
        kept; if nothing is cached the static proxy is used instead.
        """
        try:
            logger.debug(f"[ProxyPool] 正在从代理池获取新代理: {self._pool_url}")

            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self._pool_url) as response:
                    if response.status != 200:
                        logger.error(f"[ProxyPool] 获取代理失败: HTTP {response.status}")
                        self._fallback_to_static()
                        return
                    proxy_text = await response.text()
        except asyncio.TimeoutError:
            logger.error("[ProxyPool] 获取代理超时")
            self._fallback_to_static()
            return
        except Exception as e:
            logger.error(f"[ProxyPool] 获取代理异常: {e}")
            self._fallback_to_static()
            return

        proxy = self._normalize_proxy(proxy_text.strip())
        if self._validate_proxy(proxy):
            self._current_proxy = proxy
            self._last_fetch_time = time.time()
            logger.info(f"[ProxyPool] 成功获取新代理: {proxy}")
        else:
            logger.error(f"[ProxyPool] 代理格式无效: {proxy}")
            self._fallback_to_static()

    def _validate_proxy(self, proxy: str) -> bool:
        """Check that a proxy string looks like a usable proxy URL.

        Args:
            proxy: Proxy URL to check.

        Returns:
            True when the value starts with a supported scheme and is
            followed by a non-empty remainder containing no whitespace
            (which rejects a bare scheme and multi-line responses).
        """
        if not proxy:
            return False

        for scheme in self._VALID_SCHEMES:
            if proxy.startswith(scheme):
                remainder = proxy[len(scheme):]
                return bool(remainder) and not any(ch.isspace() for ch in remainder)
        return False

    def _normalize_proxy(self, proxy: str) -> str:
        """Normalize a proxy URL (sock5/socks5 -> socks5h://).

        Surrounding whitespace is stripped; falsy input is returned as is.
        """
        if not proxy:
            return proxy

        proxy = proxy.strip()
        # Correct the "sock5" misspelling first.
        if proxy.startswith("sock5h://"):
            proxy = proxy.replace("sock5h://", "socks5h://", 1)
        if proxy.startswith("sock5://"):
            proxy = proxy.replace("sock5://", "socks5://", 1)
        # Then rewrite socks5:// to socks5h://.
        if proxy.startswith("socks5://"):
            return proxy.replace("socks5://", "socks5h://", 1)
        return proxy

    def _looks_like_proxy_url(self, url: str) -> bool:
        """Return True when the URL uses a SOCKS5 scheme (or its "sock5" misspelling).

        Used to detect a proxy address that was supplied where the pool API
        URL was expected.
        """
        return url.startswith(("sock5://", "sock5h://", "socks5://", "socks5h://"))

    def get_current_proxy(self) -> Optional[str]:
        """Return the proxy currently in use without fetching (synchronous).

        Returns:
            The cached proxy URL, else the static proxy, else None.
        """
        return self._current_proxy or self._static_proxy
# Global proxy pool instance, created when this module is imported.
# It stays unconfigured (no pool URL, no static proxy) until configure() is called.
proxy_pool = ProxyPool()