import asyncio
import contextlib
import logging
import os
import random
import threading
import time
from collections import Counter, deque
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional

import httpx
|
|
# Module-level logger used by the proxy providers and the pool in this file.
logger = logging.getLogger(__name__)
|
|
|
|
@dataclass
class ProxyRecord:
    """Per-proxy health bookkeeping kept by the pool."""

    # Proxy URL this record belongs to.
    proxy: str
    # Current health score; raised on success, lowered on failure.
    score: float
    # Number of successes reported for this proxy.
    success_count: int = 0
    # Failures since the last reported success (a success resets it to 0).
    fail_count: int = 0
    # time.time() of the most recent success, if any.
    last_ok_at: Optional[float] = None
    # time.time() of the most recent failure, if any.
    last_fail_at: Optional[float] = None
    # Reason label of the most recent failure; cleared on success.
    last_fail_reason: Optional[str] = None
|
|
|
|
class ProxyProvider:
    """Base class for a source of proxy candidates.

    Subclasses set ``name`` and override :meth:`fetch`.
    """

    # Short label identifying the provider kind; assigned by subclasses.
    name: str

    async def fetch(self, *, client: httpx.AsyncClient) -> List[str]:
        """Return raw proxy strings obtained from this source.

        Args:
            client: Shared HTTP client the provider may use for requests.

        Raises:
            NotImplementedError: Always; subclasses must override this method.
        """
        raise NotImplementedError
|
|
|
|
class HttpProxyProvider(ProxyProvider):
    """Fetch proxy candidates from one or more HTTP endpoints."""

    def __init__(self, urls: List[str]):
        # Blank entries are dropped up front so fetch() never requests an empty URL.
        stripped = (str(raw).strip() for raw in (urls or []))
        self.urls = [u for u in stripped if u]
        self.name = "http"

    @staticmethod
    def _parse_response(resp: httpx.Response) -> List[str]:
        """Extract proxy strings from a provider response.

        Accepted bodies:
          * a JSON list of entries,
          * a JSON object with a ``"data"`` key holding a list or a single entry,
          * anything that is not valid JSON, treated as one proxy per line.

        JSON ``null`` entries are skipped instead of being turned into the
        literal string ``"None"``; every entry is stripped and blanks dropped.
        Any other JSON shape yields an empty list.
        """
        try:
            payload = resp.json()
        except Exception:
            # Best-effort: any decoding failure falls back to plain text.
            entries: List[Any] = str(resp.text).splitlines()
        else:
            if isinstance(payload, list):
                entries = payload
            elif isinstance(payload, dict) and "data" in payload:
                inner = payload["data"]
                entries = inner if isinstance(inner, list) else [inner]
            else:
                entries = []

        cleaned = (str(entry).strip() for entry in entries if entry is not None)
        return [proxy for proxy in cleaned if proxy]

    async def _fetch_one(self, *, client: httpx.AsyncClient, url: str) -> List[str]:
        """Fetch and parse a single endpoint; failures are logged and yield []."""
        u = str(url).strip()
        if not u:
            return []
        try:
            resp = await client.get(u)
            resp.raise_for_status()
            return self._parse_response(resp)
        except Exception as e:
            # One bad endpoint must not prevent the others from contributing.
            logger.warning("ProxyPool provider http failed url=%s err=%s", u, e)
            return []

    async def fetch(self, *, client: httpx.AsyncClient) -> List[str]:
        """Query all configured URLs concurrently and concatenate the results."""
        if not self.urls:
            return []
        batches = await asyncio.gather(
            *(self._fetch_one(client=client, url=u) for u in self.urls)
        )
        return [proxy for batch in batches for proxy in (batch or [])]
