# 1proxy/app/source_validator.py
# paijo77: update app/source_validator.py (commit 2e7b8f4, verified)
import asyncio
import ipaddress
import socket
from typing import Optional, List
from urllib.parse import urljoin, urlparse

import aiohttp
from pydantic import BaseModel

from app.grabber import GitHubGrabber
from app.models import SourceConfig, SourceType
class SourceValidationResult(BaseModel):
    """Outcome of validating one proxy source."""

    # True only when the validation produced no error.
    valid: bool

    # Human-readable failure reason; stays None when nothing was reported.
    error_message: Optional[str] = None

    # Number of proxies extracted from the source (0 when none / not reached).
    proxy_count: int = 0

    # A few example proxy URLs taken from the extraction result.
    sample_proxies: List[str] = []
class SourceValidator:
    """Run format, reachability and extraction checks on a proxy source.

    The checks are exposed individually and combined by ``validate_source``.
    Each check reports problems through its return value (a success flag
    and/or an error message) instead of raising.
    """

    # Bodies with fewer characters than this are rejected as too short.
    MIN_CONTENT_CHARS = 10
    # Upper bound on the number of body bytes read while validating.
    MAX_CONTENT_BYTES = 50_000_000
    # Redirect hops followed before giving up (aiohttp's own default is 10).
    MAX_REDIRECTS = 10

    _REDIRECT_STATUSES = frozenset({301, 302, 303, 307, 308})
    _LOCAL_HOSTNAMES = frozenset({"localhost", "127.0.0.1", "::1", "0.0.0.0"})
    _INTERNAL_URL_ERROR = (
        "Access to internal networks is restricted (SSRF protection)"
    )
    _READ_CHUNK_BYTES = 64 * 1024

    def __init__(self, timeout: int = 15):
        """Create a validator.

        Args:
            timeout: Total time budget, in seconds, for one HTTP request.
        """
        self.timeout = aiohttp.ClientTimeout(total=timeout)
        self.grabber = GitHubGrabber()

    @staticmethod
    def _is_non_public_address(ip_text: str) -> bool:
        """Return True when *ip_text* is not a public unicast IP address."""
        try:
            # getaddrinfo may append a zone id ("fe80::1%eth0"); drop it so
            # the address parses on every Python version.
            ip = ipaddress.ip_address(ip_text.split("%", 1)[0])
        except ValueError:
            # Something we cannot classify is never assumed to be safe.
            return True
        return (
            ip.is_private
            or ip.is_loopback
            or ip.is_link_local
            or ip.is_multicast
            or ip.is_unspecified
            # Also covers ranges such as 100.64.0.0/10 (shared address space)
            # that are neither "private" nor globally routable.
            or not ip.is_global
        )

    def is_internal_url(self, url: str) -> bool:
        """Check if the URL points to an internal network (SSRF protection).

        This performs a blocking DNS lookup; coroutines should call it
        through ``asyncio.to_thread``.

        Args:
            url: Absolute URL to inspect.

        Returns:
            True when the URL has no hostname, names a well-known local host,
            cannot be parsed, or resolves to at least one non-public address.
            False when every resolved address is public, and also when the
            hostname simply does not resolve (the subsequent fetch then
            fails with a connection error).
        """
        try:
            hostname = urlparse(url).hostname
            if not hostname:
                return True
            # A trailing dot ("localhost.") names the same host.
            if hostname.lower().rstrip(".") in self._LOCAL_HOSTNAMES:
                return True
            addr_info = socket.getaddrinfo(hostname, None)
        except socket.gaierror:
            # Unresolvable name: nothing can be reached through it.
            return False
        except (ValueError, OSError):
            # Malformed URL / hostname or unexpected resolver failure:
            # fail closed rather than let an unchecked URL through.
            return True
        return any(self._is_non_public_address(item[4][0]) for item in addr_info)

    async def validate_url_reachable(self, url: str) -> tuple[bool, Optional[str]]:
        """Fetch *url* and check that it returns a plausible body.

        Redirects are followed manually so that every hop is screened by
        ``is_internal_url``; otherwise a public URL could redirect the
        request to an internal address.

        Args:
            url: Absolute URL of the source.

        Returns:
            ``(True, None)`` on success, otherwise ``(False, message)``.
        """
        # DNS resolution blocks, so keep it off the event loop.
        if await asyncio.to_thread(self.is_internal_url, url):
            return False, self._INTERNAL_URL_ERROR
        try:
            async with aiohttp.ClientSession(timeout=self.timeout) as session:
                current_url = url
                for _ in range(self.MAX_REDIRECTS + 1):
                    # NOTE(review): ssl=False disables TLS certificate
                    # verification. Kept from the original behavior; confirm
                    # that this is intentional.
                    # NOTE(review): the host is resolved again by aiohttp, so
                    # a DNS answer that changes between the check and the
                    # request (DNS rebinding) is not covered here.
                    async with session.get(
                        current_url, ssl=False, allow_redirects=False
                    ) as resp:
                        location = resp.headers.get("Location")
                        if not (resp.status in self._REDIRECT_STATUSES and location):
                            return await self._evaluate_response(resp)
                        next_url = urljoin(current_url, location)
                    if await asyncio.to_thread(self.is_internal_url, next_url):
                        return False, self._INTERNAL_URL_ERROR
                    current_url = next_url
                return False, "Too many redirects"
        except asyncio.TimeoutError:
            return False, "Connection timeout - source took too long to respond"
        except aiohttp.ClientConnectorError:
            return False, "Cannot connect to source URL"
        except Exception as e:
            return False, f"Error: {str(e)[:100]}"

    async def _evaluate_response(self, resp) -> tuple[bool, Optional[str]]:
        """Translate a final (non-redirect) response into a validation result."""
        status = resp.status
        if status == 200:
            return await self._check_body(resp)
        if status == 404:
            return False, "Source not found (404)"
        if status == 403:
            return False, "Access forbidden (403)"
        if status >= 500:
            return False, f"Server error ({status})"
        return False, f"HTTP error {status}"

    async def _check_body(self, resp) -> tuple[bool, Optional[str]]:
        """Read the body with a hard size cap and apply the length checks."""
        too_large = (
            f"Source content too large (> {self.MAX_CONTENT_BYTES // 1_000_000}MB)"
        )
        declared = resp.content_length
        if declared is not None and declared > self.MAX_CONTENT_BYTES:
            return False, too_large

        # Stream the body so an oversized response is abandoned as soon as
        # the cap is crossed instead of being buffered in full first.
        body = bytearray()
        async for chunk in resp.content.iter_chunked(self._READ_CHUNK_BYTES):
            body.extend(chunk)
            if len(body) > self.MAX_CONTENT_BYTES:
                return False, too_large

        try:
            text = body.decode(resp.charset or "utf-8")
        except LookupError:
            # The server announced a charset Python does not know.
            text = body.decode("utf-8")
        if len(text) < self.MIN_CONTENT_CHARS:
            return (
                False,
                f"Source content too short (< {self.MIN_CONTENT_CHARS} characters)",
            )
        return True, None

    async def validate_source_format(
        self, source: "SourceConfig"
    ) -> tuple[bool, Optional[str]]:
        """Check that the source URL has the shape its type requires.

        Args:
            source: Source configuration; ``type`` and ``url`` are read.

        Returns:
            ``(True, None)`` when the URL is acceptable, otherwise
            ``(False, message)``.
        """
        url_str = str(source.url)
        if source.type == SourceType.GITHUB_RAW:
            # "github.com" is NOT a substring of "githubusercontent.com", so
            # the raw-content host has to be recognised on its own.
            on_raw_host = "githubusercontent.com" in url_str
            if not on_raw_host and "github.com" not in url_str:
                return False, "GitHub source must contain 'github.com'"
            if not on_raw_host and "/raw/" not in url_str:
                return False, "GitHub source must be a raw file URL"
        elif source.type == SourceType.SUBSCRIPTION_BASE64:
            if not url_str.startswith(("http://", "https://")):
                return False, "Subscription source must start with http:// or https://"
        return True, None

    async def test_proxy_extraction(
        self, source: "SourceConfig"
    ) -> tuple[int, List[str], Optional[str]]:
        """Run the grabber on *source* and summarise what it found.

        Returns:
            ``(count, sample_urls, error)`` where ``sample_urls`` holds the
            URLs of at most the first five proxies and ``error`` is None on
            success.
        """
        try:
            proxies = await self.grabber.extract_proxies(source)
        except Exception as e:
            return 0, [], f"Failed to extract proxies: {str(e)[:100]}"
        if not proxies:
            return 0, [], "No proxies found in source"
        return len(proxies), [p.url for p in proxies[:5]], None

    async def validate_source(self, source: "SourceConfig") -> "SourceValidationResult":
        """Run all checks in order and stop at the first failure.

        Order: URL format, reachability, proxy extraction.
        """
        is_format_valid, format_error = await self.validate_source_format(source)
        if not is_format_valid:
            return SourceValidationResult(valid=False, error_message=format_error)

        is_reachable, reachable_error = await self.validate_url_reachable(
            str(source.url)
        )
        if not is_reachable:
            return SourceValidationResult(valid=False, error_message=reachable_error)

        (
            proxy_count,
            sample_proxies,
            extraction_error,
        ) = await self.test_proxy_extraction(source)
        if extraction_error:
            return SourceValidationResult(
                valid=False, error_message=extraction_error, proxy_count=proxy_count
            )
        return SourceValidationResult(
            valid=True, proxy_count=proxy_count, sample_proxies=sample_proxies
        )
# Module-level shared validator, built at import time with the default
# 15-second timeout.
source_validator = SourceValidator()