Spaces:
Running
Running
| """curl_cffi session builder for reverse-proxy requests.""" | |
| import asyncio | |
| from typing import Any | |
| from urllib.parse import urlparse | |
| from curl_cffi.const import CurlOpt | |
| from app.platform.config.snapshot import get_config | |
| from app.platform.errors import UpstreamError | |
| from app.control.proxy.models import ProxyLease | |
| from app.dataplane.proxy.adapters.profile import resolve_proxy_profile | |
| def _skip_proxy_ssl(proxy_url: str) -> bool: | |
| if not proxy_url: | |
| return False | |
| cfg = get_config() | |
| return cfg.get_bool("proxy.egress.skip_ssl_verify", False) | |
| def normalize_proxy_url(url: str) -> str: | |
| """Normalize SOCKS schemes for consistent DNS-over-proxy behaviour.""" | |
| if not url: | |
| return url | |
| scheme = urlparse(url).scheme.lower() | |
| if scheme == "socks": | |
| return "socks5h://" + url[len("socks://") :] | |
| if scheme == "socks5": | |
| return "socks5h://" + url[len("socks5://") :] | |
| if scheme == "socks4": | |
| return "socks4a://" + url[len("socks4://") :] | |
| return url | |
| def build_session_kwargs( | |
| *, | |
| lease: ProxyLease | None = None, | |
| browser_override: str | None = None, | |
| extra: dict[str, Any] | None = None, | |
| ) -> dict[str, Any]: | |
| """Build kwargs suitable for ``curl_cffi.requests.AsyncSession``.""" | |
| kwargs: dict[str, Any] = dict(extra or {}) | |
| # Browser impersonation. | |
| if not kwargs.get("impersonate"): | |
| browser = browser_override or resolve_proxy_profile(lease).browser | |
| if browser: | |
| kwargs["impersonate"] = browser | |
| # Proxy URL. | |
| proxy_url = "" | |
| if lease is not None and lease.proxy_url: | |
| proxy_url = normalize_proxy_url(lease.proxy_url) | |
| scheme = urlparse(proxy_url).scheme.lower() | |
| if scheme.startswith("socks"): | |
| kwargs.setdefault("proxy", proxy_url) | |
| else: | |
| kwargs.setdefault("proxies", {"http": proxy_url, "https": proxy_url}) | |
| # curl SSL options for proxy. | |
| if _skip_proxy_ssl(proxy_url): | |
| opts = dict(kwargs.get("curl_options") or {}) | |
| opts[CurlOpt.PROXY_SSL_VERIFYPEER] = 0 | |
| opts[CurlOpt.PROXY_SSL_VERIFYHOST] = 0 | |
| kwargs["curl_options"] = opts | |
| return kwargs | |
| def _wrap_transport_error(exc: BaseException) -> UpstreamError: | |
| if isinstance(exc, UpstreamError): | |
| return exc | |
| body = str(exc).replace("\n", "\\n")[:400] | |
| return UpstreamError( | |
| f"Transport request failed: {exc}", | |
| status=502, | |
| body=body, | |
| ) | |
| class ResettableSession: | |
| """AsyncSession wrapper that resets connection on configurable status codes. | |
| Designed for long-lived hot-path use; session is recreated transparently | |
| when a reset-triggering status code is received. | |
| """ | |
| def __init__( | |
| self, | |
| *, | |
| lease: ProxyLease | None = None, | |
| browser_override: str | None = None, | |
| reset_on_status: set[int] | None = None, | |
| **session_kwargs: Any, | |
| ) -> None: | |
| self._kwargs = build_session_kwargs( | |
| lease=lease, | |
| browser_override=browser_override, | |
| extra=session_kwargs or None, | |
| ) | |
| if reset_on_status is None: | |
| codes = get_config().get_list("retry.reset_session_status_codes", [403]) | |
| reset_on_status = {int(c) for c in codes} | |
| self._reset_on = reset_on_status | |
| self._reset_pending = False | |
| self._lock = asyncio.Lock() | |
| self._session = self._create() | |
| def _create(self): | |
| from curl_cffi.requests import AsyncSession | |
| return AsyncSession(**self._kwargs) | |
| async def _maybe_reset(self) -> None: | |
| if not self._reset_pending: | |
| return | |
| async with self._lock: | |
| if not self._reset_pending: | |
| return | |
| self._reset_pending = False | |
| old, self._session = self._session, self._create() | |
| try: | |
| await old.close() | |
| except Exception: | |
| pass | |
| async def _request(self, method: str, *args: Any, **kwargs: Any): | |
| await self._maybe_reset() | |
| try: | |
| response = await getattr(self._session, method)(*args, **kwargs) | |
| except Exception as exc: | |
| self._reset_pending = True | |
| raise _wrap_transport_error(exc) from exc | |
| if self._reset_on and response.status_code in self._reset_on: | |
| self._reset_pending = True | |
| return response | |
| async def get(self, *args: Any, **kwargs: Any): | |
| return await self._request("get", *args, **kwargs) | |
| async def post(self, *args: Any, **kwargs: Any): | |
| return await self._request("post", *args, **kwargs) | |
| async def delete(self, *args: Any, **kwargs: Any): | |
| return await self._request("delete", *args, **kwargs) | |
| async def close(self) -> None: | |
| if self._session is not None: | |
| try: | |
| await self._session.close() | |
| finally: | |
| self._session = None # type: ignore[assignment] | |
| async def __aenter__(self) -> "ResettableSession": | |
| return self | |
| async def __aexit__(self, *_: Any) -> None: | |
| await self.close() | |
| def __getattr__(self, name: str) -> Any: | |
| return getattr(self._session, name) | |
| __all__ = [ | |
| "ResettableSession", | |
| "build_session_kwargs", | |
| "normalize_proxy_url", | |
| ] | |