"""Global outbound proxy helpers for upstream ChatGPT and CPA requests.""" from __future__ import annotations import time from urllib.parse import urlparse from curl_cffi.requests import Session from services.config import config class ProxySettingsStore: def build_session_kwargs(self, **session_kwargs) -> dict[str, object]: proxy = config.get_proxy_settings() if proxy: session_kwargs["proxy"] = proxy return session_kwargs def build_session_kwargs_for_url(self, url: str, **session_kwargs) -> dict[str, object]: host = (urlparse(str(url or "")).hostname or "").lower() if host in {"127.0.0.1", "localhost", "::1", "host.docker.internal"}: return session_kwargs return self.build_session_kwargs(**session_kwargs) def _clean(value: object) -> str: return str(value or "").strip() def _is_valid_proxy_url(url: str) -> bool: parsed = urlparse(url) return parsed.scheme in {"http", "https", "socks5", "socks5h"} and bool(parsed.netloc) def test_proxy(url: str, *, timeout: float = 15.0) -> dict: candidate = _clean(url) if not candidate: return {"ok": False, "status": 0, "latency_ms": 0, "error": "proxy url is required"} if not _is_valid_proxy_url(candidate): return {"ok": False, "status": 0, "latency_ms": 0, "error": "invalid proxy url"} session = Session(impersonate="edge101", verify=True, proxy=candidate) started = time.perf_counter() try: response = session.get( "https://chatgpt.com/api/auth/csrf", headers={"user-agent": "Mozilla/5.0 (chatgpt2api proxy test)"}, timeout=timeout, ) latency_ms = int((time.perf_counter() - started) * 1000) return { "ok": response.status_code < 500, "status": int(response.status_code), "latency_ms": latency_ms, "error": None if response.status_code < 500 else f"HTTP {response.status_code}", } except Exception as exc: latency_ms = int((time.perf_counter() - started) * 1000) return { "ok": False, "status": 0, "latency_ms": latency_ms, "error": str(exc) or exc.__class__.__name__, } finally: session.close() proxy_settings = ProxySettingsStore()