File size: 6,830 Bytes
621645b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""

HTTP请求工具模块

提供统一的HTTP请求处理,包括重试逻辑和错误处理

"""
import asyncio
import time
from typing import Any, Dict, List, Optional, Union
import httpx
import requests
from .logging import get_logger

logger = get_logger("http_utils")

class HTTPClient:
    """统一的HTTP客户端,支持重试和错误处理"""
    
    def __init__(

        self,

        base_urls: Optional[List[str]] = None,

        max_retries: int = 3,

        retry_delay: float = 1.0,

        timeout: tuple = (5.0, 180.0)

    ):
        """

        初始化HTTP客户端

        

        Args:

            base_urls: 基础URL列表,支持多个备用地址

            max_retries: 最大重试次数

            retry_delay: 重试间隔(秒)

            timeout: 请求超时设置 (连接超时, 读取超时)

        """
        self.base_urls = base_urls or []
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.timeout = timeout
    
    def post_with_fallback(

        self,

        endpoint: str,

        json_data: Optional[Dict[str, Any]] = None,

        **kwargs

    ) -> requests.Response:
        """

        发送POST请求,支持多个备用URL

        

        Args:

            endpoint: API端点路径

            json_data: JSON数据

            **kwargs: 其他请求参数

        

        Returns:

            requests.Response: 响应对象

        

        Raises:

            Exception: 所有URL都失败时抛出异常

        """
        last_exc: Optional[Exception] = None
        
        for base_url in self.base_urls:
            url = f"{base_url}{endpoint}"
            try:
                logger.debug(f"发送POST请求到: {url}")
                response = requests.post(
                    url,
                    json=json_data,
                    timeout=self.timeout,
                    **kwargs
                )
                if response.status_code == 200:
                    return response
                else:
                    last_exc = Exception(f"HTTP {response.status_code}: {response.text}")
                    logger.warning(f"请求失败 {url}: {last_exc}")
            except Exception as e:
                last_exc = e
                logger.warning(f"请求异常 {url}: {e}")
                continue
        
        if last_exc:
            raise last_exc
        raise Exception("所有备用URL都不可用")
    
    def post_with_retry(

        self,

        url: str,

        json_data: Optional[Dict[str, Any]] = None,

        handle_429: bool = True,

        refresh_url: Optional[str] = None,

        **kwargs

    ) -> requests.Response:
        """

        发送POST请求,支持重试和429处理

        

        Args:

            url: 请求URL

            json_data: JSON数据

            handle_429: 是否处理429错误(限流)

            refresh_url: JWT刷新URL(用于处理429)

            **kwargs: 其他请求参数

        

        Returns:

            requests.Response: 响应对象

        """
        for attempt in range(1, self.max_retries + 1):
            try:
                logger.debug(f"尝试 {attempt}/{self.max_retries}: POST {url}")
                response = requests.post(
                    url,
                    json=json_data,
                    timeout=self.timeout,
                    **kwargs
                )
                
                # 处理429限流错误
                if response.status_code == 429 and handle_429 and refresh_url:
                    logger.warning("收到429限流错误,尝试刷新JWT...")
                    try:
                        refresh_resp = requests.post(refresh_url, timeout=10.0)
                        logger.debug(f"JWT刷新响应: HTTP {refresh_resp.status_code}")
                    except Exception as e:
                        logger.warning(f"JWT刷新失败: {e}")
                    
                    # 重试原请求
                    response = requests.post(
                        url,
                        json=json_data,
                        timeout=self.timeout,
                        **kwargs
                    )
                
                if response.status_code == 200:
                    return response
                
                logger.warning(f"请求失败: HTTP {response.status_code}")
                
            except Exception as e:
                logger.warning(f"请求异常 (尝试 {attempt}/{self.max_retries}): {e}")
                if attempt < self.max_retries:
                    time.sleep(self.retry_delay)
                else:
                    raise
        
        raise Exception(f"请求失败,已重试 {self.max_retries} 次")
    
    async def async_post_with_retry(

        self,

        url: str,

        json_data: Optional[Dict[str, Any]] = None,

        **kwargs

    ) -> httpx.Response:
        """

        异步发送POST请求,支持重试

        

        Args:

            url: 请求URL

            json_data: JSON数据

            **kwargs: 其他请求参数

        

        Returns:

            httpx.Response: 响应对象

        """
        async with httpx.AsyncClient(timeout=self.timeout[1], trust_env=True) as client:
            for attempt in range(1, self.max_retries + 1):
                try:
                    logger.debug(f"异步尝试 {attempt}/{self.max_retries}: POST {url}")
                    response = await client.post(url, json=json_data, **kwargs)
                    
                    if response.status_code == 200:
                        return response
                    
                    logger.warning(f"异步请求失败: HTTP {response.status_code}")
                    
                except Exception as e:
                    logger.warning(f"异步请求异常 (尝试 {attempt}/{self.max_retries}): {e}")
                    if attempt < self.max_retries:
                        await asyncio.sleep(self.retry_delay)
                    else:
                        raise
        
        raise Exception(f"异步请求失败,已重试 {self.max_retries} 次")


# 创建默认的HTTP客户端实例
def get_http_client(

    base_urls: Optional[List[str]] = None,

    **kwargs

) -> HTTPClient:
    """

    获取HTTP客户端实例

    

    Args:

        base_urls: 基础URL列表

        **kwargs: 其他HTTPClient参数

    

    Returns:

        HTTPClient: HTTP客户端实例

    """
    return HTTPClient(base_urls=base_urls, **kwargs)