g2a / app /core /proxy_pool.py
skatef's picture
HF Space: 7860 监听修复 + 快照
2e58b54
"""
Proxy pool with sticky selection and failover rotation.
Supports comma-separated proxy URLs in config. Callers keep using the
current proxy until a retry path explicitly rotates to the next one.
"""
import threading
from typing import Optional
from app.core.logger import logger
# ---- internal state ----
_lock = threading.Lock()
_pools: dict[str, list[str]] = {} # key -> parsed list
_indexes: dict[str, int] = {} # key -> current index
_raw_cache: dict[str, str] = {} # key -> last raw config value
_FAILOVER_STATUS_CODES = frozenset({403, 429, 502})
def _parse_proxies(raw: str) -> list[str]:
"""Parse comma-separated proxy URLs, stripping whitespace and empties."""
if not raw:
return []
return [p.strip() for p in raw.split(",") if p.strip()]
def _ensure_pool(config_key: str) -> list[str]:
"""Load and cache the proxy list for *config_key*."""
from app.core.config import config # avoid circular at module level
raw = config.get(config_key, "") or ""
if raw != _raw_cache.get(config_key):
proxies = _parse_proxies(raw)
_pools[config_key] = proxies
_indexes[config_key] = 0
_raw_cache[config_key] = raw
if len(proxies) > 1:
logger.info(
f"ProxyPool: {config_key} loaded {len(proxies)} proxies for failover"
)
return _pools.get(config_key, [])
def get_current_proxy(config_key: str) -> str:
"""Return the current sticky proxy URL for *config_key*."""
with _lock:
pool = _ensure_pool(config_key)
if not pool:
return ""
idx = _indexes.get(config_key, 0) % len(pool)
_indexes[config_key] = idx
return pool[idx]
def get_current_proxy_from(*config_keys: str) -> tuple[Optional[str], str]:
"""Return the first configured sticky proxy from *config_keys*."""
for config_key in config_keys:
proxy = get_current_proxy(config_key)
if proxy:
return config_key, proxy
return None, ""
def rotate_proxy(config_key: str) -> str:
"""Advance *config_key* to the next proxy and return it."""
with _lock:
pool = _ensure_pool(config_key)
if not pool:
return ""
if len(pool) == 1:
return pool[0]
next_idx = (_indexes.get(config_key, 0) + 1) % len(pool)
_indexes[config_key] = next_idx
proxy = pool[next_idx]
logger.warning(
f"ProxyPool: rotate {config_key} to index {next_idx + 1}/{len(pool)}"
)
return proxy
def should_rotate_proxy(status_code: Optional[int]) -> bool:
"""Return whether *status_code* should trigger proxy failover."""
return status_code in _FAILOVER_STATUS_CODES
def build_http_proxies(proxy_url: str) -> Optional[dict[str, str]]:
"""Build curl_cffi-style proxies mapping from a single proxy URL."""
if not proxy_url:
return None
return {"http": proxy_url, "https": proxy_url}
__all__ = [
"build_http_proxies",
"get_current_proxy",
"get_current_proxy_from",
"rotate_proxy",
"should_rotate_proxy",
]