"""curl_cffi session builder for reverse-proxy requests."""
import asyncio
from typing import Any
from urllib.parse import urlparse
from curl_cffi.const import CurlOpt
from app.platform.config.snapshot import get_config
from app.platform.errors import UpstreamError
from app.control.proxy.models import ProxyLease
from app.dataplane.proxy.adapters.profile import resolve_proxy_profile
def _skip_proxy_ssl(proxy_url: str) -> bool:
    """Tell whether TLS verification towards the proxy should be disabled.

    Without a proxy URL the answer is always ``False`` and the configuration
    is not consulted. Otherwise the boolean config key
    ``proxy.egress.skip_ssl_verify`` decides (``False`` when unset).
    """
    if proxy_url:
        return get_config().get_bool("proxy.egress.skip_ssl_verify", False)
    return False
# Plain SOCKS schemes and the scheme each one is rewritten to.
_SOCKS_SCHEME_ALIASES = {
    "socks": "socks5h",
    "socks5": "socks5h",
    "socks4": "socks4a",
}


def normalize_proxy_url(url: str) -> str:
    """Normalize SOCKS schemes for consistent DNS-over-proxy behaviour.

    ``socks://`` and ``socks5://`` are rewritten to ``socks5h://`` and
    ``socks4://`` to ``socks4a://``; the scheme is matched
    case-insensitively. Every other input is returned unchanged: empty
    values, non-SOCKS schemes, already-normalized schemes, and strings
    that lack a ``://`` separator.

    Args:
        url: Proxy URL to normalize; may be empty.

    Returns:
        The URL with its SOCKS scheme rewritten, or ``url`` itself when no
        rewrite applies.
    """
    if not url:
        return url
    # Split on the real separator rather than slicing by an assumed prefix
    # length, so input such as "socks5:host:1080" is never truncated.
    scheme, separator, remainder = url.partition("://")
    replacement = _SOCKS_SCHEME_ALIASES.get(scheme.lower())
    if not separator or replacement is None:
        return url
    return f"{replacement}://{remainder}"
