File size: 6,830 Bytes
621645b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
HTTP请求工具模块
提供统一的HTTP请求处理,包括重试逻辑和错误处理
"""
import asyncio
import time
from typing import Any, Dict, List, Optional, Union
import httpx
import requests
from .logging import get_logger
logger = get_logger("http_utils")
class HTTPClient:
"""统一的HTTP客户端,支持重试和错误处理"""
def __init__(
self,
base_urls: Optional[List[str]] = None,
max_retries: int = 3,
retry_delay: float = 1.0,
timeout: tuple = (5.0, 180.0)
):
"""
初始化HTTP客户端
Args:
base_urls: 基础URL列表,支持多个备用地址
max_retries: 最大重试次数
retry_delay: 重试间隔(秒)
timeout: 请求超时设置 (连接超时, 读取超时)
"""
self.base_urls = base_urls or []
self.max_retries = max_retries
self.retry_delay = retry_delay
self.timeout = timeout
def post_with_fallback(
self,
endpoint: str,
json_data: Optional[Dict[str, Any]] = None,
**kwargs
) -> requests.Response:
"""
发送POST请求,支持多个备用URL
Args:
endpoint: API端点路径
json_data: JSON数据
**kwargs: 其他请求参数
Returns:
requests.Response: 响应对象
Raises:
Exception: 所有URL都失败时抛出异常
"""
last_exc: Optional[Exception] = None
for base_url in self.base_urls:
url = f"{base_url}{endpoint}"
try:
logger.debug(f"发送POST请求到: {url}")
response = requests.post(
url,
json=json_data,
timeout=self.timeout,
**kwargs
)
if response.status_code == 200:
return response
else:
last_exc = Exception(f"HTTP {response.status_code}: {response.text}")
logger.warning(f"请求失败 {url}: {last_exc}")
except Exception as e:
last_exc = e
logger.warning(f"请求异常 {url}: {e}")
continue
if last_exc:
raise last_exc
raise Exception("所有备用URL都不可用")
def post_with_retry(
self,
url: str,
json_data: Optional[Dict[str, Any]] = None,
handle_429: bool = True,
refresh_url: Optional[str] = None,
**kwargs
) -> requests.Response:
"""
发送POST请求,支持重试和429处理
Args:
url: 请求URL
json_data: JSON数据
handle_429: 是否处理429错误(限流)
refresh_url: JWT刷新URL(用于处理429)
**kwargs: 其他请求参数
Returns:
requests.Response: 响应对象
"""
for attempt in range(1, self.max_retries + 1):
try:
logger.debug(f"尝试 {attempt}/{self.max_retries}: POST {url}")
response = requests.post(
url,
json=json_data,
timeout=self.timeout,
**kwargs
)
# 处理429限流错误
if response.status_code == 429 and handle_429 and refresh_url:
logger.warning("收到429限流错误,尝试刷新JWT...")
try:
refresh_resp = requests.post(refresh_url, timeout=10.0)
logger.debug(f"JWT刷新响应: HTTP {refresh_resp.status_code}")
except Exception as e:
logger.warning(f"JWT刷新失败: {e}")
# 重试原请求
response = requests.post(
url,
json=json_data,
timeout=self.timeout,
**kwargs
)
if response.status_code == 200:
return response
logger.warning(f"请求失败: HTTP {response.status_code}")
except Exception as e:
logger.warning(f"请求异常 (尝试 {attempt}/{self.max_retries}): {e}")
if attempt < self.max_retries:
time.sleep(self.retry_delay)
else:
raise
raise Exception(f"请求失败,已重试 {self.max_retries} 次")
async def async_post_with_retry(
self,
url: str,
json_data: Optional[Dict[str, Any]] = None,
**kwargs
) -> httpx.Response:
"""
异步发送POST请求,支持重试
Args:
url: 请求URL
json_data: JSON数据
**kwargs: 其他请求参数
Returns:
httpx.Response: 响应对象
"""
async with httpx.AsyncClient(timeout=self.timeout[1], trust_env=True) as client:
for attempt in range(1, self.max_retries + 1):
try:
logger.debug(f"异步尝试 {attempt}/{self.max_retries}: POST {url}")
response = await client.post(url, json=json_data, **kwargs)
if response.status_code == 200:
return response
logger.warning(f"异步请求失败: HTTP {response.status_code}")
except Exception as e:
logger.warning(f"异步请求异常 (尝试 {attempt}/{self.max_retries}): {e}")
if attempt < self.max_retries:
await asyncio.sleep(self.retry_delay)
else:
raise
raise Exception(f"异步请求失败,已重试 {self.max_retries} 次")
# 创建默认的HTTP客户端实例
def get_http_client(
base_urls: Optional[List[str]] = None,
**kwargs
) -> HTTPClient:
"""
获取HTTP客户端实例
Args:
base_urls: 基础URL列表
**kwargs: 其他HTTPClient参数
Returns:
HTTPClient: HTTP客户端实例
"""
return HTTPClient(base_urls=base_urls, **kwargs) |