import os import random import json import time from pathlib import Path from typing import Dict, Optional, List # Hardcoded Default Pool DEFAULT_POOL = [ "http://Z9M25R:0QHP8X@206.168.90.60:50100", "http://Z9M25R:0QHP8X@206.168.88.98:50100" ] class ProxyManager: """ Manages proxy rotation and configuration for the V16 Pipeline. Reads from version16/pipeline_settings.json for configuration. """ def __init__(self): self.settings = self._load_settings() self.proxy_config = self.settings.get("proxy", {}) self.enabled = self.proxy_config.get("use_proxy", True) self.method = self.proxy_config.get("pool_method", "only_pool") self.rotation = self.proxy_config.get("rotation_strategy", "sequential") self.custom_proxies = self.proxy_config.get("custom_proxies", []) self._pool = self._build_pool() self._current_index = 0 # Check environment override (Highest Priority) if os.environ.get("WCS_FORCE_PROXY"): self._pool = [os.environ["WCS_FORCE_PROXY"]] def _load_settings(self) -> dict: """Attempts to find and load pipeline_settings.json""" try: # 1. Look in version16 sibling directory (High Priority) current = Path(__file__).parent v16_settings = current.parent / "version16" / "pipeline_settings.json" if v16_settings.exists(): with open(v16_settings, 'r', encoding='utf-8') as f: return json.load(f) # 2. Look in version15/14/13 (Legacy Fallback) for v in ["version15", "version14", "version13"]: settings = current.parent / v / "pipeline_settings.json" if settings.exists(): with open(settings, 'r', encoding='utf-8') as f: return json.load(f) # 3. Fallback: check current directory if Path("pipeline_settings.json").exists(): with open("pipeline_settings.json", 'r', encoding='utf-8') as f: return json.load(f) return {} except Exception: return {} def _build_pool(self) -> List[str]: """Constructs the active proxy pool based on settings.""" if not self.enabled: return [] pool = [] # 1. Add Default Pool if self.method in ["only_pool", "pool_plus_mine"]: pool.extend(DEFAULT_POOL) # 2. Add Custom Proxies if self.method in ["only_mine", "pool_plus_mine"]: if self.custom_proxies: pool.extend(self.custom_proxies) # Safety fallback if not pool and self.enabled: # print("⚠️ Proxy checks enabled but pool is empty! Using default fallback.") pool.extend(DEFAULT_POOL) return pool def get_proxy(self) -> Optional[str]: """Get the current or next proxy string.""" if not self.enabled or not self._pool: return None if self.rotation == "random": return random.choice(self._pool) else: # Sequential proxy = self._pool[self._current_index % len(self._pool)] self._current_index += 1 return proxy def get_playwright_proxy(self) -> Optional[Dict]: """Returns Playwright proxy dictionary or None.""" proxy_str = self.get_proxy() if not proxy_str: return None parse_result = {} if "@" in proxy_str: # PROTOCOL://USER:PASS@IP:PORT try: protocol_auth, address = proxy_str.split("@") if "://" in protocol_auth: protocol, auth = protocol_auth.split("://") else: protocol = "http" # Default auth = protocol_auth username, password = auth.split(":") parse_result = { "server": f"{protocol}://{address}", "username": username, "password": password } except: # Parse fail fallback parse_result = {"server": proxy_str} else: parse_result = {"server": proxy_str} return parse_result @staticmethod def inject_env(): """ Injects proxy settings into WCS environment variables. Called by Pipeline S0. """ manager = ProxyManager() proxy = manager.get_proxy() # V16: Track failure count if needed (future) # For now, we just return the next one in the pool if proxy: safe_ip = proxy.split('@')[-1] if '@' in proxy else proxy print(f"🔒 Proxy Manager: Active Proxy -> {safe_ip}") print(f" [Method: {manager.method} | Rotation: {manager.rotation}]") os.environ["WCS_TARGET_PROXY"] = proxy return proxy else: print("🔓 Proxy Manager: DIRECT MODE (No Proxy)") if "WCS_TARGET_PROXY" in os.environ: del os.environ["WCS_TARGET_PROXY"] return "DIRECT"