|
|
"""代理池管理器 - 从URL动态获取代理IP""" |
|
|
|
|
|
import asyncio |
|
|
import aiohttp |
|
|
import time |
|
|
from typing import Optional, List |
|
|
from app.core.logger import logger |
|
|
|
|
|
|
|
|
class ProxyPool:
    """Serve either a fixed proxy or one fetched on demand from a pool API.

    When a pool URL is configured, the pool is "enabled": ``get_proxy``
    fetches a proxy address from that URL and caches it for
    ``_fetch_interval`` seconds. Otherwise the static proxy (possibly
    ``None``) is returned as-is.
    """

    # URL prefixes accepted by _validate_proxy.
    _VALID_SCHEMES = ("http://", "https://", "socks5://", "socks5h://")

    def __init__(self):
        self._pool_url: Optional[str] = None
        self._static_proxy: Optional[str] = None
        self._current_proxy: Optional[str] = None
        self._last_fetch_time: float = 0
        self._fetch_interval: int = 300
        self._enabled: bool = False
        # Serializes calls to _fetch_proxy.
        self._lock = asyncio.Lock()

    def configure(self, proxy_url: str, proxy_pool_url: str = "", proxy_pool_interval: int = 300) -> None:
        """Configure the static proxy and/or the proxy pool.

        Any proxy cached from a previous configuration is discarded so that
        stale state is never served after reconfiguration.

        Args:
            proxy_url: Static proxy URL (socks5h://xxx or http://xxx).
            proxy_pool_url: Proxy pool API URL whose response body is a
                single proxy address.
            proxy_pool_interval: Pool refresh interval in seconds.
        """
        # Blank / whitespace-only values are treated as "not configured".
        self._static_proxy = (self._normalize_proxy(proxy_url) or None) if proxy_url else None
        pool_url = (proxy_pool_url.strip() or None) if proxy_pool_url else None

        # A socks-style value in proxy_pool_url is a proxy address, not an
        # API endpoint: use it as the static proxy if none was given,
        # otherwise ignore it.
        if pool_url and self._looks_like_proxy_url(pool_url):
            normalized_proxy = self._normalize_proxy(pool_url)
            if not self._static_proxy:
                self._static_proxy = normalized_proxy
                logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已作为静态代理使用,请改用proxy_url")
            else:
                logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已忽略(使用proxy_url)")
            pool_url = None

        self._pool_url = pool_url
        self._fetch_interval = proxy_pool_interval
        self._enabled = bool(self._pool_url)

        # Drop whatever was cached under the previous configuration. In pool
        # mode the next get_proxy() call fetches a fresh proxy; otherwise the
        # current proxy is simply the static one (or None).
        self._last_fetch_time = 0
        self._current_proxy = None if self._enabled else self._static_proxy

        if self._enabled:
            logger.info(f"[ProxyPool] 代理池已启用: {self._pool_url}, 刷新间隔: {self._fetch_interval}s")
        elif self._static_proxy:
            logger.info(f"[ProxyPool] 使用静态代理: {self._static_proxy}")
        else:
            logger.info("[ProxyPool] 未配置代理")

    async def get_proxy(self) -> Optional[str]:
        """Return the proxy to use, refreshing it from the pool when stale.

        Returns:
            The proxy URL, or None when no proxy is available.
        """
        if not self._enabled:
            return self._static_proxy

        # Captured once, before waiting on the lock: a coroutine that queued
        # behind another one's successful refresh then sees a newer
        # _last_fetch_time and skips its own fetch.
        now = time.time()

        def is_stale() -> bool:
            return not self._current_proxy or (now - self._last_fetch_time) >= self._fetch_interval

        if is_stale():
            async with self._lock:
                # Re-check under the lock; another coroutine may have
                # refreshed the proxy while this one was waiting.
                if is_stale():
                    await self._fetch_proxy()

        return self._current_proxy

    async def force_refresh(self) -> Optional[str]:
        """Fetch a new proxy regardless of the refresh interval.

        Intended for retrying after a 403 response.

        Returns:
            The new proxy URL, or None when no proxy is available.
        """
        if not self._enabled:
            return self._static_proxy

        async with self._lock:
            await self._fetch_proxy()

        return self._current_proxy

    async def _fetch_proxy(self):
        """Fetch a new proxy from the pool URL and cache it.

        On any failure (non-200 status, invalid address, timeout or other
        exception) the error is logged and the previously cached proxy is
        kept; if nothing is cached, the static proxy is used as a fallback.
        """
        try:
            logger.debug(f"[ProxyPool] 正在从代理池获取新代理: {self._pool_url}")

            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self._pool_url) as response:
                    if response.status == 200:
                        proxy_text = await response.text()
                        proxy = self._normalize_proxy(proxy_text.strip())

                        if self._validate_proxy(proxy):
                            self._current_proxy = proxy
                            self._last_fetch_time = time.time()
                            logger.info(f"[ProxyPool] 成功获取新代理: {proxy}")
                        else:
                            logger.error(f"[ProxyPool] 代理格式无效: {proxy}")
                    else:
                        logger.error(f"[ProxyPool] 获取代理失败: HTTP {response.status}")
        except asyncio.TimeoutError:
            logger.error("[ProxyPool] 获取代理超时")
        except Exception as e:
            # Best effort: a failing pool endpoint must not break callers.
            logger.error(f"[ProxyPool] 获取代理异常: {e}")

        # Single fallback for every failure path (a no-op after a successful
        # fetch, because _current_proxy is set by then).
        if not self._current_proxy:
            self._current_proxy = self._static_proxy

    def _validate_proxy(self, proxy: str) -> bool:
        """Check that a proxy URL is plausibly well-formed.

        Args:
            proxy: Proxy URL.

        Returns:
            True when the URL starts with a supported scheme, has something
            after the scheme, and contains no whitespace.
        """
        if not proxy:
            return False

        # Embedded whitespace (e.g. a multi-line pool response) can never be
        # a single valid proxy URL.
        if any(ch.isspace() for ch in proxy):
            return False

        for scheme in self._VALID_SCHEMES:
            if proxy.startswith(scheme):
                # Reject a bare scheme with no host part.
                return len(proxy) > len(scheme)
        return False

    def _normalize_proxy(self, proxy: str) -> str:
        """Normalize a proxy URL (sock5/socks5 schemes become socks5h://)."""
        if not proxy:
            return proxy

        proxy = proxy.strip()
        # Repair the common "sock5" misspelling first.
        if proxy.startswith("sock5h://"):
            proxy = proxy.replace("sock5h://", "socks5h://", 1)
        if proxy.startswith("sock5://"):
            proxy = proxy.replace("sock5://", "socks5://", 1)
        # Rewrite socks5:// to socks5h://.
        if proxy.startswith("socks5://"):
            return proxy.replace("socks5://", "socks5h://", 1)
        return proxy

    def _looks_like_proxy_url(self, url: str) -> bool:
        """Return True when the URL has a socks-style scheme.

        Used to detect a proxy address that was mistakenly supplied where a
        proxy pool API URL was expected.
        """
        return url.startswith(("sock5://", "sock5h://", "socks5://", "socks5h://"))

    def get_current_proxy(self) -> Optional[str]:
        """Return the proxy currently in use (synchronous, never fetches).

        Returns:
            The cached proxy URL, else the static proxy, else None.
        """
        return self._current_proxy or self._static_proxy
|
|
|
|
|
|
|
|
|
|
|
# Shared module-level instance; it holds no proxy until configure() is called.
proxy_pool = ProxyPool()
|
|
|