|
|
""" |
|
|
通用的HTTP客户端模块 |
|
|
为所有需要使用httpx的模块提供统一的客户端配置和方法 |
|
|
保持通用性,不与特定业务逻辑耦合 |
|
|
""" |
|
|
|
|
|
from contextlib import asynccontextmanager |
|
|
from typing import Any, AsyncGenerator, Dict, Optional |
|
|
|
|
|
import httpx |
|
|
|
|
|
from config import get_proxy_config |
|
|
from log import log |
|
|
|
|
|
|
|
|
class HttpxClientManager: |
|
|
"""通用HTTP客户端管理器""" |
|
|
|
|
|
async def get_client_kwargs(self, timeout: float = 30.0, **kwargs) -> Dict[str, Any]: |
|
|
"""获取httpx客户端的通用配置参数""" |
|
|
client_kwargs = {"timeout": timeout, **kwargs} |
|
|
|
|
|
|
|
|
current_proxy_config = await get_proxy_config() |
|
|
if current_proxy_config: |
|
|
client_kwargs["proxy"] = current_proxy_config |
|
|
|
|
|
return client_kwargs |
|
|
|
|
|
@asynccontextmanager |
|
|
async def get_client( |
|
|
self, timeout: float = 30.0, **kwargs |
|
|
) -> AsyncGenerator[httpx.AsyncClient, None]: |
|
|
"""获取配置好的异步HTTP客户端""" |
|
|
client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs) |
|
|
|
|
|
async with httpx.AsyncClient(**client_kwargs) as client: |
|
|
yield client |
|
|
|
|
|
@asynccontextmanager |
|
|
async def get_streaming_client( |
|
|
self, timeout: float = None, **kwargs |
|
|
) -> AsyncGenerator[httpx.AsyncClient, None]: |
|
|
"""获取用于流式请求的HTTP客户端(无超时限制)""" |
|
|
client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs) |
|
|
|
|
|
|
|
|
client = httpx.AsyncClient(**client_kwargs) |
|
|
try: |
|
|
yield client |
|
|
finally: |
|
|
|
|
|
try: |
|
|
await client.aclose() |
|
|
except Exception as e: |
|
|
log.warning(f"Error closing streaming client: {e}") |
|
|
|
|
|
|
|
|
|
|
|
http_client = HttpxClientManager() |
|
|
|
|
|
|
|
|
|
|
|
async def get_async( |
|
|
url: str, headers: Optional[Dict[str, str]] = None, timeout: float = 30.0, **kwargs |
|
|
) -> httpx.Response: |
|
|
"""通用异步GET请求""" |
|
|
async with http_client.get_client(timeout=timeout, **kwargs) as client: |
|
|
return await client.get(url, headers=headers) |
|
|
|
|
|
|
|
|
async def post_async( |
|
|
url: str, |
|
|
data: Any = None, |
|
|
json: Any = None, |
|
|
headers: Optional[Dict[str, str]] = None, |
|
|
timeout: float = 30.0, |
|
|
**kwargs, |
|
|
) -> httpx.Response: |
|
|
"""通用异步POST请求""" |
|
|
async with http_client.get_client(timeout=timeout, **kwargs) as client: |
|
|
return await client.post(url, data=data, json=json, headers=headers) |
|
|
|
|
|
|
|
|
async def stream_post_async( |
|
|
url: str, |
|
|
body: Dict[str, Any], |
|
|
native: bool = False, |
|
|
headers: Optional[Dict[str, str]] = None, |
|
|
**kwargs, |
|
|
): |
|
|
"""流式异步POST请求""" |
|
|
async with http_client.get_streaming_client(**kwargs) as client: |
|
|
async with client.stream("POST", url, json=body, headers=headers) as r: |
|
|
|
|
|
if r.status_code != 200: |
|
|
from fastapi import Response |
|
|
yield Response(await r.aread(), r.status_code, dict(r.headers)) |
|
|
return |
|
|
|
|
|
|
|
|
if native: |
|
|
async for chunk in r.aiter_bytes(): |
|
|
yield chunk |
|
|
else: |
|
|
|
|
|
async for line in r.aiter_lines(): |
|
|
yield line |
|
|
|