Spaces:
Sleeping
Sleeping
File size: 6,925 Bytes
5868187 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 |
"""
通用的HTTP客户端模块
为所有需要使用httpx的模块提供统一的客户端配置和方法
保持通用性,不与特定业务逻辑耦合
"""
import functools
from contextlib import asynccontextmanager
from typing import Optional, Dict, Any, AsyncGenerator

import httpx

from config import get_proxy_config
from log import log
class HttpxClientManager:
    """General-purpose manager that builds configured ``httpx`` async clients."""

    async def get_client_kwargs(self, timeout: float = 30.0, **kwargs) -> Dict[str, Any]:
        """Assemble the keyword arguments used to construct an ``httpx.AsyncClient``.

        Args:
            timeout: Value stored under the ``"timeout"`` key.
            **kwargs: Extra client options, copied into the result unchanged.

        Returns:
            A dict holding ``timeout``, the extra options and, when
            ``get_proxy_config()`` returns a truthy value, a ``"proxy"`` entry
            set to that value (replacing any ``proxy`` passed in ``kwargs``).
        """
        options: Dict[str, Any] = dict(timeout=timeout, **kwargs)
        # The proxy configuration is fetched on every call instead of being
        # cached, so an updated configuration is picked up (hot update).
        proxy = await get_proxy_config()
        if not proxy:
            return options
        options["proxy"] = proxy
        return options

    @asynccontextmanager
    async def get_client(self, timeout: float = 30.0, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield a configured async HTTP client that is closed on exit.

        Args:
            timeout: Timeout handed to ``get_client_kwargs``.
            **kwargs: Extra options handed to ``get_client_kwargs``.
        """
        options = await self.get_client_kwargs(timeout=timeout, **kwargs)
        async with httpx.AsyncClient(**options) as session:
            yield session

    @asynccontextmanager
    async def get_streaming_client(self, timeout: float = None, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield an HTTP client intended for streaming requests.

        ``timeout`` defaults to ``None`` (the original docstring describes
        this as "no timeout limit"). The client is closed with ``aclose()``
        when the context exits, whether or not an exception occurred.

        Args:
            timeout: Timeout handed to ``get_client_kwargs``.
            **kwargs: Extra options handed to ``get_client_kwargs``.
        """
        options = await self.get_client_kwargs(timeout=timeout, **kwargs)
        # A dedicated client instance is created for the streaming request.
        session = httpx.AsyncClient(**options)
        try:
            yield session
        finally:
            await session.aclose()
# Global HTTP client manager instance shared by the module-level helpers
http_client = HttpxClientManager()
# Generic async request helpers
async def get_async(url: str, headers: Optional[Dict[str, str]] = None,
                    timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a GET request through a short-lived configured client.

    Args:
        url: Target URL.
        headers: Optional request headers.
        timeout: Timeout forwarded to ``http_client.get_client``.
        **kwargs: Extra options forwarded to ``http_client.get_client``.

    Returns:
        The response returned by the client's ``get`` call.
    """
    client_cm = http_client.get_client(timeout=timeout, **kwargs)
    async with client_cm as session:
        response = await session.get(url, headers=headers)
    return response
async def post_async(url: str, data: Any = None, json: Any = None,
                     headers: Optional[Dict[str, str]] = None,
                     timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a POST request through a short-lived configured client.

    Args:
        url: Target URL.
        data: Passed to the client's ``post`` call as ``data``.
        json: Passed to the client's ``post`` call as ``json``.
        headers: Optional request headers.
        timeout: Timeout forwarded to ``http_client.get_client``.
        **kwargs: Extra options forwarded to ``http_client.get_client``.

    Returns:
        The response returned by the client's ``post`` call.
    """
    client_cm = http_client.get_client(timeout=timeout, **kwargs)
    async with client_cm as session:
        response = await session.post(url, data=data, json=json, headers=headers)
    return response
async def put_async(url: str, data: Any = None, json: Any = None,
                    headers: Optional[Dict[str, str]] = None,
                    timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a PUT request through a short-lived configured client.

    Args:
        url: Target URL.
        data: Passed to the client's ``put`` call as ``data``.
        json: Passed to the client's ``put`` call as ``json``.
        headers: Optional request headers.
        timeout: Timeout forwarded to ``http_client.get_client``.
        **kwargs: Extra options forwarded to ``http_client.get_client``.

    Returns:
        The response returned by the client's ``put`` call.
    """
    client_cm = http_client.get_client(timeout=timeout, **kwargs)
    async with client_cm as session:
        response = await session.put(url, data=data, json=json, headers=headers)
    return response
async def delete_async(url: str, headers: Optional[Dict[str, str]] = None,
                       timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a DELETE request through a short-lived configured client.

    Args:
        url: Target URL.
        headers: Optional request headers.
        timeout: Timeout forwarded to ``http_client.get_client``.
        **kwargs: Extra options forwarded to ``http_client.get_client``.

    Returns:
        The response returned by the client's ``delete`` call.
    """
    client_cm = http_client.get_client(timeout=timeout, **kwargs)
    async with client_cm as session:
        response = await session.delete(url, headers=headers)
    return response
# Error-handling decorator
def handle_http_errors(func):
    """Wrap an async request function with status checking and error logging.

    The wrapper awaits ``func``, calls ``raise_for_status()`` on the result
    and returns it. Every exception is logged via ``log.error`` and then
    re-raised unchanged, so callers still receive the original exception.

    Args:
        func: Coroutine function whose result exposes ``raise_for_status()``.

    Returns:
        A coroutine function accepting the same arguments as ``func`` and
        carrying its ``__name__``, ``__doc__`` and other metadata.

    Raises:
        httpx.HTTPStatusError: Re-raised when ``raise_for_status()`` fails.
        httpx.RequestError: Re-raised when the request itself fails.
        Exception: Any other error raised by ``func`` is re-raised as-is.
    """
    # functools.wraps keeps the decorated function's name and docstring
    # visible to introspection, logging and debugging tools.
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            response = await func(*args, **kwargs)
            response.raise_for_status()
            return response
        except httpx.HTTPStatusError as e:
            log.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            log.error(f"请求错误: {e}")
            raise
        except Exception as e:
            log.error(f"未知错误: {e}")
            raise
    return wrapper
# "Safe" request helpers with the error-handling decorator applied
@handle_http_errors
async def safe_get_async(url: str, headers: Optional[Dict[str, str]] = None,
                         timeout: float = 30.0, **kwargs) -> httpx.Response:
    """GET request routed through ``handle_http_errors``.

    Delegates to ``get_async`` with the same arguments; the decorator then
    checks the response status and logs any error before re-raising it.
    """
    response = await get_async(url, headers=headers, timeout=timeout, **kwargs)
    return response
@handle_http_errors
async def safe_post_async(url: str, data: Any = None, json: Any = None,
                          headers: Optional[Dict[str, str]] = None,
                          timeout: float = 30.0, **kwargs) -> httpx.Response:
    """POST request routed through ``handle_http_errors``.

    Delegates to ``post_async`` with the same arguments; the decorator then
    checks the response status and logs any error before re-raising it.
    """
    response = await post_async(
        url,
        data=data,
        json=json,
        headers=headers,
        timeout=timeout,
        **kwargs,
    )
    return response
@handle_http_errors
async def safe_put_async(url: str, data: Any = None, json: Any = None,
                         headers: Optional[Dict[str, str]] = None,
                         timeout: float = 30.0, **kwargs) -> httpx.Response:
    """PUT request routed through ``handle_http_errors``.

    Delegates to ``put_async`` with the same arguments; the decorator then
    checks the response status and logs any error before re-raising it.
    """
    response = await put_async(
        url,
        data=data,
        json=json,
        headers=headers,
        timeout=timeout,
        **kwargs,
    )
    return response
@handle_http_errors
async def safe_delete_async(url: str, headers: Optional[Dict[str, str]] = None,
                            timeout: float = 30.0, **kwargs) -> httpx.Response:
    """DELETE request routed through ``handle_http_errors``.

    Delegates to ``delete_async`` with the same arguments; the decorator then
    checks the response status and logs any error before re-raising it.
    """
    response = await delete_async(url, headers=headers, timeout=timeout, **kwargs)
    return response
# Streaming request support
class StreamingContext:
    """Async context manager pairing a stream context with its client.

    Entering opens ``stream_context`` and returns whatever it yields (also
    stored on ``self.response``). Exiting closes the stream context and then
    closes the client, even if closing the stream raised.
    """

    def __init__(self, client: httpx.AsyncClient, stream_context):
        """Store the client and the not-yet-entered stream context.

        Args:
            client: Client that is closed when this context finishes.
            stream_context: Async context manager whose ``__aenter__``
                result becomes ``self.response``.
        """
        self.client = client
        self.stream_context = stream_context
        self.response = None

    async def __aenter__(self):
        """Open the stream and return its response.

        Raises:
            BaseException: Whatever ``stream_context.__aenter__()`` raises,
                re-raised after the client has been closed.
        """
        try:
            self.response = await self.stream_context.__aenter__()
        except BaseException:
            # __aexit__ is never invoked when __aenter__ fails, so the client
            # must be released here or it would stay open.
            if self.client:
                await self.client.aclose()
            raise
        return self.response

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the stream context, then the client; never suppresses errors."""
        try:
            if self.stream_context:
                await self.stream_context.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Runs even when closing the stream raised, so the client is
            # always released.
            if self.client:
                await self.client.aclose()
@asynccontextmanager
async def get_streaming_post_context(url: str, data: Any = None, json: Any = None,
                                     headers: Optional[Dict[str, str]] = None,
                                     timeout: float = None, **kwargs) -> AsyncGenerator[StreamingContext, None]:
    """Yield a ``StreamingContext`` for a streaming POST request.

    A streaming client is obtained from ``http_client.get_streaming_client``
    and a ``client.stream("POST", ...)`` context is created but not entered;
    the caller enters the yielded ``StreamingContext`` to start the request.

    Args:
        url: Target URL.
        data: Passed to ``client.stream`` as ``data``.
        json: Passed to ``client.stream`` as ``json``.
        headers: Optional request headers.
        timeout: Timeout forwarded to ``get_streaming_client``.
        **kwargs: Extra options forwarded to ``get_streaming_client``.
    """
    client_cm = http_client.get_streaming_client(timeout=timeout, **kwargs)
    async with client_cm as session:
        yield StreamingContext(
            session,
            session.stream("POST", url, data=data, json=json, headers=headers),
        )
async def create_streaming_client_with_kwargs(**kwargs) -> httpx.AsyncClient:
    """Create a standalone client for streaming; the caller must close it.

    The client is built from ``http_client.get_client_kwargs`` with
    ``timeout=None`` and is returned without any context manager, so its
    lifecycle is managed manually by the caller.

    Args:
        **kwargs: Extra client options forwarded to ``get_client_kwargs``.

    Returns:
        A new ``httpx.AsyncClient`` instance.
    """
    client_kwargs = await http_client.get_client_kwargs(timeout=None, **kwargs)
    return httpx.AsyncClient(**client_kwargs)