# dfr / bot / proxy_manager.py
# UNUSUALxd's picture
# Update bot/proxy_manager.py
# 3cdd115 verified
import os
import time
import asyncio
import aiohttp
from utils.logger import log
from bot.database import db
from bot.state import state
class ProxyManager:
    """Maintain a pool of Webshare SOCKS5 proxies and rank them by latency.

    The pool lives in ``self.proxies`` as a list of dicts with the keys
    ``scheme``, ``hostname``, ``port``, ``username``, ``password``, ``ping``
    (milliseconds, ``UNREACHABLE_PING`` when unknown or dead) and ``status``
    (one of the ``STATUS_*`` labels below).
    """

    # Status labels stored in each proxy dict.
    # NOTE(review): the alive/dead labels look like mis-decoded emoji. They
    # are kept byte-identical here because they are persisted through
    # db.save_setting and compared by value; confirm no other module depends
    # on the exact text before normalizing them.
    STATUS_UNTESTED = "untested"
    STATUS_ALIVE = "🟒 Alive"
    STATUS_DEAD = "πŸ”΄ Dead"

    # Sentinel latency that sorts untested/dead proxies to the end of the pool.
    UNREACHABLE_PING = 9999

    def __init__(self):
        self.proxies = []  # Proxy dicts; sorted by ping after test_all_proxies()
        self.is_enabled = False
        # Caps the number of health probes that run at the same time.
        self._semaphore = asyncio.BoundedSemaphore(5)
        # Strong references to fire-and-forget tasks: the event loop only
        # keeps weak references, so an unreferenced task can be collected
        # before it finishes.
        self._background_tasks = set()

    async def fetch_webshare(self):
        """Replace the pool with the first page of the Webshare proxy list.

        Reads the API token from the ``WEBSHARE_API_KEY`` environment
        variable and requests up to 100 "direct" mode entries. Every fetched
        proxy starts as untested with the sentinel ping.

        The pool is only replaced once the whole response has been parsed, so
        a failed or malformed response leaves the previous pool untouched.

        Returns:
            True if the pool was refreshed, False if the key is missing, the
            API answered with a non-200 status, or the request/parsing failed.
        """
        api_key = os.environ.get("WEBSHARE_API_KEY")
        if not api_key:
            log("⚠️ WEBSHARE_API_KEY missing from environment variables.")
            return False

        url = "https://proxy.webshare.io/api/v2/proxy/list/"
        headers = {"Authorization": f"Token {api_key}"}
        params = {"mode": "direct", "page": 1, "page_size": 100}
        # aiohttp expects a ClientTimeout object; a bare number is deprecated.
        timeout = aiohttp.ClientTimeout(total=10)

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url, headers=headers, params=params, timeout=timeout
                ) as resp:
                    if resp.status != 200:
                        log(f"⚠️ Webshare API returned {resp.status}")
                        return False
                    data = await resp.json()

            fetched = [
                {
                    "scheme": "socks5",
                    "hostname": entry["proxy_address"],
                    "port": entry["port"],
                    "username": entry["username"],
                    "password": entry["password"],
                    "ping": self.UNREACHABLE_PING,
                    "status": self.STATUS_UNTESTED,
                }
                for entry in data.get("results") or []
            ]
        except Exception as e:
            # Boundary with the network and an external payload: report the
            # failure and keep whatever pool we already had.
            log(f"⚠️ Failed to fetch Webshare proxies: {e}")
            return False

        # In-place swap so existing references to the list see the new pool.
        self.proxies[:] = fetched
        log(f"🌐 Fetched {len(self.proxies)} proxies from Webshare.")
        return True

    @staticmethod
    def _socks5_auth_packet(username, password):
        """Build a SOCKS5 username/password sub-negotiation request.

        Layout: version byte 0x01, username length, username, password
        length, password. Raises ValueError if either UTF-8 encoded field is
        longer than 255 bytes, because its length must fit in one byte.
        """
        user_bytes = username.encode("utf-8")
        pass_bytes = password.encode("utf-8")
        return (
            bytes([1, len(user_bytes)])
            + user_bytes
            + bytes([len(pass_bytes)])
            + pass_bytes
        )

    @staticmethod
    async def _close_writer(writer):
        """Close a stream writer, aborting the transport if that fails."""
        try:
            writer.close()
            await asyncio.wait_for(writer.wait_closed(), timeout=1.0)
        except Exception:
            # A graceful close failed or stalled; drop the socket immediately.
            transport = getattr(writer, "transport", None)
            if transport is not None:
                transport.abort()

    async def _ping_proxy(self, proxy_dict):
        """Probe one proxy with an authenticated SOCKS5 handshake.

        Opens a TCP connection, offers username/password authentication
        (method 0x02) and submits the proxy's credentials. No CONNECT request
        is sent; the probe only checks that the proxy accepts the login.

        Updates ``proxy_dict["ping"]`` and ``proxy_dict["status"]`` in place.

        Returns:
            True if the proxy accepted the credentials, False on any failure
            (connection error, timeout, short reply, rejected auth).
        """
        writer = None
        async with self._semaphore:
            # Monotonic clock: elapsed time must not be affected by wall-clock
            # adjustments. Started after the semaphore so queueing time is
            # not counted as latency.
            started = time.monotonic()
            try:
                reader, writer = await asyncio.wait_for(
                    asyncio.open_connection(
                        proxy_dict["hostname"], proxy_dict["port"]
                    ),
                    timeout=3.0,
                )

                # Greeting: SOCKS version 5, one method offered, method 0x02.
                writer.write(b"\x05\x01\x02")
                await writer.drain()
                # readexactly() waits for both bytes; read(2) may return a
                # short read and misreport a healthy proxy as dead.
                method_reply = await asyncio.wait_for(
                    reader.readexactly(2), timeout=2.0
                )

                if method_reply == b"\x05\x02":
                    writer.write(
                        self._socks5_auth_packet(
                            proxy_dict["username"], proxy_dict["password"]
                        )
                    )
                    await writer.drain()
                    auth_reply = await asyncio.wait_for(
                        reader.readexactly(2), timeout=2.0
                    )
                    # Auth reply: sub-negotiation version 1, status 0 = success.
                    if auth_reply == b"\x01\x00":
                        elapsed_ms = int((time.monotonic() - started) * 1000)
                        proxy_dict["ping"] = elapsed_ms
                        proxy_dict["status"] = self.STATUS_ALIVE
                        return True
            except Exception:
                # Deliberate best-effort: any failure simply means the proxy
                # is unusable, which is recorded below.
                pass
            finally:
                if writer is not None:
                    await self._close_writer(writer)

        proxy_dict["ping"] = self.UNREACHABLE_PING
        proxy_dict["status"] = self.STATUS_DEAD
        return False

    def _spawn_background(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_done)
        return task

    def _on_background_done(self, task):
        """Release a finished background task and report its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            log(f"⚠️ Background proxy-state save failed: {error}")

    async def test_all_proxies(self):
        """Probe every proxy concurrently, sort the pool by ping and persist it.

        Does nothing when the pool is empty. After probing, the pool is
        sorted ascending by ping (dead proxies carry the sentinel and end up
        last). If ``db`` exposes ``save_setting``, the pool and the enabled
        flag are saved in background tasks that are not awaited here.
        """
        if not self.proxies:
            return

        log("🔍 Pinging proxy fleet with authenticated credentials...")
        # Snapshot so the probed set is fixed even if the pool is refreshed
        # while the probes are in flight.
        targets = list(self.proxies)
        await asyncio.gather(*(self._ping_proxy(p) for p in targets))
        self.proxies.sort(key=lambda p: p["ping"])

        # Save the pool and the enabled flag into the persistent store.
        if hasattr(db, "save_setting"):
            self._spawn_background(
                db.save_setting("cached_proxy_fleet", self.proxies)
            )
            self._spawn_background(
                db.save_setting("proxy_mgr_enabled", self.is_enabled)
            )

        alive = sum(1 for p in self.proxies if p["status"] == self.STATUS_ALIVE)
        log(f"📊 Proxy Health Check complete. {alive}/{len(self.proxies)} alive.")

    def get_best_proxies(self, count):
        """Return up to ``count`` alive proxies, fastest first.

        The alive proxies are sorted by ping here, so the ordering holds even
        if the pool has not been re-sorted since the last probe.

        Args:
            count: Maximum number of proxies to return. ``None`` returns every
                alive proxy; zero or a negative number returns an empty list.
        """
        if count is not None and count <= 0:
            return []
        alive_proxies = sorted(
            (p for p in self.proxies if p["status"] == self.STATUS_ALIVE),
            key=lambda p: p["ping"],
        )
        return alive_proxies[:count]

    def handle_runtime_routing_fault(self, dead_hostname: str):
        """Pick a replacement proxy after ``dead_hostname`` failed at runtime.

        Selection order among alive proxies other than ``dead_hostname``:
          1. the fastest one whose hostname is not in
             ``state["allocated_proxies"]``;
          2. otherwise the fastest one, even if it is already allocated.

        The pool itself is not modified; the failing proxy keeps its current
        status until the next health check.

        Returns:
            A new dict with ``scheme``, ``hostname``, ``port``, ``username``
            and ``password`` of the chosen proxy, or None if no alive
            alternative exists.
        """
        log(f"🛡️ Dynamic Routing Fault Caught on route: {dead_hostname}")

        candidates = [
            p
            for p in self.get_best_proxies(len(self.proxies))
            if p["hostname"] != dead_hostname
        ]
        allocated_proxies = state.get("allocated_proxies", set()) or set()

        # Priority 1: an alive proxy that is not currently allocated.
        fallback_node = next(
            (p for p in candidates if p["hostname"] not in allocated_proxies),
            None,
        )
        # Priority 2: every alternative is allocated, so share the fastest one.
        if fallback_node is None and candidates:
            fallback_node = candidates[0]

        if fallback_node is None:
            log("⚠️ No standby backup proxy found in active pool. Dropping to Direct Home IP.")
            return None

        log(f"🔄 Hot-Swap Target Selected: {fallback_node['hostname']} ({fallback_node['ping']}ms)")
        # Return a copy limited to connection fields so callers cannot mutate
        # the pool entry or see health-check bookkeeping.
        return {
            field: fallback_node[field]
            for field in ("scheme", "hostname", "port", "username", "password")
        }
# Module-level ProxyManager instance, constructed when this module is imported.
proxy_mgr = ProxyManager()