Spaces:
Runtime error
Runtime error
| import httpx | |
| import asyncio | |
| from typing import Optional, Dict, Any | |
| from packages.core.logger import app_logger | |
| from packages.core.config import settings | |
class BaseHttpClient:
    """Base async HTTP client built on httpx.

    Centralises the cross-cutting concerns of outbound requests:

    - timeout control
    - retries with exponential backoff
    - proxy-pool lookup (reserved for a future proxy-pool service)
    - uniform exception handling and logging
    """

    # Status codes that are retried; any other error status is re-raised at once.
    _RETRYABLE_STATUS = frozenset({403, 429, 500, 502, 503, 504})
    # Status codes whose response is inspected for an anti-bot page.
    _BLOCK_STATUS = frozenset({403, 503})
    # Substrings (lower-case) that mark a Cloudflare / captcha page body.
    _BLOCK_MARKERS = ("cloudflare", "ray id", "captcha", "challenge")

    def __init__(self, use_proxy: bool = False, max_retries: int = 3, timeout: int = 30):
        """Store the client configuration.

        Args:
            use_proxy: Whether every request should ask the proxy pool for a proxy.
            max_retries: Total number of attempts per request. Values below 1
                are treated as 1 so a request is always tried at least once.
            timeout: Timeout passed to ``httpx.AsyncClient`` for each attempt.
        """
        self.max_retries = max_retries
        self.timeout = timeout
        self.use_proxy = use_proxy

    @staticmethod
    def _normalize_proxy(raw: Any) -> Optional[str]:
        """Turn a raw proxy-pool value into a proxy URL, or None if unusable.

        A value without a scheme (``ip:port``) gets ``http://`` prepended; a
        value that already carries a scheme (``http://``, ``https://``,
        ``socks5://`` ...) is returned as is.
        """
        if not isinstance(raw, str):
            return None
        candidate = raw.strip()
        if not candidate:
            return None
        # Test for a scheme separator rather than the "http" prefix so that
        # non-HTTP schemes are preserved and hosts such as "httpproxy:8080"
        # still receive a scheme.
        if "://" not in candidate:
            candidate = f"http://{candidate}"
        return candidate

    async def _fetch_proxy_from_pool(self) -> Optional[str]:
        """Ask the configured proxy pool for one proxy URL.

        The pool is expected to answer with ``{"proxy": "http://ip:port"}``,
        a JSON string, or a plain-text ``ip:port`` body. Any failure is logged
        and reported as None so the caller falls back to a direct connection.
        """
        pool_url = getattr(settings, "PROXY_POOL_URL", None)
        if not pool_url:
            return None
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                resp = await client.get(pool_url)
                resp.raise_for_status()
                try:
                    payload = resp.json()
                except ValueError:
                    # Body is not JSON: treat it as a bare "ip:port" string.
                    payload = resp.text
        except Exception as e:
            # Best effort: a broken proxy pool must not break the request itself.
            app_logger.error(f"Failed to fetch proxy from pool: {str(e)}")
            return None
        if isinstance(payload, dict):
            payload = payload.get("proxy")
        return self._normalize_proxy(payload)

    async def _get_proxy(self) -> Optional[str]:
        """Return a proxy URL when proxying is enabled for this client, else None."""
        if not self.use_proxy:
            return None
        return await self._fetch_proxy_from_pool()

    def _is_cloudflare_or_captcha(self, response: httpx.Response) -> bool:
        """Report whether ``response`` looks like a Cloudflare block or a captcha page.

        Only 403 and 503 responses are considered. Such a response counts as
        blocked when it carries a ``cf-ray`` header or when its body mentions
        one of the known marker strings (case-insensitive).
        """
        if response.status_code not in self._BLOCK_STATUS:
            return False
        if "cf-ray" in response.headers:
            return True
        body = response.text.lower()
        return any(marker in body for marker in self._BLOCK_MARKERS)

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs
    ) -> httpx.Response:
        """Send an HTTP request, retrying transient failures with backoff.

        Args:
            method: HTTP method name.
            url: Target URL.
            params: Query-string parameters.
            json_data: Payload sent as the JSON request body.
            headers: Extra request headers.
            **kwargs: Forwarded to ``httpx.AsyncClient.request``, except for
                ``verify`` (TLS verification setting for the client, default
                True) and ``allow_insecure_fallback`` (when True, a
                certificate-verification failure triggers one retry with
                ``verify=False``).

        Returns:
            The first response with a non-error status code.

        Raises:
            httpx.HTTPStatusError: On a non-retryable error status, or when the
                last attempt ended with an error status.
            httpx.RequestError: When the last attempt failed at transport level.
        """
        verify = kwargs.pop("verify", True)
        allow_insecure_fallback = kwargs.pop("allow_insecure_fallback", False)
        # Always try at least once; otherwise there would be no exception to
        # raise after the loop.
        total_attempts = max(1, self.max_retries)
        # Request-local switch: once blocked, later attempts of THIS request go
        # through the proxy pool without changing the shared client's settings.
        force_proxy = False
        last_exception: Optional[Exception] = None
        attempt = 0

        while attempt < total_attempts:
            attempt += 1
            if force_proxy:
                proxy = await self._fetch_proxy_from_pool()
            else:
                proxy = await self._get_proxy()

            client_kwargs: Dict[str, Any] = {"timeout": self.timeout, "verify": verify}
            if proxy:
                client_kwargs["proxy"] = proxy

            try:
                async with httpx.AsyncClient(**client_kwargs) as client:
                    app_logger.debug(f"Request: {method} {url} | Attempt: {attempt}/{self.max_retries}")
                    response = await client.request(
                        method=method,
                        url=url,
                        params=params,
                        json=json_data,
                        headers=headers,
                        **kwargs
                    )
                    # Raises HTTPStatusError for 4xx / 5xx responses.
                    response.raise_for_status()
                    return response
            except httpx.HTTPStatusError as e:
                app_logger.warning(f"HTTP Error {e.response.status_code} for {url}")
                last_exception = e
                if self._is_cloudflare_or_captcha(e.response):
                    app_logger.warning(f"Detected Cloudflare or Captcha interception for {url}")
                    force_proxy = True
                    delay = 3 ** attempt
                elif e.response.status_code in self._RETRYABLE_STATUS:
                    delay = 2 ** attempt  # exponential backoff
                else:
                    raise
            except httpx.RequestError as e:
                app_logger.warning(f"Request Error: {str(e)} for {url}")
                last_exception = e
                if allow_insecure_fallback and verify and "CERTIFICATE_VERIFY_FAILED" in str(e):
                    app_logger.warning(f"SSL 证书校验失败,降级为 verify=False 重试: {url}")
                    verify = False
                    # The downgrade retry is free: it must not use up a retry
                    # slot. It can happen only once because verify is now False.
                    attempt -= 1
                    continue
                delay = 2 ** attempt

            # Back off only when another attempt will actually follow.
            if attempt < total_attempts:
                await asyncio.sleep(delay)

        app_logger.error(f"Max retries reached for {url}. Last error: {str(last_exception)}")
        raise last_exception

    async def get(self, url: str, **kwargs) -> httpx.Response:
        """Send a GET request; see ``request`` for the accepted keyword arguments."""
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> httpx.Response:
        """Send a POST request; see ``request`` for the accepted keyword arguments."""
        return await self.request("POST", url, **kwargs)
# Module-level shared client instance, created with the default settings.
http_client: BaseHttpClient = BaseHttpClient()