| """ProxyDirectory — control-plane proxy pool coordinator. |
| |
| Maintains the list of EgressNodes and ClearanceBundles. |
| Selection delegates to the dataplane ProxyTable; this module owns |
| configuration loading and clearance refresh lifecycle. |
| """ |
|
|
| import asyncio |
| from urllib.parse import urlparse |
|
|
| from app.platform.logging.logger import logger |
| from app.platform.config.snapshot import get_config |
| from app.platform.runtime.clock import now_ms |
| from app.platform.runtime.ids import next_hex |
| from .config import resolve_clearance_config |
| from .models import ( |
| EgressMode, |
| ClearanceMode, |
| EgressNode, |
| ClearanceBundle, |
| ProxyLease, |
| ProxyFeedback, |
| ProxyFeedbackKind, |
| RequestKind, |
| ProxyScope, |
| ) |
| from .providers.manual import ManualClearanceProvider |
| from .providers.flaresolverr import FlareSolverrClearanceProvider |
|
|
| _DEFAULT_CLEARANCE_ORIGIN = "https://grok.com" |
| BundleKey = tuple[str, str] |
|
|
|
|
| def _clearance_host(clearance_origin: str | None) -> str: |
| host = urlparse(clearance_origin or _DEFAULT_CLEARANCE_ORIGIN).hostname |
| return (host or "grok.com").lower() |
|
|
|
|
| class ProxyDirectory: |
| """Owns egress nodes and clearance bundles. |
| |
| Thread-safety: all mutations are protected by ``_lock``. |
| """ |
|
|
| def __init__(self) -> None: |
| self._nodes: list[EgressNode] = [] |
| self._resource_nodes: list[EgressNode] = [] |
| self._bundles: dict[BundleKey, ClearanceBundle] = {} |
| self._lock = asyncio.Lock() |
| |
| |
| self._refresh_events: dict[BundleKey, asyncio.Event] = {} |
| self._manual = ManualClearanceProvider() |
| self._flare = FlareSolverrClearanceProvider() |
| self._egress_mode: EgressMode = EgressMode.DIRECT |
| self._clearance_mode: ClearanceMode = ClearanceMode.NONE |
| self._config_sig: tuple | None = None |
| |
| |
| self._pool_cursor: int = 0 |
|
|
| |
| |
| |
|
|
| async def load(self) -> None: |
| """Load proxy configuration from the current config snapshot.""" |
| cfg = get_config() |
| egress_mode = EgressMode(cfg.get_str("proxy.egress.mode", "direct")) |
| clearance_mode = ClearanceMode.parse( |
| cfg.get_str("proxy.clearance.mode", "none") |
| ) |
| base_url = cfg.get_str("proxy.egress.proxy_url", "") |
| res_url = cfg.get_str("proxy.egress.resource_proxy_url", "") |
| base_pool = tuple(cfg.get_list("proxy.egress.proxy_pool", [])) |
| res_pool = tuple(cfg.get_list("proxy.egress.resource_proxy_pool", [])) |
| clearance = resolve_clearance_config(cfg) |
| config_sig = ( |
| egress_mode.value, |
| clearance_mode.value, |
| base_url, |
| res_url, |
| base_pool, |
| res_pool, |
| cfg.get_str("proxy.clearance.flaresolverr_url", ""), |
| clearance.cf_cookies, |
| clearance.user_agent, |
| clearance.cf_clearance, |
| clearance.browser, |
| cfg.get_int("proxy.clearance.timeout_sec", 60), |
| ) |
|
|
| nodes: list[EgressNode] = [] |
| resource_nodes: list[EgressNode] = [] |
|
|
| if egress_mode == EgressMode.SINGLE_PROXY: |
| if base_url: |
| nodes.append(EgressNode(node_id="single", proxy_url=base_url)) |
| if res_url: |
| resource_nodes.append( |
| EgressNode(node_id="res-single", proxy_url=res_url) |
| ) |
|
|
| elif egress_mode == EgressMode.PROXY_POOL: |
| for i, url in enumerate(base_pool): |
| nodes.append(EgressNode(node_id=f"pool-{i}", proxy_url=url)) |
| for i, url in enumerate(res_pool): |
| resource_nodes.append( |
| EgressNode(node_id=f"res-pool-{i}", proxy_url=url) |
| ) |
|
|
| valid_affinities = {n.proxy_url or "direct" for n in [*nodes, *resource_nodes]} |
| if not valid_affinities: |
| valid_affinities = {"direct"} |
|
|
| async with self._lock: |
| if self._config_sig == config_sig: |
| return |
| from .models import ClearanceBundleState |
|
|
| self._egress_mode = egress_mode |
| self._clearance_mode = clearance_mode |
| self._nodes = nodes |
| self._resource_nodes = resource_nodes |
| self._pool_cursor = 0 |
| self._bundles = { |
| key: bundle.model_copy(update={"state": ClearanceBundleState.INVALID}) |
| for key, bundle in self._bundles.items() |
| if key[0] in valid_affinities |
| } |
| self._refresh_events = { |
| key: event |
| for key, event in self._refresh_events.items() |
| if key[0] in valid_affinities |
| } |
| self._config_sig = config_sig |
|
|
| logger.info( |
| "proxy directory loaded: egress_mode={} clearance_mode={} node_count={} resource_node_count={}", |
| egress_mode, |
| clearance_mode, |
| len(nodes), |
| len(resource_nodes), |
| ) |
|
|
| |
| |
| |
|
|
| async def acquire( |
| self, |
| *, |
| scope: ProxyScope = ProxyScope.APP, |
| kind: RequestKind = RequestKind.HTTP, |
| resource: bool = False, |
| clearance_origin: str | None = None, |
| ) -> ProxyLease: |
| """Return a ProxyLease for the next request. |
| |
| For DIRECT mode, returns a lease with no proxy or clearance. |
| """ |
| proxy_url = await self._pick_proxy_url(resource=resource) |
| affinity = proxy_url or "direct" |
| clearance_host = _clearance_host(clearance_origin) |
|
|
| bundle = await self._get_or_build_bundle( |
| affinity_key=affinity, |
| proxy_url=proxy_url or "", |
| clearance_origin=clearance_origin or _DEFAULT_CLEARANCE_ORIGIN, |
| ) |
|
|
| return ProxyLease( |
| lease_id=next_hex(), |
| proxy_url=proxy_url, |
| cf_cookies=bundle.cf_cookies if bundle else "", |
| user_agent=bundle.user_agent if bundle else "", |
| clearance_host=clearance_host, |
| scope=scope, |
| kind=kind, |
| acquired_at=now_ms(), |
| ) |
|
|
| async def feedback(self, lease: ProxyLease, result: ProxyFeedback) -> None: |
| """Apply upstream feedback to the appropriate egress node.""" |
| if result.kind in ( |
| ProxyFeedbackKind.CHALLENGE, |
| ProxyFeedbackKind.UNAUTHORIZED, |
| ): |
| |
| key = (lease.proxy_url or "direct", lease.clearance_host) |
| async with self._lock: |
| from .models import ClearanceBundleState |
|
|
| bundle = self._bundles.get(key) |
| if bundle: |
| self._bundles[key] = bundle.model_copy( |
| update={"state": ClearanceBundleState.INVALID} |
| ) |
|
|
| |
| |
| |
| if ( |
| self._egress_mode == EgressMode.PROXY_POOL |
| and lease.proxy_url |
| and result.kind |
| in ( |
| ProxyFeedbackKind.CHALLENGE, |
| ProxyFeedbackKind.UNAUTHORIZED, |
| ProxyFeedbackKind.FORBIDDEN, |
| ProxyFeedbackKind.TRANSPORT_ERROR, |
| ) |
| ): |
| async with self._lock: |
| self._pool_cursor += 1 |
| logger.debug( |
| "proxy pool cursor advanced: proxy={} kind={} cursor={}", |
| lease.proxy_url, |
| result.kind, |
| self._pool_cursor, |
| ) |
|
|
| |
| |
| |
|
|
| async def _pick_proxy_url(self, resource: bool = False) -> str | None: |
| if self._egress_mode == EgressMode.DIRECT: |
| return None |
| async with self._lock: |
| |
| nodes = ( |
| self._resource_nodes |
| if resource and self._resource_nodes |
| else self._nodes |
| ) |
| if not nodes: |
| return None |
| if self._egress_mode == EgressMode.SINGLE_PROXY: |
| return nodes[0].proxy_url |
| |
| idx = self._pool_cursor % len(nodes) |
| return nodes[idx].proxy_url |
|
|
| async def _get_or_build_bundle( |
| self, |
| *, |
| affinity_key: str, |
| proxy_url: str, |
| clearance_origin: str, |
| ) -> ClearanceBundle | None: |
| if self._clearance_mode == ClearanceMode.NONE: |
| return None |
| clearance_host = _clearance_host(clearance_origin) |
| key: BundleKey = (affinity_key, clearance_host) |
|
|
| |
| |
| while True: |
| async with self._lock: |
| bundle = self._bundles.get(key) |
| if bundle and bundle.state.value == 0: |
| return bundle |
| event = self._refresh_events.get(key) |
| if event is None: |
| |
| event = asyncio.Event() |
| self._refresh_events[key] = event |
| break |
| |
| await event.wait() |
|
|
| try: |
| if self._clearance_mode == ClearanceMode.MANUAL: |
| bundle = self._manual.build_bundle( |
| affinity_key=affinity_key, |
| clearance_host=clearance_host, |
| ) |
| else: |
| bundle = await self._flare.refresh_bundle( |
| affinity_key=affinity_key, |
| proxy_url=proxy_url, |
| target_url=clearance_origin, |
| ) |
| if bundle: |
| async with self._lock: |
| self._bundles[key] = bundle |
| return bundle |
| finally: |
| async with self._lock: |
| self._refresh_events.pop(key, None) |
| event.set() |
|
|
| |
| |
| |
|
|
| async def invalidate_clearance(self) -> None: |
| """Mark all cached clearance bundles as invalid. |
| |
| The next ``acquire()`` call for each affinity key will trigger a fresh |
| FlareSolverr fetch (serialised by the single-flight guard). |
| """ |
| from .models import ClearanceBundleState |
|
|
| async with self._lock: |
| self._bundles = { |
| k: b.model_copy(update={"state": ClearanceBundleState.INVALID}) |
| for k, b in self._bundles.items() |
| } |
| logger.debug("clearance bundles invalidated: count={}", len(self._bundles)) |
|
|
| async def warm_up(self) -> None: |
| """Pre-fetch clearance bundles for all configured affinity keys. |
| |
| Called once at startup so the first real request does not have to wait |
| for FlareSolverr. Does NOT invalidate existing bundles first. |
| """ |
| if self._clearance_mode == ClearanceMode.NONE: |
| return |
| async with self._lock: |
| nodes = list(self._nodes) |
| affinity_keys = ( |
| [(n.proxy_url or "direct", n.proxy_url or "") for n in nodes] |
| if nodes |
| else [("direct", "")] |
| ) |
| for affinity, proxy_url in affinity_keys: |
| await self._get_or_build_bundle( |
| affinity_key=affinity, |
| proxy_url=proxy_url, |
| clearance_origin=_DEFAULT_CLEARANCE_ORIGIN, |
| ) |
|
|
| async def refresh_clearance_safe(self) -> None: |
| """Scheduled clearance refresh: build new bundles then swap atomically. |
| |
| Unlike ``invalidate_clearance() + warm_up()``, this never discards a |
| working bundle before a replacement is ready. If FlareSolverr is |
| temporarily unavailable the old bundle remains valid and continues to |
| serve requests. |
| """ |
| if self._clearance_mode == ClearanceMode.NONE: |
| return |
| async with self._lock: |
| nodes = list(self._nodes) |
| existing = list(self._bundles.keys()) |
|
|
| refresh_targets: dict[BundleKey, tuple[str, str]] = {} |
| default_items = ( |
| [(n.proxy_url or "direct", n.proxy_url or "") for n in nodes] |
| if nodes |
| else [("direct", "")] |
| ) |
| for affinity, proxy_url in default_items: |
| key: BundleKey = (affinity, _clearance_host(_DEFAULT_CLEARANCE_ORIGIN)) |
| refresh_targets[key] = (proxy_url, _DEFAULT_CLEARANCE_ORIGIN) |
| for key in existing: |
| affinity, clearance_host = key |
| refresh_targets.setdefault( |
| key, |
| ("" if affinity == "direct" else affinity, f"https://{clearance_host}"), |
| ) |
|
|
| for key, (proxy_url, clearance_origin) in refresh_targets.items(): |
| affinity, clearance_host = key |
| if self._clearance_mode == ClearanceMode.MANUAL: |
| new_bundle = self._manual.build_bundle( |
| affinity_key=affinity, |
| clearance_host=clearance_host, |
| ) |
| else: |
| new_bundle = await self._flare.refresh_bundle( |
| affinity_key=affinity, |
| proxy_url=proxy_url, |
| target_url=clearance_origin, |
| ) |
| if new_bundle: |
| async with self._lock: |
| self._bundles[key] = new_bundle |
| logger.debug("clearance bundle refreshed: bundle={}", key) |
| else: |
| logger.warning( |
| "clearance refresh failed, keeping old bundle: bundle={}", |
| key, |
| ) |
|
|
| |
| |
| |
|
|
| @property |
| def egress_mode(self) -> EgressMode: |
| return self._egress_mode |
|
|
| @property |
| def clearance_mode(self) -> ClearanceMode: |
| return self._clearance_mode |
|
|
| @property |
| def node_count(self) -> int: |
| return len(self._nodes) |
|
|
| @property |
| def nodes(self) -> list[EgressNode]: |
| """Read-only snapshot of the current egress node list.""" |
| return list(self._nodes) |
|
|
| @property |
| def bundles(self) -> dict[BundleKey, ClearanceBundle]: |
| """Read-only snapshot of the current clearance bundles.""" |
| return dict(self._bundles) |
|
|
|
|
| |
| |
| |
|
|
| _directory: ProxyDirectory | None = None |
|
|
|
|
| async def get_proxy_directory() -> ProxyDirectory: |
| """Return the module-level ProxyDirectory, reloading config if it changed.""" |
| global _directory |
| if _directory is None: |
| _directory = ProxyDirectory() |
| await _directory.load() |
| return _directory |
|
|
|
|
| __all__ = ["ProxyDirectory", "get_proxy_directory"] |
|
|