| """curl_cffi session builder for reverse-proxy requests.""" |
|
|
| import asyncio |
| from typing import Any |
| from urllib.parse import urlparse |
|
|
| from curl_cffi.const import CurlOpt |
|
|
| from app.platform.config.snapshot import get_config |
| from app.platform.errors import UpstreamError |
| from app.control.proxy.models import ProxyLease |
| from app.dataplane.proxy.adapters.profile import resolve_proxy_profile |
|
|
|
|
| def _skip_proxy_ssl(proxy_url: str) -> bool: |
| if not proxy_url: |
| return False |
| cfg = get_config() |
| return cfg.get_bool("proxy.egress.skip_ssl_verify", False) |
|
|
|
|
| def normalize_proxy_url(url: str) -> str: |
| """Normalize SOCKS schemes for consistent DNS-over-proxy behaviour.""" |
| if not url: |
| return url |
| scheme = urlparse(url).scheme.lower() |
| if scheme == "socks": |
| return "socks5h://" + url[len("socks://") :] |
| if scheme == "socks5": |
| return "socks5h://" + url[len("socks5://") :] |
| if scheme == "socks4": |
| return "socks4a://" + url[len("socks4://") :] |
| return url |
|
|
|
|
| def build_session_kwargs( |
| *, |
| lease: ProxyLease | None = None, |
| browser_override: str | None = None, |
| extra: dict[str, Any] | None = None, |
| ) -> dict[str, Any]: |
| """Build kwargs suitable for ``curl_cffi.requests.AsyncSession``.""" |
| kwargs: dict[str, Any] = dict(extra or {}) |
|
|
| |
| if not kwargs.get("impersonate"): |
| browser = browser_override or resolve_proxy_profile(lease).browser |
| if browser: |
| kwargs["impersonate"] = browser |
|
|
| |
| proxy_url = "" |
| if lease is not None and lease.proxy_url: |
| proxy_url = normalize_proxy_url(lease.proxy_url) |
| scheme = urlparse(proxy_url).scheme.lower() |
| if scheme.startswith("socks"): |
| kwargs.setdefault("proxy", proxy_url) |
| else: |
| kwargs.setdefault("proxies", {"http": proxy_url, "https": proxy_url}) |
|
|
| |
| if _skip_proxy_ssl(proxy_url): |
| opts = dict(kwargs.get("curl_options") or {}) |
| opts[CurlOpt.PROXY_SSL_VERIFYPEER] = 0 |
| opts[CurlOpt.PROXY_SSL_VERIFYHOST] = 0 |
| kwargs["curl_options"] = opts |
|
|
| return kwargs |
|
|
|
|
| def _wrap_transport_error(exc: BaseException) -> UpstreamError: |
| if isinstance(exc, UpstreamError): |
| return exc |
| body = str(exc).replace("\n", "\\n")[:400] |
| return UpstreamError( |
| f"Transport request failed: {exc}", |
| status=502, |
| body=body, |
| ) |
|
|
|
|
| class ResettableSession: |
| """AsyncSession wrapper that resets connection on configurable status codes. |
| |
| Designed for long-lived hot-path use; session is recreated transparently |
| when a reset-triggering status code is received. |
| """ |
|
|
| def __init__( |
| self, |
| *, |
| lease: ProxyLease | None = None, |
| browser_override: str | None = None, |
| reset_on_status: set[int] | None = None, |
| **session_kwargs: Any, |
| ) -> None: |
| self._kwargs = build_session_kwargs( |
| lease=lease, |
| browser_override=browser_override, |
| extra=session_kwargs or None, |
| ) |
| if reset_on_status is None: |
| codes = get_config().get_list("retry.reset_session_status_codes", [403]) |
| reset_on_status = {int(c) for c in codes} |
| self._reset_on = reset_on_status |
| self._reset_pending = False |
| self._lock = asyncio.Lock() |
| self._session = self._create() |
|
|
| def _create(self): |
| from curl_cffi.requests import AsyncSession |
|
|
| return AsyncSession(**self._kwargs) |
|
|
| async def _maybe_reset(self) -> None: |
| if not self._reset_pending: |
| return |
| async with self._lock: |
| if not self._reset_pending: |
| return |
| self._reset_pending = False |
| old, self._session = self._session, self._create() |
| try: |
| await old.close() |
| except Exception: |
| pass |
|
|
| async def _request(self, method: str, *args: Any, **kwargs: Any): |
| await self._maybe_reset() |
| try: |
| response = await getattr(self._session, method)(*args, **kwargs) |
| except Exception as exc: |
| self._reset_pending = True |
| raise _wrap_transport_error(exc) from exc |
| if self._reset_on and response.status_code in self._reset_on: |
| self._reset_pending = True |
| return response |
|
|
| async def get(self, *args: Any, **kwargs: Any): |
| return await self._request("get", *args, **kwargs) |
|
|
| async def post(self, *args: Any, **kwargs: Any): |
| return await self._request("post", *args, **kwargs) |
|
|
| async def delete(self, *args: Any, **kwargs: Any): |
| return await self._request("delete", *args, **kwargs) |
|
|
| async def close(self) -> None: |
| if self._session is not None: |
| try: |
| await self._session.close() |
| finally: |
| self._session = None |
|
|
| async def __aenter__(self) -> "ResettableSession": |
| return self |
|
|
| async def __aexit__(self, *_: Any) -> None: |
| await self.close() |
|
|
| def __getattr__(self, name: str) -> Any: |
| return getattr(self._session, name) |
|
|
|
|
| __all__ = [ |
| "ResettableSession", |
| "build_session_kwargs", |
| "normalize_proxy_url", |
| ] |
|
|