""" HTTP 客户端封装 基于 curl_cffi 的 HTTP 请求封装,支持代理和错误处理 """ import time import json from typing import Optional, Dict, Any, Union, Tuple from dataclasses import dataclass import logging from curl_cffi import requests as cffi_requests from curl_cffi.requests import Session, Response from ..config.constants import ERROR_MESSAGES from ..config.settings import get_settings from .openai.sentinel import SentinelPOWError, build_sentinel_pow_token logger = logging.getLogger(__name__) @dataclass class RequestConfig: """HTTP 请求配置""" timeout: int = 30 max_retries: int = 3 retry_delay: float = 1.0 impersonate: str = "chrome" verify_ssl: bool = True follow_redirects: bool = True class HTTPClientError(Exception): """HTTP 客户端异常""" pass class HTTPClient: """ HTTP 客户端封装 支持代理、重试、错误处理和会话管理 """ def __init__( self, proxy_url: Optional[str] = None, config: Optional[RequestConfig] = None, session: Optional[Session] = None ): """ 初始化 HTTP 客户端 Args: proxy_url: 代理 URL,如 "http://127.0.0.1:7890" config: 请求配置 session: 可重用的会话对象 """ self.proxy_url = proxy_url self.config = config or RequestConfig() self._session = session @property def proxies(self) -> Optional[Dict[str, str]]: """获取代理配置""" if not self.proxy_url: return None return { "http": self.proxy_url, "https": self.proxy_url, } @property def session(self) -> Session: """获取会话对象(单例)""" if self._session is None: self._session = Session( proxies=self.proxies, impersonate=self.config.impersonate, verify=self.config.verify_ssl, timeout=self.config.timeout ) return self._session def request( self, method: str, url: str, **kwargs ) -> Response: """ 发送 HTTP 请求 Args: method: HTTP 方法 (GET, POST, PUT, DELETE, etc.) url: 请求 URL **kwargs: 其他请求参数 Returns: Response 对象 Raises: HTTPClientError: 请求失败 """ # 设置默认参数 kwargs.setdefault("timeout", self.config.timeout) kwargs.setdefault("allow_redirects", self.config.follow_redirects) # 添加代理配置 if self.proxies and "proxies" not in kwargs: kwargs["proxies"] = self.proxies last_exception = None for attempt in range(self.config.max_retries): try: response = self.session.request(method, url, **kwargs) # 检查响应状态码 if response.status_code >= 400: logger.warning( f"HTTP {response.status_code} for {method} {url}" f" (attempt {attempt + 1}/{self.config.max_retries})" ) # 如果是服务器错误,重试 if response.status_code >= 500 and attempt < self.config.max_retries - 1: time.sleep(self.config.retry_delay * (attempt + 1)) continue return response except (cffi_requests.RequestsError, ConnectionError, TimeoutError) as e: last_exception = e logger.warning( f"请求失败: {method} {url} (attempt {attempt + 1}/{self.config.max_retries}): {e}" ) if attempt < self.config.max_retries - 1: time.sleep(self.config.retry_delay * (attempt + 1)) else: break raise HTTPClientError( f"请求失败,最大重试次数已达: {method} {url} - {last_exception}" ) def get(self, url: str, **kwargs) -> Response: """发送 GET 请求""" return self.request("GET", url, **kwargs) def post(self, url: str, data: Any = None, json: Any = None, **kwargs) -> Response: """发送 POST 请求""" return self.request("POST", url, data=data, json=json, **kwargs) def put(self, url: str, data: Any = None, json: Any = None, **kwargs) -> Response: """发送 PUT 请求""" return self.request("PUT", url, data=data, json=json, **kwargs) def delete(self, url: str, **kwargs) -> Response: """发送 DELETE 请求""" return self.request("DELETE", url, **kwargs) def head(self, url: str, **kwargs) -> Response: """发送 HEAD 请求""" return self.request("HEAD", url, **kwargs) def options(self, url: str, **kwargs) -> Response: """发送 OPTIONS 请求""" return self.request("OPTIONS", url, **kwargs) def patch(self, url: str, data: Any = None, json: Any = None, **kwargs) -> Response: """发送 PATCH 请求""" return self.request("PATCH", url, data=data, json=json, **kwargs) def download_file(self, url: str, filepath: str, chunk_size: int = 8192) -> None: """ 下载文件 Args: url: 文件 URL filepath: 保存路径 chunk_size: 块大小 Raises: HTTPClientError: 下载失败 """ try: response = self.get(url, stream=True) response.raise_for_status() with open(filepath, 'wb') as f: for chunk in response.iter_content(chunk_size=chunk_size): if chunk: f.write(chunk) except Exception as e: raise HTTPClientError(f"下载文件失败: {url} - {e}") def check_proxy(self, test_url: str = "https://httpbin.org/ip") -> bool: """ 检查代理是否可用 Args: test_url: 测试 URL Returns: bool: 代理是否可用 """ if not self.proxy_url: return False try: response = self.get(test_url, timeout=10) return response.status_code == 200 except Exception: return False def close(self): """关闭会话""" if self._session: self._session.close() self._session = None def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() class OpenAIHTTPClient(HTTPClient): """ OpenAI 专用 HTTP 客户端 包含 OpenAI API 特定的请求方法 """ def __init__( self, proxy_url: Optional[str] = None, config: Optional[RequestConfig] = None ): """ 初始化 OpenAI HTTP 客户端 Args: proxy_url: 代理 URL config: 请求配置 """ super().__init__(proxy_url, config) # OpenAI 特定的默认配置 if config is None: self.config.timeout = 30 self.config.max_retries = 3 # 默认请求头 self.default_headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", "Accept": "application/json", "Accept-Language": "en-US,en;q=0.9", "Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive", "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Site": "same-site", } def check_ip_location(self) -> Tuple[bool, Optional[str]]: """ 检查 IP 地理位置 Returns: Tuple[是否支持, 位置信息] """ try: response = self.get("https://cloudflare.com/cdn-cgi/trace", timeout=10) trace_text = response.text # 解析位置信息 import re loc_match = re.search(r"loc=([A-Z]+)", trace_text) loc = loc_match.group(1) if loc_match else None # 检查是否支持 if loc in ["CN", "HK", "MO", "TW"]: return False, loc return True, loc except Exception as e: logger.error(f"检查 IP 地理位置失败: {e}") return False, None def send_openai_request( self, endpoint: str, method: str = "POST", data: Optional[Dict[str, Any]] = None, json_data: Optional[Dict[str, Any]] = None, headers: Optional[Dict[str, str]] = None, **kwargs ) -> Dict[str, Any]: """ 发送 OpenAI API 请求 Args: endpoint: API 端点 method: HTTP 方法 data: 表单数据 json_data: JSON 数据 headers: 请求头 **kwargs: 其他参数 Returns: 响应 JSON 数据 Raises: HTTPClientError: 请求失败 """ # 合并请求头 request_headers = self.default_headers.copy() if headers: request_headers.update(headers) # 设置 Content-Type if json_data is not None and "Content-Type" not in request_headers: request_headers["Content-Type"] = "application/json" elif data is not None and "Content-Type" not in request_headers: request_headers["Content-Type"] = "application/x-www-form-urlencoded" try: response = self.request( method, endpoint, data=data, json=json_data, headers=request_headers, **kwargs ) # 检查响应状态码 response.raise_for_status() # 尝试解析 JSON try: return response.json() except json.JSONDecodeError: return {"raw_response": response.text} except cffi_requests.RequestsError as e: raise HTTPClientError(f"OpenAI 请求失败: {endpoint} - {e}") def check_sentinel(self, did: str, proxies: Optional[Dict] = None) -> Optional[str]: """ 检查 Sentinel 拦截 Args: did: Device ID proxies: 代理配置 Returns: Sentinel token 或 None """ from ..config.constants import OPENAI_API_ENDPOINTS try: pow_token = build_sentinel_pow_token(self.default_headers.get("User-Agent", "")) sen_req_body = json.dumps({ "p": pow_token, "id": did, "flow": "authorize_continue", }, separators=(",", ":")) response = self.post( OPENAI_API_ENDPOINTS["sentinel"], headers={ "origin": "https://sentinel.openai.com", "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6", "content-type": "text/plain;charset=UTF-8", }, data=sen_req_body, ) if response.status_code == 200: return response.json().get("token") else: logger.warning(f"Sentinel 检查失败: {response.status_code}") return None except SentinelPOWError as e: logger.error(f"Sentinel POW 求解失败: {e}") return None except Exception as e: logger.error(f"Sentinel 检查异常: {e}") return None def create_http_client( proxy_url: Optional[str] = None, config: Optional[RequestConfig] = None ) -> HTTPClient: """ 创建 HTTP 客户端工厂函数 Args: proxy_url: 代理 URL config: 请求配置 Returns: HTTPClient 实例 """ return HTTPClient(proxy_url, config) def create_openai_client( proxy_url: Optional[str] = None, config: Optional[RequestConfig] = None ) -> OpenAIHTTPClient: """ 创建 OpenAI HTTP 客户端工厂函数 Args: proxy_url: 代理 URL config: 请求配置 Returns: OpenAIHTTPClient 实例 """ return OpenAIHTTPClient(proxy_url, config)