|
|
|
|
class FileProxyProvider(ProxyProvider):
    """Load proxy candidates from a local text file, one entry per line."""

    def __init__(self, path: str):
        self.path = Path(str(path)).expanduser()
        self.name = "file"

    def _read_lines(self) -> List[str]:
        """Return the non-blank, stripped lines of the file ([] if it is missing)."""
        # is_file() is already False for a path that does not exist.
        if not self.path.is_file():
            return []
        # "utf-8-sig" drops a leading BOM (common in files saved on Windows);
        # with plain "utf-8" the BOM would be glued onto the first proxy.
        # Undecodable bytes are still ignored, as before.
        text = self.path.read_text(encoding="utf-8-sig", errors="ignore")
        return [entry for line in text.splitlines() if (entry := line.strip())]

    async def fetch(self, *, client: httpx.AsyncClient) -> List[str]:
        """Read the file in a worker thread; failures are logged and yield [].

        ``client`` is unused; it is accepted to match the provider interface.
        """
        try:
            return await asyncio.to_thread(self._read_lines)
        except Exception as e:
            logger.warning("ProxyPool provider file failed path=%s err=%s", self.path, e)
            return []
|
|
|
|
| class ProxyPool: |
| _instance = None |
|
|
| def __new__(cls): |
| if cls._instance is None: |
| cls._instance = super(ProxyPool, cls).__new__(cls) |
| cls._instance._initialized = False |
| return cls._instance |
|
|
| def __init__(self): |
| if self._initialized: |
| return |
| self._initialized = True |
| self.proxies: List[str] = [] |
| self._records: Dict[str, ProxyRecord] = {} |
| self.api_url = os.getenv("PROXY_API_URL") |
| self.verify_url = os.getenv("PROXY_VERIFY_URL", "https://www.xiaohongshu.com/favicon.ico") |
| self.fetch_interval = int(os.getenv("PROXY_FETCH_INTERVAL", "60")) |
| self._task: Optional[asyncio.Task] = None |
| self._lock = threading.RLock() |
| self.initial_score = float(os.getenv("PROXY_POOL_INITIAL_SCORE", "3.0")) |
| self.max_score = float(os.getenv("PROXY_POOL_MAX_SCORE", "10.0")) |
| self.min_score = float(os.getenv("PROXY_POOL_MIN_SCORE", "-5.0")) |
| self.success_reward = float(os.getenv("PROXY_POOL_SUCCESS_REWARD", "1.0")) |
| self.failure_penalty = float(os.getenv("PROXY_POOL_FAILURE_PENALTY", "1.5")) |
| self.eject_fail_count = int(os.getenv("PROXY_POOL_EJECT_FAIL_COUNT", "3")) |
| self.eject_score = float(os.getenv("PROXY_POOL_EJECT_SCORE", "0.0")) |
| self._failures_total: Counter[str] = Counter() |
| self._recent_failures: deque[tuple[float, str]] = deque(maxlen=int(os.getenv("PROXY_POOL_RECENT_MAXLEN", "2000"))) |
| self._recent_window_seconds = int(os.getenv("PROXY_POOL_RECENT_WINDOW_SECONDS", "900")) |
| self._ejected_total = 0 |
|
|
| async def start(self): |
| if self._task is None: |
| logger.info(f"Starting ProxyPool, API URL: {self.api_url}") |
| self._task = asyncio.create_task(self._run()) |
|
|
| async def stop(self): |
| if self._task: |
| self._task.cancel() |
| try: |
| await self._task |
| except asyncio.CancelledError: |
| pass |
| self._task = None |
| logger.info("ProxyPool stopped") |
|
|
| async def _run(self): |
| while True: |
| try: |
| await self._fetch_and_verify() |
| except asyncio.CancelledError: |
| break |
| except Exception as e: |
| logger.error(f"Error in proxy pool run loop: {e}") |
| |
| try: |
| await asyncio.sleep(self.fetch_interval) |
| except asyncio.CancelledError: |
| break |
|
|
| @staticmethod |
| def _split_env_list(value: str | None) -> List[str]: |
| raw = str(value or "").strip() |
| if raw == "": |
| return [] |
| if "," in raw: |
| return [t.strip() for t in raw.split(",") if t.strip()] |
| return [t.strip() for t in raw.split() if t.strip()] |
|
|
| def _build_providers(self) -> List[ProxyProvider]: |
| urls = self._split_env_list(os.getenv("PROXY_API_URLS")) |
| if not urls: |
| legacy = str(os.getenv("PROXY_API_URL") or "").strip() |
| if legacy != "": |
| urls = [legacy] |
|
|
| providers: List[ProxyProvider] = [] |
| if urls: |
| providers.append(HttpProxyProvider(urls)) |
|
|
| file_path = str(os.getenv("PROXY_FILE_PATH") or "").strip() |
| if file_path != "": |
| providers.append(FileProxyProvider(file_path)) |
|
|
| return providers |
|
|
| @staticmethod |
| def _classify_verify_error(err: Exception) -> str: |
| if isinstance(err, httpx.TimeoutException): |
| return "timeout" |
| if isinstance(err, httpx.ProxyError): |
| return "proxy_error" |
| if isinstance(err, httpx.ConnectError): |
| return "connect_error" |
| if isinstance(err, httpx.RemoteProtocolError): |
| return "protocol_error" |
| if isinstance(err, httpx.HTTPStatusError): |
| code = getattr(getattr(err, "response", None), "status_code", None) |
| try: |
| return f"http_{int(code)}" |
| except Exception: |
| return "http_status" |
| return "unknown" |
|
|
| async def _fetch_and_verify(self): |
| providers = self._build_providers() |
| if not providers: |
| logger.debug("Proxy providers not configured, doing nothing.") |
| return |
|
|
| candidates: List[str] = [] |
| async with httpx.AsyncClient(timeout=10.0) as client: |
| results = await asyncio.gather(*[p.fetch(client=client) for p in providers]) |
| for items in results: |
| candidates.extend(items or []) |
|
|
| if not candidates: |
| logger.warning("No proxies fetched from providers") |
| return |
|
|
| normalized_candidates: List[str] = [] |
| seen: set[str] = set() |
| for raw in candidates: |
| v = self._normalize_proxy(raw) |
| if v is None: |
| continue |
| if v in seen: |
| continue |
| seen.add(v) |
| normalized_candidates.append(v) |
|
|
| if not normalized_candidates: |
| logger.warning("No valid proxy candidates after normalization") |
| return |
|
|
| async def verify(proxy_url: str) -> str | None: |
| try: |
| async with httpx.AsyncClient( |
| proxies={"http://": proxy_url, "https://": proxy_url}, |
| timeout=5.0, |
| ) as verify_client: |
| res = await verify_client.get(self.verify_url) |
| if int(res.status_code) == 200: |
| return proxy_url |
| except Exception as e: |
| reason = self._classify_verify_error(e) |
| logger.debug(f"Proxy {proxy_url} verification failed: {reason}") |
| return None |
|
|
| results = await asyncio.gather(*[verify(p) for p in normalized_candidates]) |
| valid_proxies = [p for p in results if p is not None] |
|
|
| with self._lock: |
| self._sync_proxies(valid_proxies) |
| logger.info(f"ProxyPool updated: {len(self.proxies)} valid proxies") |
|
|
| @staticmethod |
| def _normalize_proxy(value: str | None) -> str | None: |
| if value is None: |
| return None |
| v = str(value).strip() |
| if v == "": |
| return None |
| if "://" not in v: |
| v = f"http://{v}" |
| return v |
|
|
| def _sync_proxies(self, proxies: List[str]) -> None: |
| normalized: List[str] = [] |
| seen: set[str] = set() |
| for raw in proxies: |
| v = self._normalize_proxy(raw) |
| if v is None: |
| continue |
| if v in seen: |
| continue |
| seen.add(v) |
| normalized.append(v) |
|
|
| keep = set(normalized) |
| for key in list(self._records.keys()): |
| if key not in keep: |
| self._records.pop(key, None) |
|
|
| for proxy in normalized: |
| if proxy not in self._records: |
| self._records[proxy] = ProxyRecord(proxy=proxy, score=float(self.initial_score)) |
|
|
| self.proxies = normalized |
|
|
| def get_random_proxy(self) -> Optional[str]: |
| with self._lock: |
| if not self.proxies: |
| return None |
| candidates = list(self.proxies) |
| weights: List[float] = [] |
| for proxy in candidates: |
| rec = self._records.get(proxy) |
| score = float(rec.score) if rec is not None else float(self.initial_score) |
| score = max(float(self.eject_score), min(float(self.max_score), score)) |
| weights.append(max(0.1, score)) |
| return random.choices(candidates, weights=weights, k=1)[0] |
|
|
| def snapshot(self) -> Dict[str, Any]: |
| now = time.time() |
| with self._lock: |
| while self._recent_failures and now - float(self._recent_failures[0][0]) > float(self._recent_window_seconds): |
| self._recent_failures.popleft() |
|
|
| size = int(len(self.proxies)) |
| total_score = 0.0 |
| for p in self.proxies: |
| rec = self._records.get(p) |
| if rec is None: |
| total_score += float(self.initial_score) |
| else: |
| total_score += float(rec.score) |
| avg_score = float(total_score) / float(size) if size > 0 else 0.0 |
|
|
| last_fail_reason_counts: Counter[str] = Counter() |
| for p in self.proxies: |
| rec = self._records.get(p) |
| reason = str(getattr(rec, "last_fail_reason", "") or "").strip().lower() |
| if reason != "": |
| last_fail_reason_counts[reason] += 1 |
|
|
| recent_counts: Counter[str] = Counter() |
| for _, reason in self._recent_failures: |
| r = str(reason or "").strip().lower() or "unknown" |
| recent_counts[r] += 1 |
|
|
| return { |
| "available_count": size, |
| "avg_score": avg_score, |
| "ejected_total": int(self._ejected_total), |
| "failures_total_by_reason": dict(self._failures_total), |
| "recent_failures_by_reason": dict(recent_counts), |
| "last_fail_reasons_by_reason": dict(last_fail_reason_counts), |
| } |
|
|
| def report_success(self, proxy: str) -> None: |
| normalized = self._normalize_proxy(proxy) |
| if normalized is None: |
| return |
| now = time.time() |
| with self._lock: |
| if normalized not in self.proxies: |
| return |
| rec = self._records.get(normalized) |
| if rec is None: |
| rec = ProxyRecord(proxy=normalized, score=float(self.initial_score)) |
| self._records[normalized] = rec |
| rec.success_count = int(rec.success_count) + 1 |
| rec.fail_count = 0 |
| rec.last_ok_at = now |
| rec.last_fail_reason = None |
| rec.score = min(float(self.max_score), float(rec.score) + float(self.success_reward)) |
|
|
| def report_failure(self, proxy: str, reason: str | None = None) -> None: |
| normalized = self._normalize_proxy(proxy) |
| if normalized is None: |
| return |
| reason_s = (str(reason or "").strip() or "unknown").lower() |
| now = time.time() |
| with self._lock: |
| if normalized not in self.proxies: |
| return |
| self._failures_total[reason_s] += 1 |
| self._recent_failures.append((now, reason_s)) |
| rec = self._records.get(normalized) |
| if rec is None: |
| rec = ProxyRecord(proxy=normalized, score=float(self.initial_score)) |
| self._records[normalized] = rec |
| rec.fail_count = int(rec.fail_count) + 1 |
| rec.last_fail_at = now |
| rec.last_fail_reason = reason_s |
| rec.score = max(float(self.min_score), float(rec.score) - float(self.failure_penalty)) |
| if int(rec.fail_count) >= int(self.eject_fail_count) or float(rec.score) <= float(self.eject_score): |
| try: |
| self.proxies.remove(normalized) |
| except ValueError: |
| pass |
| self._records.pop(normalized, None) |
| self._ejected_total = int(self._ejected_total) + 1 |
|
|
# Shared module-level pool; ProxyPool() always returns this same instance.
proxy_pool = ProxyPool()
|
|