hub / backend /api /proxy_manager.py
Yashwanth
Restructure project: separate backend and frontend folders
6c9376d
"""
Proxy Manager for Viral Clip Extractor
Handles proxy rotation, WebShare API integration, and bandwidth tracking
"""
import os
import json
import requests
import logging
from typing import List, Dict, Optional
from datetime import datetime
import threading
logger = logging.getLogger(__name__)
class ProxyManager:
def __init__(self, proxy_url: Optional[str] = None, api_url: Optional[str] = None):
"""
Initialize ProxyManager
Args:
proxy_url: Single proxy in format 'http://user:pass@host:port' or 'host:port:user:pass'
api_url: WebShare API URL for dynamic proxy list
"""
self.proxies: List[str] = []
self.current_index = 0
self.bandwidth_stats: Dict[str, int] = {}
self.total_bandwidth = 0
self.stats_file = "proxy_stats.json"
self.lock = threading.Lock()
# Load existing stats
self._load_stats()
# Initialize proxies
if api_url:
logger.info("Initializing with WebShare API")
self._fetch_from_api(api_url)
elif proxy_url:
logger.info("Initializing with single proxy")
self._add_single_proxy(proxy_url)
else:
logger.warning("No proxy configuration provided")
def _parse_proxy_format(self, proxy_str: str) -> str:
"""
Parse proxy from 'host:port:user:pass' to 'http://user:pass@host:port'
Also handles already formatted proxies
"""
proxy_str = proxy_str.strip()
# Already in correct format
if proxy_str.startswith('http://') or proxy_str.startswith('https://'):
return proxy_str
# Parse host:port:user:pass format
parts = proxy_str.split(':')
if len(parts) == 4:
host, port, user, password = parts
return f"http://{user}:{password}@{host}:{port}"
elif len(parts) == 2:
# Just host:port (no auth)
host, port = parts
return f"http://{host}:{port}"
else:
logger.error(f"Invalid proxy format: {proxy_str}")
return proxy_str
def _add_single_proxy(self, proxy_url: str):
"""Add a single proxy"""
formatted = self._parse_proxy_format(proxy_url)
self.proxies.append(formatted)
if formatted not in self.bandwidth_stats:
self.bandwidth_stats[formatted] = 0
def _fetch_from_api(self, api_url: str):
"""Fetch proxy list from WebShare API"""
try:
logger.info(f"Fetching proxies from API: {api_url[:50]}...")
response = requests.get(api_url, timeout=10)
response.raise_for_status()
# Parse response - WebShare returns one proxy per line
proxy_list = response.text.strip().split('\n')
for proxy_line in proxy_list:
if proxy_line.strip():
formatted = self._parse_proxy_format(proxy_line)
self.proxies.append(formatted)
if formatted not in self.bandwidth_stats:
self.bandwidth_stats[formatted] = 0
logger.info(f"Loaded {len(self.proxies)} proxies from API")
except Exception as e:
logger.error(f"Failed to fetch proxies from API: {e}")
def get_next_proxy(self) -> Optional[str]:
"""Get next proxy using round-robin rotation"""
with self.lock:
if not self.proxies:
return None
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def track_bandwidth(self, proxy: str, bytes_used: int):
"""Track bandwidth usage for a proxy"""
with self.lock:
if proxy in self.bandwidth_stats:
self.bandwidth_stats[proxy] += bytes_used
else:
self.bandwidth_stats[proxy] = bytes_used
self.total_bandwidth += bytes_used
self._save_stats()
def get_stats(self) -> Dict:
"""Get bandwidth statistics"""
with self.lock:
return {
"total_proxies": len(self.proxies),
"total_bandwidth_bytes": self.total_bandwidth,
"total_bandwidth_mb": round(self.total_bandwidth / (1024 * 1024), 2),
"total_bandwidth_gb": round(self.total_bandwidth / (1024 * 1024 * 1024), 3),
"bandwidth_remaining_mb": max(0, 1024 - round(self.total_bandwidth / (1024 * 1024), 2)),
"per_proxy_stats": [
{
"proxy": self._mask_proxy(proxy),
"bandwidth_bytes": bytes_used,
"bandwidth_mb": round(bytes_used / (1024 * 1024), 2)
}
for proxy, bytes_used in self.bandwidth_stats.items()
],
"current_proxy_index": self.current_index,
"last_updated": datetime.now().isoformat()
}
def _mask_proxy(self, proxy: str) -> str:
"""Mask proxy credentials for display"""
if '@' in proxy:
parts = proxy.split('@')
return f"***@{parts[1]}"
return proxy
def _load_stats(self):
"""Load stats from file"""
try:
if os.path.exists(self.stats_file):
with open(self.stats_file, 'r') as f:
data = json.load(f)
self.bandwidth_stats = data.get('bandwidth_stats', {})
self.total_bandwidth = data.get('total_bandwidth', 0)
logger.info(f"Loaded stats: {self.total_bandwidth} bytes total")
except Exception as e:
logger.error(f"Failed to load stats: {e}")
def _save_stats(self):
"""Save stats to file"""
try:
with open(self.stats_file, 'w') as f:
json.dump({
'bandwidth_stats': self.bandwidth_stats,
'total_bandwidth': self.total_bandwidth,
'last_updated': datetime.now().isoformat()
}, f, indent=2)
except Exception as e:
logger.error(f"Failed to save stats: {e}")
def refresh_proxies(self, api_url: str):
"""Refresh proxy list from API"""
with self.lock:
old_count = len(self.proxies)
self.proxies.clear()
self._fetch_from_api(api_url)
logger.info(f"Refreshed proxies: {old_count} -> {len(self.proxies)}")
def add_proxy(self, proxy_str: str):
"""Add a new proxy"""
with self.lock:
formatted = self._parse_proxy_format(proxy_str)
if formatted not in self.proxies:
self.proxies.append(formatted)
if formatted not in self.bandwidth_stats:
self.bandwidth_stats[formatted] = 0
logger.info(f"Added proxy: {self._mask_proxy(formatted)}")