# gemini/app/service/proxy/proxy_check_service.py
# Uploaded via huggingface_hub by yiset (revision 6dfddfb, verified)
"""
Proxy detection service module
"""
import asyncio
import time
from typing import Dict, List, Optional
from urllib.parse import urlparse
import httpx
from pydantic import BaseModel
from app.log.logger import get_config_routes_logger
logger = get_config_routes_logger()
class ProxyCheckResult(BaseModel):
    """Result of a single proxy availability check."""
    # Proxy URL that was checked, e.g. "http://host:port".
    proxy: str
    # True when the proxy answered the probe with an acceptable HTTP status.
    is_available: bool
    # Round-trip time of the probe in seconds; None when the check failed.
    response_time: Optional[float] = None
    # Human-readable failure reason; None when the proxy is available.
    error_message: Optional[str] = None
    # time.time() timestamp of the check; drives cache expiry.
    checked_at: float
class ProxyCheckService:
    """Proxy detection service class.

    Probes each proxy by issuing a HEAD request to ``CHECK_URL`` through it,
    and keeps a short-lived in-memory cache of results so repeated checks of
    the same proxy do not re-issue requests.
    """

    # Target URL for checking
    CHECK_URL = "https://www.google.com"
    # Per-request timeout in seconds
    TIMEOUT_SECONDS = 10
    # How long a cached result remains valid, in seconds
    CACHE_DURATION = 10  # 10s

    def __init__(self):
        # Maps proxy URL -> most recent check result.
        self._cache: Dict[str, ProxyCheckResult] = {}

    def _is_valid_proxy_format(self, proxy: str) -> bool:
        """Return True if *proxy* looks like a supported proxy URL."""
        try:
            parsed = urlparse(proxy)
            # ``parsed.hostname`` is a str or None; compare explicitly so the
            # method actually returns a bool as annotated (it previously
            # returned the hostname string itself on success).
            return parsed.scheme in ('http', 'https', 'socks5') and parsed.hostname is not None
        except Exception:
            # urlparse can raise ValueError on some malformed inputs.
            return False

    def _get_cached_result(self, proxy: str) -> Optional[ProxyCheckResult]:
        """Return a still-fresh cached result for *proxy*, or None.

        Expired entries are evicted on access so the cache does not keep
        stale results around.
        """
        result = self._cache.get(proxy)
        if result is None:
            return None
        if time.time() - result.checked_at < self.CACHE_DURATION:
            logger.debug(f"Using cached proxy check result: {proxy}")
            return result
        # Remove expired cache entry.
        del self._cache[proxy]
        return None

    def _cache_result(self, result: ProxyCheckResult) -> None:
        """Cache a check result keyed by its proxy URL."""
        self._cache[result.proxy] = result

    async def check_single_proxy(self, proxy: str, use_cache: bool = True) -> ProxyCheckResult:
        """
        Check if a single proxy is available.

        Args:
            proxy: Proxy address in format like http://host:port or socks5://host:port
            use_cache: Whether to use cached results

        Returns:
            ProxyCheckResult: Check result (also cached, including failures)
        """
        # Serve from cache when allowed and still fresh.
        if use_cache:
            cached = self._get_cached_result(proxy)
            if cached:
                return cached
        # Reject malformed proxy URLs without issuing a request.
        if not self._is_valid_proxy_format(proxy):
            result = ProxyCheckResult(
                proxy=proxy,
                is_available=False,
                error_message="Invalid proxy format",
                checked_at=time.time()
            )
            self._cache_result(result)
            return result
        # Perform the actual probe through the proxy.
        start_time = time.time()
        try:
            logger.info(f"Starting proxy check: {proxy}")
            timeout = httpx.Timeout(self.TIMEOUT_SECONDS, read=self.TIMEOUT_SECONDS)
            async with httpx.AsyncClient(timeout=timeout, proxy=proxy) as client:
                response = await client.head(self.CHECK_URL)
                response_time = time.time() - start_time
                # Treat success and common redirect statuses as "available".
                is_available = response.status_code in [200, 204, 301, 302, 307, 308]
                result = ProxyCheckResult(
                    proxy=proxy,
                    is_available=is_available,
                    response_time=round(response_time, 3),
                    error_message=None if is_available else f"HTTP {response.status_code}",
                    checked_at=time.time()
                )
                logger.info(f"Proxy check completed: {proxy}, available: {is_available}, response_time: {response_time:.3f}s")
        except (httpx.TimeoutException, asyncio.TimeoutError):
            # Bug fix: httpx raises httpx.TimeoutException (NOT a subclass of
            # asyncio.TimeoutError), so timeouts previously fell through to the
            # generic handler below with an unhelpful message.
            result = ProxyCheckResult(
                proxy=proxy,
                is_available=False,
                error_message="Connection timeout",
                checked_at=time.time()
            )
            logger.warning(f"Proxy check timeout: {proxy}")
        except Exception as e:
            result = ProxyCheckResult(
                proxy=proxy,
                is_available=False,
                error_message=str(e),
                checked_at=time.time()
            )
            logger.error(f"Proxy check failed: {proxy}, error: {str(e)}")
        # Cache whatever outcome we got (success or failure).
        self._cache_result(result)
        return result

    async def check_multiple_proxies(
        self,
        proxies: List[str],
        use_cache: bool = True,
        max_concurrent: int = 5
    ) -> List[ProxyCheckResult]:
        """
        Check multiple proxies concurrently.

        Args:
            proxies: List of proxy addresses
            use_cache: Whether to use cached results
            max_concurrent: Maximum concurrent check count

        Returns:
            List[ProxyCheckResult]: Check results, in input order
        """
        if not proxies:
            return []
        logger.info(f"Starting batch proxy check for {len(proxies)} proxies")
        # Bound concurrency so we do not open too many connections at once.
        semaphore = asyncio.Semaphore(max_concurrent)

        async def check_with_semaphore(proxy: str) -> ProxyCheckResult:
            async with semaphore:
                return await self.check_single_proxy(proxy, use_cache)

        tasks = [check_with_semaphore(proxy) for proxy in proxies]
        # return_exceptions=True keeps one failed task from cancelling the rest.
        results = await asyncio.gather(*tasks, return_exceptions=True)
        # Convert any raised exceptions into failed ProxyCheckResult entries.
        final_results = []
        for i, result in enumerate(results):
            if isinstance(result, Exception):
                logger.error(f"Proxy check task exception: {proxies[i]}, error: {str(result)}")
                final_results.append(ProxyCheckResult(
                    proxy=proxies[i],
                    is_available=False,
                    error_message=f"Check task exception: {str(result)}",
                    checked_at=time.time()
                ))
            else:
                final_results.append(result)
        available_count = sum(1 for r in final_results if r.is_available)
        logger.info(f"Batch proxy check completed: {available_count}/{len(proxies)} proxies available")
        return final_results

    def get_cache_stats(self) -> Dict[str, int]:
        """Get cache statistics: total, still-valid and expired entry counts."""
        current_time = time.time()
        valid_cache_count = sum(
            1 for result in self._cache.values()
            if current_time - result.checked_at < self.CACHE_DURATION
        )
        return {
            "total_cached": len(self._cache),
            "valid_cached": valid_cache_count,
            "expired_cached": len(self._cache) - valid_cache_count
        }

    def clear_cache(self) -> None:
        """Clear all cached check results."""
        self._cache.clear()
        logger.info("Proxy check cache cleared")
# Global instance
_proxy_check_service: Optional[ProxyCheckService] = None


def get_proxy_check_service() -> ProxyCheckService:
    """Return the process-wide ProxyCheckService, creating it on first use."""
    global _proxy_check_service
    service = _proxy_check_service
    if service is None:
        service = ProxyCheckService()
        _proxy_check_service = service
    return service