# XHS/xhs_utils/http_client.py
# Trae Bot
# Upload Spider_XHS project
# c481f8a
import json
import random
import time
from dataclasses import dataclass
from urllib.parse import urlparse
import requests
from loguru import logger
from xhs_utils.rate_limiter import get_default_rate_limiter
@dataclass
class RequestResult:
ok: bool
msg: str
status_code: int | None
json: dict | None
text: str | None
elapsed_ms: int | None
headers: dict | None
class HttpClient:
    """HTTP client adding rate limiting, retries and backoff to a session.

    Each public call acquires the rate limiter once for the URL's domain,
    then performs up to ``max_retries + 1`` attempts. Transient HTTP
    statuses and ``requests`` transport errors are retried with
    exponential backoff plus jitter. Outcomes are reported through
    ``RequestResult``; the request methods do not raise for errors that
    occur while sending or interpreting a response.
    """

    # HTTP statuses treated as transient and retried while attempts remain.
    _RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})
    # Substrings, matched against lower-cased HTML, that mark a captcha or
    # verification page. "验证" also matches the longer "验证码".
    _RISK_MARKERS = ("captcha", "验证")
    # Substrings of a redirect ``Location`` header that indicate a login wall.
    _LOGIN_MARKERS = ("login", "passport")

    def __init__(
        self,
        session: requests.Session | None = None,
        timeout: tuple[float, float] = (5.0, 20.0),
        max_retries: int = 2,
        backoff_base_s: float = 0.6,
        backoff_max_s: float = 6.0,
        rate_limiter=None,
    ):
        """Configure the client.

        Args:
            session: Session used to send requests; a new
                ``requests.Session`` is created when omitted.
            timeout: Value passed as ``timeout`` to every request.
            max_retries: Extra attempts after the first one. Negative
                values are treated as 0 when sending.
            backoff_base_s: Delay before the first retry; doubled for
                each further retry.
            backoff_max_s: Upper bound for the delay before jitter.
            rate_limiter: Object with ``acquire(domain)`` and optionally
                ``penalize(domain, factor=..., ttl_s=...)``. When ``None``
                the result of ``get_default_rate_limiter()`` is used.
        """
        self.session = session if session is not None else requests.Session()
        self.timeout = timeout
        self.max_retries = int(max_retries)
        self.backoff_base_s = float(backoff_base_s)
        self.backoff_max_s = float(backoff_max_s)
        # Use an identity check so a limiter object that happens to be
        # falsy is not silently replaced by the default one.
        self.rate_limiter = (
            rate_limiter if rate_limiter is not None else get_default_rate_limiter()
        )

    def request_json(
        self,
        method: str,
        url: str,
        headers: dict | None = None,
        cookies: dict | None = None,
        params: dict | None = None,
        data=None,
        json_data=None,
        proxies: dict | None = None,
        allow_redirects: bool = True,
    ) -> RequestResult:
        """Send a request and decode the response body as JSON.

        Returns:
            ``ok=True`` with the decoded body in ``json`` for a status
            below 400. ``ok=False`` with ``msg`` of
            ``"http_status=<code>, invalid_json"`` (raw body in ``text``)
            when the body cannot be decoded or decodes to ``None``;
            ``"http_status=<code>"`` with optional ``", auth_required"``
            or ``", rate_limited"`` for a status of 400 or above;
            ``"timeout"`` or the exception text when no response was
            obtained.
        """
        request_kwargs = {
            "headers": headers,
            "cookies": cookies,
            "params": params,
            "data": data,
            "json": json_data,
            "proxies": proxies,
            "allow_redirects": allow_redirects,
        }
        return self._send(method, url, request_kwargs, self._build_json_result)

    def request_text(
        self,
        method: str,
        url: str,
        headers: dict | None = None,
        cookies: dict | None = None,
        params: dict | None = None,
        data=None,
        proxies: dict | None = None,
        allow_redirects: bool = True,
    ) -> RequestResult:
        """Send a request and return the response body as text.

        Returns:
            ``ok=True`` with the body in ``text`` for a status below 400
            that is neither a captcha/verification HTML page nor the end
            of a redirect chain through a login URL. Otherwise
            ``ok=False`` with ``msg`` describing the cause
            (``"http_status=<code>..."``, ``"risk_control_html"``,
            ``"redirect_to_login"``, ``"timeout"`` or the exception text).
        """
        request_kwargs = {
            "headers": headers,
            "cookies": cookies,
            "params": params,
            "data": data,
            "proxies": proxies,
            "allow_redirects": allow_redirects,
        }
        return self._send(method, url, request_kwargs, self._build_text_result)

    def _send(self, method: str, url: str, request_kwargs: dict, build_result) -> RequestResult:
        """Run the acquire/request/retry loop shared by the public methods.

        ``build_result(resp, status, elapsed_ms, domain)`` converts a
        response that will not be retried into a ``RequestResult``.
        """
        method = method.upper()
        domain = urlparse(url).netloc
        if self.rate_limiter is not None:
            self.rate_limiter.acquire(domain)

        # A negative retry count would otherwise skip the request entirely.
        max_retries = max(0, self.max_retries)
        last_err = None
        failure_msg = "unknown_error"
        for attempt in range(max_retries + 1):
            can_retry = attempt < max_retries
            start = time.monotonic()
            try:
                resp = self.session.request(
                    method=method,
                    url=url,
                    timeout=self.timeout,
                    **request_kwargs,
                )
                elapsed_ms = int((time.monotonic() - start) * 1000)
                status = resp.status_code
                if status == 429:
                    # Slow the limiter on every 429, including the final
                    # attempt, so later calls to this domain back off too.
                    self._penalize(domain, factor=0.25, ttl_s=60.0)
                if not (can_retry and status in self._RETRYABLE_STATUSES):
                    return build_result(resp, status, elapsed_ms, domain)
            except requests.Timeout as exc:
                last_err, failure_msg = exc, "timeout"
                if not can_retry:
                    break
            except requests.RequestException as exc:
                last_err, failure_msg = exc, str(exc)
                if not can_retry:
                    break
            except Exception as exc:
                # Anything else is not a transport error; do not retry it.
                last_err, failure_msg = exc, str(exc)
                break
            # Reached only when this attempt is going to be retried.
            self._sleep_backoff(attempt)

        logger.warning(f"http_request_failed method={method} url={url} err={last_err}")
        return RequestResult(
            ok=False,
            msg=failure_msg,
            status_code=None,
            json=None,
            text=None,
            elapsed_ms=None,
            headers=None,
        )

    def _build_json_result(self, resp, status: int, elapsed_ms: int, domain: str) -> RequestResult:
        """Turn a final response into the result of ``request_json``."""
        headers = dict(resp.headers)
        try:
            payload = resp.json()
        except ValueError:
            # JSON decode errors raised by requests derive from ValueError.
            payload = None
        if payload is None:
            return RequestResult(
                ok=False,
                msg=f"http_status={status}, invalid_json",
                status_code=status,
                json=None,
                text=resp.text,
                elapsed_ms=elapsed_ms,
                headers=headers,
            )
        if status >= 400:
            return RequestResult(
                ok=False,
                msg=self._status_message(status),
                status_code=status,
                json=payload,
                text=None,
                elapsed_ms=elapsed_ms,
                headers=headers,
            )
        return RequestResult(
            ok=True,
            msg="ok",
            status_code=status,
            json=payload,
            text=None,
            elapsed_ms=elapsed_ms,
            headers=headers,
        )

    def _build_text_result(self, resp, status: int, elapsed_ms: int, domain: str) -> RequestResult:
        """Turn a final response into the result of ``request_text``."""
        text = resp.text
        headers = dict(resp.headers)
        content_type = (resp.headers.get("Content-Type", "") or "").lower()
        lowered = (text or "").lower()
        is_risk_page = "text/html" in content_type and any(
            marker in lowered for marker in self._RISK_MARKERS
        )

        def result(ok: bool, msg: str) -> RequestResult:
            return RequestResult(
                ok=ok,
                msg=msg,
                status_code=status,
                json=None,
                text=text,
                elapsed_ms=elapsed_ms,
                headers=headers,
            )

        if status >= 400:
            msg = self._status_message(status)
            if is_risk_page:
                msg = f"{msg}, risk_control_html"
            return result(False, msg)
        if is_risk_page:
            # A captcha page served with a success status: slow down.
            self._penalize(domain, factor=0.5, ttl_s=120.0)
            return result(False, "risk_control_html")
        for hop in resp.history or ():
            location = hop.headers.get("Location", "") or ""
            if any(marker in location for marker in self._LOGIN_MARKERS):
                return result(False, "redirect_to_login")
        return result(True, "ok")

    @staticmethod
    def _status_message(status: int) -> str:
        """Build the ``msg`` text for an HTTP status of 400 or above."""
        msg = f"http_status={status}"
        if status in (401, 403):
            msg = f"{msg}, auth_required"
        if status == 429:
            msg = f"{msg}, rate_limited"
        return msg

    def _penalize(self, domain: str, factor: float, ttl_s: float) -> None:
        """Call the rate limiter's ``penalize`` hook when it provides one."""
        penalize = getattr(self.rate_limiter, "penalize", None)
        if callable(penalize):
            penalize(domain, factor=factor, ttl_s=ttl_s)

    def _sleep_backoff(self, attempt: int):
        """Sleep for the capped exponential delay of ``attempt`` plus jitter."""
        delay = min(self.backoff_max_s, self.backoff_base_s * (2**attempt))
        # time.sleep raises ValueError on negative values.
        delay = max(0.0, delay)
        jitter = random.uniform(0.0, delay * 0.25)
        time.sleep(delay + jitter)
def json_dumps(data) -> str:
    """Serialize ``data`` to compact JSON text.

    No whitespace is emitted after the item and key separators, and
    non-ASCII characters are written as-is rather than escaped.
    """
    compact_separators = (",", ":")
    encoded = json.dumps(
        data,
        ensure_ascii=False,
        separators=compact_separators,
    )
    return encoded