# 1proxy / app/validator.py
# paijo77: update app/validator.py (15c4063, verified)
import aiohttp
import asyncio
import time
import re
from typing import Optional, Dict, List
from pydantic import BaseModel
# Matches an IPv4-style "A.B.C.D:PORT" token: four groups of 1-3 digits
# separated by dots, then a colon and a 1-5 digit port. The pattern is
# unanchored and does not check numeric ranges (octets 0-255, port 1-65535);
# is_valid_ip() and is_valid_port() perform those checks.
IP_REGEX = re.compile(r"(\d{1,3}\.){3}\d{1,3}:\d{1,5}")
def is_valid_ip(ip: str) -> bool:
    """Return True if *ip* is a dotted-quad IPv4 address with octets in 0-255.

    Every octet must consist solely of ASCII decimal digits. ``int()`` alone
    is too lenient for validation: it also accepts a leading sign,
    surrounding whitespace and underscore separators, so ``"+1"``, ``" 1"``
    and ``"1_0"`` would otherwise be treated as valid octets.

    Args:
        ip: Candidate address, e.g. ``"192.168.0.1"``.

    Returns:
        True when *ip* splits into exactly four all-digit octets, each in the
        range 0-255; False otherwise, including when *ip* has no ``split``
        method (e.g. ``None``).
    """
    try:
        parts = ip.split(".")
        if len(parts) != 4:
            return False
        # isascii() excludes non-ASCII digits that isdigit()/int() accept;
        # an all-digit string cannot be negative, so only the upper bound
        # needs checking.
        return all(
            part.isascii() and part.isdigit() and int(part) <= 255
            for part in parts
        )
    except (ValueError, AttributeError):
        # ValueError: absurdly long digit strings rejected by int().
        # AttributeError: non-string input without .split().
        return False
def is_valid_port(port: int) -> bool:
    """Tell whether *port* lies in the usable TCP/UDP range 1-65535."""
    # Port 0 and negative numbers are never valid.
    if port < 1:
        return False
    # Anything above the 16-bit maximum is out of range.
    return port <= 65535
class ValidationResult(BaseModel):
    """Outcome of validating a single proxy.

    Only ``success`` is required; every other field defaults to ``None``.
    The per-field notes below describe how ``ProxyValidator`` in this module
    fills the model.
    """

    # False when the connectivity check failed, True otherwise.
    success: bool
    # Time of the connectivity request in milliseconds.
    latency_ms: Optional[int] = None
    # "transparent", "anonymous" or "elite"; None when the check failed.
    anonymity: Optional[str] = None
    # Whether https://www.google.com answered with HTTP 200 via the proxy.
    can_access_google: Optional[bool] = None
    # "country_code" value from the geo lookup.
    country_code: Optional[str] = None
    # "country_name" value from the geo lookup.
    country_name: Optional[str] = None
    # "datacenter", "residential" or "unknown".
    proxy_type: Optional[str] = None
    # Aggregate score, capped at 100.
    quality_score: Optional[int] = None
    # Failure description; set when success is False.
    error_message: Optional[str] = None
