File size: 5,843 Bytes
050a103
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import aiohttp
import asyncio
import time
import logging
from typing import List, Optional, Dict
from dataclasses import dataclass, asdict

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

@dataclass
class Proxy:
    """A single proxy endpoint plus its measured health.

    ``latency`` starts at +infinity and is overwritten (in milliseconds)
    once the proxy passes a connectivity check, so unchecked proxies
    always sort after checked ones.
    """
    url: str        # full "protocol://ip:port" form, as passed to aiohttp
    protocol: str   # scheme, e.g. "http", "https", "socks5"
    ip: str         # host address
    port: str       # port, kept as a string
    latency: float = float('inf')  # round-trip time in ms; inf = untested

    def to_dict(self) -> Dict:
        """Serialize this proxy to a plain dict (e.g. for JSON output)."""
        return asdict(self)

class ProxyManager:
    """Fetch free proxy lists from public sources, validate them concurrently,
    and maintain a latency-sorted list of working proxies.

    ``refresh_proxies`` is the main entry point. ``self.best_proxy`` always
    points at the lowest-latency validated proxy (or ``None`` if none yet).
    """

    # Schemes we know how to drive through aiohttp's ``proxy=`` argument.
    ALLOWED_PROTOCOLS = frozenset({'http', 'https', 'socks5'})

    def __init__(self):
        self.proxies: List[Proxy] = []  # validated proxies, ascending latency
        self.best_proxy: Optional[Proxy] = None
        self.test_target = "http://www.google.com"
        self.timeout = 5  # per-proxy check timeout, seconds
        self.concurrency_limit = 200  # max simultaneous proxy checks
        # NOTE: asyncio.Lock serializes coroutines within one event loop;
        # it is NOT a thread lock. It guards self.proxies / self.best_proxy.
        self._lock = asyncio.Lock()

    def _build_proxy(self, protocol: str, ip: str, port) -> Optional[Proxy]:
        """Normalize raw fields into a Proxy, or None if the scheme is unsupported.

        ``port`` may arrive as an int (common in JSON feeds); it is coerced to
        str to match the Proxy dataclass declaration.
        """
        protocol = protocol.lower()
        if protocol not in self.ALLOWED_PROTOCOLS:
            return None
        port = str(port)
        return Proxy(
            url=f"{protocol}://{ip}:{port}",
            protocol=protocol,
            ip=ip,
            port=port,
        )

    async def fetch_json_proxies(self) -> List[Proxy]:
        """Fetch proxies from the proxies.free JSON source."""
        # Timestamp query parameter busts any intermediate caches.
        url = f"https://proxies.free/data/proxies.json?t={int(time.time())}"
        fetched: List[Proxy] = []
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        for p in data.get('proxies', []):
                            ip = p.get('ip')
                            port = p.get('port')
                            if not ip or port is None:
                                # Skip malformed entries instead of letting a
                                # KeyError abort the whole parse loop.
                                continue
                            proxy = self._build_proxy(p.get('protocol', ''), ip, port)
                            if proxy is not None:
                                fetched.append(proxy)
        except Exception as e:
            logger.error(f"Error fetching JSON proxies: {e}")
        return fetched

    async def fetch_txt_proxies(self) -> List[Proxy]:
        """Fetch proxies from the proxifly TXT source (one URL per line)."""
        url = "https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/all/data.txt"
        fetched: List[Proxy] = []
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        text = await response.text()
                        for line in text.splitlines():
                            line = line.strip()
                            if not line:
                                continue
                            try:
                                protocol, rest = line.split('://')
                                ip, port = rest.split(':')
                            except ValueError:
                                continue  # malformed line (no scheme, IPv6, ...)
                            proxy = self._build_proxy(protocol, ip, port)
                            if proxy is not None:
                                fetched.append(proxy)
        except Exception as e:
            logger.error(f"Error fetching TXT proxies: {e}")
        return fetched

    async def check_and_add_proxy(self, proxy: Proxy, semaphore: asyncio.Semaphore):
        """Test one proxy against ``self.test_target``; on success record its
        latency (ms) and merge it into the sorted working list immediately."""
        async with semaphore:
            start_time = time.time()
            try:
                # Fresh session per check so pooled connections are never
                # reused across different proxy endpoints.
                timeout = aiohttp.ClientTimeout(total=self.timeout)
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.get(self.test_target, proxy=proxy.url) as response:
                        if response.status == 200:
                            proxy.latency = (time.time() - start_time) * 1000  # ms
                            # Publish immediately so consumers see results
                            # while the sweep is still running.
                            async with self._lock:
                                self.proxies.append(proxy)
                                # Timsort is ~O(n) on a nearly-sorted list.
                                self.proxies.sort(key=lambda x: x.latency)
                                self.best_proxy = self.proxies[0]
            except Exception as e:
                # Best-effort by design: a failing proxy is simply skipped,
                # but leave a debug trail instead of swallowing silently.
                logger.debug(f"Proxy check failed for {proxy.url}: {e}")

    async def refresh_proxies(self) -> List[Proxy]:
        """Re-fetch both sources, reset state, and re-test every candidate.

        Returns the working-proxy list (which check tasks populate as they
        complete, so it is fully sorted by latency on return).
        """
        logger.info("Fetching proxies...")
        # Both sources are independent HTTP requests: fetch them concurrently
        # rather than one after the other.
        p1, p2 = await asyncio.gather(
            self.fetch_json_proxies(),
            self.fetch_txt_proxies(),
        )

        # Deduplicate by URL, keeping the first occurrence.
        seen = set()
        unique_proxies = []
        for p in p1 + p2:
            if p.url not in seen:
                seen.add(p.url)
                unique_proxies.append(p)

        logger.info(f"Testing {len(unique_proxies)} proxies with concurrency {self.concurrency_limit}...")

        # Clear the old list up front: simple and stateless, at the cost of a
        # brief window with no proxies while checks repopulate it.
        async with self._lock:
            self.proxies = []
            self.best_proxy = None

        semaphore = asyncio.Semaphore(self.concurrency_limit)
        await asyncio.gather(
            *(self.check_and_add_proxy(p, semaphore) for p in unique_proxies)
        )

        logger.info(f"Refresh complete. Found {len(self.proxies)} working proxies.")
        return self.proxies