Spaces:
Sleeping
Sleeping
| import os | |
| import random | |
| import json | |
| import time | |
| from pathlib import Path | |
| from typing import Dict, Optional, List | |
# Built-in proxy pool used when the settings select the shared pool, and as
# the last-resort fallback when the configured pool would otherwise be empty.
# NOTE(review): these URLs embed proxy credentials directly in source control;
# consider moving them to configuration or the environment.
DEFAULT_POOL = [
    "http://Z9M25R:0QHP8X@206.168.90.60:50100",
    "http://Z9M25R:0QHP8X@206.168.88.98:50100",
]
class ProxyManager:
    """
    Manages proxy rotation and configuration for the V16 Pipeline.

    Configuration is read from the "proxy" section of pipeline_settings.json
    (see ``_load_settings`` for the search order). Recognised keys:

    * ``use_proxy`` (default True): master switch; when false no proxy is
      ever returned.
    * ``pool_method`` (default "only_pool"): "only_pool", "only_mine" or
      "pool_plus_mine" - which of DEFAULT_POOL / custom_proxies to use.
    * ``rotation_strategy`` (default "sequential"): "random" picks a random
      pool entry; any other value rotates round-robin.
    * ``custom_proxies`` (default []): user-supplied proxy URLs.

    The WCS_FORCE_PROXY environment variable, when set to a non-empty value,
    replaces the whole pool with that single proxy.
    """

    def __init__(self):
        """Load settings, build the proxy pool and apply the env override."""
        self.settings = self._load_settings()

        # Tolerate `"proxy": null` or a non-object value in the JSON instead
        # of crashing on the .get() calls below.
        proxy_section = self.settings.get("proxy")
        self.proxy_config = proxy_section if isinstance(proxy_section, dict) else {}

        self.enabled = self.proxy_config.get("use_proxy", True)
        self.method = self.proxy_config.get("pool_method", "only_pool")
        self.rotation = self.proxy_config.get("rotation_strategy", "sequential")
        # `"custom_proxies": null` is treated the same as an empty list.
        self.custom_proxies = self.proxy_config.get("custom_proxies") or []

        self._pool = self._build_pool()
        self._current_index = 0

        # Check environment override (Highest Priority).
        # Note: this only replaces the pool; get_proxy() still returns None
        # when `use_proxy` is false.
        forced = os.environ.get("WCS_FORCE_PROXY")
        if forced:
            self._pool = [forced]

    def _load_settings(self) -> dict:
        """
        Find and load pipeline_settings.json, returning {} when unavailable.

        Search order (the first file that exists wins):
          1. ../version16/pipeline_settings.json relative to this module
          2. ../version15, ../version14, ../version13 (legacy fallback)
          3. ./pipeline_settings.json in the current working directory

        Loading is deliberately best-effort: any failure while locating,
        reading or parsing the file yields an empty dict so the manager falls
        back to its defaults. A JSON document whose root is not an object is
        treated the same way.
        """
        try:
            base_dir = Path(__file__).parent.parent
            candidates = [
                base_dir / version / "pipeline_settings.json"
                for version in ("version16", "version15", "version14", "version13")
            ]
            candidates.append(Path("pipeline_settings.json"))

            for candidate in candidates:
                if candidate.exists():
                    with open(candidate, "r", encoding="utf-8") as f:
                        loaded = json.load(f)
                    # Callers use .get(); anything but a dict is unusable.
                    return loaded if isinstance(loaded, dict) else {}
            return {}
        except Exception:
            # Best-effort by design: unreadable or malformed settings must
            # not prevent the pipeline from starting.
            return {}

    def _build_pool(self) -> List[str]:
        """
        Construct the active proxy pool based on settings.

        Returns an empty list when proxies are disabled. Otherwise combines
        DEFAULT_POOL and/or the custom proxies according to ``self.method``,
        falling back to DEFAULT_POOL if the result would be empty (for
        example an unknown method, or "only_mine" with no custom proxies).
        """
        if not self.enabled:
            return []

        pool: List[str] = []

        # 1. Add Default Pool
        if self.method in ("only_pool", "pool_plus_mine"):
            pool.extend(DEFAULT_POOL)

        # 2. Add Custom Proxies
        if self.method in ("only_mine", "pool_plus_mine") and self.custom_proxies:
            pool.extend(self.custom_proxies)

        # Safety fallback: proxies are enabled but nothing was selected.
        if not pool:
            pool.extend(DEFAULT_POOL)

        return pool

    def get_proxy(self) -> Optional[str]:
        """
        Return the next proxy URL, or None when disabled or the pool is empty.

        With the "random" rotation strategy a random pool entry is returned;
        otherwise entries are handed out round-robin and the internal cursor
        advances on every call.
        """
        if not self.enabled or not self._pool:
            return None

        if self.rotation == "random":
            return random.choice(self._pool)

        # Sequential
        proxy = self._pool[self._current_index % len(self._pool)]
        self._current_index += 1
        return proxy

    def get_playwright_proxy(self) -> Optional[Dict]:
        """
        Return the next proxy as a Playwright proxy dictionary, or None.

        Accepts ``[PROTOCOL://]USER:PASS@HOST:PORT`` and returns
        ``{"server": "PROTOCOL://HOST:PORT", "username": USER,
        "password": PASS}`` (protocol defaults to "http"). Proxies without
        credentials, or whose credentials lack a ``:`` separator, are returned
        unchanged as ``{"server": proxy}``.

        Like ``get_proxy``, each call consumes one rotation step.
        """
        proxy_str = self.get_proxy()
        if not proxy_str:
            return None

        if "@" not in proxy_str:
            return {"server": proxy_str}

        # Split on the LAST "@": the host part never contains one, while a
        # password may.
        credentials, _, address = proxy_str.rpartition("@")

        protocol, scheme_sep, auth = credentials.partition("://")
        if not scheme_sep:
            protocol = "http"  # Default
            auth = credentials

        # Split on the FIRST ":" so a password containing ":" stays intact.
        username, colon, password = auth.partition(":")
        if not colon:
            # No USER:PASS pair to extract - pass the string through as-is.
            return {"server": proxy_str}

        return {
            "server": f"{protocol}://{address}",
            "username": username,
            "password": password,
        }
def inject_env():
    """
    Publish the selected proxy through the WCS_TARGET_PROXY environment variable.

    Called by Pipeline S0. Builds a ProxyManager, asks it for a proxy and:

    * when one is available, stores it in WCS_TARGET_PROXY, prints the
      address (credentials stripped) and returns the full proxy string;
    * otherwise removes WCS_TARGET_PROXY if present, prints a direct-mode
      notice and returns the literal string "DIRECT".
    """
    # NOTE(review): a fresh ProxyManager is created on every call, so with
    # sequential rotation each call starts again from the first pool entry -
    # confirm this is intended.
    manager = ProxyManager()
    selected = manager.get_proxy()

    if not selected:
        print("🔓 Proxy Manager: DIRECT MODE (No Proxy)")
        os.environ.pop("WCS_TARGET_PROXY", None)
        return "DIRECT"

    # Show only what follows the last "@" so credentials never reach the log.
    display_address = selected.split("@")[-1]
    print(f"🔒 Proxy Manager: Active Proxy -> {display_address}")
    print(f" [Method: {manager.method} | Rotation: {manager.rotation}]")

    os.environ["WCS_TARGET_PROXY"] = selected
    return selected