File size: 3,546 Bytes
69fec20 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 |
"""
通用的HTTP客户端模块
为所有需要使用httpx的模块提供统一的客户端配置和方法
保持通用性,不与特定业务逻辑耦合
"""
from contextlib import asynccontextmanager
from typing import Any, AsyncGenerator, Dict, Optional
import httpx
from config import get_proxy_config
from log import log
class HttpxClientManager:
"""通用HTTP客户端管理器"""
async def get_client_kwargs(self, timeout: float = 30.0, **kwargs) -> Dict[str, Any]:
"""获取httpx客户端的通用配置参数"""
client_kwargs = {"timeout": timeout, **kwargs}
# 动态读取代理配置,支持热更新
current_proxy_config = await get_proxy_config()
if current_proxy_config:
client_kwargs["proxy"] = current_proxy_config
return client_kwargs
@asynccontextmanager
async def get_client(
self, timeout: float = 30.0, **kwargs
) -> AsyncGenerator[httpx.AsyncClient, None]:
"""获取配置好的异步HTTP客户端"""
client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs)
async with httpx.AsyncClient(**client_kwargs) as client:
yield client
@asynccontextmanager
async def get_streaming_client(
self, timeout: float = None, **kwargs
) -> AsyncGenerator[httpx.AsyncClient, None]:
"""获取用于流式请求的HTTP客户端(无超时限制)"""
client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs)
# 创建独立的客户端实例用于流式处理
client = httpx.AsyncClient(**client_kwargs)
try:
yield client
finally:
# 确保无论发生什么都关闭客户端
try:
await client.aclose()
except Exception as e:
log.warning(f"Error closing streaming client: {e}")
# 全局HTTP客户端管理器实例
http_client = HttpxClientManager()
# 通用的异步方法
async def get_async(
url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0, **kwargs
) -> httpx.Response:
"""通用异步GET请求"""
async with http_client.get_client(timeout=timeout, **kwargs) as client:
return await client.get(url, headers=headers)
async def post_async(
url: str,
data: Any = None,
json: Any = None,
headers: Optional[Dict[str, str]] = None,
timeout: float = 30.0,
**kwargs,
) -> httpx.Response:
"""通用异步POST请求"""
async with http_client.get_client(timeout=timeout, **kwargs) as client:
return await client.post(url, data=data, json=json, headers=headers)
async def stream_post_async(
url: str,
body: Dict[str, Any],
native: bool = False,
headers: Optional[Dict[str, str]] = None,
**kwargs,
):
"""流式异步POST请求"""
async with http_client.get_streaming_client(**kwargs) as client:
async with client.stream("POST", url, json=body, headers=headers) as r:
# 错误直接返回
if r.status_code != 200:
from fastapi import Response
yield Response(await r.aread(), r.status_code, dict(r.headers))
return
# 如果native=True,直接返回bytes流
if native:
async for chunk in r.aiter_bytes():
yield chunk
else:
# 通过aiter_lines转化成str流返回
async for line in r.aiter_lines():
yield line
|