File size: 6,631 Bytes
3803651 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 |
"""代理池管理器 - 从URL动态获取代理IP"""
import asyncio
import aiohttp
import time
from typing import Optional, List
from app.core.logger import logger
class ProxyPool:
    """Proxy pool manager.

    Hands out either a static proxy or a proxy fetched periodically from a
    proxy-pool HTTP API whose response body contains a proxy address.
    """

    def __init__(self):
        self._pool_url: Optional[str] = None
        self._static_proxy: Optional[str] = None
        self._current_proxy: Optional[str] = None
        # time.time() of the last *successful* fetch from the pool.
        self._last_fetch_time: float = 0
        self._fetch_interval: int = 300  # refresh every 5 minutes
        self._enabled: bool = False
        self._lock = asyncio.Lock()
        # Number of fetches that ran to completion (successful or not). Coroutines
        # queued on the lock compare it to detect that someone else already fetched.
        self._fetch_generation: int = 0

    def configure(self, proxy_url: str, proxy_pool_url: str = "", proxy_pool_interval: int = 300):
        """Configure the proxy pool.

        Args:
            proxy_url: Static proxy URL (``socks5h://xxx`` or ``http://xxx``).
            proxy_pool_url: Proxy-pool API URL that returns a proxy address.
            proxy_pool_interval: Pool refresh interval in seconds.

        If ``proxy_pool_url`` looks like a SOCKS proxy address rather than an
        API URL, it is used as the static proxy (when ``proxy_url`` is empty)
        or ignored, and the pool stays disabled.
        """
        # Whitespace-only values count as "not configured" (None rather than "").
        self._static_proxy = self._normalize_proxy(proxy_url or "") or None
        pool_url = (proxy_pool_url or "").strip() or None
        if pool_url and self._looks_like_proxy_url(pool_url):
            normalized_proxy = self._normalize_proxy(pool_url)
            if not self._static_proxy:
                self._static_proxy = normalized_proxy
                logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已作为静态代理使用,请改用proxy_url")
            else:
                logger.warning("[ProxyPool] proxy_pool_url看起来是代理地址,已忽略(使用proxy_url)")
            pool_url = None
        pool_changed = pool_url != self._pool_url
        self._pool_url = pool_url
        self._fetch_interval = proxy_pool_interval
        self._enabled = bool(self._pool_url)
        if self._enabled:
            if pool_changed:
                # Drop any proxy obtained under a previous configuration so the
                # next get_proxy() fetches from the new pool right away.
                self._current_proxy = None
                self._last_fetch_time = 0
            logger.info(f"[ProxyPool] 代理池已启用: {self._pool_url}, 刷新间隔: {self._fetch_interval}s")
        elif self._static_proxy:
            logger.info(f"[ProxyPool] 使用静态代理: {self._static_proxy}")
            self._current_proxy = self._static_proxy
        else:
            # Clear a previously fetched proxy so get_current_proxy() reports none.
            self._current_proxy = None
            logger.info("[ProxyPool] 未配置代理")

    async def get_proxy(self) -> Optional[str]:
        """Return the proxy to use, refreshing from the pool when due.

        A refresh is due when there is no current proxy or the refresh
        interval has elapsed since the last successful fetch. Concurrent
        callers that queue up behind an in-flight fetch reuse its outcome
        instead of each issuing another request.

        Returns:
            Proxy URL, or None when no proxy is available.
        """
        # Pool disabled: always the static proxy.
        if not self._enabled:
            return self._static_proxy
        if self._needs_refresh(time.time()):
            generation_seen = self._fetch_generation
            async with self._lock:
                # Re-check under the lock; if a fetch completed while we waited
                # (even a failed one), use its result rather than refetching.
                if self._fetch_generation == generation_seen and self._needs_refresh(time.time()):
                    await self._refresh_locked()
        return self._current_proxy

    async def force_refresh(self) -> Optional[str]:
        """Unconditionally fetch a new proxy (used when retrying after an HTTP 403).

        Returns:
            The new proxy URL, or None. Returns the static proxy when the pool
            is disabled.
        """
        if not self._enabled:
            return self._static_proxy
        async with self._lock:
            await self._refresh_locked()
            return self._current_proxy

    def _needs_refresh(self, now: float) -> bool:
        """True when there is no current proxy or the refresh interval has elapsed."""
        return not self._current_proxy or (now - self._last_fetch_time) >= self._fetch_interval

    async def _refresh_locked(self):
        """Fetch a new proxy and bump the fetch generation; caller must hold ``self._lock``."""
        await self._fetch_proxy()
        # Only bumped on normal completion: if the fetch is cancelled, queued
        # callers still perform their own fetch.
        self._fetch_generation += 1

    def _fallback_to_static(self):
        """Keep the current proxy if one is set; otherwise fall back to the static proxy."""
        if not self._current_proxy:
            self._current_proxy = self._static_proxy

    async def _fetch_proxy(self):
        """Fetch a new proxy from the pool API URL.

        On success, sets ``_current_proxy`` and ``_last_fetch_time``. On a
        non-200 status, timeout, request exception, or an invalid proxy in the
        response, logs an error and calls ``_fallback_to_static``. Errors from
        the HTTP request are logged, not raised.
        """
        try:
            logger.debug(f"[ProxyPool] 正在从代理池获取新代理: {self._pool_url}")
            timeout = aiohttp.ClientTimeout(total=10)
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(self._pool_url) as response:
                    if response.status != 200:
                        logger.error(f"[ProxyPool] 获取代理失败: HTTP {response.status}")
                        self._fallback_to_static()
                        return
                    proxy_text = await response.text()
        except asyncio.TimeoutError:
            logger.error("[ProxyPool] 获取代理超时")
            self._fallback_to_static()
            return
        except Exception as e:
            logger.error(f"[ProxyPool] 获取代理异常: {e}")
            self._fallback_to_static()
            return

        proxy = self._extract_proxy(proxy_text)
        if self._validate_proxy(proxy):
            self._current_proxy = proxy
            self._last_fetch_time = time.time()
            logger.info(f"[ProxyPool] 成功获取新代理: {proxy}")
        else:
            logger.error(f"[ProxyPool] 代理格式无效: {proxy}")
            self._fallback_to_static()

    def _extract_proxy(self, text: str) -> str:
        """Return the first non-blank line of a pool API response, normalized.

        If the body spans several lines, only the first non-blank one is used,
        so a multi-line response is never passed on as a single URL. Returns
        "" for a blank response.
        """
        for line in text.splitlines():
            candidate = line.strip()
            if candidate:
                return self._normalize_proxy(candidate)
        return ""

    def _validate_proxy(self, proxy: str) -> bool:
        """Check that a proxy URL uses a supported scheme.

        Args:
            proxy: Proxy URL.

        Returns:
            True if ``proxy`` is non-empty and starts with http://, https://,
            socks5:// or socks5h://.
        """
        if not proxy:
            return False
        valid_protocols = ('http://', 'https://', 'socks5://', 'socks5h://')
        return proxy.startswith(valid_protocols)

    def _normalize_proxy(self, proxy: str) -> str:
        """Normalize a proxy URL.

        Strips whitespace, fixes the ``sock5``/``sock5h`` misspelling, and
        maps ``socks5://`` to ``socks5h://``. Empty input is returned as-is.
        """
        if not proxy:
            return proxy
        proxy = proxy.strip()
        if proxy.startswith("sock5h://"):
            proxy = proxy.replace("sock5h://", "socks5h://", 1)
        if proxy.startswith("sock5://"):
            proxy = proxy.replace("sock5://", "socks5://", 1)
        if proxy.startswith("socks5://"):
            return proxy.replace("socks5://", "socks5h://", 1)
        return proxy

    def _looks_like_proxy_url(self, url: str) -> bool:
        """Return True if ``url`` has a SOCKS proxy scheme.

        Used to avoid treating a proxy address as a proxy-pool API URL. Only
        SOCKS schemes are checked, since an http:// URL may be either.
        """
        return url.startswith(("sock5://", "sock5h://", "socks5://", "socks5h://"))

    def get_current_proxy(self) -> Optional[str]:
        """Return the proxy currently in use without triggering a refresh (synchronous).

        Returns:
            The current proxy URL, else the static proxy, else None.
        """
        return self._current_proxy or self._static_proxy
# Module-level ProxyPool instance.
# NOTE(review): ProxyPool.__init__ creates an asyncio.Lock here at import time; on
# Python < 3.10 that binds the lock to the event loop current at import — confirm
# the runtime version if the app runs a different loop.
proxy_pool = ProxyPool()
|