import os
import time
import asyncio
import aiohttp
from utils.logger import log
from bot.database import db
from bot.state import state
class ProxyManager:
    """Maintain a pool of Webshare SOCKS5 proxies and select healthy ones.

    Every pool entry is a plain dict with the keys ``scheme``, ``hostname``,
    ``port``, ``username``, ``password``, ``ping`` (latency in milliseconds)
    and ``status`` (one of the ``STATUS_*`` labels below).
    """

    # Status labels stored in each proxy dict. They are compared by equality
    # in several methods, so they are defined exactly once here.
    STATUS_UNTESTED = "untested"
    STATUS_ALIVE = "π’ Alive"
    STATUS_DEAD = "π΄ Dead"

    # Placeholder latency (ms) for proxies that are untested or unreachable;
    # it makes them sort after every proxy that answered.
    UNREACHABLE_PING = 9999

    def __init__(self):
        self.proxies = []  # Explicit list of validated node dictionaries
        self.is_enabled = False
        # Caps the number of simultaneous health-check handshakes at 5.
        self._semaphore = asyncio.BoundedSemaphore(5)
        # Strong references to fire-and-forget tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._background_tasks = set()

    async def fetch_webshare(self) -> bool:
        """Replace the pool with the proxy list returned by the Webshare.io API.

        Requests the first page (up to 100 entries) of the account's proxy
        list using the token in the ``WEBSHARE_API_KEY`` environment variable.
        The existing pool is only replaced after the whole response has been
        parsed, so a failed or malformed fetch leaves it untouched.

        Returns:
            True when the pool was replaced, False when the key is missing,
            the API answered with a non-200 status, or the request/parsing
            failed.
        """
        api_key = os.environ.get("WEBSHARE_API_KEY")
        if not api_key:
            log("β οΈ WEBSHARE_API_KEY missing from environment variables.")
            return False

        url = "https://proxy.webshare.io/api/v2/proxy/list/"
        headers = {"Authorization": f"Token {api_key}"}
        params = {"mode": "direct", "page": 1, "page_size": 100}
        # A bare number as ``timeout`` is deprecated in aiohttp 3.x.
        timeout = aiohttp.ClientTimeout(total=10)

        try:
            async with aiohttp.ClientSession(timeout=timeout) as session:
                async with session.get(url, headers=headers, params=params) as resp:
                    if resp.status != 200:
                        log(f"β οΈ Webshare API returned {resp.status}")
                        return False
                    data = await resp.json()

            fetched = [
                {
                    "scheme": "socks5",
                    "hostname": entry["proxy_address"],
                    "port": entry["port"],
                    "username": entry["username"],
                    "password": entry["password"],
                    "ping": self.UNREACHABLE_PING,
                    "status": self.STATUS_UNTESTED,
                }
                for entry in data.get("results", [])
            ]
        except Exception as e:
            # Boundary handler: network, JSON and schema errors all mean
            # "no usable list"; report it and keep the current pool.
            log(f"β οΈ Failed to fetch Webshare proxies: {e}")
            return False

        # Slice assignment keeps the identity of ``self.proxies`` for any
        # code that holds a reference to the list.
        self.proxies[:] = fetched
        log(f"π Fetched {len(self.proxies)} proxies from Webshare.")
        return True

    @staticmethod
    async def _socks5_authenticate(reader, writer, username: str, password: str) -> bool:
        """Run the SOCKS5 greeting plus username/password sub-negotiation.

        Returns True only when the server selects method 0x02 and then
        reports authentication success.
        """
        user_bytes = username.encode("utf-8")
        pass_bytes = password.encode("utf-8")
        # Each length is sent in a single byte, so longer credentials cannot
        # be represented in the auth packet.
        if len(user_bytes) > 255 or len(pass_bytes) > 255:
            return False

        # Greeting: version 5, one method offered, method 0x02 (user/pass).
        writer.write(b"\x05\x01\x02")
        await writer.drain()
        # readexactly() waits for both bytes; read(2) may return a short
        # read if the reply arrives in pieces.
        method_reply = await asyncio.wait_for(reader.readexactly(2), timeout=2.0)
        if method_reply != b"\x05\x02":
            return False

        # Sub-negotiation: version 1, ulen, uname, plen, passwd.
        writer.write(
            bytes([1, len(user_bytes)]) + user_bytes + bytes([len(pass_bytes)]) + pass_bytes
        )
        await writer.drain()
        auth_reply = await asyncio.wait_for(reader.readexactly(2), timeout=2.0)
        return auth_reply == b"\x01\x00"

    @staticmethod
    async def _close_writer(writer) -> None:
        """Best-effort teardown of a stream writer; never raises."""
        try:
            writer.close()
            # Give the loop one brief turn to process the close, then force
            # the transport down so the socket is not left lingering.
            await asyncio.sleep(0.01)
            transport = getattr(writer, "transport", None)
            if transport:
                transport.abort()
        except Exception:
            # Cleanup failures must not mask the health-check result.
            pass

    async def _ping_proxy(self, proxy_dict: dict) -> bool:
        """Health-check one proxy with an authenticated SOCKS5 handshake.

        Updates ``proxy_dict["ping"]`` and ``proxy_dict["status"]`` in place.
        The latency covers connect plus handshake and excludes time spent
        waiting for the concurrency semaphore.

        Returns:
            True when the proxy accepted the credentials, otherwise False.
        """
        writer = None
        async with self._semaphore:
            # Monotonic clock: unaffected by wall-clock adjustments.
            started = time.monotonic()
            try:
                reader, writer = await asyncio.wait_for(
                    asyncio.open_connection(proxy_dict["hostname"], proxy_dict["port"]),
                    timeout=3.0,
                )
                authenticated = await self._socks5_authenticate(
                    reader, writer, proxy_dict["username"], proxy_dict["password"]
                )
                if authenticated:
                    proxy_dict["ping"] = int((time.monotonic() - started) * 1000)
                    proxy_dict["status"] = self.STATUS_ALIVE
                    return True
            except Exception:
                # Deliberately broad: any failure (refused, timeout, short
                # read, malformed entry) simply means the proxy is unusable.
                pass
            finally:
                if writer is not None:
                    await self._close_writer(writer)

        proxy_dict["ping"] = self.UNREACHABLE_PING
        proxy_dict["status"] = self.STATUS_DEAD
        return False

    def _spawn_background(self, coro):
        """Schedule ``coro`` as a task and keep it referenced until it is done."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_done)
        return task

    def _on_background_done(self, task) -> None:
        """Drop a finished background task and report its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            log(f"Proxy state persistence failed: {error}")

    async def test_all_proxies(self) -> None:
        """Ping every proxy concurrently, sort by latency, and persist the result.

        Does nothing when the pool is empty. Persistence is scheduled in the
        background (not awaited) and only when ``db`` exposes ``save_setting``.
        """
        if not self.proxies:
            return

        log("π Pinging proxy fleet with authenticated credentials...")
        await asyncio.gather(*(self._ping_proxy(p) for p in self.proxies))
        self.proxies.sort(key=lambda p: p["ping"])

        # Save structural array state directly into persistent store
        if hasattr(db, "save_setting"):
            # Snapshot the pool so later mutations cannot race the save.
            snapshot = [dict(p) for p in self.proxies]
            self._spawn_background(db.save_setting("cached_proxy_fleet", snapshot))
            self._spawn_background(db.save_setting("proxy_mgr_enabled", self.is_enabled))

        alive = sum(1 for p in self.proxies if p["status"] == self.STATUS_ALIVE)
        log(f"π Proxy Health Check complete. {alive}/{len(self.proxies)} alive.")

    def get_best_proxies(self, count: int) -> list:
        """Return up to ``count`` alive proxies, fastest (lowest ping) first.

        A ``count`` of zero or less yields an empty list.
        """
        if count <= 0:
            return []
        alive_proxies = [p for p in self.proxies if p["status"] == self.STATUS_ALIVE]
        # Sort here instead of relying on an earlier test_all_proxies() run;
        # the sort is stable, so an already-sorted pool keeps its order.
        alive_proxies.sort(key=lambda p: p["ping"])
        return alive_proxies[:count]

    def handle_runtime_routing_fault(self, dead_hostname: str):
        """Pick a replacement proxy for a route that just failed.

        Prefers the fastest alive proxy whose hostname is neither
        ``dead_hostname`` nor listed in ``state["allocated_proxies"]``; if
        every alive proxy is allocated, falls back to the fastest alive proxy
        other than ``dead_hostname``. The pool itself is not modified.

        Returns:
            A new dict with ``scheme``, ``hostname``, ``port``, ``username``
            and ``password`` of the chosen proxy, or None when no alive
            alternative exists.
        """
        log(f"π‘οΈ Dynamic Routing Fault Caught on route: {dead_hostname}")

        candidates = [
            p
            for p in self.get_best_proxies(len(self.proxies))
            if p["hostname"] != dead_hostname
        ]
        # ``or set()`` also covers a stored None value.
        allocated_proxies = state.get("allocated_proxies", set()) or set()

        # Priority 1: Find an alive proxy that is not currently locked by another camper thread
        fallback_node = next(
            (p for p in candidates if p["hostname"] not in allocated_proxies),
            None,
        )
        # Priority 2: Fallback to any alive node if allocation limits are saturated
        if fallback_node is None and candidates:
            fallback_node = candidates[0]

        if fallback_node is None:
            log("β οΈ No standby backup proxy found in active pool. Dropping to Direct Home IP.")
            return None

        log(f"π Hot-Swap Target Selected: {fallback_node['hostname']} ({fallback_node['ping']}ms)")
        # Return only the connection fields, as a copy detached from the pool.
        return {
            key: fallback_node[key]
            for key in ("scheme", "hostname", "port", "username", "password")
        }
# Module-level singleton shared by importers of this module.
proxy_mgr = ProxyManager()