File size: 9,564 Bytes
15c4063 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 | import aiohttp
import asyncio
import time
import re
from typing import Optional, Dict, List
from pydantic import BaseModel
# Improved regex that matches IP:PORT format (doesn't validate ranges yet)
IP_REGEX = re.compile(r"(\d{1,3}\.){3}\d{1,3}:\d{1,5}")
def is_valid_ip(ip: str) -> bool:
"""Validate IP address octets are in range 0-255"""
try:
parts = ip.split(".")
if len(parts) != 4:
return False
return all(0 <= int(part) <= 255 for part in parts)
except (ValueError, AttributeError):
return False
def is_valid_port(port: int) -> bool:
"""Validate port is in range 1-65535"""
return 1 <= port <= 65535
class ValidationResult(BaseModel):
success: bool
latency_ms: Optional[int] = None
anonymity: Optional[str] = None
can_access_google: Optional[bool] = None
country_code: Optional[str] = None
country_name: Optional[str] = None
proxy_type: Optional[str] = None
quality_score: Optional[int] = None
error_message: Optional[str] = None
class ProxyValidator:
def __init__(self, timeout: int = 10, max_concurrent: int = 20):
self.timeout = aiohttp.ClientTimeout(total=timeout)
self.semaphore = asyncio.Semaphore(max_concurrent)
async def validate_format(self, proxy: str) -> bool:
if proxy.startswith(("http://", "https://", "socks4://", "socks5://")):
proxy = proxy.split("://", 1)[1]
if not IP_REGEX.match(proxy):
return False
try:
ip_port = proxy.split(":")
if len(ip_port) != 2:
return False
ip, port_str = ip_port
port = int(port_str)
return is_valid_ip(ip) and is_valid_port(port)
except (ValueError, IndexError):
return False
async def validate_connectivity(
self, proxy_url: str
) -> tuple[bool, Optional[int], Optional[str]]:
async with self.semaphore:
try:
start_time = time.time()
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(
"http://httpbin.org/ip", proxy=proxy_url, ssl=False
) as resp:
latency_ms = int((time.time() - start_time) * 1000)
if resp.status == 200:
return True, latency_ms, None
else:
return False, None, f"HTTP {resp.status}"
except aiohttp.ClientProxyConnectionError:
return False, None, "Proxy connection failed"
except asyncio.TimeoutError:
return False, None, "Connection timeout"
except Exception as e:
return False, None, str(e)[:100]
async def check_anonymity(self, proxy_url: str) -> Optional[str]:
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(
"http://httpbin.org/headers", proxy=proxy_url, ssl=False
) as resp:
if resp.status != 200:
return None
data = await resp.json()
headers = data.get("headers", {})
if "X-Forwarded-For" in headers or "Via" in headers:
return "transparent"
elif "Proxy-Connection" in headers or "X-Real-Ip" in headers:
return "anonymous"
else:
return "elite"
except Exception:
return None
async def test_google_access(self, proxy_url: str) -> bool:
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(
"https://www.google.com", proxy=proxy_url, ssl=False
) as resp:
return resp.status == 200
except Exception:
return False
async def get_geo_info(self, ip: str) -> Dict[str, Optional[str]]:
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(f"https://ipapi.co/{ip}/json/") as resp:
if resp.status == 200:
data = await resp.json()
return {
"country_code": data.get("country_code"),
"country_name": data.get("country_name"),
"state": data.get("region"),
"city": data.get("city"),
}
except Exception:
pass
return {"country_code": None, "country_name": None, "state": None, "city": None}
async def detect_proxy_type(self, ip: str) -> str:
try:
async with aiohttp.ClientSession(timeout=self.timeout) as session:
async with session.get(f"https://ipinfo.io/{ip}/json") as resp:
if resp.status == 200:
data = await resp.json()
org = data.get("org", "").lower()
datacenter_keywords = [
"amazon",
"aws",
"google",
"microsoft",
"azure",
"digitalocean",
"linode",
"ovh",
"hetzner",
"hosting",
"datacenter",
"data center",
"cloud",
]
for keyword in datacenter_keywords:
if keyword in org:
return "datacenter"
return "residential"
except Exception:
pass
return "unknown"
async def calculate_quality_score(
self,
latency_ms: Optional[int],
anonymity: Optional[str],
can_access_google: Optional[bool],
proxy_type: Optional[str],
) -> int:
score = 0
if latency_ms is not None:
if latency_ms < 200:
score += 40
elif latency_ms < 500:
score += 30
elif latency_ms < 1000:
score += 20
elif latency_ms < 2000:
score += 10
if anonymity == "elite":
score += 30
elif anonymity == "anonymous":
score += 20
elif anonymity == "transparent":
score += 5
if can_access_google:
score += 15
if proxy_type == "residential":
score += 15
elif proxy_type == "datacenter":
score += 5
return min(score, 100)
async def validate_comprehensive(self, proxy_url: str, ip: str) -> ValidationResult:
is_valid, latency_ms, error = await self.validate_connectivity(proxy_url)
if not is_valid:
return ValidationResult(success=False, error_message=error)
anonymity, can_access_google, geo_info, proxy_type = await asyncio.gather(
self.check_anonymity(proxy_url),
self.test_google_access(proxy_url),
self.get_geo_info(ip),
self.detect_proxy_type(ip),
return_exceptions=True,
)
if isinstance(anonymity, Exception):
anonymity = None
if isinstance(can_access_google, Exception):
can_access_google = None
if isinstance(geo_info, Exception):
geo_info = {}
if isinstance(proxy_type, Exception):
proxy_type = "unknown"
quality_score = await self.calculate_quality_score(
latency_ms, anonymity, can_access_google, proxy_type
)
return ValidationResult(
success=True,
latency_ms=latency_ms,
anonymity=anonymity,
can_access_google=can_access_google,
country_code=geo_info.get("country_code"),
country_name=geo_info.get("country_name"),
proxy_type=proxy_type,
quality_score=quality_score,
error_message=None,
)
async def validate_batch(
self, proxies: List[tuple[str, str]]
) -> List[tuple[str, ValidationResult]]:
tasks = []
for proxy_url, ip in proxies:
tasks.append(self.validate_comprehensive(proxy_url, ip))
results = await asyncio.gather(*tasks, return_exceptions=True)
output = []
for (proxy_url, ip), result in zip(proxies, results):
if isinstance(result, Exception):
output.append(
(
proxy_url,
ValidationResult(
success=False, error_message=str(result)[:100]
),
)
)
else:
output.append((proxy_url, result))
return output
proxy_validator = ProxyValidator()
|