# customs-data/packages/core/http_client.py
# Last commit 89d8756 by 3v324v23:
#   feat: 新增智利海关数据接入链路,完善系统框架与文档
#   (feat: add the Chile customs-data ingestion pipeline; refine the system framework and docs)
import httpx
import asyncio
from typing import Optional, Dict, Any
from packages.core.logger import app_logger
from packages.core.config import settings
class BaseHttpClient:
    """
    Base async HTTP client that centralizes:
    - timeout control
    - retries with exponential backoff
    - optional proxy-pool rotation (reserved; only active when
      ``settings.PROXY_POOL_URL`` is configured)
    - uniform exception handling and logging

    Every attempt opens a fresh ``httpx.AsyncClient`` so that each retry can
    use a different proxy and TLS-verification setting.
    """

    # Status codes that are retried (with backoff) instead of raised at once.
    RETRYABLE_STATUS_CODES = frozenset({403, 429, 500, 502, 503, 504})
    # Status codes whose body/headers are inspected for Cloudflare/captcha pages.
    CHALLENGE_STATUS_CODES = frozenset({403, 503})

    def __init__(self, use_proxy: bool = False, max_retries: int = 3, timeout: int = 30):
        """
        Args:
            use_proxy: fetch a proxy from ``settings.PROXY_POOL_URL`` for every
                attempt (no effect when that setting is missing or empty).
            max_retries: total number of attempts per request; values below 1
                are treated as 1.
            timeout: timeout passed to each ``httpx.AsyncClient``.
        """
        self.max_retries = max_retries
        self.timeout = timeout
        self.use_proxy = use_proxy

    @staticmethod
    def _normalize_proxy(raw: Any) -> Optional[str]:
        """
        Turn a proxy-pool value into a proxy URL, or return None if unusable.

        Bare ``ip:port`` values get an ``http://`` scheme. Values that already
        carry a scheme (``http://``, ``https://``, ``socks5://`` ...) are kept
        unchanged. Non-strings and blank strings yield None.
        """
        if not isinstance(raw, str):
            return None
        proxy = raw.strip()
        if not proxy:
            return None
        if "://" not in proxy:
            proxy = f"http://{proxy}"
        return proxy

    async def _get_proxy(self, force: bool = False) -> Optional[str]:
        """
        Fetch a dynamic proxy from the proxy pool (hook for a future pool API).

        The pool is queried only when ``self.use_proxy`` (or ``force``) is set
        and ``settings.PROXY_POOL_URL`` is non-empty. Accepted pool responses:
        JSON ``{"proxy": "http://ip:port"}``, a JSON string, or a plain-text
        body such as ``ip:port``.

        Args:
            force: query the pool for this call even when ``self.use_proxy``
                is False (used after a Cloudflare/captcha block).

        Returns:
            A proxy URL. Returns None when proxying is off, the pool is not
            configured, the response is unusable, or the lookup fails. Failures
            are logged, never raised.
        """
        pool_url = getattr(settings, "PROXY_POOL_URL", None)
        if not ((self.use_proxy or force) and pool_url):
            return None
        try:
            async with httpx.AsyncClient(timeout=5) as client:
                resp = await client.get(pool_url)
                resp.raise_for_status()
            try:
                data = resp.json()
            except ValueError:
                # A plain-text body like "1.2.3.4:8080" is not valid JSON.
                data = resp.text
            if isinstance(data, dict):
                data = data.get("proxy")
            return self._normalize_proxy(data)
        except Exception as e:
            # Best effort: a broken pool must not break the actual request.
            app_logger.error(f"Failed to fetch proxy from pool: {str(e)}")
            return None

    def _is_cloudflare_or_captcha(self, response: httpx.Response) -> bool:
        """
        Heuristically detect a Cloudflare block or a captcha/challenge page.

        Only 403 and 503 responses are inspected. Such a response counts as
        blocked when a ``cf-ray`` header is present, or when the body contains
        "cloudflare", "ray id", "captcha" or "challenge" (case-insensitive).
        """
        if response.status_code not in self.CHALLENGE_STATUS_CODES:
            return False
        text = response.text.lower()
        if "cf-ray" in response.headers:
            return True
        return any(marker in text for marker in ("cloudflare", "ray id", "captcha", "challenge"))

    async def request(
        self,
        method: str,
        url: str,
        params: Optional[Dict[str, Any]] = None,
        json_data: Optional[Dict[str, Any]] = None,
        headers: Optional[Dict[str, str]] = None,
        **kwargs
    ) -> httpx.Response:
        """
        Send an HTTP request with retries, backoff and optional proxying.

        Args:
            method: HTTP method, e.g. "GET" or "POST".
            url: target URL.
            params: query-string parameters.
            json_data: request body, sent as ``json=``.
            headers: extra request headers.
            **kwargs: forwarded to ``httpx.AsyncClient.request``, except:
                verify: TLS verification setting for the client (default True).
                allow_insecure_fallback: if True, a CERTIFICATE_VERIFY_FAILED
                    error triggers one extra attempt with ``verify=False``
                    (default False).

        Retry policy:
            - Cloudflare/captcha block: later attempts of this call request a
              proxy from the pool (if configured); sleeps ``3 ** attempt`` s.
            - Statuses 403, 429, 500, 502, 503, 504 and transport errors
              (``httpx.RequestError``): sleeps ``2 ** attempt`` s.
            - No sleep after the final attempt.

        Returns:
            The first response whose status is not 4xx/5xx.

        Raises:
            httpx.HTTPStatusError: immediately for non-retryable statuses, or
                when the final attempt ends with an error status.
            httpx.RequestError: when the final attempt ends with a transport error.
        """
        verify = kwargs.pop("verify", True)
        allow_insecure_fallback = kwargs.pop("allow_insecure_fallback", False)
        # At least one attempt, so last_exception is never None when re-raised
        # (``raise None`` would otherwise surface as a confusing TypeError).
        max_attempts = max(1, self.max_retries)
        # Per-request switch: a block forces a proxy only for this call's
        # retries instead of flipping ``self.use_proxy`` on a shared client.
        force_proxy = False
        last_exception: Optional[Exception] = None
        attempt = 0
        while attempt < max_attempts:
            attempt += 1
            is_last_attempt = attempt >= max_attempts
            proxy = await self._get_proxy(force=force_proxy)
            client_kwargs: Dict[str, Any] = {"timeout": self.timeout, "verify": verify}
            if proxy:
                client_kwargs["proxy"] = proxy
            try:
                async with httpx.AsyncClient(**client_kwargs) as client:
                    app_logger.debug(f"Request: {method} {url} | Attempt: {attempt}/{max_attempts}")
                    response = await client.request(
                        method=method,
                        url=url,
                        params=params,
                        json=json_data,
                        headers=headers,
                        **kwargs
                    )
                    # Raises httpx.HTTPStatusError for 4xx/5xx responses.
                    response.raise_for_status()
                    return response
            except httpx.HTTPStatusError as e:
                status = e.response.status_code
                app_logger.warning(f"HTTP Error {status} for {url}")
                last_exception = e
                if self._is_cloudflare_or_captcha(e.response):
                    app_logger.warning(f"Detected Cloudflare or Captcha interception for {url}")
                    # Retry through a pool proxy (only if the pool is configured).
                    force_proxy = True
                    if not is_last_attempt:
                        await asyncio.sleep(3 ** attempt)
                    continue
                if status not in self.RETRYABLE_STATUS_CODES:
                    raise
                if not is_last_attempt:
                    await asyncio.sleep(2 ** attempt)  # exponential backoff
            except httpx.RequestError as e:
                app_logger.warning(f"Request Error: {str(e)} for {url}")
                last_exception = e
                if allow_insecure_fallback and verify and "CERTIFICATE_VERIFY_FAILED" in str(e):
                    app_logger.warning(f"SSL 证书校验失败,降级为 verify=False 重试: {url}")
                    verify = False
                    # The downgrade happens at most once (verify is now False).
                    # Give it its own attempt so it still runs when this was
                    # the last one (e.g. max_retries=1).
                    max_attempts += 1
                    continue
                if not is_last_attempt:
                    await asyncio.sleep(2 ** attempt)
        app_logger.error(f"Max retries reached for {url}. Last error: {str(last_exception)}")
        raise last_exception

    async def get(self, url: str, **kwargs) -> httpx.Response:
        """Shortcut for ``request("GET", url, **kwargs)``."""
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> httpx.Response:
        """Shortcut for ``request("POST", url, **kwargs)``."""
        return await self.request("POST", url, **kwargs)
# Global singleton shared by every importer of this module
# (constructed with the class defaults: no proxy, 3 attempts, timeout 30).
http_client: BaseHttpClient = BaseHttpClient()