"""Asynchronous proxy validation.

Provides format checks for ``IP:PORT`` proxy strings and a
:class:`ProxyValidator` that probes a proxy's connectivity, anonymity,
Google reachability, geolocation and hosting type, then derives a quality
score from those measurements.
"""

import asyncio
import re
import time
from typing import Dict, List, Optional

import aiohttp
from pydantic import BaseModel

# Matches the IP:PORT *shape* only; numeric ranges are checked separately by
# is_valid_ip() and is_valid_port().
IP_REGEX = re.compile(r"(\d{1,3}\.){3}\d{1,3}:\d{1,5}")

# Substrings of the (lower-cased) "org" field that mark an address as
# belonging to a hosting/cloud provider rather than a residential ISP.
_DATACENTER_KEYWORDS = (
    "amazon",
    "aws",
    "google",
    "microsoft",
    "azure",
    "digitalocean",
    "linode",
    "ovh",
    "hetzner",
    "hosting",
    "datacenter",
    "data center",
    "cloud",
)


def _describe_error(exc: BaseException) -> str:
    """Return a short (max 100 chars) description of *exc*, never empty.

    Some exceptions stringify to ``""``; in that case the exception class
    name is used so the caller always gets a usable message.
    """
    return str(exc)[:100] or type(exc).__name__


def is_valid_ip(ip: str) -> bool:
    """Return True if *ip* is a dotted quad whose four octets are in 0-255.

    Each octet must consist solely of ASCII digits, so forms that ``int()``
    would tolerate (``"+1"``, ``" 1"``, ``"1_0"``, non-ASCII digits) are
    rejected. A value without a ``split`` method (e.g. ``None``) yields
    False rather than raising.
    """
    try:
        parts = ip.split(".")
        if len(parts) != 4:
            return False
        return all(
            part.isascii() and part.isdigit() and int(part) <= 255
            for part in parts
        )
    except (ValueError, AttributeError):
        return False


def is_valid_port(port: int) -> bool:
    """Return True if *port* is in the range 1-65535."""
    return 1 <= port <= 65535


class ValidationResult(BaseModel):
    """Outcome of validating a single proxy."""

    # True when the connectivity probe succeeded.
    success: bool
    # Time of the connectivity probe in milliseconds.
    latency_ms: Optional[int] = None
    # One of "transparent", "anonymous", "elite", or None if undetermined.
    anonymity: Optional[str] = None
    can_access_google: Optional[bool] = None
    country_code: Optional[str] = None
    country_name: Optional[str] = None
    # One of "datacenter", "residential", "unknown".
    proxy_type: Optional[str] = None
    # 0-100 score produced by ProxyValidator.calculate_quality_score().
    quality_score: Optional[int] = None
    error_message: Optional[str] = None


