Spaces:
Paused
Paused
| """ | |
| Proxy pool with sticky selection and failover rotation. | |
| Supports comma-separated proxy URLs in config. Callers keep using the | |
| current proxy until a retry path explicitly rotates to the next one. | |
| """ | |
| import threading | |
| from typing import Optional | |
| from app.core.logger import logger | |
| # ---- internal state ---- | |
| _lock = threading.Lock() | |
| _pools: dict[str, list[str]] = {} # key -> parsed list | |
| _indexes: dict[str, int] = {} # key -> current index | |
| _raw_cache: dict[str, str] = {} # key -> last raw config value | |
| _FAILOVER_STATUS_CODES = frozenset({403, 429, 502}) | |
| def _parse_proxies(raw: str) -> list[str]: | |
| """Parse comma-separated proxy URLs, stripping whitespace and empties.""" | |
| if not raw: | |
| return [] | |
| return [p.strip() for p in raw.split(",") if p.strip()] | |
| def _ensure_pool(config_key: str) -> list[str]: | |
| """Load and cache the proxy list for *config_key*.""" | |
| from app.core.config import config # avoid circular at module level | |
| raw = config.get(config_key, "") or "" | |
| if raw != _raw_cache.get(config_key): | |
| proxies = _parse_proxies(raw) | |
| _pools[config_key] = proxies | |
| _indexes[config_key] = 0 | |
| _raw_cache[config_key] = raw | |
| if len(proxies) > 1: | |
| logger.info( | |
| f"ProxyPool: {config_key} loaded {len(proxies)} proxies for failover" | |
| ) | |
| return _pools.get(config_key, []) | |
| def get_current_proxy(config_key: str) -> str: | |
| """Return the current sticky proxy URL for *config_key*.""" | |
| with _lock: | |
| pool = _ensure_pool(config_key) | |
| if not pool: | |
| return "" | |
| idx = _indexes.get(config_key, 0) % len(pool) | |
| _indexes[config_key] = idx | |
| return pool[idx] | |
| def get_current_proxy_from(*config_keys: str) -> tuple[Optional[str], str]: | |
| """Return the first configured sticky proxy from *config_keys*.""" | |
| for config_key in config_keys: | |
| proxy = get_current_proxy(config_key) | |
| if proxy: | |
| return config_key, proxy | |
| return None, "" | |
| def rotate_proxy(config_key: str) -> str: | |
| """Advance *config_key* to the next proxy and return it.""" | |
| with _lock: | |
| pool = _ensure_pool(config_key) | |
| if not pool: | |
| return "" | |
| if len(pool) == 1: | |
| return pool[0] | |
| next_idx = (_indexes.get(config_key, 0) + 1) % len(pool) | |
| _indexes[config_key] = next_idx | |
| proxy = pool[next_idx] | |
| logger.warning( | |
| f"ProxyPool: rotate {config_key} to index {next_idx + 1}/{len(pool)}" | |
| ) | |
| return proxy | |
| def should_rotate_proxy(status_code: Optional[int]) -> bool: | |
| """Return whether *status_code* should trigger proxy failover.""" | |
| return status_code in _FAILOVER_STATUS_CODES | |
| def build_http_proxies(proxy_url: str) -> Optional[dict[str, str]]: | |
| """Build curl_cffi-style proxies mapping from a single proxy URL.""" | |
| if not proxy_url: | |
| return None | |
| return {"http": proxy_url, "https": proxy_url} | |
| __all__ = [ | |
| "build_http_proxies", | |
| "get_current_proxy", | |
| "get_current_proxy_from", | |
| "rotate_proxy", | |
| "should_rotate_proxy", | |
| ] | |