import aiohttp
import asyncio
import time
import logging
from typing import List, Optional, Dict
from dataclasses import dataclass, asdict
# Configure module-wide logging: timestamped, level-tagged messages at INFO.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger, named after this module per stdlib convention.
logger = logging.getLogger(__name__)
@dataclass
class Proxy:
    """A single proxy endpoint together with its measured latency."""

    url: str                        # full "protocol://ip:port" address
    protocol: str                   # one of: http, https, socks5
    ip: str                         # host part of the address
    port: str                       # port part of the address (kept as text)
    latency: float = float('inf')   # round-trip time in ms; inf means untested

    def to_dict(self):
        """Serialize this proxy into a plain dict (all dataclass fields)."""
        return asdict(self)
class ProxyManager:
    """Fetch free proxies from public sources, verify them concurrently,
    and maintain a latency-sorted list of the working ones.

    ``proxies`` only ever contains proxies that passed a live check;
    ``best_proxy`` is always the lowest-latency entry (or None).
    """

    def __init__(self):
        self.proxies: List["Proxy"] = []          # working proxies, sorted by latency
        self.best_proxy: Optional["Proxy"] = None  # lowest-latency working proxy
        self.test_target = "http://www.google.com"  # URL used to validate a proxy
        self.timeout = 5  # seconds: per-proxy check budget
        self.concurrency_limit = 200  # Higher concurrency
        self._lock = asyncio.Lock()  # guards self.proxies / self.best_proxy

    async def fetch_json_proxies(self) -> List["Proxy"]:
        """Fetch proxies from proxies.free JSON source.

        Returns an unverified candidate list; empty on any network/parse error.
        """
        # Timestamp query parameter defeats CDN/browser caching.
        url = f"https://proxies.free/data/proxies.json?t={int(time.time())}"
        fetched: List["Proxy"] = []
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        data = await response.json()
                        for p in data.get('proxies', []):
                            # Filter for http/https/socks5 only
                            protocol = p.get('protocol', '').lower()
                            if protocol in ('http', 'https', 'socks5'):
                                fetched.append(Proxy(
                                    url=f"{protocol}://{p['ip']}:{p['port']}",
                                    protocol=protocol,
                                    ip=p['ip'],
                                    # JSON sources may carry the port as an int;
                                    # normalize to str for consistency with the
                                    # TXT source (Proxy.port is declared str).
                                    port=str(p['port'])
                                ))
        except Exception as e:
            logger.error(f"Error fetching JSON proxies: {e}")
        return fetched

    async def fetch_txt_proxies(self) -> List["Proxy"]:
        """Fetch proxies from proxifly TXT source.

        Each line is expected as ``protocol://ip:port``; malformed lines
        (including IPv6-style multi-colon hosts) are silently skipped.
        Returns an unverified candidate list; empty on any network error.
        """
        url = "https://cdn.jsdelivr.net/gh/proxifly/free-proxy-list@main/proxies/all/data.txt"
        fetched: List["Proxy"] = []
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        text = await response.text()
                        for line in text.splitlines():
                            line = line.strip()
                            if not line:
                                continue
                            try:
                                protocol, rest = line.split('://')
                                ip, port = rest.split(':')
                                if protocol.lower() in ('http', 'https', 'socks5'):
                                    fetched.append(Proxy(
                                        url=line,
                                        protocol=protocol.lower(),
                                        ip=ip,
                                        port=port
                                    ))
                            except ValueError:
                                # Line didn't match protocol://ip:port — skip it.
                                continue
        except Exception as e:
            logger.error(f"Error fetching TXT proxies: {e}")
        return fetched

    async def check_and_add_proxy(self, proxy: "Proxy", semaphore: asyncio.Semaphore):
        """Test one proxy and, on success, insert it into the working list.

        The working list is kept latency-sorted under the lock so that
        ``best_proxy`` is always consistent with ``proxies[0]``. A failed
        check is intentionally silent — this is best-effort screening.
        """
        async with semaphore:
            # monotonic clock: immune to wall-clock (NTP) jumps while timing
            start_time = time.monotonic()
            try:
                # Use a new session for each check to avoid connection pool
                # limits/issues with proxies.
                timeout = aiohttp.ClientTimeout(total=self.timeout)
                async with aiohttp.ClientSession(timeout=timeout) as session:
                    async with session.get(self.test_target, proxy=proxy.url) as response:
                        if response.status == 200:
                            proxy.latency = (time.monotonic() - start_time) * 1000  # ms
                            # Add to working list immediately
                            async with self._lock:
                                self.proxies.append(proxy)
                                self.proxies.sort(key=lambda x: x.latency)
                                self.best_proxy = self.proxies[0]
            except Exception:
                pass  # Check failed, just ignore

    async def refresh_proxies(self) -> List["Proxy"]:
        """Fetch candidates from all sources, test them, and return survivors.

        Clears the current working list first, so consumers see results
        stream in as checks complete. Returns the internal (live) list,
        sorted by latency.
        """
        logger.info("Fetching proxies...")
        # The two sources are independent — fetch them concurrently instead
        # of sequentially to halve the fetch phase.
        p1, p2 = await asyncio.gather(self.fetch_json_proxies(),
                                      self.fetch_txt_proxies())
        # Deduplicate by full proxy URL, preserving first-seen order.
        seen = set()
        unique_proxies = []
        for p in p1 + p2:
            if p.url not in seen:
                seen.add(p.url)
                unique_proxies.append(p)
        logger.info(f"Testing {len(unique_proxies)} proxies with concurrency {self.concurrency_limit}...")
        # Reset current list before refresh. Ideally we would keep old ones
        # until new ones replace them, but to stay simple and stateless we
        # clear it; the UI might flicker but updates arrive fast.
        async with self._lock:
            self.proxies = []
            self.best_proxy = None
        semaphore = asyncio.Semaphore(self.concurrency_limit)
        tasks = [self.check_and_add_proxy(p, semaphore) for p in unique_proxies]
        # Run all checks
        await asyncio.gather(*tasks)
        logger.info(f"Refresh complete. Found {len(self.proxies)} working proxies.")
        return self.proxies
|