| import os |
| import time |
| import asyncio |
| import aiohttp |
| from utils.logger import log |
| from bot.database import db |
| from bot.state import state |
|
|
| class ProxyManager: |
| def __init__(self): |
| self.proxies = [] |
| self.is_enabled = False |
| self._semaphore = asyncio.BoundedSemaphore(5) |
| |
| async def fetch_webshare(self): |
| """Fetches 10 free proxies from Webshare.io API.""" |
| api_key = os.environ.get("WEBSHARE_API_KEY") |
| if not api_key: |
| log("β οΈ WEBSHARE_API_KEY missing from environment variables.") |
| return False |
| |
| url = "https://proxy.webshare.io/api/v2/proxy/list/" |
| headers = {"Authorization": f"Token {api_key}"} |
| params = {"mode": "direct", "page": 1, "page_size": 100} |
| |
| try: |
| async with aiohttp.ClientSession() as session: |
| async with session.get(url, headers=headers, params=params, timeout=10) as resp: |
| if resp.status != 200: |
| log(f"β οΈ Webshare API returned {resp.status}") |
| return False |
| data = await resp.json() |
| |
| self.proxies.clear() |
| for p in data.get("results", []): |
| self.proxies.append({ |
| "scheme": "socks5", |
| "hostname": p["proxy_address"], |
| "port": p["port"], |
| "username": p["username"], |
| "password": p["password"], |
| "ping": 9999, |
| "status": "untested" |
| }) |
| log(f"π Fetched {len(self.proxies)} proxies from Webshare.") |
| return True |
| except Exception as e: |
| log(f"β οΈ Failed to fetch Webshare proxies: {e}") |
| return False |
|
|
| async def _ping_proxy(self, proxy_dict): |
| """Measures network health via an authenticated SOCKS5 handshake loop with strict socket cleanup.""" |
| writer = None |
| async with self._semaphore: |
| start = time.time() |
| try: |
| reader, writer = await asyncio.wait_for( |
| asyncio.open_connection(proxy_dict["hostname"], proxy_dict["port"]), |
| timeout=3.0 |
| ) |
| |
| writer.write(b"\x05\x01\x02") |
| await writer.drain() |
| |
| res = await asyncio.wait_for(reader.read(2), timeout=2.0) |
| if len(res) != 2 or res[0] != 5 or res[1] != 2: |
| raise Exception("Auth Method Rejected") |
|
|
| user_bytes = proxy_dict["username"].encode('utf-8') |
| pass_bytes = proxy_dict["password"].encode('utf-8') |
| |
| auth_packet = bytes([1, len(user_bytes)]) + user_bytes + bytes([len(pass_bytes)]) + pass_bytes |
| writer.write(auth_packet) |
| await writer.drain() |
| |
| auth_res = await asyncio.wait_for(reader.read(2), timeout=2.0) |
| |
| if len(auth_res) == 2 and auth_res[0] == 1 and auth_res[1] == 0: |
| ping = int((time.time() - start) * 1000) |
| proxy_dict["ping"] = ping |
| proxy_dict["status"] = "π’ Alive" |
| return True |
| |
| except Exception: |
| pass |
| finally: |
| if writer: |
| try: |
| writer.close() |
| await asyncio.sleep(0.01) |
| if hasattr(writer, 'transport') and writer.transport: |
| writer.transport.abort() |
| except Exception: |
| pass |
| |
| proxy_dict["ping"] = 9999 |
| proxy_dict["status"] = "π΄ Dead" |
| return False |
|
|
| async def test_all_proxies(self): |
| """Pings all proxies concurrently using authenticated TCP loops and flushes state to DB.""" |
| if not self.proxies: |
| return |
| |
| log("π Pinging proxy fleet with authenticated credentials...") |
| tasks = [self._ping_proxy(p) for p in self.proxies] |
| await asyncio.gather(*tasks) |
| |
| self.proxies.sort(key=lambda x: x["ping"]) |
| |
| |
| if hasattr(db, "save_setting"): |
| asyncio.create_task(db.save_setting("cached_proxy_fleet", self.proxies)) |
| asyncio.create_task(db.save_setting("proxy_mgr_enabled", self.is_enabled)) |
| |
| alive = sum(1 for p in self.proxies if p["status"] == "π’ Alive") |
| log(f"π Proxy Health Check complete. {alive}/{len(self.proxies)} alive.") |
| |
| def get_best_proxies(self, count): |
| """Returns the top X fastest, alive proxies.""" |
| alive_proxies = [p for p in self.proxies if p["status"] == "π’ Alive"] |
| return alive_proxies[:count] |
|
|
| def handle_runtime_routing_fault(self, dead_hostname: str): |
| """ |
| Executes an instant, hot-swap redirection sequence. |
| Rotates out a failing proxy from the pool path dynamically, avoiding active locks. |
| """ |
| log(f"π‘οΈ Dynamic Routing Fault Caught on route: {dead_hostname}") |
| best_nodes = self.get_best_proxies(len(self.proxies)) |
| allocated_proxies = state.get("allocated_proxies", set()) |
| |
| fallback_node = None |
| |
| for p in best_nodes: |
| if p["hostname"] != dead_hostname and p["hostname"] not in allocated_proxies: |
| fallback_node = p |
| break |
| |
| |
| if not fallback_node: |
| for p in best_nodes: |
| if p["hostname"] != dead_hostname: |
| fallback_node = p |
| break |
| |
| if fallback_node: |
| log(f"π Hot-Swap Target Selected: {fallback_node['hostname']} ({fallback_node['ping']}ms)") |
| return { |
| "scheme": fallback_node["scheme"], |
| "hostname": fallback_node["hostname"], |
| "port": fallback_node["port"], |
| "username": fallback_node["username"], |
| "password": fallback_node["password"] |
| } |
| |
| log("β οΈ No standby backup proxy found in active pool. Dropping to Direct Home IP.") |
| return None |
|
|
| proxy_mgr = ProxyManager() |