|
|
""" |
|
|
Proxy detection service module |
|
|
""" |
|
|
import asyncio |
|
|
import time |
|
|
from typing import Dict, List, Optional |
|
|
from urllib.parse import urlparse |
|
|
|
|
|
import httpx |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from app.log.logger import get_config_routes_logger |
|
|
|
|
|
# Module-level logger used by the proxy check service below.
# NOTE(review): this reuses the config-routes logger rather than a dedicated
# proxy-check logger; confirm that is intentional.
logger = get_config_routes_logger()
|
|
|
|
|
|
|
|
class ProxyCheckResult(BaseModel):
    """Outcome of checking a single proxy.

    Instances are produced by ProxyCheckService and are also what it keeps in
    its in-memory result cache.
    """

    # Proxy URL exactly as supplied by the caller.
    proxy: str

    # True when the probe request through the proxy returned an accepted status.
    is_available: bool

    # Seconds taken by the probe request (rounded to 3 decimals by the service);
    # None when no HTTP response was obtained (invalid format, timeout, error).
    response_time: Optional[float] = None

    # Human-readable failure reason; the service leaves it None on success.
    error_message: Optional[str] = None

    # Wall-clock Unix timestamp (time.time()) when the result was produced;
    # compared against the cache duration to decide freshness.
    checked_at: float
