File size: 7,171 Bytes
6c9376d | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 | """
Proxy Manager for Viral Clip Extractor
Handles proxy rotation, WebShare API integration, and bandwidth tracking
"""
import os
import json
import requests
import logging
from typing import List, Dict, Optional
from datetime import datetime
import threading
logger = logging.getLogger(__name__)
class ProxyManager:
def __init__(self, proxy_url: Optional[str] = None, api_url: Optional[str] = None):
"""
Initialize ProxyManager
Args:
proxy_url: Single proxy in format 'http://user:pass@host:port' or 'host:port:user:pass'
api_url: WebShare API URL for dynamic proxy list
"""
self.proxies: List[str] = []
self.current_index = 0
self.bandwidth_stats: Dict[str, int] = {}
self.total_bandwidth = 0
self.stats_file = "proxy_stats.json"
self.lock = threading.Lock()
# Load existing stats
self._load_stats()
# Initialize proxies
if api_url:
logger.info("Initializing with WebShare API")
self._fetch_from_api(api_url)
elif proxy_url:
logger.info("Initializing with single proxy")
self._add_single_proxy(proxy_url)
else:
logger.warning("No proxy configuration provided")
def _parse_proxy_format(self, proxy_str: str) -> str:
"""
Parse proxy from 'host:port:user:pass' to 'http://user:pass@host:port'
Also handles already formatted proxies
"""
proxy_str = proxy_str.strip()
# Already in correct format
if proxy_str.startswith('http://') or proxy_str.startswith('https://'):
return proxy_str
# Parse host:port:user:pass format
parts = proxy_str.split(':')
if len(parts) == 4:
host, port, user, password = parts
return f"http://{user}:{password}@{host}:{port}"
elif len(parts) == 2:
# Just host:port (no auth)
host, port = parts
return f"http://{host}:{port}"
else:
logger.error(f"Invalid proxy format: {proxy_str}")
return proxy_str
def _add_single_proxy(self, proxy_url: str):
"""Add a single proxy"""
formatted = self._parse_proxy_format(proxy_url)
self.proxies.append(formatted)
if formatted not in self.bandwidth_stats:
self.bandwidth_stats[formatted] = 0
def _fetch_from_api(self, api_url: str):
"""Fetch proxy list from WebShare API"""
try:
logger.info(f"Fetching proxies from API: {api_url[:50]}...")
response = requests.get(api_url, timeout=10)
response.raise_for_status()
# Parse response - WebShare returns one proxy per line
proxy_list = response.text.strip().split('\n')
for proxy_line in proxy_list:
if proxy_line.strip():
formatted = self._parse_proxy_format(proxy_line)
self.proxies.append(formatted)
if formatted not in self.bandwidth_stats:
self.bandwidth_stats[formatted] = 0
logger.info(f"Loaded {len(self.proxies)} proxies from API")
except Exception as e:
logger.error(f"Failed to fetch proxies from API: {e}")
def get_next_proxy(self) -> Optional[str]:
"""Get next proxy using round-robin rotation"""
with self.lock:
if not self.proxies:
return None
proxy = self.proxies[self.current_index]
self.current_index = (self.current_index + 1) % len(self.proxies)
return proxy
def track_bandwidth(self, proxy: str, bytes_used: int):
"""Track bandwidth usage for a proxy"""
with self.lock:
if proxy in self.bandwidth_stats:
self.bandwidth_stats[proxy] += bytes_used
else:
self.bandwidth_stats[proxy] = bytes_used
self.total_bandwidth += bytes_used
self._save_stats()
def get_stats(self) -> Dict:
"""Get bandwidth statistics"""
with self.lock:
return {
"total_proxies": len(self.proxies),
"total_bandwidth_bytes": self.total_bandwidth,
"total_bandwidth_mb": round(self.total_bandwidth / (1024 * 1024), 2),
"total_bandwidth_gb": round(self.total_bandwidth / (1024 * 1024 * 1024), 3),
"bandwidth_remaining_mb": max(0, 1024 - round(self.total_bandwidth / (1024 * 1024), 2)),
"per_proxy_stats": [
{
"proxy": self._mask_proxy(proxy),
"bandwidth_bytes": bytes_used,
"bandwidth_mb": round(bytes_used / (1024 * 1024), 2)
}
for proxy, bytes_used in self.bandwidth_stats.items()
],
"current_proxy_index": self.current_index,
"last_updated": datetime.now().isoformat()
}
def _mask_proxy(self, proxy: str) -> str:
"""Mask proxy credentials for display"""
if '@' in proxy:
parts = proxy.split('@')
return f"***@{parts[1]}"
return proxy
def _load_stats(self):
"""Load stats from file"""
try:
if os.path.exists(self.stats_file):
with open(self.stats_file, 'r') as f:
data = json.load(f)
self.bandwidth_stats = data.get('bandwidth_stats', {})
self.total_bandwidth = data.get('total_bandwidth', 0)
logger.info(f"Loaded stats: {self.total_bandwidth} bytes total")
except Exception as e:
logger.error(f"Failed to load stats: {e}")
def _save_stats(self):
"""Save stats to file"""
try:
with open(self.stats_file, 'w') as f:
json.dump({
'bandwidth_stats': self.bandwidth_stats,
'total_bandwidth': self.total_bandwidth,
'last_updated': datetime.now().isoformat()
}, f, indent=2)
except Exception as e:
logger.error(f"Failed to save stats: {e}")
def refresh_proxies(self, api_url: str):
"""Refresh proxy list from API"""
with self.lock:
old_count = len(self.proxies)
self.proxies.clear()
self._fetch_from_api(api_url)
logger.info(f"Refreshed proxies: {old_count} -> {len(self.proxies)}")
def add_proxy(self, proxy_str: str):
"""Add a new proxy"""
with self.lock:
formatted = self._parse_proxy_format(proxy_str)
if formatted not in self.proxies:
self.proxies.append(formatted)
if formatted not in self.bandwidth_stats:
self.bandwidth_stats[formatted] = 0
logger.info(f"Added proxy: {self._mask_proxy(formatted)}")
|