class ProxyValidator:
    """Run format, connectivity and quality checks against HTTP proxies.

    Args:
        timeout: Total per-request timeout in seconds.
        max_concurrent: Maximum number of connectivity probes allowed to run
            at the same time.
    """

    def __init__(self, timeout: int = 10, max_concurrent: int = 20):
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        # NOTE(review): only validate_connectivity() acquires this semaphore;
        # the follow-up probes in validate_comprehensive() are not limited
        # by it.
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def validate_format(self, proxy: str) -> bool:
        """Return True if *proxy* is a well-formed ``IP:PORT`` string.

        An optional ``http://``, ``https://``, ``socks4://`` or ``socks5://``
        prefix is stripped first. The remainder must match ``IP_REGEX`` in
        full (no trailing characters), contain only ASCII characters, and
        pass the octet and port range checks.
        """
        if proxy.startswith(("http://", "https://", "socks4://", "socks5://")):
            proxy = proxy.split("://", 1)[1]

        # fullmatch (not match) so trailing garbage such as "\n" or "abc" is
        # rejected; isascii because \d and int() also accept non-ASCII digits.
        if not proxy.isascii() or not IP_REGEX.fullmatch(proxy):
            return False

        # The full match guarantees exactly one ":" followed by 1-5 digits.
        ip, port_str = proxy.split(":")
        return is_valid_ip(ip) and is_valid_port(int(port_str))

    async def validate_connectivity(
        self, proxy_url: str
    ) -> tuple[bool, Optional[int], Optional[str]]:
        """Probe ``http://httpbin.org/ip`` through the proxy.

        Returns:
            ``(ok, latency_ms, error)``. On HTTP 200: ``(True, latency, None)``.
            Otherwise ``(False, None, message)`` where *message* describes the
            HTTP status, proxy connection failure, timeout or other error.
        """
        async with self.semaphore:
            try:
                # perf_counter is monotonic, so the measured interval cannot
                # be skewed by wall-clock adjustments.
                start_time = time.perf_counter()
                async with aiohttp.ClientSession(timeout=self.timeout) as session:
                    async with session.get(
                        "http://httpbin.org/ip", proxy=proxy_url, ssl=False
                    ) as resp:
                        elapsed = time.perf_counter() - start_time
                        latency_ms = int(elapsed * 1000)
                        if resp.status == 200:
                            return True, latency_ms, None
                        return False, None, f"HTTP {resp.status}"
            except aiohttp.ClientProxyConnectionError:
                return False, None, "Proxy connection failed"
            except asyncio.TimeoutError:
                return False, None, "Connection timeout"
            except Exception as e:
                return False, None, _describe_error(e)

    async def check_anonymity(self, proxy_url: str) -> Optional[str]:
        """Classify the proxy by the request headers httpbin echoes back.

        Returns:
            ``"transparent"`` if ``X-Forwarded-For`` or ``Via`` is present,
            ``"anonymous"`` if ``Proxy-Connection`` or ``X-Real-Ip`` is
            present, ``"elite"`` if none of them are, and None on a non-200
            response or any error.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(
                    "http://httpbin.org/headers", proxy=proxy_url, ssl=False
                ) as resp:
                    if resp.status != 200:
                        return None
                    data = await resp.json()
                    # HTTP field names are case-insensitive, so compare
                    # lower-cased names.
                    header_names = {
                        name.lower() for name in data.get("headers", {})
                    }
                    if header_names & {"x-forwarded-for", "via"}:
                        return "transparent"
                    if header_names & {"proxy-connection", "x-real-ip"}:
                        return "anonymous"
                    return "elite"
        except Exception:
            return None

    async def test_google_access(self, proxy_url: str) -> bool:
        """Return True if ``https://www.google.com`` answers 200 via the proxy.

        Any error (including timeout) is reported as False.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(
                    "https://www.google.com", proxy=proxy_url, ssl=False
                ) as resp:
                    return resp.status == 200
        except Exception:
            return False

    async def get_geo_info(self, ip: str) -> Dict[str, Optional[str]]:
        """Look up *ip* on ipapi.co.

        Returns:
            A dict with keys ``country_code``, ``country_name``, ``state``
            and ``city``. All values are None when the lookup does not
            return HTTP 200 or fails for any reason.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(f"https://ipapi.co/{ip}/json/") as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        return {
                            "country_code": data.get("country_code"),
                            "country_name": data.get("country_name"),
                            "state": data.get("region"),
                            "city": data.get("city"),
                        }
        except Exception:
            # Best-effort lookup: fall through to the all-None result.
            pass
        return {
            "country_code": None,
            "country_name": None,
            "state": None,
            "city": None,
        }

    async def detect_proxy_type(self, ip: str) -> str:
        """Classify *ip* from the ``org`` field reported by ipinfo.io.

        Returns:
            ``"datacenter"`` if the organisation name contains one of the
            known hosting keywords, ``"residential"`` if it does not, and
            ``"unknown"`` on a non-200 response or any error.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(f"https://ipinfo.io/{ip}/json") as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        org = data.get("org", "").lower()
                        if any(keyword in org for keyword in _DATACENTER_KEYWORDS):
                            return "datacenter"
                        return "residential"
        except Exception:
            # Best-effort lookup: fall through to "unknown".
            pass
        return "unknown"

    async def calculate_quality_score(
        self,
        latency_ms: Optional[int],
        anonymity: Optional[str],
        can_access_google: Optional[bool],
        proxy_type: Optional[str],
    ) -> int:
        """Combine the individual measurements into a 0-100 score.

        Latency contributes up to 40 points, anonymity up to 30, Google
        reachability 15 and proxy type up to 15.
        """
        score = 0

        if latency_ms is not None:
            if latency_ms < 200:
                score += 40
            elif latency_ms < 500:
                score += 30
            elif latency_ms < 1000:
                score += 20
            elif latency_ms < 2000:
                score += 10

        if anonymity == "elite":
            score += 30
        elif anonymity == "anonymous":
            score += 20
        elif anonymity == "transparent":
            score += 5

        if can_access_google:
            score += 15

        if proxy_type == "residential":
            score += 15
        elif proxy_type == "datacenter":
            score += 5

        return min(score, 100)

    async def validate_comprehensive(self, proxy_url: str, ip: str) -> ValidationResult:
        """Run the connectivity probe and, if it passes, all quality probes.

        Args:
            proxy_url: Proxy URL passed to the HTTP client.
            ip: The proxy's IP address, used for the geo and type lookups.

        Returns:
            A failed result carrying the connectivity error, or a successful
            result populated with latency, anonymity, Google reachability,
            country, proxy type and quality score.
        """
        is_valid, latency_ms, error = await self.validate_connectivity(proxy_url)
        if not is_valid:
            return ValidationResult(success=False, error_message=error)

        anonymity, can_access_google, geo_info, proxy_type = await asyncio.gather(
            self.check_anonymity(proxy_url),
            self.test_google_access(proxy_url),
            self.get_geo_info(ip),
            self.detect_proxy_type(ip),
            return_exceptions=True,
        )

        # BaseException rather than Exception: a cancelled child surfaces as
        # asyncio.CancelledError, which does not derive from Exception.
        if isinstance(anonymity, BaseException):
            anonymity = None
        if isinstance(can_access_google, BaseException):
            can_access_google = None
        if isinstance(geo_info, BaseException):
            geo_info = {}
        if isinstance(proxy_type, BaseException):
            proxy_type = "unknown"

        quality_score = await self.calculate_quality_score(
            latency_ms, anonymity, can_access_google, proxy_type
        )

        return ValidationResult(
            success=True,
            latency_ms=latency_ms,
            anonymity=anonymity,
            can_access_google=can_access_google,
            country_code=geo_info.get("country_code"),
            country_name=geo_info.get("country_name"),
            proxy_type=proxy_type,
            quality_score=quality_score,
            error_message=None,
        )

    async def validate_batch(
        self, proxies: List[tuple[str, str]]
    ) -> List[tuple[str, ValidationResult]]:
        """Validate many proxies concurrently.

        Args:
            proxies: ``(proxy_url, ip)`` pairs.

        Returns:
            ``(proxy_url, result)`` pairs in the same order as *proxies*. A
            proxy whose validation raised is reported as a failed result
            carrying a description of the exception.
        """
        tasks = [self.validate_comprehensive(url, ip) for url, ip in proxies]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        output = []
        for (proxy_url, _), result in zip(proxies, results):
            if isinstance(result, BaseException):
                result = ValidationResult(
                    success=False, error_message=_describe_error(result)
                )
            output.append((proxy_url, result))
        return output


proxy_validator = ProxyValidator()