gk / app /core /proxy_pool.py
nanoppa's picture
Upload 37 files
3803651 verified
"""代理池管理器 - 从URL动态获取代理IP"""
import asyncio
import aiohttp
import time
from typing import Optional, List
from app.core.logger import logger
class ProxyPool:
"""代理池管理器"""
def __init__(self):
self._pool_url: Optional[str] = None
self._static_proxy: Optional[str] = None
self._current_proxy: Optional[str] = None
self._last_fetch_time: float = 0
self._fetch_interval: int = 300 # 5分钟刷新一次
self._enabled: bool = False
self._lock = asyncio.Lock()
def configure(self, proxy_url: str, proxy_pool_url: str = "", proxy_pool_interval: int = 300):
"""配置代理池
Args:
proxy_url: 静态代理URL(socks5h://xxx 或 http://xxx)
proxy_pool_url: 代理池API URL,返回单个代理地址
proxy_pool_interval: 代理池刷新间隔(秒)
"""
self._static_proxy = self._normalize_proxy(proxy_url) if proxy_url else None
pool_url = proxy_pool_url.strip() if proxy_pool_url else None
if pool_url and self._looks_like_proxy_url(pool_url):
normalized_proxy = self._normalize_proxy(pool_url)
if not self._static_proxy:
self._static_proxy = normalized_proxy
logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已作为静态代理使用,请改用proxy_url")
else:
logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已忽略(使用proxy_url)")
pool_url = None
self._pool_url = pool_url
self._fetch_interval = proxy_pool_interval
self._enabled = bool(self._pool_url)
if self._enabled:
logger.info(f"[ProxyPool] 代理池已启用: {self._pool_url}, 刷新间隔: {self._fetch_interval}s")
elif self._static_proxy:
logger.info(f"[ProxyPool] 使用静态代理: {self._static_proxy}")
self._current_proxy = self._static_proxy
else:
logger.info("[ProxyPool] 未配置代理")
async def get_proxy(self) -> Optional[str]:
"""获取代理地址
Returns:
代理URL或None
"""
# 如果未启用代理池,返回静态代理
if not self._enabled:
return self._static_proxy
# 检查是否需要刷新
now = time.time()
if not self._current_proxy or (now - self._last_fetch_time) >= self._fetch_interval:
async with self._lock:
# 双重检查
if not self._current_proxy or (now - self._last_fetch_time) >= self._fetch_interval:
await self._fetch_proxy()
return self._current_proxy
async def force_refresh(self) -> Optional[str]:
"""强制刷新代理(用于403错误重试)
Returns:
新的代理URL或None
"""
if not self._enabled:
return self._static_proxy
async with self._lock:
await self._fetch_proxy()
return self._current_proxy
async def _fetch_proxy(self):
"""从代理池URL获取新的代理"""
try:
logger.debug(f"[ProxyPool] 正在从代理池获取新代理: {self._pool_url}")
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(self._pool_url) as response:
if response.status == 200:
proxy_text = await response.text()
proxy = self._normalize_proxy(proxy_text.strip())
# 验证代理格式
if self._validate_proxy(proxy):
self._current_proxy = proxy
self._last_fetch_time = time.time()
logger.info(f"[ProxyPool] 成功获取新代理: {proxy}")
else:
logger.error(f"[ProxyPool] 代理格式无效: {proxy}")
# 降级到静态代理
if not self._current_proxy:
self._current_proxy = self._static_proxy
else:
logger.error(f"[ProxyPool] 获取代理失败: HTTP {response.status}")
# 降级到静态代理
if not self._current_proxy:
self._current_proxy = self._static_proxy
except asyncio.TimeoutError:
logger.error("[ProxyPool] 获取代理超时")
if not self._current_proxy:
self._current_proxy = self._static_proxy
except Exception as e:
logger.error(f"[ProxyPool] 获取代理异常: {e}")
# 降级到静态代理
if not self._current_proxy:
self._current_proxy = self._static_proxy
def _validate_proxy(self, proxy: str) -> bool:
"""验证代理格式
Args:
proxy: 代理URL
Returns:
是否有效
"""
if not proxy:
return False
# 支持的协议
valid_protocols = ['http://', 'https://', 'socks5://', 'socks5h://']
return any(proxy.startswith(proto) for proto in valid_protocols)
def _normalize_proxy(self, proxy: str) -> str:
"""标准化代理URL(sock5/socks5 → socks5h://)"""
if not proxy:
return proxy
proxy = proxy.strip()
if proxy.startswith("sock5h://"):
proxy = proxy.replace("sock5h://", "socks5h://", 1)
if proxy.startswith("sock5://"):
proxy = proxy.replace("sock5://", "socks5://", 1)
if proxy.startswith("socks5://"):
return proxy.replace("socks5://", "socks5h://", 1)
return proxy
def _looks_like_proxy_url(self, url: str) -> bool:
"""判断URL是否像代理地址(避免误把代理池API当代理)"""
return url.startswith(("sock5://", "sock5h://", "socks5://", "socks5h://"))
def get_current_proxy(self) -> Optional[str]:
"""获取当前使用的代理(同步方法)
Returns:
当前代理URL或None
"""
return self._current_proxy or self._static_proxy
# 全局代理池实例
proxy_pool = ProxyPool()