File size: 6,925 Bytes
5868187
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
"""
通用的HTTP客户端模块
为所有需要使用httpx的模块提供统一的客户端配置和方法
保持通用性,不与特定业务逻辑耦合
"""
import httpx
from typing import Optional, Dict, Any, AsyncGenerator
from contextlib import asynccontextmanager

from config import get_proxy_config
from log import log


class HttpxClientManager:
    """通用HTTP客户端管理器"""
    
    async def get_client_kwargs(self, timeout: float = 30.0, **kwargs) -> Dict[str, Any]:
        """获取httpx客户端的通用配置参数"""
        client_kwargs = {
            "timeout": timeout,
            **kwargs
        }
        
        # 动态读取代理配置,支持热更新
        current_proxy_config = await get_proxy_config()
        if current_proxy_config:
            client_kwargs["proxy"] = current_proxy_config
        
        return client_kwargs
    
    @asynccontextmanager
    async def get_client(self, timeout: float = 30.0, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]:
        """获取配置好的异步HTTP客户端"""
        client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs)
        
        async with httpx.AsyncClient(**client_kwargs) as client:
            yield client
    
    @asynccontextmanager
    async def get_streaming_client(self, timeout: float = None, **kwargs) -> AsyncGenerator[httpx.AsyncClient, None]:
        """获取用于流式请求的HTTP客户端(无超时限制)"""
        client_kwargs = await self.get_client_kwargs(timeout=timeout, **kwargs)
        
        # 创建独立的客户端实例用于流式处理
        client = httpx.AsyncClient(**client_kwargs)
        try:
            yield client
        finally:
            await client.aclose()


# 全局HTTP客户端管理器实例
http_client = HttpxClientManager()


# 通用的异步方法
async def get_async(url: str, headers: Optional[Dict[str, str]] = None, 
                   timeout: float = 30.0, **kwargs) -> httpx.Response:
    """通用异步GET请求"""
    async with http_client.get_client(timeout=timeout, **kwargs) as client:
        return await client.get(url, headers=headers)


async def post_async(url: str, data: Any = None, json: Any = None,
                    headers: Optional[Dict[str, str]] = None,
                    timeout: float = 30.0, **kwargs) -> httpx.Response:
    """通用异步POST请求"""
    async with http_client.get_client(timeout=timeout, **kwargs) as client:
        return await client.post(url, data=data, json=json, headers=headers)


async def put_async(url: str, data: Any = None, json: Any = None,
                   headers: Optional[Dict[str, str]] = None,
                   timeout: float = 30.0, **kwargs) -> httpx.Response:
    """通用异步PUT请求"""
    async with http_client.get_client(timeout=timeout, **kwargs) as client:
        return await client.put(url, data=data, json=json, headers=headers)


async def delete_async(url: str, headers: Optional[Dict[str, str]] = None,
                      timeout: float = 30.0, **kwargs) -> httpx.Response:
    """通用异步DELETE请求"""
    async with http_client.get_client(timeout=timeout, **kwargs) as client:
        return await client.delete(url, headers=headers)


# 错误处理装饰器
def handle_http_errors(func):
    """HTTP错误处理装饰器"""
    async def wrapper(*args, **kwargs):
        try:
            response = await func(*args, **kwargs)
            response.raise_for_status()
            return response
        except httpx.HTTPStatusError as e:
            log.error(f"HTTP错误: {e.response.status_code} - {e.response.text}")
            raise
        except httpx.RequestError as e:
            log.error(f"请求错误: {e}")
            raise
        except Exception as e:
            log.error(f"未知错误: {e}")
            raise
    return wrapper


# 应用错误处理的安全方法
@handle_http_errors
async def safe_get_async(url: str, headers: Optional[Dict[str, str]] = None,
                        timeout: float = 30.0, **kwargs) -> httpx.Response:
    """安全的异步GET请求(自动错误处理)"""
    return await get_async(url, headers=headers, timeout=timeout, **kwargs)


@handle_http_errors
async def safe_post_async(url: str, data: Any = None, json: Any = None,
                         headers: Optional[Dict[str, str]] = None,
                         timeout: float = 30.0, **kwargs) -> httpx.Response:
    """安全的异步POST请求(自动错误处理)"""
    return await post_async(url, data=data, json=json, headers=headers, timeout=timeout, **kwargs)


@handle_http_errors
async def safe_put_async(url: str, data: Any = None, json: Any = None,
                        headers: Optional[Dict[str, str]] = None,
                        timeout: float = 30.0, **kwargs) -> httpx.Response:
    """安全的异步PUT请求(自动错误处理)"""
    return await put_async(url, data=data, json=json, headers=headers, timeout=timeout, **kwargs)


@handle_http_errors
async def safe_delete_async(url: str, headers: Optional[Dict[str, str]] = None,
                           timeout: float = 30.0, **kwargs) -> httpx.Response:
    """安全的异步DELETE请求(自动错误处理)"""
    return await delete_async(url, headers=headers, timeout=timeout, **kwargs)


# 流式请求支持
class StreamingContext:
    """流式请求上下文管理器"""
    
    def __init__(self, client: httpx.AsyncClient, stream_context):
        self.client = client
        self.stream_context = stream_context
        self.response = None
    
    async def __aenter__(self):
        self.response = await self.stream_context.__aenter__()
        return self.response
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        try:
            if self.stream_context:
                await self.stream_context.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            if self.client:
                await self.client.aclose()


@asynccontextmanager
async def get_streaming_post_context(url: str, data: Any = None, json: Any = None,
                                   headers: Optional[Dict[str, str]] = None,
                                   timeout: float = None, **kwargs) -> AsyncGenerator[StreamingContext, None]:
    """获取流式POST请求的上下文管理器"""
    async with http_client.get_streaming_client(timeout=timeout, **kwargs) as client:
        stream_ctx = client.stream("POST", url, data=data, json=json, headers=headers)
        streaming_context = StreamingContext(client, stream_ctx)
        yield streaming_context


async def create_streaming_client_with_kwargs(**kwargs) -> httpx.AsyncClient:
    """创建用于流式处理的独立客户端实例(手动管理生命周期)"""
    client_kwargs = await http_client.get_client_kwargs(timeout=None, **kwargs)
    return httpx.AsyncClient(**client_kwargs)