hermesinho
Initial import of grok2api with Dockerfile for HF Spaces
bdc2878
"""curl_cffi session builder for reverse-proxy requests."""
import asyncio
from typing import Any
from urllib.parse import urlparse
from curl_cffi.const import CurlOpt
from app.platform.config.snapshot import get_config
from app.platform.errors import UpstreamError
from app.control.proxy.models import ProxyLease
from app.dataplane.proxy.adapters.profile import resolve_proxy_profile
def _skip_proxy_ssl(proxy_url: str) -> bool:
if not proxy_url:
return False
cfg = get_config()
return cfg.get_bool("proxy.egress.skip_ssl_verify", False)
def normalize_proxy_url(url: str) -> str:
    """Normalize SOCKS schemes for consistent DNS-over-proxy behaviour.

    Rewrites applied (scheme match is case-insensitive):

    * ``socks://``  -> ``socks5h://``
    * ``socks5://`` -> ``socks5h://``
    * ``socks4://`` -> ``socks4a://``

    Args:
        url: Proxy URL as configured; may be empty.

    Returns:
        The URL with its scheme rewritten when it is one of the SOCKS
        schemes above (surrounding whitespace is dropped in that case).
        Any other value -- empty, a non-SOCKS scheme, or a string without a
        ``://`` separator -- is returned unchanged.
    """
    if not url:
        return url

    rewrites = {"socks": "socks5h", "socks5": "socks5h", "socks4": "socks4a"}

    # Split on the real separator instead of slicing by an assumed prefix
    # length: a fixed-length slice cuts into the host when the scheme is not
    # at offset 0 (leading whitespace) or is not followed by "//".
    scheme, separator, remainder = url.strip().partition("://")
    replacement = rewrites.get(scheme.lower())
    if not separator or replacement is None:
        return url
    return f"{replacement}://{remainder}"