class ProxyValidator:
    """Check proxies for format, reachability, anonymity, location and quality.

    Network checks use ``aiohttp`` with one shared total timeout. Only
    ``validate_connectivity`` is throttled by the semaphore; the follow-up
    checks started by ``validate_comprehensive`` are not.
    """

    # Lower-case substrings of an ipinfo.io "org" value that indicate a
    # hosting or cloud provider.
    _DATACENTER_KEYWORDS = (
        "amazon",
        "aws",
        "google",
        "microsoft",
        "azure",
        "digitalocean",
        "linode",
        "ovh",
        "hetzner",
        "hosting",
        "datacenter",
        "data center",
        "cloud",
    )

    def __init__(self, timeout: int = 10, max_concurrent: int = 20):
        """Create a validator.

        Args:
            timeout: Total per-request timeout in seconds.
            max_concurrent: Maximum number of simultaneous connectivity
                checks.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def validate_format(self, proxy: str) -> bool:
        """Return True if *proxy* is ``IP:PORT``, optionally scheme-prefixed.

        Accepted prefixes are ``http://``, ``https://``, ``socks4://`` and
        ``socks5://``. The IP octets must be 0-255 and the port 1-65535.
        """
        if proxy.startswith(("http://", "https://", "socks4://", "socks5://")):
            proxy = proxy.split("://", 1)[1]
        # fullmatch (not match): the pattern is unanchored, so a prefix match
        # would let trailing junk such as "1.2.3.4:80 " through, because
        # int() silently strips whitespace. isascii() rejects non-ASCII
        # digits, which both \d and int() would otherwise accept.
        if not (proxy.isascii() and IP_REGEX.fullmatch(proxy)):
            return False
        # After a full match the text is exactly "<ip>:<1-5 digits>".
        ip, _, port_str = proxy.rpartition(":")
        return is_valid_ip(ip) and is_valid_port(int(port_str))

    async def validate_connectivity(
        self, proxy_url: str
    ) -> tuple[bool, Optional[int], Optional[str]]:
        """Fetch http://httpbin.org/ip through the proxy.

        Returns:
            ``(True, latency_ms, None)`` on HTTP 200, otherwise
            ``(False, None, error_message)``.
        """
        async with self.semaphore:
            try:
                # perf_counter is monotonic, so the measured latency cannot
                # be skewed by system clock adjustments.
                started = time.perf_counter()
                async with aiohttp.ClientSession(timeout=self.timeout) as session:
                    async with session.get(
                        "http://httpbin.org/ip", proxy=proxy_url, ssl=False
                    ) as resp:
                        latency_ms = int((time.perf_counter() - started) * 1000)
                        if resp.status == 200:
                            return True, latency_ms, None
                        return False, None, f"HTTP {resp.status}"
            except aiohttp.ClientProxyConnectionError:
                return False, None, "Proxy connection failed"
            except asyncio.TimeoutError:
                return False, None, "Connection timeout"
            except Exception as e:
                # Some exceptions stringify to ""; fall back to the type
                # name so the caller always gets a non-empty message.
                return False, None, str(e)[:100] or type(e).__name__

    @staticmethod
    def _classify_anonymity(headers: Dict[str, str]) -> str:
        """Map the request headers seen by the target to an anonymity level."""
        # HTTP header names are case-insensitive; normalise before comparing.
        names = {name.lower() for name in headers}
        if not names.isdisjoint(("x-forwarded-for", "via")):
            return "transparent"
        if not names.isdisjoint(("proxy-connection", "x-real-ip")):
            return "anonymous"
        return "elite"

    async def check_anonymity(self, proxy_url: str) -> Optional[str]:
        """Classify the proxy as "transparent", "anonymous" or "elite".

        The classification is based on the headers echoed back by
        http://httpbin.org/headers. Returns None on a non-200 response or
        any error.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(
                    "http://httpbin.org/headers", proxy=proxy_url, ssl=False
                ) as resp:
                    if resp.status != 200:
                        return None
                    data = await resp.json()
                    return self._classify_anonymity(data.get("headers", {}))
        except Exception:
            # Best effort: an unusable response means "unknown anonymity".
            return None

    async def test_google_access(self, proxy_url: str) -> bool:
        """Return True if https://www.google.com answers 200 via the proxy."""
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                # ssl=False: certificate verification is disabled.
                async with session.get(
                    "https://www.google.com", proxy=proxy_url, ssl=False
                ) as resp:
                    return resp.status == 200
        except Exception:
            return False

    async def get_geo_info(self, ip: str) -> Dict[str, Optional[str]]:
        """Look up *ip* on ipapi.co.

        Returns:
            A dict with the keys ``country_code``, ``country_name``,
            ``state`` and ``city``. All values are None when the lookup
            fails or does not answer with HTTP 200.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(f"https://ipapi.co/{ip}/json/") as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        return {
                            "country_code": data.get("country_code"),
                            "country_name": data.get("country_name"),
                            "state": data.get("region"),
                            "city": data.get("city"),
                        }
        except Exception:
            # Best effort: geo data is optional, fall through to the
            # all-None result.
            pass
        return {"country_code": None, "country_name": None, "state": None, "city": None}

    @classmethod
    def _classify_org(cls, org: str) -> str:
        """Classify an organisation string as datacenter/residential/unknown."""
        org = org.lower()
        if not org.strip():
            # No organisation data is no evidence either way; reporting
            # "residential" here would award the highest type bonus.
            return "unknown"
        if any(keyword in org for keyword in cls._DATACENTER_KEYWORDS):
            return "datacenter"
        return "residential"

    async def detect_proxy_type(self, ip: str) -> str:
        """Return "datacenter", "residential" or "unknown" for *ip*.

        The decision is based on the ``org`` field reported by ipinfo.io.
        "unknown" is returned when the lookup fails, does not answer with
        HTTP 200, or carries no organisation.
        """
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                async with session.get(f"https://ipinfo.io/{ip}/json") as resp:
                    if resp.status == 200:
                        data = await resp.json()
                        # "org" may be absent or null in the response.
                        return self._classify_org(data.get("org") or "")
        except Exception:
            # Best effort: fall through to "unknown".
            pass
        return "unknown"

    async def calculate_quality_score(
        self,
        latency_ms: Optional[int],
        anonymity: Optional[str],
        can_access_google: Optional[bool],
        proxy_type: Optional[str],
    ) -> int:
        """Combine the individual check results into a 0-100 score.

        Latency contributes up to 40 points, anonymity up to 30, Google
        reachability 15 and proxy type up to 15.
        """
        score = 0
        if latency_ms is not None:
            # First threshold the latency falls under decides the points.
            for limit, points in ((200, 40), (500, 30), (1000, 20), (2000, 10)):
                if latency_ms < limit:
                    score += points
                    break
        score += {"elite": 30, "anonymous": 20, "transparent": 5}.get(anonymity, 0)
        if can_access_google:
            score += 15
        score += {"residential": 15, "datacenter": 5}.get(proxy_type, 0)
        return min(score, 100)

    async def validate_comprehensive(
        self, proxy_url: str, ip: str
    ) -> "ValidationResult":
        """Run every check for one proxy and collect the results.

        Connectivity is tested first; when it fails the result carries only
        ``success=False`` and the error message. Otherwise the anonymity,
        Google, geo and type checks run concurrently.

        Args:
            proxy_url: Proxy URL handed to aiohttp's ``proxy=`` argument.
            ip: IP address used for the geo and organisation lookups.
        """
        is_valid, latency_ms, error = await self.validate_connectivity(proxy_url)
        if not is_valid:
            return ValidationResult(success=False, error_message=error)
        anonymity, can_access_google, geo_info, proxy_type = await asyncio.gather(
            self.check_anonymity(proxy_url),
            self.test_google_access(proxy_url),
            self.get_geo_info(ip),
            self.detect_proxy_type(ip),
            return_exceptions=True,
        )
        # BaseException, not Exception: a cancelled child is returned as a
        # CancelledError instance, which does not derive from Exception.
        if isinstance(anonymity, BaseException):
            anonymity = None
        if isinstance(can_access_google, BaseException):
            can_access_google = None
        if isinstance(geo_info, BaseException):
            geo_info = {}
        if isinstance(proxy_type, BaseException):
            proxy_type = "unknown"
        quality_score = await self.calculate_quality_score(
            latency_ms, anonymity, can_access_google, proxy_type
        )
        return ValidationResult(
            success=True,
            latency_ms=latency_ms,
            anonymity=anonymity,
            can_access_google=can_access_google,
            country_code=geo_info.get("country_code"),
            country_name=geo_info.get("country_name"),
            proxy_type=proxy_type,
            quality_score=quality_score,
            error_message=None,
        )

    async def validate_batch(
        self, proxies: List[tuple[str, str]]
    ) -> List[tuple[str, "ValidationResult"]]:
        """Validate many proxies concurrently.

        Args:
            proxies: ``(proxy_url, ip)`` pairs.

        Returns:
            ``(proxy_url, ValidationResult)`` pairs in input order. A proxy
            whose validation raised is reported with ``success=False`` and
            the truncated error text.
        """
        results = await asyncio.gather(
            *(self.validate_comprehensive(proxy_url, ip) for proxy_url, ip in proxies),
            return_exceptions=True,
        )
        output = []
        for (proxy_url, _ip), result in zip(proxies, results):
            if isinstance(result, BaseException):
                result = ValidationResult(
                    success=False,
                    error_message=str(result)[:100] or type(result).__name__,
                )
            output.append((proxy_url, result))
        return output
# Module-level validator instance built with the constructor defaults
# (timeout=10, max_concurrent=20).
proxy_validator = ProxyValidator()