Spaces:
Paused
Paused
| """Global outbound proxy helpers for upstream ChatGPT and CPA requests.""" | |
| from __future__ import annotations | |
| import time | |
| from urllib.parse import urlparse | |
| from curl_cffi.requests import Session | |
| from services.config import config | |
class ProxySettingsStore:
    """Build HTTP session keyword arguments that include the configured proxy."""

    def build_session_kwargs(self, **session_kwargs) -> dict[str, object]:
        """Return *session_kwargs*, adding the configured proxy when one is set.

        The proxy value comes from ``config.get_proxy_settings()``. A falsy
        value leaves the keyword arguments untouched.
        """
        configured = config.get_proxy_settings()
        if not configured:
            return session_kwargs
        session_kwargs["proxy"] = configured
        return session_kwargs

    def build_session_kwargs_for_url(self, url: str, **session_kwargs) -> dict[str, object]:
        """Return session kwargs for *url*, skipping the proxy for local hosts.

        Requests addressed to the loopback names below (or to
        ``host.docker.internal``) get *session_kwargs* back unchanged; every
        other target is delegated to :meth:`build_session_kwargs`.
        """
        local_hosts = {"127.0.0.1", "localhost", "::1", "host.docker.internal"}
        hostname = urlparse(str(url or "")).hostname
        normalized = (hostname or "").lower()
        if normalized not in local_hosts:
            return self.build_session_kwargs(**session_kwargs)
        return session_kwargs
| def _clean(value: object) -> str: | |
| return str(value or "").strip() | |
| def _is_valid_proxy_url(url: str) -> bool: | |
| parsed = urlparse(url) | |
| return parsed.scheme in {"http", "https", "socks5", "socks5h"} and bool(parsed.netloc) | |
def test_proxy(url: str, *, timeout: float = 15.0) -> dict:
    """Probe a proxy by requesting the ChatGPT CSRF endpoint through it.

    Args:
        url: Proxy URL to test; surrounding whitespace is ignored.
        timeout: Timeout value forwarded to the HTTP request.

    Returns:
        A dict with the keys ``ok`` (bool), ``status`` (HTTP status code, or
        0 when no response was obtained), ``latency_ms`` (elapsed
        milliseconds, 0 when the URL was rejected before any request) and
        ``error`` (``None`` on success, otherwise a message).

    Every failure, including a malformed URL or a session that cannot be
    created, is reported through the returned dict rather than raised.
    """
    candidate = _clean(url)
    if not candidate:
        return {"ok": False, "status": 0, "latency_ms": 0, "error": "proxy url is required"}

    try:
        is_valid = _is_valid_proxy_url(candidate)
    except ValueError:
        # URL parsing can raise on malformed authorities; treat as invalid.
        is_valid = False
    if not is_valid:
        return {"ok": False, "status": 0, "latency_ms": 0, "error": "invalid proxy url"}

    session = None
    started = time.perf_counter()
    try:
        # Constructed inside the try so a constructor failure is reported
        # in the result dict like any other connection problem.
        session = Session(impersonate="edge101", verify=True, proxy=candidate)
        response = session.get(
            "https://chatgpt.com/api/auth/csrf",
            headers={"user-agent": "Mozilla/5.0 (chatgpt2api proxy test)"},
            timeout=timeout,
        )
        latency_ms = int((time.perf_counter() - started) * 1000)
        status = int(response.status_code)
        # Any answer below 5xx counts as success for this probe.
        reachable = status < 500
        return {
            "ok": reachable,
            "status": status,
            "latency_ms": latency_ms,
            "error": None if reachable else f"HTTP {status}",
        }
    except Exception as exc:
        latency_ms = int((time.perf_counter() - started) * 1000)
        return {
            "ok": False,
            "status": 0,
            "latency_ms": latency_ms,
            "error": str(exc) or exc.__class__.__name__,
        }
    finally:
        if session is not None:
            session.close()
# Module-level ProxySettingsStore instance created at import time.
proxy_settings = ProxySettingsStore()