chatgpt2api / services /proxy_service.py
tx1538's picture
Upload 179 files
9d7ddb9 verified
Raw
History Blame
2.35 kB
"""Global outbound proxy helpers for upstream ChatGPT and CPA requests."""
from __future__ import annotations
import time
from urllib.parse import urlparse
from curl_cffi.requests import Session
from services.config import config
class ProxySettingsStore:
def build_session_kwargs(self, **session_kwargs) -> dict[str, object]:
proxy = config.get_proxy_settings()
if proxy:
session_kwargs["proxy"] = proxy
return session_kwargs
def build_session_kwargs_for_url(self, url: str, **session_kwargs) -> dict[str, object]:
host = (urlparse(str(url or "")).hostname or "").lower()
if host in {"127.0.0.1", "localhost", "::1", "host.docker.internal"}:
return session_kwargs
return self.build_session_kwargs(**session_kwargs)
def _clean(value: object) -> str:
return str(value or "").strip()
def _is_valid_proxy_url(url: str) -> bool:
parsed = urlparse(url)
return parsed.scheme in {"http", "https", "socks5", "socks5h"} and bool(parsed.netloc)
def test_proxy(url: str, *, timeout: float = 15.0) -> dict:
candidate = _clean(url)
if not candidate:
return {"ok": False, "status": 0, "latency_ms": 0, "error": "proxy url is required"}
if not _is_valid_proxy_url(candidate):
return {"ok": False, "status": 0, "latency_ms": 0, "error": "invalid proxy url"}
session = Session(impersonate="edge101", verify=True, proxy=candidate)
started = time.perf_counter()
try:
response = session.get(
"https://chatgpt.com/api/auth/csrf",
headers={"user-agent": "Mozilla/5.0 (chatgpt2api proxy test)"},
timeout=timeout,
)
latency_ms = int((time.perf_counter() - started) * 1000)
return {
"ok": response.status_code < 500,
"status": int(response.status_code),
"latency_ms": latency_ms,
"error": None if response.status_code < 500 else f"HTTP {response.status_code}",
}
except Exception as exc:
latency_ms = int((time.perf_counter() - started) * 1000)
return {
"ok": False,
"status": 0,
"latency_ms": latency_ms,
"error": str(exc) or exc.__class__.__name__,
}
finally:
session.close()
proxy_settings = ProxySettingsStore()