File size: 6,717 Bytes
22ada80
 
 
 
 
b69a123
3cdd115
22ada80
 
 
b69a123
22ada80
5290736
22ada80
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5290736
 
b69a123
5290736
 
 
 
 
 
 
 
 
 
 
 
 
0f8d534
5290736
 
727cfc3
5290736
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b69a123
5290736
b69a123
5290736
 
 
 
 
 
22ada80
 
b69a123
22ada80
 
 
5290736
22ada80
 
 
 
 
b69a123
 
 
 
 
22ada80
 
 
 
 
 
 
 
b69a123
 
 
3cdd115
b69a123
 
 
3cdd115
b69a123
 
3cdd115
b69a123
3cdd115
b69a123
 
 
3cdd115
 
 
 
 
 
 
b69a123
 
 
 
 
 
 
 
 
 
 
 
 
22ada80
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
import os
import time
import asyncio
import aiohttp
from utils.logger import log
from bot.database import db
from bot.state import state

class ProxyManager:
    """Keeps a pool of Webshare SOCKS5 proxies and ranks them by latency.

    Every entry of ``proxies`` is a dict with the keys ``scheme``,
    ``hostname``, ``port``, ``username``, ``password``, ``ping``
    (milliseconds, ``9999`` when unknown or unreachable) and ``status``.
    """

    # NOTE(review): the two emoji labels below look mis-encoded (mojibake).
    # They are kept byte-for-byte because they are stored inside the proxy
    # dicts and handed to db.save_setting, so other code may compare against
    # these exact values - confirm before normalising them.
    _STATUS_ALIVE = "🟒 Alive"
    _STATUS_DEAD = "πŸ”΄ Dead"
    _STATUS_UNTESTED = "untested"
    # Sentinel latency that sorts untested/dead proxies behind measured ones.
    _UNREACHABLE_PING = 9999

    def __init__(self):
        self.proxies = []  # Explicit list of validated node dictionaries
        self.is_enabled = False
        # Caps the number of health-check handshakes running at once.
        self._semaphore = asyncio.BoundedSemaphore(5)
        # Strong references to fire-and-forget tasks; the event loop itself
        # only keeps weak ones, so an unreferenced task can be collected
        # before it finishes.
        self._background_tasks = set()

    async def fetch_webshare(self) -> bool:
        """Replace the pool with the proxy list returned by the Webshare API.

        Requests the first page (up to 100 entries, ``direct`` mode) using
        the token in the ``WEBSHARE_API_KEY`` environment variable. Every
        fetched proxy starts as ``untested`` with a ping of ``9999``.

        Returns:
            True when the pool was replaced; False when the key is missing,
            the API answered with a non-200 status, or the request/parsing
            failed. On False the existing pool is left untouched.
        """
        api_key = os.environ.get("WEBSHARE_API_KEY")
        if not api_key:
            log("⚠️ WEBSHARE_API_KEY missing from environment variables.")
            return False

        url = "https://proxy.webshare.io/api/v2/proxy/list/"
        headers = {"Authorization": f"Token {api_key}"}
        params = {"mode": "direct", "page": 1, "page_size": 100}

        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(
                    url,
                    headers=headers,
                    params=params,
                    timeout=aiohttp.ClientTimeout(total=10),
                ) as resp:
                    if resp.status != 200:
                        log(f"⚠️ Webshare API returned {resp.status}")
                        return False
                    data = await resp.json()

            # Build the complete replacement first: a malformed entry raises
            # here, before the current pool has been touched.
            fetched = [
                {
                    "scheme": "socks5",
                    "hostname": entry["proxy_address"],
                    "port": entry["port"],
                    "username": entry["username"],
                    "password": entry["password"],
                    "ping": self._UNREACHABLE_PING,
                    "status": self._STATUS_UNTESTED,
                }
                for entry in data.get("results", [])
            ]
        except Exception as e:
            log(f"⚠️ Failed to fetch Webshare proxies: {e}")
            return False

        # Swap in place so anything holding a reference to the list object
        # sees the new pool, as it did with the previous clear()/append().
        self.proxies[:] = fetched
        log(f"🌐 Fetched {len(self.proxies)} proxies from Webshare.")
        return True

    async def _ping_proxy(self, proxy_dict) -> bool:
        """Health-check one proxy with an authenticated SOCKS5 handshake.

        Opens a TCP connection, offers username/password authentication,
        sends the credentials and waits for the server's verdict. The
        elapsed time is measured from the moment the semaphore slot is
        acquired.

        Args:
            proxy_dict: Pool entry; ``ping`` and ``status`` are updated in
                place.

        Returns:
            True if the proxy accepted the credentials, otherwise False.
            Failures never raise: the entry is marked dead with ping 9999.
        """
        writer = None
        async with self._semaphore:
            # perf_counter is monotonic, so wall-clock adjustments cannot
            # distort the measured latency.
            started = time.perf_counter()
            try:
                reader, writer = await asyncio.wait_for(
                    asyncio.open_connection(proxy_dict["hostname"], proxy_dict["port"]),
                    timeout=3.0
                )

                # Greeting: version 5, one method offered, 0x02 = user/pass.
                writer.write(b"\x05\x01\x02")
                await writer.drain()

                # readexactly() waits for both bytes; read(2) may legally
                # return a single byte and misreport a healthy proxy.
                method_reply = await asyncio.wait_for(reader.readexactly(2), timeout=2.0)
                if method_reply == b"\x05\x02":
                    user_bytes = proxy_dict["username"].encode('utf-8')
                    pass_bytes = proxy_dict["password"].encode('utf-8')

                    # Sub-negotiation: 0x01, ulen, user, plen, password.
                    auth_packet = (
                        bytes([1, len(user_bytes)]) + user_bytes
                        + bytes([len(pass_bytes)]) + pass_bytes
                    )
                    writer.write(auth_packet)
                    await writer.drain()

                    auth_reply = await asyncio.wait_for(reader.readexactly(2), timeout=2.0)
                    if auth_reply == b"\x01\x00":
                        proxy_dict["ping"] = int((time.perf_counter() - started) * 1000)
                        proxy_dict["status"] = self._STATUS_ALIVE
                        return True
            except Exception:
                # Deliberate best-effort: any failure (timeout, refused
                # connection, short reply, malformed entry) simply means the
                # proxy is unusable and is marked dead below.
                pass
            finally:
                if writer:
                    try:
                        writer.close()
                        # Yield briefly after close(), then abort the
                        # transport outright.
                        await asyncio.sleep(0.01)
                        if hasattr(writer, 'transport') and writer.transport:
                            writer.transport.abort()
                    except Exception:
                        pass

            proxy_dict["ping"] = self._UNREACHABLE_PING
            proxy_dict["status"] = self._STATUS_DEAD
            return False

    def _spawn_background(self, coro):
        """Schedule *coro* as a task and hold a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._on_background_done)
        return task

    def _on_background_done(self, task):
        """Release a finished background task and log its failure, if any."""
        self._background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            log(f"⚠️ Background proxy state save failed: {error}")

    async def test_all_proxies(self):
        """Ping every proxy concurrently, sort the pool and persist it.

        Does nothing when the pool is empty. After all pings complete the
        pool is sorted in place by ascending ping. If ``db`` exposes
        ``save_setting``, the pool and the enabled flag are written in
        background tasks that this method does not await.
        """
        if not self.proxies:
            return

        log("πŸ” Pinging proxy fleet with authenticated credentials...")
        await asyncio.gather(*(self._ping_proxy(p) for p in self.proxies))

        self.proxies.sort(key=lambda x: x["ping"])

        # Save structural array state directly into persistent store
        if hasattr(db, "save_setting"):
            self._spawn_background(db.save_setting("cached_proxy_fleet", self.proxies))
            self._spawn_background(db.save_setting("proxy_mgr_enabled", self.is_enabled))

        alive = sum(1 for p in self.proxies if p["status"] == self._STATUS_ALIVE)
        log(f"πŸ“Š Proxy Health Check complete. {alive}/{len(self.proxies)} alive.")

    def get_best_proxies(self, count):
        """Return up to *count* alive proxies, fastest first.

        The alive subset is sorted here, so the result is ordered by ping
        even if the pool itself has not been sorted. ``self.proxies`` is
        not modified.
        """
        alive_proxies = [p for p in self.proxies if p["status"] == self._STATUS_ALIVE]
        # Stable sort: an already-sorted pool yields exactly the same order.
        alive_proxies.sort(key=lambda p: p.get("ping", self._UNREACHABLE_PING))
        return alive_proxies[:count]

    def handle_runtime_routing_fault(self, dead_hostname: str):
        """Pick a replacement proxy for a route that just failed.

        Prefers the fastest alive proxy whose hostname is neither
        *dead_hostname* nor listed in ``state["allocated_proxies"]``; if all
        such proxies are allocated, falls back to the fastest alive proxy
        that is merely different from *dead_hostname*. Neither the pool nor
        ``state`` is modified by this method.

        Returns:
            A dict with ``scheme``, ``hostname``, ``port``, ``username`` and
            ``password`` of the chosen proxy, or None when no alive
            alternative exists.
        """
        log(f"πŸ›‘οΈ Dynamic Routing Fault Caught on route: {dead_hostname}")
        candidates = [
            p for p in self.get_best_proxies(len(self.proxies))
            if p["hostname"] != dead_hostname
        ]
        # Guard against the key being present but holding None.
        allocated_proxies = state.get("allocated_proxies", set()) or set()

        # Priority 1: an alive proxy that is not currently allocated.
        fallback_node = next(
            (p for p in candidates if p["hostname"] not in allocated_proxies),
            None,
        )
        # Priority 2: every alternative is allocated - take the fastest one.
        if fallback_node is None and candidates:
            fallback_node = candidates[0]

        if fallback_node is None:
            log("⚠️ No standby backup proxy found in active pool. Dropping to Direct Home IP.")
            return None

        log(f"πŸ”„ Hot-Swap Target Selected: {fallback_node['hostname']} ({fallback_node['ping']}ms)")
        # Hand back connection details only, without ping/status bookkeeping.
        return {
            key: fallback_node[key]
            for key in ("scheme", "hostname", "port", "username", "password")
        }

# Module-level ProxyManager instance, created when this module is imported.
proxy_mgr = ProxyManager()