import os import time import asyncio import aiohttp from utils.logger import log from bot.database import db from bot.state import state class ProxyManager: def __init__(self): self.proxies = [] # Explicit list of validated node dictionaries self.is_enabled = False self._semaphore = asyncio.BoundedSemaphore(5) async def fetch_webshare(self): """Fetches 10 free proxies from Webshare.io API.""" api_key = os.environ.get("WEBSHARE_API_KEY") if not api_key: log("⚠️ WEBSHARE_API_KEY missing from environment variables.") return False url = "https://proxy.webshare.io/api/v2/proxy/list/" headers = {"Authorization": f"Token {api_key}"} params = {"mode": "direct", "page": 1, "page_size": 100} try: async with aiohttp.ClientSession() as session: async with session.get(url, headers=headers, params=params, timeout=10) as resp: if resp.status != 200: log(f"⚠️ Webshare API returned {resp.status}") return False data = await resp.json() self.proxies.clear() for p in data.get("results", []): self.proxies.append({ "scheme": "socks5", "hostname": p["proxy_address"], "port": p["port"], "username": p["username"], "password": p["password"], "ping": 9999, "status": "untested" }) log(f"🌐 Fetched {len(self.proxies)} proxies from Webshare.") return True except Exception as e: log(f"⚠️ Failed to fetch Webshare proxies: {e}") return False async def _ping_proxy(self, proxy_dict): """Measures network health via an authenticated SOCKS5 handshake loop with strict socket cleanup.""" writer = None async with self._semaphore: start = time.time() try: reader, writer = await asyncio.wait_for( asyncio.open_connection(proxy_dict["hostname"], proxy_dict["port"]), timeout=3.0 ) writer.write(b"\x05\x01\x02") await writer.drain() res = await asyncio.wait_for(reader.read(2), timeout=2.0) if len(res) != 2 or res[0] != 5 or res[1] != 2: raise Exception("Auth Method Rejected") user_bytes = proxy_dict["username"].encode('utf-8') pass_bytes = proxy_dict["password"].encode('utf-8') auth_packet = bytes([1, len(user_bytes)]) + user_bytes + bytes([len(pass_bytes)]) + pass_bytes writer.write(auth_packet) await writer.drain() auth_res = await asyncio.wait_for(reader.read(2), timeout=2.0) if len(auth_res) == 2 and auth_res[0] == 1 and auth_res[1] == 0: ping = int((time.time() - start) * 1000) proxy_dict["ping"] = ping proxy_dict["status"] = "🟢 Alive" return True except Exception: pass finally: if writer: try: writer.close() await asyncio.sleep(0.01) if hasattr(writer, 'transport') and writer.transport: writer.transport.abort() except Exception: pass proxy_dict["ping"] = 9999 proxy_dict["status"] = "🔴 Dead" return False async def test_all_proxies(self): """Pings all proxies concurrently using authenticated TCP loops and flushes state to DB.""" if not self.proxies: return log("🔍 Pinging proxy fleet with authenticated credentials...") tasks = [self._ping_proxy(p) for p in self.proxies] await asyncio.gather(*tasks) self.proxies.sort(key=lambda x: x["ping"]) # Save structural array state directly into persistent store if hasattr(db, "save_setting"): asyncio.create_task(db.save_setting("cached_proxy_fleet", self.proxies)) asyncio.create_task(db.save_setting("proxy_mgr_enabled", self.is_enabled)) alive = sum(1 for p in self.proxies if p["status"] == "🟢 Alive") log(f"📊 Proxy Health Check complete. {alive}/{len(self.proxies)} alive.") def get_best_proxies(self, count): """Returns the top X fastest, alive proxies.""" alive_proxies = [p for p in self.proxies if p["status"] == "🟢 Alive"] return alive_proxies[:count] def handle_runtime_routing_fault(self, dead_hostname: str): """ Executes an instant, hot-swap redirection sequence. Rotates out a failing proxy from the pool path dynamically, avoiding active locks. """ log(f"🛡️ Dynamic Routing Fault Caught on route: {dead_hostname}") best_nodes = self.get_best_proxies(len(self.proxies)) allocated_proxies = state.get("allocated_proxies", set()) fallback_node = None # Priority 1: Find an alive proxy that is not currently locked by another camper thread for p in best_nodes: if p["hostname"] != dead_hostname and p["hostname"] not in allocated_proxies: fallback_node = p break # Priority 2: Fallback to any unlocked or available alive node if allocation limits are saturated if not fallback_node: for p in best_nodes: if p["hostname"] != dead_hostname: fallback_node = p break if fallback_node: log(f"🔄 Hot-Swap Target Selected: {fallback_node['hostname']} ({fallback_node['ping']}ms)") return { "scheme": fallback_node["scheme"], "hostname": fallback_node["hostname"], "port": fallback_node["port"], "username": fallback_node["username"], "password": fallback_node["password"] } log("⚠️ No standby backup proxy found in active pool. Dropping to Direct Home IP.") return None proxy_mgr = ProxyManager()