def build_session_kwargs(
    *,
    lease: ProxyLease | None = None,
    browser_override: str | None = None,
    extra: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Assemble keyword arguments for ``curl_cffi.requests.AsyncSession``.

    Args:
        lease: Proxy lease supplying the proxy URL and, through
            ``resolve_proxy_profile``, the default browser profile.
        browser_override: Browser to impersonate instead of the profile's.
        extra: Caller-supplied kwargs. They are copied, and any
            ``impersonate``, ``proxy`` or ``proxies`` entry already present
            is kept.

    Returns:
        A new dict; ``extra`` itself is never mutated.
    """
    session_kwargs: dict[str, Any] = dict(extra or {})

    # Impersonation: an explicit value in ``extra`` wins, then the override,
    # then whatever the proxy profile resolves to.
    if not session_kwargs.get("impersonate"):
        chosen_browser = browser_override or resolve_proxy_profile(lease).browser
        if chosen_browser:
            session_kwargs["impersonate"] = chosen_browser

    # No proxy configured: nothing proxy-related to add.
    if lease is None or not lease.proxy_url:
        return session_kwargs

    normalized = normalize_proxy_url(lease.proxy_url)
    uses_socks = urlparse(normalized).scheme.lower().startswith("socks")
    if uses_socks:
        session_kwargs.setdefault("proxy", normalized)
    else:
        session_kwargs.setdefault(
            "proxies", {"http": normalized, "https": normalized}
        )

    # Optionally switch off certificate checks on the proxy connection,
    # working on a copy of any caller-provided curl options.
    if _skip_proxy_ssl(normalized):
        curl_opts = dict(session_kwargs.get("curl_options") or {})
        curl_opts.update(
            {
                CurlOpt.PROXY_SSL_VERIFYPEER: 0,
                CurlOpt.PROXY_SSL_VERIFYHOST: 0,
            }
        )
        session_kwargs["curl_options"] = curl_opts

    return session_kwargs
def _wrap_transport_error(exc: BaseException) -> UpstreamError:
    """Convert an arbitrary exception into an ``UpstreamError``.

    An ``UpstreamError`` is handed back untouched. Anything else becomes a
    new ``UpstreamError`` with status 502 whose body is the exception text
    with newlines escaped, cut to 400 characters.
    """
    if not isinstance(exc, UpstreamError):
        single_line = str(exc).replace("\n", "\\n")
        return UpstreamError(
            f"Transport request failed: {exc}",
            status=502,
            body=single_line[:400],
        )
    return exc
class ResettableSession:
    """AsyncSession wrapper that resets connection on configurable status codes.

    Designed for long-lived hot-path use. When ``get``/``post``/``delete``
    raises, or returns a status code listed in ``reset_on_status``, the
    wrapper flags itself; the underlying session is then replaced just
    before the next request.

    Any other attribute is forwarded to the underlying session through
    ``__getattr__`` and therefore bypasses the reset bookkeeping.
    """

    def __init__(
        self,
        *,
        lease: ProxyLease | None = None,
        browser_override: str | None = None,
        reset_on_status: set[int] | None = None,
        **session_kwargs: Any,
    ) -> None:
        """Build the session kwargs once and open the first session.

        Args:
            lease: Proxy lease forwarded to ``build_session_kwargs``.
            browser_override: Browser profile forwarded to
                ``build_session_kwargs``.
            reset_on_status: Status codes that trigger a reset. When
                ``None`` they are read from the config list
                ``retry.reset_session_status_codes`` (default ``[403]``).
            **session_kwargs: Extra ``AsyncSession`` keyword arguments.
        """
        self._kwargs = build_session_kwargs(
            lease=lease,
            browser_override=browser_override,
            extra=session_kwargs or None,
        )
        if reset_on_status is None:
            codes = get_config().get_list("retry.reset_session_status_codes", [403])
            reset_on_status = {int(c) for c in codes}
        self._reset_on = reset_on_status
        self._reset_pending = False
        self._lock = asyncio.Lock()
        self._session = self._create()

    def _create(self):
        """Instantiate a fresh ``AsyncSession`` from the stored kwargs."""
        # Imported here rather than at module level, as in the original code.
        from curl_cffi.requests import AsyncSession

        return AsyncSession(**self._kwargs)

    async def _maybe_reset(self) -> None:
        """Swap in a new session if a reset was requested."""
        if not self._reset_pending:
            return
        async with self._lock:
            # Re-check under the lock: another coroutine may have already
            # performed the reset while this one was waiting.
            if not self._reset_pending:
                return
            self._reset_pending = False
            old, self._session = self._session, self._create()
        try:
            await old.close()
        except Exception:
            # Best effort: a failure to close the discarded session must not
            # fail the request that triggered the reset.
            pass

    async def _request(self, method: str, *args: Any, **kwargs: Any):
        """Run ``method`` on the current session and track reset triggers.

        Raises:
            UpstreamError: For any exception raised by the underlying call;
                non-``UpstreamError`` exceptions are wrapped and chained.
        """
        await self._maybe_reset()
        try:
            response = await getattr(self._session, method)(*args, **kwargs)
        except Exception as exc:
            self._reset_pending = True
            wrapped = _wrap_transport_error(exc)
            if wrapped is exc:
                # Already an UpstreamError: re-raise it as-is instead of
                # chaining the exception to itself as its own cause.
                raise
            raise wrapped from exc
        if self._reset_on and response.status_code in self._reset_on:
            self._reset_pending = True
        return response

    async def get(self, *args: Any, **kwargs: Any):
        """Issue a GET through the reset-aware request path."""
        return await self._request("get", *args, **kwargs)

    async def post(self, *args: Any, **kwargs: Any):
        """Issue a POST through the reset-aware request path."""
        return await self._request("post", *args, **kwargs)

    async def delete(self, *args: Any, **kwargs: Any):
        """Issue a DELETE through the reset-aware request path."""
        return await self._request("delete", *args, **kwargs)

    async def close(self) -> None:
        """Close the current session, if any, and drop the reference."""
        if self._session is not None:
            try:
                await self._session.close()
            finally:
                self._session = None  # type: ignore[assignment]

    async def __aenter__(self) -> "ResettableSession":
        return self

    async def __aexit__(self, *_: Any) -> None:
        await self.close()

    def __getattr__(self, name: str) -> Any:
        """Forward unknown attributes to the underlying session."""
        # ``__getattr__`` only runs when normal lookup fails. If ``_session``
        # itself is missing (``__init__`` did not finish, or the instance was
        # made via ``__new__``), reading ``self._session`` below would
        # re-enter this method without end.
        if name == "_session":
            raise AttributeError(name)
        return getattr(self._session, name)
# Names exported by ``from <module> import *``; the underscore-prefixed
# helpers defined in this module are not listed.
__all__ = [
"ResettableSession",
"build_session_kwargs",
"normalize_proxy_url",
]
|