import asyncio
from typing import Any, Dict, Optional

import httpx

from packages.core.logger import app_logger
from packages.core.config import settings


class BaseHttpClient:
    """
    Base HTTP client that centralizes:

    - timeout control
    - retries with exponential backoff
    - proxy-pool switching (optional; enabled via ``use_proxy``, or forced for
      the remainder of a single request after an anti-bot interception)
    - uniform exception handling and logging
    """

    # HTTP status codes that are retried after an exponential backoff.
    RETRYABLE_STATUS_CODES = frozenset({403, 429, 500, 502, 503, 504})

    def __init__(self, use_proxy: bool = False, max_retries: int = 3, timeout: int = 30):
        # Total number of attempts request() makes (not counting the one-off
        # insecure-SSL fallback retry).
        self.max_retries = max_retries
        # Passed to httpx.AsyncClient as its timeout.
        self.timeout = timeout
        # When True, every attempt asks the proxy pool for a proxy.
        self.use_proxy = use_proxy

    @staticmethod
    def _normalize_proxy(raw: Any) -> Optional[str]:
        """
        Turn a value returned by the proxy pool into a proxy URL.

        Returns None when the value is not a non-empty string.
        """
        if not isinstance(raw, str):
            return None
        proxy = raw.strip()
        if not proxy:
            return None
        # Only prepend a scheme when none is present, so "https://..." and
        # "socks5://..." URLs are preserved and a host such as
        # "httpproxy.local:8080" still gets one.
        if "://" not in proxy:
            proxy = f"http://{proxy}"
        return proxy

    async def _get_proxy(self, force: bool = False) -> Optional[str]:
        """
        Fetch a dynamic proxy from the proxy pool (hook for a future proxy-pool API).

        Args:
            force: Query the pool even if ``self.use_proxy`` is False. request()
                sets this after an interception without mutating instance state.

        Returns:
            A proxy URL, or None when proxies are not requested, no
            ``settings.PROXY_POOL_URL`` is configured, the pool request fails,
            or the pool returns an unrecognized payload.
        """
        pool_url = getattr(settings, "PROXY_POOL_URL", None)
        if not (self.use_proxy or force) or not pool_url:
            return None
        try:
            # Assumed pool response: {"proxy": "http://ip:port"} or a bare "ip:port" string.
            async with httpx.AsyncClient(timeout=5) as client:
                resp = await client.get(pool_url)
                resp.raise_for_status()
                data = resp.json()
        except Exception as e:
            # Best effort: a broken pool must not break the actual request.
            app_logger.error(f"Failed to fetch proxy from pool: {str(e)}")
            return None
        if isinstance(data, dict):
            data = data.get("proxy")
        return self._normalize_proxy(data)

    def _is_cloudflare_or_captcha(self, response: httpx.Response) -> bool:
        """
        Heuristically detect a Cloudflare block or a captcha/challenge page.

        Only 403 and 503 responses are inspected. The check looks for Cloudflare
        markers in the body or a ``cf-ray`` header, then for captcha/challenge
        keywords in the body.
        """
        if response.status_code not in (403, 503):
            return False
        text = response.text.lower()
        if "cloudflare" in text or "ray id" in text or "cf-ray" in response.headers:
            return True
        return "captcha" in text or "challenge" in text

    async def _backoff(self, attempt: int, base: int) -> None:
        """Sleep ``base ** attempt`` seconds, unless no further attempt will follow."""
        if attempt < self.max_retries:
            await asyncio.sleep(base ** attempt)

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs
    ) -> httpx.Response:
        """
        Send an HTTP request with retries.

        Args:
            method: HTTP method, e.g. "GET".
            url: Target URL.
            params: Query-string parameters.
            json_data: Body serialized as JSON.
            headers: Request headers.
            **kwargs: Forwarded to ``httpx.AsyncClient.request``. Two keys are
                consumed here instead:
                ``verify`` (default True) is passed to the client, and
                ``allow_insecure_fallback`` (default False) allows one retry
                with ``verify=False`` after a CERTIFICATE_VERIFY_FAILED error.

        Returns:
            The successful (non-4xx/5xx) response.

        Raises:
            httpx.HTTPStatusError: On a non-retryable status immediately, or
                when the last attempt ends with an HTTP error.
            httpx.RequestError: When the last attempt ends with a transport error.
            ValueError: If ``max_retries`` < 1, so no attempt was made.
        """
        attempt = 0
        last_exception: Optional[Exception] = None
        verify = kwargs.pop("verify", True)
        allow_insecure_fallback = kwargs.pop("allow_insecure_fallback", False)
        # Per-request switch: after an interception, later attempts of THIS
        # request use a proxy. It deliberately does not touch self.use_proxy,
        # which is shared by every caller of the module-level singleton.
        force_proxy = False

        while attempt < self.max_retries:
            attempt += 1
            proxy = await self._get_proxy(force=force_proxy)
            client_kwargs: Dict[str, Any] = {"timeout": self.timeout, "verify": verify}
            if proxy:
                client_kwargs["proxy"] = proxy
            try:
                async with httpx.AsyncClient(**client_kwargs) as client:
                    app_logger.debug(f"Request: {method} {url} | Attempt: {attempt}/{self.max_retries}")
                    response = await client.request(
                        method=method,
                        url=url,
                        params=params,
                        json=json_data,
                        headers=headers,
                        **kwargs
                    )
                    # Raises HTTPStatusError for 4xx/5xx responses.
                    response.raise_for_status()
                    return response
            except httpx.HTTPStatusError as e:
                app_logger.warning(f"HTTP Error {e.response.status_code} for {url}")
                last_exception = e
                if self._is_cloudflare_or_captcha(e.response):
                    app_logger.warning(f"Detected Cloudflare or Captcha interception for {url}")
                    # Blocked: retry through a (new) proxy with a longer backoff.
                    force_proxy = True
                    await self._backoff(attempt, 3)
                    continue
                if e.response.status_code in self.RETRYABLE_STATUS_CODES:
                    await self._backoff(attempt, 2)
                    continue
                raise
            except httpx.RequestError as e:
                app_logger.warning(f"Request Error: {str(e)} for {url}")
                last_exception = e
                if allow_insecure_fallback and verify and "CERTIFICATE_VERIFY_FAILED" in str(e):
                    app_logger.warning(f"SSL 证书校验失败,降级为 verify=False 重试: {url}")
                    verify = False
                    # The insecure retry happens at most once (verify is now
                    # False), so it does not consume an attempt. Otherwise a
                    # failure on the final attempt would never get its fallback.
                    attempt -= 1
                    continue
                await self._backoff(attempt, 2)

        if last_exception is None:
            # Only reachable when max_retries < 1; avoids the confusing `raise None`.
            raise ValueError(f"max_retries must be >= 1, got {self.max_retries}")
        app_logger.error(f"Max retries reached for {url}. Last error: {str(last_exception)}")
        raise last_exception

    async def get(self, url: str, **kwargs) -> httpx.Response:
        """Shorthand for ``request("GET", url, **kwargs)``."""
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> httpx.Response:
        """Shorthand for ``request("POST", url, **kwargs)``."""
        return await self.request("POST", url, **kwargs)


# Module-wide shared instance (global singleton).
http_client = BaseHttpClient()