|
|
|
|
|
|
|
|
class ProxyCheckService:
    """Check whether HTTP(S)/SOCKS5 proxies can reach a probe URL.

    Results are cached in memory per proxy string for ``CACHE_DURATION``
    seconds. Expired entries are evicted lazily, when they are next looked up.
    """

    # URL requested (HEAD) through the proxy to decide whether it works.
    CHECK_URL = "https://www.google.com"

    # Timeout, in seconds, applied to every phase of the probe request.
    TIMEOUT_SECONDS = 10

    # How long, in seconds, a cached check result is considered fresh.
    CACHE_DURATION = 10

    # Proxy URL schemes accepted by _is_valid_proxy_format.
    _SUPPORTED_SCHEMES = ("http", "https", "socks5")

    # Status codes from CHECK_URL that count as a working proxy. Redirects are
    # included: receiving one already proves the proxy relayed the request.
    _SUCCESS_STATUS_CODES = frozenset({200, 204, 301, 302, 307, 308})

    def __init__(self):
        # proxy string -> most recent check result for that proxy
        self._cache: Dict[str, ProxyCheckResult] = {}

    @staticmethod
    def _mask_proxy(proxy: str) -> str:
        """Return *proxy* with any password replaced by ``***`` (for logging)."""
        try:
            parsed = urlparse(proxy)
            has_password = parsed.password is not None
        except (TypeError, ValueError, AttributeError):
            return proxy
        if not has_password:
            return proxy
        userinfo, _, hostport = parsed.netloc.rpartition("@")
        username = userinfo.split(":", 1)[0]
        return parsed._replace(netloc=f"{username}:***@{hostport}").geturl()

    def _is_valid_proxy_format(self, proxy: str) -> bool:
        """Validate proxy format.

        Returns True only for a URL with a supported scheme, a non-empty host
        and, if present, a numeric port in the valid range.
        """
        try:
            parsed = urlparse(proxy)
            # Accessing .port raises ValueError for a non-numeric or
            # out-of-range port, which is a malformed proxy address.
            _ = parsed.port
        except (TypeError, ValueError, AttributeError):
            return False
        return parsed.scheme in self._SUPPORTED_SCHEMES and bool(parsed.hostname)

    def _get_cached_result(self, proxy: str) -> Optional[ProxyCheckResult]:
        """Return the cached result for *proxy* if still fresh, else None.

        An expired entry is removed from the cache as a side effect.
        """
        result = self._cache.get(proxy)
        if result is None:
            return None

        if time.time() - result.checked_at < self.CACHE_DURATION:
            logger.debug(f"Using cached proxy check result: {self._mask_proxy(proxy)}")
            return result

        # Expired: drop it so stale entries do not linger in the cache.
        self._cache.pop(proxy, None)
        return None

    def _cache_result(self, result: ProxyCheckResult) -> None:
        """Cache check result, replacing any previous entry for the same proxy."""
        self._cache[result.proxy] = result

    @staticmethod
    def _failure_result(proxy: str, error_message: str) -> ProxyCheckResult:
        """Build an "unavailable" result for *proxy* stamped with the current time."""
        return ProxyCheckResult(
            proxy=proxy,
            is_available=False,
            error_message=error_message,
            checked_at=time.time(),
        )

    async def check_single_proxy(self, proxy: str, use_cache: bool = True) -> ProxyCheckResult:
        """
        Check if a single proxy is available.

        Sends a HEAD request for ``CHECK_URL`` through the proxy. The outcome
        (success or failure) is always written to the cache; the cache is only
        read when ``use_cache`` is true. Ordinary errors do not propagate: they
        are reported via ``is_available=False`` and ``error_message``.

        Args:
            proxy: Proxy address in format like http://host:port or socks5://host:port
            use_cache: Whether to use cached results

        Returns:
            ProxyCheckResult: Check result
        """
        if use_cache:
            cached = self._get_cached_result(proxy)
            if cached is not None:
                return cached

        if not self._is_valid_proxy_format(proxy):
            result = self._failure_result(proxy, "Invalid proxy format")
            self._cache_result(result)
            return result

        # Never write proxy credentials to the logs.
        masked = self._mask_proxy(proxy)
        logger.info(f"Starting proxy check: {masked}")

        # perf_counter is monotonic, so the elapsed time is immune to clock changes.
        start_time = time.perf_counter()
        try:
            timeout = httpx.Timeout(self.TIMEOUT_SECONDS)
            async with httpx.AsyncClient(timeout=timeout, proxy=proxy) as client:
                response = await client.head(self.CHECK_URL)
        except (httpx.TimeoutException, asyncio.TimeoutError):
            # httpx signals its own timeouts with httpx.TimeoutException
            # subclasses (ConnectTimeout, ReadTimeout, ...), not asyncio's.
            result = self._failure_result(proxy, "Connection timeout")
            logger.warning(f"Proxy check timeout: {masked}")
        except Exception as e:
            # Some exceptions stringify to "", so fall back to the type name.
            error_message = str(e) or type(e).__name__
            result = self._failure_result(proxy, error_message)
            logger.error(f"Proxy check failed: {masked}, error: {error_message}")
        else:
            response_time = time.perf_counter() - start_time
            is_available = response.status_code in self._SUCCESS_STATUS_CODES
            result = ProxyCheckResult(
                proxy=proxy,
                is_available=is_available,
                response_time=round(response_time, 3),
                error_message=None if is_available else f"HTTP {response.status_code}",
                checked_at=time.time(),
            )
            logger.info(
                f"Proxy check completed: {masked}, available: {is_available}, "
                f"response_time: {response_time:.3f}s"
            )

        self._cache_result(result)
        return result

    async def check_multiple_proxies(
        self,
        proxies: List[str],
        use_cache: bool = True,
        max_concurrent: int = 5
    ) -> List[ProxyCheckResult]:
        """
        Check multiple proxies concurrently.

        Args:
            proxies: List of proxy addresses
            use_cache: Whether to use cached results
            max_concurrent: Maximum concurrent check count (values below 1 are
                treated as 1)

        Returns:
            List[ProxyCheckResult]: One result per input proxy, in input order
        """
        if not proxies:
            return []

        logger.info(f"Starting batch proxy check for {len(proxies)} proxies")

        # Semaphore(0) would block every check forever and a negative value
        # raises ValueError, so always allow at least one check at a time.
        semaphore = asyncio.Semaphore(max(1, max_concurrent))

        async def check_with_semaphore(proxy: str) -> ProxyCheckResult:
            async with semaphore:
                return await self.check_single_proxy(proxy, use_cache)

        results = await asyncio.gather(
            *(check_with_semaphore(proxy) for proxy in proxies),
            return_exceptions=True,
        )

        final_results: List[ProxyCheckResult] = []
        for proxy, result in zip(proxies, results):
            # BaseException rather than Exception: a cancelled child task is
            # returned as asyncio.CancelledError, which is not an Exception.
            if isinstance(result, BaseException):
                logger.error(
                    f"Proxy check task exception: {self._mask_proxy(proxy)}, error: {str(result)}"
                )
                final_results.append(
                    self._failure_result(proxy, f"Check task exception: {str(result)}")
                )
            else:
                final_results.append(result)

        available_count = sum(1 for r in final_results if r.is_available)
        logger.info(f"Batch proxy check completed: {available_count}/{len(proxies)} proxies available")

        return final_results

    def get_cache_stats(self) -> Dict[str, int]:
        """Get cache statistics: total, still-fresh and expired entry counts."""
        current_time = time.time()
        valid_cache_count = sum(
            1 for result in self._cache.values()
            if current_time - result.checked_at < self.CACHE_DURATION
        )

        return {
            "total_cached": len(self._cache),
            "valid_cached": valid_cache_count,
            "expired_cached": len(self._cache) - valid_cache_count
        }

    def clear_cache(self) -> None:
        """Clear all cache"""
        self._cache.clear()
        logger.info("Proxy check cache cleared")
|
|
|
|
|
|
|
|
|
|
|
# Module-wide ProxyCheckService, created lazily by get_proxy_check_service().
_proxy_check_service: Optional[ProxyCheckService] = None


def get_proxy_check_service() -> ProxyCheckService:
    """Return the shared ProxyCheckService, constructing it on the first call.

    Reusing a single instance means all callers share its in-memory result cache.
    """
    global _proxy_check_service
    if _proxy_check_service is not None:
        return _proxy_check_service
    service = ProxyCheckService()
    _proxy_check_service = service
    return service