import json
import random
import time
from collections.abc import Callable
from dataclasses import dataclass
from urllib.parse import urlparse

import requests
from loguru import logger

from xhs_utils.rate_limiter import get_default_rate_limiter

# HTTP statuses treated as transient: retried while attempts remain.
_RETRYABLE_STATUSES = frozenset({429, 500, 502, 503, 504})

# Substrings that mark an HTML body as a verification / captcha page.
# "验证" also matches the longer "验证码", so a single entry covers both.
_RISK_CONTROL_MARKERS = ("captcha", "验证")


@dataclass
class RequestResult:
    """Outcome of one logical HTTP call (including any retries)."""

    ok: bool  # True only when the response was accepted as a success
    msg: str  # "ok" or a short machine-readable failure description
    status_code: int | None  # None when no response was received
    json: dict | None  # decoded body (request_json only)
    text: str | None  # raw body (request_text, or request_json on invalid JSON)
    elapsed_ms: int | None  # duration of the last attempt; None on transport errors
    headers: dict | None  # response headers copied into a plain dict


class HttpClient:
    """Thin wrapper around ``requests.Session`` adding retries, backoff and rate limiting.

    The public methods never raise for transport or HTTP errors; they always
    return a :class:`RequestResult` describing what happened.
    """

    def __init__(
        self,
        session: requests.Session | None = None,
        timeout: tuple[float, float] = (5.0, 20.0),
        max_retries: int = 2,
        backoff_base_s: float = 0.6,
        backoff_max_s: float = 6.0,
        rate_limiter=None,
    ):
        """Configure the client.

        Args:
            session: Session used for all requests; a new one is created if omitted.
            timeout: Value passed as ``timeout`` to ``session.request``.
            max_retries: Extra attempts after the first one. Values below zero
                behave like zero (a single attempt).
            backoff_base_s: Base delay of the exponential backoff, in seconds.
            backoff_max_s: Upper bound of the backoff delay before jitter, in seconds.
            rate_limiter: Object providing ``acquire(domain)`` and, optionally,
                ``penalize(domain, factor=..., ttl_s=...)``. Falls back to
                ``get_default_rate_limiter()`` when falsy.
        """
        self.session = session or requests.Session()
        self.timeout = timeout
        self.max_retries = int(max_retries)
        self.backoff_base_s = float(backoff_base_s)
        self.backoff_max_s = float(backoff_max_s)
        self.rate_limiter = rate_limiter or get_default_rate_limiter()

    def request_json(
        self,
        method: str,
        url: str,
        headers: dict | None = None,
        cookies: dict | None = None,
        params: dict | None = None,
        data=None,
        json_data=None,
        proxies: dict | None = None,
        allow_redirects: bool = True,
    ) -> RequestResult:
        """Send a request and decode the response body as JSON.

        Returns:
            ``ok=True`` with ``json`` set when the status is below 400 and the
            body decodes to a non-null JSON value. Otherwise ``ok=False`` with
            ``msg`` describing the failure: ``invalid_json`` (``text`` holds the
            raw body), ``http_status=...`` (optionally tagged ``auth_required``
            or ``rate_limited``), ``timeout``, or the transport error text.
        """
        request_kwargs = {
            "headers": headers,
            "cookies": cookies,
            "params": params,
            "data": data,
            "json": json_data,
            "proxies": proxies,
            "allow_redirects": allow_redirects,
        }
        return self._execute(method, url, request_kwargs, self._interpret_json)

    def request_text(
        self,
        method: str,
        url: str,
        headers: dict | None = None,
        cookies: dict | None = None,
        params: dict | None = None,
        data=None,
        proxies: dict | None = None,
        allow_redirects: bool = True,
    ) -> RequestResult:
        """Send a request and return the response body as text.

        Returns:
            ``ok=True`` with ``text`` set when the status is below 400, the body
            is not an HTML verification page and no redirect hop pointed at a
            login/passport URL. Otherwise ``ok=False`` with ``msg`` one of
            ``http_status=...`` (optionally tagged), ``risk_control_html``,
            ``redirect_to_login``, ``timeout``, or the transport error text.
        """
        request_kwargs = {
            "headers": headers,
            "cookies": cookies,
            "params": params,
            "data": data,
            "proxies": proxies,
            "allow_redirects": allow_redirects,
        }
        return self._execute(method, url, request_kwargs, self._interpret_text)

    def _execute(
        self,
        method: str,
        url: str,
        request_kwargs: dict,
        interpret: Callable[[requests.Response, int, int, str], RequestResult],
    ) -> RequestResult:
        """Run the shared acquire / send / retry loop and delegate body handling.

        ``interpret(resp, status, elapsed_ms, domain)`` is invoked inside the
        ``try`` block so that errors raised while reading the body are handled
        exactly like errors raised while sending.
        """
        method = method.upper()
        domain = urlparse(url).netloc
        if self.rate_limiter is not None:
            self.rate_limiter.acquire(domain)

        # Always make at least one attempt, even if max_retries was set below zero.
        attempts = max(0, self.max_retries) + 1
        last_err: Exception | None = None
        failure_msg = "unknown_error"

        for attempt in range(attempts):
            can_retry = attempt < attempts - 1
            start = time.monotonic()
            try:
                resp = self.session.request(
                    method=method,
                    url=url,
                    timeout=self.timeout,
                    **request_kwargs,
                )
                elapsed_ms = int((time.monotonic() - start) * 1000)
                status = resp.status_code
                if status == 429:
                    # Slow the domain down on every 429, including the final
                    # attempt, so later calls benefit from the signal.
                    self._penalize(domain, factor=0.25, ttl_s=60.0)
                if status in _RETRYABLE_STATUSES and can_retry:
                    self._sleep_backoff(attempt)
                    continue
                return interpret(resp, status, elapsed_ms, domain)
            except requests.Timeout as exc:
                last_err, failure_msg = exc, "timeout"
            except requests.RequestException as exc:
                last_err, failure_msg = exc, str(exc)
            except Exception as exc:
                # Non-transport errors are not expected to be transient: no retry.
                last_err, failure_msg = exc, str(exc)
                break
            if not can_retry:
                break
            self._sleep_backoff(attempt)

        logger.warning(f"http_request_failed method={method} url={url} err={last_err}")
        return RequestResult(
            ok=False,
            msg=failure_msg,
            status_code=None,
            json=None,
            text=None,
            elapsed_ms=None,
            headers=None,
        )

    def _interpret_json(
        self, resp: requests.Response, status: int, elapsed_ms: int, domain: str
    ) -> RequestResult:
        """Turn a received response into the result for ``request_json``."""
        try:
            payload = resp.json()
        except ValueError:
            # Every JSON decode error raised by requests derives from ValueError.
            payload = None

        headers = dict(resp.headers)
        if payload is None:
            return RequestResult(
                ok=False,
                msg=f"http_status={status}, invalid_json",
                status_code=status,
                json=None,
                text=resp.text,
                elapsed_ms=elapsed_ms,
                headers=headers,
            )
        if status >= 400:
            return RequestResult(
                ok=False,
                msg=self._http_error_msg(status),
                status_code=status,
                json=payload,
                text=None,
                elapsed_ms=elapsed_ms,
                headers=headers,
            )
        return RequestResult(
            ok=True,
            msg="ok",
            status_code=status,
            json=payload,
            text=None,
            elapsed_ms=elapsed_ms,
            headers=headers,
        )

    def _interpret_text(
        self, resp: requests.Response, status: int, elapsed_ms: int, domain: str
    ) -> RequestResult:
        """Turn a received response into the result for ``request_text``."""
        text = resp.text
        headers = dict(resp.headers)
        risk_page = self._is_risk_control_page(resp, text)

        def build(ok: bool, msg: str) -> RequestResult:
            return RequestResult(
                ok=ok,
                msg=msg,
                status_code=status,
                json=None,
                text=text,
                elapsed_ms=elapsed_ms,
                headers=headers,
            )

        if status >= 400:
            msg = self._http_error_msg(status)
            if risk_page:
                msg = f"{msg}, risk_control_html"
            return build(False, msg)

        if risk_page:
            # A verification page served with a success status: back off harder.
            self._penalize(domain, factor=0.5, ttl_s=120.0)
            return build(False, "risk_control_html")

        # A redirect chain that passed through a login page means the final
        # body is not the content that was asked for.
        for hop in resp.history:
            location = hop.headers.get("Location", "") or ""
            if "login" in location or "passport" in location:
                return build(False, "redirect_to_login")

        return build(True, "ok")

    @staticmethod
    def _http_error_msg(status: int) -> str:
        """Build the ``msg`` for an HTTP error status (>= 400)."""
        msg = f"http_status={status}"
        if status in (401, 403):
            msg = f"{msg}, auth_required"
        if status == 429:
            msg = f"{msg}, rate_limited"
        return msg

    @staticmethod
    def _is_risk_control_page(resp: requests.Response, text: str | None) -> bool:
        """Return True when an HTML response contains a verification/captcha marker."""
        content_type = resp.headers.get("Content-Type", "") or ""
        if "text/html" not in content_type:
            return False
        lowered = (text or "").lower()
        return any(marker in lowered for marker in _RISK_CONTROL_MARKERS)

    def _penalize(self, domain: str, factor: float, ttl_s: float) -> None:
        """Call ``rate_limiter.penalize`` if the configured limiter provides it."""
        if hasattr(self.rate_limiter, "penalize"):
            self.rate_limiter.penalize(domain, factor=factor, ttl_s=ttl_s)

    def _sleep_backoff(self, attempt: int):
        """Sleep for an exponentially growing delay plus up to 25% random jitter.

        The base delay is ``backoff_base_s * 2**attempt`` capped at
        ``backoff_max_s``; the jitter is added on top of the capped value.
        """
        exp = min(self.backoff_max_s, self.backoff_base_s * (2**attempt))
        jitter = random.uniform(0.0, exp * 0.25)
        time.sleep(exp + jitter)


def json_dumps(data) -> str:
    """Serialize ``data`` to compact JSON, keeping non-ASCII characters unescaped."""
    return json.dumps(data, separators=(",", ":"), ensure_ascii=False)