# gcli2api / src / httpx_client.py
# Repository page residue: "lightspeed's picture", "Upload 22 files",
# commit 5868187 (verified).
"""
通用的HTTP客户端模块
为所有需要使用httpx的模块提供统一的客户端配置和方法
保持通用性,不与特定业务逻辑耦合
"""
import httpx
from typing import Optional, Dict, Any, AsyncGenerator
from contextlib import asynccontextmanager
from config import get_proxy_config
from log import log
class HttpxClientManager:
    """Factory for httpx.AsyncClient instances sharing one configuration."""

    async def get_client_kwargs(self, timeout: float = 30.0, **kwargs) -> Dict[str, Any]:
        """Assemble the keyword arguments used to construct an httpx client.

        Args:
            timeout: Value stored under the ``"timeout"`` key.
            **kwargs: Additional entries copied into the result unchanged.

        Returns:
            A dict containing ``timeout``, every extra keyword, and a
            ``"proxy"`` entry when ``get_proxy_config()`` returns a truthy
            value. A truthy configured proxy replaces any ``proxy`` passed
            in ``kwargs``.
        """
        options: Dict[str, Any] = {"timeout": timeout}
        options.update(kwargs)

        # The proxy setting is read on every call (not once at import time)
        # so that a configuration change applies to newly built clients.
        proxy = await get_proxy_config()
        if proxy:
            options["proxy"] = proxy
        return options

    @asynccontextmanager
    async def get_client(self, timeout: float = 30.0, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield a configured async client; it is closed when the block exits.

        Args:
            timeout: Timeout handed to the client (default 30 seconds).
            **kwargs: Extra keyword arguments for ``httpx.AsyncClient``.
        """
        options = await self.get_client_kwargs(timeout=timeout, **kwargs)
        async with httpx.AsyncClient(**options) as session:
            yield session

    @asynccontextmanager
    async def get_streaming_client(self, timeout: float = None, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]:
        """Yield a dedicated client intended for streaming requests.

        Args:
            timeout: Timeout handed to the client; the default ``None`` means
                no timeout limit is configured.
            **kwargs: Extra keyword arguments for ``httpx.AsyncClient``.
        """
        options = await self.get_client_kwargs(timeout=timeout, **kwargs)
        # A separate instance is built for each streaming use and closed
        # explicitly once the caller's block finishes, even on error.
        session = httpx.AsyncClient(**options)
        try:
            yield session
        finally:
            await session.aclose()
# Module-level HttpxClientManager instance shared by the helper functions in this module.
http_client = HttpxClientManager()
# 通用的异步方法
async def get_async(url: str, headers: Optional[Dict[str, str]] = None,
                    timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a GET request using a client created just for this call.

    Args:
        url: Target URL.
        headers: Optional request headers.
        timeout: Client timeout (default 30 seconds).
        **kwargs: Forwarded to the client construction, not to the request.

    Returns:
        The ``httpx.Response`` returned by the client.
    """
    async with http_client.get_client(timeout=timeout, **kwargs) as session:
        response = await session.get(url, headers=headers)
    return response
async def post_async(url: str, data: Any = None, json: Any = None,
                     headers: Optional[Dict[str, str]] = None,
                     timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a POST request using a client created just for this call.

    Args:
        url: Target URL.
        data: Passed to the client as the ``data`` argument.
        json: Passed to the client as the ``json`` argument.
        headers: Optional request headers.
        timeout: Client timeout (default 30 seconds).
        **kwargs: Forwarded to the client construction, not to the request.

    Returns:
        The ``httpx.Response`` returned by the client.
    """
    async with http_client.get_client(timeout=timeout, **kwargs) as session:
        response = await session.post(url, data=data, json=json, headers=headers)
    return response
async def put_async(url: str, data: Any = None, json: Any = None,
                    headers: Optional[Dict[str, str]] = None,
                    timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a PUT request using a client created just for this call.

    Args:
        url: Target URL.
        data: Passed to the client as the ``data`` argument.
        json: Passed to the client as the ``json`` argument.
        headers: Optional request headers.
        timeout: Client timeout (default 30 seconds).
        **kwargs: Forwarded to the client construction, not to the request.

    Returns:
        The ``httpx.Response`` returned by the client.
    """
    async with http_client.get_client(timeout=timeout, **kwargs) as session:
        response = await session.put(url, data=data, json=json, headers=headers)
    return response
async def delete_async(url: str, headers: Optional[Dict[str, str]] = None,
                       timeout: float = 30.0, **kwargs) -> httpx.Response:
    """Send a DELETE request using a client created just for this call.

    Args:
        url: Target URL.
        headers: Optional request headers.
        timeout: Client timeout (default 30 seconds).
        **kwargs: Forwarded to the client construction, not to the request.

    Returns:
        The ``httpx.Response`` returned by the client.
    """
    async with http_client.get_client(timeout=timeout, **kwargs) as session:
        response = await session.delete(url, headers=headers)
    return response
# 错误处理装饰器
def handle_http_errors(func):
    """Decorate an async request function with status checking and logging.

    The wrapper awaits ``func``, calls ``raise_for_status()`` on the returned
    response, and returns it. Any exception is logged and re-raised, so
    callers still see the original error.

    Args:
        func: Async callable returning an object with ``raise_for_status()``.

    Returns:
        An async wrapper that keeps ``func``'s name and docstring.

    Raises:
        httpx.HTTPStatusError: When the response status indicates an error.
        httpx.RequestError: When the request itself fails.
        Exception: Any other error raised by ``func`` is propagated as-is.
    """
    # Imported locally so this block does not depend on a file-level import.
    import functools

    # functools.wraps keeps __name__/__doc__ of the decorated function, so
    # tracebacks and introspection do not report every helper as "wrapper".
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            response = await func(*args, **kwargs)
            response.raise_for_status()
            return response
        except httpx.HTTPStatusError as e:
            log.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            log.error(f"请求错误: {e}")
            raise
        except Exception as e:
            log.error(f"未知错误: {e}")
            raise
    return wrapper
# 应用错误处理的安全方法
@handle_http_errors
async def safe_get_async(url: str, headers: Optional[Dict[str, str]] = None,
                         timeout: float = 30.0, **kwargs) -> httpx.Response:
    """GET request routed through the ``handle_http_errors`` decorator.

    Delegates to ``get_async`` with the same arguments and returns its
    response.
    """
    response = await get_async(url, headers=headers, timeout=timeout, **kwargs)
    return response
@handle_http_errors
async def safe_post_async(url: str, data: Any = None, json: Any = None,
                          headers: Optional[Dict[str, str]] = None,
                          timeout: float = 30.0, **kwargs) -> httpx.Response:
    """POST request routed through the ``handle_http_errors`` decorator.

    Delegates to ``post_async`` with the same arguments and returns its
    response.
    """
    response = await post_async(
        url, data=data, json=json, headers=headers, timeout=timeout, **kwargs
    )
    return response
@handle_http_errors
async def safe_put_async(url: str, data: Any = None, json: Any = None,
                         headers: Optional[Dict[str, str]] = None,
                         timeout: float = 30.0, **kwargs) -> httpx.Response:
    """PUT request routed through the ``handle_http_errors`` decorator.

    Delegates to ``put_async`` with the same arguments and returns its
    response.
    """
    response = await put_async(
        url, data=data, json=json, headers=headers, timeout=timeout, **kwargs
    )
    return response
@handle_http_errors
async def safe_delete_async(url: str, headers: Optional[Dict[str, str]] = None,
                            timeout: float = 30.0, **kwargs) -> httpx.Response:
    """DELETE request routed through the ``handle_http_errors`` decorator.

    Delegates to ``delete_async`` with the same arguments and returns its
    response.
    """
    response = await delete_async(url, headers=headers, timeout=timeout, **kwargs)
    return response
# 流式请求支持
class StreamingContext:
    """Async context manager pairing a streaming response with its client.

    Entering opens ``stream_context`` and returns the response it yields.
    Exiting closes the stream and then closes ``client``, so the client's
    lifetime is tied to the stream. If opening the stream fails, the client
    is closed as well, because ``__aexit__`` is never invoked in that case.
    """

    def __init__(self, client: httpx.AsyncClient, stream_context):
        # client: closed by this object when the stream ends or fails to open.
        # stream_context: async context manager that yields the response,
        # e.g. the object returned by ``client.stream(...)``.
        self.client = client
        self.stream_context = stream_context
        self.response = None  # populated by __aenter__

    async def __aenter__(self):
        """Open the stream and return its response.

        Raises:
            Whatever ``stream_context.__aenter__()`` raises; the client is
            closed before the exception propagates.
        """
        try:
            self.response = await self.stream_context.__aenter__()
        except BaseException:
            # When __aenter__ raises, ``async with`` skips __aexit__, so the
            # client would otherwise stay open. BaseException also covers
            # task cancellation during connection setup.
            if self.client:
                await self.client.aclose()
            raise
        return self.response

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the stream, then always close the client."""
        try:
            if self.stream_context:
                await self.stream_context.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Runs even if closing the stream raised.
            if self.client:
                await self.client.aclose()
@asynccontextmanager
async def get_streaming_post_context(url: str, data: Any = None, json: Any = None,
                                     headers: Optional[Dict[str, str]] = None,
                                     timeout: float = None, **kwargs) -> AsyncGenerator[StreamingContext, None]:
    """Yield a ``StreamingContext`` for a streaming POST request.

    The yielded object is itself an async context manager: enter it with
    ``async with`` to obtain the response.

    Args:
        url: Target URL.
        data: Passed to ``client.stream`` as the ``data`` argument.
        json: Passed to ``client.stream`` as the ``json`` argument.
        headers: Optional request headers.
        timeout: Client timeout; defaults to ``None``.
        **kwargs: Forwarded to the client construction, not to the request.
    """
    async with http_client.get_streaming_client(timeout=timeout, **kwargs) as session:
        yield StreamingContext(
            session,
            session.stream("POST", url, data=data, json=json, headers=headers),
        )
async def create_streaming_client_with_kwargs(**kwargs) -> httpx.AsyncClient:
    """Create a standalone client for streaming; the caller must close it.

    Args:
        **kwargs: Extra keyword arguments for ``httpx.AsyncClient``. An
            optional ``timeout`` entry overrides the default of ``None``.

    Returns:
        A new ``httpx.AsyncClient``. It is not closed automatically: call
        ``aclose()`` on it when finished.
    """
    # Pop ``timeout`` so a caller-supplied value does not collide with the
    # explicit keyword below (that collision raised TypeError before).
    timeout = kwargs.pop("timeout", None)
    client_kwargs = await http_client.get_client_kwargs(timeout=timeout, **kwargs)
    return httpx.AsyncClient(**client_kwargs)