def build_session_kwargs(
    *,
    lease: ProxyLease | None = None,
    browser_override: str | None = None,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble keyword arguments for ``curl_cffi.requests.AsyncSession``.

    Starts from a copy of *extra* and then fills in, without overriding
    values the caller already supplied:

    * ``impersonate`` -- *browser_override*, else the browser of the proxy
      profile resolved for *lease* (skipped when neither yields a value).
    * ``proxy`` (SOCKS schemes) or ``proxies`` (everything else) -- the
      normalized proxy URL of *lease*, when it has one.
    * ``curl_options`` -- proxy TLS peer/host verification switched off when
      ``_skip_proxy_ssl`` says so for the lease's proxy URL.
    """
    session_kwargs: dict[str, Any] = dict(extra) if extra else {}

    # Browser impersonation: only when the caller did not pin one already.
    if not session_kwargs.get("impersonate"):
        chosen_browser = browser_override or resolve_proxy_profile(lease).browser
        if chosen_browser:
            session_kwargs["impersonate"] = chosen_browser

    # Proxy URL: SOCKS goes through ``proxy``, other schemes through ``proxies``.
    effective_proxy = ""
    if lease is not None and lease.proxy_url:
        effective_proxy = normalize_proxy_url(lease.proxy_url)
        uses_socks = urlparse(effective_proxy).scheme.lower().startswith("socks")
        if uses_socks:
            session_kwargs.setdefault("proxy", effective_proxy)
        else:
            session_kwargs.setdefault(
                "proxies",
                {"http": effective_proxy, "https": effective_proxy},
            )

    # curl SSL options for the proxy hop; copy so the caller's dict is untouched.
    if _skip_proxy_ssl(effective_proxy):
        curl_opts = dict(session_kwargs.get("curl_options") or {})
        curl_opts.update(
            {
                CurlOpt.PROXY_SSL_VERIFYPEER: 0,
                CurlOpt.PROXY_SSL_VERIFYHOST: 0,
            }
        )
        session_kwargs["curl_options"] = curl_opts

    return session_kwargs
def _wrap_transport_error(exc: BaseException) -> UpstreamError:
    """Convert an arbitrary transport failure into an ``UpstreamError``.

    An ``UpstreamError`` is passed through untouched.  Anything else becomes
    a status-502 ``UpstreamError`` whose body is the exception text with
    newlines escaped, capped at 400 characters.
    """
    if not isinstance(exc, UpstreamError):
        escaped = str(exc).replace("\n", "\\n")
        return UpstreamError(
            f"Transport request failed: {exc}",
            status=502,
            body=escaped[:400],
        )
    return exc
class ResettableSession:
    """AsyncSession wrapper that resets connection on configurable status codes.

    Designed for long-lived hot-path use; session is recreated transparently
    when a reset-triggering status code is received.

    ``get``/``post``/``delete`` go through the reset logic and raise
    ``UpstreamError`` on transport failures.  Any other attribute is
    forwarded to the wrapped session as-is and bypasses that logic.
    """

    def __init__(
        self,
        *,
        lease: ProxyLease | None = None,
        browser_override: str | None = None,
        reset_on_status: set[int] | None = None,
        **session_kwargs: Any,
    ) -> None:
        """Create the wrapper and its first underlying session.

        Args:
            lease: Proxy lease forwarded to ``build_session_kwargs``.
            browser_override: Impersonation target forwarded to
                ``build_session_kwargs``.
            reset_on_status: Status codes that schedule a session reset.
                When ``None`` they are read from the config list
                ``retry.reset_session_status_codes`` (default ``[403]``).
            **session_kwargs: Extra ``AsyncSession`` keyword arguments.
        """
        self._kwargs = build_session_kwargs(
            lease=lease,
            browser_override=browser_override,
            extra=session_kwargs or None,
        )
        if reset_on_status is None:
            codes = get_config().get_list("retry.reset_session_status_codes", [403])
            reset_on_status = {int(c) for c in codes}
        self._reset_on = reset_on_status
        self._reset_pending = False
        self._lock = asyncio.Lock()
        self._session = self._create()

    def _create(self):
        """Build a fresh ``AsyncSession`` from the stored kwargs."""
        # Imported lazily, as in the original code, so curl_cffi.requests is
        # only loaded once a session is actually created.
        from curl_cffi.requests import AsyncSession

        return AsyncSession(**self._kwargs)

    async def _maybe_reset(self) -> None:
        """Swap in a new session if a previous request scheduled a reset."""
        if not self._reset_pending:
            return
        async with self._lock:
            # Re-check under the lock: another task may have done the swap
            # while this one was waiting.
            if not self._reset_pending:
                return
            self._reset_pending = False
            old, self._session = self._session, self._create()
            try:
                await old.close()
            except Exception:
                # Best-effort cleanup: the replacement session is already in
                # place, so a failure closing the old one is ignored.
                pass

    async def _request(self, method: str, *args: Any, **kwargs: Any):
        """Run ``method`` on the current session and track reset conditions.

        Raises:
            UpstreamError: when the underlying call raises any ``Exception``
                (the original is chained as ``__cause__``).  A reset is
                scheduled for the next request in that case.
        """
        await self._maybe_reset()
        try:
            response = await getattr(self._session, method)(*args, **kwargs)
        except Exception as exc:
            self._reset_pending = True
            raise _wrap_transport_error(exc) from exc
        # The triggering response is still returned; only the *next* request
        # runs on a fresh session.
        if self._reset_on and response.status_code in self._reset_on:
            self._reset_pending = True
        return response

    async def get(self, *args: Any, **kwargs: Any):
        """Issue a GET through the reset-aware request path."""
        return await self._request("get", *args, **kwargs)

    async def post(self, *args: Any, **kwargs: Any):
        """Issue a POST through the reset-aware request path."""
        return await self._request("post", *args, **kwargs)

    async def delete(self, *args: Any, **kwargs: Any):
        """Issue a DELETE through the reset-aware request path."""
        return await self._request("delete", *args, **kwargs)

    async def close(self) -> None:
        """Close the underlying session; safe to call more than once."""
        # NOTE(review): a request made after close() fails inside _request
        # (no session), is wrapped as UpstreamError and schedules a new
        # session for the following call -- confirm reuse after close is
        # intended.
        if self._session is not None:
            try:
                await self._session.close()
            finally:
                self._session = None  # type: ignore[assignment]

    async def __aenter__(self) -> "ResettableSession":
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.close()

    def __getattr__(self, name: str) -> Any:
        """Forward unknown attributes to the wrapped session.

        Raises:
            AttributeError: when there is no wrapped session (not yet
                created, or already closed) or it lacks ``name``.
        """
        # This hook only runs after normal lookup failed.  Read the session
        # from the instance dict directly: going through ``self._session``
        # would re-enter this method when ``_session`` itself is missing
        # (partially constructed instance) and recurse until RecursionError.
        session = self.__dict__.get("_session")
        if session is None:
            raise AttributeError(
                f"{type(self).__name__!r} object has no attribute {name!r}"
            )
        return getattr(session, name)
# Names exported by a star-import of this module; the underscore-prefixed
# helpers defined above are intentionally not listed.
__all__ = [
"ResettableSession",
"build_session_kwargs",
"normalize_proxy_url",
]