Spaces:
Sleeping
Sleeping
| """ | |
| 通用的HTTP客户端模块 | |
| 为所有需要使用httpx的模块提供统一的客户端配置和方法 | |
| 保持通用性,不与特定业务逻辑耦合 | |
| """ | |
| import httpx | |
| from typing import Optional, Dict, Any, AsyncGenerator | |
| from contextlib import asynccontextmanager | |
| from config import get_proxy_config | |
| from log import log | |
| class HttpxClientManager: | |
| """通用HTTP客户端管理器""" | |
| async def get_client_kwargs(self, timeout: float = 30.0, **kwargs) -> Dict[str, Any]: | |
| """获取httpx客户端的通用配置参数""" | |
| client_kwargs = { | |
| "timeout": timeout, | |
| **kwargs | |
| } | |
| # 动态读取代理配置,支持热更新 | |
| current_proxy_config = await get_proxy_config() | |
| if current_proxy_config: | |
| client_kwargs["proxy"] = current_proxy_config | |
| return client_kwargs | |
| async def get_client(self, timeout: float = 30.0, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]: | |
| """获取配置好的异步HTTP客户端""" | |
| client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs) | |
| async with httpx.AsyncClient(**client_kwargs) as client: | |
| yield client | |
| async def get_streaming_client(self, timeout: float = None, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]: | |
| """获取用于流式请求的HTTP客户端(无超时限制)""" | |
| client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs) | |
| # 创建独立的客户端实例用于流式处理 | |
| client = httpx.AsyncClient(**client_kwargs) | |
| try: | |
| yield client | |
| finally: | |
| await client.aclose() | |
| # 全局HTTP客户端管理器实例 | |
| http_client = HttpxClientManager() | |
| # 通用的异步方法 | |
| async def get_async(url: str, headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """通用异步GET请求""" | |
| async with http_client.get_client(timeout=timeout, **kwargs) as client: | |
| return await client.get(url, headers=headers) | |
| async def post_async(url: str, data: Any = None, json: Any = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """通用异步POST请求""" | |
| async with http_client.get_client(timeout=timeout, **kwargs) as client: | |
| return await client.post(url, data=data, json=json, headers=headers) | |
| async def put_async(url: str, data: Any = None, json: Any = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """通用异步PUT请求""" | |
| async with http_client.get_client(timeout=timeout, **kwargs) as client: | |
| return await client.put(url, data=data, json=json, headers=headers) | |
| async def delete_async(url: str, headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """通用异步DELETE请求""" | |
| async with http_client.get_client(timeout=timeout, **kwargs) as client: | |
| return await client.delete(url, headers=headers) | |
| # 错误处理装饰器 | |
| def handle_http_errors(func): | |
| """HTTP错误处理装饰器""" | |
| async def wrapper(*args, **kwargs): | |
| try: | |
| response = await func(*args, **kwargs) | |
| response.raise_for_status() | |
| return response | |
| except httpx.HTTPStatusError as e: | |
| log.error(f"HTTP错误: {e.response.status_code} - {e.response.text}") | |
| raise | |
| except httpx.RequestError as e: | |
| log.error(f"请求错误: {e}") | |
| raise | |
| except Exception as e: | |
| log.error(f"未知错误: {e}") | |
| raise | |
| return wrapper | |
| # 应用错误处理的安全方法 | |
| async def safe_get_async(url: str, headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """安全的异步GET请求(自动错误处理)""" | |
| return await get_async(url, headers=headers, timeout=timeout, **kwargs) | |
| async def safe_post_async(url: str, data: Any = None, json: Any = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """安全的异步POST请求(自动错误处理)""" | |
| return await post_async(url, data=data, json=json, headers=headers, timeout=timeout, **kwargs) | |
| async def safe_put_async(url: str, data: Any = None, json: Any = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """安全的异步PUT请求(自动错误处理)""" | |
| return await put_async(url, data=data, json=json, headers=headers, timeout=timeout, **kwargs) | |
| async def safe_delete_async(url: str, headers: Optional[Dict[str, str]] = None, | |
| timeout: float = 30.0, **kwargs) -> httpx.Response: | |
| """安全的异步DELETE请求(自动错误处理)""" | |
| return await delete_async(url, headers=headers, timeout=timeout, **kwargs) | |
| # 流式请求支持 | |
| class StreamingContext: | |
| """流式请求上下文管理器""" | |
| def __init__(self, client: httpx.AsyncClient, stream_context): | |
| self.client = client | |
| self.stream_context = stream_context | |
| self.response = None | |
| async def __aenter__(self): | |
| self.response = await self.stream_context.__aenter__() | |
| return self.response | |
| async def __aexit__(self, exc_type, exc_val, exc_tb): | |
| try: | |
| if self.stream_context: | |
| await self.stream_context.__aexit__(exc_type, exc_val, exc_tb) | |
| finally: | |
| if self.client: | |
| await self.client.aclose() | |
| async def get_streaming_post_context(url: str, data: Any = None, json: Any = None, | |
| headers: Optional[Dict[str, str]] = None, | |
| timeout: float = None, **kwargs) -> AsyncGenerator[StreamingContext, None]: | |
| """获取流式POST请求的上下文管理器""" | |
| async with http_client.get_streaming_client(timeout=timeout, **kwargs) as client: | |
| stream_ctx = client.stream("POST", url, data=data, json=json, headers=headers) | |
| streaming_context = StreamingContext(client, stream_ctx) | |
| yield streaming_context | |
| async def create_streaming_client_with_kwargs(**kwargs) -> httpx.AsyncClient: | |
| """创建用于流式处理的独立客户端实例(手动管理生命周期)""" | |
| client_kwargs = await http_client.get_client_kwargs(timeout=None, **kwargs) | |
| return httpx.AsyncClient(**client_kwargs) |